CMS 3D CMS Logo

EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
24 
26 
31 
34 
37 
38 #include <map>
39 #include <memory>
40 #include <set>
41 #include <string>
42 #include <vector>
43 #include <exception>
44 #include <mutex>
45 
46 namespace edm {
47 
48  class ExceptionToActionTable;
49  class BranchIDListHelper;
50  class MergeableRunProductMetadata;
51  class ThinnedAssociationsHelper;
52  class EDLooperBase;
53  class HistoryAppender;
54  class ProcessDesc;
55  class SubProcess;
56  class WaitingTaskHolder;
57  class LuminosityBlockPrincipal;
59  class IOVSyncValue;
60 
61  namespace eventsetup {
62  class EventSetupProvider;
63  class EventSetupsController;
64  } // namespace eventsetup
65 
67  public:
68  // Status codes:
69  // 0 successful completion
70  // 1 exception of unknown type caught
71  // 2 everything else
72  // 3 signal received
73  // 4 input complete
74  // 5 call timed out
75  // 6 input count complete
76  enum StatusCode {
77  epSuccess = 0,
79  epOther = 2,
80  epSignal = 3,
84  };
85 
86  // The input 'parameterSet' contains the entire contents of a configuration file.
87  // Also allows the attachment of pre-existing services specified by 'token', and
88  // the specification of services by name only (defaultServices and forcedServices).
89  // 'defaultServices' are overridden by 'parameterSet'.
90  // 'forcedServices' override the 'parameterSet'.
91  explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
92  ServiceToken const& token = ServiceToken(),
94  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
95  std::vector<std::string> const& forcedServices = std::vector<std::string>());
96 
97  // Same as previous constructor, but without a 'token'. Token will be defaulted.
98 
99  EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
100  std::vector<std::string> const& defaultServices,
101  std::vector<std::string> const& forcedServices = std::vector<std::string>());
102 
103  EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
104  ServiceToken const& token,
106 
107  ~EventProcessor();
108 
109  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
110  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
111 
112  void taskCleanup();
113 
118  void beginJob();
119 
123  void endJob();
124 
125  // -------------
126 
127  // Same as runToCompletion() but since it was used extensively
128  // outside of the framework (and is simpler) it is kept.
129  StatusCode run();
130 
133 
137 
138  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
139 
141 
145  int totalEvents() const;
146 
149  int totalEventsPassed() const;
150 
153  int totalEventsFailed() const;
154 
156  void clearCounters();
157 
158  // Really should not be public,
159  // but the EventFilter needs it for now.
161 
162  //------------------------------------------------------------------
163  //
164  // Nested classes and structs below.
165 
166  // The function "runToCompletion" will run until the job is "complete",
167  // which means:
168  // 1 - no more input data
169  // 2 - input maxEvents parameter limit reached
170  // 3 - output maxEvents parameter limit reached
171  // 4 - input maxLuminosityBlocks parameter limit reached
172  // 5 - looper directs processing to end
173  //
174  // The return values from the function are as follows:
175  // epSignal - processing terminated early, SIGUSR2 encountered
176  // epCountComplete - "runEventCount" processed the number of events
177  // requested by the argument
178  // epSuccess - all other cases
179  //
181 
182  // The following functions are used by the code implementing
183  // transition handling.
184 
188  return InputSource::IsStop;
189  }
190  return lastSourceTransition_;
191  }
192  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
194 
195  void readFile();
196  bool fileBlockValid() { return fb_.get() != nullptr; }
197  void closeInputFile(bool cleaningUpAfterException);
198  void openOutputFiles();
199  void closeOutputFiles();
200 
201  void respondToOpenInputFile();
203 
204  void startingNewLoop();
205  bool endOfLoop();
206  void rewindInput();
207  void prepareForNextLoop();
208  bool shouldWeCloseOutput() const;
209 
210  void doErrorStuff();
211 
212  void beginProcessBlock(bool& beginProcessBlockSucceeded);
213  void inputProcessBlocks();
214  void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
215 
216  void beginRun(ProcessHistoryID const& phid,
218  bool& globalBeginSucceeded,
219  bool& eventSetupForInstanceSucceeded);
220  void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);
221  void endUnfinishedRun(ProcessHistoryID const& phid,
223  bool globalBeginSucceeded,
224  bool cleaningUpAfterException,
225  bool eventSetupForInstanceSucceeded);
226 
227  InputSource::ItemType processLumis(std::shared_ptr<void> const& iRunResource);
228  void endUnfinishedLumi();
229 
230  void beginLumiAsync(edm::IOVSyncValue const& iSyncValue,
231  std::shared_ptr<void> const& iRunResource,
232  edm::WaitingTaskHolder iHolder);
234 
235  void handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder);
236  void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
237  void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex);
239  std::pair<ProcessHistoryID, RunNumber_t> readRun();
240  std::pair<ProcessHistoryID, RunNumber_t> readAndMergeRun();
246  ProcessHistoryID const& phid,
252 
253  bool shouldWeStop() const;
254 
255  void setExceptionMessageFiles(std::string& message);
256  void setExceptionMessageRuns(std::string& message);
258 
259  bool setDeferredException(std::exception_ptr);
260 
261  private:
262  //------------------------------------------------------------------
263  //
264  // Now private functions.
265  // init() is used only by constructors
266  void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
267 
268  bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iLumiStatus);
269 
270  void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex);
271 
272  //read the next event using Stream iStreamIndex
273  void readEvent(unsigned int iStreamIndex);
274 
275  //process the already read event using Stream iStreamIndex
276  void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
277 
278  void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
279 
280  //returns true if an asynchronous stop was requested
282 
283  void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
284 
285  std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
286  std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
287  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
289  }
290  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
291  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
293  }
294  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
296  }
297  std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
298  std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
299 
301  void warnAboutLegacyModules() const;
302  //------------------------------------------------------------------
303  //
304  // Data members below.
305  // Are all these data members really needed? Some of them are used
306  // only during construction, and never again. If they aren't
307  // really needed, we should remove them.
308 
309  //Guarantee that task group is the last to be destroyed (members are destroyed in reverse declaration order, so it must stay first)
310  oneapi::tbb::task_group taskGroup_;
311 
312  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
323  std::unique_ptr<ExceptionToActionTable const> act_table_;
324  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
329  std::vector<edm::SerialTaskQueue> streamQueues_;
330  std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
331  std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
332  std::atomic<unsigned int> streamLumiActive_{0}; //works as guard for streamLumiStatus
333 
334  std::vector<std::string> branchesToDeleteEarly_;
335 
336  std::vector<SubProcess> subProcesses_;
338 
341 
342  //The atomic protects concurrent access of deferredExceptionPtr_
343  std::atomic<bool> deferredExceptionPtrIsSet_;
344  std::exception_ptr deferredExceptionPtr_;
345 
347  std::shared_ptr<std::recursive_mutex> sourceMutex_;
354  std::atomic<bool> exceptionMessageLumis_;
358 
360 
363  bool firstEventInBlock_ = true;
364 
365  typedef std::set<std::pair<std::string, std::string>> ExcludedData;
366  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
368 
369  bool printDependencies_ = false;
371  }; // class EventProcessor
372 
373  //--------------------------------------------------------------------
374 
376 } // namespace edm
377 #endif
std::atomic< bool > exceptionMessageLumis_
void readLuminosityBlock(LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
ProcessContext processContext_
bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus &iLumiStatus)
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void deleteRunFromCache(ProcessHistoryID const &phid, RunNumber_t run)
SharedResourcesAcquirer sourceResourcesAcquirer_
int totalEventsFailed() const
InputSource::ItemType nextTransitionType()
std::shared_ptr< ProductRegistry const > preg() const
void warnAboutLegacyModules() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< InputSource > > input_
void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr< LuminosityBlockProcessingStatus > iLumiStatus)
std::unique_ptr< ExceptionToActionTable const > act_table_
void beginRun(ProcessHistoryID const &phid, RunNumber_t run, bool &globalBeginSucceeded, bool &eventSetupForInstanceSucceeded)
void setExceptionMessageFiles(std::string &message)
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void clearCounters()
Clears counters used by trigger report.
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &phid, RunNumber_t run, MergeableRunProductMetadata const *)
std::shared_ptr< EDLooperBase const > looper() const
InputSource::ItemType lastTransitionType() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::set< std::pair< std::string, std::string > > ExcludedData
std::string exceptionMessageRuns_
void deleteLumiFromCache(LuminosityBlockProcessingStatus &)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &lumiPrincipal)
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
MergeableRunProductProcesses mergeableRunProductProcesses_
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex)
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::vector< edm::SerialTaskQueue > streamQueues_
void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
void endRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void continueLumiAsync(edm::WaitingTaskHolder iHolder)
StatusCode runToCompletion()
EventProcessor & operator=(EventProcessor const &)=delete
void endUnfinishedRun(ProcessHistoryID const &phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException, bool eventSetupForInstanceSucceeded)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
InputSource::ItemType processLumis(std::shared_ptr< void > const &iRunResource)
std::string exceptionMessageFiles_
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
std::shared_ptr< EDLooperBase > & looper()
InputSource::ItemType lastSourceTransition_
StatusCode asyncStopStatusCodeFromProcessingEvents_
int readAndMergeLumi(LuminosityBlockProcessingStatus &)
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
std::map< std::string, ExcludedData > ExcludedDataMap
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void beginLumiAsync(edm::IOVSyncValue const &iSyncValue, std::shared_ptr< void > const &iRunResource, edm::WaitingTaskHolder iHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
ProcessConfiguration const & processConfiguration() const
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
void handleEndLumiExceptions(std::exception_ptr const *iPtr, WaitingTaskHolder &holder)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
HLT enums.
void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex)
void closeInputFile(bool cleaningUpAfterException)
void readProcessBlock(ProcessBlockPrincipal &)
std::exception_ptr deferredExceptionPtr_
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
edm::LuminosityBlockNumber_t nextLuminosityBlockID()
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
void beginProcessBlock(bool &beginProcessBlockSucceeded)
std::pair< ProcessHistoryID, RunNumber_t > readRun()
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
std::pair< ProcessHistoryID, RunNumber_t > readAndMergeRun()
PrincipalCache principalCache_
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
std::pair< edm::ProcessHistoryID, edm::RunNumber_t > nextRunID()