CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <atomic>
62 #include <cassert>
63 #include <map>
64 #include <memory>
65 #include <sstream>
66 #include <string>
67 #include <vector>
68 #include <exception>
69 #include <unordered_map>
70 
71 namespace edm {
72  class EventPrincipal;
73  class EventSetupImpl;
74  class EarlyDeleteHelper;
75  class ModuleProcessName;
76  class ProductResolverIndexHelper;
77  class ProductResolverIndexAndSkipBit;
78  class StreamID;
79  class StreamContext;
80  class ProductRegistry;
81  class ThinnedAssociationsHelper;
82 
83  namespace workerhelper {
84  template <typename O>
85  class CallImpl;
86  }
87  namespace eventsetup {
89  }
90 
91  class Worker {
92  public:
// Cached execution status of the worker. reset() puts the worker back into Ready;
// setPassed(), setFailed() and setException() record Pass, Fail and Exception respectively.
93  enum State { Ready, Pass, Fail, Exception };
99 
100  TaskQueueAdaptor() = default;
102  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
103 
// True when the adaptor refers to a queue, i.e. when either serial_ or limited_ is non-null.
// Declared const because it only reads the two pointers, so a const adaptor can be tested as well.
104  operator bool() const { return serial_ != nullptr or limited_ != nullptr; }
105 
// Pushes the callable iF, together with the task group iG, onto whichever queue is set:
// serial_ when it is non-null, otherwise limited_.
// NOTE(review): limited_ is dereferenced without a null check, so callers are expected to test
// operator bool() first. iF is a forwarding reference but is passed on as an lvalue (no std::forward) —
// confirm that this is intended.
106  template <class F>
107  void push(oneapi::tbb::task_group& iG, F&& iF) {
108  if (serial_) {
109  serial_->push(iG, iF);
110  } else {
111  limited_->push(iG, iF);
112  }
113  }
114  };
115 
116  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
117  virtual ~Worker();
118 
119  Worker(Worker const&) = delete; // Disallow copying and moving
120  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
121 
// Flags the module as no longer valid (moduleValid_ = false) and then calls the
// pure-virtual doClearModule() so the concrete worker can do its part.
122  void clearModule() {
123  moduleValid_ = false;
124  doClearModule();
125  }
126 
127  virtual bool wantsProcessBlocks() const = 0;
128  virtual bool wantsInputProcessBlocks() const = 0;
129  virtual bool wantsGlobalRuns() const = 0;
130  virtual bool wantsGlobalLuminosityBlocks() const = 0;
131  virtual bool wantsStreamRuns() const = 0;
132  virtual bool wantsStreamLuminosityBlocks() const = 0;
133 
134  virtual SerialTaskQueue* globalRunsQueue() = 0;
136 
138  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
139 
141  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
142  assert(false);
143  }
144 
145  template <typename T>
147  typename T::TransitionInfoType const&,
148  ServiceToken const&,
149  StreamID,
150  ParentContext const&,
151  typename T::Context const*);
152 
153  template <typename T>
155  typename T::TransitionInfoType const&,
156  ServiceToken const&,
157  StreamID,
158  ParentContext const&,
159  typename T::Context const*);
160 
161  template <typename T>
162  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
163  StreamID,
164  ParentContext const&,
165  typename T::Context const*);
166 
168  void skipOnPath(EventPrincipal const& iEvent);
169  void beginJob();
170  void endJob();
171  void beginStream(StreamID id, StreamContext& streamContext);
172  void endStream(StreamID id, StreamContext& streamContext);
177 
178  void reset() {
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
182  workStarted_ = false;
184  }
185 
186  void postDoEvent(EventPrincipal const&);
187 
189  if (moduleValid_) {
191  }
192  return nullptr;
193  }
196  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
197 
199 
200  //Used to make EDGetToken work
201  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
202  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
203  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
204  virtual void resolvePutIndicies(
205  BranchType iBranchType,
206  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
207  iIndicies) = 0;
208 
209  virtual void modulesWhoseProductsAreConsumed(
210  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
211  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
212  ProductRegistry const& preg,
213  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
214 
215  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
216 
217  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
218 
219  virtual Types moduleType() const = 0;
220  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
221 
// Resets the five execution counters (run, visited, passed, failed, except) to zero.
// Each store uses release ordering; the timesXxx() accessors read the same atomics with acquire ordering.
222  void clearCounters() {
223  timesRun_.store(0, std::memory_order_release);
224  timesVisited_.store(0, std::memory_order_release);
225  timesPassed_.store(0, std::memory_order_release);
226  timesFailed_.store(0, std::memory_order_release);
227  timesExcept_.store(0, std::memory_order_release);
228  }
229 
231  //NOTE: calling state() is done to force synchronization across threads
// Read-only accessors for the execution counters; each one is an acquire load of the matching atomic.
232  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
233  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
234  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
235  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
236  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
// Current cached state; state_ is a std::atomic<State>, so this is an implicit atomic load.
237  State state() const { return state_; }
238
// Alias of timesPassed().
239  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
240 
241  virtual bool hasAccumulator() const = 0;
242 
243  // Used in PuttableProductResolver
245 
246  protected:
247  template <typename O>
249 
250  virtual void doClearModule() = 0;
251 
252  virtual std::string workerType() const = 0;
253  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
254 
255  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
256  virtual bool implNeedToRunSelection() const = 0;
257 
258  virtual void implDoAcquire(EventTransitionInfo const&,
259  ModuleCallingContext const*,
261 
263  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
265  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
266  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
267  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
268  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
269  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
270  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
271  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
272  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
273  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
274  virtual void implBeginJob() = 0;
275  virtual void implEndJob() = 0;
276  virtual void implBeginStream(StreamID) = 0;
277  virtual void implEndStream(StreamID) = 0;
278 
280 
282 
283  private:
284  template <typename T>
285  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
286 
287  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
288  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
289 
290  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
291 
292  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
293  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
294  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
295 
297  ModuleCallingContext const& moduleCallingContext,
298  Principal const& iPrincipal) const = 0;
299 
300  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
301  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
302  virtual void implRespondToCloseOutputFile() = 0;
303 
305 
306  virtual TaskQueueAdaptor serializeRunModule() = 0;
307 
308  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
309 
// Records a passing result. For event transitions (IS_EVENT == true) the pass counter is
// incremented; in every case the cached state becomes Pass. Always returns true.
310  template <bool IS_EVENT>
311  bool setPassed() {
// IS_EVENT is a compile-time constant, so the counter update is selected at compile time.
312  if constexpr (IS_EVENT) {
313  timesPassed_.fetch_add(1, std::memory_order_relaxed);
314  }
315  state_ = Pass;
316  return true;
317  }
318 
// Records a failing result. For event transitions (IS_EVENT == true) the failure counter is
// incremented; in every case the cached state becomes Fail. Always returns false.
319  template <bool IS_EVENT>
320  bool setFailed() {
// IS_EVENT is a compile-time constant, so the counter update is selected at compile time.
321  if constexpr (IS_EVENT) {
322  timesFailed_.fetch_add(1, std::memory_order_relaxed);
323  }
324  state_ = Fail;
325  return false;
326  }
327 
// Records that the module ended with an exception. For event transitions (IS_EVENT == true) the
// exception counter is incremented; the exception is cached, the state becomes Exception, and the
// cached exception_ptr is returned.
328  template <bool IS_EVENT>
329  std::exception_ptr setException(std::exception_ptr iException) {
// IS_EVENT is a compile-time constant, so the counter update is selected at compile time.
330  if constexpr (IS_EVENT) {
331  timesExcept_.fetch_add(1, std::memory_order_relaxed);
332  }
// The exception is stored before state_ is written; cached_exception_ is annotated as guarded by state_.
333  cached_exception_ = iException;
334  state_ = Exception;
335  return cached_exception_;
336  }
337 
338  template <typename T>
340  ServiceToken const&,
341  ParentContext const&,
342  typename T::TransitionInfoType const&,
343  Transition);
344 
346  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
347 
// Reports whether there are EventSetup items to prefetch for iTrans.
// Transitions at or beyond NumberOfEventSetupTransitions never need it; the short-circuit keeps
// esItemsToGetFrom() from being called for those transitions.
348  bool needsESPrefetching(Transition iTrans) const noexcept {
349  return iTrans < edm::Transition::NumberOfEventSetupTransitions and not esItemsToGetFrom(iTrans).empty();
350  }
351 
353  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
354  }
355 
356  virtual bool hasAcquire() const = 0;
357 
358  template <typename T>
359  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
360  typename T::TransitionInfoType const&,
361  StreamID,
362  ParentContext const&,
363  typename T::Context const*);
364 
366 
367  void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
368  EventTransitionInfo const&,
369  ParentContext const&,
371 
372  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
373 
374  template <typename T>
375  class RunModuleTask : public WaitingTask {
376  public:
378  typename T::TransitionInfoType const& transitionInfo,
379  ServiceToken const& token,
380  StreamID streamID,
381  ParentContext const& parentContext,
382  typename T::Context const* context,
383  oneapi::tbb::task_group* iGroup)
384  : m_worker(worker),
385  m_transitionInfo(transitionInfo),
386  m_streamID(streamID),
387  m_parentContext(parentContext),
390  m_group(iGroup) {}
391 
395  EnableQueueGuard(EnableQueueGuard const&) = delete;
396  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
398  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
400  if (queue_) {
401  queue_->resume();
402  }
403  }
404  };
405 
406  void execute() final {
407  //Need to make the services available early so other services can see them
409 
410  //in case the emit causes an exception, we need a memory location
411  // to hold the exception_ptr
412  std::exception_ptr temp_excptr;
413  auto excptr = exceptionPtr();
414  if constexpr (T::isEvent_) {
415  if (!m_worker->hasAcquire()) {
416  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
417  CMS_SA_ALLOW try {
418  //pre was called in prefetchAsync
420  } catch (...) {
421  temp_excptr = std::current_exception();
422  if (not excptr) {
423  excptr = &temp_excptr;
424  }
425  }
426  }
427  }
428 
429  if (not excptr) {
430  if (auto queue = m_worker->serializeRunModule()) {
431  auto f = [worker = m_worker,
433  streamID = m_streamID,
434  parentContext = m_parentContext,
435  sContext = m_context,
436  serviceToken = m_serviceToken]() {
437  //Need to make the services available
438  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
439 
440  //If needed, we pause the queue in begin transition and resume it
441  // at the end transition. This can guarantee that the module
442  // only processes one run or lumi at a time
444  std::exception_ptr* ptr = nullptr;
445  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
446  };
447  //keep another global transition from running if necessary
449  if (gQueue) {
450  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
451  gQueue->pause();
452  queue.push(*group, std::move(f));
453  });
454  } else {
455  queue.push(*m_group, std::move(f));
456  }
457  return;
458  }
459  }
460 
462  }
463 
464  private:
466  typename T::TransitionInfoType m_transitionInfo;
469  typename T::Context const* m_context;
471  oneapi::tbb::task_group* m_group;
472  };
473 
474  // AcquireTask is only used for the Event case, but we define
475  // it as a template so all cases will compile.
476  // DUMMY exists to work around the C++ Standard prohibition on
477  // fully specializing templates nested in other classes.
478  template <typename T, typename DUMMY = void>
479  class AcquireTask : public WaitingTask {
480  public:
482  typename T::TransitionInfoType const&,
483  ServiceToken const&,
484  ParentContext const&,
486  void execute() final {}
487  };
488 
489  template <typename DUMMY>
491  public:
493  EventTransitionInfo const& eventTransitionInfo,
494  ServiceToken const& token,
495  ParentContext const& parentContext,
497  : m_worker(worker),
498  m_eventTransitionInfo(eventTransitionInfo),
499  m_parentContext(parentContext),
500  m_holder(std::move(holder)),
501  m_serviceToken(token) {}
502 
// Emits the post-prefetching signal for the event and then starts the acquire step, either through
// the module's serialization queue (when one exists and no exception is pending) or directly.
503  void execute() final {
504  //Need to make the services available early so other services can see them
505  ServiceRegistry::Operate guard(m_serviceToken.lock());
506
507  //in case the emit causes an exception, we need a memory location
508  // to hold the exception_ptr
509  std::exception_ptr temp_excptr;
510  auto excptr = exceptionPtr();
511  // Caught exception is passed to Worker::runAcquireAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
512  CMS_SA_ALLOW try {
513  //pre was called in prefetchAsync
514  m_worker->emitPostModuleEventPrefetchingSignal();
515  } catch (...) {
516  temp_excptr = std::current_exception();
// An exception that arrived with the task takes precedence over one thrown by the signal.
517  if (not excptr) {
518  excptr = &temp_excptr;
519  }
520  }
521
// No pending exception: if the worker supplies a serialization queue, defer the acquire call onto it.
522  if (not excptr) {
523  if (auto queue = m_worker->serializeRunModule()) {
// The lambda captures copies of the worker pointer, transition info, parent context, service token and holder.
524  queue.push(*m_holder.group(),
525  [worker = m_worker,
526  info = m_eventTransitionInfo,
527  parentContext = m_parentContext,
528  serviceToken = m_serviceToken,
529  holder = m_holder]() {
530  //Need to make the services available
531  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
532
// A null pointer is passed because this path is only taken when no exception is pending.
533  std::exception_ptr* ptr = nullptr;
534  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
535  });
536  return;
537  }
538  }
539
// Reached when an exception is pending or there is no serialization queue: call directly, forwarding excptr.
540  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
541  }
542 
543  private:
549  };
550 
551  // This class does nothing unless there is an exception originating
552  // in an "External Worker". In that case, it handles converting the
553  // exception to a CMS exception and adding context to the exception.
555  public:
557  oneapi::tbb::task_group* group,
558  WaitingTask* runModuleTask,
559  ParentContext const& parentContext);
560 
561  void execute() final;
562 
563  private:
566  oneapi::tbb::task_group* m_group;
568  };
569 
570  std::atomic<int> timesRun_;
571  std::atomic<int> timesVisited_;
572  std::atomic<int> timesPassed_;
573  std::atomic<int> timesFailed_;
574  std::atomic<int> timesExcept_;
575  std::atomic<State> state_;
577  std::atomic<int> numberOfPathsLeftToRun_;
578 
580 
581  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
582  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
583 
584  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
585 
587 
589  std::atomic<bool> workStarted_;
591  bool moduleValid_ = true;
592  };
593 
594  namespace {
// Sentry that brackets a module call with the pre/post module signals of transition type T.
// The constructor calls T::preModuleSignal and the destructor calls T::postModuleSignal; both are
// skipped when no ActivityRegistry was supplied (a_ is null).
// NOTE(review): copy operations are implicitly available; a copied sentry would emit the post signal
// once per copy.
595  template <typename T>
596  class ModuleSignalSentry {
597  public:
598  ModuleSignalSentry(ActivityRegistry* a,
599  typename T::Context const* context,
600  ModuleCallingContext const* moduleCallingContext)
601  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
602  if (a_)
603  T::preModuleSignal(a_, context, moduleCallingContext_);
604  }
605
606  ~ModuleSignalSentry() {
607  if (a_)
608  T::postModuleSignal(a_, context_, moduleCallingContext_);
609  }
610
611  private:
612  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
613  typename T::Context const* context_;
614  ModuleCallingContext const* moduleCallingContext_;
615  };
616 
617  } // namespace
618 
619  namespace workerhelper {
620  template <>
622  public:
// Runs the module by forwarding to Worker::implDo(). The stream id, registry and context arguments
// are unused here; no ModuleSignalSentry is created in this specialization.
624  static bool call(Worker* iWorker,
625  StreamID,
626  EventTransitionInfo const& info,
627  ActivityRegistry* /* actReg */,
628  ModuleCallingContext const* mcc,
629  Arg::Context const* /* context*/) {
630  //Signal sentry is handled by the module
631  return iWorker->implDo(info, mcc);
632  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
633  static void esPrefetchAsync(Worker* worker,
634  WaitingTaskHolder waitingTask,
635  ServiceToken const& token,
636  EventTransitionInfo const& info,
637  Transition transition) {
638  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
639  }
// This transition is always wanted; whether selection must run is decided by the worker.
640  static bool wantsTransition(Worker const* iWorker) { return true; }
641  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
642
// No global queue is paused or re-enabled for this transition.
643  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
644  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
645  };
646 
647  template <>
649  public:
// Calls the worker's run-level implDoBegin() inside a ModuleSignalSentry, so the pre/post module
// signals are emitted around the call when actReg is non-null. The stream id is unused.
651  static bool call(Worker* iWorker,
652  StreamID,
653  RunTransitionInfo const& info,
654  ActivityRegistry* actReg,
655  ModuleCallingContext const* mcc,
656  Arg::Context const* context) {
657  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
658  return iWorker->implDoBegin(info, mcc);
659  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
660  static void esPrefetchAsync(Worker* worker,
661  WaitingTaskHolder waitingTask,
662  ServiceToken const& token,
663  RunTransitionInfo const& info,
664  Transition transition) {
665  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
666  }
// Wanted only by workers reporting wantsGlobalRuns(); selection is never run for this transition.
667  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
668  static bool needToRunSelection(Worker const* iWorker) { return false; }
// The worker's globalRunsQueue() is the queue to pause; there is nothing to re-enable.
669  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
670  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
671  };
672  template <>
674  public:
// Calls the worker's run-level implDoStreamBegin() for stream `id` inside a ModuleSignalSentry, so the
// pre/post module signals are emitted around the call when actReg is non-null.
676  static bool call(Worker* iWorker,
677  StreamID id,
678  RunTransitionInfo const& info,
679  ActivityRegistry* actReg,
680  ModuleCallingContext const* mcc,
681  Arg::Context const* context) {
682  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
683  return iWorker->implDoStreamBegin(id, info, mcc);
684  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
685  static void esPrefetchAsync(Worker* worker,
686  WaitingTaskHolder waitingTask,
687  ServiceToken const& token,
688  RunTransitionInfo const& info,
689  Transition transition) {
690  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
691  }
// Wanted only by workers reporting wantsStreamRuns(); selection is never run for this transition.
692  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
693  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
694  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
695  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
696  };
697  template <>
699  public:
// Calls the worker's run-level implDoEnd() inside a ModuleSignalSentry, so the pre/post module
// signals are emitted around the call when actReg is non-null. The stream id is unused.
701  static bool call(Worker* iWorker,
702  StreamID,
703  RunTransitionInfo const& info,
704  ActivityRegistry* actReg,
705  ModuleCallingContext const* mcc,
706  Arg::Context const* context) {
707  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
708  return iWorker->implDoEnd(info, mcc);
709  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
710  static void esPrefetchAsync(Worker* worker,
711  WaitingTaskHolder waitingTask,
712  ServiceToken const& token,
713  RunTransitionInfo const& info,
714  Transition transition) {
715  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
716  }
// Wanted only by workers reporting wantsGlobalRuns(); selection is never run for this transition.
717  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
718  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Nothing is paused here; the worker's globalRunsQueue() is the queue to re-enable.
719  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
720  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
721  };
722  template <>
724  public:
// Calls the worker's run-level implDoStreamEnd() for stream `id` inside a ModuleSignalSentry, so the
// pre/post module signals are emitted around the call when actReg is non-null.
726  static bool call(Worker* iWorker,
727  StreamID id,
728  RunTransitionInfo const& info,
729  ActivityRegistry* actReg,
730  ModuleCallingContext const* mcc,
731  Arg::Context const* context) {
732  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
733  return iWorker->implDoStreamEnd(id, info, mcc);
734  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
735  static void esPrefetchAsync(Worker* worker,
736  WaitingTaskHolder waitingTask,
737  ServiceToken const& token,
738  RunTransitionInfo const& info,
739  Transition transition) {
740  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
741  }
// Wanted only by workers reporting wantsStreamRuns(); selection is never run for this transition.
742  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
743  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
744  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
745  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
746  };
747 
748  template <>
750  public:
// Calls the worker's lumi-level implDoBegin() inside a ModuleSignalSentry, so the pre/post module
// signals are emitted around the call when actReg is non-null. The stream id is unused.
752  static bool call(Worker* iWorker,
753  StreamID,
754  LumiTransitionInfo const& info,
755  ActivityRegistry* actReg,
756  ModuleCallingContext const* mcc,
757  Arg::Context const* context) {
758  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
759  return iWorker->implDoBegin(info, mcc);
760  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
761  static void esPrefetchAsync(Worker* worker,
762  WaitingTaskHolder waitingTask,
763  ServiceToken const& token,
764  LumiTransitionInfo const& info,
765  Transition transition) {
766  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
767  }
// Wanted only by workers reporting wantsGlobalLuminosityBlocks(); selection is never run for this transition.
768  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
769  static bool needToRunSelection(Worker const* iWorker) { return false; }
// The worker's globalLuminosityBlocksQueue() is the queue to pause; there is nothing to re-enable.
770  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
771  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
772  };
773  template <>
775  public:
// Calls the worker's lumi-level implDoStreamBegin() for stream `id` inside a ModuleSignalSentry, so the
// pre/post module signals are emitted around the call when actReg is non-null.
777  static bool call(Worker* iWorker,
778  StreamID id,
779  LumiTransitionInfo const& info,
780  ActivityRegistry* actReg,
781  ModuleCallingContext const* mcc,
782  Arg::Context const* context) {
783  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
784  return iWorker->implDoStreamBegin(id, info, mcc);
785  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
786  static void esPrefetchAsync(Worker* worker,
787  WaitingTaskHolder waitingTask,
788  ServiceToken const& token,
789  LumiTransitionInfo const& info,
790  Transition transition) {
791  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
792  }
// Wanted only by workers reporting wantsStreamLuminosityBlocks(); selection is never run for this transition.
793  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
794  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
795  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
796  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
797  };
798 
799  template <>
801  public:
// Calls the worker's lumi-level implDoEnd() inside a ModuleSignalSentry, so the pre/post module
// signals are emitted around the call when actReg is non-null. The stream id is unused.
803  static bool call(Worker* iWorker,
804  StreamID,
805  LumiTransitionInfo const& info,
806  ActivityRegistry* actReg,
807  ModuleCallingContext const* mcc,
808  Arg::Context const* context) {
809  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
810  return iWorker->implDoEnd(info, mcc);
811  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
812  static void esPrefetchAsync(Worker* worker,
813  WaitingTaskHolder waitingTask,
814  ServiceToken const& token,
815  LumiTransitionInfo const& info,
816  Transition transition) {
817  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
818  }
// Wanted only by workers reporting wantsGlobalLuminosityBlocks(); selection is never run for this transition.
819  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
820  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Nothing is paused here; the worker's globalLuminosityBlocksQueue() is the queue to re-enable.
821  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
822  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
823  };
824  template <>
826  public:
// Calls the worker's lumi-level implDoStreamEnd() for stream `id` inside a ModuleSignalSentry, so the
// pre/post module signals are emitted around the call when actReg is non-null.
828  static bool call(Worker* iWorker,
829  StreamID id,
830  LumiTransitionInfo const& info,
831  ActivityRegistry* actReg,
832  ModuleCallingContext const* mcc,
833  Arg::Context const* context) {
834  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
835  return iWorker->implDoStreamEnd(id, info, mcc);
836  }
// Forwards EventSetup prefetching to Worker::esPrefetchAsync using the EventSetupImpl carried by info.
837  static void esPrefetchAsync(Worker* worker,
838  WaitingTaskHolder waitingTask,
839  ServiceToken const& token,
840  LumiTransitionInfo const& info,
841  Transition transition) {
842  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
843  }
// Wanted only by workers reporting wantsStreamLuminosityBlocks(); selection is never run for this transition.
844  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
845  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
846  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
847  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
848  };
849  template <>
851  public:
853  static bool call(Worker* iWorker,
854  StreamID,
856  ActivityRegistry* actReg,
857  ModuleCallingContext const* mcc,
858  Arg::Context const* context) {
859  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
860  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
861  }
862  static void esPrefetchAsync(
864  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
865  static bool needToRunSelection(Worker const* iWorker) { return false; }
866  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
867  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
868  };
869  template <>
871  public:
873  static bool call(Worker* iWorker,
874  StreamID,
876  ActivityRegistry* actReg,
877  ModuleCallingContext const* mcc,
878  Arg::Context const* context) {
879  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
880  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
881  }
882  static void esPrefetchAsync(
884  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
885  static bool needToRunSelection(Worker const* iWorker) { return false; }
886  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
887  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
888  };
889  template <>
891  public:
893  static bool call(Worker* iWorker,
894  StreamID,
896  ActivityRegistry* actReg,
897  ModuleCallingContext const* mcc,
898  Arg::Context const* context) {
899  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
900  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
901  }
902  static void esPrefetchAsync(
904  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
905  static bool needToRunSelection(Worker const* iWorker) { return false; }
906  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
907  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
908  };
909  } // namespace workerhelper
910 
911  template <typename T>
913  ServiceToken const& token,
914  ParentContext const& parentContext,
915  typename T::TransitionInfoType const& transitionInfo,
916  Transition iTransition) {
917  Principal const& principal = transitionInfo.principal();
918 
920 
921  if (principal.branchType() == InEvent) {
922  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
923  }
924 
925  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
926  edPrefetchAsync(iTask, token, principal);
927 
928  if (principal.branchType() == InEvent) {
930  }
931  }
932 
933  template <typename T>
935  typename T::TransitionInfoType const& transitionInfo,
936  ServiceToken const& token,
937  StreamID streamID,
938  ParentContext const& parentContext,
939  typename T::Context const* context) {
941  return;
942  }
943 
944  //Need to check workStarted_ before adding to waitingTasks_
945  bool expected = false;
946  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
947 
949  if constexpr (T::isEvent_) {
950  timesVisited_.fetch_add(1, std::memory_order_relaxed);
951  }
952 
953  if (workStarted) {
955 
956  //if have TriggerResults based selection we want to reject the event before doing prefetching
958  //We need to run the selection in a different task so that
959  // we can prefetch the data needed for the selection
960  WaitingTask* moduleTask =
961  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
962 
963  //make sure the task is either run or destroyed
964  struct DestroyTask {
965  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
966 
967  ~DestroyTask() {
968  auto p = m_task.exchange(nullptr);
969  if (p) {
970  TaskSentry s{p};
971  }
972  }
973 
974  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
975 
976  private:
977  std::atomic<edm::WaitingTask*> m_task;
978  };
979  if constexpr (T::isEvent_) {
980  if (hasAcquire()) {
981  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
982  ServiceWeakToken weakToken = token;
983  auto* group = task.group();
984  moduleTask = make_waiting_task(
985  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
986  WaitingTaskWithArenaHolder runTaskHolder(
987  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
988  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
989  t.execute();
990  });
991  }
992  }
993  auto* group = task.group();
994  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
995  ServiceWeakToken weakToken = token;
996  auto selectionTask =
997  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
998  std::exception_ptr const*) mutable {
999  ServiceRegistry::Operate guard(weakToken.lock());
1000  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1001  weakToken.lock(),
1002  parentContext,
1003  info,
1004  T::transition_);
1005  });
1006  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1007  } else {
1008  WaitingTask* moduleTask =
1009  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1010  auto group = task.group();
1011  if constexpr (T::isEvent_) {
1012  if (hasAcquire()) {
1013  WaitingTaskWithArenaHolder runTaskHolder(
1014  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1015  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1016  }
1017  }
1018  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1019  }
1020  }
1021  }
1022 
// Finishes a transition of type T for this worker after the asynchronous
// prefetch step has completed.
//
// iEPtr          - null when prefetching succeeded; otherwise points at the
//                  (non-null, see the assert) exception raised while prefetching.
// transitionInfo, streamID, parentContext, context
//                - forwarded unchanged to runModule<T>() when the module is run.
//
// Returns the exception associated with this execution, or a null
// std::exception_ptr when there is none. The same pointer is handed to
// waitingTasks_.doneWaiting() before returning, so tasks waiting on this worker
// are always released, whether or not the module actually ran.
//
// NOTE(review): listing line 1038 is absent from this extract (the numbering
// jumps from 1037 to 1039), so one statement at the end of the prefetch-failure
// branch is not visible here - check the original header before editing.
1023  template <typename T>
1024  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
1025  typename T::TransitionInfoType const& transitionInfo,
1026  StreamID streamID,
1027  ParentContext const& parentContext,
1028  typename T::Context const* context) {
1029  std::exception_ptr exceptionPtr;
// Prefetching failed: the module itself is not run in this branch.
1030  if (iEPtr) {
1031  assert(*iEPtr);
// Either record and report the prefetch exception, or mark the worker as passed
// and leave exceptionPtr null, depending on shouldRethrowException().
1032  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1033  exceptionPtr = *iEPtr;
1034  setException<T::isEvent_>(exceptionPtr);
1035  } else {
1036  setPassed<T::isEvent_>();
1037  }
1039  } else {
1040  // Caught exception is propagated via WaitingTaskList
1041  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1042  exceptionPtr = std::current_exception();
1043  }
1044  }
// Release the waiting tasks, passing along the exception (null if none).
1045  waitingTasks_.doneWaiting(exceptionPtr);
1046  return exceptionPtr;
1047  }
1048 
// Schedules the module to run for transition T without a data-product prefetch
// step. The page index names this definition doWorkNoPrefetchingAsync (listing
// line 1050) with a leading WaitingTaskHolder parameter; that is presumably the
// `task` used below - confirm against the original header.
//
// Visible behaviour:
//  - workStarted_ is flipped from false to true with compare_exchange_strong;
//    only the caller that performs the flip goes on to schedule work.
//  - The work item (`toDo`) makes the services available, calls runModule<T>(),
//    captures any exception thrown and passes it (or a null pointer) to
//    waitingTasks_.doneWaiting().
//  - `toDo` is pushed onto the adaptor returned by serializeRunModule() when that
//    adaptor converts to true, otherwise it is run directly on the task group.
//  - When needsESPrefetching(T::transition_) is true the scheduling above is
//    deferred into a waiting task (`afterPrefetch`).
//
// NOTE(review): listing lines 1050 (declarator), 1056 (condition guarding the
// early return), 1064 and 1095 (callee that receives the arguments on line 1096)
// are absent from this extract - check the original header before editing.
1049  template <typename T>
1051  typename T::TransitionInfoType const& transitionInfo,
1052  ServiceToken const& serviceToken,
1053  StreamID streamID,
1054  ParentContext const& parentContext,
1055  typename T::Context const* context) {
1057  return;
1058  }
1059 
1060  //Need to check workStarted_ before adding to waitingTasks_
1061  bool expected = false;
1062  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1063 
1065  if (workStarted) {
// Hold only a weak service token inside the closure; it is locked when the work runs.
1066  ServiceWeakToken weakToken = serviceToken;
// The closure captures its inputs by value (context is a pointer, `this` is the worker).
1067  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1068  std::exception_ptr exceptionPtr;
1069  // Caught exception is propagated via WaitingTaskList
1070  CMS_SA_ALLOW try {
1071  //Need to make the services available
1072  ServiceRegistry::Operate guard(weakToken.lock());
1073 
1074  this->runModule<T>(info, streamID, parentContext, context);
1075  } catch (...) {
1076  exceptionPtr = std::current_exception();
1077  }
1078  this->waitingTasks_.doneWaiting(exceptionPtr);
1079  };
1080 
1081  if (needsESPrefetching(T::transition_)) {
1082  auto group = task.group();
1083  auto afterPrefetch =
1084  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
// An exception delivered to this task is forwarded to the waiting tasks and the
// module is not run.
1085  if (iExcept) {
1086  this->waitingTasks_.doneWaiting(*iExcept);
1087  } else {
1088  if (auto queue = this->serializeRunModule()) {
1089  queue.push(*group, toDo);
1090  } else {
1091  group->run(toDo);
1092  }
1093  }
1094  });
// NOTE(review): the call that takes these arguments is on the missing line 1095;
// they match the esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&,
// Transition, ServiceToken const&) signature listed in the page index - confirm.
1096  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1097  } else {
1098  auto group = task.group();
1099  if (auto queue = this->serializeRunModule()) {
1100  queue.push(*group, toDo);
1101  } else {
1102  group->run(toDo);
1103  }
1104  }
1105  }
1106  }
1107 
// Runs the module for transition T and records the outcome on this worker.
//
// transitionInfo, streamID, context - forwarded to the module call made inside
//                                     the wrapped lambda.
// parentContext - used to build the ModuleContextSentry and to decide, via
//                 shouldRethrowException(), whether a caught exception propagates.
//
// Returns rc: the value set inside the wrapped lambda, or the result of
// setPassed<>() when a cms::Exception is caught but not rethrown.
// Throws: rethrows cached_exception_ when shouldRethrowException() returns true.
//
// NOTE(review): listing lines 1125 (start of the statement that continues on
// line 1126 - presumably the assignment to rc), 1135 and 1137 are absent from
// this extract; check the original header before editing.
1108  template <typename T>
1109  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1110  StreamID streamID,
1111  ParentContext const& parentContext,
1112  typename T::Context const* context) {
1113  //unscheduled producers should advance this
1114  //if (T::isEvent_) {
1115  // ++timesVisited_;
1116  //}
// Sentry object tied to moduleCallingContext_ and parentContext for this scope
// (presumably sets and later resets the calling context - confirm).
1117  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// The run counter is only advanced for event transitions.
1118  if constexpr (T::isEvent_) {
1119  timesRun_.fetch_add(1, std::memory_order_relaxed);
1120  }
1121 
1122  bool rc = true;
1123  try {
// The handler below only catches cms::Exception; presumably wrap() converts
// other exception types into that - confirm.
1124  convertException::wrap([&]() {
1126  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1127 
// Record pass/fail according to the module's result.
1128  if (rc) {
1129  setPassed<T::isEvent_>();
1130  } else {
1131  setFailed<T::isEvent_>();
1132  }
1133  });
1134  } catch (cms::Exception& ex) {
1136  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
// Record the exception, then rethrow from cached_exception_ (presumably the
// member that setException() fills in - confirm).
1138  setException<T::isEvent_>(std::current_exception());
1139  std::rethrow_exception(cached_exception_);
1140  } else {
// Exception is not propagated: the worker is marked as passed instead.
1141  rc = setPassed<T::isEvent_>();
1142  }
1143  }
1144 
1145  return rc;
1146  }
1147 
// Runs the module for transition T with no prefetch step.
//
// Increments timesVisited_ and then delegates to runModuleAfterAsyncPrefetch<T>()
// with a null prefetch-exception pointer, so that function takes its
// "no prefetch error" path. All other arguments are forwarded unchanged.
//
// Returns whatever runModuleAfterAsyncPrefetch<T>() returns: the exception
// associated with the execution, or a null std::exception_ptr.
1148  template <typename T>
1149  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1150  StreamID streamID,
1151  ParentContext const& parentContext,
1152  typename T::Context const* context) {
1153  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1154  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1155  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1156  }
1157 } // namespace edm
1158 #endif
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1149
virtual bool hasAcquire() const =0
void clearModule()
Definition: Worker.h:122
std::atomic< int > timesVisited_
Definition: Worker.h:571
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
virtual bool wantsStreamRuns() const =0
ModuleDescription const * moduleDescription() const
void execute() final
Definition: Worker.h:406
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:239
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:107
T::Context const * m_context
Definition: Worker.h:469
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:624
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:577
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:329
#define DUMMY
Definition: DMRtrends.cc:34
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:752
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:237
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:492
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:651
void execute() final
Definition: Worker.h:486
virtual ~Worker()
Definition: Worker.cc:106
void endJob()
Definition: Worker.cc:263
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:761
ParentContext const m_parentContext
Definition: Worker.h:468
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:873
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
std::atomic< int > timesExcept_
Definition: Worker.h:574
oneapi::tbb::task_group * m_group
Definition: Worker.h:471
std::atomic< bool > workStarted_
Definition: Worker.h:589
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:700
std::atomic< int > timesFailed_
Definition: Worker.h:573
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:348
void addedToPath()
Definition: Worker.h:230
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:584
void clearCounters()
Definition: Worker.h:222
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:934
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:367
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:101
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:281
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:676
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:140
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:280
bool moduleValid_
Definition: Worker.h:591
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:97
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:726
virtual std::string workerType() const =0
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:581
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
LimitedTaskQueue * limited_
Definition: Worker.h:98
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:590
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:108
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:566
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:579
BranchType
Definition: BranchType.h:11
int timesPass() const
Definition: Worker.h:239
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:481
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
std::exception_ptr cached_exception_
Definition: Worker.h:582
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:320
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
int timesPassed() const
Definition: Worker.h:234
int iEvent
Definition: GenABIO.cc:224
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:398
Worker & operator=(Worker const &)=delete
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:588
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:466
void reset()
Definition: Worker.h:178
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:735
ServiceWeakToken m_serviceToken
Definition: Worker.h:470
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:837
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:902
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:650
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:660
int timesFailed() const
Definition: Worker.h:235
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:394
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:328
#define CMS_THREAD_GUARD(_var_)
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:89
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::atomic< State > state_
Definition: Worker.h:575
double f[11][100]
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:167
Definition: Types.py:1
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
int timesVisited() const
Definition: Worker.h:233
int numberOfPathsOn_
Definition: Worker.h:576
int timesExcept() const
Definition: Worker.h:236
bool setPassed()
Definition: Worker.h:311
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:701
virtual void implEndJob()=0
ServiceToken lock() const
Definition: ServiceToken.h:101
ConcurrencyTypes
Definition: Worker.h:95
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:406
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:853
ModuleDescription const * description() const
Definition: Worker.h:188
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1024
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:102
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:623
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1050
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:675
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
void respondToCloseOutputFile()
Definition: Worker.h:175
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:337
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:777
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1109
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:352
State state() const
Definition: Worker.h:237
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:786
StreamContext const * getStreamContext() const
double a
Definition: hdecay.h:119
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:633
std::atomic< int > timesPassed_
Definition: Worker.h:572
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:352
void beginJob()
Definition: Worker.cc:247
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:304
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:346
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:710
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:828
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:803
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:586
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:812
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:912
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:570
int timesRun() const
Definition: Worker.h:232
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:893
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:862
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:725
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:882
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::WaitingTaskList & waitingTaskList()
Definition: Worker.h:244
virtual ConcurrencyTypes moduleConcurrencyType() const =0
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:392
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:685
BranchType const & branchType() const
Definition: Principal.h:181
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
Definition: Worker.h:377
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0