CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
22 
24 
29 
30 #include "boost/shared_ptr.hpp"
31 #include "boost/thread/condition.hpp"
32 
33 #include <map>
34 #include <memory>
35 #include <set>
36 #include <string>
37 #include <vector>
38 #include <mutex>
39 #include <exception>
40 
41 namespace statemachine {
42  class Machine;
43  class Run;
44 }
45 
46 namespace edm {
47 
48  class ExceptionToActionTable;
49  class BranchIDListHelper;
50  class ThinnedAssociationsHelper;
51  class EDLooperBase;
52  class HistoryAppender;
53  class ProcessDesc;
54  class SubProcess;
55  namespace eventsetup {
56  class EventSetupProvider;
57  class EventSetupsController;
58  }
59 
61  public:
62 
63  // The input string 'config' contains the entire contents of a configuration file.
64  // Also allows the attachment of pre-existing services specified by 'token', and
65  // the specification of services by name only (defaultServices and forcedServices).
66  // 'defaultServices' are overridden by 'config'.
67  // 'forcedServices' override the 'config'.
68  explicit EventProcessor(std::string const& config,
69  ServiceToken const& token = ServiceToken(),
71  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
72  std::vector<std::string> const& forcedServices = std::vector<std::string>());
73 
74  // Same as previous constructor, but without a 'token'. Token will be defaulted.
75 
76  EventProcessor(std::string const& config,
77  std::vector<std::string> const& defaultServices,
78  std::vector<std::string> const& forcedServices = std::vector<std::string>());
79 
80  EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
81  ServiceToken const& token,
83 
85  EventProcessor(std::string const& config, bool isPython);
86 
88 
89  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
90  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
91 
96  void beginJob();
97 
101  void endJob();
102 
103  // -------------
104 
105  // Same as runToCompletion(), but since it is used extensively
106  // outside of the framework (and is simpler) it is kept.
107  StatusCode run();
108 
111 
115 
116  std::vector<ModuleDescription const*>
117  getAllModuleDescriptions() const;
118 
120 
124  int totalEvents() const;
125 
128  int totalEventsPassed() const;
129 
132  int totalEventsFailed() const;
133 
136  void enableEndPaths(bool active);
137 
140  bool endPathsEnabled() const;
141 
144  void getTriggerReport(TriggerReport& rep) const;
145 
147  void clearCounters();
148 
149  // Really should not be public,
150  // but the EventFilter needs it for now.
152 
153  //------------------------------------------------------------------
154  //
155  // Nested classes and structs below.
156 
157  // The function "runToCompletion" will run until the job is "complete",
158  // which means:
159  // 1 - no more input data
160  // 2 - input maxEvents parameter limit reached
161  // 3 - output maxEvents parameter limit reached
162  // 4 - input maxLuminosityBlocks parameter limit reached
163  // 5 - looper directs processing to end
164  //
165  // The return values from the function are as follows:
166  // epSignal - processing terminated early, SIGUSR2 encountered
167  // epCountComplete - "runEventCount" processed the number of events
168  // requested by the argument
169  // epSuccess - all other cases
170  //
171  virtual StatusCode runToCompletion();
172 
173  // The following functions are used by the code implementing our
174  // boost statemachine
175 
176  virtual void readFile();
177  virtual void closeInputFile(bool cleaningUpAfterException);
178  virtual void openOutputFiles();
179  virtual void closeOutputFiles();
180 
181  virtual void respondToOpenInputFile();
182  virtual void respondToCloseInputFile();
183 
184  virtual void startingNewLoop();
185  virtual bool endOfLoop();
186  virtual void rewindInput();
187  virtual void prepareForNextLoop();
188  virtual bool shouldWeCloseOutput() const;
189 
190  virtual void doErrorStuff();
191 
192  virtual void beginRun(statemachine::Run const& run);
193  virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
194 
195  virtual void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
196  virtual void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException);
197 
198  virtual statemachine::Run readRun();
200  virtual int readLuminosityBlock();
201  virtual int readAndMergeLumi();
202  virtual void writeRun(statemachine::Run const& run);
203  virtual void deleteRunFromCache(statemachine::Run const& run);
204  virtual void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
206 
207  virtual void readAndProcessEvent();
208  virtual bool shouldWeStop() const;
209 
213 
214  virtual bool alreadyHandlingException() const;
215 
216  //returns 'true' if this was a child and we should continue processing
217  bool forkProcess(std::string const& jobReportFile);
218 
219  private:
220  //------------------------------------------------------------------
221  //
222  // Now private functions.
223  // init() is used only by constructors
224  void init(std::shared_ptr<ProcessDesc>& processDesc,
225  ServiceToken const& token,
227 
228  void terminateMachine(std::auto_ptr<statemachine::Machine>&);
229  std::auto_ptr<statemachine::Machine> createStateMachine();
230 
231  void setupSignal();
232 
233  bool hasSubProcess() const {
234  return subProcess_.get() != 0;
235  }
236 
238 
239  friend class StreamProcessingTask;
240  void processEventsForStreamAsync(unsigned int iStreamIndex,
241  std::atomic<bool>* finishedProcessingEvents);
242 
243 
244  //read the next event using Stream iStreamIndex
245  void readEvent(unsigned int iStreamIndex);
246 
247  //process the already read event using Stream iStreamIndex
248  void processEvent(unsigned int iStreamIndex);
249 
250  //returns true if an asynchronous stop was requested
252  //------------------------------------------------------------------
253  //
254  // Data members below.
255  // Are all these data members really needed? Some of them are used
256  // only during construction, and never again. If they aren't
257  // really needed, we should remove them.
258 
259  std::shared_ptr<ActivityRegistry> actReg_;
260  std::shared_ptr<ProductRegistry> preg_;
261  std::shared_ptr<BranchIDListHelper> branchIDListHelper_;
262  std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper_;
264  std::unique_ptr<InputSource> input_;
265  std::unique_ptr<eventsetup::EventSetupsController> espController_;
266  boost::shared_ptr<eventsetup::EventSetupProvider> esp_;
267  std::unique_ptr<ExceptionToActionTable const> act_table_;
268  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
271  std::auto_ptr<Schedule> schedule_;
272  std::auto_ptr<SubProcess> subProcess_;
273  std::unique_ptr<HistoryAppender> historyAppender_;
274 
275  std::unique_ptr<FileBlock> fb_;
276  boost::shared_ptr<EDLooperBase> looper_;
277 
278  //The atomic protects concurrent access of deferredExceptionPtr_
279  std::atomic<bool> deferredExceptionPtrIsSet_;
280  std::exception_ptr deferredExceptionPtr_;
281 
296 
301 
303 
307 
308  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
309  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
311  }; // class EventProcessor
312 
313  //--------------------------------------------------------------------
314 
315  inline
318  return runToCompletion();
319  }
320 }
321 #endif
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void readEvent(unsigned int iStreamIndex)
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
virtual void closeOutputFiles()
std::unique_ptr< ExceptionToActionTable const > act_table_
static boost::mutex mutex
Definition: LHEProxy.cc:11
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
boost::shared_ptr< EDLooperBase > looper_
virtual void setExceptionMessageFiles(std::string &message)
void possiblyContinueAfterForkChildFailure()
tuple lumi
Definition: fjr2json.py:35
ProcessConfiguration const & processConfiguration() const
virtual statemachine::Run readRun()
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
void clearCounters()
Clears counters used by trigger report.
bool checkForAsyncStopRequest(StatusCode &)
virtual void deleteRunFromCache(statemachine::Run const &run)
virtual void rewindInput()
std::set< std::pair< std::string, std::string > > ExcludedData
std::auto_ptr< statemachine::Machine > createStateMachine()
std::string exceptionMessageRuns_
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool forkProcess(std::string const &jobReportFile)
virtual void beginRun(statemachine::Run const &run)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
virtual void readFile()
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
virtual bool alreadyHandlingException() const
std::string exceptionMessageLumis_
virtual void setExceptionMessageLumis(std::string &message)
virtual void setExceptionMessageRuns(std::string &message)
std::unique_ptr< HistoryAppender > historyAppender_
std::unique_ptr< FileBlock > fb_
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
virtual void readAndProcessEvent()
void processEvent(unsigned int iStreamIndex)
virtual StatusCode runToCompletion()
EventProcessor & operator=(EventProcessor const &)=delete
std::shared_ptr< ProcessConfiguration const > processConfiguration_
int totalEvents() const
std::shared_ptr< ProductRegistry > preg_
std::auto_ptr< Schedule > schedule_
std::string exceptionMessageFiles_
virtual statemachine::Run readAndMergeRun()
virtual int readLuminosityBlock()
virtual void respondToCloseInputFile()
virtual void writeRun(statemachine::Run const &run)
StatusCode asyncStopStatusCodeFromProcessingEvents_
virtual void respondToOpenInputFile()
virtual bool shouldWeCloseOutput() const
virtual void doErrorStuff()
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
std::unique_ptr< InputSource > input_
std::map< std::string, ExcludedData > ExcludedDataMap
ServiceToken getToken()
virtual void openOutputFiles()
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
virtual bool endOfLoop()
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual void prepareForNextLoop()
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException)
std::auto_ptr< SubProcess > subProcess_
virtual void startingNewLoop()
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
void processEventsForStreamAsync(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::unique_ptr< eventsetup::EventSetupsController > espController_
int totalEventsPassed() const
virtual int readAndMergeLumi()
PrincipalCache principalCache_
bool hasSubProcess() const
virtual bool shouldWeStop() const
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_