CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
49 
51 
52 #include <map>
53 #include <memory>
54 #include <sstream>
55 #include <string>
56 #include <vector>
57 #include <exception>
58 #include <unordered_map>
59 
60 namespace edm {
61  class EventPrincipal;
62  class EarlyDeleteHelper;
63  class ProductResolverIndexHelper;
64  class ProductResolverIndexAndSkipBit;
65  class StreamID;
66  class StreamContext;
67  class ProductRegistry;
68  class ThinnedAssociationsHelper;
69  class WaitingTask;
70 
71  namespace workerhelper {
72  template< typename O> class CallImpl;
73  }
74 
75  class Worker {
76  public:
77  enum State { Ready, Pass, Fail, Exception };
79 
80  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
81  virtual ~Worker();
82 
83  Worker(Worker const&) = delete; // Disallow copying and moving
84  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
85 
86  template <typename T>
87  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
88  StreamID stream,
89  ParentContext const& parentContext,
90  typename T::Context const* context);
91  template <typename T>
92  void doWorkAsync(WaitingTask* task,
93  typename T::MyPrincipal const&, EventSetup const& c,
94  StreamID stream,
95  ParentContext const& parentContext,
96  typename T::Context const* context);
97 
99  waitingTasks_.add(task);
100  }
101  void skipOnPath();
102  void beginJob() ;
103  void endJob();
104  void beginStream(StreamID id, StreamContext& streamContext);
105  void endStream(StreamID id, StreamContext& streamContext);
108 
// Public entry point that simply forwards both arguments to the pure-virtual
// implPostForkReacquireResources() hook declared later in this class.
110  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
112 
113  void reset() {
114  cached_exception_ = std::exception_ptr();
115  state_ = Ready;
117  workStarted_ = false;
119  }
120 
121  void postDoEvent(EventPrincipal const&);
122 
127  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
128 
130 
131  //Used to make EDGetToken work
132  virtual void updateLookup(BranchType iBranchType,
133  ProductResolverIndexHelper const&) = 0;
134  virtual void resolvePutIndicies(BranchType iBranchType,
135  std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies) = 0;
136 
137  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
138  ProductRegistry const& preg,
139  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
140 
141  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
142 
143  virtual Types moduleType() const =0;
144 
// Resets the five execution-statistics counters (run, visited, passed,
// failed, except) to zero. Every store uses std::memory_order_relaxed, so
// the individual resets impose no ordering on other memory operations.
145  void clearCounters() {
146  timesRun_.store(0,std::memory_order_relaxed);
147  timesVisited_.store(0,std::memory_order_relaxed);
148  timesPassed_.store(0,std::memory_order_relaxed);
149  timesFailed_.store(0,std::memory_order_relaxed);
150  timesExcept_.store(0,std::memory_order_relaxed);
151  }
152 
153  void addedToPath() {
155  }
156  //NOTE: calling state() is done to force synchronization across threads
// Read-only accessors for the execution statistics. Each counter accessor
// returns the current value of the corresponding atomic counter using a
// relaxed load; the accessors themselves perform no other synchronization.
157  int timesRun() const { return timesRun_.load(std::memory_order_relaxed); }
158  int timesVisited() const { return timesVisited_.load(std::memory_order_relaxed); }
159  int timesPassed() const { return timesPassed_.load(std::memory_order_relaxed); }
160  int timesFailed() const { return timesFailed_.load(std::memory_order_relaxed); }
161  int timesExcept() const { return timesExcept_.load(std::memory_order_relaxed); }
// Returns the current value of the atomic state_ member (default atomic load).
162  State state() const { return state_; }
163 
164  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
165 
166  protected:
167  template<typename O> friend class workerhelper::CallImpl;
168  virtual std::string workerType() const = 0;
169  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
170  ModuleCallingContext const* mcc) = 0;
171  virtual bool implDoPrePrefetchSelection(StreamID id,
172  EventPrincipal const& ep,
173  ModuleCallingContext const* mcc) = 0;
174  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
175  ModuleCallingContext const* mcc) = 0;
176  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
177  ModuleCallingContext const* mcc) = 0;
178  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
179  ModuleCallingContext const* mcc) = 0;
180  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
181  ModuleCallingContext const* mcc) = 0;
182  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
183  ModuleCallingContext const* mcc) = 0;
184  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
185  ModuleCallingContext const* mcc) = 0;
186  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
187  ModuleCallingContext const* mcc) = 0;
188  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
189  ModuleCallingContext const* mcc) = 0;
190  virtual void implBeginJob() = 0;
191  virtual void implEndJob() = 0;
192  virtual void implBeginStream(StreamID) = 0;
193  virtual void implEndStream(StreamID) = 0;
194 
196 
198 
199  private:
200 
201  template <typename T>
202  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
203  StreamID stream,
204  ParentContext const& parentContext,
205  typename T::Context const* context);
206 
207  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
208  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
209 
210  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const = 0;
211 
212  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
213 
214  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
215  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
216 
217  virtual void implPreForkReleaseResources() = 0;
218  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
219  unsigned int iNumberOfChildren) = 0;
221 
223 
224  static void exceptionContext(const std::string& iID,
225  bool iIsEvent,
226  cms::Exception& ex,
227  ModuleCallingContext const* mcc);
228 
229  /*This base class is used to hide the differences between the ID used
230  for Event, LuminosityBlock and Run. Using the base class allows us
231  to only convert the ID to string form if it is actually needed in
232  the call to shouldRethrowException.
233  */
235  public:
236  virtual std::string value() const = 0;
237  };
238 
239  template< typename T>
241  public:
242  TransitionIDValue(T const& iP): p_(iP) {}
243  virtual std::string value() const override {
244  std::ostringstream iost;
245  iost<<p_.id();
246  return iost.str();
247  }
248  private:
249  T const& p_;
250 
251  };
252 
254  ParentContext const& parentContext,
255  bool isEvent,
256  TransitionIDValueBase const& iID) const;
257 
// Records a successful outcome: sets state_ to Pass and always returns true.
// The timesPassed_ counter is incremented (relaxed ordering) only when the
// IS_EVENT template argument is true.
258  template<bool IS_EVENT>
259  bool setPassed() {
260  if(IS_EVENT) {
261  timesPassed_.fetch_add(1,std::memory_order_relaxed);
262  }
263  state_ = Pass;
264  return true;
265  }
266 
// Records a failed outcome: sets state_ to Fail and always returns false.
// The timesFailed_ counter is incremented (relaxed ordering) only when the
// IS_EVENT template argument is true.
267  template<bool IS_EVENT>
268  bool setFailed() {
269  if(IS_EVENT) {
270  timesFailed_.fetch_add(1,std::memory_order_relaxed);
271  }
272  state_ = Fail;
273  return false;
274  }
275 
// Records an exceptional outcome: stores iException in cached_exception_,
// sets state_ to Exception, and returns the stored exception pointer.
// The timesExcept_ counter is incremented (relaxed ordering) only when the
// IS_EVENT template argument is true.
// Note the order: the exception is cached before state_ is switched to
// Exception.
276  template<bool IS_EVENT>
277  std::exception_ptr setException(std::exception_ptr iException) {
278  if (IS_EVENT) {
279  timesExcept_.fetch_add(1,std::memory_order_relaxed);
280  }
281  cached_exception_ = iException; // propagate_const<T> has no reset() function
282  state_ = Exception;
283  return cached_exception_;
284  }
285 
287  ParentContext const& parentContext,
288  Principal const& );
289 
291  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
292  }
293 
294  template<typename T>
295  void runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
296  typename T::MyPrincipal const& ep,
297  EventSetup const& es,
298  StreamID streamID,
299  ParentContext const& parentContext,
300  typename T::Context const* context);
301 
302  template< typename T>
303  class RunModuleTask : public WaitingTask {
304  public:
306  typename T::MyPrincipal const& ep,
307  EventSetup const& es,
308  StreamID streamID,
309  ParentContext const& parentContext,
310  typename T::Context const* context):
311  m_worker(worker),
312  m_principal(ep),
313  m_es(es),
314  m_streamID(streamID),
315  m_parentContext(parentContext),
316  m_context(context),
317  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
318 
319  tbb::task* execute() override {
320  //Need to make the services available early so other services can see them
322 
323  //In case the emit causes an exception, we need a memory location
324  // to hold the exception_ptr
325  std::exception_ptr temp_excptr;
326  auto excptr = exceptionPtr();
327  try {
328  //pre was called in prefetchAsync
330  }catch(...) {
331  temp_excptr = std::current_exception();
332  if(not excptr) {
333  excptr = &temp_excptr;
334  }
335  }
336 
337  if( not excptr) {
338  if(auto queue = m_worker->serializeRunModule()) {
339  Worker* worker = m_worker;
340  auto const & principal = m_principal;
341  auto& es = m_es;
342  auto streamID = m_streamID;
343  auto parentContext = m_parentContext;
344  auto serviceToken = m_serviceToken;
345  auto sContext = m_context;
346  queue->push( [worker, &principal, &es, streamID,parentContext,sContext, serviceToken]()
347  {
348  //Need to make the services available
349  ServiceRegistry::Operate guard(serviceToken);
350 
351  std::exception_ptr* ptr = nullptr;
352  worker->runModuleAfterAsyncPrefetch<T>(ptr,
353  principal,
354  es,
355  streamID,
356  parentContext,
357  sContext);
358  });
359  return nullptr;
360  }
361  }
362 
364  m_principal,
365  m_es,
366  m_streamID,
368  m_context);
369  return nullptr;
370  }
371 
372  private:
374  typename T::MyPrincipal const& m_principal;
375  EventSetup const& m_es;
378  typename T::Context const* m_context;
380  };
381 
382  std::atomic<int> timesRun_;
383  std::atomic<int> timesVisited_;
384  std::atomic<int> timesPassed_;
385  std::atomic<int> timesFailed_;
386  std::atomic<int> timesExcept_;
387  std::atomic<State> state_;
389  std::atomic<int> numberOfPathsLeftToRun_;
390 
392 
393  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
394  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
395 
396  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
397 
399 
401  std::atomic<bool> workStarted_;
402  };
403 
404  namespace {
405  template <typename T>
406  class ModuleSignalSentry {
407  public:
408  ModuleSignalSentry(ActivityRegistry *a,
409  typename T::Context const* context,
410  ModuleCallingContext const* moduleCallingContext) :
411  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
412 
413  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
414  }
415 
416  ~ModuleSignalSentry() {
417  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
418  }
419 
420  private:
421  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
422  typename T::Context const* context_;
424  };
425 
426  }
427 
428  namespace workerhelper {
429  template<>
431  public:
433  static bool call(Worker* iWorker, StreamID,
434  EventPrincipal const& ep, EventSetup const& es,
435  ActivityRegistry* /* actReg */,
436  ModuleCallingContext const* mcc,
437  Arg::Context const* /* context*/) {
438  //Signal sentry is handled by the module
439  return iWorker->implDo(ep,es, mcc);
440  }
441  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
442  typename Arg::MyPrincipal const& ep,
443  ModuleCallingContext const* mcc) {
444  return iWorker->implDoPrePrefetchSelection(id,ep,mcc);
445  }
446  };
447 
448  template<>
450  public:
452  static bool call(Worker* iWorker,StreamID,
453  RunPrincipal const& ep, EventSetup const& es,
454  ActivityRegistry* actReg,
455  ModuleCallingContext const* mcc,
456  Arg::Context const* context) {
457  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
458  return iWorker->implDoBegin(ep,es, mcc);
459  }
460  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
461  typename Arg::MyPrincipal const& ep,
462  ModuleCallingContext const* mcc) {
463  return true;
464  }
465  };
466  template<>
468  public:
470  static bool call(Worker* iWorker,StreamID id,
471  RunPrincipal const & ep, EventSetup const& es,
472  ActivityRegistry* actReg,
473  ModuleCallingContext const* mcc,
474  Arg::Context const* context) {
475  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
476  return iWorker->implDoStreamBegin(id,ep,es, mcc);
477  }
478  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
479  typename Arg::MyPrincipal const& ep,
480  ModuleCallingContext const* mcc) {
481  return true;
482  }
483  };
484  template<>
486  public:
488  static bool call(Worker* iWorker,StreamID,
489  RunPrincipal const& ep, EventSetup const& es,
490  ActivityRegistry* actReg,
491  ModuleCallingContext const* mcc,
492  Arg::Context const* context) {
493  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
494  return iWorker->implDoEnd(ep,es, mcc);
495  }
496  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
497  typename Arg::MyPrincipal const& ep,
498  ModuleCallingContext const* mcc) {
499  return true;
500  }
501  };
502  template<>
504  public:
506  static bool call(Worker* iWorker,StreamID id,
507  RunPrincipal const& ep, EventSetup const& es,
508  ActivityRegistry* actReg,
509  ModuleCallingContext const* mcc,
510  Arg::Context const* context) {
511  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
512  return iWorker->implDoStreamEnd(id,ep,es, mcc);
513  }
514  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
515  typename Arg::MyPrincipal const& ep,
516  ModuleCallingContext const* mcc) {
517  return true;
518  }
519  };
520 
521  template<>
523  public:
525  static bool call(Worker* iWorker,StreamID,
526  LuminosityBlockPrincipal const& ep, EventSetup const& es,
527  ActivityRegistry* actReg,
528  ModuleCallingContext const* mcc,
529  Arg::Context const* context) {
530  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
531  return iWorker->implDoBegin(ep,es, mcc);
532  }
533 
534  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
535  typename Arg::MyPrincipal const& ep,
536  ModuleCallingContext const* mcc) {
537  return true;
538  }
539  };
540  template<>
542  public:
544  static bool call(Worker* iWorker,StreamID id,
545  LuminosityBlockPrincipal const& ep, EventSetup const& es,
546  ActivityRegistry* actReg,
547  ModuleCallingContext const* mcc,
548  Arg::Context const* context) {
549  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
550  return iWorker->implDoStreamBegin(id,ep,es, mcc);
551  }
552 
553  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
554  typename Arg::MyPrincipal const& ep,
555  ModuleCallingContext const* mcc) {
556  return true;
557  }
558 };
559 
560  template<>
562  public:
564  static bool call(Worker* iWorker,StreamID,
565  LuminosityBlockPrincipal const& ep, EventSetup const& es,
566  ActivityRegistry* actReg,
567  ModuleCallingContext const* mcc,
568  Arg::Context const* context) {
569  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
570  return iWorker->implDoEnd(ep,es, mcc);
571  }
572  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
573  typename Arg::MyPrincipal const& ep,
574  ModuleCallingContext const* mcc) {
575  return true;
576  }
577 
578  };
579  template<>
581  public:
583  static bool call(Worker* iWorker,StreamID id,
584  LuminosityBlockPrincipal const& ep, EventSetup const& es,
585  ActivityRegistry* actReg,
586  ModuleCallingContext const* mcc,
587  Arg::Context const* context) {
588  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
589  return iWorker->implDoStreamEnd(id,ep,es, mcc);
590  }
591 
592  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
593  typename Arg::MyPrincipal const& ep,
594  ModuleCallingContext const* mcc) {
595  return true;
596  }
597  };
598  }
599 
600 
601  template <typename T>
603  typename T::MyPrincipal const& ep,
604  EventSetup const& es,
605  StreamID streamID,
606  ParentContext const& parentContext,
607  typename T::Context const* context) {
608  waitingTasks_.add(task);
609  if(T::isEvent_) {
610  timesVisited_.fetch_add(1,std::memory_order_relaxed);
611  }
612 
613  bool expected = false;
614  if(workStarted_.compare_exchange_strong(expected,true)) {
616 
617  //If we have a TriggerResults-based selection, we want to reject the event before doing prefetching
619  setPassed<T::isEvent_>();
620  waitingTasks_.doneWaiting(nullptr);
621  return;
622  }
623 
624  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
625  this, ep,es,streamID,parentContext,context);
626  prefetchAsync(runTask, parentContext, ep);
627  }
628  }
629 
630  template<typename T>
631  void Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
632  typename T::MyPrincipal const& ep,
633  EventSetup const& es,
634  StreamID streamID,
635  ParentContext const& parentContext,
636  typename T::Context const* context) {
637  try {
638  convertException::wrap([&]() {
639  if(iEPtr) {
640  assert(*iEPtr);
642  std::rethrow_exception(*iEPtr);
643  }
644 
645  runModule<T>(ep,es,streamID,parentContext,context);
646  });
647  } catch( cms::Exception& iException) {
649  if(shouldRethrowException(iException, parentContext, T::isEvent_, idValue)) {
651  std::ostringstream iost;
652  if(iEPtr) {
653  iost<<"Prefetching for module ";
654  } else {
655  iost<<"Calling method for module ";
656  }
657  iost<<description().moduleName() << "/'"
658  << description().moduleLabel() << "'";
659  iException.addContext(iost.str());
660  setException<T::isEvent_>(std::current_exception());
661  waitingTasks_.doneWaiting(cached_exception_);
662  return;
663  } else {
664  setPassed<T::isEvent_>();
665  }
666  }
667  waitingTasks_.doneWaiting(nullptr);
668  }
669 
670  template <typename T>
671  bool Worker::doWork(typename T::MyPrincipal const& ep,
672  EventSetup const& es,
673  StreamID streamID,
674  ParentContext const& parentContext,
675  typename T::Context const* context) {
676 
677  if (T::isEvent_) {
678  timesVisited_.fetch_add(1,std::memory_order_relaxed);
679  }
680  bool rc = false;
681 
682  switch(state_) {
683  case Ready: break;
684  case Pass: return true;
685  case Fail: return false;
686  case Exception: {
687  std::rethrow_exception(cached_exception_);
688  }
689  }
690 
691  bool expected = false;
692  if(not workStarted_.compare_exchange_strong(expected, true) ) {
693  //another thread beat us here
694  auto waitTask = edm::make_empty_waiting_task();
695  waitTask->increment_ref_count();
696 
697  waitingTasks_.add(waitTask.get());
698 
699  waitTask->wait_for_all();
700 
701  switch(state_) {
702  case Ready: assert(false);
703  case Pass: return true;
704  case Fail: return false;
705  case Exception: {
706  std::rethrow_exception(cached_exception_);
707  }
708  }
709  }
710 
711  //Need the context to be set until after any exception is resolved
713 
714  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
715  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
716 
717  try {
718  convertException::wrap([&]() {
719 
720  if (T::isEvent_) {
721 
722  //If we have a TriggerResults-based selection, we want to reject the event before doing prefetching
724  timesRun_.fetch_add(1,std::memory_order_relaxed);
725  rc = setPassed<T::isEvent_>();
726  waitingTasks_.doneWaiting(nullptr);
727  return;
728  }
729  auto waitTask = edm::make_empty_waiting_task();
730  {
731  //Make sure signal is sent once the prefetching is done
732  // [the 'pre' signal was sent in prefetchAsync]
733  //The purpose of this block is to send the signal after wait_for_all
734  auto sentryFunc = [this](void*) {
736  };
737  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
738 
739  //set count to 2 since wait_for_all requires value to not go to 0
740  waitTask->set_ref_count(2);
741 
742  prefetchAsync(waitTask.get(),parentContext, ep);
743  waitTask->decrement_ref_count();
744  waitTask->wait_for_all();
745  }
746  if(waitTask->exceptionPtr() != nullptr) {
747  std::rethrow_exception(*(waitTask->exceptionPtr()));
748  }
749  }
750  //successful prefetch so no reset necessary
751  prefetchSentry.release();
752  if(auto queue = serializeRunModule()) {
753  auto serviceToken = ServiceRegistry::instance().presentToken();
754  queue->pushAndWait([&]() {
755  //Need to make the services available
756  ServiceRegistry::Operate guard(serviceToken);
757  rc = runModule<T>(ep,es,streamID,parentContext,context);
758  });
759  } else {
760  rc = runModule<T>(ep,es,streamID,parentContext,context);
761  }
762  });
763  }
764  catch(cms::Exception& ex) {
766  if(shouldRethrowException(ex, parentContext, T::isEvent_, idValue)) {
768  setException<T::isEvent_>(std::current_exception());
769  waitingTasks_.doneWaiting(cached_exception_);
770  std::rethrow_exception(cached_exception_);
771  } else {
772  rc = setPassed<T::isEvent_>();
773  }
774  }
775  waitingTasks_.doneWaiting(nullptr);
776  return rc;
777  }
778 
779 
// Invokes the module once for transition type T and records the result.
//
// ep, es, streamID and context are forwarded unchanged to
// workerhelper::CallImpl<T>::call; parentContext is handed to the
// ModuleContextSentry that is constructed on moduleCallingContext_ for the
// duration of this function.
//
// Returns the boolean produced by CallImpl<T>::call. Before returning, the
// result is recorded through setPassed (true) or setFailed (false).
// There is no try/catch here: if the call throws, neither setPassed nor
// setFailed is reached and the exception propagates to the caller.
780  template <typename T>
781  bool Worker::runModule(typename T::MyPrincipal const& ep,
782  EventSetup const& es,
783  StreamID streamID,
784  ParentContext const& parentContext,
785  typename T::Context const* context) {
786  //unscheduled producers should advance this
787  //if (T::isEvent_) {
788  // ++timesVisited_;
789  //}
790  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// Only event transitions contribute to the timesRun_ statistic.
791  if (T::isEvent_) {
792  timesRun_.fetch_add(1,std::memory_order_relaxed);
793  }
794 
// Dispatch to the transition-specific implDo* method through CallImpl<T>.
795  bool rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
796 
797  if (rc) {
798  setPassed<T::isEvent_>();
799  } else {
800  setFailed<T::isEvent_>();
801  }
802  return rc;
803  }
804 }
805 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:564
tbb::task * execute() override
Definition: Worker.h:319
bool shouldRethrowException(cms::Exception &ex, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:155
std::atomic< int > timesVisited_
Definition: Worker.h:383
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:305
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:225
void skipOnPath()
Definition: Worker.cc:311
ModuleDescription const & description() const
Definition: Worker.h:123
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:378
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:781
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:98
T::MyPrincipal const & m_principal
Definition: Worker.h:374
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:389
StreamContext const * getStreamContext() const
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:277
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:221
virtual void modulesWhoseProductsAreConsumed(std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:496
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual ~Worker()
Definition: Worker.cc:91
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:506
void endJob()
Definition: Worker.cc:247
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:543
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:377
std::atomic< int > timesExcept_
Definition: Worker.h:386
std::atomic< bool > workStarted_
Definition: Worker.h:401
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:487
std::atomic< int > timesFailed_
Definition: Worker.h:385
int timesPassed() const
Definition: Worker.h:159
void add(WaitingTask *)
Adds task to the waiting list.
assert(m_qm.get())
void addedToPath()
Definition: Worker.h:153
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:396
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:671
State state() const
Definition: Worker.h:162
void clearCounters()
Definition: Worker.h:145
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:525
std::string const & moduleName() const
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:452
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:433
virtual SerialTaskQueueChain * serializeRunModule()=0
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:572
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:534
ActivityRegistry * activityRegistry()
Definition: Worker.h:197
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:583
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:263
virtual void implPreForkReleaseResources()=0
void reset()
Resets access to the resource so that added tasks will wait.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:553
virtual std::string workerType() const =0
ExceptionToActionTable const * actions_
Definition: Worker.h:393
ServiceToken presentToken() const
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:94
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, edm::ProductResolverIndex > const &iIndicies)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:391
BranchType
Definition: BranchType.h:11
std::exception_ptr cached_exception_
Definition: Worker.h:394
bool setFailed()
Definition: Worker.h:268
int timesExcept() const
Definition: Worker.h:161
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Worker & operator=(Worker const &)=delete
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:400
void reset()
Definition: Worker.h:113
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
int timesVisited() const
Definition: Worker.h:158
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:451
ModuleDescription const * descPtr() const
Definition: Worker.h:124
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
ModuleDescription const * moduleDescription() const
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:592
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:111
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:563
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:72
std::atomic< State > state_
Definition: Worker.h:387
int timesRun() const
Definition: Worker.h:157
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:470
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:488
int numberOfPathsOn_
Definition: Worker.h:388
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:582
bool setPassed()
Definition: Worker.h:259
TransitionIDValue(T const &iP)
Definition: Worker.h:242
virtual void implEndJob()=0
virtual std::string value() const override
Definition: Worker.h:243
areg
Definition: Schedule.cc:384
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:110
virtual Types moduleType() const =0
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:432
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:469
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:106
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:198
int timesPass() const
Definition: Worker.h:164
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:544
virtual void implEndStream(StreamID)=0
int timesFailed() const
Definition: Worker.h:160
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:107
void preForkReleaseResources()
Definition: Worker.h:109
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:441
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:290
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
static void exceptionContext(const std::string &iID, bool iIsEvent, cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:384
void beginJob()
Definition: Worker.cc:231
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:287
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:317
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFromEvent() const =0
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:398
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:460
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:602
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:524
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:514
std::atomic< int > timesRun_
Definition: Worker.h:382
virtual std::string value() const =0
preg
Definition: Schedule.cc:384
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:52
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:505
ServiceToken m_serviceToken
Definition: Worker.h:379
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:478
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:631
EventSetup const & m_es
Definition: Worker.h:375
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0