CMS 3D CMS Logo

EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
24 
26 
31 
34 
37 
38 #include "oneapi/tbb/task_group.h"
39 
40 #include <atomic>
41 #include <map>
42 #include <memory>
43 #include <set>
44 #include <string>
45 #include <vector>
46 #include <exception>
47 #include <mutex>
48 
49 namespace edm {
50 
51  class ExceptionCollector;
52  class ExceptionToActionTable;
53  class BranchIDListHelper;
54  class MergeableRunProductMetadata;
55  class ThinnedAssociationsHelper;
56  class EDLooperBase;
57  class HistoryAppender;
58  class ProcessDesc;
59  class SubProcess;
60  class WaitingTaskHolder;
61  class LuminosityBlockPrincipal;
62  class LuminosityBlockProcessingStatus;
63  class RunProcessingStatus;
64  class IOVSyncValue;
65  class ModuleTypeResolverMaker;
66 
67  namespace eventsetup {
68  class EventSetupProvider;
69  class EventSetupsController;
70  } // namespace eventsetup
71 
73  public:
74  // Status codes:
75  // 0 successful completion
76  // 1 exception of unknown type caught
77  // 2 everything else
78  // 3 signal received
79  // 4 input complete
80  // 5 call timed out
81  // 6 input count complete
82  enum StatusCode {
83  epSuccess = 0,
85  epOther = 2,
86  epSignal = 3,
90  };
91 
92  // The input 'parameterSet' contains the entire contents of a configuration file.
93  // Also allows the attachment of pre-existing services specified by 'token', and
94  // the specification of services by name only (defaultServices and forcedServices).
95  // 'defaultServices' are overridden by 'parameterSet'.
96  // 'forcedServices' override the 'parameterSet'.
97  explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
98  ServiceToken const& token = ServiceToken(),
100  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
101  std::vector<std::string> const& forcedServices = std::vector<std::string>());
102 
103  // Same as previous constructor, but without a 'token'. Token will be defaulted.
104 
105  EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
106  std::vector<std::string> const& defaultServices,
107  std::vector<std::string> const& forcedServices = std::vector<std::string>());
108 
109  EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
110  ServiceToken const& token,
112 
113  ~EventProcessor();
114 
115  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
116  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
117 
118  void taskCleanup();
119 
124  void beginJob();
125 
126  void beginStreams();
127 
128  void endStreams(ExceptionCollector&) noexcept;
129 
133  void endJob();
134 
135  // -------------
136 
137  // Same as runToCompletion(false), but since it was used extensively
138  // outside of the framework (and is simpler), it is kept.
139  StatusCode run();
140 
143 
147 
148  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
149 
151 
155  int totalEvents() const;
156 
159  int totalEventsPassed() const;
160 
163  int totalEventsFailed() const;
164 
166  void clearCounters();
167 
168  // Really should not be public,
169  // but the EventFilter needs it for now.
171 
172  //------------------------------------------------------------------
173  //
174  // Nested classes and structs below.
175 
176  // The function "runToCompletion" will run until the job is "complete",
177  // which means:
178  // 1 - no more input data
179  // 2 - input maxEvents parameter limit reached
180  // 3 - output maxEvents parameter limit reached
181  // 4 - input maxLuminosityBlocks parameter limit reached
182  // 5 - looper directs processing to end
183  //
184  // The return values from the function are as follows:
185  // epSignal - processing terminated early, SIGUSR2 encountered
186  // epCountComplete - "runEventCount" processed the number of events
187  // requested by the argument
188  // epSuccess - all other cases
189  //
191 
192  // The following functions are used by the code implementing
193  // transition handling.
194 
197  void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
198 
199  void readFile();
200  bool fileBlockValid() { return fb_.get() != nullptr; }
201  void closeInputFile(bool cleaningUpAfterException);
202  void openOutputFiles();
203  void closeOutputFiles();
204 
205  void respondToOpenInputFile();
207 
208  void startingNewLoop();
209  bool endOfLoop();
210  void rewindInput();
211  void prepareForNextLoop();
212  bool shouldWeCloseOutput() const;
213 
214  void doErrorStuff();
215 
216  void beginProcessBlock(bool& beginProcessBlockSucceeded);
217  void inputProcessBlocks();
218  void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
219 
222  void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
223  void releaseBeginRunResources(unsigned int iStream);
224  void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
225  void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
226  void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
227  void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
228  void endUnfinishedRun(bool cleaningUpAfterException);
229  void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
231  void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
232  void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
233  void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
234  void endUnfinishedLumi(bool cleaningUpAfterException);
236  std::shared_ptr<RunPrincipal> readRun();
238  std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
246 
247  bool shouldWeStop() const;
248 
249  void setExceptionMessageFiles(std::string& message);
252 
253  bool setDeferredException(std::exception_ptr);
254 
255  private:
256  //------------------------------------------------------------------
257  //
258  // Now private functions.
259  // init() is used only by constructors
260  void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
261 
262  void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
263  void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
264 
265  void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
266 
267  bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
268 
269  void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
270 
271  //read the next event using Stream iStreamIndex
272  void readEvent(unsigned int iStreamIndex);
273 
274  //process the already read event using Stream iStreamIndex
275  void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
276 
277  void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
278 
279  //returns true if an asynchronous stop was requested
281 
282  void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
283 
284  std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
285  std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
286  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
288  }
289  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
290  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
292  }
293  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
295  }
296  std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
297  std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
298 
301 
302  bool needToCallNext() const { return needToCallNext_; }
304 
305  //------------------------------------------------------------------
306  //
307  // Data members below.
308  // Are all these data members really needed? Some of them are used
309  // only during construction, and never again. If they aren't
310  // really needed, we should remove them.
311 
312  //Guarantee that task group is the last to be destroyed
313  oneapi::tbb::task_group taskGroup_;
314 
315  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
327  std::unique_ptr<ExceptionToActionTable const> act_table_;
328  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
333  std::vector<edm::SerialTaskQueue> streamQueues_;
335  std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
336  std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
337  std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
338  std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
339  std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
340  std::atomic<unsigned int> streamRunActive_{0}; //works as guard for streamRunStatus
341  std::atomic<unsigned int> streamLumiActive_{0}; //works as guard for streamLumiStatus
342 
343  std::vector<std::string> branchesToDeleteEarly_;
344  std::multimap<std::string, std::string> referencesToBranches_;
345  std::vector<std::string> modulesToIgnoreForDeleteEarly_;
346 
347  std::vector<SubProcess> subProcesses_;
349 
352 
353  //The atomic protects concurrent access of deferredExceptionPtr_
354  std::atomic<bool> deferredExceptionPtrIsSet_;
355  std::exception_ptr deferredExceptionPtr_;
356 
358  std::shared_ptr<std::recursive_mutex> sourceMutex_;
362  bool beginJobSucceeded_ = false;
366  std::atomic<bool> exceptionMessageRuns_;
367  std::atomic<bool> exceptionMessageLumis_;
371 
373 
374  bool firstEventInBlock_ = true;
375 
376  typedef std::set<std::pair<std::string, std::string>> ExcludedData;
377  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
379 
380  bool printDependencies_ = false;
382  bool needToCallNext_ = true;
383  }; // class EventProcessor
384 
385  //--------------------------------------------------------------------
386 
388 } // namespace edm
389 #endif
std::atomic< bool > exceptionMessageLumis_
bool readNextEventForStream(WaitingTaskHolder const &, unsigned int iStreamIndex, LuminosityBlockProcessingStatus &)
void readEvent(unsigned int iStreamIndex)
void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex)
ProcessContext processContext_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
void clearRunPrincipal(RunProcessingStatus &)
void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr< RunProcessingStatus >)
SharedResourcesAcquirer sourceResourcesAcquirer_
void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex)
int totalEventsFailed() const
std::shared_ptr< ProductRegistry const > preg() const
void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
InputSource::ItemTypeInfo lastSourceTransition_
std::shared_ptr< RunPrincipal > readRun()
void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
SerialTaskQueue streamQueuesInserter_
void endUnfinishedRun(bool cleaningUpAfterException)
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder) noexcept
void setExceptionMessageFiles(std::string &message)
InputSource::ItemTypeInfo nextTransitionType()
void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex)
void processEventWithLooper(EventPrincipal &, unsigned int iStreamIndex)
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const &)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
void clearCounters()
Clears counters used by trigger report.
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
bool needToCallNext() const
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< EDLooperBase const > looper() const
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::set< std::pair< std::string, std::string > > ExcludedData
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
PreallocationConfiguration preallocations_
edm::propagate_const< std::shared_ptr< FileBlock > > fb_
void endStreams(ExceptionCollector &) noexcept
std::vector< SubProcess > subProcesses_
std::atomic< bool > exceptionMessageRuns_
edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_
MergeableRunProductProcesses mergeableRunProductProcesses_
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
edm::propagate_const< std::unique_ptr< ModuleTypeResolverMaker const > > moduleTypeResolverMaker_
void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded)
std::multimap< std::string, std::string > referencesToBranches_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::vector< edm::SerialTaskQueue > streamQueues_
void nextTransitionTypeAsync(std::shared_ptr< RunProcessingStatus > iRunStatus, WaitingTaskHolder nextTask)
std::vector< std::string > modulesToIgnoreForDeleteEarly_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
std::unique_ptr< edm::LimitedTaskQueue > runQueue_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType)
void setNeedToCallNext(bool val)
StatusCode runToCompletion()
void clearLumiPrincipal(LuminosityBlockProcessingStatus &)
EventProcessor & operator=(EventProcessor const &)=delete
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
std::vector< std::shared_ptr< RunProcessingStatus > > streamRunStatus_
std::string exceptionMessageFiles_
void releaseBeginRunResources(unsigned int iStream)
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
std::shared_ptr< RunProcessingStatus > exceptionRunStatus_
std::shared_ptr< EDLooperBase > & looper()
void readAndMergeRun(RunProcessingStatus &)
void warnAboutModulesRequiringRunSynchronization() const
void beginLumiAsync(IOVSyncValue const &, std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void handleNextItemAfterMergingRunEntries(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
InputSource::ItemTypeInfo lastTransitionType() const
InputSource::ItemType processRuns()
oneapi::tbb::task_group taskGroup_
void throwAboutModulesRequiringLuminosityBlockSynchronization() const
std::map< std::string, ExcludedData > ExcludedDataMap
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
void endRunAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
bool shouldWeStop() const
ProcessConfiguration const & processConfiguration() const
std::atomic< unsigned int > streamRunActive_
void readAndMergeLumi(LuminosityBlockProcessingStatus &)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< std::string > branchesToDeleteEarly_
HLT enums.
void readAndMergeLumiEntriesAsync(std::shared_ptr< LuminosityBlockProcessingStatus >, WaitingTaskHolder)
void closeInputFile(bool cleaningUpAfterException)
void readAndMergeRunEntriesAsync(std::shared_ptr< RunProcessingStatus >, WaitingTaskHolder)
void readProcessBlock(ProcessBlockPrincipal &)
std::exception_ptr deferredExceptionPtr_
std::shared_ptr< LuminosityBlockPrincipal > readLuminosityBlock(std::shared_ptr< RunPrincipal > rp)
void endUnfinishedLumi(bool cleaningUpAfterException)
bool shouldWeCloseOutput() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< ProcessBlockHelper > > processBlockHelper_
void continueLumiAsync(WaitingTaskHolder)
void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr< LuminosityBlockProcessingStatus >)
std::shared_ptr< ActivityRegistry > actReg_
std::atomic< unsigned int > streamLumiActive_
void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex)
void beginProcessBlock(bool &beginProcessBlockSucceeded)
void beginRunAsync(IOVSyncValue const &, WaitingTaskHolder)
bool setDeferredException(std::exception_ptr)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
bool deleteNonConsumedUnscheduledModules_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
PrincipalCache principalCache_
EventProcessor(std::unique_ptr< ParameterSet > parameterSet, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())