CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
43 
45 
46 #include <memory>
47 #include <sstream>
48 #include <vector>
49 
50 namespace edm {
51  class EventPrincipal;
52  class EarlyDeleteHelper;
53  class ProductHolderIndexHelper;
54  class ProductHolderIndexAndSkipBit;
55  class StreamID;
56  class StreamContext;
57  class ProductRegistry;
58  class ThinnedAssociationsHelper;
59 
60  namespace workerhelper {
61  template< typename O> class CallImpl;
62  }
63 
64  class Worker {
65  public:
66  enum State { Ready, Pass, Fail, Exception };
68 
69  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
70  virtual ~Worker();
71 
72  Worker(Worker const&) = delete; // Disallow copying and moving
73  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
74 
75  template <typename T>
76  bool doWork(typename T::MyPrincipal&, EventSetup const& c,
78  ParentContext const& parentContext,
79  typename T::Context const* context);
80  void beginJob() ;
81  void endJob();
82  void beginStream(StreamID id, StreamContext& streamContext);
83  void endStream(StreamID id, StreamContext& streamContext);
86 
88  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
90 
91  void reset() { state_ = Ready; }
92 
95 
100  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
101 
103 
104  //Used to make EDGetToken work
105  virtual void updateLookup(BranchType iBranchType,
106  ProductHolderIndexHelper const&) = 0;
107 
108  virtual void modulesDependentUpon(std::vector<const char*>& oModuleLabels) const = 0;
109 
110  virtual Types moduleType() const =0;
111 
112  void clearCounters() {
114  }
115 
116  int timesRun() const { return timesRun_; }
117  int timesVisited() const { return timesVisited_; }
118  int timesPassed() const { return timesPassed_; }
119  int timesFailed() const { return timesFailed_; }
120  int timesExcept() const { return timesExcept_; }
121  State state() const { return state_; }
122 
123  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
124 
125  protected:
126  template<typename O> friend class workerhelper::CallImpl;
127  virtual std::string workerType() const = 0;
128  virtual bool implDo(EventPrincipal&, EventSetup const& c,
129  ModuleCallingContext const* mcc) = 0;
130  virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
131  ModuleCallingContext const* mcc) = 0;
132  virtual bool implDoStreamBegin(StreamID id, RunPrincipal& rp, EventSetup const& c,
133  ModuleCallingContext const* mcc) = 0;
134  virtual bool implDoStreamEnd(StreamID id, RunPrincipal& rp, EventSetup const& c,
135  ModuleCallingContext const* mcc) = 0;
136  virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
137  ModuleCallingContext const* mcc) = 0;
138  virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
139  ModuleCallingContext const* mcc) = 0;
140  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c,
141  ModuleCallingContext const* mcc) = 0;
142  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c,
143  ModuleCallingContext const* mcc) = 0;
144  virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
145  ModuleCallingContext const* mcc) = 0;
146  virtual void implBeginJob() = 0;
147  virtual void implEndJob() = 0;
148  virtual void implBeginStream(StreamID) = 0;
149  virtual void implEndStream(StreamID) = 0;
150 
152 
154 
155  private:
156 
157  virtual void itemsToGet(BranchType, std::vector<ProductHolderIndexAndSkipBit>&) const = 0;
158  virtual void itemsMayGet(BranchType, std::vector<ProductHolderIndexAndSkipBit>&) const = 0;
159 
160  virtual std::vector<ProductHolderIndexAndSkipBit> const& itemsToGetFromEvent() const = 0;
161 
162  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
163  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
164 
165  virtual void implPreForkReleaseResources() = 0;
166  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
167  unsigned int iNumberOfChildren) = 0;
169 
176 
178 
179  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
180  std::shared_ptr<cms::Exception> cached_exception_; // if state is 'exception'
181 
182  std::shared_ptr<ActivityRegistry> actReg_;
183 
185  };
186 
187  namespace {
188  template <typename T>
189  class ModuleSignalSentry {
190  public:
191  ModuleSignalSentry(ActivityRegistry *a,
192  typename T::Context const* context,
193  ModuleCallingContext const* moduleCallingContext) :
194  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
195 
196  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
197  }
198 
199  ~ModuleSignalSentry() {
200  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
201  }
202 
203  private:
204  ActivityRegistry* a_;
205  typename T::Context const* context_;
206  ModuleCallingContext const* moduleCallingContext_;
207  };
208 
209  template <typename T>
210  void exceptionContext(typename T::MyPrincipal const& principal,
211  cms::Exception& ex,
212  ModuleCallingContext const* mcc) {
213 
214  ModuleCallingContext const* imcc = mcc;
215  while(imcc->type() == ParentContext::Type::kModule) {
216  std::ostringstream iost;
217  iost << "Calling method for unscheduled module "
218  << imcc->moduleDescription()->moduleName() << "/'"
219  << imcc->moduleDescription()->moduleLabel() << "'";
220  ex.addContext(iost.str());
221  imcc = imcc->moduleCallingContext();
222  }
223  if(imcc->type() == ParentContext::Type::kInternal) {
224  std::ostringstream iost;
225  iost << "Calling method for unscheduled module "
226  << imcc->moduleDescription()->moduleName() << "/'"
227  << imcc->moduleDescription()->moduleLabel() << "' (probably inside some kind of mixing module)";
228  ex.addContext(iost.str());
229  imcc = imcc->internalContext()->moduleCallingContext();
230  }
231  while(imcc->type() == ParentContext::Type::kModule) {
232  std::ostringstream iost;
233  iost << "Calling method for unscheduled module "
234  << imcc->moduleDescription()->moduleName() << "/'"
235  << imcc->moduleDescription()->moduleLabel() << "'";
236  ex.addContext(iost.str());
237  imcc = imcc->moduleCallingContext();
238  }
239  std::ostringstream ost;
240  if (T::isEvent_) {
241  ost << "Calling event method";
242  }
243  else {
244  // It should be impossible to get here, because
245  // this function only gets called when the IgnoreCompletely
246  // exception behavior is active, which can only be true
247  // for events.
248  ost << "Calling unknown function";
249  }
250  ost << " for module " << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
251  ex.addContext(ost.str());
252 
253  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
254  ost.str("");
255  ost << "Running path '";
256  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
257  ex.addContext(ost.str());
258  }
259  ost.str("");
260  ost << "Processing ";
261  ost << principal.id();
262  ex.addContext(ost.str());
263  }
264  }
265 
266  namespace workerhelper {
267  template<>
269  public:
271  static bool call(Worker* iWorker, StreamID,
272  EventPrincipal& ep, EventSetup const& es,
273  ActivityRegistry* actReg,
274  ModuleCallingContext const* mcc,
275  Arg::Context const* context) {
276  //Signal sentry is handled by the module
277  return iWorker->implDo(ep,es, mcc);
278  }
279  };
280 
281  template<>
283  public:
285  static bool call(Worker* iWorker,StreamID,
286  RunPrincipal& ep, EventSetup const& es,
287  ActivityRegistry* actReg,
288  ModuleCallingContext const* mcc,
289  Arg::Context const* context) {
290  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
291  return iWorker->implDoBegin(ep,es, mcc);
292  }
293  };
294  template<>
296  public:
298  static bool call(Worker* iWorker,StreamID id,
299  RunPrincipal& ep, EventSetup const& es,
300  ActivityRegistry* actReg,
301  ModuleCallingContext const* mcc,
302  Arg::Context const* context) {
303  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
304  return iWorker->implDoStreamBegin(id,ep,es, mcc);
305  }
306  };
307  template<>
309  public:
311  static bool call(Worker* iWorker,StreamID,
312  RunPrincipal& ep, EventSetup const& es,
313  ActivityRegistry* actReg,
314  ModuleCallingContext const* mcc,
315  Arg::Context const* context) {
316  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
317  return iWorker->implDoEnd(ep,es, mcc);
318  }
319  };
320  template<>
322  public:
324  static bool call(Worker* iWorker,StreamID id,
325  RunPrincipal& ep, EventSetup const& es,
326  ActivityRegistry* actReg,
327  ModuleCallingContext const* mcc,
328  Arg::Context const* context) {
329  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
330  return iWorker->implDoStreamEnd(id,ep,es, mcc);
331  }
332  };
333 
334  template<>
336  public:
338  static bool call(Worker* iWorker,StreamID,
339  LuminosityBlockPrincipal& ep, EventSetup const& es,
340  ActivityRegistry* actReg,
341  ModuleCallingContext const* mcc,
342  Arg::Context const* context) {
343  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
344  return iWorker->implDoBegin(ep,es, mcc);
345  }
346  };
347  template<>
349  public:
351  static bool call(Worker* iWorker,StreamID id,
352  LuminosityBlockPrincipal& ep, EventSetup const& es,
353  ActivityRegistry* actReg,
354  ModuleCallingContext const* mcc,
355  Arg::Context const* context) {
356  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
357  return iWorker->implDoStreamBegin(id,ep,es, mcc);
358  }
359  };
360 
361  template<>
363  public:
365  static bool call(Worker* iWorker,StreamID,
366  LuminosityBlockPrincipal& ep, EventSetup const& es,
367  ActivityRegistry* actReg,
368  ModuleCallingContext const* mcc,
369  Arg::Context const* context) {
370  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
371  return iWorker->implDoEnd(ep,es, mcc);
372  }
373  };
374  template<>
376  public:
378  static bool call(Worker* iWorker,StreamID id,
379  LuminosityBlockPrincipal& ep, EventSetup const& es,
380  ActivityRegistry* actReg,
381  ModuleCallingContext const* mcc,
382  Arg::Context const* context) {
383  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
384  return iWorker->implDoStreamEnd(id,ep,es, mcc);
385  }
386  };
387  }
388 
389  template <typename T>
390  bool Worker::doWork(typename T::MyPrincipal& ep,
391  EventSetup const& es,
392  StreamID streamID,
393  ParentContext const& parentContext,
394  typename T::Context const* context) {
395 
396  if (T::isEvent_) {
397  ++timesVisited_;
398  }
399  bool rc = false;
400 
401  switch(state_) {
402  case Ready: break;
403  case Pass: return true;
404  case Fail: return false;
405  case Exception: {
406  cached_exception_->raise();
407  }
408  }
409 
410  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
411 
412  try {
413  convertException::wrap([&]() {
414 
415  if (T::isEvent_) {
416  ++timesRun_;
417 
418  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
419  std::vector<ProductHolderIndexAndSkipBit> const& items = itemsToGetFromEvent();
420  for(auto const& item : items) {
421  ProductHolderIndex productHolderIndex = item.productHolderIndex();
422  bool skipCurrentProcess = item.skipCurrentProcess();
423  if(productHolderIndex != ProductHolderIndexAmbiguous) {
424  ep.prefetch(productHolderIndex, skipCurrentProcess, &moduleCallingContext_);
425  }
426  }
427  }
428 
430  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
431 
432  if (rc) {
433  state_ = Pass;
434  if (T::isEvent_) ++timesPassed_;
435  } else {
436  state_ = Fail;
437  if (T::isEvent_) ++timesFailed_;
438  }
439  });
440  }
441  catch(cms::Exception& ex) {
442 
443  // NOTE: the warning printed as a result of ignoring or failing
444  // a module will only be printed during the full true processing
445  // pass of this module
446 
447  // Get the action corresponding to this exception. However, if processing
448  // something other than an event (e.g. run, lumi) always rethrow.
450 
451  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
452  // as IgnoreCompletely, so any subsequent OutputModules are still run.
453  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
455  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
456  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
457 
460  }
461  switch(action) {
463  rc = true;
464  ++timesPassed_;
465  state_ = Pass;
466  exceptionContext<T>(ep, ex, &moduleCallingContext_);
467  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
468  break;
469  default:
470  if (T::isEvent_) ++timesExcept_;
471  state_ = Exception;
472  cached_exception_.reset(ex.clone());
473  cached_exception_->raise();
474  }
475  }
476  return rc;
477  }
478 }
479 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:365
bool doWork(typename T::MyPrincipal &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:390
void pathFinished(EventPrincipal &)
Definition: Worker.cc:184
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:98
ModuleDescription const & description() const
Definition: Worker.h:96
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:94
ModuleCallingContext const * getTopModuleCallingContext() const
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual ~Worker()
Definition: Worker.cc:87
void endJob()
Definition: Worker.cc:120
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:350
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:378
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:310
int timesPassed() const
Definition: Worker.h:118
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:182
State state() const
Definition: Worker.h:121
void clearCounters()
Definition: Worker.h:112
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
unsigned int ProductHolderIndex
ActivityRegistry * activityRegistry()
Definition: Worker.h:153
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:136
virtual bool implDoStreamBegin(StreamID id, RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void implPreForkReleaseResources()=0
virtual std::string workerType() const =0
virtual void itemsToGet(BranchType, std::vector< ProductHolderIndexAndSkipBit > &) const =0
ExceptionToActionTable const * actions_
Definition: Worker.h:179
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:90
EarlyDeleteHelper * earlyDeleteHelper_
Definition: Worker.h:184
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:177
BranchType
Definition: BranchType.h:11
static bool call(Worker *iWorker, StreamID id, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:324
bool isEndPath() const
Definition: PathContext.h:42
int timesExcept() const
Definition: Worker.h:120
int timesExcept_
Definition: Worker.h:174
virtual bool implDoBegin(RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
Worker & operator=(Worker const &)=delete
void reset()
Definition: Worker.h:91
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
virtual bool implDo(EventPrincipal &, EventSetup const &c, ModuleCallingContext const *mcc)=0
int timesVisited() const
Definition: Worker.h:117
PathContext const * pathContext() const
int timesPassed_
Definition: Worker.h:172
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:284
ModuleDescription const * descPtr() const
Definition: Worker.h:97
ModuleDescription const * moduleDescription() const
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:89
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:364
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:71
int timesRun() const
Definition: Worker.h:116
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:377
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:338
static bool call(Worker *iWorker, StreamID id, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:298
virtual void implEndJob()=0
virtual std::vector< ProductHolderIndexAndSkipBit > const & itemsToGetFromEvent() const =0
virtual void modulesDependentUpon(std::vector< const char * > &oModuleLabels) const =0
areg
Definition: Schedule.cc:370
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:88
virtual Types moduleType() const =0
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:270
void postDoEvent(EventPrincipal &)
Definition: Worker.cc:189
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:297
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:84
virtual void itemsMayGet(BranchType, std::vector< ProductHolderIndexAndSkipBit > &) const =0
int timesRun_
Definition: Worker.h:170
int timesPass() const
Definition: Worker.h:123
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
int timesFailed() const
Definition: Worker.h:119
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:85
void preForkReleaseResources()
Definition: Worker.h:87
double a
Definition: hdecay.h:121
std::shared_ptr< cms::Exception > cached_exception_
Definition: Worker.h:180
void beginJob()
Definition: Worker.cc:104
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:160
static bool call(Worker *iWorker, StreamID, EventPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:271
int timesFailed_
Definition: Worker.h:173
auto wrap(F iFunc) -> decltype(iFunc())
static Interceptor::Registry registry("Interceptor")
virtual bool implDoEnd(RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:337
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:351
PlaceInPathContext const * placeInPathContext() const
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:323
virtual bool implDoStreamEnd(StreamID id, RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
State state_
Definition: Worker.h:175
int timesVisited_
Definition: Worker.h:171
static bool call(Worker *iWorker, StreamID, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:285
static bool call(Worker *iWorker, StreamID, RunPrincipal &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:311