CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <array>
62 #include <atomic>
63 #include <cassert>
64 #include <map>
65 #include <memory>
66 #include <sstream>
67 #include <string>
68 #include <vector>
69 #include <exception>
70 #include <unordered_map>
71 
72 namespace edm {
73  class EventPrincipal;
74  class EventSetupImpl;
75  class EarlyDeleteHelper;
76  class ModuleProcessName;
77  class ProductResolverIndexHelper;
78  class ProductResolverIndexAndSkipBit;
79  class StreamID;
80  class StreamContext;
81  class ProductRegistry;
82  class ThinnedAssociationsHelper;
83 
84  namespace workerhelper {
85  template <typename O>
86  class CallImpl;
87  }
88  namespace eventsetup {
90  }
91 
92  class Worker {
93  public:
94  enum State { Ready, Pass, Fail, Exception };
100 
101  TaskQueueAdaptor() = default;
103  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
104 
105  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
106 
107  template <class F>
108  void push(oneapi::tbb::task_group& iG, F&& iF) {
109  if (serial_) {
110  serial_->push(iG, iF);
111  } else {
112  limited_->push(iG, iF);
113  }
114  }
115  };
116 
117  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
118  virtual ~Worker();
119 
120  Worker(Worker const&) = delete; // Disallow copying and moving
121  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
122 
123  void clearModule() {
124  moduleValid_ = false;
125  doClearModule();
126  }
127 
128  virtual bool wantsProcessBlocks() const = 0;
129  virtual bool wantsInputProcessBlocks() const = 0;
130  virtual bool wantsGlobalRuns() const = 0;
131  virtual bool wantsGlobalLuminosityBlocks() const = 0;
132  virtual bool wantsStreamRuns() const = 0;
133  virtual bool wantsStreamLuminosityBlocks() const = 0;
134 
135  virtual SerialTaskQueue* globalRunsQueue() = 0;
137 
139  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
140 
142  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
143  assert(false);
144  }
145 
146  template <typename T>
148  typename T::TransitionInfoType const&,
149  ServiceToken const&,
150  StreamID,
151  ParentContext const&,
152  typename T::Context const*);
153 
154  template <typename T>
156  typename T::TransitionInfoType const&,
157  ServiceToken const&,
158  StreamID,
159  ParentContext const&,
160  typename T::Context const*);
161 
162  template <typename T>
163  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
164  StreamID,
165  ParentContext const&,
166  typename T::Context const*);
167 
169  void skipOnPath(EventPrincipal const& iEvent);
170  void beginJob();
171  void endJob();
172  void beginStream(StreamID id, StreamContext& streamContext);
173  void endStream(StreamID id, StreamContext& streamContext);
178 
179  void reset() {
180  cached_exception_ = std::exception_ptr();
181  state_ = Ready;
183  workStarted_ = false;
185  }
186 
187  void postDoEvent(EventPrincipal const&);
188 
190  if (moduleValid_) {
192  }
193  return nullptr;
194  }
197  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
198 
200 
201  //Used to make EDGetToken work
202  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
203  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
204  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
205  virtual void resolvePutIndicies(
206  BranchType iBranchType,
207  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
208  iIndicies) = 0;
209 
210  virtual void modulesWhoseProductsAreConsumed(
211  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
212  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
213  ProductRegistry const& preg,
214  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
215 
216  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
217 
218  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
219 
220  virtual Types moduleType() const = 0;
221  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
222 
223  void clearCounters() {
224  timesRun_.store(0, std::memory_order_release);
225  timesVisited_.store(0, std::memory_order_release);
226  timesPassed_.store(0, std::memory_order_release);
227  timesFailed_.store(0, std::memory_order_release);
228  timesExcept_.store(0, std::memory_order_release);
229  }
230 
232  //NOTE: calling state() is done to force synchronization across threads
233  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
234  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
235  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
236  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
237  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
238  State state() const { return state_; }
239 
240  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
241 
242  virtual bool hasAccumulator() const = 0;
243 
244  // Used in PuttableProductResolver
246 
247  protected:
248  template <typename O>
250 
251  virtual void doClearModule() = 0;
252 
253  virtual std::string workerType() const = 0;
254  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
255 
256  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
257  virtual bool implNeedToRunSelection() const = 0;
258 
259  virtual void implDoAcquire(EventTransitionInfo const&,
260  ModuleCallingContext const*,
262 
264  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
266  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
267  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
268  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
269  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
270  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
271  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
272  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
273  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
274  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
275  virtual void implBeginJob() = 0;
276  virtual void implEndJob() = 0;
277  virtual void implBeginStream(StreamID) = 0;
278  virtual void implEndStream(StreamID) = 0;
279 
281 
283 
284  private:
285  template <typename T>
286  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
287 
288  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
289  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
290 
291  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
292 
293  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
294  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
295  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
296 
298  ModuleCallingContext const& moduleCallingContext,
299  Principal const& iPrincipal) const = 0;
300 
301  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
302  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
303  virtual void implRespondToCloseOutputFile() = 0;
304 
306 
307  virtual TaskQueueAdaptor serializeRunModule() = 0;
308 
309  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
310 
311  template <bool IS_EVENT>
312  bool setPassed() {
313  if (IS_EVENT) {
314  timesPassed_.fetch_add(1, std::memory_order_relaxed);
315  }
316  state_ = Pass;
317  return true;
318  }
319 
320  template <bool IS_EVENT>
321  bool setFailed() {
322  if (IS_EVENT) {
323  timesFailed_.fetch_add(1, std::memory_order_relaxed);
324  }
325  state_ = Fail;
326  return false;
327  }
328 
329  template <bool IS_EVENT>
330  std::exception_ptr setException(std::exception_ptr iException) {
331  if (IS_EVENT) {
332  timesExcept_.fetch_add(1, std::memory_order_relaxed);
333  }
334  cached_exception_ = iException; // propagate_const<T> has no reset() function
335  state_ = Exception;
336  return cached_exception_;
337  }
338 
339  template <typename T>
341  ServiceToken const&,
342  ParentContext const&,
343  typename T::TransitionInfoType const&,
344  Transition);
345 
347  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
348 
349  bool needsESPrefetching(Transition iTrans) const noexcept {
350  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
351  }
352 
354  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
355  }
356 
358  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
360  }
361 
363  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
365  }
366 
367  virtual bool hasAcquire() const = 0;
368 
369  template <typename T>
370  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
371  typename T::TransitionInfoType const&,
372  StreamID,
373  ParentContext const&,
374  typename T::Context const*);
375 
377 
378  void runAcquireAfterAsyncPrefetch(std::exception_ptr,
379  EventTransitionInfo const&,
380  ParentContext const&,
382 
383  std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
384 
385  template <typename T>
386  class RunModuleTask : public WaitingTask {
387  public:
389  typename T::TransitionInfoType const& transitionInfo,
390  ServiceToken const& token,
391  StreamID streamID,
392  ParentContext const& parentContext,
393  typename T::Context const* context,
394  oneapi::tbb::task_group* iGroup)
395  : m_worker(worker),
396  m_transitionInfo(transitionInfo),
397  m_streamID(streamID),
398  m_parentContext(parentContext),
401  m_group(iGroup) {}
402 
406  EnableQueueGuard(EnableQueueGuard const&) = delete;
407  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
409  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
411  if (queue_) {
412  queue_->resume();
413  }
414  }
415  };
416 
417  void execute() final {
418  //Need to make the services available early so other services can see them
420 
421  //incase the emit causes an exception, we need a memory location
422  // to hold the exception_ptr
423  std::exception_ptr temp_excptr;
424  auto excptr = exceptionPtr();
425  if constexpr (T::isEvent_) {
426  if (!m_worker->hasAcquire()) {
427  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
428  CMS_SA_ALLOW try {
429  //pre was called in prefetchAsync
431  } catch (...) {
432  temp_excptr = std::current_exception();
433  if (not excptr) {
434  excptr = temp_excptr;
435  }
436  }
437  }
438  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
440  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
442  }
443 
444  if (not excptr) {
445  if (auto queue = m_worker->serializeRunModule()) {
446  auto f = [worker = m_worker,
448  streamID = m_streamID,
449  parentContext = m_parentContext,
450  sContext = m_context,
451  serviceToken = m_serviceToken]() {
452  //Need to make the services available
453  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
454 
455  //If needed, we pause the queue in begin transition and resume it
456  // at the end transition. This can guarantee that the module
457  // only processes one run or lumi at a time
459  std::exception_ptr ptr;
460  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
461  };
462  //keep another global transition from running if necessary
464  if (gQueue) {
465  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
466  gQueue->pause();
467  queue.push(*group, std::move(f));
468  });
469  } else {
470  queue.push(*m_group, std::move(f));
471  }
472  return;
473  }
474  }
475 
477  }
478 
479  private:
481  typename T::TransitionInfoType m_transitionInfo;
484  typename T::Context const* m_context;
486  oneapi::tbb::task_group* m_group;
487  };
488 
489  // AcquireTask is only used for the Event case, but we define
490  // it as a template so all cases will compile.
491  // DUMMY exists to work around the C++ Standard prohibition on
492  // fully specializing templates nested in other classes.
493  template <typename T, typename DUMMY = void>
494  class AcquireTask : public WaitingTask {
495  public:
497  typename T::TransitionInfoType const&,
498  ServiceToken const&,
499  ParentContext const&,
501  void execute() final {}
502  };
503 
504  template <typename DUMMY>
506  public:
508  EventTransitionInfo const& eventTransitionInfo,
509  ServiceToken const& token,
510  ParentContext const& parentContext,
512  : m_worker(worker),
513  m_eventTransitionInfo(eventTransitionInfo),
514  m_parentContext(parentContext),
515  m_holder(std::move(holder)),
516  m_serviceToken(token) {}
517 
518  void execute() final {
519  //Need to make the services available early so other services can see them
520  ServiceRegistry::Operate guard(m_serviceToken.lock());
521 
522  //incase the emit causes an exception, we need a memory location
523  // to hold the exception_ptr
524  std::exception_ptr temp_excptr;
525  auto excptr = exceptionPtr();
526  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
527  CMS_SA_ALLOW try {
528  //pre was called in prefetchAsync
529  m_worker->emitPostModuleEventPrefetchingSignal();
530  } catch (...) {
531  temp_excptr = std::current_exception();
532  if (not excptr) {
533  excptr = temp_excptr;
534  }
535  }
536 
537  if (not excptr) {
538  if (auto queue = m_worker->serializeRunModule()) {
539  queue.push(*m_holder.group(),
540  [worker = m_worker,
541  info = m_eventTransitionInfo,
542  parentContext = m_parentContext,
543  serviceToken = m_serviceToken,
544  holder = m_holder]() {
545  //Need to make the services available
546  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
547 
548  std::exception_ptr ptr;
549  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
550  });
551  return;
552  }
553  }
554 
555  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
556  }
557 
558  private:
564  };
565 
566  // This class does nothing unless there is an exception originating
567  // in an "External Worker". In that case, it handles converting the
568  // exception to a CMS exception and adding context to the exception.
570  public:
572  oneapi::tbb::task_group* group,
573  WaitingTask* runModuleTask,
574  ParentContext const& parentContext);
575 
576  void execute() final;
577 
578  private:
581  oneapi::tbb::task_group* m_group;
583  };
584 
585  std::atomic<int> timesRun_;
586  std::atomic<int> timesVisited_;
587  std::atomic<int> timesPassed_;
588  std::atomic<int> timesFailed_;
589  std::atomic<int> timesExcept_;
590  std::atomic<State> state_;
592  std::atomic<int> numberOfPathsLeftToRun_;
593 
595 
596  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
597  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
598 
599  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
600 
602 
604  std::atomic<bool> workStarted_;
606  bool moduleValid_ = true;
607  };
608 
609  namespace {
610  template <typename T>
611  class ModuleSignalSentry {
612  public:
613  ModuleSignalSentry(ActivityRegistry* a,
614  typename T::Context const* context,
615  ModuleCallingContext const* moduleCallingContext)
616  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
617  if (a_)
618  T::preModuleSignal(a_, context, moduleCallingContext_);
619  }
620 
621  ~ModuleSignalSentry() {
622  if (a_)
623  T::postModuleSignal(a_, context_, moduleCallingContext_);
624  }
625 
626  private:
627  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
628  typename T::Context const* context_;
629  ModuleCallingContext const* moduleCallingContext_;
630  };
631 
632  } // namespace
633 
634  namespace workerhelper {
635  template <>
637  public:
639  static bool call(Worker* iWorker,
640  StreamID,
641  EventTransitionInfo const& info,
642  ActivityRegistry* /* actReg */,
643  ModuleCallingContext const* mcc,
644  Arg::Context const* /* context*/) {
645  //Signal sentry is handled by the module
646  return iWorker->implDo(info, mcc);
647  }
648  static void esPrefetchAsync(Worker* worker,
649  WaitingTaskHolder waitingTask,
650  ServiceToken const& token,
651  EventTransitionInfo const& info,
652  Transition transition) {
653  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
654  }
655  static bool wantsTransition(Worker const* iWorker) { return true; }
656  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
657 
658  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
659  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
660  };
661 
662  template <>
664  public:
666  static bool call(Worker* iWorker,
667  StreamID,
668  RunTransitionInfo const& info,
669  ActivityRegistry* actReg,
670  ModuleCallingContext const* mcc,
671  Arg::Context const* context) {
672  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
673  return iWorker->implDoBegin(info, mcc);
674  }
675  static void esPrefetchAsync(Worker* worker,
676  WaitingTaskHolder waitingTask,
677  ServiceToken const& token,
678  RunTransitionInfo const& info,
679  Transition transition) {
680  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
681  }
682  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
683  static bool needToRunSelection(Worker const* iWorker) { return false; }
684  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
685  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
686  };
687  template <>
689  public:
691  static bool call(Worker* iWorker,
692  StreamID id,
693  RunTransitionInfo const& info,
694  ActivityRegistry* actReg,
695  ModuleCallingContext const* mcc,
696  Arg::Context const* context) {
697  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
698  return iWorker->implDoStreamBegin(id, info, mcc);
699  }
700  static void esPrefetchAsync(Worker* worker,
701  WaitingTaskHolder waitingTask,
702  ServiceToken const& token,
703  RunTransitionInfo const& info,
704  Transition transition) {
705  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
706  }
707  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
708  static bool needToRunSelection(Worker const* iWorker) { return false; }
709  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
710  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
711  };
712  template <>
714  public:
716  static bool call(Worker* iWorker,
717  StreamID,
718  RunTransitionInfo const& info,
719  ActivityRegistry* actReg,
720  ModuleCallingContext const* mcc,
721  Arg::Context const* context) {
722  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
723  return iWorker->implDoEnd(info, mcc);
724  }
725  static void esPrefetchAsync(Worker* worker,
726  WaitingTaskHolder waitingTask,
727  ServiceToken const& token,
728  RunTransitionInfo const& info,
729  Transition transition) {
730  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
731  }
732  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
733  static bool needToRunSelection(Worker const* iWorker) { return false; }
734  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
735  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
736  };
737  template <>
739  public:
741  static bool call(Worker* iWorker,
742  StreamID id,
743  RunTransitionInfo const& info,
744  ActivityRegistry* actReg,
745  ModuleCallingContext const* mcc,
746  Arg::Context const* context) {
747  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
748  return iWorker->implDoStreamEnd(id, info, mcc);
749  }
750  static void esPrefetchAsync(Worker* worker,
751  WaitingTaskHolder waitingTask,
752  ServiceToken const& token,
753  RunTransitionInfo const& info,
754  Transition transition) {
755  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
756  }
757  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
758  static bool needToRunSelection(Worker const* iWorker) { return false; }
759  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
760  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
761  };
762 
763  template <>
765  public:
767  static bool call(Worker* iWorker,
768  StreamID,
769  LumiTransitionInfo const& info,
770  ActivityRegistry* actReg,
771  ModuleCallingContext const* mcc,
772  Arg::Context const* context) {
773  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
774  return iWorker->implDoBegin(info, mcc);
775  }
776  static void esPrefetchAsync(Worker* worker,
777  WaitingTaskHolder waitingTask,
778  ServiceToken const& token,
779  LumiTransitionInfo const& info,
780  Transition transition) {
781  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
782  }
783  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
784  static bool needToRunSelection(Worker const* iWorker) { return false; }
785  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
786  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
787  };
788  template <>
790  public:
792  static bool call(Worker* iWorker,
793  StreamID id,
794  LumiTransitionInfo const& info,
795  ActivityRegistry* actReg,
796  ModuleCallingContext const* mcc,
797  Arg::Context const* context) {
798  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
799  return iWorker->implDoStreamBegin(id, info, mcc);
800  }
801  static void esPrefetchAsync(Worker* worker,
802  WaitingTaskHolder waitingTask,
803  ServiceToken const& token,
804  LumiTransitionInfo const& info,
805  Transition transition) {
806  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
807  }
808  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
809  static bool needToRunSelection(Worker const* iWorker) { return false; }
810  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
811  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
812  };
813 
814  template <>
816  public:
818  static bool call(Worker* iWorker,
819  StreamID,
820  LumiTransitionInfo const& info,
821  ActivityRegistry* actReg,
822  ModuleCallingContext const* mcc,
823  Arg::Context const* context) {
824  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
825  return iWorker->implDoEnd(info, mcc);
826  }
827  static void esPrefetchAsync(Worker* worker,
828  WaitingTaskHolder waitingTask,
829  ServiceToken const& token,
830  LumiTransitionInfo const& info,
831  Transition transition) {
832  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
833  }
834  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
835  static bool needToRunSelection(Worker const* iWorker) { return false; }
836  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
837  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
838  };
839  template <>
841  public:
843  static bool call(Worker* iWorker,
844  StreamID id,
845  LumiTransitionInfo const& info,
846  ActivityRegistry* actReg,
847  ModuleCallingContext const* mcc,
848  Arg::Context const* context) {
849  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
850  return iWorker->implDoStreamEnd(id, info, mcc);
851  }
852  static void esPrefetchAsync(Worker* worker,
853  WaitingTaskHolder waitingTask,
854  ServiceToken const& token,
855  LumiTransitionInfo const& info,
856  Transition transition) {
857  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
858  }
859  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
860  static bool needToRunSelection(Worker const* iWorker) { return false; }
861  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
862  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
863  };
864  template <>
866  public:
868  static bool call(Worker* iWorker,
869  StreamID,
871  ActivityRegistry* actReg,
872  ModuleCallingContext const* mcc,
873  Arg::Context const* context) {
874  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
875  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
876  }
877  static void esPrefetchAsync(
879  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
880  static bool needToRunSelection(Worker const* iWorker) { return false; }
881  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
882  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
883  };
884  template <>
886  public:
888  static bool call(Worker* iWorker,
889  StreamID,
891  ActivityRegistry* actReg,
892  ModuleCallingContext const* mcc,
893  Arg::Context const* context) {
894  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
895  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
896  }
897  static void esPrefetchAsync(
899  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
900  static bool needToRunSelection(Worker const* iWorker) { return false; }
901  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
902  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
903  };
904  template <>
906  public:
908  static bool call(Worker* iWorker,
909  StreamID,
911  ActivityRegistry* actReg,
912  ModuleCallingContext const* mcc,
913  Arg::Context const* context) {
914  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
915  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
916  }
917  static void esPrefetchAsync(
919  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
920  static bool needToRunSelection(Worker const* iWorker) { return false; }
921  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
922  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
923  };
924  } // namespace workerhelper
925 
926  template <typename T>
928  ServiceToken const& token,
929  ParentContext const& parentContext,
930  typename T::TransitionInfoType const& transitionInfo,
931  Transition iTransition) {
932  Principal const& principal = transitionInfo.principal();
933 
935 
936  if constexpr (T::isEvent_) {
937  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
938  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
939  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
940  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
941  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
942  }
943 
944  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
945  edPrefetchAsync(iTask, token, principal);
946 
947  if (principal.branchType() == InEvent) {
949  }
950  }
951 
952  template <typename T>
954  typename T::TransitionInfoType const& transitionInfo,
955  ServiceToken const& token,
956  StreamID streamID,
957  ParentContext const& parentContext,
958  typename T::Context const* context) {
960  return;
961  }
962 
963  //Need to check workStarted_ before adding to waitingTasks_
964  bool expected = false;
965  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
966 
968  if constexpr (T::isEvent_) {
969  timesVisited_.fetch_add(1, std::memory_order_relaxed);
970  }
971 
972  if (workStarted) {
974 
975  //if have TriggerResults based selection we want to reject the event before doing prefetching
977  //We need to run the selection in a different task so that
978  // we can prefetch the data needed for the selection
979  WaitingTask* moduleTask =
980  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
981 
982  //make sure the task is either run or destroyed
983  struct DestroyTask {
984  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
985 
986  ~DestroyTask() {
987  auto p = m_task.exchange(nullptr);
988  if (p) {
989  TaskSentry s{p};
990  }
991  }
992 
993  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
994 
995  private:
996  std::atomic<edm::WaitingTask*> m_task;
997  };
998  if constexpr (T::isEvent_) {
999  if (hasAcquire()) {
1000  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1001  ServiceWeakToken weakToken = token;
1002  auto* group = task.group();
1003  moduleTask = make_waiting_task(
1004  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1005  WaitingTaskWithArenaHolder runTaskHolder(
1006  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1007  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1008  t.execute();
1009  });
1010  }
1011  }
1012  auto* group = task.group();
1013  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1014  ServiceWeakToken weakToken = token;
1015  auto selectionTask =
1016  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1017  std::exception_ptr const*) mutable {
1018  ServiceRegistry::Operate guard(weakToken.lock());
1019  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1020  weakToken.lock(),
1021  parentContext,
1022  info,
1023  T::transition_);
1024  });
1025  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1026  } else {
1027  WaitingTask* moduleTask =
1028  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1029  auto group = task.group();
1030  if constexpr (T::isEvent_) {
1031  if (hasAcquire()) {
1032  WaitingTaskWithArenaHolder runTaskHolder(
1033  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1034  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1035  }
1036  }
1037  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1038  }
1039  }
1040  }
1041 
1042  template <typename T>
1043  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1044  typename T::TransitionInfoType const& transitionInfo,
1045  StreamID streamID,
1046  ParentContext const& parentContext,
1047  typename T::Context const* context) {
1048  std::exception_ptr exceptionPtr;
1049  if (iEPtr) {
1050  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_)) {
1051  exceptionPtr = iEPtr;
1052  setException<T::isEvent_>(exceptionPtr);
1053  } else {
1054  setPassed<T::isEvent_>();
1055  }
1057  } else {
1058  // Caught exception is propagated via WaitingTaskList
1059  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1060  exceptionPtr = std::current_exception();
1061  }
1062  }
1063  waitingTasks_.doneWaiting(exceptionPtr);
1064  return exceptionPtr;
1065  }
1066 
1067  template <typename T>
1069  typename T::TransitionInfoType const& transitionInfo,
1070  ServiceToken const& serviceToken,
1071  StreamID streamID,
1072  ParentContext const& parentContext,
1073  typename T::Context const* context) {
1075  return;
1076  }
1077 
1078  //Need to check workStarted_ before adding to waitingTasks_
1079  bool expected = false;
1080  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1081 
1083  if (workStarted) {
1084  ServiceWeakToken weakToken = serviceToken;
1085  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1086  std::exception_ptr exceptionPtr;
1087  // Caught exception is propagated via WaitingTaskList
1088  CMS_SA_ALLOW try {
1089  //Need to make the services available
1090  ServiceRegistry::Operate guard(weakToken.lock());
1091 
1092  this->runModule<T>(info, streamID, parentContext, context);
1093  } catch (...) {
1094  exceptionPtr = std::current_exception();
1095  }
1096  this->waitingTasks_.doneWaiting(exceptionPtr);
1097  };
1098 
1099  if (needsESPrefetching(T::transition_)) {
1100  auto group = task.group();
1101  auto afterPrefetch =
1102  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1103  if (iExcept) {
1104  this->waitingTasks_.doneWaiting(*iExcept);
1105  } else {
1106  if (auto queue = this->serializeRunModule()) {
1107  queue.push(*group, toDo);
1108  } else {
1109  group->run(toDo);
1110  }
1111  }
1112  });
1114  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1115  } else {
1116  auto group = task.group();
1117  if (auto queue = this->serializeRunModule()) {
1118  queue.push(*group, toDo);
1119  } else {
1120  group->run(toDo);
1121  }
1122  }
1123  }
1124  }
1125 
1126  template <typename T>
1127  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1128  StreamID streamID,
1129  ParentContext const& parentContext,
1130  typename T::Context const* context) {
1131  //unscheduled producers should advance this
1132  //if (T::isEvent_) {
1133  // ++timesVisited_;
1134  //}
1135  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1136  if constexpr (T::isEvent_) {
1137  timesRun_.fetch_add(1, std::memory_order_relaxed);
1138  }
1139 
1140  bool rc = true;
1141  try {
1142  convertException::wrap([&]() {
1144  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1145 
1146  if (rc) {
1147  setPassed<T::isEvent_>();
1148  } else {
1149  setFailed<T::isEvent_>();
1150  }
1151  });
1152  } catch (cms::Exception& ex) {
1154  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1156  setException<T::isEvent_>(std::current_exception());
1157  std::rethrow_exception(cached_exception_);
1158  } else {
1159  rc = setPassed<T::isEvent_>();
1160  }
1161  }
1162 
1163  return rc;
1164  }
1165 
1166  template <typename T>
1167  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1168  StreamID streamID,
1169  ParentContext const& parentContext,
1170  typename T::Context const* context) {
1171  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1172  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1173  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1174  }
1175 } // namespace edm
1176 #endif
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1167
virtual bool hasAcquire() const =0
void clearModule()
Definition: Worker.h:123
std::atomic< int > timesVisited_
Definition: Worker.h:586
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
virtual bool wantsStreamRuns() const =0
ModuleDescription const * moduleDescription() const
void execute() final
Definition: Worker.h:417
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:239
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1043
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:108
T::Context const * m_context
Definition: Worker.h:484
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:639
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:592
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:330
#define DUMMY
Definition: DMRtrends.cc:34
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:767
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:237
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:507
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:666
void execute() final
Definition: Worker.h:501
virtual ~Worker()
Definition: Worker.cc:106
void endJob()
Definition: Worker.cc:263
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:776
ParentContext const m_parentContext
Definition: Worker.h:483
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:888
std::atomic< int > timesExcept_
Definition: Worker.h:589
oneapi::tbb::task_group * m_group
Definition: Worker.h:486
std::atomic< bool > workStarted_
Definition: Worker.h:604
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:715
std::atomic< int > timesFailed_
Definition: Worker.h:588
void emitPostModuleStreamPrefetchingSignal()
Definition: Worker.h:357
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:349
void addedToPath()
Definition: Worker.h:231
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:599
void clearCounters()
Definition: Worker.h:223
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:953
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:102
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:367
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:282
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:691
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:141
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:391
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:280
bool moduleValid_
Definition: Worker.h:606
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:98
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:741
virtual std::string workerType() const =0
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:596
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
LimitedTaskQueue * limited_
Definition: Worker.h:99
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:605
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:108
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:581
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:594
BranchType
Definition: BranchType.h:11
int timesPass() const
Definition: Worker.h:240
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:496
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
std::exception_ptr cached_exception_
Definition: Worker.h:597
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
Definition: Worker.h:362
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:321
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
std::exception_ptr exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
int timesPassed() const
Definition: Worker.h:235
int iEvent
Definition: GenABIO.cc:224
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:409
Worker & operator=(Worker const &)=delete
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:603
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:481
void reset()
Definition: Worker.h:179
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:750
ServiceWeakToken m_serviceToken
Definition: Worker.h:485
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:852
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:917
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:665
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:675
int timesFailed() const
Definition: Worker.h:236
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:405
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:328
#define CMS_THREAD_GUARD(_var_)
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:89
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:590
double f[11][100]
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:168
Definition: Types.py:1
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
int timesVisited() const
Definition: Worker.h:234
int numberOfPathsOn_
Definition: Worker.h:591
int timesExcept() const
Definition: Worker.h:237
bool setPassed()
Definition: Worker.h:312
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:716
virtual void implEndJob()=0
ServiceToken lock() const
Definition: ServiceToken.h:101
ConcurrencyTypes
Definition: Worker.h:96
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:404
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:868
ModuleDescription const * description() const
Definition: Worker.h:189
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:103
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:638
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1068
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:690
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:174
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
void respondToCloseOutputFile()
Definition: Worker.h:176
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:337
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:792
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:175
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1127
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:353
State state() const
Definition: Worker.h:238
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:801
StreamContext const * getStreamContext() const
double a
Definition: hdecay.h:119
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:648
std::atomic< int > timesPassed_
Definition: Worker.h:587
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:352
void beginJob()
Definition: Worker.cc:247
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:304
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:346
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:725
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:843
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:818
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:601
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:827
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:927
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:585
int timesRun() const
Definition: Worker.h:233
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:908
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
GlobalContext const * getGlobalContext() const
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:877
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:740
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:897
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::WaitingTaskList & waitingTaskList()
Definition: Worker.h:245
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:700
BranchType const & branchType() const
Definition: Principal.h:181
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
Definition: Worker.h:388
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0