CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
12 
17 
19 
23 
24 #include "boost/shared_ptr.hpp"
25 #include "boost/scoped_ptr.hpp"
26 #include "boost/thread/condition.hpp"
27 #include "boost/utility.hpp"
28 
29 #include <map>
30 #include <memory>
31 #include <set>
32 #include <string>
33 #include <vector>
34 
35 namespace statemachine {
36  class Machine;
37  class Run;
38 }
39 
40 namespace edm {
41 
42  class ActionTable;
43  class EDLooperBase;
44  class HistoryAppender;
45  class ProcessDesc;
46  class SubProcess;
47  namespace eventsetup {
48  class EventSetupProvider;
50  }
51 
52  namespace event_processor {
53  /*
54  Several of these states are likely to be transitory in
55  the offline because they are completely driven by the
56  data coming from the input source.
57  */
60 
65 
66  class StateSentry;
67  }
68 
69  class EventProcessor : public IEventProcessor, private boost::noncopyable {
70  public:
71 
72  // The input string 'config' contains the entire contents of a configuration file.
73  // Also allows the attachment of pre-existing services specified by 'token', and
74  // the specification of services by name only (defaultServices and forcedServices).
75  // 'defaultServices' are overridden by 'config'.
76  // 'forcedServices' override the 'config'.
77  explicit EventProcessor(std::string const& config,
78  ServiceToken const& token = ServiceToken(),
80  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
81  std::vector<std::string> const& forcedServices = std::vector<std::string>());
82 
83  // Same as previous constructor, but without a 'token'. Token will be defaulted.
84 
85  EventProcessor(std::string const& config,
86  std::vector<std::string> const& defaultServices,
87  std::vector<std::string> const& forcedServices = std::vector<std::string>());
88 
89  EventProcessor(boost::shared_ptr<ProcessDesc>& processDesc,
90  ServiceToken const& token,
92 
94  EventProcessor(std::string const& config, bool isPython);
95 
97 
102  void beginJob();
103 
107  void endJob();
108 
112  char const* currentStateName() const;
113  char const* stateName(event_processor::State s) const;
114  char const* msgName(event_processor::Msg m) const;
116  void runAsync();
117  StatusCode statusAsync() const;
118 
119  // Concerning the async control functions:
120  // The event processor is left with the running thread.
121  // The async thread is stuck at this point and the process
122  // is likely not going to be able to continue.
123  // The reason for this timeout could be either an infinite loop
124  // or I/O blocking forever.
125  // The only thing to do is end the process.
126  // If you call endJob, you will likely get an exception from the
127  // state checks telling you that it is not valid to call this function.
128  // All these functions force the event processor state into an
129  // error state.
130 
131  // tell the event loop to stop and wait for its completion
132  StatusCode stopAsync(unsigned int timeout_secs = 60 * 2);
133 
134  // tell the event loop to shut down and wait for its completion
135  StatusCode shutdownAsync(unsigned int timeout_secs = 60 * 2);
136 
137  // wait until async event loop thread completes
138  // or timeout occurs (See StatusCode for return values)
139  StatusCode waitTillDoneAsync(unsigned int timeout_seconds = 0);
140 
141  // Both of these calls move the EP to the ready to run state but only
142  // the first actually sets the run number, the other one just stores
143  // the run number set externally in order to later compare to the one
144  // read from the input source for verification
146  void declareRunNumber(RunNumber_t runNumber);
147 
148  // -------------
149 
150  // These next two functions are deprecated. Please use
151  // runToCompletion or runEventCount instead. These will
152  // be deleted as soon as we have time to clean up the code
153  // in packages outside the Framework that uses them already.
154  StatusCode run(int numberEventsToProcess, bool repeatable = true);
155  StatusCode run();
156 
157  // Skip the specified number of events.
158  // If numberToSkip is negative, we will back up.
159  StatusCode skip(int numberToSkip);
160 
161  // Rewind to the first event
162  void rewind();
163 
166 
170 
171  std::vector<ModuleDescription const*>
172  getAllModuleDescriptions() const;
173 
177  int totalEvents() const;
178 
181  int totalEventsPassed() const;
182 
185  int totalEventsFailed() const;
186 
189  void enableEndPaths(bool active);
190 
193  bool endPathsEnabled() const;
194 
197  void getTriggerReport(TriggerReport& rep) const;
198 
200  void clearCounters();
201 
202  // Really should not be public,
203  // but the EventFilter needs it for now.
205 
210 
215 
216  //------------------------------------------------------------------
217  //
218  // Nested classes and structs below.
219 
220  // The function "runToCompletion" will run until the job is "complete",
221  // which means:
222  // 1 - no more input data
223  // 2 - input maxEvents parameter limit reached
224  // 3 - output maxEvents parameter limit reached
225  // 4 - input maxLuminosityBlocks parameter limit reached
226  // 5 - looper directs processing to end
227  // The function "runEventCount" will pause after processing the
228  // number of input events specified by the argument. One can
229  // call it again to resume processing at the same point. This
230  // function will also stop at the same point as "runToCompletion"
231  // if the job is complete before the requested number of events
232  // are processed. If the requested number of events is less than
233  // 1, "runEventCount" interprets this as infinity and does not
234  // pause until the job is complete.
235  //
236  // The return values from these functions are as follows:
237  // epSignal - processing terminated early, SIGUSR2 encountered
238  // epCountComplete - "runEventCount" processed the number of events
239  // requested by the argument
240  // epSuccess - all other cases
241  //
242  // We expect that in most cases, processes will call
243  // "runToCompletion" once per job and not use "runEventCount".
244  //
245  // If a process used "runEventCount", then it would need to
246  // check the value returned by "runEventCount" to determine
247  // if it processed the requested number of events. It would
248  // only make sense to call it again if it returned epCountComplete
249  // on the preceding call.
250 
251  // The online is an exceptional case. Online uses the DaqSource
252  // and the StreamerOutputModule, which are specially written to
253  // handle multiple calls of "runToCompletion" in the same job.
254  // The call to setRunNumber resets the DaqSource between those calls.
255  // With most sources and output modules, this does not work.
256  // If and only if called by the online, the argument to runToCompletion
257  // is set to true and this affects the initial and final state
258  // transitions that are managed directly in EventProcessor.cc. (I am
259  // not sure if there is a reason for this or it is just a historical
260  // peculiarity that could be cleaned up and removed).
261 
262  virtual StatusCode runToCompletion(bool onlineStateTransitions);
263  virtual StatusCode runEventCount(int numberOfEventsToProcess);
264 
265  // The following functions are used by the code implementing our
266  // boost statemachine
267 
268  virtual void readFile();
269  virtual void closeInputFile(bool cleaningUpAfterException);
270  virtual void openOutputFiles();
271  virtual void closeOutputFiles();
272 
273  virtual void respondToOpenInputFile();
274  virtual void respondToCloseInputFile();
275  virtual void respondToOpenOutputFiles();
276  virtual void respondToCloseOutputFiles();
277 
278  virtual void startingNewLoop();
279  virtual bool endOfLoop();
280  virtual void rewindInput();
281  virtual void prepareForNextLoop();
282  virtual bool shouldWeCloseOutput() const;
283 
284  virtual void doErrorStuff();
285 
286  virtual void beginRun(statemachine::Run const& run);
287  virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
288 
289  virtual void beginLumi(ProcessHistoryID const& phid, int run, int lumi);
290  virtual void endLumi(ProcessHistoryID const& phid, int run, int lumi, bool cleaningUpAfterException);
291 
293  virtual int readAndCacheLumi(bool merge);
294  virtual void writeRun(statemachine::Run const& run);
295  virtual void deleteRunFromCache(statemachine::Run const& run);
296  virtual void writeLumi(ProcessHistoryID const& phid, int run, int lumi);
297  virtual void deleteLumiFromCache(ProcessHistoryID const& phid, int run, int lumi);
298 
299  virtual void readAndProcessEvent();
300  virtual bool shouldWeStop() const;
301 
302  virtual void setExceptionMessageFiles(std::string& message);
303  virtual void setExceptionMessageRuns(std::string& message);
304  virtual void setExceptionMessageLumis(std::string& message);
305 
306  virtual bool alreadyHandlingException() const;
307 
308  //returns 'true' if this was a child and we should continue processing
309  bool forkProcess(std::string const& jobReportFile);
310 
311  private:
312  //------------------------------------------------------------------
313  //
314  // Now private functions.
315  // init() is used only by constructors
316  void init(boost::shared_ptr<ProcessDesc>& processDesc,
317  ServiceToken const& token,
319 
320  StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess);
321  void terminateMachine();
322 
324 
325  StatusCode waitForAsyncCompletion(unsigned int timeout_seconds);
326 
327  void connectSigs(EventProcessor* ep);
328 
330  void errorState();
331  void setupSignal();
332 
333  static void asyncRun(EventProcessor*);
334 
335  bool hasSubProcess() const {
336  return subProcess_.get() != 0;
337  }
338 
339  //------------------------------------------------------------------
340  //
341  // Data members below.
342  // Are all these data members really needed? Some of them are used
343  // only during construction, and never again. If they aren't
344  // really needed, we should remove them.
345 
348  boost::shared_ptr<ActivityRegistry> actReg_;
349  boost::shared_ptr<SignallingProductRegistry> preg_;
351  boost::shared_ptr<InputSource> input_;
352  boost::scoped_ptr<eventsetup::EventSetupsController> espController_;
353  boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
354  boost::shared_ptr<ActionTable const> act_table_;
355  boost::shared_ptr<ProcessConfiguration> processConfiguration_;
356  std::auto_ptr<Schedule> schedule_;
357  std::auto_ptr<SubProcess> subProcess_;
358  boost::scoped_ptr<HistoryAppender> historyAppender_;
359 
361  boost::shared_ptr<boost::thread> event_loop_;
362 
365  boost::condition stopper_;
366  boost::condition starter_;
367  volatile int stop_count_;
368  volatile Status last_rc_;
369  std::string last_error_text_;
370  volatile bool id_set_;
371  volatile pthread_t event_loop_id_;
373  boost::shared_ptr<FileBlock> fb_;
374  boost::shared_ptr<EDLooperBase> looper_;
375 
376  std::auto_ptr<statemachine::Machine> machine_;
380  std::string fileMode_;
381  std::string emptyRunLumiMode_;
389 
393  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
394  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
397  }; // class EventProcessor
398 
399  //--------------------------------------------------------------------
400 
401  inline
404  return run(-1, false);
405  }
406 }
407 #endif
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
volatile int stop_count_
boost::shared_ptr< SignallingProductRegistry > preg_
boost::shared_ptr< InputSource > input_
event_processor::State getState() const
virtual void beginLumi(ProcessHistoryID const &phid, int run, int lumi)
volatile bool id_set_
virtual void closeOutputFiles()
StatusCode skip(int numberToSkip)
static boost::mutex mutex
Definition: LHEProxy.cc:11
StatusCode waitForAsyncCompletion(unsigned int timeout_seconds)
boost::shared_ptr< boost::thread > event_loop_
ActivityRegistry::PostProcessEvent postProcessEventSignal_
Definition: Hash.h:41
boost::shared_ptr< EDLooperBase > looper_
virtual void setExceptionMessageFiles(std::string &message)
tuple lumi
Definition: fjr2json.py:35
void clearCounters()
Clears counters used by trigger report.
virtual int readAndCacheLumi(bool merge)
StatusCode doneAsync(event_processor::Msg m)
virtual StatusCode runToCompletion(bool onlineStateTransitions)
virtual statemachine::Run readAndCacheRun(bool merge)
boost::shared_ptr< ActivityRegistry > actReg_
StatusCode shutdownAsync(unsigned int timeout_secs=60 *2)
virtual StatusCode runEventCount(int numberOfEventsToProcess)
boost::mutex state_lock_
void setRunNumber(RunNumber_t runNumber)
virtual void deleteRunFromCache(statemachine::Run const &run)
virtual void rewindInput()
std::set< std::pair< std::string, std::string > > ExcludedData
std::string exceptionMessageRuns_
char const * msgName(event_processor::Msg m) const
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool forkProcess(std::string const &jobReportFile)
virtual void beginRun(statemachine::Run const &run)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
StatusCode waitTillDoneAsync(unsigned int timeout_seconds=0)
virtual void readFile()
boost::condition stopper_
std::string last_error_text_
ServiceToken serviceToken_
virtual void endLumi(ProcessHistoryID const &phid, int run, int lumi, bool cleaningUpAfterException)
bool endPathsEnabled() const
boost::mutex stop_lock_
virtual bool alreadyHandlingException() const
std::string exceptionMessageLumis_
sigc::signal< void, Event const &, EventSetup const & > PostProcessEvent
virtual void setExceptionMessageLumis(std::string &message)
sigc::signal< void, EventID const &, Timestamp const & > PreProcessEvent
virtual void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
void declareRunNumber(RunNumber_t runNumber)
virtual void readAndProcessEvent()
volatile event_processor::State state_
virtual void writeLumi(ProcessHistoryID const &phid, int run, int lumi)
boost::shared_ptr< ProcessConfiguration > processConfiguration_
void connectSigs(EventProcessor *ep)
static void asyncRun(EventProcessor *)
boost::scoped_ptr< eventsetup::EventSetupsController > espController_
int totalEvents() const
std::auto_ptr< statemachine::Machine > machine_
std::auto_ptr< Schedule > schedule_
ActivityRegistry::PostProcessEvent & postProcessEventSignal()
char const * stateName(event_processor::State s) const
std::string exceptionMessageFiles_
StatusCode runCommon(bool onlineStateTransitions, int numberOfEventsToProcess)
char const * currentStateName() const
virtual void respondToCloseInputFile()
volatile pthread_t event_loop_id_
ActivityRegistry::PreProcessEvent & preProcessEventSignal()
volatile Status last_rc_
virtual void writeRun(statemachine::Run const &run)
void changeState(event_processor::Msg)
virtual void respondToOpenInputFile()
virtual bool shouldWeCloseOutput() const
virtual void doErrorStuff()
StatusCode statusAsync() const
bool merge(LuminosityBlockRange &lh, LuminosityBlockRange &rh)
virtual void respondToOpenOutputFiles()
std::map< std::string, ExcludedData > ExcludedDataMap
ServiceToken getToken()
virtual void openOutputFiles()
void init(boost::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
boost::condition starter_
virtual void closeInputFile(bool cleaningUpAfterException)
boost::shared_ptr< ActionTable const > act_table_
int totalEventsFailed() const
virtual bool endOfLoop()
virtual void respondToCloseOutputFiles()
virtual void prepareForNextLoop()
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException)
std::auto_ptr< SubProcess > subProcess_
virtual void startingNewLoop()
unsigned int RunNumber_t
Definition: EventRange.h:32
ActivityRegistry::PreProcessEvent preProcessEventSignal_
StatusCode stopAsync(unsigned int timeout_secs=60 *2)
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, int run, int lumi)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
int totalEventsPassed() const
boost::shared_ptr< FileBlock > fb_
PrincipalCache principalCache_
bool hasSubProcess() const
virtual bool shouldWeStop() const
void enableEndPaths(bool active)
boost::scoped_ptr< HistoryAppender > historyAppender_
void getTriggerReport(TriggerReport &rep) const