CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
50 
52 
53 #include <map>
54 #include <memory>
55 #include <sstream>
56 #include <string>
57 #include <vector>
58 #include <exception>
59 #include <unordered_map>
60 
61 namespace edm {
62  class EventPrincipal;
63  class EarlyDeleteHelper;
64  class ProductResolverIndexHelper;
65  class ProductResolverIndexAndSkipBit;
66  class StreamID;
67  class StreamContext;
68  class ProductRegistry;
69  class ThinnedAssociationsHelper;
70  class WaitingTask;
71 
72  namespace workerhelper {
73  template< typename O> class CallImpl;
74  }
75 
76  class Worker {
77  public:
78  enum State { Ready, Pass, Fail, Exception };
79  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
80 
81  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
82  virtual ~Worker();
83 
84  Worker(Worker const&) = delete; // Disallow copying and moving
85  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
86 
87  template <typename T>
88  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
89  StreamID stream,
90  ParentContext const& parentContext,
91  typename T::Context const* context);
92  template <typename T>
93  void doWorkAsync(WaitingTask* task,
94  typename T::MyPrincipal const&, EventSetup const& c,
95  StreamID stream,
96  ParentContext const& parentContext,
97  typename T::Context const* context);
98 
99  template <typename T>
100  void doWorkNoPrefetchingAsync(WaitingTask* task,
101  typename T::MyPrincipal const&,
102  EventSetup const& c,
103  StreamID stream,
104  ParentContext const& parentContext,
105  typename T::Context const* context);
106 
108  waitingTasks_.add(task);
109  }
110  void skipOnPath();
111  void beginJob() ;
112  void endJob();
113  void beginStream(StreamID id, StreamContext& streamContext);
114  void endStream(StreamID id, StreamContext& streamContext);
115  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
116  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
117 
118  void preForkReleaseResources() {implPreForkReleaseResources();}
119  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
120  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
121 
122  void reset() {
123  cached_exception_ = std::exception_ptr();
124  state_ = Ready;
125  waitingTasks_.reset();
126  workStarted_ = false;
127  numberOfPathsLeftToRun_ = numberOfPathsOn_;
128  }
129 
130  void postDoEvent(EventPrincipal const&);
131 
132  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
133  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
136  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
137 
138  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
139 
140  //Used to make EDGetToken work
141  virtual void updateLookup(BranchType iBranchType,
142  ProductResolverIndexHelper const&) = 0;
143  virtual void resolvePutIndicies(BranchType iBranchType,
144  std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies) = 0;
145 
146  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
147  ProductRegistry const& preg,
148  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
149 
150  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
151 
152  virtual Types moduleType() const =0;
153 
154  void clearCounters() {
155  timesRun_.store(0,std::memory_order_release);
156  timesVisited_.store(0,std::memory_order_release);
157  timesPassed_.store(0,std::memory_order_release);
158  timesFailed_.store(0,std::memory_order_release);
159  timesExcept_.store(0,std::memory_order_release);
160  }
161 
162  void addedToPath() {
163  ++numberOfPathsOn_;
164  }
165  //NOTE: calling state() is done to force synchronization across threads
166  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
167  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
168  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
169  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
170  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
171  State state() const { return state_; }
172 
173  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
174 
175  protected:
176  template<typename O> friend class workerhelper::CallImpl;
177  virtual std::string workerType() const = 0;
178  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
179  ModuleCallingContext const* mcc) = 0;
180  virtual bool implDoPrePrefetchSelection(StreamID id,
181  EventPrincipal const& ep,
182  ModuleCallingContext const* mcc) = 0;
183  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
184  ModuleCallingContext const* mcc) = 0;
185  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
186  ModuleCallingContext const* mcc) = 0;
187  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
188  ModuleCallingContext const* mcc) = 0;
189  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
190  ModuleCallingContext const* mcc) = 0;
191  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
192  ModuleCallingContext const* mcc) = 0;
193  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
194  ModuleCallingContext const* mcc) = 0;
195  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
196  ModuleCallingContext const* mcc) = 0;
197  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
198  ModuleCallingContext const* mcc) = 0;
199  virtual void implBeginJob() = 0;
200  virtual void implEndJob() = 0;
201  virtual void implBeginStream(StreamID) = 0;
202  virtual void implEndStream(StreamID) = 0;
203 
204  void resetModuleDescription(ModuleDescription const*);
205 
206  ActivityRegistry* activityRegistry() { return actReg_.get(); }
207 
208  private:
209 
210  template <typename T>
211  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
212  StreamID stream,
213  ParentContext const& parentContext,
214  typename T::Context const* context);
215 
216  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
217  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
218 
219  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const = 0;
220 
221  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
222 
223  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
224 
225  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
226  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
227 
228  virtual void implPreForkReleaseResources() = 0;
229  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
230  unsigned int iNumberOfChildren) = 0;
231  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
232 
233  virtual SerialTaskQueueChain* serializeRunModule() = 0;
234 
235  static void exceptionContext(cms::Exception& ex,
236  ModuleCallingContext const* mcc);
237 
238  /*This base class is used to hide the differences between the ID used
239  for Event, LuminosityBlock and Run. Using the base class allows us
240  to only convert the ID to string form if it is actually needed in
241  the call to shouldRethrowException.
242  */
244  public:
245  virtual std::string value() const = 0;
247  };
248 
249  template< typename T>
251  public:
252  TransitionIDValue(T const& iP): p_(iP) {}
253  virtual std::string value() const override {
254  std::ostringstream iost;
255  iost<<p_.id();
256  return iost.str();
257  }
258  private:
259  T const& p_;
260 
261  };
262 
263  bool shouldRethrowException(std::exception_ptr iPtr,
264  ParentContext const& parentContext,
265  bool isEvent,
266  TransitionIDValueBase const& iID) const;
267 
268  template<bool IS_EVENT>
269  bool setPassed() {
270  if(IS_EVENT) {
271  timesPassed_.fetch_add(1,std::memory_order_relaxed);
272  }
273  state_ = Pass;
274  return true;
275  }
276 
277  template<bool IS_EVENT>
278  bool setFailed() {
279  if(IS_EVENT) {
280  timesFailed_.fetch_add(1,std::memory_order_relaxed);
281  }
282  state_ = Fail;
283  return false;
284  }
285 
286  template<bool IS_EVENT>
287  std::exception_ptr setException(std::exception_ptr iException) {
288  if (IS_EVENT) {
289  timesExcept_.fetch_add(1,std::memory_order_relaxed);
290  }
291  cached_exception_ = iException; // propagate_const<T> has no reset() function
292  state_ = Exception;
293  return cached_exception_;
294  }
295 
296  void prefetchAsync(WaitingTask*,
297  ParentContext const& parentContext,
298  Principal const& );
299 
301  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
302  }
303 
304  template<typename T>
305  void runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
306  typename T::MyPrincipal const& ep,
307  EventSetup const& es,
308  StreamID streamID,
309  ParentContext const& parentContext,
310  typename T::Context const* context);
311 
312  template< typename T>
313  class RunModuleTask : public WaitingTask {
314  public:
316  typename T::MyPrincipal const& ep,
317  EventSetup const& es,
318  StreamID streamID,
319  ParentContext const& parentContext,
320  typename T::Context const* context):
321  m_worker(worker),
322  m_principal(ep),
323  m_es(es),
324  m_streamID(streamID),
325  m_parentContext(parentContext),
326  m_context(context),
327  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
328 
329  tbb::task* execute() override {
330  //Need to make the services available early so other services can see them
331  ServiceRegistry::Operate guard(m_serviceToken);
332 
333  //incase the emit causes an exception, we need a memory location
334  // to hold the exception_ptr
335  std::exception_ptr temp_excptr;
336  auto excptr = exceptionPtr();
337  try {
338  //pre was called in prefetchAsync
339  m_worker->emitPostModuleEventPrefetchingSignal();
340  }catch(...) {
341  temp_excptr = std::current_exception();
342  if(not excptr) {
343  excptr = &temp_excptr;
344  }
345  }
346 
347  if( not excptr) {
348  if(auto queue = m_worker->serializeRunModule()) {
349  Worker* worker = m_worker;
350  auto const & principal = m_principal;
351  auto& es = m_es;
352  auto streamID = m_streamID;
353  auto parentContext = m_parentContext;
354  auto serviceToken = m_serviceToken;
355  auto sContext = m_context;
356  queue->push( [worker, &principal, &es, streamID,parentContext,sContext, serviceToken]()
357  {
358  //Need to make the services available
359  ServiceRegistry::Operate guard(serviceToken);
360 
361  std::exception_ptr* ptr = nullptr;
362  worker->runModuleAfterAsyncPrefetch<T>(ptr,
363  principal,
364  es,
365  streamID,
366  parentContext,
367  sContext);
368  });
369  return nullptr;
370  }
371  }
372 
373  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
374  m_principal,
375  m_es,
376  m_streamID,
377  m_parentContext,
378  m_context);
379  return nullptr;
380  }
381 
382  private:
384  typename T::MyPrincipal const& m_principal;
385  EventSetup const& m_es;
388  typename T::Context const* m_context;
390  };
391 
392  std::atomic<int> timesRun_;
393  std::atomic<int> timesVisited_;
394  std::atomic<int> timesPassed_;
395  std::atomic<int> timesFailed_;
396  std::atomic<int> timesExcept_;
397  std::atomic<State> state_;
399  std::atomic<int> numberOfPathsLeftToRun_;
400 
402 
403  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
404  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
405 
406  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
407 
408  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
409 
410  edm::WaitingTaskList waitingTasks_;
411  std::atomic<bool> workStarted_;
412  };
413 
414  namespace {
415  template <typename T>
416  class ModuleSignalSentry {
417  public:
418  ModuleSignalSentry(ActivityRegistry *a,
419  typename T::Context const* context,
420  ModuleCallingContext const* moduleCallingContext) :
421  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
422 
423  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
424  }
425 
426  ~ModuleSignalSentry() {
427  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
428  }
429 
430  private:
431  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
432  typename T::Context const* context_;
433  ModuleCallingContext const* moduleCallingContext_;
434  };
435 
436  }
437 
438  namespace workerhelper {
439  template<>
441  public:
443  static bool call(Worker* iWorker, StreamID,
444  EventPrincipal const& ep, EventSetup const& es,
445  ActivityRegistry* /* actReg */,
446  ModuleCallingContext const* mcc,
447  Arg::Context const* /* context*/) {
448  //Signal sentry is handled by the module
449  return iWorker->implDo(ep,es, mcc);
450  }
451  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
452  typename Arg::MyPrincipal const& ep,
453  ModuleCallingContext const* mcc) {
454  return iWorker->implDoPrePrefetchSelection(id,ep,mcc);
455  }
456  };
457 
458  template<>
460  public:
462  static bool call(Worker* iWorker,StreamID,
463  RunPrincipal const& ep, EventSetup const& es,
464  ActivityRegistry* actReg,
465  ModuleCallingContext const* mcc,
466  Arg::Context const* context) {
467  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
468  return iWorker->implDoBegin(ep,es, mcc);
469  }
470  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
471  typename Arg::MyPrincipal const& ep,
472  ModuleCallingContext const* mcc) {
473  return true;
474  }
475  };
476  template<>
478  public:
480  static bool call(Worker* iWorker,StreamID id,
481  RunPrincipal const & ep, EventSetup const& es,
482  ActivityRegistry* actReg,
483  ModuleCallingContext const* mcc,
484  Arg::Context const* context) {
485  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
486  return iWorker->implDoStreamBegin(id,ep,es, mcc);
487  }
488  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
489  typename Arg::MyPrincipal const& ep,
490  ModuleCallingContext const* mcc) {
491  return true;
492  }
493  };
494  template<>
496  public:
498  static bool call(Worker* iWorker,StreamID,
499  RunPrincipal const& ep, EventSetup const& es,
500  ActivityRegistry* actReg,
501  ModuleCallingContext const* mcc,
502  Arg::Context const* context) {
503  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
504  return iWorker->implDoEnd(ep,es, mcc);
505  }
506  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
507  typename Arg::MyPrincipal const& ep,
508  ModuleCallingContext const* mcc) {
509  return true;
510  }
511  };
512  template<>
514  public:
516  static bool call(Worker* iWorker,StreamID id,
517  RunPrincipal const& ep, EventSetup const& es,
518  ActivityRegistry* actReg,
519  ModuleCallingContext const* mcc,
520  Arg::Context const* context) {
521  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
522  return iWorker->implDoStreamEnd(id,ep,es, mcc);
523  }
524  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
525  typename Arg::MyPrincipal const& ep,
526  ModuleCallingContext const* mcc) {
527  return true;
528  }
529  };
530 
531  template<>
533  public:
535  static bool call(Worker* iWorker,StreamID,
536  LuminosityBlockPrincipal const& ep, EventSetup const& es,
537  ActivityRegistry* actReg,
538  ModuleCallingContext const* mcc,
539  Arg::Context const* context) {
540  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
541  return iWorker->implDoBegin(ep,es, mcc);
542  }
543 
544  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
545  typename Arg::MyPrincipal const& ep,
546  ModuleCallingContext const* mcc) {
547  return true;
548  }
549  };
550  template<>
552  public:
554  static bool call(Worker* iWorker,StreamID id,
555  LuminosityBlockPrincipal const& ep, EventSetup const& es,
556  ActivityRegistry* actReg,
557  ModuleCallingContext const* mcc,
558  Arg::Context const* context) {
559  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
560  return iWorker->implDoStreamBegin(id,ep,es, mcc);
561  }
562 
563  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
564  typename Arg::MyPrincipal const& ep,
565  ModuleCallingContext const* mcc) {
566  return true;
567  }
568 };
569 
570  template<>
572  public:
574  static bool call(Worker* iWorker,StreamID,
575  LuminosityBlockPrincipal const& ep, EventSetup const& es,
576  ActivityRegistry* actReg,
577  ModuleCallingContext const* mcc,
578  Arg::Context const* context) {
579  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
580  return iWorker->implDoEnd(ep,es, mcc);
581  }
582  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
583  typename Arg::MyPrincipal const& ep,
584  ModuleCallingContext const* mcc) {
585  return true;
586  }
587 
588  };
589  template<>
591  public:
593  static bool call(Worker* iWorker,StreamID id,
594  LuminosityBlockPrincipal const& ep, EventSetup const& es,
595  ActivityRegistry* actReg,
596  ModuleCallingContext const* mcc,
597  Arg::Context const* context) {
598  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
599  return iWorker->implDoStreamEnd(id,ep,es, mcc);
600  }
601 
602  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
603  typename Arg::MyPrincipal const& ep,
604  ModuleCallingContext const* mcc) {
605  return true;
606  }
607  };
608  }
609 
610 
611  template <typename T>
613  typename T::MyPrincipal const& ep,
614  EventSetup const& es,
615  StreamID streamID,
616  ParentContext const& parentContext,
617  typename T::Context const* context) {
618  waitingTasks_.add(task);
619  if(T::isEvent_) {
620  timesVisited_.fetch_add(1,std::memory_order_relaxed);
621  }
622 
623  bool expected = false;
624  if(workStarted_.compare_exchange_strong(expected,true)) {
625  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
626 
627  //if have TriggerResults based selection we want to reject the event before doing prefetching
628  try {
629  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
630  setPassed<T::isEvent_>();
631  waitingTasks_.doneWaiting(nullptr);
632  return;
633  }
634  }catch(...) {}
635 
636  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
637  this, ep,es,streamID,parentContext,context);
638  prefetchAsync(runTask, parentContext, ep);
639  }
640  }
641 
642  template<typename T>
643  void Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
644  typename T::MyPrincipal const& ep,
645  EventSetup const& es,
646  StreamID streamID,
647  ParentContext const& parentContext,
648  typename T::Context const* context) {
649  std::exception_ptr exceptionPtr;
650  if(iEPtr) {
651  assert(*iEPtr);
653  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
654  exceptionPtr = *iEPtr;
655  setException<T::isEvent_>(exceptionPtr);
656  } else {
657  setPassed<T::isEvent_>();
658  }
659  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
660  } else {
661  try {
662  runModule<T>(ep,es,streamID,parentContext,context);
663  } catch(...) {
664  exceptionPtr = std::current_exception();
665  }
666  }
667  waitingTasks_.doneWaiting(exceptionPtr);
668  }
669 
670  template <typename T>
672  typename T::MyPrincipal const& principal,
673  EventSetup const& es,
674  StreamID streamID,
675  ParentContext const& parentContext,
676  typename T::Context const* context) {
677  waitingTasks_.add(task);
678  bool expected = false;
679  if(workStarted_.compare_exchange_strong(expected,true)) {
680  auto serviceToken = ServiceRegistry::instance().presentToken();
681 
682  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
683  {
684  std::exception_ptr exceptionPtr;
685  try {
686  //Need to make the services available
687  ServiceRegistry::Operate guard(serviceToken);
688 
689  this->runModule<T>(principal,
690  es,
691  streamID,
692  parentContext,
693  context);
694  } catch( ... ) {
695  exceptionPtr = std::current_exception();
696  }
697  this->waitingTasks_.doneWaiting(exceptionPtr);
698  };
699  if(auto queue = this->serializeRunModule()) {
700  queue->push( toDo);
701  } else {
702  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
703  tbb::task::spawn(*task);
704  }
705  }
706  }
707 
708  template <typename T>
709  bool Worker::doWork(typename T::MyPrincipal const& ep,
710  EventSetup const& es,
711  StreamID streamID,
712  ParentContext const& parentContext,
713  typename T::Context const* context) {
714 
715  if (T::isEvent_) {
716  timesVisited_.fetch_add(1,std::memory_order_relaxed);
717  }
718  bool rc = false;
719 
720  switch(state_) {
721  case Ready: break;
722  case Pass: return true;
723  case Fail: return false;
724  case Exception: {
725  std::rethrow_exception(cached_exception_);
726  }
727  }
728 
729  bool expected = false;
730  if(not workStarted_.compare_exchange_strong(expected, true) ) {
731  //another thread beat us here
732  auto waitTask = edm::make_empty_waiting_task();
733  waitTask->increment_ref_count();
734 
735  waitingTasks_.add(waitTask.get());
736 
737  waitTask->wait_for_all();
738 
739  switch(state_) {
740  case Ready: assert(false);
741  case Pass: return true;
742  case Fail: return false;
743  case Exception: {
744  std::rethrow_exception(cached_exception_);
745  }
746  }
747  }
748 
749  //Need the context to be set until after any exception is resolved
750  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
751 
752  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
753  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
754 
755  if (T::isEvent_) {
756  try {
757  //if have TriggerResults based selection we want to reject the event before doing prefetching
758  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
759  timesRun_.fetch_add(1,std::memory_order_relaxed);
760  rc = setPassed<T::isEvent_>();
761  waitingTasks_.doneWaiting(nullptr);
762  return true;
763  }
764  }catch(...) {}
765  auto waitTask = edm::make_empty_waiting_task();
766  {
767  //Make sure signal is sent once the prefetching is done
768  // [the 'pre' signal was sent in prefetchAsync]
769  //The purpose of this block is to send the signal after wait_for_all
770  auto sentryFunc = [this](void*) {
771  emitPostModuleEventPrefetchingSignal();
772  };
773  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
774 
775  //set count to 2 since wait_for_all requires value to not go to 0
776  waitTask->set_ref_count(2);
777 
778  prefetchAsync(waitTask.get(),parentContext, ep);
779  waitTask->decrement_ref_count();
780  waitTask->wait_for_all();
781  }
782  if(waitTask->exceptionPtr() != nullptr) {
784  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
785  setException<T::isEvent_>(*waitTask->exceptionPtr());
786  waitingTasks_.doneWaiting(cached_exception_);
787  std::rethrow_exception(cached_exception_);
788  } else {
789  setPassed<T::isEvent_>();
790  waitingTasks_.doneWaiting(nullptr);
791  return true;
792  }
793  }
794  }
795 
796  //successful prefetch so no reset necessary
797  prefetchSentry.release();
798  if(auto queue = serializeRunModule()) {
799  auto serviceToken = ServiceRegistry::instance().presentToken();
800  queue->pushAndWait([&]() {
801  //Need to make the services available
802  ServiceRegistry::Operate guard(serviceToken);
803  try {
804  rc = runModule<T>(ep,es,streamID,parentContext,context);
805  } catch(...) {
806  }
807  });
808  } else {
809  try {
810  rc = runModule<T>(ep,es,streamID,parentContext,context);
811  } catch(...) {
812  }
813  }
814  if(state_ == Exception) {
815  waitingTasks_.doneWaiting(cached_exception_);
816  std::rethrow_exception(cached_exception_);
817  }
818 
819  waitingTasks_.doneWaiting(nullptr);
820  return rc;
821  }
822 
823 
824  template <typename T>
825  bool Worker::runModule(typename T::MyPrincipal const& ep,
826  EventSetup const& es,
827  StreamID streamID,
828  ParentContext const& parentContext,
829  typename T::Context const* context) {
830  //unscheduled producers should advance this
831  //if (T::isEvent_) {
832  // ++timesVisited_;
833  //}
834  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
835  if (T::isEvent_) {
836  timesRun_.fetch_add(1,std::memory_order_relaxed);
837  }
838 
839  bool rc = true;
840  try {
842  {
843  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
844 
845  if (rc) {
846  setPassed<T::isEvent_>();
847  } else {
848  setFailed<T::isEvent_>();
849  }
850  });
851  } catch(cms::Exception& ex) {
852  exceptionContext(ex, &moduleCallingContext_);
854  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
855  assert(not cached_exception_);
856  setException<T::isEvent_>(std::current_exception());
857  std::rethrow_exception(cached_exception_);
858  } else {
859  rc = setPassed<T::isEvent_>();
860  }
861  }
862 
863  return rc;
864  }
865 }
866 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:574
tbb::task * execute() override
Definition: Worker.h:329
std::atomic< int > timesVisited_
Definition: Worker.h:393
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:315
ModuleDescription const & description() const
Definition: Worker.h:132
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:388
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:825
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:107
T::MyPrincipal const & m_principal
Definition: Worker.h:384
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:399
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:287
virtual std::string value() const override
Definition: Worker.h:253
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:506
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:516
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:553
Definition: hltDiff.cc:289
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:387
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:396
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:497
std::atomic< int > timesFailed_
Definition: Worker.h:395
int timesPassed() const
Definition: Worker.h:168
void addedToPath()
Definition: Worker.h:162
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:709
State state() const
Definition: Worker.h:171
void clearCounters()
Definition: Worker.h:154
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:535
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:462
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:443
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:582
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:544
ActivityRegistry * activityRegistry()
Definition: Worker.h:206
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:593
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:563
ExceptionToActionTable const * actions_
Definition: Worker.h:403
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
bool setFailed()
Definition: Worker.h:278
int timesExcept() const
Definition: Worker.h:170
def principal(options)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:122
int timesVisited() const
Definition: Worker.h:167
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:461
ModuleDescription const * descPtr() const
Definition: Worker.h:133
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:602
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:120
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:573
std::atomic< State > state_
Definition: Worker.h:397
int timesRun() const
Definition: Worker.h:166
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:480
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:498
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:398
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:592
bool setPassed()
Definition: Worker.h:269
TransitionIDValue(T const &iP)
Definition: Worker.h:252
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:119
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:442
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:671
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:479
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:115
int timesPass() const
Definition: Worker.h:173
const T & get() const
Definition: EventSetup.h:56
Definition: hltDiff.cc:290
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:554
int timesFailed() const
Definition: Worker.h:169
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:116
void preForkReleaseResources()
Definition: Worker.h:118
HLT enums.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:451
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:300
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:394
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:470
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:612
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:534
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:524
std::atomic< int > timesRun_
Definition: Worker.h:392
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:515
ServiceToken m_serviceToken
Definition: Worker.h:389
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:488
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:643
EventSetup const & m_es
Definition: Worker.h:385