CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
50 
52 
53 #include <map>
54 #include <memory>
55 #include <sstream>
56 #include <string>
57 #include <vector>
58 #include <exception>
59 #include <unordered_map>
60 
61 namespace edm {
62  class EventPrincipal;
63  class EarlyDeleteHelper;
64  class ProductResolverIndexHelper;
65  class ProductResolverIndexAndSkipBit;
66  class StreamID;
67  class StreamContext;
68  class ProductRegistry;
69  class ThinnedAssociationsHelper;
70  class WaitingTask;
71 
72  namespace workerhelper {
73  template< typename O> class CallImpl;
74  }
75 
76  class Worker {
77  public:
78  enum State { Ready, Pass, Fail, Exception };
79  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
80 
81  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
82  virtual ~Worker();
83 
84  Worker(Worker const&) = delete; // Disallow copying and moving
85  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
86 
87  template <typename T>
88  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
89  StreamID stream,
90  ParentContext const& parentContext,
91  typename T::Context const* context);
92  template <typename T>
93  void doWorkAsync(WaitingTask* task,
94  typename T::MyPrincipal const&, EventSetup const& c,
95  StreamID stream,
96  ParentContext const& parentContext,
97  typename T::Context const* context);
98 
99  template <typename T>
100  void doWorkNoPrefetchingAsync(WaitingTask* task,
101  typename T::MyPrincipal const&,
102  EventSetup const& c,
103  StreamID stream,
104  ParentContext const& parentContext,
105  typename T::Context const* context);
106 
108  waitingTasks_.add(task);
109  }
110  void skipOnPath();
111  void beginJob() ;
112  void endJob();
113  void beginStream(StreamID id, StreamContext& streamContext);
114  void endStream(StreamID id, StreamContext& streamContext);
115  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
116  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
117 
118  void preForkReleaseResources() {implPreForkReleaseResources();}
119  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
120  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
121 
122  void reset() {
123  cached_exception_ = std::exception_ptr();
124  state_ = Ready;
125  waitingTasks_.reset();
126  workStarted_ = false;
127  numberOfPathsLeftToRun_ = numberOfPathsOn_;
128  }
129 
130  void postDoEvent(EventPrincipal const&);
131 
132  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
133  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
136  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
137 
138  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
139 
140  //Used to make EDGetToken work
141  virtual void updateLookup(BranchType iBranchType,
142  ProductResolverIndexHelper const&) = 0;
143  virtual void resolvePutIndicies(BranchType iBranchType,
144  std::unordered_multimap<std::string, edm::ProductResolverIndex> const& iIndicies) = 0;
145 
146  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
147  ProductRegistry const& preg,
148  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
149 
150  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
151 
152  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
153 
154  virtual Types moduleType() const =0;
155 
156  void clearCounters() {
157  timesRun_.store(0,std::memory_order_release);
158  timesVisited_.store(0,std::memory_order_release);
159  timesPassed_.store(0,std::memory_order_release);
160  timesFailed_.store(0,std::memory_order_release);
161  timesExcept_.store(0,std::memory_order_release);
162  }
163 
164  void addedToPath() {
165  ++numberOfPathsOn_;
166  }
167  //NOTE: calling state() is done to force synchronization across threads
168  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
169  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
170  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
171  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
172  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
173  State state() const { return state_; }
174 
175  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
176 
177  protected:
178  template<typename O> friend class workerhelper::CallImpl;
179  virtual std::string workerType() const = 0;
180  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
181  ModuleCallingContext const* mcc) = 0;
182  virtual bool implDoPrePrefetchSelection(StreamID id,
183  EventPrincipal const& ep,
184  ModuleCallingContext const* mcc) = 0;
185  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
186  ModuleCallingContext const* mcc) = 0;
187  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
188  ModuleCallingContext const* mcc) = 0;
189  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
190  ModuleCallingContext const* mcc) = 0;
191  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
192  ModuleCallingContext const* mcc) = 0;
193  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
194  ModuleCallingContext const* mcc) = 0;
195  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
196  ModuleCallingContext const* mcc) = 0;
197  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
198  ModuleCallingContext const* mcc) = 0;
199  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
200  ModuleCallingContext const* mcc) = 0;
201  virtual void implBeginJob() = 0;
202  virtual void implEndJob() = 0;
203  virtual void implBeginStream(StreamID) = 0;
204  virtual void implEndStream(StreamID) = 0;
205 
206  void resetModuleDescription(ModuleDescription const*);
207 
208  ActivityRegistry* activityRegistry() { return actReg_.get(); }
209 
210  private:
211 
212  template <typename T>
213  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
214  StreamID stream,
215  ParentContext const& parentContext,
216  typename T::Context const* context);
217 
218  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
219  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
220 
221  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
222 
223 
224  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
225 
226  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
227 
228  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
229  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
230 
231  virtual void implPreForkReleaseResources() = 0;
232  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
233  unsigned int iNumberOfChildren) = 0;
234  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
235 
236  virtual SerialTaskQueueChain* serializeRunModule() = 0;
237 
238  static void exceptionContext(cms::Exception& ex,
239  ModuleCallingContext const* mcc);
240 
241  /*This base class is used to hide the differences between the ID used
242  for Event, LuminosityBlock and Run. Using the base class allows us
243  to only convert the ID to string form if it is actually needed in
244  the call to shouldRethrowException.
245  */
247  public:
248  virtual std::string value() const = 0;
250  };
251 
252  template< typename T>
254  public:
255  TransitionIDValue(T const& iP): p_(iP) {}
256  virtual std::string value() const override {
257  std::ostringstream iost;
258  iost<<p_.id();
259  return iost.str();
260  }
261  private:
262  T const& p_;
263 
264  };
265 
266  bool shouldRethrowException(std::exception_ptr iPtr,
267  ParentContext const& parentContext,
268  bool isEvent,
269  TransitionIDValueBase const& iID) const;
270 
271  template<bool IS_EVENT>
272  bool setPassed() {
273  if(IS_EVENT) {
274  timesPassed_.fetch_add(1,std::memory_order_relaxed);
275  }
276  state_ = Pass;
277  return true;
278  }
279 
280  template<bool IS_EVENT>
281  bool setFailed() {
282  if(IS_EVENT) {
283  timesFailed_.fetch_add(1,std::memory_order_relaxed);
284  }
285  state_ = Fail;
286  return false;
287  }
288 
289  template<bool IS_EVENT>
290  std::exception_ptr setException(std::exception_ptr iException) {
291  if (IS_EVENT) {
292  timesExcept_.fetch_add(1,std::memory_order_relaxed);
293  }
294  cached_exception_ = iException; // propagate_const<T> has no reset() function
295  state_ = Exception;
296  return cached_exception_;
297  }
298 
299  void prefetchAsync(WaitingTask*,
300  ParentContext const& parentContext,
301  Principal const& );
302 
304  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
305  }
306 
307  template<typename T>
308  void runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
309  typename T::MyPrincipal const& ep,
310  EventSetup const& es,
311  StreamID streamID,
312  ParentContext const& parentContext,
313  typename T::Context const* context);
314 
315  template< typename T>
316  class RunModuleTask : public WaitingTask {
317  public:
319  typename T::MyPrincipal const& ep,
320  EventSetup const& es,
321  StreamID streamID,
322  ParentContext const& parentContext,
323  typename T::Context const* context):
324  m_worker(worker),
325  m_principal(ep),
326  m_es(es),
327  m_streamID(streamID),
328  m_parentContext(parentContext),
329  m_context(context),
330  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
331 
332  tbb::task* execute() override {
333  //Need to make the services available early so other services can see them
334  ServiceRegistry::Operate guard(m_serviceToken);
335 
336  //incase the emit causes an exception, we need a memory location
337  // to hold the exception_ptr
338  std::exception_ptr temp_excptr;
339  auto excptr = exceptionPtr();
340  if(T::isEvent_) {
341  try {
342  //pre was called in prefetchAsync
343  m_worker->emitPostModuleEventPrefetchingSignal();
344  }catch(...) {
345  temp_excptr = std::current_exception();
346  if(not excptr) {
347  excptr = &temp_excptr;
348  }
349  }
350  }
351 
352  if( not excptr) {
353  if(auto queue = m_worker->serializeRunModule()) {
354  Worker* worker = m_worker;
355  auto const & principal = m_principal;
356  auto& es = m_es;
357  auto streamID = m_streamID;
358  auto parentContext = m_parentContext;
359  auto serviceToken = m_serviceToken;
360  auto sContext = m_context;
361  queue->push( [worker, &principal, &es, streamID,parentContext,sContext, serviceToken]()
362  {
363  //Need to make the services available
364  ServiceRegistry::Operate guard(serviceToken);
365 
366  std::exception_ptr* ptr = nullptr;
367  worker->runModuleAfterAsyncPrefetch<T>(ptr,
368  principal,
369  es,
370  streamID,
371  parentContext,
372  sContext);
373  });
374  return nullptr;
375  }
376  }
377 
378  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
379  m_principal,
380  m_es,
381  m_streamID,
382  m_parentContext,
383  m_context);
384  return nullptr;
385  }
386 
387  private:
389  typename T::MyPrincipal const& m_principal;
390  EventSetup const& m_es;
393  typename T::Context const* m_context;
395  };
396 
397  std::atomic<int> timesRun_;
398  std::atomic<int> timesVisited_;
399  std::atomic<int> timesPassed_;
400  std::atomic<int> timesFailed_;
401  std::atomic<int> timesExcept_;
402  std::atomic<State> state_;
404  std::atomic<int> numberOfPathsLeftToRun_;
405 
407 
408  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
409  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
410 
411  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
412 
413  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
414 
415  edm::WaitingTaskList waitingTasks_;
416  std::atomic<bool> workStarted_;
417  };
418 
419  namespace {
420  template <typename T>
421  class ModuleSignalSentry {
422  public:
423  ModuleSignalSentry(ActivityRegistry *a,
424  typename T::Context const* context,
425  ModuleCallingContext const* moduleCallingContext) :
426  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
427 
428  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
429  }
430 
431  ~ModuleSignalSentry() {
432  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
433  }
434 
435  private:
436  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
437  typename T::Context const* context_;
438  ModuleCallingContext const* moduleCallingContext_;
439  };
440 
441  }
442 
443  namespace workerhelper {
444  template<>
446  public:
448  static bool call(Worker* iWorker, StreamID,
449  EventPrincipal const& ep, EventSetup const& es,
450  ActivityRegistry* /* actReg */,
451  ModuleCallingContext const* mcc,
452  Arg::Context const* /* context*/) {
453  //Signal sentry is handled by the module
454  return iWorker->implDo(ep,es, mcc);
455  }
456  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
457  typename Arg::MyPrincipal const& ep,
458  ModuleCallingContext const* mcc) {
459  return iWorker->implDoPrePrefetchSelection(id,ep,mcc);
460  }
461  };
462 
463  template<>
465  public:
467  static bool call(Worker* iWorker,StreamID,
468  RunPrincipal const& ep, EventSetup const& es,
469  ActivityRegistry* actReg,
470  ModuleCallingContext const* mcc,
471  Arg::Context const* context) {
472  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
473  return iWorker->implDoBegin(ep,es, mcc);
474  }
475  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
476  typename Arg::MyPrincipal const& ep,
477  ModuleCallingContext const* mcc) {
478  return true;
479  }
480  };
481  template<>
483  public:
485  static bool call(Worker* iWorker,StreamID id,
486  RunPrincipal const & ep, EventSetup const& es,
487  ActivityRegistry* actReg,
488  ModuleCallingContext const* mcc,
489  Arg::Context const* context) {
490  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
491  return iWorker->implDoStreamBegin(id,ep,es, mcc);
492  }
493  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
494  typename Arg::MyPrincipal const& ep,
495  ModuleCallingContext const* mcc) {
496  return true;
497  }
498  };
499  template<>
501  public:
503  static bool call(Worker* iWorker,StreamID,
504  RunPrincipal const& ep, EventSetup const& es,
505  ActivityRegistry* actReg,
506  ModuleCallingContext const* mcc,
507  Arg::Context const* context) {
508  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
509  return iWorker->implDoEnd(ep,es, mcc);
510  }
511  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
512  typename Arg::MyPrincipal const& ep,
513  ModuleCallingContext const* mcc) {
514  return true;
515  }
516  };
517  template<>
519  public:
521  static bool call(Worker* iWorker,StreamID id,
522  RunPrincipal const& ep, EventSetup const& es,
523  ActivityRegistry* actReg,
524  ModuleCallingContext const* mcc,
525  Arg::Context const* context) {
526  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
527  return iWorker->implDoStreamEnd(id,ep,es, mcc);
528  }
529  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
530  typename Arg::MyPrincipal const& ep,
531  ModuleCallingContext const* mcc) {
532  return true;
533  }
534  };
535 
536  template<>
538  public:
540  static bool call(Worker* iWorker,StreamID,
541  LuminosityBlockPrincipal const& ep, EventSetup const& es,
542  ActivityRegistry* actReg,
543  ModuleCallingContext const* mcc,
544  Arg::Context const* context) {
545  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
546  return iWorker->implDoBegin(ep,es, mcc);
547  }
548 
549  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
550  typename Arg::MyPrincipal const& ep,
551  ModuleCallingContext const* mcc) {
552  return true;
553  }
554  };
555  template<>
557  public:
559  static bool call(Worker* iWorker,StreamID id,
560  LuminosityBlockPrincipal const& ep, EventSetup const& es,
561  ActivityRegistry* actReg,
562  ModuleCallingContext const* mcc,
563  Arg::Context const* context) {
564  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
565  return iWorker->implDoStreamBegin(id,ep,es, mcc);
566  }
567 
568  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
569  typename Arg::MyPrincipal const& ep,
570  ModuleCallingContext const* mcc) {
571  return true;
572  }
573 };
574 
575  template<>
577  public:
579  static bool call(Worker* iWorker,StreamID,
580  LuminosityBlockPrincipal const& ep, EventSetup const& es,
581  ActivityRegistry* actReg,
582  ModuleCallingContext const* mcc,
583  Arg::Context const* context) {
584  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
585  return iWorker->implDoEnd(ep,es, mcc);
586  }
587  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
588  typename Arg::MyPrincipal const& ep,
589  ModuleCallingContext const* mcc) {
590  return true;
591  }
592 
593  };
594  template<>
596  public:
598  static bool call(Worker* iWorker,StreamID id,
599  LuminosityBlockPrincipal const& ep, EventSetup const& es,
600  ActivityRegistry* actReg,
601  ModuleCallingContext const* mcc,
602  Arg::Context const* context) {
603  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
604  return iWorker->implDoStreamEnd(id,ep,es, mcc);
605  }
606 
607  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
608  typename Arg::MyPrincipal const& ep,
609  ModuleCallingContext const* mcc) {
610  return true;
611  }
612  };
613  }
614 
615 
616  template <typename T>
618  typename T::MyPrincipal const& ep,
619  EventSetup const& es,
620  StreamID streamID,
621  ParentContext const& parentContext,
622  typename T::Context const* context) {
623  waitingTasks_.add(task);
624  if(T::isEvent_) {
625  timesVisited_.fetch_add(1,std::memory_order_relaxed);
626  }
627 
628  bool expected = false;
629  if(workStarted_.compare_exchange_strong(expected,true)) {
630  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
631 
632  //if have TriggerResults based selection we want to reject the event before doing prefetching
633  try {
634  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
635  setPassed<T::isEvent_>();
636  waitingTasks_.doneWaiting(nullptr);
637  return;
638  }
639  }catch(...) {}
640 
641  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
642  this, ep,es,streamID,parentContext,context);
643  prefetchAsync(runTask, parentContext, ep);
644  }
645  }
646 
647  template<typename T>
648  void Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
649  typename T::MyPrincipal const& ep,
650  EventSetup const& es,
651  StreamID streamID,
652  ParentContext const& parentContext,
653  typename T::Context const* context) {
654  std::exception_ptr exceptionPtr;
655  if(iEPtr) {
656  assert(*iEPtr);
658  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
659  exceptionPtr = *iEPtr;
660  setException<T::isEvent_>(exceptionPtr);
661  } else {
662  setPassed<T::isEvent_>();
663  }
664  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
665  } else {
666  try {
667  runModule<T>(ep,es,streamID,parentContext,context);
668  } catch(...) {
669  exceptionPtr = std::current_exception();
670  }
671  }
672  waitingTasks_.doneWaiting(exceptionPtr);
673  }
674 
675  template <typename T>
677  typename T::MyPrincipal const& principal,
678  EventSetup const& es,
679  StreamID streamID,
680  ParentContext const& parentContext,
681  typename T::Context const* context) {
682  waitingTasks_.add(task);
683  bool expected = false;
684  if(workStarted_.compare_exchange_strong(expected,true)) {
685  auto serviceToken = ServiceRegistry::instance().presentToken();
686 
687  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
688  {
689  std::exception_ptr exceptionPtr;
690  try {
691  //Need to make the services available
692  ServiceRegistry::Operate guard(serviceToken);
693 
694  this->runModule<T>(principal,
695  es,
696  streamID,
697  parentContext,
698  context);
699  } catch( ... ) {
700  exceptionPtr = std::current_exception();
701  }
702  this->waitingTasks_.doneWaiting(exceptionPtr);
703  };
704  if(auto queue = this->serializeRunModule()) {
705  queue->push( toDo);
706  } else {
707  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
708  tbb::task::spawn(*task);
709  }
710  }
711  }
712 
713  template <typename T>
714  bool Worker::doWork(typename T::MyPrincipal const& ep,
715  EventSetup const& es,
716  StreamID streamID,
717  ParentContext const& parentContext,
718  typename T::Context const* context) {
719 
720  if (T::isEvent_) {
721  timesVisited_.fetch_add(1,std::memory_order_relaxed);
722  }
723  bool rc = false;
724 
725  switch(state_) {
726  case Ready: break;
727  case Pass: return true;
728  case Fail: return false;
729  case Exception: {
730  std::rethrow_exception(cached_exception_);
731  }
732  }
733 
734  bool expected = false;
735  if(not workStarted_.compare_exchange_strong(expected, true) ) {
736  //another thread beat us here
737  auto waitTask = edm::make_empty_waiting_task();
738  waitTask->increment_ref_count();
739 
740  waitingTasks_.add(waitTask.get());
741 
742  waitTask->wait_for_all();
743 
744  switch(state_) {
745  case Ready: assert(false);
746  case Pass: return true;
747  case Fail: return false;
748  case Exception: {
749  std::rethrow_exception(cached_exception_);
750  }
751  }
752  }
753 
754  //Need the context to be set until after any exception is resolved
755  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
756 
757  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
758  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
759 
760  if (T::isEvent_) {
761  try {
762  //if have TriggerResults based selection we want to reject the event before doing prefetching
763  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
764  timesRun_.fetch_add(1,std::memory_order_relaxed);
765  rc = setPassed<T::isEvent_>();
766  waitingTasks_.doneWaiting(nullptr);
767  return true;
768  }
769  }catch(...) {}
770  auto waitTask = edm::make_empty_waiting_task();
771  {
772  //Make sure signal is sent once the prefetching is done
773  // [the 'pre' signal was sent in prefetchAsync]
774  //The purpose of this block is to send the signal after wait_for_all
775  auto sentryFunc = [this](void*) {
776  emitPostModuleEventPrefetchingSignal();
777  };
778  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
779 
780  //set count to 2 since wait_for_all requires value to not go to 0
781  waitTask->set_ref_count(2);
782 
783  prefetchAsync(waitTask.get(),parentContext, ep);
784  waitTask->decrement_ref_count();
785  waitTask->wait_for_all();
786  }
787  if(waitTask->exceptionPtr() != nullptr) {
789  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
790  setException<T::isEvent_>(*waitTask->exceptionPtr());
791  waitingTasks_.doneWaiting(cached_exception_);
792  std::rethrow_exception(cached_exception_);
793  } else {
794  setPassed<T::isEvent_>();
795  waitingTasks_.doneWaiting(nullptr);
796  return true;
797  }
798  }
799  }
800 
801  //successful prefetch so no reset necessary
802  prefetchSentry.release();
803  if(auto queue = serializeRunModule()) {
804  auto serviceToken = ServiceRegistry::instance().presentToken();
805  queue->pushAndWait([&]() {
806  //Need to make the services available
807  ServiceRegistry::Operate guard(serviceToken);
808  try {
809  rc = runModule<T>(ep,es,streamID,parentContext,context);
810  } catch(...) {
811  }
812  });
813  } else {
814  try {
815  rc = runModule<T>(ep,es,streamID,parentContext,context);
816  } catch(...) {
817  }
818  }
819  if(state_ == Exception) {
820  waitingTasks_.doneWaiting(cached_exception_);
821  std::rethrow_exception(cached_exception_);
822  }
823 
824  waitingTasks_.doneWaiting(nullptr);
825  return rc;
826  }
827 
828 
829  template <typename T>
830  bool Worker::runModule(typename T::MyPrincipal const& ep,
831  EventSetup const& es,
832  StreamID streamID,
833  ParentContext const& parentContext,
834  typename T::Context const* context) {
835  //unscheduled producers should advance this
836  //if (T::isEvent_) {
837  // ++timesVisited_;
838  //}
839  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
840  if (T::isEvent_) {
841  timesRun_.fetch_add(1,std::memory_order_relaxed);
842  }
843 
844  bool rc = true;
845  try {
847  {
848  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
849 
850  if (rc) {
851  setPassed<T::isEvent_>();
852  } else {
853  setFailed<T::isEvent_>();
854  }
855  });
856  } catch(cms::Exception& ex) {
857  exceptionContext(ex, &moduleCallingContext_);
859  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
860  assert(not cached_exception_);
861  setException<T::isEvent_>(std::current_exception());
862  std::rethrow_exception(cached_exception_);
863  } else {
864  rc = setPassed<T::isEvent_>();
865  }
866  }
867 
868  return rc;
869  }
870 }
871 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:579
tbb::task * execute() override
Definition: Worker.h:332
std::atomic< int > timesVisited_
Definition: Worker.h:398
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:318
ModuleDescription const & description() const
Definition: Worker.h:132
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:393
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:830
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:107
T::MyPrincipal const & m_principal
Definition: Worker.h:389
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:404
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:290
virtual std::string value() const override
Definition: Worker.h:256
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:511
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:521
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:558
Definition: hltDiff.cc:290
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:392
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:401
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:502
std::atomic< int > timesFailed_
Definition: Worker.h:400
int timesPassed() const
Definition: Worker.h:170
void addedToPath()
Definition: Worker.h:164
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:714
State state() const
Definition: Worker.h:173
void clearCounters()
Definition: Worker.h:156
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:540
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:467
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:448
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:587
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:549
ActivityRegistry * activityRegistry()
Definition: Worker.h:208
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:598
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:568
ExceptionToActionTable const * actions_
Definition: Worker.h:408
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:406
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
bool setFailed()
Definition: Worker.h:281
int timesExcept() const
Definition: Worker.h:172
def principal(options)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:122
int timesVisited() const
Definition: Worker.h:169
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:466
ModuleDescription const * descPtr() const
Definition: Worker.h:133
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:607
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:120
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:578
std::atomic< State > state_
Definition: Worker.h:402
int timesRun() const
Definition: Worker.h:168
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:485
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:503
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:403
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:597
bool setPassed()
Definition: Worker.h:272
TransitionIDValue(T const &iP)
Definition: Worker.h:255
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:119
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:447
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:676
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:484
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:115
int timesPass() const
Definition: Worker.h:175
const T & get() const
Definition: EventSetup.h:56
Definition: hltDiff.cc:291
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:559
int timesFailed() const
Definition: Worker.h:171
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:116
void preForkReleaseResources()
Definition: Worker.h:118
HLT enums.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:456
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:303
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:399
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:475
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:617
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:539
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:529
std::atomic< int > timesRun_
Definition: Worker.h:397
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:520
ServiceToken m_serviceToken
Definition: Worker.h:394
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:493
void runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:648
EventSetup const & m_es
Definition: Worker.h:390