CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
43 
44 #include "boost/shared_ptr.hpp"
45 
48 
49 #include <memory>
50 #include <sstream>
51 #include <vector>
52 
53 namespace edm {
54  class EventPrincipal;
55  class EarlyDeleteHelper;
56  class ProductHolderIndexHelper;
57  class ProductHolderIndexAndSkipBit;
58  class StreamID;
59  class StreamContext;
60 
61  namespace workerhelper {
62  template< typename O> class CallImpl;
63  }
64 
65  class Worker {
66  public:
67  enum State { Ready, Pass, Fail, Exception };
69 
70  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
71  virtual ~Worker();
72 
73  Worker(Worker const&) = delete; // Disallow copying and moving
74  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
75 
76  template <typename T>
77  bool doWork(typename T::MyPrincipal&, EventSetup const& c,
78  CPUTimer *const timer,
80  ParentContext const& parentContext,
81  typename T::Context const* context);
82  void beginJob() ;
83  void endJob();
84  void beginStream(StreamID id, StreamContext& streamContext);
85  void endStream(StreamID id, StreamContext& streamContext);
88 
90  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
91 
92  void reset() { state_ = Ready; }
93 
96 
101  void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
102 
104 
105  //Used to make EDGetToken work
106  virtual void updateLookup(BranchType iBranchType,
107  ProductHolderIndexHelper const&) = 0;
108 
109  virtual void modulesDependentUpon(std::vector<const char*>& oModuleLabels) const = 0;
110 
111  virtual Types moduleType() const =0;
112 
113  std::pair<double, double> timeCpuReal() const {
114  return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
115  }
116 
117  void clearCounters() {
119  }
120 
121  void useStopwatch();
122 
123  int timesRun() const { return timesRun_; }
124  int timesVisited() const { return timesVisited_; }
125  int timesPassed() const { return timesPassed_; }
126  int timesFailed() const { return timesFailed_; }
127  int timesExcept() const { return timesExcept_; }
128  State state() const { return state_; }
129 
130  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
131 
132  protected:
133  template<typename O> friend class workerhelper::CallImpl;
134  virtual std::string workerType() const = 0;
135  virtual bool implDo(EventPrincipal&, EventSetup const& c,
136  ModuleCallingContext const* mcc) = 0;
137  virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
138  ModuleCallingContext const* mcc) = 0;
139  virtual bool implDoStreamBegin(StreamID id, RunPrincipal& rp, EventSetup const& c,
140  ModuleCallingContext const* mcc) = 0;
141  virtual bool implDoStreamEnd(StreamID id, RunPrincipal& rp, EventSetup const& c,
142  ModuleCallingContext const* mcc) = 0;
143  virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
144  ModuleCallingContext const* mcc) = 0;
145  virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
146  ModuleCallingContext const* mcc) = 0;
147  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c,
148  ModuleCallingContext const* mcc) = 0;
149  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c,
150  ModuleCallingContext const* mcc) = 0;
151  virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
152  ModuleCallingContext const* mcc) = 0;
153  virtual void implBeginJob() = 0;
154  virtual void implEndJob() = 0;
155  virtual void implBeginStream(StreamID) = 0;
156  virtual void implEndStream(StreamID) = 0;
157 
159 
161 
162  private:
163 
164  virtual void itemsToGet(BranchType, std::vector<ProductHolderIndexAndSkipBit>&) const = 0;
165  virtual void itemsMayGet(BranchType, std::vector<ProductHolderIndexAndSkipBit>&) const = 0;
166 
167  virtual std::vector<ProductHolderIndexAndSkipBit> const& itemsToGetFromEvent() const = 0;
168 
169  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
170  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
171 
172  virtual void implPreForkReleaseResources() = 0;
173  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
174  unsigned int iNumberOfChildren) = 0;
176 
183 
185 
186  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
187  boost::shared_ptr<cms::Exception> cached_exception_; // if state is 'exception'
188 
189  boost::shared_ptr<ActivityRegistry> actReg_;
190 
192  };
193 
194  namespace {
195  template <typename T>
196  class ModuleSignalSentry {
197  public:
198  ModuleSignalSentry(ActivityRegistry *a,
199  typename T::Context const* context,
200  ModuleCallingContext const* moduleCallingContext) :
201  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
202 
203  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
204  }
205 
206  ~ModuleSignalSentry() {
207  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
208  }
209 
210  private:
211  ActivityRegistry* a_;
212  typename T::Context const* context_;
213  ModuleCallingContext const* moduleCallingContext_;
214  };
215 
216  template <typename T>
217  void exceptionContext(typename T::MyPrincipal const& principal,
218  cms::Exception& ex,
219  ModuleCallingContext const* mcc) {
220 
221  ModuleCallingContext const* imcc = mcc;
222  while(imcc->type() == ParentContext::Type::kModule) {
223  std::ostringstream iost;
224  iost << "Calling method for unscheduled module "
225  << imcc->moduleDescription()->moduleName() << "/'"
226  << imcc->moduleDescription()->moduleLabel() << "'";
227  ex.addContext(iost.str());
228  imcc = imcc->moduleCallingContext();
229  }
230  if(imcc->type() == ParentContext::Type::kInternal) {
231  std::ostringstream iost;
232  iost << "Calling method for unscheduled module "
233  << imcc->moduleDescription()->moduleName() << "/'"
234  << imcc->moduleDescription()->moduleLabel() << "' (probably inside some kind of mixing module)";
235  ex.addContext(iost.str());
236  imcc = imcc->internalContext()->moduleCallingContext();
237  }
238  while(imcc->type() == ParentContext::Type::kModule) {
239  std::ostringstream iost;
240  iost << "Calling method for unscheduled module "
241  << imcc->moduleDescription()->moduleName() << "/'"
242  << imcc->moduleDescription()->moduleLabel() << "'";
243  ex.addContext(iost.str());
244  imcc = imcc->moduleCallingContext();
245  }
246  std::ostringstream ost;
247  if (T::isEvent_) {
248  ost << "Calling event method";
249  }
250  else {
251  // It should be impossible to get here, because
252  // this function only gets called when the IgnoreCompletely
253  // exception behavior is active, which can only be true
254  // for events.
255  ost << "Calling unknown function";
256  }
257  ost << " for module " << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
258  ex.addContext(ost.str());
259 
260  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
261  ost.str("");
262  ost << "Running path '";
263  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
264  ex.addContext(ost.str());
265  }
266  ost.str("");
267  ost << "Processing ";
268  ost << principal.id();
269  ex.addContext(ost.str());
270  }
271  }
272 
273  namespace workerhelper {
274  template<>
276  public:
278  static bool call(Worker* iWorker, StreamID,
279  EventPrincipal& ep, EventSetup const& es,
280  ActivityRegistry* actReg,
281  ModuleCallingContext const* mcc,
282  Arg::Context const* context) {
283  //Signal sentry is handled by the module
284  return iWorker->implDo(ep,es, mcc);
285  }
286  };
287 
288  template<>
290  public:
292  static bool call(Worker* iWorker,StreamID,
293  RunPrincipal& ep, EventSetup const& es,
294  ActivityRegistry* actReg,
295  ModuleCallingContext const* mcc,
296  Arg::Context const* context) {
297  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
298  return iWorker->implDoBegin(ep,es, mcc);
299  }
300  };
301  template<>
303  public:
305  static bool call(Worker* iWorker,StreamID id,
306  RunPrincipal& ep, EventSetup const& es,
307  ActivityRegistry* actReg,
308  ModuleCallingContext const* mcc,
309  Arg::Context const* context) {
310  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
311  return iWorker->implDoStreamBegin(id,ep,es, mcc);
312  }
313  };
314  template<>
316  public:
318  static bool call(Worker* iWorker,StreamID,
319  RunPrincipal& ep, EventSetup const& es,
320  ActivityRegistry* actReg,
321  ModuleCallingContext const* mcc,
322  Arg::Context const* context) {
323  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
324  return iWorker->implDoEnd(ep,es, mcc);
325  }
326  };
327  template<>
329  public:
331  static bool call(Worker* iWorker,StreamID id,
332  RunPrincipal& ep, EventSetup const& es,
333  ActivityRegistry* actReg,
334  ModuleCallingContext const* mcc,
335  Arg::Context const* context) {
336  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
337  return iWorker->implDoStreamEnd(id,ep,es, mcc);
338  }
339  };
340 
341  template<>
343  public:
345  static bool call(Worker* iWorker,StreamID,
346  LuminosityBlockPrincipal& ep, EventSetup const& es,
347  ActivityRegistry* actReg,
348  ModuleCallingContext const* mcc,
349  Arg::Context const* context) {
350  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
351  return iWorker->implDoBegin(ep,es, mcc);
352  }
353  };
354  template<>
356  public:
358  static bool call(Worker* iWorker,StreamID id,
359  LuminosityBlockPrincipal& ep, EventSetup const& es,
360  ActivityRegistry* actReg,
361  ModuleCallingContext const* mcc,
362  Arg::Context const* context) {
363  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
364  return iWorker->implDoStreamBegin(id,ep,es, mcc);
365  }
366  };
367 
368  template<>
370  public:
372  static bool call(Worker* iWorker,StreamID,
373  LuminosityBlockPrincipal& ep, EventSetup const& es,
374  ActivityRegistry* actReg,
375  ModuleCallingContext const* mcc,
376  Arg::Context const* context) {
377  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
378  return iWorker->implDoEnd(ep,es, mcc);
379  }
380  };
381  template<>
383  public:
385  static bool call(Worker* iWorker,StreamID id,
386  LuminosityBlockPrincipal& ep, EventSetup const& es,
387  ActivityRegistry* actReg,
388  ModuleCallingContext const* mcc,
389  Arg::Context const* context) {
390  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
391  return iWorker->implDoStreamEnd(id,ep,es, mcc);
392  }
393  };
394  }
395 
396  template <typename T>
397  bool Worker::doWork(typename T::MyPrincipal& ep,
398  EventSetup const& es,
399  CPUTimer* const iTimer,
400  StreamID streamID,
401  ParentContext const& parentContext,
402  typename T::Context const* context) {
403 
404  // A RunStopwatch, but only if we are processing an event.
406  iTimer);
407 
408  if (T::isEvent_) {
409  ++timesVisited_;
410  }
411  bool rc = false;
412 
413  switch(state_) {
414  case Ready: break;
415  case Pass: return true;
416  case Fail: return false;
417  case Exception: {
418  cached_exception_->raise();
419  }
420  }
421 
422  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
423 
424  try {
425  convertException::wrap([&]() {
426 
427  if (T::isEvent_) {
428  ++timesRun_;
429 
430  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
431  std::vector<ProductHolderIndexAndSkipBit> const& items = itemsToGetFromEvent();
432  for(auto const& item : items) {
433  ProductHolderIndex productHolderIndex = item.productHolderIndex();
434  bool skipCurrentProcess = item.skipCurrentProcess();
435  if(productHolderIndex != ProductHolderIndexAmbiguous) {
436  ep.prefetch(productHolderIndex, skipCurrentProcess, &moduleCallingContext_);
437  }
438  }
439  }
440 
442  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
443 
444  if (rc) {
445  state_ = Pass;
446  if (T::isEvent_) ++timesPassed_;
447  } else {
448  state_ = Fail;
449  if (T::isEvent_) ++timesFailed_;
450  }
451  });
452  }
453  catch(cms::Exception& ex) {
454 
455  // NOTE: the warning printed as a result of ignoring or failing
456  // a module will only be printed during the full true processing
457  // pass of this module
458 
459  // Get the action corresponding to this exception. However, if processing
460  // something other than an event (e.g. run, lumi) always rethrow.
462 
463  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
464  // as IgnoreCompletely, so any subsequent OutputModules are still run.
465  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
467  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
468  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
469 
472  }
473  switch(action) {
475  rc = true;
476  ++timesPassed_;
477  state_ = Pass;
478  exceptionContext<T>(ep, ex, &moduleCallingContext_);
479  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
480  break;
481  default:
482  if (T::isEvent_) ++timesExcept_;
483  state_ = Exception;
484  cached_exception_.reset(ex.clone());
485  cached_exception_->raise();
486  }
487  }
488  return rc;
489  }
490 }
491 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:372
void pathFinished(EventPrincipal &)
Definition: Worker.cc:189
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:99
ModuleDescription const & description() const
Definition: Worker.h:97
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:95
ModuleCallingContext const * getTopModuleCallingContext() const
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual ~Worker()
Definition: Worker.cc:88
void endJob()
Definition: Worker.cc:121
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:357
boost::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:189
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:385
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:317
int timesPassed() const
Definition: Worker.h:125
void setActivityRegistry(boost::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:91
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
State state() const
Definition: Worker.h:128
void clearCounters()
Definition: Worker.h:117
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
bool doWork(typename T::MyPrincipal &, EventSetup const &c, CPUTimer *const timer, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:397
exception_actions::ActionCodes find(const std::string &category) const
unsigned int ProductHolderIndex
ActivityRegistry * activityRegistry()
Definition: Worker.h:160
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:137
virtual bool implDoStreamBegin(StreamID id, RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void implPreForkReleaseResources()=0
virtual std::string workerType() const =0
virtual void itemsToGet(BranchType, std::vector< ProductHolderIndexAndSkipBit > &) const =0
ExceptionToActionTable const * actions_
Definition: Worker.h:186
std::pair< double, double > timeCpuReal() const
Definition: Worker.h:113
EarlyDeleteHelper * earlyDeleteHelper_
Definition: Worker.h:191
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:184
BranchType
Definition: BranchType.h:11
static bool call(Worker *iWorker, StreamID id, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:331
bool isEndPath() const
Definition: PathContext.h:42
int timesExcept() const
Definition: Worker.h:127
int timesExcept_
Definition: Worker.h:181
virtual bool implDoBegin(RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
Worker & operator=(Worker const &)=delete
boost::shared_ptr< CPUTimer > StopwatchPointer
Definition: RunStopwatch.h:22
void reset()
Definition: Worker.h:92
virtual bool implDo(EventPrincipal &, EventSetup const &c, ModuleCallingContext const *mcc)=0
int timesVisited() const
Definition: Worker.h:124
PathContext const * pathContext() const
int timesPassed_
Definition: Worker.h:179
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:291
ModuleDescription const * descPtr() const
Definition: Worker.h:98
ModuleDescription const * moduleDescription() const
void useStopwatch()
Definition: Worker.cc:185
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:371
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:71
int timesRun() const
Definition: Worker.h:123
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:384
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:345
static bool call(Worker *iWorker, StreamID id, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:305
virtual void implEndJob()=0
virtual std::vector< ProductHolderIndexAndSkipBit > const & itemsToGetFromEvent() const =0
virtual void modulesDependentUpon(std::vector< const char * > &oModuleLabels) const =0
areg
Definition: Schedule.cc:369
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:90
virtual Types moduleType() const =0
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:277
void postDoEvent(EventPrincipal &)
Definition: Worker.cc:194
boost::shared_ptr< cms::Exception > cached_exception_
Definition: Worker.h:187
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:304
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:86
virtual void itemsMayGet(BranchType, std::vector< ProductHolderIndexAndSkipBit > &) const =0
int timesRun_
Definition: Worker.h:177
int timesPass() const
Definition: Worker.h:130
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
int timesFailed() const
Definition: Worker.h:126
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:87
void preForkReleaseResources()
Definition: Worker.h:89
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:105
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:161
static bool call(Worker *iWorker, StreamID, EventPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:278
RunStopwatch::StopwatchPointer stopwatch_
Definition: Worker.h:175
int timesFailed_
Definition: Worker.h:180
auto wrap(F iFunc) -> decltype(iFunc())
virtual bool implDoEnd(RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:344
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:358
PlaceInPathContext const * placeInPathContext() const
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:330
virtual bool implDoStreamEnd(StreamID id, RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
State state_
Definition: Worker.h:182
int timesVisited_
Definition: Worker.h:178
static bool call(Worker *iWorker, StreamID, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:292
static bool call(Worker *iWorker, StreamID, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:318