CMS 3D CMS Logo

EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
23 
25 
30 
32 
33 #include "boost/thread/condition.hpp"
34 
35 #include <map>
36 #include <memory>
37 #include <set>
38 #include <string>
39 #include <vector>
40 #include <exception>
41 #include <mutex>
42 
43 namespace statemachine {
44  class Machine;
45  class Run;
46 }
47 
48 namespace edm {
49 
50  class ExceptionToActionTable;
51  class BranchIDListHelper;
52  class ThinnedAssociationsHelper;
53  class EDLooperBase;
54  class HistoryAppender;
55  class ProcessDesc;
56  class SubProcess;
57  class WaitingTaskHolder;
58  class WaitingTask;
59 
60  namespace eventsetup {
61  class EventSetupProvider;
62  class EventSetupsController;
63  }
64 
66  public:
67 
68  // The input string 'config' contains the entire contents of a configuration file.
69  // Also allows the attachment of pre-existing services specified by 'token', and
70  // the specification of services by name only (defaultServices and forcedServices).
71  // 'defaultServices' are overridden by 'config'.
72  // 'forcedServices' override the 'config'.
73  explicit EventProcessor(std::string const& config,
74  ServiceToken const& token = ServiceToken(),
76  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
77  std::vector<std::string> const& forcedServices = std::vector<std::string>());
78 
79  // Same as previous constructor, but without a 'token'. Token will be defaulted.
80 
81  EventProcessor(std::string const& config,
82  std::vector<std::string> const& defaultServices,
83  std::vector<std::string> const& forcedServices = std::vector<std::string>());
84 
85  EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
86  ServiceToken const& token,
88 
90  EventProcessor(std::string const& config, bool isPython);
91 
92  ~EventProcessor();
93 
94  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
95  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
96 
101  void beginJob();
102 
106  void endJob();
107 
108  // -------------
109 
110  // Same as runToCompletion(), but since it was used extensively
111  // outside of the framework (and is simpler) it will be kept.
112  StatusCode run();
113 
116 
120 
121  std::vector<ModuleDescription const*>
122  getAllModuleDescriptions() const;
123 
124  ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
125 
129  int totalEvents() const;
130 
133  int totalEventsPassed() const;
134 
137  int totalEventsFailed() const;
138 
141  void enableEndPaths(bool active);
142 
145  bool endPathsEnabled() const;
146 
149  void getTriggerReport(TriggerReport& rep) const;
150 
152  void clearCounters();
153 
154  // Really should not be public,
155  // but the EventFilter needs it for now.
157 
158  //------------------------------------------------------------------
159  //
160  // Nested classes and structs below.
161 
162  // The function "runToCompletion" will run until the job is "complete",
163  // which means:
164  // 1 - no more input data
165  // 2 - input maxEvents parameter limit reached
166  // 3 - output maxEvents parameter limit reached
167  // 4 - input maxLuminosityBlocks parameter limit reached
168  // 5 - looper directs processing to end
169  //
170  // The return values from the function are as follows:
171  // epSignal - processing terminated early, SIGUSR2 encountered
172  // epCountComplete - "runEventCount" processed the number of events
173  // requested by the argument
174  // epSuccess - all other cases
175  //
176  virtual StatusCode runToCompletion() override;
177 
178  // The following functions are used by the code implementing our
179  // boost statemachine
180 
181  virtual void readFile() override;
182  virtual void closeInputFile(bool cleaningUpAfterException) override;
183  virtual void openOutputFiles() override;
184  virtual void closeOutputFiles() override;
185 
186  virtual void respondToOpenInputFile() override;
187  virtual void respondToCloseInputFile() override;
188 
189  virtual void startingNewLoop() override;
190  virtual bool endOfLoop() override;
191  virtual void rewindInput() override;
192  virtual void prepareForNextLoop() override;
193  virtual bool shouldWeCloseOutput() const override;
194 
195  virtual void doErrorStuff() override;
196 
197  virtual void beginRun(statemachine::Run const& run) override;
198  virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException) override;
199 
200  virtual void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override;
201  virtual void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException) override;
202 
203  virtual statemachine::Run readRun() override;
204  virtual statemachine::Run readAndMergeRun() override;
205  virtual int readLuminosityBlock() override;
206  virtual int readAndMergeLumi() override;
207  virtual void writeRun(statemachine::Run const& run) override;
208  virtual void deleteRunFromCache(statemachine::Run const& run) override;
209  virtual void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override;
210  virtual void deleteLumiFromCache(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi) override;
211 
212  virtual void readAndProcessEvent() override;
213  virtual bool shouldWeStop() const override;
214 
215  virtual void setExceptionMessageFiles(std::string& message) override;
216  virtual void setExceptionMessageRuns(std::string& message) override;
217  virtual void setExceptionMessageLumis(std::string& message) override;
218 
219  virtual bool alreadyHandlingException() const override;
220 
221  //returns 'true' if this was a child and we should continue processing
222  bool forkProcess(std::string const& jobReportFile);
223 
224  private:
225  //------------------------------------------------------------------
226  //
227  // Now private functions.
228  // init() is used only by constructors
229  void init(std::shared_ptr<ProcessDesc>& processDesc,
230  ServiceToken const& token,
232 
233  void terminateMachine(std::unique_ptr<statemachine::Machine>);
234  std::unique_ptr<statemachine::Machine> createStateMachine();
235 
236  void setupSignal();
237 
238  void possiblyContinueAfterForkChildFailure();
239 
240  bool readNextEventForStream(unsigned int iStreamIndex,
241  std::atomic<bool>* finishedProcessingEvents);
242 
243  void handleNextEventForStreamAsync(WaitingTask* iTask,
244  unsigned int iStreamIndex,
245  std::atomic<bool>* finishedProcessingEvents);
246 
247 
248  //read the next event using Stream iStreamIndex
249  void readEvent(unsigned int iStreamIndex);
250 
251  //process the already read event using Stream iStreamIndex
252  void processEventAsync(WaitingTaskHolder iHolder,
253  unsigned int iStreamIndex);
254 
255  //returns true if an asynchronous stop was requested
256  bool checkForAsyncStopRequest(StatusCode&);
257 
258  void processEventWithLooper(EventPrincipal&);
259 
260  std::shared_ptr<ProductRegistry const> preg() const {return get_underlying_safe(preg_);}
261  std::shared_ptr<ProductRegistry>& preg() {return get_underlying_safe(preg_);}
262  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {return get_underlying_safe(branchIDListHelper_);}
263  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() {return get_underlying_safe(branchIDListHelper_);}
264  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {return get_underlying_safe(thinnedAssociationsHelper_);}
265  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {return get_underlying_safe(thinnedAssociationsHelper_);}
266  std::shared_ptr<EDLooperBase const> looper() const {return get_underlying_safe(looper_);}
267  std::shared_ptr<EDLooperBase>& looper() {return get_underlying_safe(looper_);}
268  //------------------------------------------------------------------
269  //
270  // Data members below.
271  // Are all these data members really needed? Some of them are used
272  // only during construction, and never again. If they aren't
273  // really needed, we should remove them.
274 
275  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
283  std::unique_ptr<ExceptionToActionTable const> act_table_;
284  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
288  std::vector<SubProcess> subProcesses_;
290 
293 
294  //The atomic protects concurrent access of deferredExceptionPtr_
295  std::atomic<bool> deferredExceptionPtrIsSet_;
296  std::exception_ptr deferredExceptionPtr_;
297 
299  std::shared_ptr<std::recursive_mutex> sourceMutex_;
313 
318 
320 
324  bool firstEventInBlock_=true;
325 
326  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
327  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
329 
330  bool printDependencies_ = false;
331  }; // class EventProcessor
332 
333  //--------------------------------------------------------------------
334 
335  inline
338  return runToCompletion();
339  }
340 }
341 #endif
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
ProcessContext processContext_
SharedResourcesAcquirer sourceResourcesAcquirer_
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
ProcessConfiguration const & processConfiguration() const
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
int init
Definition: HydjetWrapper.h:67
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
Definition: config.py:1
std::set< std::pair< std::string, std::string > > ExcludedData
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
std::vector< SubProcess > subProcesses_
void beginJob()
Definition: Breakpoints.cc:15
ServiceToken serviceToken_
std::atomic< bool > deferredExceptionPtrIsSet_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
std::shared_ptr< std::recursive_mutex > sourceMutex_
rep
Definition: cuy.py:1188
std::string exceptionMessageFiles_
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
std::shared_ptr< EDLooperBase > & looper()
StatusCode asyncStopStatusCodeFromProcessingEvents_
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
std::map< std::string, ExcludedData > ExcludedDataMap
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
HLT enums.
std::exception_ptr deferredExceptionPtr_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
def getToken(db, tag, since)