CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventProcessor.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_EventProcessor_h
2 #define FWCore_Framework_EventProcessor_h
3 
4 /*----------------------------------------------------------------------
5 
6 EventProcessor: This defines the 'framework application' object. It is
7 configured in the user's main() function, and is set running.
8 
9 ----------------------------------------------------------------------*/
10 
14 
22 
24 
29 
31 
32 #include "boost/thread/condition.hpp"
33 
34 #include <map>
35 #include <memory>
36 #include <set>
37 #include <string>
38 #include <vector>
39 #include <mutex>
40 #include <exception>
41 
42 namespace statemachine {
43  class Machine;
44  class Run;
45 }
46 
47 namespace edm {
48 
49  class ExceptionToActionTable;
50  class BranchIDListHelper;
51  class ThinnedAssociationsHelper;
52  class EDLooperBase;
53  class HistoryAppender;
54  class ProcessDesc;
55  class SubProcess;
56  namespace eventsetup {
57  class EventSetupProvider;
58  class EventSetupsController;
59  }
60 
62  public:
63 
64  // The input string 'config' contains the entire contents of a configuration file.
65  // Also allows the attachment of pre-existing services specified by 'token', and
66  // the specification of services by name only (defaultServices and forcedServices).
67  // 'defaultServices' are overridden by 'config'.
68  // 'forcedServices' override the 'config'.
69  explicit EventProcessor(std::string const& config,
70  ServiceToken const& token = ServiceToken(),
72  std::vector<std::string> const& defaultServices = std::vector<std::string>(),
73  std::vector<std::string> const& forcedServices = std::vector<std::string>());
74 
75  // Same as previous constructor, but without a 'token'. Token will be defaulted.
76 
77  EventProcessor(std::string const& config,
78  std::vector<std::string> const& defaultServices,
79  std::vector<std::string> const& forcedServices = std::vector<std::string>());
80 
81  EventProcessor(std::shared_ptr<ProcessDesc>& processDesc,
82  ServiceToken const& token,
84 
86  EventProcessor(std::string const& config, bool isPython);
87 
89 
90  EventProcessor(EventProcessor const&) = delete; // Disallow copying and moving
91  EventProcessor& operator=(EventProcessor const&) = delete; // Disallow copying and moving
92 
97  void beginJob();
98 
102  void endJob();
103 
104  // -------------
105 
106  // Same as runToCompletion(), but since it is used extensively
107  // outside of the framework (and is simpler) it is kept.
108  StatusCode run();
109 
112 
116 
117  std::vector<ModuleDescription const*>
118  getAllModuleDescriptions() const;
119 
121 
125  int totalEvents() const;
126 
129  int totalEventsPassed() const;
130 
133  int totalEventsFailed() const;
134 
137  void enableEndPaths(bool active);
138 
141  bool endPathsEnabled() const;
142 
145  void getTriggerReport(TriggerReport& rep) const;
146 
148  void clearCounters();
149 
150  // Really should not be public,
151  // but the EventFilter needs it for now.
153 
154  //------------------------------------------------------------------
155  //
156  // Nested classes and structs below.
157 
158  // The function "runToCompletion" will run until the job is "complete",
159  // which means:
160  // 1 - no more input data
161  // 2 - input maxEvents parameter limit reached
162  // 3 - output maxEvents parameter limit reached
163  // 4 - input maxLuminosityBlocks parameter limit reached
164  // 5 - looper directs processing to end
165  //
166  // The return values from the function are as follows:
167  // epSignal - processing terminated early, SIGUSR2 encountered
168  // epCountComplete - "runEventCount" processed the number of events
169  // requested by the argument
170  // epSuccess - all other cases
171  //
172  virtual StatusCode runToCompletion();
173 
174  // The following functions are used by the code implementing our
175  // boost statemachine
176 
177  virtual void readFile();
178  virtual void closeInputFile(bool cleaningUpAfterException);
179  virtual void openOutputFiles();
180  virtual void closeOutputFiles();
181 
182  virtual void respondToOpenInputFile();
183  virtual void respondToCloseInputFile();
184 
185  virtual void startingNewLoop();
186  virtual bool endOfLoop();
187  virtual void rewindInput();
188  virtual void prepareForNextLoop();
189  virtual bool shouldWeCloseOutput() const;
190 
191  virtual void doErrorStuff();
192 
193  virtual void beginRun(statemachine::Run const& run);
194  virtual void endRun(statemachine::Run const& run, bool cleaningUpAfterException);
195 
196  virtual void beginLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
197  virtual void endLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException);
198 
199  virtual statemachine::Run readRun();
201  virtual int readLuminosityBlock();
202  virtual int readAndMergeLumi();
203  virtual void writeRun(statemachine::Run const& run);
204  virtual void deleteRunFromCache(statemachine::Run const& run);
205  virtual void writeLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi);
207 
208  virtual void readAndProcessEvent();
209  virtual bool shouldWeStop() const;
210 
214 
215  virtual bool alreadyHandlingException() const;
216 
217  //returns 'true' if this was a child and we should continue processing
218  bool forkProcess(std::string const& jobReportFile);
219 
220  private:
221  //------------------------------------------------------------------
222  //
223  // Now private functions.
224  // init() is used only by constructors
225  void init(std::shared_ptr<ProcessDesc>& processDesc,
226  ServiceToken const& token,
228 
229  void terminateMachine(std::auto_ptr<statemachine::Machine>&);
230  std::auto_ptr<statemachine::Machine> createStateMachine();
231 
232  void setupSignal();
233 
234  bool hasSubProcesses() const {
235  return subProcesses_.get() != nullptr && !subProcesses_->empty();
236  }
237 
239 
240  friend class StreamProcessingTask;
241  void processEventsForStreamAsync(unsigned int iStreamIndex,
242  std::atomic<bool>* finishedProcessingEvents);
243 
244 
245  //read the next event using Stream iStreamIndex
246  void readEvent(unsigned int iStreamIndex);
247 
248  //process the already read event using Stream iStreamIndex
249  void processEvent(unsigned int iStreamIndex);
250 
251  //returns true if an asynchronous stop was requested
253 
254  std::shared_ptr<ProductRegistry const> preg() const {return get_underlying_safe(preg_);}
255  std::shared_ptr<ProductRegistry>& preg() {return get_underlying_safe(preg_);}
256  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {return get_underlying_safe(branchIDListHelper_);}
257  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() {return get_underlying_safe(branchIDListHelper_);}
258  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {return get_underlying_safe(thinnedAssociationsHelper_);}
259  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {return get_underlying_safe(thinnedAssociationsHelper_);}
260  std::shared_ptr<EDLooperBase const> looper() const {return get_underlying_safe(looper_);}
261  std::shared_ptr<EDLooperBase>& looper() {return get_underlying_safe(looper_);}
262  //------------------------------------------------------------------
263  //
264  // Data members below.
265  // Are all these data members really needed? Some of them are used
266  // only during construction, and never again. If they aren't
267  // really needed, we should remove them.
268 
269  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
277  std::unique_ptr<ExceptionToActionTable const> act_table_;
278  std::shared_ptr<ProcessConfiguration const> processConfiguration_;
284 
287 
288  //The atomic protects concurrent access to deferredExceptionPtr_
289  std::atomic<bool> deferredExceptionPtrIsSet_;
290  std::exception_ptr deferredExceptionPtr_;
291 
306 
311 
313 
317 
318  typedef std::set<std::pair<std::string, std::string> > ExcludedData;
319  typedef std::map<std::string, ExcludedData> ExcludedDataMap;
321  }; // class EventProcessor
322 
323  //--------------------------------------------------------------------
324 
325  inline
328  return runToCompletion();
329  }
330 }
331 #endif
std::string emptyRunLumiMode_
unsigned int numberOfSequentialEventsPerChild_
void readEvent(unsigned int iStreamIndex)
string rep
Definition: cuy.py:1188
ProcessContext processContext_
void init(std::shared_ptr< ProcessDesc > &processDesc, ServiceToken const &token, serviceregistry::ServiceLegacy)
edm::propagate_const< std::unique_ptr< std::vector< SubProcess > > > subProcesses_
virtual void closeOutputFiles()
edm::propagate_const< std::unique_ptr< InputSource > > input_
std::unique_ptr< ExceptionToActionTable const > act_table_
static boost::mutex mutex
Definition: LHEProxy.cc:11
virtual void beginLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void setExceptionMessageFiles(std::string &message)
void possiblyContinueAfterForkChildFailure()
tuple lumi
Definition: fjr2json.py:35
ProcessConfiguration const & processConfiguration() const
virtual statemachine::Run readRun()
virtual void writeLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
edm::propagate_const< std::unique_ptr< HistoryAppender > > historyAppender_
void clearCounters()
Clears counters used by trigger report.
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
bool checkForAsyncStopRequest(StatusCode &)
edm::propagate_const< std::shared_ptr< ProductRegistry > > preg_
virtual void deleteRunFromCache(statemachine::Run const &run)
virtual void rewindInput()
std::set< std::pair< std::string, std::string > > ExcludedData
std::auto_ptr< statemachine::Machine > createStateMachine()
std::string exceptionMessageRuns_
PreallocationConfiguration preallocations_
unsigned int LuminosityBlockNumber_t
EventProcessor(std::string const &config, ServiceToken const &token=ServiceToken(), serviceregistry::ServiceLegacy=serviceregistry::kOverlapIsError, std::vector< std::string > const &defaultServices=std::vector< std::string >(), std::vector< std::string > const &forcedServices=std::vector< std::string >())
bool forkProcess(std::string const &jobReportFile)
virtual void beginRun(statemachine::Run const &run)
virtual void readFile()
void terminateMachine(std::auto_ptr< statemachine::Machine > &)
ServiceToken serviceToken_
bool endPathsEnabled() const
std::atomic< bool > deferredExceptionPtrIsSet_
virtual bool alreadyHandlingException() const
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
std::string exceptionMessageLumis_
std::shared_ptr< ProductRegistry const > preg() const
virtual void setExceptionMessageLumis(std::string &message)
virtual void setExceptionMessageRuns(std::string &message)
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_
InputSource::ItemType nextItemTypeFromProcessingEvents_
edm::propagate_const< std::shared_ptr< EDLooperBase > > looper_
virtual void readAndProcessEvent()
void processEvent(unsigned int iStreamIndex)
virtual StatusCode runToCompletion()
EventProcessor & operator=(EventProcessor const &)=delete
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool hasSubProcesses() const
int totalEvents() const
std::string exceptionMessageFiles_
virtual statemachine::Run readAndMergeRun()
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
virtual int readLuminosityBlock()
std::shared_ptr< EDLooperBase > & looper()
virtual void respondToCloseInputFile()
virtual void writeRun(statemachine::Run const &run)
StatusCode asyncStopStatusCodeFromProcessingEvents_
virtual void respondToOpenInputFile()
virtual bool shouldWeCloseOutput() const
virtual void doErrorStuff()
edm::propagate_const< std::unique_ptr< FileBlock > > fb_
virtual void endLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi, bool cleaningUpAfterException)
std::map< std::string, ExcludedData > ExcludedDataMap
ServiceToken getToken()
edm::propagate_const< std::unique_ptr< eventsetup::EventSetupsController > > espController_
virtual void openOutputFiles()
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::shared_ptr< ProductRegistry > & preg()
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
virtual void deleteLumiFromCache(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
virtual void closeInputFile(bool cleaningUpAfterException)
std::exception_ptr deferredExceptionPtr_
int totalEventsFailed() const
virtual bool endOfLoop()
PathsAndConsumesOfModules pathsAndConsumesOfModules_
virtual void prepareForNextLoop()
virtual void endRun(statemachine::Run const &run, bool cleaningUpAfterException)
virtual void startingNewLoop()
unsigned int RunNumber_t
bool asyncStopRequestedWhileProcessingEvents_
std::shared_ptr< ActivityRegistry > actReg_
std::mutex nextTransitionMutex_
void processEventsForStreamAsync(unsigned int iStreamIndex, std::atomic< bool > *finishedProcessingEvents)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
int totalEventsPassed() const
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
virtual int readAndMergeLumi()
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
std::shared_ptr< EDLooperBase const > looper() const
PrincipalCache principalCache_
virtual bool shouldWeStop() const
void enableEndPaths(bool active)
void getTriggerReport(TriggerReport &rep) const