CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
50 
52 
53 #include <map>
54 #include <memory>
55 #include <sstream>
56 #include <string>
57 #include <vector>
58 #include <exception>
59 #include <unordered_map>
60 
61 namespace edm {
62  class EventPrincipal;
63  class EarlyDeleteHelper;
64  class ProductResolverIndexHelper;
65  class ProductResolverIndexAndSkipBit;
66  class StreamID;
67  class StreamContext;
68  class ProductRegistry;
69  class ThinnedAssociationsHelper;
70  class WaitingTask;
71 
72  namespace workerhelper {
73  template< typename O> class CallImpl;
74  }
75 
76  class Worker {
77  public:
78  enum State { Ready, Pass, Fail, Exception };
79  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
80 
81  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
82  virtual ~Worker();
83 
84  Worker(Worker const&) = delete; // Disallow copying and moving
85  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
86 
87  template <typename T>
88  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
89  StreamID stream,
90  ParentContext const& parentContext,
91  typename T::Context const* context);
92  template <typename T>
93  void doWorkAsync(WaitingTask* task,
94  typename T::MyPrincipal const&, EventSetup const& c,
95  StreamID stream,
96  ParentContext const& parentContext,
97  typename T::Context const* context);
98 
99  template <typename T>
100  void doWorkNoPrefetchingAsync(WaitingTask* task,
101  typename T::MyPrincipal const&,
102  EventSetup const& c,
103  StreamID stream,
104  ParentContext const& parentContext,
105  typename T::Context const* context);
106 
108  waitingTasks_.add(task);
109  }
110  void skipOnPath();
111  void beginJob() ;
112  void endJob();
113  void beginStream(StreamID id, StreamContext& streamContext);
114  void endStream(StreamID id, StreamContext& streamContext);
115  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
116  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
117 
118  void preForkReleaseResources() {implPreForkReleaseResources();}
119  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
120  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
121 
122  void reset() {
123  cached_exception_ = std::exception_ptr();
124  state_ = Ready;
125  waitingTasks_.reset();
126  workStarted_ = false;
127  numberOfPathsLeftToRun_ = numberOfPathsOn_;
128  }
129 
130  void postDoEvent(EventPrincipal const&);
131 
132  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
133  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
136  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
137 
138  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
139 
140  //Used to make EDGetToken work
141  virtual void updateLookup(BranchType iBranchType,
142  ProductResolverIndexHelper const&) = 0;
143  virtual void resolvePutIndicies(BranchType iBranchType,
144  std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies) = 0;
145 
146  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
147  ProductRegistry const& preg,
148  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
149 
150  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
151 
152  virtual Types moduleType() const =0;
153 
154  void clearCounters() {
155  timesRun_.store(0,std::memory_order_release);
156  timesVisited_.store(0,std::memory_order_release);
157  timesPassed_.store(0,std::memory_order_release);
158  timesFailed_.store(0,std::memory_order_release);
159  timesExcept_.store(0,std::memory_order_release);
160  }
161 
162  void addedToPath() {
163  ++numberOfPathsOn_;
164  }
165  //NOTE: calling state() is done to force synchronization across threads
166  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
167  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
168  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
169  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
170  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
171  State state() const { return state_; }
172 
173  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
174 
175  protected:
176  template<typename O> friend class workerhelper::CallImpl;
177  virtual std::string workerType() const = 0;
178  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
179  ModuleCallingContext const* mcc) = 0;
180  virtual bool implDoPrePrefetchSelection(StreamID id,
181  EventPrincipal const& ep,
182  ModuleCallingContext const* mcc) = 0;
183  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
184  ModuleCallingContext const* mcc) = 0;
185  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
186  ModuleCallingContext const* mcc) = 0;
187  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
188  ModuleCallingContext const* mcc) = 0;
189  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
190  ModuleCallingContext const* mcc) = 0;
191  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
192  ModuleCallingContext const* mcc) = 0;
193  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
194  ModuleCallingContext const* mcc) = 0;
195  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
196  ModuleCallingContext const* mcc) = 0;
197  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
198  ModuleCallingContext const* mcc) = 0;
199  virtual void implBeginJob() = 0;
200  virtual void implEndJob() = 0;
201  virtual void implBeginStream(StreamID) = 0;
202  virtual void implEndStream(StreamID) = 0;
203 
204  void resetModuleDescription(ModuleDescription const*);
205 
206  ActivityRegistry* activityRegistry() { return actReg_.get(); }
207 
208  private:
209 
210  template <typename T>
211  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
212  StreamID stream,
213  ParentContext const& parentContext,
214  typename T::Context const* context);
215 
216  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
217  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
218 
219  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
220 
221 
222  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
223 
224  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
225 
226  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
227  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
228 
229  virtual void implPreForkReleaseResources() = 0;
230  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
231  unsigned int iNumberOfChildren) = 0;
232  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
233 
234  virtual SerialTaskQueueChain* serializeRunModule() = 0;
235 
236  static void exceptionContext(cms::Exception& ex,
237  ModuleCallingContext const* mcc);
238 
239  /*This base class is used to hide the differences between the ID used
240  for Event, LuminosityBlock and Run. Using the base class allows us
241  to only convert the ID to string form if it is actually needed in
242  the call to shouldRethrowException.
243  */
245  public:
246  virtual std::string value() const = 0;
248  };
249 
250  template< typename T>
252  public:
253  TransitionIDValue(T const& iP): p_(iP) {}
254  virtual std::string value() const override {
255  std::ostringstream iost;
256  iost<<p_.id();
257  return iost.str();
258  }
259  private:
260  T const& p_;
261 
262  };
263 
264  bool shouldRethrowException(std::exception_ptr iPtr,
265  ParentContext const& parentContext,
266  bool isEvent,
267  TransitionIDValueBase const& iID) const;
268 
269  template<bool IS_EVENT>
270  bool setPassed() {
271  if(IS_EVENT) {
272  timesPassed_.fetch_add(1,std::memory_order_relaxed);
273  }
274  state_ = Pass;
275  return true;
276  }
277 
278  template<bool IS_EVENT>
279  bool setFailed() {
280  if(IS_EVENT) {
281  timesFailed_.fetch_add(1,std::memory_order_relaxed);
282  }
283  state_ = Fail;
284  return false;
285  }
286 
287  template<bool IS_EVENT>
288  std::exception_ptr setException(std::exception_ptr iException) {
289  if (IS_EVENT) {
290  timesExcept_.fetch_add(1,std::memory_order_relaxed);
291  }
292  cached_exception_ = iException; // propagate_const<T> has no reset() function
293  state_ = Exception;
294  return cached_exception_;
295  }
296 
297  void prefetchAsync(WaitingTask*,
298  ParentContext const& parentContext,
299  Principal const& );
300 
302  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
303  }
304 
305  template<typename T>
306  void runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
307  typename T::MyPrincipal const& ep,
308  EventSetup const& es,
309  StreamID streamID,
310  ParentContext const& parentContext,
311  typename T::Context const* context);
312 
313  template< typename T>
314  class RunModuleTask : public WaitingTask {
315  public:
317  typename T::MyPrincipal const& ep,
318  EventSetup const& es,
319  StreamID streamID,
320  ParentContext const& parentContext,
321  typename T::Context const* context):
322  m_worker(worker),
323  m_principal(ep),
324  m_es(es),
325  m_streamID(streamID),
326  m_parentContext(parentContext),
327  m_context(context),
328  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
329 
330  tbb::task* execute() override {
331  //Need to make the services available early so other services can see them
332  ServiceRegistry::Operate guard(m_serviceToken);
333 
334  //incase the emit causes an exception, we need a memory location
335  // to hold the exception_ptr
336  std::exception_ptr temp_excptr;
337  auto excptr = exceptionPtr();
338  if(T::isEvent_) {
339  try {
340  //pre was called in prefetchAsync
341  m_worker->emitPostModuleEventPrefetchingSignal();
342  }catch(...) {
343  temp_excptr = std::current_exception();
344  if(not excptr) {
345  excptr = &temp_excptr;
346  }
347  }
348  }
349 
350  if( not excptr) {
351  if(auto queue = m_worker->serializeRunModule()) {
352  Worker* worker = m_worker;
353  auto const & principal = m_principal;
354  auto& es = m_es;
355  auto streamID = m_streamID;
356  auto parentContext = m_parentContext;
357  auto serviceToken = m_serviceToken;
358  auto sContext = m_context;
359  queue->push( [worker, &principal, &es, streamID,parentContext,sContext, serviceToken]()
360  {
361  //Need to make the services available
362  ServiceRegistry::Operate guard(serviceToken);
363 
364  std::exception_ptr* ptr = nullptr;
365  worker->runModuleAfterAsyncPrefetch<T>(ptr,
366  principal,
367  es,
368  streamID,
369  parentContext,
370  sContext);
371  });
372  return nullptr;
373  }
374  }
375 
376  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
377  m_principal,
378  m_es,
379  m_streamID,
380  m_parentContext,
381  m_context);
382  return nullptr;
383  }
384 
385  private:
387  typename T::MyPrincipal const& m_principal;
388  EventSetup const& m_es;
391  typename T::Context const* m_context;
393  };
394 
395  std::atomic<int> timesRun_;
396  std::atomic<int> timesVisited_;
397  std::atomic<int> timesPassed_;
398  std::atomic<int> timesFailed_;
399  std::atomic<int> timesExcept_;
400  std::atomic<State> state_;
402  std::atomic<int> numberOfPathsLeftToRun_;
403 
405 
406  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
407  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
408 
409  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
410 
411  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
412 
413  edm::WaitingTaskList waitingTasks_;
414  std::atomic<bool> workStarted_;
415  };
416 
417  namespace {
418  template <typename T>
419  class ModuleSignalSentry {
420  public:
421  ModuleSignalSentry(ActivityRegistry *a,
422  typename T::Context const* context,
423  ModuleCallingContext const* moduleCallingContext) :
424  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
425 
426  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
427  }
428 
429  ~ModuleSignalSentry() {
430  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
431  }
432 
433  private:
434  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
435  typename T::Context const* context_;
436  ModuleCallingContext const* moduleCallingContext_;
437  };
438 
439  }
440 
441  namespace workerhelper {
442  template<>
444  public:
446  static bool call(Worker* iWorker, StreamID,
447  EventPrincipal const& ep, EventSetup const& es,
448  ActivityRegistry* /* actReg */,
449  ModuleCallingContext const* mcc,
450  Arg::Context const* /* context*/) {
451  //Signal sentry is handled by the module
452  return iWorker->implDo(ep,es, mcc);
453  }
454  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
455  typename Arg::MyPrincipal const& ep,
456  ModuleCallingContext const* mcc) {
457  return iWorker->implDoPrePrefetchSelection(id,ep,mcc);
458  }
459  };
460 
461  template<>
463  public:
465  static bool call(Worker* iWorker,StreamID,
466  RunPrincipal const& ep, EventSetup const& es,
467  ActivityRegistry* actReg,
468  ModuleCallingContext const* mcc,
469  Arg::Context const* context) {
470  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
471  return iWorker->implDoBegin(ep,es, mcc);
472  }
473  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
474  typename Arg::MyPrincipal const& ep,
475  ModuleCallingContext const* mcc) {
476  return true;
477  }
478  };
479  template<>
481  public:
483  static bool call(Worker* iWorker,StreamID id,
484  RunPrincipal const & ep, EventSetup const& es,
485  ActivityRegistry* actReg,
486  ModuleCallingContext const* mcc,
487  Arg::Context const* context) {
488  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
489  return iWorker->implDoStreamBegin(id,ep,es, mcc);
490  }
491  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
492  typename Arg::MyPrincipal const& ep,
493  ModuleCallingContext const* mcc) {
494  return true;
495  }
496  };
497  template<>
499  public:
501  static bool call(Worker* iWorker,StreamID,
502  RunPrincipal const& ep, EventSetup const& es,
503  ActivityRegistry* actReg,
504  ModuleCallingContext const* mcc,
505  Arg::Context const* context) {
506  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
507  return iWorker->implDoEnd(ep,es, mcc);
508  }
509  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
510  typename Arg::MyPrincipal const& ep,
511  ModuleCallingContext const* mcc) {
512  return true;
513  }
514  };
515  template<>
517  public:
519  static bool call(Worker* iWorker,StreamID id,
520  RunPrincipal const& ep, EventSetup const& es,
521  ActivityRegistry* actReg,
522  ModuleCallingContext const* mcc,
523  Arg::Context const* context) {
524  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
525  return iWorker->implDoStreamEnd(id,ep,es, mcc);
526  }
527  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
528  typename Arg::MyPrincipal const& ep,
529  ModuleCallingContext const* mcc) {
530  return true;
531  }
532  };
533 
534  template<>
536  public:
538  static bool call(Worker* iWorker,StreamID,
539  LuminosityBlockPrincipal const& ep, EventSetup const& es,
540  ActivityRegistry* actReg,
541  ModuleCallingContext const* mcc,
542  Arg::Context const* context) {
543  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
544  return iWorker->implDoBegin(ep,es, mcc);
545  }
546 
547  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
548  typename Arg::MyPrincipal const& ep,
549  ModuleCallingContext const* mcc) {
550  return true;
551  }
552  };
553  template<>
555  public:
557  static bool call(Worker* iWorker,StreamID id,
558  LuminosityBlockPrincipal const& ep, EventSetup const& es,
559  ActivityRegistry* actReg,
560  ModuleCallingContext const* mcc,
561  Arg::Context const* context) {
562  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
563  return iWorker->implDoStreamBegin(id,ep,es, mcc);
564  }
565 
566  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
567  typename Arg::MyPrincipal const& ep,
568  ModuleCallingContext const* mcc) {
569  return true;
570  }
571 };
572 
573  template<>
575  public:
577  static bool call(Worker* iWorker,StreamID,
578  LuminosityBlockPrincipal const& ep, EventSetup const& es,
579  ActivityRegistry* actReg,
580  ModuleCallingContext const* mcc,
581  Arg::Context const* context) {
582  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
583  return iWorker->implDoEnd(ep,es, mcc);
584  }
585  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
586  typename Arg::MyPrincipal const& ep,
587  ModuleCallingContext const* mcc) {
588  return true;
589  }
590 
591  };
592  template<>
594  public:
596  static bool call(Worker* iWorker,StreamID id,
597  LuminosityBlockPrincipal const& ep, EventSetup const& es,
598  ActivityRegistry* actReg,
599  ModuleCallingContext const* mcc,
600  Arg::Context const* context) {
601  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
602  return iWorker->implDoStreamEnd(id,ep,es, mcc);
603  }
604 
605  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
606  typename Arg::MyPrincipal const& ep,
607  ModuleCallingContext const* mcc) {
608  return true;
609  }
610  };
611  }
612 
613 
614  template <typename T>
616  typename T::MyPrincipal const& ep,
617  EventSetup const& es,
618  StreamID streamID,
619  ParentContext const& parentContext,
620  typename T::Context const* context) {
621  waitingTasks_.add(task);
622  if(T::isEvent_) {
623  timesVisited_.fetch_add(1,std::memory_order_relaxed);
624  }
625 
626  bool expected = false;
627  if(workStarted_.compare_exchange_strong(expected,true)) {
628  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
629 
630  //if have TriggerResults based selection we want to reject the event before doing prefetching
631  try {
632  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
633  setPassed<T::isEvent_>();
634  waitingTasks_.doneWaiting(nullptr);
635  return;
636  }
637  }catch(...) {}
638 
639  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
640  this, ep,es,streamID,parentContext,context);
641  prefetchAsync(runTask, parentContext, ep);
642  }
643  }
644 
645  template<typename T>
646  void Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
647  typename T::MyPrincipal const& ep,
648  EventSetup const& es,
649  StreamID streamID,
650  ParentContext const& parentContext,
651  typename T::Context const* context) {
652  std::exception_ptr exceptionPtr;
653  if(iEPtr) {
654  assert(*iEPtr);
656  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
657  exceptionPtr = *iEPtr;
658  setException<T::isEvent_>(exceptionPtr);
659  } else {
660  setPassed<T::isEvent_>();
661  }
662  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
663  } else {
664  try {
665  runModule<T>(ep,es,streamID,parentContext,context);
666  } catch(...) {
667  exceptionPtr = std::current_exception();
668  }
669  }
670  waitingTasks_.doneWaiting(exceptionPtr);
671  }
672 
673  template <typename T>
675  typename T::MyPrincipal const& principal,
676  EventSetup const& es,
677  StreamID streamID,
678  ParentContext const& parentContext,
679  typename T::Context const* context) {
680  waitingTasks_.add(task);
681  bool expected = false;
682  if(workStarted_.compare_exchange_strong(expected,true)) {
683  auto serviceToken = ServiceRegistry::instance().presentToken();
684 
685  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
686  {
687  std::exception_ptr exceptionPtr;
688  try {
689  //Need to make the services available
690  ServiceRegistry::Operate guard(serviceToken);
691 
692  this->runModule<T>(principal,
693  es,
694  streamID,
695  parentContext,
696  context);
697  } catch( ... ) {
698  exceptionPtr = std::current_exception();
699  }
700  this->waitingTasks_.doneWaiting(exceptionPtr);
701  };
702  if(auto queue = this->serializeRunModule()) {
703  queue->push( toDo);
704  } else {
705  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
706  tbb::task::spawn(*task);
707  }
708  }
709  }
710 
711  template <typename T>
712  bool Worker::doWork(typename T::MyPrincipal const& ep,
713  EventSetup const& es,
714  StreamID streamID,
715  ParentContext const& parentContext,
716  typename T::Context const* context) {
717 
718  if (T::isEvent_) {
719  timesVisited_.fetch_add(1,std::memory_order_relaxed);
720  }
721  bool rc = false;
722 
723  switch(state_) {
724  case Ready: break;
725  case Pass: return true;
726  case Fail: return false;
727  case Exception: {
728  std::rethrow_exception(cached_exception_);
729  }
730  }
731 
732  bool expected = false;
733  if(not workStarted_.compare_exchange_strong(expected, true) ) {
734  //another thread beat us here
735  auto waitTask = edm::make_empty_waiting_task();
736  waitTask->increment_ref_count();
737 
738  waitingTasks_.add(waitTask.get());
739 
740  waitTask->wait_for_all();
741 
742  switch(state_) {
743  case Ready: assert(false);
744  case Pass: return true;
745  case Fail: return false;
746  case Exception: {
747  std::rethrow_exception(cached_exception_);
748  }
749  }
750  }
751 
752  //Need the context to be set until after any exception is resolved
753  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
754 
755  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
756  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
757 
758  if (T::isEvent_) {
759  try {
760  //if have TriggerResults based selection we want to reject the event before doing prefetching
761  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
762  timesRun_.fetch_add(1,std::memory_order_relaxed);
763  rc = setPassed<T::isEvent_>();
764  waitingTasks_.doneWaiting(nullptr);
765  return true;
766  }
767  }catch(...) {}
768  auto waitTask = edm::make_empty_waiting_task();
769  {
770  //Make sure signal is sent once the prefetching is done
771  // [the 'pre' signal was sent in prefetchAsync]
772  //The purpose of this block is to send the signal after wait_for_all
773  auto sentryFunc = [this](void*) {
774  emitPostModuleEventPrefetchingSignal();
775  };
776  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
777 
778  //set count to 2 since wait_for_all requires value to not go to 0
779  waitTask->set_ref_count(2);
780 
781  prefetchAsync(waitTask.get(),parentContext, ep);
782  waitTask->decrement_ref_count();
783  waitTask->wait_for_all();
784  }
785  if(waitTask->exceptionPtr() != nullptr) {
787  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
788  setException<T::isEvent_>(*waitTask->exceptionPtr());
789  waitingTasks_.doneWaiting(cached_exception_);
790  std::rethrow_exception(cached_exception_);
791  } else {
792  setPassed<T::isEvent_>();
793  waitingTasks_.doneWaiting(nullptr);
794  return true;
795  }
796  }
797  }
798 
799  //successful prefetch so no reset necessary
800  prefetchSentry.release();
801  if(auto queue = serializeRunModule()) {
802  auto serviceToken = ServiceRegistry::instance().presentToken();
803  queue->pushAndWait([&]() {
804  //Need to make the services available
805  ServiceRegistry::Operate guard(serviceToken);
806  try {
807  rc = runModule<T>(ep,es,streamID,parentContext,context);
808  } catch(...) {
809  }
810  });
811  } else {
812  try {
813  rc = runModule<T>(ep,es,streamID,parentContext,context);
814  } catch(...) {
815  }
816  }
817  if(state_ == Exception) {
818  waitingTasks_.doneWaiting(cached_exception_);
819  std::rethrow_exception(cached_exception_);
820  }
821 
822  waitingTasks_.doneWaiting(nullptr);
823  return rc;
824  }
825 
826 
827  template <typename T>
828  bool Worker::runModule(typename T::MyPrincipal const& ep,
829  EventSetup const& es,
830  StreamID streamID,
831  ParentContext const& parentContext,
832  typename T::Context const* context) {
833  //unscheduled producers should advance this
834  //if (T::isEvent_) {
835  // ++timesVisited_;
836  //}
837  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
838  if (T::isEvent_) {
839  timesRun_.fetch_add(1,std::memory_order_relaxed);
840  }
841 
842  bool rc = true;
843  try {
845  {
846  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
847 
848  if (rc) {
849  setPassed<T::isEvent_>();
850  } else {
851  setFailed<T::isEvent_>();
852  }
853  });
854  } catch(cms::Exception& ex) {
855  exceptionContext(ex, &moduleCallingContext_);
857  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
858  assert(not cached_exception_);
859  setException<T::isEvent_>(std::current_exception());
860  std::rethrow_exception(cached_exception_);
861  } else {
862  rc = setPassed<T::isEvent_>();
863  }
864  }
865 
866  return rc;
867  }
868 }
869 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:577
tbb::task * execute() override
Definition: Worker.h:330
std::atomic< int > timesVisited_
Definition: Worker.h:396
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:316
ModuleDescription const & description() const
Definition: Worker.h:132
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:391
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:828
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:107
T::MyPrincipal const & m_principal
Definition: Worker.h:387
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:402
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:288
virtual std::string value() const override
Definition: Worker.h:254
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:509
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:519
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:556
Definition: hltDiff.cc:290
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:390
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:399
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:500
std::atomic< int > timesFailed_
Definition: Worker.h:398
int timesPassed() const
Definition: Worker.h:168
void addedToPath()
Definition: Worker.h:162
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:712
State state() const
Definition: Worker.h:171
void clearCounters()
Definition: Worker.h:154
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:538
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:465
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:446
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:585
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:547
ActivityRegistry * activityRegistry()
Definition: Worker.h:206
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:596
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:566
ExceptionToActionTable const * actions_
Definition: Worker.h:406
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
bool setFailed()
Definition: Worker.h:279
int timesExcept() const
Definition: Worker.h:170
def principal(options)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:122
int timesVisited() const
Definition: Worker.h:167
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:464
ModuleDescription const * descPtr() const
Definition: Worker.h:133
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:605
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:120
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:576
std::atomic< State > state_
Definition: Worker.h:400
int timesRun() const
Definition: Worker.h:166
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:483
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:501
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:401
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:595
bool setPassed()
Definition: Worker.h:270
TransitionIDValue(T const &iP)
Definition: Worker.h:253
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:119
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:445
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:674
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:482
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:115
int timesPass() const
Definition: Worker.h:173
const T & get() const
Definition: EventSetup.h:56
Definition: hltDiff.cc:291
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:557
int timesFailed() const
Definition: Worker.h:169
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:116
void preForkReleaseResources()
Definition: Worker.h:118
HLT enums.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:454
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:301
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:397
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:473
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:615
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:537
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:527
std::atomic< int > timesRun_
Definition: Worker.h:395
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:518
ServiceToken m_serviceToken
Definition: Worker.h:392
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:491
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:646
EventSetup const & m_es
Definition: Worker.h:388