CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
19 
21 
25 
26 #include "boost/shared_ptr.hpp"
27 #include "boost/thread/condition.hpp"
28 
29 #include <map>
30 #include <memory>
31 #include <set>
32 #include <string>
33 #include <vector>
34 
35 namespace statemachine {
36  class Machine;
37  class Run;
38 }
39 
40 namespace edm {
41 
42  class ActionTable;
43  class BranchIDListHelper;
44  class EDLooperBase;
45  class HistoryAppender;
46  class ProcessDesc;
47  class SubProcess;
48  namespace eventsetup {
49  class EventSetupProvider;
50  class EventSetupsController;
51  }
52 
53  namespace event_processor {
54  /*
55  Several of these state are likely to be transitory in
56  the offline because they are completly driven by the
57  data coming from the input source.
58  */
61 
66 
67  class StateSentry;
68  }
69 
71  public:
72 
73  // The input string 'config' contains the entire contents of a configuration file.
74  // Also allows the attachement of pre-existing services specified by 'token', and
75  // the specification of services by name only (defaultServices and forcedServices).
76  // 'defaultServices' are overridden by 'config'.
77  // 'forcedServices' override the 'config'.
78  explicit EventProcessor(std::string const& config,
79  ServiceToken const& token = ServiceToken(),
81  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
82  std::vector<std::string> const& forcedServices = std::vector<std::string>());
83 
84  // Same as previous constructor, but without a 'token'. Token will be defaulted.
85 
86  EventProcessor(std::string const& config,
87  std::vector<std::string> const& defaultServices,
88  std::vector<std::string> const& forcedServices = std::vector<std::string>());
89 
90  EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
91  ServiceToken const& token,
93 
95  EventProcessor(std::string const& config, bool isPython);
96 
98 
99  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
100  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
101 
106  void beginJob();
107 
111  void endJob();
112 
116  char const* currentStateName() const;
117  char const* stateName(event_processor::State s) const;
118  char const* msgName(event_processor::Msg m) const;
120  void runAsync();
121  StatusCode statusAsync() const;
122 
123  // Concerning the async control functions:
124  // The event processor is left with the running thread.
125  // The async thread is stuck at this point and the process
126  // is likely not going to be able to continue.
127  // The reason for this timeout could be either an infinite loop
128  // or I/O blocking forever.
129  // The only thing to do is end the process.
130  // If you call endJob, you will likely get an exception from the
131  // state checks telling you that it is not valid to call this function.
132  // All these function force the event processor state into an
133  // error state.
134 
135  // tell the event loop to stop and wait for its completion
136  StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
137 
138  // tell the event loop to shutdown and wait for the completion
139  StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
140 
141  // wait until async event loop thread completes
142  // or timeout occurs (See StatusCode for return values)
143  StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
144 
145  // Both of these calls move the EP to the ready to run state but only
146  // the first actually sets the run number, the other one just stores
147  // the run number set externally in order to later compare to the one
148  // read from the input source for verification
150  void declareRunNumber(RunNumber_t runNumber);
151 
152  // -------------
153 
154  // Same as runToCompletion(false) but since it was used extensively
155  // outside of the framework (and is simpler) will keep
156  StatusCode run();
157 
160 
164 
165  std::vector<ModuleDescription const*>
166  getAllModuleDescriptions() const;
167 
171  int totalEvents() const;
172 
175  int totalEventsPassed() const;
176 
179  int totalEventsFailed() const;
180 
183  void enableEndPaths(bool active);
184 
187  bool endPathsEnabled() const;
188 
191  void getTriggerReport(TriggerReport& rep) const;
192 
194  void clearCounters();
195 
196  // Really should not be public,
197  // but the EventFilter needs it for now.
199 
204 
209 
210  //------------------------------------------------------------------
211  //
212  // Nested classes and structs below.
213 
214  // The function "runToCompletion" will run until the job is "complete",
215  // which means:
216  // 1 - no more input data
217  // 2 - input maxEvents parameter limit reached
218  // 3 - output maxEvents parameter limit reached
219  // 4 - input maxLuminosityBlocks parameter limit reached
220  // 5 - looper directs processing to end
221  //
222  // The return values from the function are as follows:
223  // epSignal - processing terminated early, SIGUSR2 encountered
224  // epCountComplete - "runEventCount" processed the number of events
225  // requested by the argument
226  // epSuccess - all other cases
227  //
228  // The online is an exceptional case. Online uses the DaqSource
229  // and the StreamerOutputModule, which are specially written to
230  // handle multiple calls of "runToCompletion" in the same job.
231  // The call to setRunNumber resets the DaqSource between those calls.
232  // With most sources and output modules, this does not work.
233  // If and only if called by the online, the argument to runToCompletion
234  // is set to true and this affects the state initial and final state
235  // transitions that are managed directly in EventProcessor.cc. (I am
236  // not sure if there is a reason for this or it is just a historical
237  // peculiarity that could be cleaned up and removed).
238 
239  virtual StatusCode runToCompletion(bool onlineStateTransitions);
240 
241  // The following functions are used by the code implementing our
242  // boost statemachine
243 
244  virtual void readFile();
245  virtual void closeInputFile(bool cleaningUpAfterException);
246  virtual void openOutputFiles();
247  virtual void closeOutputFiles();
248 
249  virtual void respondToOpenInputFile();
250  virtual void respondToCloseInputFile();
251  virtual void respondToOpenOutputFiles();
252  virtual void respondToCloseOutputFiles();
253 
254  virtual void startingNewLoop();
255  virtual bool endOfLoop();
256  virtual void rewindInput();
257  virtual void prepareForNextLoop();
258  virtual bool shouldWeCloseOutput() const;
259 
260  virtual void doErrorStuff();
261 
262  virtual void beginRun(statemachine::Run const& run);
263  virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
264 
265  virtual void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
266  virtual void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException);
267 
270  virtual int readAndCacheLumi();
271  virtual int readAndMergeLumi();
272  virtual void writeRun(statemachine::Run const& run);
273  virtual void deleteRunFromCache(statemachine::Run const& run);
274  virtual void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
276 
277  virtual void readAndProcessEvent();
278  virtual bool shouldWeStop() const;
279 
283 
284  virtual bool alreadyHandlingException() const;
285 
286  //returns 'true' if this was a child and we should continue processing
287  bool forkProcess(std::string const& jobReportFile);
288 
289  private:
290  //------------------------------------------------------------------
291  //
292  // Now private functions.
293  // init() is used by only by constructors
294  void init(boost::shared_ptr<ProcessDesc>& processDesc,
295  ServiceToken const& token,
297 
298  void terminateMachine(std::auto_ptr<statemachine::Machine>&);
299  std::auto_ptr<statemachine::Machine> createStateMachine();
300 
302 
303  StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
304 
305  void connectSigs(EventProcessor* ep);
306 
308  void errorState();
309  void setupSignal();
310 
311  static void asyncRun(EventProcessor*);
312 
313  bool hasSubProcess() const {
314  return subProcess_.get() != 0;
315  }
316 
318  //------------------------------------------------------------------
319  //
320  // Data members below.
321  // Are all these data members really needed? Some of them are used
322  // only during construction, and never again. If they aren't
323  // really needed, we should remove them.
324 
327  boost::shared_ptr<ActivityRegistry> actReg_;
328  boost::shared_ptr<ProductRegistry const> preg_;
329  boost::shared_ptr<BranchIDListHelper> branchIDListHelper_;
331  std::unique_ptr<InputSource> input_;
332  std::unique_ptr<eventsetup::EventSetupsController> espController_;
333  boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
334  std::unique_ptr<ActionTable const> act_table_;
335  boost::shared_ptr<ProcessConfiguration const> processConfiguration_;
336  std::auto_ptr<Schedule> schedule_;
337  std::auto_ptr<SubProcess> subProcess_;
338  std::unique_ptr<HistoryAppender> historyAppender_;
339 
341  boost::shared_ptr<boost::thread> event_loop_;
342 
345  boost::condition stopper_;
346  boost::condition starter_;
347  volatile int stop_count_;
348  volatile Status last_rc_;
350  volatile bool id_set_;
351  volatile pthread_t event_loop_id_;
353  std::unique_ptr<FileBlock> fb_;
354  boost::shared_ptr<EDLooperBase> looper_;
355 
368 
373 
374  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
375  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
378  }; // class EventProcessor
379 
380  //--------------------------------------------------------------------
381 
382  inline
385  return runToCompletion(false);
386  }
387 }
388 #endif
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
string rep
Definition: cuy.py:1188
volatile int stop_count_
event_processor::State getState() const
volatile bool id_set_
virtual void closeOutputFiles()
static boost::mutex mutex
Definition: LHEProxy.cc:11
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
boost::shared_ptr< boost::thread > event_loop_
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
ActivityRegistry::PostProcessEvent postProcessEventSignal_
boost::shared_ptr< EDLooperBase > looper_
virtual void setExceptionMessageFiles(std::string &message)
void possiblyContinueAfterForkChildFailure()
tuple lumi
Definition: fjr2json.py:35
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
void clearCounters()
Clears counters used by trigger report.
StatusCode doneAsync(event_processor::Msg m)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
boost::shared_ptr< ActivityRegistry > actReg_
StatusCode shutdownAsync(unsigned int timeout_secs=60 *2)
boost::mutex state_lock_
void setRunNumber(RunNumber_t runNumber)
virtual void deleteRunFromCache(statemachine::Run const &run)
virtual void rewindInput()
std::set< std::pair< std::string, std::string > > ExcludedData
std::auto_ptr< statemachine::Machine > createStateMachine()
std::string exceptionMessageRuns_
unsigned int LuminosityBlockNumber_t
Definition: EventID.h:31
char const * msgName(event_processor::Msg m) const
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool forkProcess(std::string const &jobReportFile)
virtual void beginRun(statemachine::Run const &run)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
StatusCode waitTillDoneAsync(unsigned int timeout_seconds=0)
virtual void readFile()
boost::condition stopper_
std::string last_error_text_
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
virtual int readAndCacheLumi()
ServiceToken serviceToken_
bool endPathsEnabled() const
boost::mutex stop_lock_
virtual bool alreadyHandlingException() const
std::string exceptionMessageLumis_
virtual void setExceptionMessageLumis(std::string &message)
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
virtual void setExceptionMessageRuns(std::string &message)
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
void declareRunNumber(RunNumber_t runNumber)
virtual void readAndProcessEvent()
volatile event_processor::State state_
EventProcessor & operator=(EventProcessor const &)=delete
void connectSigs(EventProcessor *ep)
static void asyncRun(EventProcessor *)
int totalEvents() const
std::auto_ptr< Schedule > schedule_
ActivityRegistry::PostProcessEvent & postProcessEventSignal()
char const * stateName(event_processor::State s) const
std::string exceptionMessageFiles_
boost::shared_ptr< ProductRegistry const > preg_
virtual statemachine::Run readAndMergeRun()
char const * currentStateName() const
virtual void respondToCloseInputFile()
volatile pthread_t event_loop_id_
ActivityRegistry::PreProcessEvent & preProcessEventSignal()
volatile Status last_rc_
virtual void writeRun(statemachine::Run const &run)
void changeState(event_processor::Msg)
virtual statemachine::Run readAndCacheRun()
virtual void respondToOpenInputFile()
virtual bool shouldWeCloseOutput() const
virtual void doErrorStuff()
StatusCode statusAsync() const
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
virtual void respondToOpenOutputFiles()
std::unique_ptr< InputSource > input_
std::map< std::string, ExcludedData > ExcludedDataMap
ServiceToken getToken()
virtual void openOutputFiles()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void closeInputFile(bool cleaningUpAfterException)
int totalEventsFailed() const
virtual bool endOfLoop()
virtual void respondToCloseOutputFiles()
virtual void prepareForNextLoop()
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException)
std::auto_ptr< SubProcess > subProcess_
virtual void startingNewLoop()
unsigned int RunNumber_t
Definition: EventRange.h:32
ActivityRegistry::PreProcessEvent preProcessEventSignal_
StatusCode stopAsync(unsigned int timeout_secs=60 *2)
std::unique_ptr< ActionTable const > act_table_
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::unique_ptr< eventsetup::EventSetupsController > espController_
int totalEventsPassed() const
virtual int readAndMergeLumi()
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
PrincipalCache principalCache_
bool hasSubProcess() const
virtual bool shouldWeStop() const
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const