CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
50 
52 
53 #include <atomic>
54 #include <map>
55 #include <memory>
56 #include <sstream>
57 #include <string>
58 #include <vector>
59 #include <exception>
60 #include <unordered_map>
61 
62 namespace edm {
63  class EventPrincipal;
64  class EarlyDeleteHelper;
65  class ProductResolverIndexHelper;
66  class ProductResolverIndexAndSkipBit;
67  class StreamID;
68  class StreamContext;
69  class ProductRegistry;
70  class ThinnedAssociationsHelper;
71  class WaitingTask;
72 
73  namespace workerhelper {
74  template< typename O> class CallImpl;
75  }
76 
77  class Worker {
78  public:
79  enum State { Ready, Pass, Fail, Exception };
80  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
81 
82  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
83  virtual ~Worker();
84 
85  Worker(Worker const&) = delete; // Disallow copying and moving
86  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
87 
88  template <typename T>
89  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
90  StreamID stream,
91  ParentContext const& parentContext,
92  typename T::Context const* context);
93  template <typename T>
94  void doWorkAsync(WaitingTask* task,
95  typename T::MyPrincipal const&, EventSetup const& c,
96  StreamID stream,
97  ParentContext const& parentContext,
98  typename T::Context const* context);
99 
100  template <typename T>
101  void doWorkNoPrefetchingAsync(WaitingTask* task,
102  typename T::MyPrincipal const&,
103  EventSetup const& c,
104  StreamID stream,
105  ParentContext const& parentContext,
106  typename T::Context const* context);
107 
108  template <typename T>
109  std::exception_ptr runModuleDirectly(typename T::MyPrincipal const& ep,
110  EventSetup const& es,
111  StreamID streamID,
112  ParentContext const& parentContext,
113  typename T::Context const* context);
114 
116  waitingTasks_.add(task);
117  }
118  void skipOnPath();
119  void beginJob() ;
120  void endJob();
121  void beginStream(StreamID id, StreamContext& streamContext);
122  void endStream(StreamID id, StreamContext& streamContext);
123  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
124  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
125 
126  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
127 
128  void reset() {
129  cached_exception_ = std::exception_ptr();
130  state_ = Ready;
131  waitingTasks_.reset();
132  workStarted_ = false;
133  numberOfPathsLeftToRun_ = numberOfPathsOn_;
134  }
135 
136  void postDoEvent(EventPrincipal const&);
137 
138  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
139  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
142  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
143 
144  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
145 
146  //Used to make EDGetToken work
147  virtual void updateLookup(BranchType iBranchType,
148  ProductResolverIndexHelper const&) = 0;
149  virtual void resolvePutIndicies(BranchType iBranchType,
150  std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies) = 0;
151 
152  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
153  ProductRegistry const& preg,
154  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
155 
156  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
157 
158  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
159 
160  virtual Types moduleType() const =0;
161 
162  void clearCounters() {
163  timesRun_.store(0,std::memory_order_release);
164  timesVisited_.store(0,std::memory_order_release);
165  timesPassed_.store(0,std::memory_order_release);
166  timesFailed_.store(0,std::memory_order_release);
167  timesExcept_.store(0,std::memory_order_release);
168  }
169 
170  void addedToPath() {
171  ++numberOfPathsOn_;
172  }
173  //NOTE: calling state() is done to force synchronization across threads
174  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
175  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
176  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
177  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
178  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
179  State state() const { return state_; }
180 
181  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
182 
183  protected:
184  template<typename O> friend class workerhelper::CallImpl;
185  virtual std::string workerType() const = 0;
186  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
187  ModuleCallingContext const* mcc) = 0;
188  virtual bool implDoPrePrefetchSelection(StreamID id,
189  EventPrincipal const& ep,
190  ModuleCallingContext const* mcc) = 0;
191  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
192  ModuleCallingContext const* mcc) = 0;
193  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
194  ModuleCallingContext const* mcc) = 0;
195  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
196  ModuleCallingContext const* mcc) = 0;
197  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
198  ModuleCallingContext const* mcc) = 0;
199  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
200  ModuleCallingContext const* mcc) = 0;
201  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
202  ModuleCallingContext const* mcc) = 0;
203  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
204  ModuleCallingContext const* mcc) = 0;
205  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
206  ModuleCallingContext const* mcc) = 0;
207  virtual void implBeginJob() = 0;
208  virtual void implEndJob() = 0;
209  virtual void implBeginStream(StreamID) = 0;
210  virtual void implEndStream(StreamID) = 0;
211 
212  void resetModuleDescription(ModuleDescription const*);
213 
214  ActivityRegistry* activityRegistry() { return actReg_.get(); }
215 
216  private:
217 
218  template <typename T>
219  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
220  StreamID stream,
221  ParentContext const& parentContext,
222  typename T::Context const* context);
223 
224  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
225  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
226 
227  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
228 
229 
230  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
231 
232  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
233 
234  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
235  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
236 
237  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
238 
239  virtual SerialTaskQueueChain* serializeRunModule() = 0;
240 
241  static void exceptionContext(cms::Exception& ex,
242  ModuleCallingContext const* mcc);
243 
244  /*This base class is used to hide the differences between the ID used
245  for Event, LuminosityBlock and Run. Using the base class allows us
246  to only convert the ID to string form if it is actually needed in
247  the call to shouldRethrowException.
248  */
250  public:
251  virtual std::string value() const = 0;
253  };
254 
255  template< typename T>
257  public:
258  TransitionIDValue(T const& iP): p_(iP) {}
259  virtual std::string value() const override {
260  std::ostringstream iost;
261  iost<<p_.id();
262  return iost.str();
263  }
264  private:
265  T const& p_;
266 
267  };
268 
269  bool shouldRethrowException(std::exception_ptr iPtr,
270  ParentContext const& parentContext,
271  bool isEvent,
272  TransitionIDValueBase const& iID) const;
273 
274  template<bool IS_EVENT>
275  bool setPassed() {
276  if(IS_EVENT) {
277  timesPassed_.fetch_add(1,std::memory_order_relaxed);
278  }
279  state_ = Pass;
280  return true;
281  }
282 
283  template<bool IS_EVENT>
284  bool setFailed() {
285  if(IS_EVENT) {
286  timesFailed_.fetch_add(1,std::memory_order_relaxed);
287  }
288  state_ = Fail;
289  return false;
290  }
291 
292  template<bool IS_EVENT>
293  std::exception_ptr setException(std::exception_ptr iException) {
294  if (IS_EVENT) {
295  timesExcept_.fetch_add(1,std::memory_order_relaxed);
296  }
297  cached_exception_ = iException; // propagate_const<T> has no reset() function
298  state_ = Exception;
299  return cached_exception_;
300  }
301 
302  void prefetchAsync(WaitingTask*,
303  ParentContext const& parentContext,
304  Principal const& );
305 
307  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
308  }
309 
310  template<typename T>
311  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
312  typename T::MyPrincipal const& ep,
313  EventSetup const& es,
314  StreamID streamID,
315  ParentContext const& parentContext,
316  typename T::Context const* context);
317 
318  template< typename T>
319  class RunModuleTask : public WaitingTask {
320  public:
322  typename T::MyPrincipal const& ep,
323  EventSetup const& es,
324  StreamID streamID,
325  ParentContext const& parentContext,
326  typename T::Context const* context):
327  m_worker(worker),
328  m_principal(ep),
329  m_es(es),
330  m_streamID(streamID),
331  m_parentContext(parentContext),
332  m_context(context),
333  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
334 
335  tbb::task* execute() override {
336  //Need to make the services available early so other services can see them
337  ServiceRegistry::Operate guard(m_serviceToken);
338 
339  //incase the emit causes an exception, we need a memory location
340  // to hold the exception_ptr
341  std::exception_ptr temp_excptr;
342  auto excptr = exceptionPtr();
343  if(T::isEvent_) {
344  try {
345  //pre was called in prefetchAsync
346  m_worker->emitPostModuleEventPrefetchingSignal();
347  }catch(...) {
348  temp_excptr = std::current_exception();
349  if(not excptr) {
350  excptr = &temp_excptr;
351  }
352  }
353  }
354 
355  if( not excptr) {
356  if(auto queue = m_worker->serializeRunModule()) {
357  Worker* worker = m_worker;
358  auto const & principal = m_principal;
359  auto& es = m_es;
360  auto streamID = m_streamID;
361  auto parentContext = m_parentContext;
362  auto serviceToken = m_serviceToken;
363  auto sContext = m_context;
364  queue->push( [worker, &principal, &es, streamID,parentContext,sContext, serviceToken]()
365  {
366  //Need to make the services available
367  ServiceRegistry::Operate guard(serviceToken);
368 
369  std::exception_ptr* ptr = nullptr;
370  worker->runModuleAfterAsyncPrefetch<T>(ptr,
371  principal,
372  es,
373  streamID,
374  parentContext,
375  sContext);
376  });
377  return nullptr;
378  }
379  }
380 
381  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
382  m_principal,
383  m_es,
384  m_streamID,
385  m_parentContext,
386  m_context);
387  return nullptr;
388  }
389 
390  private:
392  typename T::MyPrincipal const& m_principal;
393  EventSetup const& m_es;
396  typename T::Context const* m_context;
398  };
399 
400  std::atomic<int> timesRun_;
401  std::atomic<int> timesVisited_;
402  std::atomic<int> timesPassed_;
403  std::atomic<int> timesFailed_;
404  std::atomic<int> timesExcept_;
405  std::atomic<State> state_;
407  std::atomic<int> numberOfPathsLeftToRun_;
408 
410 
411  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
412  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
413 
414  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
415 
416  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
417 
418  edm::WaitingTaskList waitingTasks_;
419  std::atomic<bool> workStarted_;
420  };
421 
422  namespace {
423  template <typename T>
424  class ModuleSignalSentry {
425  public:
426  ModuleSignalSentry(ActivityRegistry *a,
427  typename T::Context const* context,
428  ModuleCallingContext const* moduleCallingContext) :
429  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
430 
431  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
432  }
433 
434  ~ModuleSignalSentry() {
435  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
436  }
437 
438  private:
439  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
440  typename T::Context const* context_;
441  ModuleCallingContext const* moduleCallingContext_;
442  };
443 
444  }
445 
446  namespace workerhelper {
447  template<>
449  public:
451  static bool call(Worker* iWorker, StreamID,
452  EventPrincipal const& ep, EventSetup const& es,
453  ActivityRegistry* /* actReg */,
454  ModuleCallingContext const* mcc,
455  Arg::Context const* /* context*/) {
456  //Signal sentry is handled by the module
457  return iWorker->implDo(ep,es, mcc);
458  }
459  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
460  typename Arg::MyPrincipal const& ep,
461  ModuleCallingContext const* mcc) {
462  return iWorker->implDoPrePrefetchSelection(id,ep,mcc);
463  }
464  };
465 
466  template<>
468  public:
470  static bool call(Worker* iWorker,StreamID,
471  RunPrincipal const& ep, EventSetup const& es,
472  ActivityRegistry* actReg,
473  ModuleCallingContext const* mcc,
474  Arg::Context const* context) {
475  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
476  return iWorker->implDoBegin(ep,es, mcc);
477  }
478  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
479  typename Arg::MyPrincipal const& ep,
480  ModuleCallingContext const* mcc) {
481  return true;
482  }
483  };
484  template<>
486  public:
488  static bool call(Worker* iWorker,StreamID id,
489  RunPrincipal const & ep, EventSetup const& es,
490  ActivityRegistry* actReg,
491  ModuleCallingContext const* mcc,
492  Arg::Context const* context) {
493  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
494  return iWorker->implDoStreamBegin(id,ep,es, mcc);
495  }
496  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
497  typename Arg::MyPrincipal const& ep,
498  ModuleCallingContext const* mcc) {
499  return true;
500  }
501  };
502  template<>
504  public:
506  static bool call(Worker* iWorker,StreamID,
507  RunPrincipal const& ep, EventSetup const& es,
508  ActivityRegistry* actReg,
509  ModuleCallingContext const* mcc,
510  Arg::Context const* context) {
511  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
512  return iWorker->implDoEnd(ep,es, mcc);
513  }
514  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
515  typename Arg::MyPrincipal const& ep,
516  ModuleCallingContext const* mcc) {
517  return true;
518  }
519  };
520  template<>
522  public:
524  static bool call(Worker* iWorker,StreamID id,
525  RunPrincipal const& ep, EventSetup const& es,
526  ActivityRegistry* actReg,
527  ModuleCallingContext const* mcc,
528  Arg::Context const* context) {
529  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
530  return iWorker->implDoStreamEnd(id,ep,es, mcc);
531  }
532  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
533  typename Arg::MyPrincipal const& ep,
534  ModuleCallingContext const* mcc) {
535  return true;
536  }
537  };
538 
539  template<>
541  public:
543  static bool call(Worker* iWorker,StreamID,
544  LuminosityBlockPrincipal const& ep, EventSetup const& es,
545  ActivityRegistry* actReg,
546  ModuleCallingContext const* mcc,
547  Arg::Context const* context) {
548  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
549  return iWorker->implDoBegin(ep,es, mcc);
550  }
551 
552  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
553  typename Arg::MyPrincipal const& ep,
554  ModuleCallingContext const* mcc) {
555  return true;
556  }
557  };
558  template<>
560  public:
562  static bool call(Worker* iWorker,StreamID id,
563  LuminosityBlockPrincipal const& ep, EventSetup const& es,
564  ActivityRegistry* actReg,
565  ModuleCallingContext const* mcc,
566  Arg::Context const* context) {
567  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
568  return iWorker->implDoStreamBegin(id,ep,es, mcc);
569  }
570 
571  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
572  typename Arg::MyPrincipal const& ep,
573  ModuleCallingContext const* mcc) {
574  return true;
575  }
576 };
577 
578  template<>
580  public:
582  static bool call(Worker* iWorker,StreamID,
583  LuminosityBlockPrincipal const& ep, EventSetup const& es,
584  ActivityRegistry* actReg,
585  ModuleCallingContext const* mcc,
586  Arg::Context const* context) {
587  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
588  return iWorker->implDoEnd(ep,es, mcc);
589  }
590  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
591  typename Arg::MyPrincipal const& ep,
592  ModuleCallingContext const* mcc) {
593  return true;
594  }
595 
596  };
597  template<>
599  public:
601  static bool call(Worker* iWorker,StreamID id,
602  LuminosityBlockPrincipal const& ep, EventSetup const& es,
603  ActivityRegistry* actReg,
604  ModuleCallingContext const* mcc,
605  Arg::Context const* context) {
606  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
607  return iWorker->implDoStreamEnd(id,ep,es, mcc);
608  }
609 
610  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
611  typename Arg::MyPrincipal const& ep,
612  ModuleCallingContext const* mcc) {
613  return true;
614  }
615  };
616  }
617 
618 
619  template <typename T>
621  typename T::MyPrincipal const& ep,
622  EventSetup const& es,
623  StreamID streamID,
624  ParentContext const& parentContext,
625  typename T::Context const* context) {
626  waitingTasks_.add(task);
627  if(T::isEvent_) {
628  timesVisited_.fetch_add(1,std::memory_order_relaxed);
629  }
630 
631  bool expected = false;
632  if(workStarted_.compare_exchange_strong(expected,true)) {
633  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
634 
635  //if have TriggerResults based selection we want to reject the event before doing prefetching
636  try {
637  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
638  setPassed<T::isEvent_>();
639  waitingTasks_.doneWaiting(nullptr);
640  return;
641  }
642  }catch(...) {}
643 
644  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
645  this, ep,es,streamID,parentContext,context);
646  prefetchAsync(runTask, parentContext, ep);
647  }
648  }
649 
650  template<typename T>
651  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
652  typename T::MyPrincipal const& ep,
653  EventSetup const& es,
654  StreamID streamID,
655  ParentContext const& parentContext,
656  typename T::Context const* context) {
657  std::exception_ptr exceptionPtr;
658  if(iEPtr) {
659  assert(*iEPtr);
661  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
662  exceptionPtr = *iEPtr;
663  setException<T::isEvent_>(exceptionPtr);
664  } else {
665  setPassed<T::isEvent_>();
666  }
667  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
668  } else {
669  try {
670  runModule<T>(ep,es,streamID,parentContext,context);
671  } catch(...) {
672  exceptionPtr = std::current_exception();
673  }
674  }
675  waitingTasks_.doneWaiting(exceptionPtr);
676  return exceptionPtr;
677  }
678 
679  template <typename T>
681  typename T::MyPrincipal const& principal,
682  EventSetup const& es,
683  StreamID streamID,
684  ParentContext const& parentContext,
685  typename T::Context const* context) {
686  waitingTasks_.add(task);
687  bool expected = false;
688  if(workStarted_.compare_exchange_strong(expected,true)) {
689  auto serviceToken = ServiceRegistry::instance().presentToken();
690 
691  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
692  {
693  std::exception_ptr exceptionPtr;
694  try {
695  //Need to make the services available
696  ServiceRegistry::Operate guard(serviceToken);
697 
698  this->runModule<T>(principal,
699  es,
700  streamID,
701  parentContext,
702  context);
703  } catch( ... ) {
704  exceptionPtr = std::current_exception();
705  }
706  this->waitingTasks_.doneWaiting(exceptionPtr);
707  };
708  if(auto queue = this->serializeRunModule()) {
709  queue->push( toDo);
710  } else {
711  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
712  tbb::task::spawn(*task);
713  }
714  }
715  }
716 
717  template <typename T>
718  bool Worker::doWork(typename T::MyPrincipal const& ep,
719  EventSetup const& es,
720  StreamID streamID,
721  ParentContext const& parentContext,
722  typename T::Context const* context) {
723 
724  if (T::isEvent_) {
725  timesVisited_.fetch_add(1,std::memory_order_relaxed);
726  }
727  bool rc = false;
728 
729  switch(state_) {
730  case Ready: break;
731  case Pass: return true;
732  case Fail: return false;
733  case Exception: {
734  std::rethrow_exception(cached_exception_);
735  }
736  }
737 
738  bool expected = false;
739  if(not workStarted_.compare_exchange_strong(expected, true) ) {
740  //another thread beat us here
741  auto waitTask = edm::make_empty_waiting_task();
742  waitTask->increment_ref_count();
743 
744  waitingTasks_.add(waitTask.get());
745 
746  waitTask->wait_for_all();
747 
748  switch(state_) {
749  case Ready: assert(false);
750  case Pass: return true;
751  case Fail: return false;
752  case Exception: {
753  std::rethrow_exception(cached_exception_);
754  }
755  }
756  }
757 
758  //Need the context to be set until after any exception is resolved
759  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
760 
761  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
762  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
763 
764  if (T::isEvent_) {
765  try {
766  //if have TriggerResults based selection we want to reject the event before doing prefetching
767  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
768  timesRun_.fetch_add(1,std::memory_order_relaxed);
769  rc = setPassed<T::isEvent_>();
770  waitingTasks_.doneWaiting(nullptr);
771  return true;
772  }
773  }catch(...) {}
774  auto waitTask = edm::make_empty_waiting_task();
775  {
776  //Make sure signal is sent once the prefetching is done
777  // [the 'pre' signal was sent in prefetchAsync]
778  //The purpose of this block is to send the signal after wait_for_all
779  auto sentryFunc = [this](void*) {
780  emitPostModuleEventPrefetchingSignal();
781  };
782  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
783 
784  //set count to 2 since wait_for_all requires value to not go to 0
785  waitTask->set_ref_count(2);
786 
787  prefetchAsync(waitTask.get(),parentContext, ep);
788  waitTask->decrement_ref_count();
789  waitTask->wait_for_all();
790  }
791  if(waitTask->exceptionPtr() != nullptr) {
793  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
794  setException<T::isEvent_>(*waitTask->exceptionPtr());
795  waitingTasks_.doneWaiting(cached_exception_);
796  std::rethrow_exception(cached_exception_);
797  } else {
798  setPassed<T::isEvent_>();
799  waitingTasks_.doneWaiting(nullptr);
800  return true;
801  }
802  }
803  }
804 
805  //successful prefetch so no reset necessary
806  prefetchSentry.release();
807  if(auto queue = serializeRunModule()) {
808  auto serviceToken = ServiceRegistry::instance().presentToken();
809  queue->pushAndWait([&]() {
810  //Need to make the services available
811  ServiceRegistry::Operate guard(serviceToken);
812  try {
813  rc = runModule<T>(ep,es,streamID,parentContext,context);
814  } catch(...) {
815  }
816  });
817  } else {
818  try {
819  rc = runModule<T>(ep,es,streamID,parentContext,context);
820  } catch(...) {
821  }
822  }
823  if(state_ == Exception) {
824  waitingTasks_.doneWaiting(cached_exception_);
825  std::rethrow_exception(cached_exception_);
826  }
827 
828  waitingTasks_.doneWaiting(nullptr);
829  return rc;
830  }
831 
832 
833  template <typename T>
834  bool Worker::runModule(typename T::MyPrincipal const& ep,
835  EventSetup const& es,
836  StreamID streamID,
837  ParentContext const& parentContext,
838  typename T::Context const* context) {
839  //unscheduled producers should advance this
840  //if (T::isEvent_) {
841  // ++timesVisited_;
842  //}
843  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
844  if (T::isEvent_) {
845  timesRun_.fetch_add(1,std::memory_order_relaxed);
846  }
847 
848  bool rc = true;
849  try {
851  {
852  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
853 
854  if (rc) {
855  setPassed<T::isEvent_>();
856  } else {
857  setFailed<T::isEvent_>();
858  }
859  });
860  } catch(cms::Exception& ex) {
861  exceptionContext(ex, &moduleCallingContext_);
863  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
864  assert(not cached_exception_);
865  setException<T::isEvent_>(std::current_exception());
866  std::rethrow_exception(cached_exception_);
867  } else {
868  rc = setPassed<T::isEvent_>();
869  }
870  }
871 
872  return rc;
873  }
874 
875  template <typename T>
876  std::exception_ptr Worker::runModuleDirectly(typename T::MyPrincipal const& ep,
877  EventSetup const& es,
878  StreamID streamID,
879  ParentContext const& parentContext,
880  typename T::Context const* context) {
881 
882  timesVisited_.fetch_add(1,std::memory_order_relaxed);
883  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
884  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
885  }
886 }
887 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:582
tbb::task * execute() override
Definition: Worker.h:335
std::atomic< int > timesVisited_
Definition: Worker.h:401
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:321
ModuleDescription const & description() const
Definition: Worker.h:138
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:396
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:834
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:115
T::MyPrincipal const & m_principal
Definition: Worker.h:392
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:407
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:293
virtual std::string value() const override
Definition: Worker.h:259
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:514
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:524
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:561
Definition: hltDiff.cc:290
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:395
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:404
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:505
std::atomic< int > timesFailed_
Definition: Worker.h:403
int timesPassed() const
Definition: Worker.h:176
void addedToPath()
Definition: Worker.h:170
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:718
State state() const
Definition: Worker.h:179
void clearCounters()
Definition: Worker.h:162
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:543
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:470
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:451
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:590
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:552
ActivityRegistry * activityRegistry()
Definition: Worker.h:214
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:601
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:571
ExceptionToActionTable const * actions_
Definition: Worker.h:411
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:876
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:409
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
bool setFailed()
Definition: Worker.h:284
int timesExcept() const
Definition: Worker.h:178
def principal(options)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:128
int timesVisited() const
Definition: Worker.h:175
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:469
ModuleDescription const * descPtr() const
Definition: Worker.h:139
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:610
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:126
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:581
std::atomic< State > state_
Definition: Worker.h:405
int timesRun() const
Definition: Worker.h:174
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:488
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:506
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:406
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:600
bool setPassed()
Definition: Worker.h:275
TransitionIDValue(T const &iP)
Definition: Worker.h:258
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:450
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:680
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:487
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:651
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:123
int timesPass() const
Definition: Worker.h:181
const T & get() const
Definition: EventSetup.h:55
Definition: hltDiff.cc:291
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:562
int timesFailed() const
Definition: Worker.h:177
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:124
HLT enums.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:459
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:306
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:402
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:478
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:620
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:542
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:532
std::atomic< int > timesRun_
Definition: Worker.h:400
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:523
ServiceToken m_serviceToken
Definition: Worker.h:397
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:496
EventSetup const & m_es
Definition: Worker.h:393