CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
51 
53 
54 #include <atomic>
55 #include <map>
56 #include <memory>
57 #include <sstream>
58 #include <string>
59 #include <vector>
60 #include <exception>
61 #include <unordered_map>
62 
63 namespace edm {
64  class EventPrincipal;
65  class EarlyDeleteHelper;
66  class ProductResolverIndexHelper;
67  class ProductResolverIndexAndSkipBit;
68  class StreamID;
69  class StreamContext;
70  class ProductRegistry;
71  class ThinnedAssociationsHelper;
72  class WaitingTask;
73 
74  namespace workerhelper {
75  template< typename O> class CallImpl;
76  }
77 
78  class Worker {
79  public:
80  enum State { Ready, Pass, Fail, Exception };
81  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
83  SerialTaskQueueChain* serial_ = nullptr;
84  LimitedTaskQueue* limited_ = nullptr;
85 
86  TaskQueueAdaptor() = default;
87  TaskQueueAdaptor(SerialTaskQueueChain* iChain): serial_(iChain) {}
88  TaskQueueAdaptor(LimitedTaskQueue* iLimited): limited_(iLimited) {}
89 
90  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
91 
92  template <class F>
93  void push(F&& iF) {
94  if(serial_) {
95  serial_->push(iF);
96  } else {
97  limited_->push(iF);
98  }
99  }
100  template <class F>
101  void pushAndWait(F&& iF) {
102  if(serial_) {
103  serial_->pushAndWait(iF);
104  } else {
105  limited_->pushAndWait(iF);
106  }
107  }
108 
109  };
110 
111  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
112  virtual ~Worker();
113 
114  Worker(Worker const&) = delete; // Disallow copying and moving
115  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
116 
117  template <typename T>
118  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
119  StreamID stream,
120  ParentContext const& parentContext,
121  typename T::Context const* context);
122  template <typename T>
123  void doWorkAsync(WaitingTask* task,
124  typename T::MyPrincipal const&, EventSetup const& c,
125  StreamID stream,
126  ParentContext const& parentContext,
127  typename T::Context const* context);
128 
129  template <typename T>
130  void doWorkNoPrefetchingAsync(WaitingTask* task,
131  typename T::MyPrincipal const&,
132  EventSetup const& c,
133  StreamID stream,
134  ParentContext const& parentContext,
135  typename T::Context const* context);
136 
137  template <typename T>
138  std::exception_ptr runModuleDirectly(typename T::MyPrincipal const& ep,
139  EventSetup const& es,
140  StreamID streamID,
141  ParentContext const& parentContext,
142  typename T::Context const* context);
143 
145  waitingTasks_.add(task);
146  }
147  void skipOnPath();
148  void beginJob() ;
149  void endJob();
150  void beginStream(StreamID id, StreamContext& streamContext);
151  void endStream(StreamID id, StreamContext& streamContext);
152  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
153  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
154 
155  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
156 
157  void reset() {
158  cached_exception_ = std::exception_ptr();
159  state_ = Ready;
160  waitingTasks_.reset();
161  workStarted_ = false;
162  numberOfPathsLeftToRun_ = numberOfPathsOn_;
163  }
164 
165  void postDoEvent(EventPrincipal const&);
166 
167  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
168  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
171  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
172 
173  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
174 
175  //Used to make EDGetToken work
176  virtual void updateLookup(BranchType iBranchType,
177  ProductResolverIndexHelper const&) = 0;
178  virtual void resolvePutIndicies(BranchType iBranchType,
179  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const& iIndicies) = 0;
180 
181  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
182  ProductRegistry const& preg,
183  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
184 
185  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
186 
187  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
188 
189  virtual Types moduleType() const =0;
190 
191  void clearCounters() {
192  timesRun_.store(0,std::memory_order_release);
193  timesVisited_.store(0,std::memory_order_release);
194  timesPassed_.store(0,std::memory_order_release);
195  timesFailed_.store(0,std::memory_order_release);
196  timesExcept_.store(0,std::memory_order_release);
197  }
198 
199  void addedToPath() {
200  ++numberOfPathsOn_;
201  }
202  //NOTE: calling state() is done to force synchronization across threads
203  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
204  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
205  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
206  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
207  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
208  State state() const { return state_; }
209 
210  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
211 
212  protected:
213  template<typename O> friend class workerhelper::CallImpl;
214  virtual std::string workerType() const = 0;
215  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
216  ModuleCallingContext const* mcc) = 0;
217  virtual bool implDoPrePrefetchSelection(StreamID id,
218  EventPrincipal const& ep,
219  ModuleCallingContext const* mcc) = 0;
220  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
221  ModuleCallingContext const* mcc) = 0;
222  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
223  ModuleCallingContext const* mcc) = 0;
224  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
225  ModuleCallingContext const* mcc) = 0;
226  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
227  ModuleCallingContext const* mcc) = 0;
228  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
229  ModuleCallingContext const* mcc) = 0;
230  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
231  ModuleCallingContext const* mcc) = 0;
232  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
233  ModuleCallingContext const* mcc) = 0;
234  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
235  ModuleCallingContext const* mcc) = 0;
236  virtual void implBeginJob() = 0;
237  virtual void implEndJob() = 0;
238  virtual void implBeginStream(StreamID) = 0;
239  virtual void implEndStream(StreamID) = 0;
240 
241  void resetModuleDescription(ModuleDescription const*);
242 
243  ActivityRegistry* activityRegistry() { return actReg_.get(); }
244 
245  private:
246 
247  template <typename T>
248  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
249  StreamID stream,
250  ParentContext const& parentContext,
251  typename T::Context const* context);
252 
253  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
254  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
255 
256  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
257 
258 
259  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
260 
261  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
262 
263  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
264  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
265 
266  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
267 
268  virtual TaskQueueAdaptor serializeRunModule() = 0;
269 
270  static void exceptionContext(cms::Exception& ex,
271  ModuleCallingContext const* mcc);
272 
273  /*This base class is used to hide the differences between the ID used
274  for Event, LuminosityBlock and Run. Using the base class allows us
275  to only convert the ID to string form if it is actually needed in
276  the call to shouldRethrowException.
277  */
279  public:
280  virtual std::string value() const = 0;
282  };
283 
284  template< typename T>
286  public:
287  TransitionIDValue(T const& iP): p_(iP) {}
288  std::string value() const override {
289  std::ostringstream iost;
290  iost<<p_.id();
291  return iost.str();
292  }
293  private:
294  T const& p_;
295 
296  };
297 
298  bool shouldRethrowException(std::exception_ptr iPtr,
299  ParentContext const& parentContext,
300  bool isEvent,
301  TransitionIDValueBase const& iID) const;
302 
303  template<bool IS_EVENT>
304  bool setPassed() {
305  if(IS_EVENT) {
306  timesPassed_.fetch_add(1,std::memory_order_relaxed);
307  }
308  state_ = Pass;
309  return true;
310  }
311 
312  template<bool IS_EVENT>
313  bool setFailed() {
314  if(IS_EVENT) {
315  timesFailed_.fetch_add(1,std::memory_order_relaxed);
316  }
317  state_ = Fail;
318  return false;
319  }
320 
321  template<bool IS_EVENT>
322  std::exception_ptr setException(std::exception_ptr iException) {
323  if (IS_EVENT) {
324  timesExcept_.fetch_add(1,std::memory_order_relaxed);
325  }
326  cached_exception_ = iException; // propagate_const<T> has no reset() function
327  state_ = Exception;
328  return cached_exception_;
329  }
330 
331  void prefetchAsync(WaitingTask*,
332  ParentContext const& parentContext,
333  Principal const& );
334 
336  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
337  }
338 
339  template<typename T>
340  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
341  typename T::MyPrincipal const& ep,
342  EventSetup const& es,
343  StreamID streamID,
344  ParentContext const& parentContext,
345  typename T::Context const* context);
346 
347  template< typename T>
348  class RunModuleTask : public WaitingTask {
349  public:
351  typename T::MyPrincipal const& ep,
352  EventSetup const& es,
353  StreamID streamID,
354  ParentContext const& parentContext,
355  typename T::Context const* context):
356  m_worker(worker),
357  m_principal(ep),
358  m_es(es),
359  m_streamID(streamID),
360  m_parentContext(parentContext),
361  m_context(context),
362  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
363 
364  tbb::task* execute() override {
365  //Need to make the services available early so other services can see them
366  ServiceRegistry::Operate guard(m_serviceToken);
367 
368  //incase the emit causes an exception, we need a memory location
369  // to hold the exception_ptr
370  std::exception_ptr temp_excptr;
371  auto excptr = exceptionPtr();
372  if(T::isEvent_) {
373  try {
374  //pre was called in prefetchAsync
375  m_worker->emitPostModuleEventPrefetchingSignal();
376  }catch(...) {
377  temp_excptr = std::current_exception();
378  if(not excptr) {
379  excptr = &temp_excptr;
380  }
381  }
382  }
383 
384  if( not excptr) {
385  if(auto queue = m_worker->serializeRunModule()) {
386  Worker* worker = m_worker;
387  auto const & principal = m_principal;
388  auto& es = m_es;
389  auto streamID = m_streamID;
390  auto parentContext = m_parentContext;
391  auto serviceToken = m_serviceToken;
392  auto sContext = m_context;
393  queue.push( [worker, &principal, &es, streamID,parentContext,sContext, serviceToken]()
394  {
395  //Need to make the services available
396  ServiceRegistry::Operate guard(serviceToken);
397 
398  std::exception_ptr* ptr = nullptr;
399  worker->runModuleAfterAsyncPrefetch<T>(ptr,
400  principal,
401  es,
402  streamID,
403  parentContext,
404  sContext);
405  });
406  return nullptr;
407  }
408  }
409 
410  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
411  m_principal,
412  m_es,
413  m_streamID,
414  m_parentContext,
415  m_context);
416  return nullptr;
417  }
418 
419  private:
421  typename T::MyPrincipal const& m_principal;
422  EventSetup const& m_es;
425  typename T::Context const* m_context;
427  };
428 
429  std::atomic<int> timesRun_;
430  std::atomic<int> timesVisited_;
431  std::atomic<int> timesPassed_;
432  std::atomic<int> timesFailed_;
433  std::atomic<int> timesExcept_;
434  std::atomic<State> state_;
436  std::atomic<int> numberOfPathsLeftToRun_;
437 
439 
440  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
441  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
442 
443  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
444 
445  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
446 
447  edm::WaitingTaskList waitingTasks_;
448  std::atomic<bool> workStarted_;
449  };
450 
451  namespace {
452  template <typename T>
453  class ModuleSignalSentry {
454  public:
455  ModuleSignalSentry(ActivityRegistry *a,
456  typename T::Context const* context,
457  ModuleCallingContext const* moduleCallingContext) :
458  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
459 
460  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
461  }
462 
463  ~ModuleSignalSentry() {
464  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
465  }
466 
467  private:
468  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
469  typename T::Context const* context_;
470  ModuleCallingContext const* moduleCallingContext_;
471  };
472 
473  }
474 
475  namespace workerhelper {
476  template<>
478  public:
480  static bool call(Worker* iWorker, StreamID,
481  EventPrincipal const& ep, EventSetup const& es,
482  ActivityRegistry* /* actReg */,
483  ModuleCallingContext const* mcc,
484  Arg::Context const* /* context*/) {
485  //Signal sentry is handled by the module
486  return iWorker->implDo(ep,es, mcc);
487  }
488  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
489  typename Arg::MyPrincipal const& ep,
490  ModuleCallingContext const* mcc) {
491  return iWorker->implDoPrePrefetchSelection(id,ep,mcc);
492  }
493  };
494 
495  template<>
497  public:
499  static bool call(Worker* iWorker,StreamID,
500  RunPrincipal const& ep, EventSetup const& es,
501  ActivityRegistry* actReg,
502  ModuleCallingContext const* mcc,
503  Arg::Context const* context) {
504  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
505  return iWorker->implDoBegin(ep,es, mcc);
506  }
507  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
508  typename Arg::MyPrincipal const& ep,
509  ModuleCallingContext const* mcc) {
510  return true;
511  }
512  };
513  template<>
515  public:
517  static bool call(Worker* iWorker,StreamID id,
518  RunPrincipal const & ep, EventSetup const& es,
519  ActivityRegistry* actReg,
520  ModuleCallingContext const* mcc,
521  Arg::Context const* context) {
522  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
523  return iWorker->implDoStreamBegin(id,ep,es, mcc);
524  }
525  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
526  typename Arg::MyPrincipal const& ep,
527  ModuleCallingContext const* mcc) {
528  return true;
529  }
530  };
531  template<>
533  public:
535  static bool call(Worker* iWorker,StreamID,
536  RunPrincipal const& ep, EventSetup const& es,
537  ActivityRegistry* actReg,
538  ModuleCallingContext const* mcc,
539  Arg::Context const* context) {
540  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
541  return iWorker->implDoEnd(ep,es, mcc);
542  }
543  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
544  typename Arg::MyPrincipal const& ep,
545  ModuleCallingContext const* mcc) {
546  return true;
547  }
548  };
549  template<>
551  public:
553  static bool call(Worker* iWorker,StreamID id,
554  RunPrincipal const& ep, EventSetup const& es,
555  ActivityRegistry* actReg,
556  ModuleCallingContext const* mcc,
557  Arg::Context const* context) {
558  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
559  return iWorker->implDoStreamEnd(id,ep,es, mcc);
560  }
561  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
562  typename Arg::MyPrincipal const& ep,
563  ModuleCallingContext const* mcc) {
564  return true;
565  }
566  };
567 
568  template<>
570  public:
572  static bool call(Worker* iWorker,StreamID,
573  LuminosityBlockPrincipal const& ep, EventSetup const& es,
574  ActivityRegistry* actReg,
575  ModuleCallingContext const* mcc,
576  Arg::Context const* context) {
577  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
578  return iWorker->implDoBegin(ep,es, mcc);
579  }
580 
581  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
582  typename Arg::MyPrincipal const& ep,
583  ModuleCallingContext const* mcc) {
584  return true;
585  }
586  };
587  template<>
589  public:
591  static bool call(Worker* iWorker,StreamID id,
592  LuminosityBlockPrincipal const& ep, EventSetup const& es,
593  ActivityRegistry* actReg,
594  ModuleCallingContext const* mcc,
595  Arg::Context const* context) {
596  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
597  return iWorker->implDoStreamBegin(id,ep,es, mcc);
598  }
599 
600  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
601  typename Arg::MyPrincipal const& ep,
602  ModuleCallingContext const* mcc) {
603  return true;
604  }
605 };
606 
607  template<>
609  public:
611  static bool call(Worker* iWorker,StreamID,
612  LuminosityBlockPrincipal const& ep, EventSetup const& es,
613  ActivityRegistry* actReg,
614  ModuleCallingContext const* mcc,
615  Arg::Context const* context) {
616  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
617  return iWorker->implDoEnd(ep,es, mcc);
618  }
619  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
620  typename Arg::MyPrincipal const& ep,
621  ModuleCallingContext const* mcc) {
622  return true;
623  }
624 
625  };
626  template<>
628  public:
630  static bool call(Worker* iWorker,StreamID id,
631  LuminosityBlockPrincipal const& ep, EventSetup const& es,
632  ActivityRegistry* actReg,
633  ModuleCallingContext const* mcc,
634  Arg::Context const* context) {
635  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
636  return iWorker->implDoStreamEnd(id,ep,es, mcc);
637  }
638 
639  static bool prePrefetchSelection(Worker* iWorker,StreamID id,
640  typename Arg::MyPrincipal const& ep,
641  ModuleCallingContext const* mcc) {
642  return true;
643  }
644  };
645  }
646 
647 
648  template <typename T>
650  typename T::MyPrincipal const& ep,
651  EventSetup const& es,
652  StreamID streamID,
653  ParentContext const& parentContext,
654  typename T::Context const* context) {
655  waitingTasks_.add(task);
656  if(T::isEvent_) {
657  timesVisited_.fetch_add(1,std::memory_order_relaxed);
658  }
659 
660  bool expected = false;
661  if(workStarted_.compare_exchange_strong(expected,true)) {
662  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
663 
664  //if have TriggerResults based selection we want to reject the event before doing prefetching
665  try {
666  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
667  setPassed<T::isEvent_>();
668  waitingTasks_.doneWaiting(nullptr);
669  return;
670  }
671  }catch(...) {}
672 
673  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
674  this, ep,es,streamID,parentContext,context);
675  prefetchAsync(runTask, parentContext, ep);
676  }
677  }
678 
679  template<typename T>
680  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
681  typename T::MyPrincipal const& ep,
682  EventSetup const& es,
683  StreamID streamID,
684  ParentContext const& parentContext,
685  typename T::Context const* context) {
686  std::exception_ptr exceptionPtr;
687  if(iEPtr) {
688  assert(*iEPtr);
690  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
691  exceptionPtr = *iEPtr;
692  setException<T::isEvent_>(exceptionPtr);
693  } else {
694  setPassed<T::isEvent_>();
695  }
696  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
697  } else {
698  try {
699  runModule<T>(ep,es,streamID,parentContext,context);
700  } catch(...) {
701  exceptionPtr = std::current_exception();
702  }
703  }
704  waitingTasks_.doneWaiting(exceptionPtr);
705  return exceptionPtr;
706  }
707 
708  template <typename T>
710  typename T::MyPrincipal const& principal,
711  EventSetup const& es,
712  StreamID streamID,
713  ParentContext const& parentContext,
714  typename T::Context const* context) {
715  waitingTasks_.add(task);
716  bool expected = false;
717  if(workStarted_.compare_exchange_strong(expected,true)) {
718  auto serviceToken = ServiceRegistry::instance().presentToken();
719 
720  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
721  {
722  std::exception_ptr exceptionPtr;
723  try {
724  //Need to make the services available
725  ServiceRegistry::Operate guard(serviceToken);
726 
727  this->runModule<T>(principal,
728  es,
729  streamID,
730  parentContext,
731  context);
732  } catch( ... ) {
733  exceptionPtr = std::current_exception();
734  }
735  this->waitingTasks_.doneWaiting(exceptionPtr);
736  };
737  if(auto queue = this->serializeRunModule()) {
738  queue.push( toDo);
739  } else {
740  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
741  tbb::task::spawn(*task);
742  }
743  }
744  }
745 
746  template <typename T>
747  bool Worker::doWork(typename T::MyPrincipal const& ep,
748  EventSetup const& es,
749  StreamID streamID,
750  ParentContext const& parentContext,
751  typename T::Context const* context) {
752 
753  if (T::isEvent_) {
754  timesVisited_.fetch_add(1,std::memory_order_relaxed);
755  }
756  bool rc = false;
757 
758  switch(state_) {
759  case Ready: break;
760  case Pass: return true;
761  case Fail: return false;
762  case Exception: {
763  std::rethrow_exception(cached_exception_);
764  }
765  }
766 
767  bool expected = false;
768  if(not workStarted_.compare_exchange_strong(expected, true) ) {
769  //another thread beat us here
770  auto waitTask = edm::make_empty_waiting_task();
771  waitTask->increment_ref_count();
772 
773  waitingTasks_.add(waitTask.get());
774 
775  waitTask->wait_for_all();
776 
777  switch(state_) {
778  case Ready: assert(false);
779  case Pass: return true;
780  case Fail: return false;
781  case Exception: {
782  std::rethrow_exception(cached_exception_);
783  }
784  }
785  }
786 
787  //Need the context to be set until after any exception is resolved
788  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
789 
790  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
791  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
792 
793  if (T::isEvent_) {
794  try {
795  //if have TriggerResults based selection we want to reject the event before doing prefetching
796  if( not workerhelper::CallImpl<T>::prePrefetchSelection(this,streamID,ep,&moduleCallingContext_) ) {
797  timesRun_.fetch_add(1,std::memory_order_relaxed);
798  rc = setPassed<T::isEvent_>();
799  waitingTasks_.doneWaiting(nullptr);
800  return true;
801  }
802  }catch(...) {}
803  auto waitTask = edm::make_empty_waiting_task();
804  {
805  //Make sure signal is sent once the prefetching is done
806  // [the 'pre' signal was sent in prefetchAsync]
807  //The purpose of this block is to send the signal after wait_for_all
808  auto sentryFunc = [this](void*) {
809  emitPostModuleEventPrefetchingSignal();
810  };
811  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
812 
813  //set count to 2 since wait_for_all requires value to not go to 0
814  waitTask->set_ref_count(2);
815 
816  prefetchAsync(waitTask.get(),parentContext, ep);
817  waitTask->decrement_ref_count();
818  waitTask->wait_for_all();
819  }
820  if(waitTask->exceptionPtr() != nullptr) {
822  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
823  setException<T::isEvent_>(*waitTask->exceptionPtr());
824  waitingTasks_.doneWaiting(cached_exception_);
825  std::rethrow_exception(cached_exception_);
826  } else {
827  setPassed<T::isEvent_>();
828  waitingTasks_.doneWaiting(nullptr);
829  return true;
830  }
831  }
832  }
833 
834  //successful prefetch so no reset necessary
835  prefetchSentry.release();
836  if(auto queue = serializeRunModule()) {
837  auto serviceToken = ServiceRegistry::instance().presentToken();
838  queue.pushAndWait([&]() {
839  //Need to make the services available
840  ServiceRegistry::Operate guard(serviceToken);
841  try {
842  rc = runModule<T>(ep,es,streamID,parentContext,context);
843  } catch(...) {
844  }
845  });
846  } else {
847  try {
848  rc = runModule<T>(ep,es,streamID,parentContext,context);
849  } catch(...) {
850  }
851  }
852  if(state_ == Exception) {
853  waitingTasks_.doneWaiting(cached_exception_);
854  std::rethrow_exception(cached_exception_);
855  }
856 
857  waitingTasks_.doneWaiting(nullptr);
858  return rc;
859  }
860 
861 
862  template <typename T>
863  bool Worker::runModule(typename T::MyPrincipal const& ep,
864  EventSetup const& es,
865  StreamID streamID,
866  ParentContext const& parentContext,
867  typename T::Context const* context) {
868  //unscheduled producers should advance this
869  //if (T::isEvent_) {
870  // ++timesVisited_;
871  //}
872  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
873  if (T::isEvent_) {
874  timesRun_.fetch_add(1,std::memory_order_relaxed);
875  }
876 
877  bool rc = true;
878  try {
880  {
881  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
882 
883  if (rc) {
884  setPassed<T::isEvent_>();
885  } else {
886  setFailed<T::isEvent_>();
887  }
888  });
889  } catch(cms::Exception& ex) {
890  exceptionContext(ex, &moduleCallingContext_);
892  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
893  assert(not cached_exception_);
894  setException<T::isEvent_>(std::current_exception());
895  std::rethrow_exception(cached_exception_);
896  } else {
897  rc = setPassed<T::isEvent_>();
898  }
899  }
900 
901  return rc;
902  }
903 
904  template <typename T>
905  std::exception_ptr Worker::runModuleDirectly(typename T::MyPrincipal const& ep,
906  EventSetup const& es,
907  StreamID streamID,
908  ParentContext const& parentContext,
909  typename T::Context const* context) {
910 
911  timesVisited_.fetch_add(1,std::memory_order_relaxed);
912  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
913  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
914  }
915 }
916 #endif
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:611
tbb::task * execute() override
Definition: Worker.h:364
std::atomic< int > timesVisited_
Definition: Worker.h:430
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:350
ModuleDescription const & description() const
Definition: Worker.h:167
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:425
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:863
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:144
T::MyPrincipal const & m_principal
Definition: Worker.h:421
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:436
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:322
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:543
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:553
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:590
Definition: hltDiff.cc:290
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:424
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:433
void push(const T &iAction)
asynchronously pushes functor iAction into queue
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:534
std::atomic< int > timesFailed_
Definition: Worker.h:432
int timesPassed() const
Definition: Worker.h:205
void addedToPath()
Definition: Worker.h:199
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:747
State state() const
Definition: Worker.h:208
void clearCounters()
Definition: Worker.h:191
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:572
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:87
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:499
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:480
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:619
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:581
ActivityRegistry * activityRegistry()
Definition: Worker.h:243
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:630
void exceptionContext(std::ostream &, GlobalContext const &)
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:600
ExceptionToActionTable const * actions_
Definition: Worker.h:440
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:905
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:438
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
bool setFailed()
Definition: Worker.h:313
int timesExcept() const
Definition: Worker.h:207
void pushAndWait(F &&iF)
Definition: Worker.h:101
def principal(options)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:157
int timesVisited() const
Definition: Worker.h:204
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:498
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per [date lost in extraction]. Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined parameters, which we here list in angle brackets, e.g. < void, edm::EventID const &, edm::Timestamp const & >. We also list in braces which AR_WATCH_USING_METHOD_ is used for those, or ...
Definition: Activities.doc:12
ModuleDescription const * descPtr() const
Definition: Worker.h:168
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:639
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:155
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:610
std::atomic< State > state_
Definition: Worker.h:434
int timesRun() const
Definition: Worker.h:203
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:517
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:535
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:435
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:629
bool setPassed()
Definition: Worker.h:304
TransitionIDValue(T const &iP)
Definition: Worker.h:287
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:88
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:479
void push(const T &iAction)
asynchronously pushes functor iAction into queue
void pushAndWait(const T &iAction)
synchronously pushes functor iAction into queue
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
std::string value() const override
Definition: Worker.h:288
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:709
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:516
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:680
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:152
int timesPass() const
Definition: Worker.h:210
const T & get() const
Definition: EventSetup.h:55
Definition: hltDiff.cc:291
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:591
int timesFailed() const
Definition: Worker.h:206
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:153
HLT enums.
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:488
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:335
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:431
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:507
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:649
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:571
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:561
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:281
std::atomic< int > timesRun_
Definition: Worker.h:429
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:552
ServiceToken m_serviceToken
Definition: Worker.h:426
static bool prePrefetchSelection(Worker *iWorker, StreamID id, typename Arg::MyPrincipal const &ep, ModuleCallingContext const *mcc)
Definition: Worker.h:525
EventSetup const & m_es
Definition: Worker.h:422