CMS 3D CMS Logo

EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
22 
24 
29 
31 
32 #include <map>
33 #include <memory>
34 #include <set>
35 #include <string>
36 #include <vector>
37 #include <exception>
38 #include <mutex>
39 
40 namespace edm {
41 
42  class ExceptionToActionTable;
43  class BranchIDListHelper;
44  class ThinnedAssociationsHelper;
45  class EDLooperBase;
46  class HistoryAppender;
47  class ProcessDesc;
48  class SubProcess;
49  class WaitingTaskHolder;
50 
51  namespace eventsetup {
52  class EventSetupProvider;
53  class EventSetupsController;
54  }
55 
57  public:
58 
59  // Status codes:
60  // 0 successful completion
61  // 1 exception of unknown type caught
62  // 2 everything else
63  // 3 signal received
64  // 4 input complete
65  // 5 call timed out
66  // 6 input count complete
67  enum StatusCode { epSuccess=0, epException=1, epOther=2, epSignal=3,
68  epInputComplete=4, epTimedOut=5, epCountComplete=6 };
69 
70  // The input string 'config' contains the entire contents of a configuration file.
71  // Also allows the attachement of pre-existing services specified by 'token', and
72  // the specification of services by name only (defaultServices and forcedServices).
73  // 'defaultServices' are overridden by 'config'.
74  // 'forcedServices' the 'config'.
75  explicit EventProcessor(std::string const& config,
76  ServiceToken const& token = ServiceToken(),
78  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
79  std::vector<std::string> const& forcedServices = std::vector<std::string>());
80 
81  // Same as previous constructor, but without a 'token'. Token will be defaulted.
82 
83  EventProcessor(std::string const& config,
84  std::vector<std::string> const& defaultServices,
85  std::vector<std::string> const& forcedServices = std::vector<std::string>());
86 
87  EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
88  ServiceToken const& token,
90 
92  EventProcessor(std::string const& config, bool isPython);
93 
94  ~EventProcessor();
95 
96  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
97  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
98 
103  void beginJob();
104 
108  void endJob();
109 
110  // -------------
111 
112  // Same as runToCompletion(false) but since it was used extensively
113  // outside of the framework (and is simpler) will keep
114  StatusCode run();
115 
118 
122 
123  std::vector<ModuleDescription const*>
124  getAllModuleDescriptions() const;
125 
126  ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
127 
131  int totalEvents() const;
132 
135  int totalEventsPassed() const;
136 
139  int totalEventsFailed() const;
140 
143  void enableEndPaths(bool active);
144 
147  bool endPathsEnabled() const;
148 
151  void getTriggerReport(TriggerReport& rep) const;
152 
154  void clearCounters();
155 
156  // Really should not be public,
157  // but the EventFilter needs it for now.
159 
160  //------------------------------------------------------------------
161  //
162  // Nested classes and structs below.
163 
164  // The function "runToCompletion" will run until the job is "complete",
165  // which means:
166  // 1 - no more input data
167  // 2 - input maxEvents parameter limit reached
168  // 3 - output maxEvents parameter limit reached
169  // 4 - input maxLuminosityBlocks parameter limit reached
170  // 5 - looper directs processing to end
171  //
172  // The return values from the function are as follows:
173  // epSignal - processing terminated early, SIGUSR2 encountered
174  // epCountComplete - "runEventCount" processed the number of events
175  // requested by the argument
176  // epSuccess - all other cases
177  //
178  StatusCode runToCompletion();
179 
180  // The following functions are used by the code implementing
181  // transition handling.
182 
183  InputSource::ItemType nextTransitionType();
184  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
185  edm::LuminosityBlockNumber_t nextLuminosityBlockID();
186 
187  void readFile();
188  void closeInputFile(bool cleaningUpAfterException);
189  void openOutputFiles();
190  void closeOutputFiles();
191 
192  void respondToOpenInputFile();
193  void respondToCloseInputFile();
194 
195  void startingNewLoop();
196  bool endOfLoop();
197  void rewindInput();
198  void prepareForNextLoop();
199  bool shouldWeCloseOutput() const;
200 
201  void doErrorStuff();
202 
203  void beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded);
204  void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);
205 
206  void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool& globalBeginSucceeded);
207  void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool globalBeginSucceeded, bool cleaningUpAfterException);
208 
209  std::pair<ProcessHistoryID,RunNumber_t> readRun();
210  std::pair<ProcessHistoryID,RunNumber_t> readAndMergeRun();
211  int readLuminosityBlock();
212  int readAndMergeLumi();
213  void writeRun(ProcessHistoryID const& phid, RunNumber_t run);
214  void deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run);
215  void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
216  void deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
217 
218  bool shouldWeStop() const;
219 
220  void setExceptionMessageFiles(std::string& message);
221  void setExceptionMessageRuns(std::string& message);
222  void setExceptionMessageLumis(std::string& message);
223 
224  bool setDeferredException(std::exception_ptr);
225 
226  InputSource::ItemType readAndProcessEvents();
227 
228  private:
229  //------------------------------------------------------------------
230  //
231  // Now private functions.
232  // init() is used by only by constructors
233  void init(std::shared_ptr<ProcessDesc>& processDesc,
234  ServiceToken const& token,
236 
237  bool readNextEventForStream(unsigned int iStreamIndex,
238  std::atomic<bool>* finishedProcessingEvents);
239 
240  void handleNextEventForStreamAsync(WaitingTaskHolder iTask,
241  unsigned int iStreamIndex,
242  std::atomic<bool>* finishedProcessingEvents);
243 
244 
245  //read the next event using Stream iStreamIndex
246  void readEvent(unsigned int iStreamIndex);
247 
248  //process the already read event using Stream iStreamIndex
249  void processEventAsync(WaitingTaskHolder iHolder,
250  unsigned int iStreamIndex);
251 
252  void processEventAsyncImpl(WaitingTaskHolder iHolder,
253  unsigned int iStreamIndex);
254 
255  //returns true if an asynchronous stop was requested
256  bool checkForAsyncStopRequest(StatusCode&);
257 
258  void processEventWithLooper(EventPrincipal&);
259 
260  std::shared_ptr<ProductRegistry const> preg() const {return get_underlying_safe(preg_);}
261  std::shared_ptr<ProductRegistry>& preg() {return get_underlying_safe(preg_);}
262  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {return get_underlying_safe(branchIDListHelper_);}
263  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() {return get_underlying_safe(branchIDListHelper_);}
264  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {return get_underlying_safe(thinnedAssociationsHelper_);}
265  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {return get_underlying_safe(thinnedAssociationsHelper_);}
266  std::shared_ptr<EDLooperBase const> looper() const {return get_underlying_safe(looper_);}
267  std::shared_ptr<EDLooperBase>& looper() {return get_underlying_safe(looper_);}
268  //------------------------------------------------------------------
269  //
270  // Data members below.
271  // Are all these data members really needed? Some of them are used
272  // only during construction, and never again. If they aren't
273  // really needed, we should remove them.
274 
275  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
283  std::unique_ptr<ExceptionToActionTable const> act_table_;
284  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
288  std::vector<SubProcess> subProcesses_;
290 
293 
294  //The atomic protects concurrent access of deferredExceptionPtr_
295  std::atomic<bool> deferredExceptionPtrIsSet_;
296  std::exception_ptr deferredExceptionPtr_;
297 
299  std::shared_ptr<std::recursive_mutex> sourceMutex_;
310 
312 
314  std::atomic<InputSource::ItemType> nextItemTypeFromProcessingEvents_;
316  bool firstEventInBlock_=true;
317 
318  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
319  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
321 
322  bool printDependencies_ = false;
323  }; // class EventProcessor
324 
325  //--------------------------------------------------------------------
326 
327  inline
330  return runToCompletion();
331  }
332 }
333 #endif
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
ProcessConfiguration const & processConfiguration() const
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
int init
Definition: HydjetWrapper.h:67
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::set< std::pair< std::string, std::string > > ExcludedData
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
std::atomic< InputSource::ItemType > nextItemTypeFromProcessingEvents_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
std::string exceptionMessageFiles_
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
std::shared_ptr< EDLooperBase > & looper()
StatusCode asyncStopStatusCodeFromProcessingEvents_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::map< std::string, ExcludedData > ExcludedDataMap
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
std::exception_ptr deferredExceptionPtr_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
def getToken(db, tag, since)