CMS 3D CMS Logo

EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
23 
25 
30 
33 
35 
36 #include <map>
37 #include <memory>
38 #include <set>
39 #include <string>
40 #include <vector>
41 #include <exception>
42 #include <mutex>
43 
44 namespace edm {
45 
46  class ExceptionToActionTable;
47  class BranchIDListHelper;
48  class MergeableRunProductMetadata;
49  class ThinnedAssociationsHelper;
50  class EDLooperBase;
51  class HistoryAppender;
52  class ProcessDesc;
53  class SubProcess;
54  class WaitingTaskHolder;
55  class LuminosityBlockPrincipal;
57  class IOVSyncValue;
58 
59  namespace eventsetup {
60  class EventSetupProvider;
61  class EventSetupsController;
62  } // namespace eventsetup
63 
65  public:
66  // Status codes:
67  // 0 successful completion
68  // 1 exception of unknown type caught
69  // 2 everything else
70  // 3 signal received
71  // 4 input complete
72  // 5 call timed out
73  // 6 input count complete
// Status codes reported by the EventProcessor.
74  enum StatusCode {
75  epSuccess = 0,  // successful completion
76  epException = 1,  // exception of unknown type caught
77  epOther = 2,  // everything else
78  epSignal = 3,  // signal received
79  epInputComplete = 4,  // input complete
80  epTimedOut = 5,  // call timed out
81  epCountComplete = 6  // input count complete
82  };
83 
84  // The input 'parameterSet' contains the entire contents of a configuration file.
85  // Also allows the attachment of pre-existing services specified by 'token', and
86  // the specification of services by name only (defaultServices and forcedServices).
87  // 'defaultServices' are overridden by 'parameterSet'.
88  // 'forcedServices' override the 'parameterSet'.
89  explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
90  ServiceToken const& token = ServiceToken(),
92  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
93  std::vector<std::string> const& forcedServices = std::vector<std::string>());
94 
95  // Same as previous constructor, but without a 'token'. Token will be defaulted.
96 
97  EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
98  std::vector<std::string> const& defaultServices,
99  std::vector<std::string> const& forcedServices = std::vector<std::string>());
100 
101  EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
102  ServiceToken const& token,
104 
105  ~EventProcessor();
106 
107  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
108  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
109 
114  void beginJob();
115 
119  void endJob();
120 
121  // -------------
122 
123  // Same as runToCompletion(), but since it was used extensively
124  // outside of the framework (and is simpler) it is kept.
125  StatusCode run();
126 
129 
133 
134  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
135 
// Returns a const reference to the ProcessConfiguration obtained by
// dereferencing the processConfiguration_ member.
136  ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
137 
141  int totalEvents() const;
142 
145  int totalEventsPassed() const;
146 
149  int totalEventsFailed() const;
150 
153  void enableEndPaths(bool active);
154 
157  bool endPathsEnabled() const;
158 
161  void getTriggerReport(TriggerReport& rep) const;
162 
164  void clearCounters();
165 
166  // Really should not be public,
167  // but the EventFilter needs it for now.
169 
170  //------------------------------------------------------------------
171  //
172  // Nested classes and structs below.
173 
174  // The function "runToCompletion" will run until the job is "complete",
175  // which means:
176  // 1 - no more input data
177  // 2 - input maxEvents parameter limit reached
178  // 3 - output maxEvents parameter limit reached
179  // 4 - input maxLuminosityBlocks parameter limit reached
180  // 5 - looper directs processing to end
181  //
182  // The return values from the function are as follows:
183  // epSignal - processing terminated early, SIGUSR2 encountered
184  // epCountComplete - "runEventCount" processed the number of events
185  // requested by the argument
186  // epSuccess - all other cases
187  //
188  StatusCode runToCompletion();
189 
190  // The following functions are used by the code implementing
191  // transition handling.
192 
193  InputSource::ItemType nextTransitionType();
195  if (deferredExceptionPtrIsSet_) {
196  return InputSource::IsStop;
197  }
198  return lastSourceTransition_;
199  }
200  std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
201  edm::LuminosityBlockNumber_t nextLuminosityBlockID();
202 
203  void readFile();
204  void closeInputFile(bool cleaningUpAfterException);
205  void openOutputFiles();
206  void closeOutputFiles();
207 
208  void respondToOpenInputFile();
209  void respondToCloseInputFile();
210 
211  void startingNewLoop();
212  bool endOfLoop();
213  void rewindInput();
214  void prepareForNextLoop();
215  bool shouldWeCloseOutput() const;
216 
217  void doErrorStuff();
218 
219  void beginRun(ProcessHistoryID const& phid,
221  bool& globalBeginSucceeded,
222  bool& eventSetupForInstanceSucceeded);
223  void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);
224  void endUnfinishedRun(ProcessHistoryID const& phid,
225  RunNumber_t run,
226  bool globalBeginSucceeded,
227  bool cleaningUpAfterException,
228  bool eventSetupForInstanceSucceeded);
229 
230  InputSource::ItemType processLumis(std::shared_ptr<void> const& iRunResource);
231  void endUnfinishedLumi();
232 
233  void beginLumiAsync(edm::IOVSyncValue const& iSyncValue,
234  std::shared_ptr<void> const& iRunResource,
235  edm::WaitingTaskHolder iHolder);
236  void continueLumiAsync(edm::WaitingTaskHolder iHolder);
237 
238  void handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder);
239  void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
240  void streamEndLumiAsync(edm::WaitingTaskHolder iTask,
241  unsigned int iStreamIndex,
242  std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
243  std::pair<ProcessHistoryID, RunNumber_t> readRun();
244  std::pair<ProcessHistoryID, RunNumber_t> readAndMergeRun();
245  void readLuminosityBlock(LuminosityBlockProcessingStatus&);
246  int readAndMergeLumi(LuminosityBlockProcessingStatus&);
247  void writeRunAsync(WaitingTaskHolder,
248  ProcessHistoryID const& phid,
249  RunNumber_t run,
251  void deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run);
252  void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal& lumiPrincipal);
253  void deleteLumiFromCache(LuminosityBlockProcessingStatus&);
254 
255  bool shouldWeStop() const;
256 
257  void setExceptionMessageFiles(std::string& message);
258  void setExceptionMessageRuns(std::string& message);
259  void setExceptionMessageLumis();
260 
261  bool setDeferredException(std::exception_ptr);
262 
263  private:
264  //------------------------------------------------------------------
265  //
266  // Now private functions.
267  // init() is used only by constructors
268  void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
269 
270  bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iLumiStatus);
271 
272  void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex);
273 
274  //read the next event using Stream iStreamIndex
275  void readEvent(unsigned int iStreamIndex);
276 
277  //process the already read event using Stream iStreamIndex
278  void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
279 
280  void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
281 
282  //returns true if an asynchronous stop was requested
283  bool checkForAsyncStopRequest(StatusCode&);
284 
285  void processEventWithLooper(EventPrincipal&);
286 
// Accessor pairs for the preg_, branchIDListHelper_, thinnedAssociationsHelper_
// and looper_ members. Every accessor forwards its member to get_underlying_safe().
// The const overload returns a std::shared_ptr to a const object by value;
// the non-const overload returns a reference to a std::shared_ptr to the
// mutable object.
287  std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
288  std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
289  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
290  return get_underlying_safe(branchIDListHelper_);
291  }
292  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
293  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
294  return get_underlying_safe(thinnedAssociationsHelper_);
295  }
296  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
297  return get_underlying_safe(thinnedAssociationsHelper_);
298  }
299  std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
300  std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
301 
302  void warnAboutModulesRequiringLuminosityBLockSynchronization() const;
303  //------------------------------------------------------------------
304  //
305  // Data members below.
306  // Are all these data members really needed? Some of them are used
307  // only during construction, and never again. If they aren't
308  // really needed, we should remove them.
309 
310  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
320  std::unique_ptr<ExceptionToActionTable const> act_table_;
321  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
326  std::vector<edm::SerialTaskQueue> streamQueues_;
327  std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
328  std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
329  std::atomic<unsigned int> streamLumiActive_{0}; //works as guard for streamLumiStatus
330 
331  std::vector<SubProcess> subProcesses_;
333 
336 
337  //The atomic protects concurrent access of deferredExceptionPtr_
338  std::atomic<bool> deferredExceptionPtrIsSet_;
339  std::exception_ptr deferredExceptionPtr_;
340 
342  std::shared_ptr<std::recursive_mutex> sourceMutex_;
349  std::atomic<bool> exceptionMessageLumis_;
353 
355 
358  bool firstEventInBlock_ = true;
359 
360  typedef std::set<std::pair<std::string, std::string>> ExcludedData;
361  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
363 
364  bool printDependencies_ = false;
365  }; // class EventProcessor
366 
367  //--------------------------------------------------------------------
368 
// Inline definition of run(): calls runToCompletion() and returns its StatusCode unchanged.
369  inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
370 } // namespace edm
371 #endif
std::atomic< bool > exceptionMessageLumis_
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
ProcessConfiguration const & processConfiguration() const
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
std::unique_ptr< edm::LimitedTaskQueue > lumiQueue_
int init
Definition: HydjetWrapper.h:67
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
std::set< std::pair< std::string, std::string > > ExcludedData
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:14
MergeableRunProductProcesses mergeableRunProductProcesses_
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::shared_ptr< ProductRegistry const > preg() const
std::vector< edm::SerialTaskQueue > streamQueues_
InputSource::ItemType lastTransitionType() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1190
std::string exceptionMessageFiles_
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
std::shared_ptr< EDLooperBase > & looper()
InputSource::ItemType lastSourceTransition_
StatusCode asyncStopStatusCodeFromProcessingEvents_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::map< std::string, ExcludedData > ExcludedDataMap
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
std::vector< std::shared_ptr< LuminosityBlockProcessingStatus > > streamLumiStatus_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
std::exception_ptr deferredExceptionPtr_
edm::SerialTaskQueue iovQueue_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
def getToken(db, tag, since)