CMS 3D CMS Logo

EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
23 
25 
30 
33 
35 
36 #include <map>
37 #include <memory>
38 #include <set>
39 #include <string>
40 #include <vector>
41 #include <exception>
42 #include <mutex>
43 
44 namespace edm {
45 
46  class ExceptionToActionTable;
47  class BranchIDListHelper;
48  class MergeableRunProductMetadata;
49  class ThinnedAssociationsHelper;
50  class EDLooperBase;
51  class HistoryAppender;
52  class ProcessDesc;
53  class SubProcess;
54  class WaitingTaskHolder;
56  class IOVSyncValue;
57 
58  namespace eventsetup {
59  class EventSetupProvider;
60  class EventSetupsController;
61  }
62 
64  public:
65 
66  // Status codes:
67  // 0 successful completion
68  // 1 exception of unknown type caught
69  // 2 everything else
70  // 3 signal received
71  // 4 input complete
72  // 5 call timed out
73  // 6 input count complete
74  enum StatusCode { epSuccess=0, epException=1, epOther=2, epSignal=3,
75  epInputComplete=4, epTimedOut=5, epCountComplete=6 };
76 
77  // The input 'parameterSet' contains the entire contents of a configuration file.
78  // Also allows the attachment of pre-existing services specified by 'token', and
79  // the specification of services by name only (defaultServices and forcedServices).
80  // 'defaultServices' are overridden by 'parameterSet'.
81  // 'forcedServices' override the 'parameterSet'.
82  explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
83  ServiceToken const& token = ServiceToken(),
85  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
86  std::vector<std::string> const& forcedServices = std::vector<std::string>());
87 
88  // Same as previous constructor, but without a 'token'. Token will be defaulted.
89 
90  EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
91  std::vector<std::string> const& defaultServices,
92  std::vector<std::string> const& forcedServices = std::vector<std::string>());
93 
94  EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
95  ServiceToken const& token,
97 
98  ~EventProcessor();
99 
100  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
101  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
102 
107  void beginJob();
108 
112  void endJob();
113 
114  // -------------
115 
116  // Same as runToCompletion(), but since it was used extensively
117  // outside of the framework (and is simpler) we will keep it.
118  StatusCode run();
119 
122 
126 
127  std::vector<ModuleDescription const*>
128  getAllModuleDescriptions() const;
129 
// Returns a const reference to the ProcessConfiguration held by processConfiguration_.
// The pointer is dereferenced without a null check.
130  ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
131 
135  int totalEvents() const;
136 
139  int totalEventsPassed() const;
140 
143  int totalEventsFailed() const;
144 
147  void enableEndPaths(bool active);
148 
151  bool endPathsEnabled() const;
152 
155  void getTriggerReport(TriggerReport& rep) const;
156 
158  void clearCounters();
159 
160  // Really should not be public,
161  // but the EventFilter needs it for now.
163 
164  //------------------------------------------------------------------
165  //
166  // Nested classes and structs below.
167 
168  // The function "runToCompletion" will run until the job is "complete",
169  // which means:
170  // 1 - no more input data
171  // 2 - input maxEvents parameter limit reached
172  // 3 - output maxEvents parameter limit reached
173  // 4 - input maxLuminosityBlocks parameter limit reached
174  // 5 - looper directs processing to end
175  //
176  // The return values from the function are as follows:
177  // epSignal - processing terminated early, SIGUSR2 encountered
178  // epCountComplete - "runEventCount" processed the number of events
179  // requested by the argument
180  // epSuccess - all other cases
181  //
182  StatusCode runToCompletion();
183 
184  // The following functions are used by the code implementing
185  // transition handling.
186 
187  InputSource::ItemType nextTransitionType();
// Returns lastSourceTransition_, unless deferredExceptionPtrIsSet_ is true,
// in which case InputSource::IsStop is returned instead.
// NOTE(review): presumably this makes the transition-handling code stop once a
// deferred exception has been recorded -- confirm against the implementation file.
188  InputSource::ItemType lastTransitionType() const { if(deferredExceptionPtrIsSet_) {return InputSource::IsStop;}
189  return lastSourceTransition_;}
190  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
191  edm::LuminosityBlockNumber_t nextLuminosityBlockID();
192 
193  void readFile();
194  void closeInputFile(bool cleaningUpAfterException);
195  void openOutputFiles();
196  void closeOutputFiles();
197 
198  void respondToOpenInputFile();
199  void respondToCloseInputFile();
200 
201  void startingNewLoop();
202  bool endOfLoop();
203  void rewindInput();
204  void prepareForNextLoop();
205  bool shouldWeCloseOutput() const;
206 
207  void doErrorStuff();
208 
209  void beginRun(ProcessHistoryID const& phid, RunNumber_t run, bool& globalBeginSucceeded,
210  bool& eventSetupForInstanceSucceeded);
211  void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);
212  void endUnfinishedRun(ProcessHistoryID const& phid, RunNumber_t run,
213  bool globalBeginSucceeded, bool cleaningUpAfterException,
214  bool eventSetupForInstanceSucceeded);
215 
216  InputSource::ItemType processLumis(std::shared_ptr<void> const& iRunResource);
217  void endUnfinishedLumi();
218 
219  void beginLumiAsync(edm::IOVSyncValue const& iSyncValue,
220  std::shared_ptr<void> const& iRunResource,
221  edm::WaitingTaskHolder iHolder);
222  void continueLumiAsync(edm::WaitingTaskHolder iHolder);
223 
224  void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
225  void streamEndLumiAsync(edm::WaitingTaskHolder iTask,
226  unsigned int iStreamIndex,
227  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
228  std::pair<ProcessHistoryID,RunNumber_t> readRun();
229  std::pair<ProcessHistoryID,RunNumber_t> readAndMergeRun();
230  void readLuminosityBlock(LuminosityBlockProcessingStatus&);
231  int readAndMergeLumi(LuminosityBlockProcessingStatus&);
232  void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const& phid, RunNumber_t run, MergeableRunProductMetadata const*);
233  void deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run);
234  void writeLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus> );
235  void deleteLumiFromCache(LuminosityBlockProcessingStatus&);
236 
237  bool shouldWeStop() const;
238 
239  void setExceptionMessageFiles(std::string& message);
240  void setExceptionMessageRuns(std::string& message);
241  void setExceptionMessageLumis(std::string& message);
242 
243  bool setDeferredException(std::exception_ptr);
244 
245  private:
246  //------------------------------------------------------------------
247  //
248  // Now private functions.
249  // init() is used only by constructors
250  void init(std::shared_ptr<ProcessDesc>& processDesc,
251  ServiceToken const& token,
253 
254  bool readNextEventForStream(unsigned int iStreamIndex,
255  LuminosityBlockProcessingStatus& iLumiStatus);
256 
257  void handleNextEventForStreamAsync(WaitingTaskHolder iTask,
258  unsigned int iStreamIndex);
259 
260 
261  //read the next event using Stream iStreamIndex
262  void readEvent(unsigned int iStreamIndex);
263 
264  //process the already read event using Stream iStreamIndex
265  void processEventAsync(WaitingTaskHolder iHolder,
266  unsigned int iStreamIndex);
267 
268  void processEventAsyncImpl(WaitingTaskHolder iHolder,
269  unsigned int iStreamIndex);
270 
271  //returns true if an asynchronous stop was requested
272  bool checkForAsyncStopRequest(StatusCode&);
273 
274  void processEventWithLooper(EventPrincipal&);
275 
// Accessors for preg_; both forward to get_underlying_safe(preg_).
// The const overload returns, by value, a shared_ptr to a const ProductRegistry;
// the non-const overload returns a reference to the shared_ptr<ProductRegistry> itself.
276  std::shared_ptr<ProductRegistry const> preg() const {return get_underlying_safe(preg_);}
277  std::shared_ptr<ProductRegistry>& preg() {return get_underlying_safe(preg_);}
// Accessors for branchIDListHelper_; both forward to get_underlying_safe(branchIDListHelper_).
// The const overload returns, by value, a shared_ptr to a const BranchIDListHelper;
// the non-const overload returns a reference to the shared_ptr<BranchIDListHelper> itself.
278  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {return get_underlying_safe(branchIDListHelper_);}
279  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() {return get_underlying_safe(branchIDListHelper_);}
// Accessors for thinnedAssociationsHelper_; both forward to
// get_underlying_safe(thinnedAssociationsHelper_).
// The const overload returns, by value, a shared_ptr to a const ThinnedAssociationsHelper;
// the non-const overload returns a reference to the shared_ptr<ThinnedAssociationsHelper> itself.
280  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {return get_underlying_safe(thinnedAssociationsHelper_);}
281  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {return get_underlying_safe(thinnedAssociationsHelper_);}
// Accessors for looper_; both forward to get_underlying_safe(looper_).
// The const overload returns, by value, a shared_ptr to a const EDLooperBase;
// the non-const overload returns a reference to the shared_ptr<EDLooperBase> itself.
282  std::shared_ptr<EDLooperBase const> looper() const {return get_underlying_safe(looper_);}
283  std::shared_ptr<EDLooperBase>& looper() {return get_underlying_safe(looper_);}
284  //------------------------------------------------------------------
285  //
286  // Data members below.
287  // Are all these data members really needed? Some of them are used
288  // only during construction, and never again. If they aren't
289  // really needed, we should remove them.
290 
291  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
301  std::unique_ptr<ExceptionToActionTable const> act_table_;
302  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
307  std::vector<edm::SerialTaskQueue> streamQueues_;
308  std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
309  std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
310  std::atomic<unsigned int> streamLumiActive_{0}; //works as guard for streamLumiStatus
311 
312  std::vector<SubProcess> subProcesses_;
314 
317 
318  //The atomic protects concurrent access of deferredExceptionPtr_
319  std::atomic<bool> deferredExceptionPtrIsSet_;
320  std::exception_ptr deferredExceptionPtr_;
321 
323  std::shared_ptr<std::recursive_mutex> sourceMutex_;
334 
336 
339  bool firstEventInBlock_=true;
340 
341  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
342  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
344 
345  bool printDependencies_ = false;
346  }; // class EventProcessor
347 
348  //--------------------------------------------------------------------
349 
350  inline
353  return runToCompletion();
354  }
355 }
356 #endif
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
ProcessConfiguration const & processConfiguration() const
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
int init
Definition: HydjetWrapper.h:67
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::set< std::pair< std::string, std::string > > ExcludedData
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
MergeableRunProductProcesses mergeableRunProductProcesses_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
std::string exceptionMessageFiles_
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
std::shared_ptr< EDLooperBase > & looper()
InputSource::ItemType lastSourceTransition_
StatusCode asyncStopStatusCodeFromProcessingEvents_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::map< std::string, ExcludedData > ExcludedDataMap
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
std::exception_ptr deferredExceptionPtr_
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
def getToken(db, tag, since)