CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
53 
55 
56 #include <atomic>
57 #include <map>
58 #include <memory>
59 #include <sstream>
60 #include <string>
61 #include <vector>
62 #include <exception>
63 #include <unordered_map>
64 
65 namespace edm {
66  class EventPrincipal;
67  class EarlyDeleteHelper;
68  class ProductResolverIndexHelper;
69  class ProductResolverIndexAndSkipBit;
70  class StreamID;
71  class StreamContext;
72  class ProductRegistry;
73  class ThinnedAssociationsHelper;
74 
75  namespace workerhelper {
76  template< typename O> class CallImpl;
77  }
78 
79  class Worker {
80  public:
81  enum State { Ready, Pass, Fail, Exception };
82  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
84  SerialTaskQueueChain* serial_ = nullptr;
85  LimitedTaskQueue* limited_ = nullptr;
86 
87  TaskQueueAdaptor() = default;
88  TaskQueueAdaptor(SerialTaskQueueChain* iChain): serial_(iChain) {}
89  TaskQueueAdaptor(LimitedTaskQueue* iLimited): limited_(iLimited) {}
90 
91  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
92 
93  template <class F>
94  void push(F&& iF) {
95  if(serial_) {
96  serial_->push(iF);
97  } else {
98  limited_->push(iF);
99  }
100  }
101  template <class F>
102  void pushAndWait(F&& iF) {
103  if(serial_) {
104  serial_->pushAndWait(iF);
105  } else {
106  limited_->pushAndWait(iF);
107  }
108  }
109 
110  };
111 
112  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
113  virtual ~Worker();
114 
115  Worker(Worker const&) = delete; // Disallow copying and moving
116  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
117 
118  virtual bool wantsGlobalRuns() const = 0;
119  virtual bool wantsGlobalLuminosityBlocks() const = 0;
120  virtual bool wantsStreamRuns() const = 0;
121  virtual bool wantsStreamLuminosityBlocks() const = 0;
122 
123  template <typename T>
124  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
125  StreamID stream,
126  ParentContext const& parentContext,
127  typename T::Context const* context);
128 
129  void prePrefetchSelectionAsync(WaitingTask* task,
130  StreamID stream,
131  EventPrincipal const*);
132 
134  StreamID stream,
135  void const*) {assert(false);}
136 
137  template <typename T>
138  void doWorkAsync(WaitingTask* task,
139  typename T::MyPrincipal const&, EventSetup const& c,
140  StreamID stream,
141  ParentContext const& parentContext,
142  typename T::Context const* context);
143 
144  template <typename T>
145  void doWorkNoPrefetchingAsync(WaitingTask* task,
146  typename T::MyPrincipal const&,
147  EventSetup const& c,
148  StreamID stream,
149  ParentContext const& parentContext,
150  typename T::Context const* context);
151 
152  template <typename T>
153  std::exception_ptr runModuleDirectly(typename T::MyPrincipal const& ep,
154  EventSetup const& es,
155  StreamID streamID,
156  ParentContext const& parentContext,
157  typename T::Context const* context);
158 
160  waitingTasks_.add(task);
161  }
162  void skipOnPath();
163  void beginJob() ;
164  void endJob();
165  void beginStream(StreamID id, StreamContext& streamContext);
166  void endStream(StreamID id, StreamContext& streamContext);
167  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
168  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
169 
170  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
171 
172  void reset() {
173  cached_exception_ = std::exception_ptr();
174  state_ = Ready;
175  waitingTasks_.reset();
176  workStarted_ = false;
177  numberOfPathsLeftToRun_ = numberOfPathsOn_;
178  }
179 
180  void postDoEvent(EventPrincipal const&);
181 
182  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
183  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
186  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
187 
188  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
189 
190  //Used to make EDGetToken work
191  virtual void updateLookup(BranchType iBranchType,
192  ProductResolverIndexHelper const&) = 0;
193  virtual void resolvePutIndicies(BranchType iBranchType,
194  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const& iIndicies) = 0;
195 
196  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
197  ProductRegistry const& preg,
198  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
199 
200  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
201 
202  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
203 
204  virtual Types moduleType() const =0;
205 
206  void clearCounters() {
207  timesRun_.store(0,std::memory_order_release);
208  timesVisited_.store(0,std::memory_order_release);
209  timesPassed_.store(0,std::memory_order_release);
210  timesFailed_.store(0,std::memory_order_release);
211  timesExcept_.store(0,std::memory_order_release);
212  }
213 
214  void addedToPath() {
215  ++numberOfPathsOn_;
216  }
217  //NOTE: calling state() is done to force synchronization across threads
218  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
219  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
220  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
221  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
222  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
223  State state() const { return state_; }
224 
225  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
226 
227  protected:
228  template<typename O> friend class workerhelper::CallImpl;
229  virtual std::string workerType() const = 0;
230  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
231  ModuleCallingContext const* mcc) = 0;
232 
233  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
234  virtual bool implNeedToRunSelection() const = 0;
235 
236  virtual void implDoAcquire(EventPrincipal const&, EventSetup const& c,
237  ModuleCallingContext const* mcc,
238  WaitingTaskWithArenaHolder& holder) = 0;
239 
240  virtual bool implDoPrePrefetchSelection(StreamID id,
241  EventPrincipal const& ep,
242  ModuleCallingContext const* mcc) = 0;
243  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
244  ModuleCallingContext const* mcc) = 0;
245  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
246  ModuleCallingContext const* mcc) = 0;
247  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
248  ModuleCallingContext const* mcc) = 0;
249  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
250  ModuleCallingContext const* mcc) = 0;
251  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
252  ModuleCallingContext const* mcc) = 0;
253  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
254  ModuleCallingContext const* mcc) = 0;
255  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
256  ModuleCallingContext const* mcc) = 0;
257  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
258  ModuleCallingContext const* mcc) = 0;
259  virtual void implBeginJob() = 0;
260  virtual void implEndJob() = 0;
261  virtual void implBeginStream(StreamID) = 0;
262  virtual void implEndStream(StreamID) = 0;
263 
264  void resetModuleDescription(ModuleDescription const*);
265 
266  ActivityRegistry* activityRegistry() { return actReg_.get(); }
267 
268  private:
269 
270  template <typename T>
271  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
272  StreamID stream,
273  ParentContext const& parentContext,
274  typename T::Context const* context);
275 
276  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
277  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
278 
279  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
280 
281 
282  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
283 
284  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
285 
286  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
287  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
288 
289  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
290 
291  virtual TaskQueueAdaptor serializeRunModule() = 0;
292 
293  static void exceptionContext(cms::Exception& ex,
294  ModuleCallingContext const* mcc);
295 
296  /*This base class is used to hide the differences between the ID used
297  for Event, LuminosityBlock and Run. Using the base class allows us
298  to only convert the ID to string form if it is actually needed in
299  the call to shouldRethrowException.
300  */
302  public:
303  virtual std::string value() const = 0;
305  };
306 
307  template< typename T>
309  public:
310  TransitionIDValue(T const& iP): p_(iP) {}
311  std::string value() const override {
312  std::ostringstream iost;
313  iost<<p_.id();
314  return iost.str();
315  }
316  private:
317  T const& p_;
318 
319  };
320 
321  bool shouldRethrowException(std::exception_ptr iPtr,
322  ParentContext const& parentContext,
323  bool isEvent,
324  TransitionIDValueBase const& iID) const;
325 
326  template<bool IS_EVENT>
327  bool setPassed() {
328  if(IS_EVENT) {
329  timesPassed_.fetch_add(1,std::memory_order_relaxed);
330  }
331  state_ = Pass;
332  return true;
333  }
334 
335  template<bool IS_EVENT>
336  bool setFailed() {
337  if(IS_EVENT) {
338  timesFailed_.fetch_add(1,std::memory_order_relaxed);
339  }
340  state_ = Fail;
341  return false;
342  }
343 
344  template<bool IS_EVENT>
345  std::exception_ptr setException(std::exception_ptr iException) {
346  if (IS_EVENT) {
347  timesExcept_.fetch_add(1,std::memory_order_relaxed);
348  }
349  cached_exception_ = iException; // propagate_const<T> has no reset() function
350  state_ = Exception;
351  return cached_exception_;
352  }
353 
354  void prefetchAsync(WaitingTask*,
355  ParentContext const& parentContext,
356  Principal const& );
357 
359  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
360  }
361 
362  virtual bool hasAcquire() const = 0;
363 
364  template<typename T>
365  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
366  typename T::MyPrincipal const& ep,
367  EventSetup const& es,
368  StreamID streamID,
369  ParentContext const& parentContext,
370  typename T::Context const* context);
371 
372  void runAcquire(EventPrincipal const& ep,
373  EventSetup const& es,
374  ParentContext const& parentContext,
376 
377  void runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
378  EventPrincipal const& ep,
379  EventSetup const& es,
380  ParentContext const& parentContext,
382 
383  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr,
384  ParentContext const& parentContext);
385 
386  template< typename T>
387  class RunModuleTask : public WaitingTask {
388  public:
390  typename T::MyPrincipal const& ep,
391  EventSetup const& es,
392  StreamID streamID,
393  ParentContext const& parentContext,
394  typename T::Context const* context):
395  m_worker(worker),
396  m_principal(ep),
397  m_es(es),
398  m_streamID(streamID),
399  m_parentContext(parentContext),
400  m_context(context),
401  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
402 
403  tbb::task* execute() override {
404  //Need to make the services available early so other services can see them
405  ServiceRegistry::Operate guard(m_serviceToken);
406 
407  //incase the emit causes an exception, we need a memory location
408  // to hold the exception_ptr
409  std::exception_ptr temp_excptr;
410  auto excptr = exceptionPtr();
411  if(T::isEvent_ && !m_worker->hasAcquire()) {
412  try {
413  //pre was called in prefetchAsync
414  m_worker->emitPostModuleEventPrefetchingSignal();
415  }catch(...) {
416  temp_excptr = std::current_exception();
417  if(not excptr) {
418  excptr = &temp_excptr;
419  }
420  }
421  }
422 
423  if( not excptr) {
424  if(auto queue = m_worker->serializeRunModule()) {
425  auto const & principal = m_principal;
426  auto& es = m_es;
427  queue.push( [worker = m_worker, &principal,
428  &es, streamID = m_streamID,
429  parentContext = m_parentContext,
430  sContext = m_context, serviceToken = m_serviceToken]()
431  {
432  //Need to make the services available
433  ServiceRegistry::Operate guard(serviceToken);
434 
435  std::exception_ptr* ptr = nullptr;
436  worker->template runModuleAfterAsyncPrefetch<T>(ptr,
437  principal,
438  es,
439  streamID,
440  parentContext,
441  sContext);
442  });
443  return nullptr;
444  }
445  }
446 
447  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
448  m_principal,
449  m_es,
450  m_streamID,
451  m_parentContext,
452  m_context);
453  return nullptr;
454  }
455 
456  private:
458  typename T::MyPrincipal const& m_principal;
459  EventSetup const& m_es;
462  typename T::Context const* m_context;
464  };
465 
466  // AcquireTask is only used for the Event case, but we define
467  // it as a template so all cases will compile.
468  // DUMMY exists to work around the C++ Standard prohibition on
469  // fully specializing templates nested in other classes.
470  template <typename T, typename DUMMY = void>
471  class AcquireTask : public WaitingTask {
472  public:
474  typename T::MyPrincipal const& ep,
475  EventSetup const& es,
476  ParentContext const& parentContext,
477  WaitingTaskWithArenaHolder holder) {}
478  tbb::task* execute() override { return nullptr; }
479  };
480 
481  template <typename DUMMY>
483  public:
485  EventPrincipal const& ep,
486  EventSetup const& es,
487  ParentContext const& parentContext,
489  m_worker(worker),
490  m_principal(ep),
491  m_es(es),
492  m_parentContext(parentContext),
493  m_holder(std::move(holder)),
494  m_serviceToken(ServiceRegistry::instance().presentToken()) {}
495 
496  tbb::task* execute() override {
497  //Need to make the services available early so other services can see them
498  ServiceRegistry::Operate guard(m_serviceToken);
499 
500  //incase the emit causes an exception, we need a memory location
501  // to hold the exception_ptr
502  std::exception_ptr temp_excptr;
503  auto excptr = exceptionPtr();
504  try {
505  //pre was called in prefetchAsync
506  m_worker->emitPostModuleEventPrefetchingSignal();
507  } catch(...) {
508  temp_excptr = std::current_exception();
509  if(not excptr) {
510  excptr = &temp_excptr;
511  }
512  }
513 
514  if( not excptr) {
515  if(auto queue = m_worker->serializeRunModule()) {
516  auto const & principal = m_principal;
517  auto& es = m_es;
518  queue.push( [worker = m_worker, &principal,
519  &es, parentContext = m_parentContext,
520  serviceToken = m_serviceToken, holder = m_holder]()
521  {
522  //Need to make the services available
523  ServiceRegistry::Operate guard(serviceToken);
524 
525  std::exception_ptr* ptr = nullptr;
526  worker->runAcquireAfterAsyncPrefetch(ptr,
527  principal,
528  es,
529  parentContext,
530  holder);
531  });
532  return nullptr;
533  }
534  }
535 
536  m_worker->runAcquireAfterAsyncPrefetch(excptr,
537  m_principal,
538  m_es,
539  m_parentContext,
540  std::move(m_holder));
541  return nullptr;
542  }
543 
544  private:
547  EventSetup const& m_es;
551  };
552 
553  // This class does nothing unless there is an exception originating
554  // in an "External Worker". In that case, it handles converting the
555  // exception to a CMS exception and adding context to the exception.
557  public:
558 
560  WaitingTask* runModuleTask,
561  ParentContext const& parentContext);
562 
563  tbb::task* execute() override;
564 
565  private:
569  };
570 
571  std::atomic<int> timesRun_;
572  std::atomic<int> timesVisited_;
573  std::atomic<int> timesPassed_;
574  std::atomic<int> timesFailed_;
575  std::atomic<int> timesExcept_;
576  std::atomic<State> state_;
578  std::atomic<int> numberOfPathsLeftToRun_;
579 
581 
582  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
583  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
584 
585  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
586 
587  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
588 
589  edm::WaitingTaskList waitingTasks_;
590  std::atomic<bool> workStarted_;
591  bool ranAcquireWithoutException_;
592  };
593 
594  namespace {
595  template <typename T>
596  class ModuleSignalSentry {
597  public:
598  ModuleSignalSentry(ActivityRegistry *a,
599  typename T::Context const* context,
600  ModuleCallingContext const* moduleCallingContext) :
601  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
602 
603  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
604  }
605 
606  ~ModuleSignalSentry() {
607  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
608  }
609 
610  private:
611  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
612  typename T::Context const* context_;
613  ModuleCallingContext const* moduleCallingContext_;
614  };
615 
616  }
617 
618  namespace workerhelper {
619  template<>
621  public:
623  static bool call(Worker* iWorker, StreamID,
624  EventPrincipal const& ep, EventSetup const& es,
625  ActivityRegistry* /* actReg */,
626  ModuleCallingContext const* mcc,
627  Arg::Context const* /* context*/) {
628  //Signal sentry is handled by the module
629  return iWorker->implDo(ep,es, mcc);
630  }
631  static bool wantsTransition(Worker const* iWorker) {
632  return true;
633  }
634  static bool needToRunSelection( Worker const* iWorker) {
635  return iWorker->implNeedToRunSelection();
636  }
637  };
638 
639  template<>
641  public:
643  static bool call(Worker* iWorker,StreamID,
644  RunPrincipal const& ep, EventSetup const& es,
645  ActivityRegistry* actReg,
646  ModuleCallingContext const* mcc,
647  Arg::Context const* context) {
648  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
649  return iWorker->implDoBegin(ep,es, mcc);
650  }
651  static bool wantsTransition(Worker const* iWorker) {
652  return iWorker->wantsGlobalRuns();
653  }
654  static bool needToRunSelection( Worker const* iWorker) {
655  return false;
656  }
657  };
658  template<>
660  public:
662  static bool call(Worker* iWorker,StreamID id,
663  RunPrincipal const & ep, EventSetup const& es,
664  ActivityRegistry* actReg,
665  ModuleCallingContext const* mcc,
666  Arg::Context const* context) {
667  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
668  return iWorker->implDoStreamBegin(id,ep,es, mcc);
669  }
670  static bool wantsTransition(Worker const* iWorker) {
671  return iWorker->wantsStreamRuns();
672  }
673  static bool needToRunSelection( Worker const* iWorker) {
674  return false;
675  }
676  };
677  template<>
679  public:
681  static bool call(Worker* iWorker,StreamID,
682  RunPrincipal const& ep, EventSetup const& es,
683  ActivityRegistry* actReg,
684  ModuleCallingContext const* mcc,
685  Arg::Context const* context) {
686  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
687  return iWorker->implDoEnd(ep,es, mcc);
688  }
689  static bool wantsTransition(Worker const* iWorker) {
690  return iWorker->wantsGlobalRuns();
691  }
692  static bool needToRunSelection( Worker const* iWorker) {
693  return false;
694  }
695  };
696  template<>
698  public:
700  static bool call(Worker* iWorker,StreamID id,
701  RunPrincipal const& ep, EventSetup const& es,
702  ActivityRegistry* actReg,
703  ModuleCallingContext const* mcc,
704  Arg::Context const* context) {
705  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
706  return iWorker->implDoStreamEnd(id,ep,es, mcc);
707  }
708  static bool wantsTransition(Worker const* iWorker) {
709  return iWorker->wantsStreamRuns();
710  }
711  static bool needToRunSelection( Worker const* iWorker) {
712  return false;
713  }
714  };
715 
716  template<>
718  public:
720  static bool call(Worker* iWorker,StreamID,
721  LuminosityBlockPrincipal const& ep, EventSetup const& es,
722  ActivityRegistry* actReg,
723  ModuleCallingContext const* mcc,
724  Arg::Context const* context) {
725  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
726  return iWorker->implDoBegin(ep,es, mcc);
727  }
728  static bool wantsTransition(Worker const* iWorker) {
729  return iWorker->wantsGlobalLuminosityBlocks();
730  }
731  static bool needToRunSelection( Worker const* iWorker) {
732  return false;
733  }
734  };
735  template<>
737  public:
739  static bool call(Worker* iWorker,StreamID id,
740  LuminosityBlockPrincipal const& ep, EventSetup const& es,
741  ActivityRegistry* actReg,
742  ModuleCallingContext const* mcc,
743  Arg::Context const* context) {
744  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
745  return iWorker->implDoStreamBegin(id,ep,es, mcc);
746  }
747  static bool wantsTransition(Worker const* iWorker) {
748  return iWorker->wantsStreamLuminosityBlocks();
749  }
750  static bool needToRunSelection( Worker const* iWorker) {
751  return false;
752  }
753 };
754 
755  template<>
757  public:
759  static bool call(Worker* iWorker,StreamID,
760  LuminosityBlockPrincipal const& ep, EventSetup const& es,
761  ActivityRegistry* actReg,
762  ModuleCallingContext const* mcc,
763  Arg::Context const* context) {
764  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
765  return iWorker->implDoEnd(ep,es, mcc);
766  }
767  static bool wantsTransition(Worker const* iWorker) {
768  return iWorker->wantsGlobalLuminosityBlocks();
769  }
770  static bool needToRunSelection( Worker const* iWorker) {
771  return false;
772  }
773  };
774  template<>
776  public:
778  static bool call(Worker* iWorker,StreamID id,
779  LuminosityBlockPrincipal const& ep, EventSetup const& es,
780  ActivityRegistry* actReg,
781  ModuleCallingContext const* mcc,
782  Arg::Context const* context) {
783  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
784  return iWorker->implDoStreamEnd(id,ep,es, mcc);
785  }
786  static bool wantsTransition(Worker const* iWorker) {
787  return iWorker->wantsStreamLuminosityBlocks();
788  }
789  static bool needToRunSelection( Worker const* iWorker) {
790  return false;
791  }
792  };
793  }
794 
795  template <typename T>
797  typename T::MyPrincipal const& ep,
798  EventSetup const& es,
799  StreamID streamID,
800  ParentContext const& parentContext,
801  typename T::Context const* context) {
803  return;
804  }
805 
806  waitingTasks_.add(task);
807  if(T::isEvent_) {
808  timesVisited_.fetch_add(1,std::memory_order_relaxed);
809  }
810 
811  bool expected = false;
812  if(workStarted_.compare_exchange_strong(expected,true)) {
813  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
814 
815  //if have TriggerResults based selection we want to reject the event before doing prefetching
817  //We need to run the selection in a different task so that
818  // we can prefetch the data needed for the selection
819  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
820  this, ep,es,streamID,parentContext,context);
821 
822  //make sure the task is either run or destroyed
823  struct DestroyTask {
824  DestroyTask(edm::WaitingTask* iTask):
825  m_task(iTask) {}
826 
827  ~DestroyTask() {
828  auto p = m_task.load();
829  if(p) {
831  }
832  }
833 
835  auto t = m_task.load();
836  m_task.store(nullptr);
837  return t;
838  }
839 
840  std::atomic<edm::WaitingTask*> m_task;
841  };
842 
843  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
844  auto token = ServiceRegistry::instance().presentToken();
845  auto selectionTask = make_waiting_task(tbb::task::allocate_root(), [ownRunTask,parentContext,&ep,token, this] (std::exception_ptr const* ) mutable {
846 
847  ServiceRegistry::Operate guard(token);
848  prefetchAsync(ownRunTask->release(), parentContext, ep);
849  });
850  prePrefetchSelectionAsync(selectionTask,streamID, &ep);
851  } else {
852  WaitingTask* moduleTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
853  this, ep, es, streamID, parentContext, context);
854  if (T::isEvent_ && hasAcquire()) {
855  WaitingTaskWithArenaHolder runTaskHolder(
856  new (tbb::task::allocate_root())
858  moduleTask,
859  parentContext));
860  moduleTask = new (tbb::task::allocate_root()) AcquireTask<T>(
861  this, ep, es, parentContext, std::move(runTaskHolder));
862  }
863  prefetchAsync(moduleTask, parentContext, ep);
864  }
865  }
866  }
867 
868  template<typename T>
869  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
870  typename T::MyPrincipal const& ep,
871  EventSetup const& es,
872  StreamID streamID,
873  ParentContext const& parentContext,
874  typename T::Context const* context) {
875  std::exception_ptr exceptionPtr;
876  if(iEPtr) {
877  assert(*iEPtr);
879  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
880  exceptionPtr = *iEPtr;
881  setException<T::isEvent_>(exceptionPtr);
882  } else {
883  setPassed<T::isEvent_>();
884  }
885  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
886  } else {
887  try {
888  runModule<T>(ep,es,streamID,parentContext,context);
889  } catch(...) {
890  exceptionPtr = std::current_exception();
891  }
892  }
893  waitingTasks_.doneWaiting(exceptionPtr);
894  return exceptionPtr;
895  }
896 
897  template <typename T>
899  typename T::MyPrincipal const& principal,
900  EventSetup const& es,
901  StreamID streamID,
902  ParentContext const& parentContext,
903  typename T::Context const* context) {
905  return;
906  }
907  waitingTasks_.add(task);
908  bool expected = false;
909  if(workStarted_.compare_exchange_strong(expected,true)) {
910  auto serviceToken = ServiceRegistry::instance().presentToken();
911 
912  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
913  {
914  std::exception_ptr exceptionPtr;
915  try {
916  //Need to make the services available
917  ServiceRegistry::Operate guard(serviceToken);
918 
919  this->runModule<T>(principal,
920  es,
921  streamID,
922  parentContext,
923  context);
924  } catch( ... ) {
925  exceptionPtr = std::current_exception();
926  }
927  this->waitingTasks_.doneWaiting(exceptionPtr);
928  };
929  if(auto queue = this->serializeRunModule()) {
930  queue.push( toDo);
931  } else {
932  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
933  tbb::task::spawn(*task);
934  }
935  }
936  }
937 
938  template <typename T>
939  bool Worker::doWork(typename T::MyPrincipal const& ep,
940  EventSetup const& es,
941  StreamID streamID,
942  ParentContext const& parentContext,
943  typename T::Context const* context) {
944 
945  if (T::isEvent_) {
946  timesVisited_.fetch_add(1,std::memory_order_relaxed);
947  }
948  bool rc = false;
949 
950  switch(state_) {
951  case Ready: break;
952  case Pass: return true;
953  case Fail: return false;
954  case Exception: {
955  std::rethrow_exception(cached_exception_);
956  }
957  }
958 
959  bool expected = false;
960  if(not workStarted_.compare_exchange_strong(expected, true) ) {
961  //another thread beat us here
962  auto waitTask = edm::make_empty_waiting_task();
963  waitTask->increment_ref_count();
964 
965  waitingTasks_.add(waitTask.get());
966 
967  waitTask->wait_for_all();
968 
969  switch(state_) {
970  case Ready: assert(false);
971  case Pass: return true;
972  case Fail: return false;
973  case Exception: {
974  std::rethrow_exception(cached_exception_);
975  }
976  }
977  }
978 
979  //Need the context to be set until after any exception is resolved
980  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
981 
982  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
983  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
984 
985  if (T::isEvent_) {
986  //if have TriggerResults based selection we want to reject the event before doing prefetching
988  auto waitTask = edm::make_empty_waiting_task();
989  waitTask->set_ref_count(2);
990  prePrefetchSelectionAsync(waitTask.get(), streamID, &ep);
991  waitTask->decrement_ref_count();
992  waitTask->wait_for_all();
993 
994  if(state() != Ready) {
995  //The selection must have rejected this event
996  return true;
997  }
998  }
999  auto waitTask = edm::make_empty_waiting_task();
1000  {
1001  //Make sure signal is sent once the prefetching is done
1002  // [the 'pre' signal was sent in prefetchAsync]
1003  //The purpose of this block is to send the signal after wait_for_all
1004  auto sentryFunc = [this](void*) {
1005  emitPostModuleEventPrefetchingSignal();
1006  };
1007  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
1008 
1009  //set count to 2 since wait_for_all requires value to not go to 0
1010  waitTask->set_ref_count(2);
1011 
1012  prefetchAsync(waitTask.get(),parentContext, ep);
1013  waitTask->decrement_ref_count();
1014  waitTask->wait_for_all();
1015  }
1016  if(waitTask->exceptionPtr() != nullptr) {
1018  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1019  setException<T::isEvent_>(*waitTask->exceptionPtr());
1020  waitingTasks_.doneWaiting(cached_exception_);
1021  std::rethrow_exception(cached_exception_);
1022  } else {
1023  setPassed<T::isEvent_>();
1024  waitingTasks_.doneWaiting(nullptr);
1025  return true;
1026  }
1027  }
1028  }
1029 
1030  //successful prefetch so no reset necessary
1031  prefetchSentry.release();
1032  if(auto queue = serializeRunModule()) {
1033  auto serviceToken = ServiceRegistry::instance().presentToken();
1034  queue.pushAndWait([&]() {
1035  //Need to make the services available
1036  ServiceRegistry::Operate guard(serviceToken);
1037  try {
1038  rc = runModule<T>(ep,es,streamID,parentContext,context);
1039  } catch(...) {
1040  }
1041  });
1042  } else {
1043  try {
1044  rc = runModule<T>(ep,es,streamID,parentContext,context);
1045  } catch(...) {
1046  }
1047  }
1048  if(state_ == Exception) {
1049  waitingTasks_.doneWaiting(cached_exception_);
1050  std::rethrow_exception(cached_exception_);
1051  }
1052 
1053  waitingTasks_.doneWaiting(nullptr);
1054  return rc;
1055  }
1056 
1057 
1058  template <typename T>
1059  bool Worker::runModule(typename T::MyPrincipal const& ep,
1060  EventSetup const& es,
1061  StreamID streamID,
1062  ParentContext const& parentContext,
1063  typename T::Context const* context) {
1064  //unscheduled producers should advance this
1065  //if (T::isEvent_) {
1066  // ++timesVisited_;
1067  //}
1068  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1069  if (T::isEvent_) {
1070  timesRun_.fetch_add(1,std::memory_order_relaxed);
1071  }
1072 
1073  bool rc = true;
1074  try {
1076  {
1077  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
1078 
1079  if (rc) {
1080  setPassed<T::isEvent_>();
1081  } else {
1082  setFailed<T::isEvent_>();
1083  }
1084  });
1085  } catch(cms::Exception& ex) {
1086  exceptionContext(ex, &moduleCallingContext_);
1088  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1089  assert(not cached_exception_);
1090  setException<T::isEvent_>(std::current_exception());
1091  std::rethrow_exception(cached_exception_);
1092  } else {
1093  rc = setPassed<T::isEvent_>();
1094  }
1095  }
1096 
1097  return rc;
1098  }
1099 
1100  template <typename T>
1101  std::exception_ptr Worker::runModuleDirectly(typename T::MyPrincipal const& ep,
1102  EventSetup const& es,
1103  StreamID streamID,
1104  ParentContext const& parentContext,
1105  typename T::Context const* context) {
1106 
1107  timesVisited_.fetch_add(1,std::memory_order_relaxed);
1108  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1109  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
1110  }
1111 }
1112 #endif
void push(T &&iAction)
asynchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:759
tbb::task * execute() override
Definition: Worker.h:403
std::atomic< int > timesVisited_
Definition: Worker.h:572
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:389
ModuleDescription const & description() const
Definition: Worker.h:182
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:462
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1059
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:159
T::MyPrincipal const & m_principal
Definition: Worker.h:458
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:578
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:345
virtual bool wantsGlobalRuns() const =0
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:700
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:738
Definition: hltDiff.cc:290
static PFTauRenderPlugin instance
ParentContext const m_parentContext
Definition: Worker.h:461
def destroy(e)
Definition: pyrootRender.py:13
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:575
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:680
std::atomic< int > timesFailed_
Definition: Worker.h:574
int timesPassed() const
Definition: Worker.h:220
void addedToPath()
Definition: Worker.h:214
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:939
State state() const
Definition: Worker.h:223
void prePrefetchSelectionAsync(WaitingTask *task, StreamID stream, void const *)
Definition: Worker.h:133
void clearCounters()
Definition: Worker.h:206
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:720
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:88
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:643
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:623
void push(T &&iAction)
asynchronously pushes functor iAction into queue
ActivityRegistry * activityRegistry()
Definition: Worker.h:266
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:778
void exceptionContext(std::ostream &, GlobalContext const &)
ExceptionToActionTable const * actions_
Definition: Worker.h:582
ServiceToken presentToken() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1101
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
tbb::task * execute() override
Definition: Worker.h:478
bool setFailed()
Definition: Worker.h:336
int timesExcept() const
Definition: Worker.h:222
void pushAndWait(F &&iF)
Definition: Worker.h:102
def principal(options)
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:172
virtual bool implNeedToRunSelection() const =0
int timesVisited() const
Definition: Worker.h:219
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:642
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * descPtr() const
Definition: Worker.h:183
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:170
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:758
virtual bool wantsStreamLuminosityBlocks() const =0
std::atomic< State > state_
Definition: Worker.h:576
int timesRun() const
Definition: Worker.h:218
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:662
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:681
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:577
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:777
bool setPassed()
Definition: Worker.h:327
TransitionIDValue(T const &iP)
Definition: Worker.h:310
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:89
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:622
std::string value() const override
Definition: Worker.h:311
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:898
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:661
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:869
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:167
virtual bool wantsGlobalLuminosityBlocks() const =0
int timesPass() const
Definition: Worker.h:225
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
const T & get() const
Definition: EventSetup.h:58
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:484
Definition: hltDiff.cc:291
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:739
int timesFailed() const
Definition: Worker.h:221
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:168
virtual bool wantsStreamRuns() const =0
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:358
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:573
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:473
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:796
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:719
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:281
std::atomic< int > timesRun_
Definition: Worker.h:571
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:699
def move(src, dest)
Definition: eostools.py:510
ServiceToken m_serviceToken
Definition: Worker.h:463
EventSetup const & m_es
Definition: Worker.h:459