CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
43 
44 #include "boost/shared_ptr.hpp"
45 
48 
49 #include <memory>
50 #include <sstream>
51 #include <vector>
52 
53 namespace edm {
54  class EventPrincipal;
55  class EarlyDeleteHelper;
56  class ProductHolderIndexHelper;
57  class ProductHolderIndexAndSkipBit;
58  class StreamID;
59  class StreamContext;
60 
61  namespace workerhelper {
62  template< typename O> class CallImpl;
63  }
64 
65  class Worker {
66  public:
67  enum State { Ready, Pass, Fail, Exception };
69 
70  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
71  virtual ~Worker();
72 
73  Worker(Worker const&) = delete; // Disallow copying and moving
74  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
75 
76  template <typename T>
77  bool doWork(typename T::MyPrincipal&, EventSetup const& c,
78  CPUTimer *const timer,
79  StreamID stream,
80  ParentContext const& parentContext,
81  typename T::Context const* context);
82  void beginJob() ;
83  void endJob();
84  void beginStream(StreamID id, StreamContext& streamContext);
85  void endStream(StreamID id, StreamContext& streamContext);
88 
90  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
91 
92  void reset() { state_ = Ready; }
93 
96 
101  void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
102 
104 
105  //Used to make EDGetToken work
106  virtual void updateLookup(BranchType iBranchType,
107  ProductHolderIndexHelper const&) = 0;
108 
109 
110  virtual Types moduleType() const =0;
111 
112  std::pair<double, double> timeCpuReal() const {
113  return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
114  }
115 
116  void clearCounters() {
118  }
119 
120  void useStopwatch();
121 
122  int timesRun() const { return timesRun_; }
123  int timesVisited() const { return timesVisited_; }
124  int timesPassed() const { return timesPassed_; }
125  int timesFailed() const { return timesFailed_; }
126  int timesExcept() const { return timesExcept_; }
127  State state() const { return state_; }
128 
129  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
130 
131  protected:
132  template<typename O> friend class workerhelper::CallImpl;
133  virtual std::string workerType() const = 0;
134  virtual bool implDo(EventPrincipal&, EventSetup const& c,
135  ModuleCallingContext const* mcc) = 0;
136  virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
137  ModuleCallingContext const* mcc) = 0;
138  virtual bool implDoStreamBegin(StreamID id, RunPrincipal& rp, EventSetup const& c,
139  ModuleCallingContext const* mcc) = 0;
140  virtual bool implDoStreamEnd(StreamID id, RunPrincipal& rp, EventSetup const& c,
141  ModuleCallingContext const* mcc) = 0;
142  virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
143  ModuleCallingContext const* mcc) = 0;
144  virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
145  ModuleCallingContext const* mcc) = 0;
146  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c,
147  ModuleCallingContext const* mcc) = 0;
148  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal& lbp, EventSetup const& c,
149  ModuleCallingContext const* mcc) = 0;
150  virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
151  ModuleCallingContext const* mcc) = 0;
152  virtual void implBeginJob() = 0;
153  virtual void implEndJob() = 0;
154  virtual void implBeginStream(StreamID) = 0;
155  virtual void implEndStream(StreamID) = 0;
156 
158 
159  private:
160 
161  virtual void itemsToGet(BranchType, std::vector<ProductHolderIndexAndSkipBit>&) const = 0;
162  virtual void itemsMayGet(BranchType, std::vector<ProductHolderIndexAndSkipBit>&) const = 0;
163 
164  virtual std::vector<ProductHolderIndexAndSkipBit> const& itemsToGetFromEvent() const = 0;
165 
166  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
167  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
168 
169  virtual void implPreForkReleaseResources() = 0;
170  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
171  unsigned int iNumberOfChildren) = 0;
173 
180 
182 
183  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
184  boost::shared_ptr<cms::Exception> cached_exception_; // if state is 'exception'
185 
186  boost::shared_ptr<ActivityRegistry> actReg_;
187 
189  };
190 
191  namespace {
192  template <typename T>
193  class ModuleSignalSentry {
194  public:
195  ModuleSignalSentry(ActivityRegistry *a,
196  ModuleDescription const& md,
197  typename T::Context const* context,
198  ModuleCallingContext* moduleCallingContext) :
199  a_(a), md_(&md), context_(context), moduleCallingContext_(moduleCallingContext) {
200 
201  if(a_) T::preModuleSignal(a_, md_, context, moduleCallingContext_);
202  }
203 
204  ~ModuleSignalSentry() {
205  if(a_) T::postModuleSignal(a_, md_, context_, moduleCallingContext_);
206  }
207 
208  private:
209  ActivityRegistry* a_;
210  ModuleDescription const* md_;
211  typename T::Context const* context_;
212  ModuleCallingContext* moduleCallingContext_;
213  };
214 
215  template <typename T>
216  void exceptionContext(typename T::MyPrincipal const& principal,
217  cms::Exception& ex,
218  ModuleCallingContext const* mcc) {
219 
220  ModuleCallingContext const* imcc = mcc;
221  while(imcc->type() == ParentContext::Type::kModule) {
222  std::ostringstream iost;
223  iost << "Calling method for unscheduled module "
224  << imcc->moduleDescription()->moduleName() << "/'"
225  << imcc->moduleDescription()->moduleLabel() << "'";
226  ex.addContext(iost.str());
227  imcc = imcc->moduleCallingContext();
228  }
229  if(imcc->type() == ParentContext::Type::kInternal) {
230  std::ostringstream iost;
231  iost << "Calling method for unscheduled module "
232  << imcc->moduleDescription()->moduleName() << "/'"
233  << imcc->moduleDescription()->moduleLabel() << "' (probably inside some kind of mixing module)";
234  ex.addContext(iost.str());
235  imcc = imcc->internalContext()->moduleCallingContext();
236  }
237  while(imcc->type() == ParentContext::Type::kModule) {
238  std::ostringstream iost;
239  iost << "Calling method for unscheduled module "
240  << imcc->moduleDescription()->moduleName() << "/'"
241  << imcc->moduleDescription()->moduleLabel() << "'";
242  ex.addContext(iost.str());
243  imcc = imcc->moduleCallingContext();
244  }
245  std::ostringstream ost;
246  if (T::isEvent_) {
247  ost << "Calling event method";
248  }
249  else {
250  // It should be impossible to get here, because
251  // this function only gets called when the IgnoreCompletely
252  // exception behavior is active, which can only be true
253  // for events.
254  ost << "Calling unknown function";
255  }
256  ost << " for module " << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
257  ex.addContext(ost.str());
258 
259  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
260  ost.str("");
261  ost << "Running path '";
262  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
263  ex.addContext(ost.str());
264  }
265  ost.str("");
266  ost << "Processing ";
267  ost << principal.id();
268  ex.addContext(ost.str());
269  }
270  }
271 
272  namespace workerhelper {
273  template<>
275  public:
276  static bool call(Worker* iWorker, StreamID, EventPrincipal& ep, EventSetup const& es,
277  ModuleCallingContext const* mcc) {
278  return iWorker->implDo(ep,es, mcc);
279  }
280  };
281 
282  template<>
284  public:
285  static bool call(Worker* iWorker,StreamID, RunPrincipal& ep, EventSetup const& es,
286  ModuleCallingContext const* mcc) {
287  return iWorker->implDoBegin(ep,es, mcc);
288  }
289  };
290  template<>
292  public:
293  static bool call(Worker* iWorker,StreamID id, RunPrincipal& ep, EventSetup const& es,
294  ModuleCallingContext const* mcc) {
295  return iWorker->implDoStreamBegin(id,ep,es, mcc);
296  }
297  };
298  template<>
300  public:
301  static bool call(Worker* iWorker,StreamID, RunPrincipal& ep, EventSetup const& es,
302  ModuleCallingContext const* mcc) {
303  return iWorker->implDoEnd(ep,es, mcc);
304  }
305  };
306  template<>
308  public:
309  static bool call(Worker* iWorker,StreamID id, RunPrincipal& ep, EventSetup const& es,
310  ModuleCallingContext const* mcc) {
311  return iWorker->implDoStreamEnd(id,ep,es, mcc);
312  }
313  };
314 
315  template<>
317  public:
318  static bool call(Worker* iWorker,StreamID, LuminosityBlockPrincipal& ep, EventSetup const& es,
319  ModuleCallingContext const* mcc) {
320  return iWorker->implDoBegin(ep,es, mcc);
321  }
322  };
323  template<>
325  public:
326  static bool call(Worker* iWorker,StreamID id, LuminosityBlockPrincipal& ep, EventSetup const& es,
327  ModuleCallingContext const* mcc) {
328  return iWorker->implDoStreamBegin(id,ep,es, mcc);
329  }
330  };
331 
332  template<>
334  public:
335  static bool call(Worker* iWorker,StreamID, LuminosityBlockPrincipal& ep, EventSetup const& es,
336  ModuleCallingContext const* mcc) {
337  return iWorker->implDoEnd(ep,es, mcc);
338  }
339  };
340  template<>
342  public:
343  static bool call(Worker* iWorker,StreamID id, LuminosityBlockPrincipal& ep, EventSetup const& es,
344  ModuleCallingContext const* mcc) {
345  return iWorker->implDoStreamEnd(id,ep,es, mcc);
346  }
347  };
348  }
349 
350  template <typename T>
351  bool Worker::doWork(typename T::MyPrincipal& ep,
352  EventSetup const& es,
353  CPUTimer* const iTimer,
354  StreamID streamID,
355  ParentContext const& parentContext,
356  typename T::Context const* context) {
357 
358  // A RunStopwatch, but only if we are processing an event.
360  iTimer);
361 
362  if (T::isEvent_) {
363  ++timesVisited_;
364  }
365  bool rc = false;
366 
367  switch(state_) {
368  case Ready: break;
369  case Pass: return true;
370  case Fail: return false;
371  case Exception: {
372  cached_exception_->raise();
373  }
374  }
375 
376  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
377 
378  try {
379  try {
380 
381  if (T::isEvent_) {
382  ++timesRun_;
383 
384  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
385  std::vector<ProductHolderIndexAndSkipBit> const& items = itemsToGetFromEvent();
386  for(auto const& item : items) {
387  ProductHolderIndex productHolderIndex = item.productHolderIndex();
388  bool skipCurrentProcess = item.skipCurrentProcess();
389  if(productHolderIndex != ProductHolderIndexAmbiguous) {
390  ep.prefetch(productHolderIndex, skipCurrentProcess, &moduleCallingContext_);
391  }
392  }
393  }
394 
396  ModuleSignalSentry<T> cpp(actReg_.get(), description(), context, &moduleCallingContext_);
397  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, &moduleCallingContext_);
398 
399  if (rc) {
400  state_ = Pass;
401  if (T::isEvent_) ++timesPassed_;
402  } else {
403  state_ = Fail;
404  if (T::isEvent_) ++timesFailed_;
405  }
406  }
407  catch (cms::Exception& e) { throw; }
408  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
409  catch (std::exception& e) { convertException::stdToEDM(e); }
411  catch(char const* c) { convertException::charPtrToEDM(c); }
412  catch (...) { convertException::unknownToEDM(); }
413  }
414  catch(cms::Exception& ex) {
415 
416  // NOTE: the warning printed as a result of ignoring or failing
417  // a module will only be printed during the full true processing
418  // pass of this module
419 
420  // Get the action corresponding to this exception. However, if processing
421  // something other than an event (e.g. run, lumi) always rethrow.
423 
424  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
425  // as IgnoreCompletely, so any subsequent OutputModules are still run.
426  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
428  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
429  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
430 
433  }
434  switch(action) {
436  rc = true;
437  ++timesPassed_;
438  state_ = Pass;
439  exceptionContext<T>(ep, ex, &moduleCallingContext_);
440  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
441  break;
442  default:
443  if (T::isEvent_) ++timesExcept_;
444  state_ = Exception;
445  cached_exception_.reset(ex.clone());
446  cached_exception_->raise();
447  }
448  }
449  return rc;
450  }
451 }
452 #endif
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:343
void pathFinished(EventPrincipal &)
Definition: Worker.cc:219
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:105
ModuleDescription const & description() const
Definition: Worker.h:97
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:101
ModuleCallingContext const * getTopModuleCallingContext() const
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual ~Worker()
Definition: Worker.cc:94
void endJob()
Definition: Worker.cc:133
boost::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:186
static bool call(Worker *iWorker, StreamID, EventPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:276
int timesPassed() const
Definition: Worker.h:124
void setActivityRegistry(boost::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:97
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
State state() const
Definition: Worker.h:127
void clearCounters()
Definition: Worker.h:116
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
bool doWork(typename T::MyPrincipal &, EventSetup const &c, CPUTimer *const timer, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:351
exception_actions::ActionCodes find(const std::string &category) const
unsigned int ProductHolderIndex
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:155
virtual bool implDoStreamBegin(StreamID id, RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void implPreForkReleaseResources()=0
virtual std::string workerType() const =0
virtual void itemsToGet(BranchType, std::vector< ProductHolderIndexAndSkipBit > &) const =0
ExceptionToActionTable const * actions_
Definition: Worker.h:183
std::pair< double, double > timeCpuReal() const
Definition: Worker.h:112
EarlyDeleteHelper * earlyDeleteHelper_
Definition: Worker.h:188
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:181
BranchType
Definition: BranchType.h:11
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:318
bool isEndPath() const
Definition: PathContext.h:42
int timesExcept() const
Definition: Worker.h:126
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:326
int timesExcept_
Definition: Worker.h:178
virtual bool implDoBegin(RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void updateLookup(BranchType iBranchType, ProductHolderIndexHelper const &)=0
Worker & operator=(Worker const &)=delete
boost::shared_ptr< CPUTimer > StopwatchPointer
Definition: RunStopwatch.h:22
void reset()
Definition: Worker.h:92
virtual bool implDo(EventPrincipal &, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:335
int timesVisited() const
Definition: Worker.h:123
void stdToEDM(std::exception const &e)
PathContext const * pathContext() const
int timesPassed_
Definition: Worker.h:176
ModuleDescription const * descPtr() const
Definition: Worker.h:98
static bool call(Worker *iWorker, StreamID id, RunPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:309
ModuleDescription const * moduleDescription() const
void useStopwatch()
Definition: Worker.cc:215
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:77
static bool call(Worker *iWorker, StreamID, RunPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:285
int timesRun() const
Definition: Worker.h:122
static bool call(Worker *iWorker, StreamID id, RunPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:293
virtual void implEndJob()=0
virtual std::vector< ProductHolderIndexAndSkipBit > const & itemsToGetFromEvent() const =0
areg
Definition: Schedule.cc:362
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:90
virtual Types moduleType() const =0
void postDoEvent(EventPrincipal &)
Definition: Worker.cc:224
void charPtrToEDM(char const *c)
void stringToEDM(std::string &s)
boost::shared_ptr< cms::Exception > cached_exception_
Definition: Worker.h:184
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:86
virtual void itemsMayGet(BranchType, std::vector< ProductHolderIndexAndSkipBit > &) const =0
int timesRun_
Definition: Worker.h:174
int timesPass() const
Definition: Worker.h:129
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
int timesFailed() const
Definition: Worker.h:125
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:87
void preForkReleaseResources()
Definition: Worker.h:89
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:111
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:185
RunStopwatch::StopwatchPointer stopwatch_
Definition: Worker.h:172
int timesFailed_
Definition: Worker.h:177
static bool call(Worker *iWorker, StreamID, RunPrincipal &ep, EventSetup const &es, ModuleCallingContext const *mcc)
Definition: Worker.h:301
virtual bool implDoEnd(RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
PlaceInPathContext const * placeInPathContext() const
virtual bool implDoStreamEnd(StreamID id, RunPrincipal &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
State state_
Definition: Worker.h:179
int timesVisited_
Definition: Worker.h:175