CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <array>
62 #include <atomic>
63 #include <cassert>
64 #include <map>
65 #include <memory>
66 #include <sstream>
67 #include <string>
68 #include <vector>
69 #include <exception>
70 #include <unordered_map>
71 
72 namespace edm {
73  class EventPrincipal;
74  class EventSetupImpl;
75  class EarlyDeleteHelper;
76  class ModuleProcessName;
77  class ProductResolverIndexHelper;
78  class ProductResolverIndexAndSkipBit;
79  class StreamID;
80  class StreamContext;
81  class ProductRegistry;
82  class ThinnedAssociationsHelper;
83 
84  namespace workerhelper {
85  template <typename O>
86  class CallImpl;
87  }
88  namespace eventsetup {
90  }
91 
92  class Worker {
93  public:
94  enum State { Ready, Pass, Fail, Exception };
100 
101  TaskQueueAdaptor() = default;
103  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
104 
105  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
106 
107  template <class F>
108  void push(oneapi::tbb::task_group& iG, F&& iF) {
109  if (serial_) {
110  serial_->push(iG, iF);
111  } else {
112  limited_->push(iG, iF);
113  }
114  }
115  };
116 
117  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
118  virtual ~Worker();
119 
120  Worker(Worker const&) = delete; // Disallow copying and moving
121  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
122 
123  void clearModule() {
124  moduleValid_ = false;
125  doClearModule();
126  }
127 
128  virtual bool wantsProcessBlocks() const noexcept = 0;
129  virtual bool wantsInputProcessBlocks() const noexcept = 0;
130  virtual bool wantsGlobalRuns() const noexcept = 0;
131  virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
132  virtual bool wantsStreamRuns() const noexcept = 0;
133  virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
134 
135  virtual SerialTaskQueue* globalRunsQueue() = 0;
137 
138  void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
139  WaitingTask* task,
142  EventPrincipal const*) noexcept;
143 
145  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
146  assert(false);
147  }
148 
149  template <typename T>
151  typename T::TransitionInfoType const&,
152  ServiceToken const&,
153  StreamID,
154  ParentContext const&,
155  typename T::Context const*) noexcept;
156 
157  template <typename T>
159  typename T::TransitionInfoType const&,
160  ServiceToken const&,
161  StreamID,
162  ParentContext const&,
163  typename T::Context const*) noexcept;
164 
165  template <typename T>
166  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
167  StreamID,
168  ParentContext const&,
169  typename T::Context const*) noexcept;
170 
171  virtual size_t transformIndex(edm::BranchDescription const&) const noexcept = 0;
173  size_t iTransformIndex,
174  EventPrincipal const&,
175  ServiceToken const&,
176  StreamID,
177  ModuleCallingContext const&,
178  StreamContext const*) noexcept;
179 
181  void skipOnPath(EventPrincipal const& iEvent);
182  void beginJob();
183  void endJob();
184  void beginStream(StreamID id, StreamContext& streamContext);
185  void endStream(StreamID id, StreamContext& streamContext);
190 
191  void reset() {
192  cached_exception_ = std::exception_ptr();
193  state_ = Ready;
195  workStarted_ = false;
197  }
198 
199  void postDoEvent(EventPrincipal const&);
200 
201  ModuleDescription const* description() const noexcept {
202  if (moduleValid_) {
204  }
205  return nullptr;
206  }
209  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
210 
212 
213  //Used to make EDGetToken work
214  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
216  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
217  virtual void resolvePutIndicies(
218  BranchType iBranchType,
219  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
220  iIndicies) = 0;
221 
222  virtual void modulesWhoseProductsAreConsumed(
223  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
224  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
225  ProductRegistry const& preg,
226  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
227 
228  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
229 
230  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
231 
232  virtual Types moduleType() const = 0;
233  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
234 
235  void clearCounters() noexcept {
236  timesRun_.store(0, std::memory_order_release);
237  timesVisited_.store(0, std::memory_order_release);
238  timesPassed_.store(0, std::memory_order_release);
239  timesFailed_.store(0, std::memory_order_release);
240  timesExcept_.store(0, std::memory_order_release);
241  }
242 
243  void addedToPath() noexcept { ++numberOfPathsOn_; }
244  //NOTE: calling state() is done to force synchronization across threads
245  int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
246  int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
247  int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
248  int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
249  int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
250  State state() const noexcept { return state_; }
251 
252  int timesPass() const noexcept { return timesPassed(); } // for backward compatibility only - to be removed soon
253 
254  virtual bool hasAccumulator() const noexcept = 0;
255 
256  // Used in PuttableProductResolver
258 
259  protected:
260  template <typename O>
262 
263  virtual void doClearModule() = 0;
264 
265  virtual std::string workerType() const = 0;
266  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
267 
268  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
269  virtual bool implNeedToRunSelection() const noexcept = 0;
270 
274 
276  size_t iTransformIndex,
279  ServiceWeakToken const&) noexcept = 0;
280  virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
281 
294  virtual void implBeginJob() = 0;
295  virtual void implEndJob() = 0;
296  virtual void implBeginStream(StreamID) = 0;
297  virtual void implEndStream(StreamID) = 0;
298 
300 
302 
303  private:
304  template <typename T>
305  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
306 
307  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
308  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
309 
310  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
311 
312  virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
313  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
314 
316  ModuleCallingContext const& moduleCallingContext,
317  Principal const& iPrincipal) const noexcept = 0;
318 
319  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
320  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
321  virtual void implRespondToCloseOutputFile() = 0;
322 
324 
325  virtual TaskQueueAdaptor serializeRunModule() = 0;
326 
327  bool shouldRethrowException(std::exception_ptr iPtr,
328  ParentContext const& parentContext,
329  bool isEvent,
330  bool isTryToContinue) const noexcept;
332 
333  template <bool IS_EVENT>
334  bool setPassed() {
335  if (IS_EVENT) {
336  timesPassed_.fetch_add(1, std::memory_order_relaxed);
337  }
338  state_ = Pass;
339  return true;
340  }
341 
342  template <bool IS_EVENT>
343  bool setFailed() {
344  if (IS_EVENT) {
345  timesFailed_.fetch_add(1, std::memory_order_relaxed);
346  }
347  state_ = Fail;
348  return false;
349  }
350 
351  template <bool IS_EVENT>
352  std::exception_ptr setException(std::exception_ptr iException) {
353  if (IS_EVENT) {
354  timesExcept_.fetch_add(1, std::memory_order_relaxed);
355  }
356  cached_exception_ = iException; // propagate_const<T> has no reset() function
357  state_ = Exception;
358  return cached_exception_;
359  }
360 
361  template <typename T>
363  ServiceToken const&,
364  ParentContext const&,
365  typename T::TransitionInfoType const&,
366  Transition) noexcept;
367 
369  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
370 
371  bool needsESPrefetching(Transition iTrans) const noexcept {
372  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
373  }
374 
376  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
377  }
378 
380  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
382  }
383 
385  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
387  }
388 
389  virtual bool hasAcquire() const noexcept = 0;
390 
391  template <typename T>
392  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
393  typename T::TransitionInfoType const&,
394  StreamID,
396  typename T::Context const*) noexcept;
397 
399 
400  void runAcquireAfterAsyncPrefetch(std::exception_ptr,
403  WaitingTaskWithArenaHolder) noexcept;
404 
405  std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
406  ParentContext const& parentContext) noexcept;
407 
408  template <typename T>
409  class RunModuleTask : public WaitingTask {
410  public:
412  typename T::TransitionInfoType const& transitionInfo,
413  ServiceToken const& token,
414  StreamID streamID,
415  ParentContext const& parentContext,
416  typename T::Context const* context,
417  oneapi::tbb::task_group* iGroup) noexcept
418  : m_worker(worker),
419  m_transitionInfo(transitionInfo),
420  m_streamID(streamID),
421  m_parentContext(parentContext),
422  m_context(context),
423  m_serviceToken(token),
424  m_group(iGroup) {}
425 
428  EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
429  EnableQueueGuard(EnableQueueGuard const&) = delete;
430  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
431  EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
432  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
434  if (queue_) {
435  queue_->resume();
436  }
437  }
438  };
439 
440  void execute() final {
441  //Need to make the services available early so other services can see them
442  ServiceRegistry::Operate guard(m_serviceToken.lock());
443 
444  //incase the emit causes an exception, we need a memory location
445  // to hold the exception_ptr
446  std::exception_ptr temp_excptr;
447  auto excptr = exceptionPtr();
448  if constexpr (T::isEvent_) {
449  if (!m_worker->hasAcquire()) {
450  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
451  CMS_SA_ALLOW try {
452  //pre was called in prefetchAsync
453  m_worker->emitPostModuleEventPrefetchingSignal();
454  } catch (...) {
455  temp_excptr = std::current_exception();
456  if (not excptr) {
457  excptr = temp_excptr;
458  }
459  }
460  }
461  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
462  m_worker->emitPostModuleStreamPrefetchingSignal();
463  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
464  m_worker->emitPostModuleGlobalPrefetchingSignal();
465  }
466 
467  if (not excptr) {
468  if (auto queue = m_worker->serializeRunModule()) {
469  auto f = [worker = m_worker,
470  info = m_transitionInfo,
471  streamID = m_streamID,
472  parentContext = m_parentContext,
473  sContext = m_context,
474  serviceToken = m_serviceToken]() {
475  //Need to make the services available
476  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
477 
478  //If needed, we pause the queue in begin transition and resume it
479  // at the end transition. This can guarantee that the module
480  // only processes one run or lumi at a time
482  std::exception_ptr ptr;
483  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
484  };
485  //keep another global transition from running if necessary
486  auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
487  if (gQueue) {
488  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
489  gQueue->pause();
490  queue.push(*group, std::move(f));
491  });
492  } else {
493  queue.push(*m_group, std::move(f));
494  }
495  return;
496  }
497  }
498 
499  m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
500  }
501 
502  private:
504  typename T::TransitionInfoType m_transitionInfo;
507  typename T::Context const* m_context;
509  oneapi::tbb::task_group* m_group;
510  };
511 
512  // AcquireTask is only used for the Event case, but we define
513  // it as a template so all cases will compile.
514  // DUMMY exists to work around the C++ Standard prohibition on
515  // fully specializing templates nested in other classes.
516  template <typename T, typename DUMMY = void>
517  class AcquireTask : public WaitingTask {
518  public:
520  typename T::TransitionInfoType const&,
521  ServiceToken const&,
522  ParentContext const&,
523  WaitingTaskWithArenaHolder) noexcept {}
524  void execute() final {}
525  };
526 
527  template <typename DUMMY>
529  public:
531  EventTransitionInfo const& eventTransitionInfo,
532  ServiceToken const& token,
533  ParentContext const& parentContext,
534  WaitingTaskWithArenaHolder holder) noexcept
535  : m_worker(worker),
536  m_eventTransitionInfo(eventTransitionInfo),
537  m_parentContext(parentContext),
538  m_holder(std::move(holder)),
539  m_serviceToken(token) {}
540 
541  void execute() final {
542  //Need to make the services available early so other services can see them
543  ServiceRegistry::Operate guard(m_serviceToken.lock());
544 
545  //incase the emit causes an exception, we need a memory location
546  // to hold the exception_ptr
547  std::exception_ptr temp_excptr;
548  auto excptr = exceptionPtr();
549  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
550  CMS_SA_ALLOW try {
551  //pre was called in prefetchAsync
552  m_worker->emitPostModuleEventPrefetchingSignal();
553  } catch (...) {
554  temp_excptr = std::current_exception();
555  if (not excptr) {
556  excptr = temp_excptr;
557  }
558  }
559 
560  if (not excptr) {
561  if (auto queue = m_worker->serializeRunModule()) {
562  queue.push(*m_holder.group(),
563  [worker = m_worker,
564  info = m_eventTransitionInfo,
565  parentContext = m_parentContext,
566  serviceToken = m_serviceToken,
567  holder = m_holder]() {
568  //Need to make the services available
569  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
570 
571  std::exception_ptr ptr;
572  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
573  });
574  return;
575  }
576  }
577 
578  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
579  }
580 
581  private:
587  };
588 
589  // This class does nothing unless there is an exception originating
590  // in an "External Worker". In that case, it handles converting the
591  // exception to a CMS exception and adding context to the exception.
593  public:
595  oneapi::tbb::task_group* group,
596  WaitingTask* runModuleTask,
597  ParentContext const& parentContext) noexcept;
598 
599  void execute() final;
600 
601  private:
604  oneapi::tbb::task_group* m_group;
606  };
607 
608  std::atomic<int> timesRun_;
609  std::atomic<int> timesVisited_;
610  std::atomic<int> timesPassed_;
611  std::atomic<int> timesFailed_;
612  std::atomic<int> timesExcept_;
613  std::atomic<State> state_;
615  std::atomic<int> numberOfPathsLeftToRun_;
616 
618 
619  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
620  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
621 
622  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
623 
625 
627  std::atomic<bool> workStarted_;
629  bool moduleValid_ = true;
630  bool shouldTryToContinue_ = false;
631  bool beginSucceeded_ = false;
632  };
633 
634  namespace {
635  template <typename T>
636  class ModuleSignalSentry {
637  public:
638  ModuleSignalSentry(ActivityRegistry* a,
639  typename T::Context const* context,
640  ModuleCallingContext const* moduleCallingContext)
641  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
642 
643  ~ModuleSignalSentry() {
644  // This destructor does nothing unless we are unwinding the
645  // the stack from an earlier exception (a_ will be null if we are
646  // are not). We want to report the earlier exception and ignore any
647  // addition exceptions from the post module signal.
648  CMS_SA_ALLOW try {
649  if (a_) {
650  T::postModuleSignal(a_, context_, moduleCallingContext_);
651  }
652  } catch (...) {
653  }
654  }
655  void preModuleSignal() {
656  if (a_) {
657  try {
658  convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
659  } catch (cms::Exception& ex) {
660  ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
661  throw;
662  }
663  }
664  }
665  void postModuleSignal() {
666  if (a_) {
667  auto temp = a_;
668  // Setting a_ to null informs the destructor that the signal
669  // was already run and that it should do nothing.
670  a_ = nullptr;
671  try {
672  convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
673  } catch (cms::Exception& ex) {
674  ex.addContext("Handling post module signal, likely in a service function immediately after module method");
675  throw;
676  }
677  }
678  }
679 
680  private:
681  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
682  typename T::Context const* context_;
683  ModuleCallingContext const* moduleCallingContext_;
684  };
685 
686  } // namespace
687 
688  namespace workerhelper {
689  template <>
691  public:
693  static bool call(Worker* iWorker,
694  StreamID,
695  EventTransitionInfo const& info,
696  ActivityRegistry* /* actReg */,
697  ModuleCallingContext const* mcc,
698  Arg::Context const* /* context*/) {
699  //Signal sentry is handled by the module
700  return iWorker->implDo(info, mcc);
701  }
702  static void esPrefetchAsync(Worker* worker,
703  WaitingTaskHolder waitingTask,
704  ServiceToken const& token,
705  EventTransitionInfo const& info,
706  Transition transition) noexcept {
707  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
708  }
709  static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
710  static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
711 
712  static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
713  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
714  };
715 
716  template <>
718  public:
720  static bool call(Worker* iWorker,
721  StreamID,
722  RunTransitionInfo const& info,
723  ActivityRegistry* actReg,
724  ModuleCallingContext const* mcc,
725  Arg::Context const* context) {
726  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
727  // If preModuleSignal() throws, implDoBegin() is not called, and the
728  // cpp destructor calls postModuleSignal (ignoring additional exceptions)
729  cpp.preModuleSignal();
730  // If implDoBegin() throws, the cpp destructor calls postModuleSignal
731  // (ignoring additional exceptions)
732  auto returnValue = iWorker->implDoBegin(info, mcc);
733  // If postModuleSignal() throws, the exception will propagate to the framework
734  cpp.postModuleSignal();
735  iWorker->beginSucceeded_ = true;
736  return returnValue;
737  }
738  static void esPrefetchAsync(Worker* worker,
739  WaitingTaskHolder waitingTask,
740  ServiceToken const& token,
741  RunTransitionInfo const& info,
742  Transition transition) noexcept {
743  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
744  }
745  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
746  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
747  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
748  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
749  };
750  template <>
752  public:
754  static bool call(Worker* iWorker,
755  StreamID id,
756  RunTransitionInfo const& info,
757  ActivityRegistry* actReg,
758  ModuleCallingContext const* mcc,
759  Arg::Context const* context) {
760  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
761  cpp.preModuleSignal();
762  auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
763  cpp.postModuleSignal();
764  iWorker->beginSucceeded_ = true;
765  return returnValue;
766  }
767  static void esPrefetchAsync(Worker* worker,
768  WaitingTaskHolder waitingTask,
769  ServiceToken const& token,
770  RunTransitionInfo const& info,
771  Transition transition) noexcept {
772  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
773  }
774  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
775  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
776  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
777  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
778  };
779  template <>
781  public:
783  static bool call(Worker* iWorker,
784  StreamID,
785  RunTransitionInfo const& info,
786  ActivityRegistry* actReg,
787  ModuleCallingContext const* mcc,
788  Arg::Context const* context) {
789  if (iWorker->beginSucceeded_) {
790  iWorker->beginSucceeded_ = false;
791 
792  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
793  cpp.preModuleSignal();
794  auto returnValue = iWorker->implDoEnd(info, mcc);
795  cpp.postModuleSignal();
796  return returnValue;
797  }
798  return true;
799  }
800  static void esPrefetchAsync(Worker* worker,
801  WaitingTaskHolder waitingTask,
802  ServiceToken const& token,
803  RunTransitionInfo const& info,
804  Transition transition) noexcept {
805  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
806  }
807  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
808  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
809  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
810  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
811  };
812  template <>
814  public:
816  static bool call(Worker* iWorker,
817  StreamID id,
818  RunTransitionInfo const& info,
819  ActivityRegistry* actReg,
820  ModuleCallingContext const* mcc,
821  Arg::Context const* context) {
822  if (iWorker->beginSucceeded_) {
823  iWorker->beginSucceeded_ = false;
824 
825  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
826  cpp.preModuleSignal();
827  auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
828  cpp.postModuleSignal();
829  return returnValue;
830  }
831  return true;
832  }
833  static void esPrefetchAsync(Worker* worker,
834  WaitingTaskHolder waitingTask,
835  ServiceToken const& token,
836  RunTransitionInfo const& info,
837  Transition transition) noexcept {
838  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
839  }
840  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
841  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
842  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
843  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
844  };
845 
846  template <>
848  public:
850  static bool call(Worker* iWorker,
851  StreamID,
852  LumiTransitionInfo const& info,
853  ActivityRegistry* actReg,
854  ModuleCallingContext const* mcc,
855  Arg::Context const* context) {
856  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
857  cpp.preModuleSignal();
858  auto returnValue = iWorker->implDoBegin(info, mcc);
859  cpp.postModuleSignal();
860  iWorker->beginSucceeded_ = true;
861  return returnValue;
862  }
863  static void esPrefetchAsync(Worker* worker,
864  WaitingTaskHolder waitingTask,
865  ServiceToken const& token,
866  LumiTransitionInfo const& info,
867  Transition transition) noexcept {
868  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
869  }
870  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
871  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
872  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
873  return iWorker->globalLuminosityBlocksQueue();
874  }
875  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
876  };
877  template <>
879  public:
881  static bool call(Worker* iWorker,
882  StreamID id,
883  LumiTransitionInfo const& info,
884  ActivityRegistry* actReg,
885  ModuleCallingContext const* mcc,
886  Arg::Context const* context) {
887  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
888  cpp.preModuleSignal();
889  auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
890  cpp.postModuleSignal();
891  iWorker->beginSucceeded_ = true;
892  return returnValue;
893  }
894  static void esPrefetchAsync(Worker* worker,
895  WaitingTaskHolder waitingTask,
896  ServiceToken const& token,
897  LumiTransitionInfo const& info,
898  Transition transition) noexcept {
899  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
900  }
901  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
902  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
903  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
904  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
905  };
906 
907  template <>
909  public:
911  static bool call(Worker* iWorker,
912  StreamID,
913  LumiTransitionInfo const& info,
914  ActivityRegistry* actReg,
915  ModuleCallingContext const* mcc,
916  Arg::Context const* context) {
917  if (iWorker->beginSucceeded_) {
918  iWorker->beginSucceeded_ = false;
919 
920  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
921  cpp.preModuleSignal();
922  auto returnValue = iWorker->implDoEnd(info, mcc);
923  cpp.postModuleSignal();
924  return returnValue;
925  }
926  return true;
927  }
928  static void esPrefetchAsync(Worker* worker,
929  WaitingTaskHolder waitingTask,
930  ServiceToken const& token,
931  LumiTransitionInfo const& info,
932  Transition transition) noexcept {
933  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
934  }
935  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
936  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
937  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
938  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
939  return iWorker->globalLuminosityBlocksQueue();
940  }
941  };
942  template <>
944  public:
946  static bool call(Worker* iWorker,
947  StreamID id,
948  LumiTransitionInfo const& info,
949  ActivityRegistry* actReg,
950  ModuleCallingContext const* mcc,
951  Arg::Context const* context) {
952  if (iWorker->beginSucceeded_) {
953  iWorker->beginSucceeded_ = false;
954 
955  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
956  cpp.preModuleSignal();
957  auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
958  cpp.postModuleSignal();
959  return returnValue;
960  }
961  return true;
962  }
963  static void esPrefetchAsync(Worker* worker,
964  WaitingTaskHolder waitingTask,
965  ServiceToken const& token,
966  LumiTransitionInfo const& info,
967  Transition transition) noexcept {
968  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
969  }
970  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
971  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
972  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
973  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
974  };
975  template <>
977  public:
979  static bool call(Worker* iWorker,
980  StreamID,
982  ActivityRegistry* actReg,
983  ModuleCallingContext const* mcc,
984  Arg::Context const* context) {
985  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
986  cpp.preModuleSignal();
987  auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
988  cpp.postModuleSignal();
989  return returnValue;
990  }
991  static void esPrefetchAsync(
993  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
994  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
995  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
996  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
997  };
998  template <>
1000  public:
1002  static bool call(Worker* iWorker,
1003  StreamID,
1005  ActivityRegistry* actReg,
1006  ModuleCallingContext const* mcc,
1007  Arg::Context const* context) {
1008  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1009  cpp.preModuleSignal();
1010  auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
1011  cpp.postModuleSignal();
1012  return returnValue;
1013  }
1014  static void esPrefetchAsync(
1016  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1017  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1018  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1019  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1020  };
1021  template <>
1023  public:
1025  static bool call(Worker* iWorker,
1026  StreamID,
1028  ActivityRegistry* actReg,
1029  ModuleCallingContext const* mcc,
1030  Arg::Context const* context) {
1031  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1032  cpp.preModuleSignal();
1033  auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1034  cpp.postModuleSignal();
1035  return returnValue;
1036  }
1037  static void esPrefetchAsync(
1039  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1040  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1041  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1042  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1043  };
1044  } // namespace workerhelper
1045 
1046  template <typename T>
1048  ServiceToken const& token,
1049  ParentContext const& parentContext,
1050  typename T::TransitionInfoType const& transitionInfo,
1051  Transition iTransition) noexcept {
1052  Principal const& principal = transitionInfo.principal();
1053 
1054  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1055 
1056  if constexpr (T::isEvent_) {
1057  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1058  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1059  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1060  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1061  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1062  }
1063 
1064  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1065  edPrefetchAsync(iTask, token, principal);
1066 
1067  if (principal.branchType() == InEvent) {
1068  preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1069  }
1070  }
1071 
1072  template <typename T>
1074  typename T::TransitionInfoType const& transitionInfo,
1075  ServiceToken const& token,
1076  StreamID streamID,
1077  ParentContext const& parentContext,
1078  typename T::Context const* context) noexcept {
1080  return;
1081  }
1082 
1083  //Need to check workStarted_ before adding to waitingTasks_
1084  bool expected = false;
1085  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1086 
1087  waitingTasks_.add(task);
1088  if constexpr (T::isEvent_) {
1089  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1090  }
1091 
1092  if (workStarted) {
1093  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1094 
1095  //if have TriggerResults based selection we want to reject the event before doing prefetching
1097  //We need to run the selection in a different task so that
1098  // we can prefetch the data needed for the selection
1099  WaitingTask* moduleTask =
1100  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1101 
1102  //make sure the task is either run or destroyed
1103  struct DestroyTask {
1104  DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1105 
1106  ~DestroyTask() noexcept {
1107  auto p = m_task.exchange(nullptr);
1108  if (p) {
1109  TaskSentry s{p};
1110  }
1111  }
1112 
1113  edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1114 
1115  private:
1116  std::atomic<edm::WaitingTask*> m_task;
1117  };
1118  if constexpr (T::isEvent_) {
1119  if (hasAcquire()) {
1120  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1121  ServiceWeakToken weakToken = token;
1122  auto* group = task.group();
1123  moduleTask = make_waiting_task(
1124  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1125  WaitingTaskWithArenaHolder runTaskHolder(
1126  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1127  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1128  t.execute();
1129  });
1130  }
1131  }
1132  auto* group = task.group();
1133  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1134  ServiceWeakToken weakToken = token;
1135  auto selectionTask =
1136  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1137  std::exception_ptr const*) mutable {
1138  ServiceRegistry::Operate guard(weakToken.lock());
1139  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1140  weakToken.lock(),
1141  parentContext,
1142  info,
1143  T::transition_);
1144  });
1145  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1146  } else {
1147  WaitingTask* moduleTask =
1148  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1149  auto group = task.group();
1150  if constexpr (T::isEvent_) {
1151  if (hasAcquire()) {
1152  WaitingTaskWithArenaHolder runTaskHolder(
1153  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1154  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1155  }
1156  }
1157  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1158  }
1159  }
1160  }
1161 
1162  template <typename T>
1163  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1164  typename T::TransitionInfoType const& transitionInfo,
1165  StreamID streamID,
1166  ParentContext const& parentContext,
1167  typename T::Context const* context) noexcept {
1168  std::exception_ptr exceptionPtr;
1169  bool shouldRun = true;
1170  if (iEPtr) {
1171  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1172  exceptionPtr = iEPtr;
1173  setException<T::isEvent_>(exceptionPtr);
1174  shouldRun = false;
1175  } else {
1176  if (not shouldTryToContinue_) {
1177  setPassed<T::isEvent_>();
1178  shouldRun = false;
1179  }
1180  }
1181  }
1182  if (shouldRun) {
1183  // Caught exception is propagated via WaitingTaskList
1184  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1185  exceptionPtr = std::current_exception();
1186  }
1187  } else {
1188  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1189  }
1190  waitingTasks_.doneWaiting(exceptionPtr);
1191  return exceptionPtr;
1192  }
1193 
1194  template <typename T>
1196  typename T::TransitionInfoType const& transitionInfo,
1197  ServiceToken const& serviceToken,
1198  StreamID streamID,
1199  ParentContext const& parentContext,
1200  typename T::Context const* context) noexcept {
1202  return;
1203  }
1204 
1205  //Need to check workStarted_ before adding to waitingTasks_
1206  bool expected = false;
1207  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1208 
1209  waitingTasks_.add(task);
1210  if (workStarted) {
1211  ServiceWeakToken weakToken = serviceToken;
1212  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1213  std::exception_ptr exceptionPtr;
1214  // Caught exception is propagated via WaitingTaskList
1215  CMS_SA_ALLOW try {
1216  //Need to make the services available
1217  ServiceRegistry::Operate guard(weakToken.lock());
1218 
1219  this->runModule<T>(info, streamID, parentContext, context);
1220  } catch (...) {
1221  exceptionPtr = std::current_exception();
1222  }
1223  this->waitingTasks_.doneWaiting(exceptionPtr);
1224  };
1225 
1226  if (needsESPrefetching(T::transition_)) {
1227  auto group = task.group();
1228  auto afterPrefetch =
1229  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1230  if (iExcept) {
1231  this->waitingTasks_.doneWaiting(*iExcept);
1232  } else {
1233  if (auto queue = this->serializeRunModule()) {
1234  queue.push(*group, toDo);
1235  } else {
1236  group->run(toDo);
1237  }
1238  }
1239  });
1240  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1241  esPrefetchAsync(
1242  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1243  } else {
1244  auto group = task.group();
1245  if (auto queue = this->serializeRunModule()) {
1246  queue.push(*group, toDo);
1247  } else {
1248  group->run(toDo);
1249  }
1250  }
1251  }
1252  }
1253 
1254  template <typename T>
1255  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1256  StreamID streamID,
1257  ParentContext const& parentContext,
1258  typename T::Context const* context) {
1259  //unscheduled producers should advance this
1260  //if (T::isEvent_) {
1261  // ++timesVisited_;
1262  //}
1263  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1264  if constexpr (T::isEvent_) {
1265  timesRun_.fetch_add(1, std::memory_order_relaxed);
1266  }
1267 
1268  bool rc = true;
1269  try {
1270  convertException::wrap([&]() {
1272  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1273 
1274  if (rc) {
1275  setPassed<T::isEvent_>();
1276  } else {
1277  setFailed<T::isEvent_>();
1278  }
1279  });
1280  } catch (cms::Exception& ex) {
1282  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1284  setException<T::isEvent_>(std::current_exception());
1285  std::rethrow_exception(cached_exception_);
1286  } else {
1287  rc = setPassed<T::isEvent_>();
1288  }
1289  }
1290 
1291  return rc;
1292  }
1293 
1294  template <typename T>
1295  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1296  StreamID streamID,
1297  ParentContext const& parentContext,
1298  typename T::Context const* context) noexcept {
1299  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1300  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1301  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1302  }
1303 } // namespace edm
1304 #endif
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:738
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const noexcept
Definition: Worker.cc:120
void clearModule()
Definition: Worker.h:123
std::atomic< int > timesVisited_
Definition: Worker.h:609
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:809
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void execute() final
Definition: Worker.h:440
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:289
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:108
T::Context const * m_context
Definition: Worker.h:507
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
Definition: Worker.h:1014
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:693
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:615
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:352
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:850
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1195
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:259
int timesPassed() const noexcept
Definition: Worker.h:247
std::exception_ptr exceptionPtr() const noexcept
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:720
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder) noexcept
Definition: Worker.h:530
void execute() final
Definition: Worker.h:524
int timesVisited() const noexcept
Definition: Worker.h:246
int timesFailed() const noexcept
Definition: Worker.h:248
virtual ~Worker()
Definition: Worker.cc:109
void endJob()
Definition: Worker.cc:316
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
Definition: Worker.h:991
ParentContext const m_parentContext
Definition: Worker.h:506
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:1002
std::atomic< int > timesExcept_
Definition: Worker.h:612
oneapi::tbb::task_group * m_group
Definition: Worker.h:509
std::atomic< bool > workStarted_
Definition: Worker.h:627
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:782
std::atomic< int > timesFailed_
Definition: Worker.h:611
void emitPostModuleStreamPrefetchingSignal()
Definition: Worker.h:379
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:371
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *) noexcept
Definition: Worker.cc:262
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:622
void clearCounters() noexcept
Definition: Worker.h:235
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:776
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
int timesRun() const noexcept
Definition: Worker.h:245
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:102
virtual bool hasAcquire() const noexcept=0
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:301
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:754
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:333
bool moduleValid_
Definition: Worker.h:629
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:98
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:816
virtual std::string workerType() const =0
ModuleDescription const * moduleDescription() const noexcept
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:619
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
Definition: Worker.cc:420
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const noexcept=0
LimitedTaskQueue * limited_
Definition: Worker.h:99
bool ranAcquireWithoutException_
Definition: Worker.h:628
virtual bool hasAccumulator() const noexcept=0
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:111
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:604
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:617
BranchType
Definition: BranchType.h:11
virtual bool wantsInputProcessBlocks() const noexcept=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const noexcept
Definition: Worker.cc:244
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext) noexcept
Definition: Worker.cc:444
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &) noexcept=0
std::exception_ptr cached_exception_
Definition: Worker.h:620
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
Definition: Worker.h:384
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:343
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
bool resume()
Resumes processing if the queue was paused.
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1295
int iEvent
Definition: GenABIO.cc:224
virtual bool wantsProcessBlocks() const noexcept=0
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:432
Worker & operator=(Worker const &)=delete
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:842
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:626
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:504
void reset()
Definition: Worker.h:191
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ServiceWeakToken m_serviceToken
Definition: Worker.h:508
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1163
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:719
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:428
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
GlobalContext const * getGlobalContext() const noexcept(false)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:381
virtual bool implNeedToRunSelection() const noexcept=0
#define CMS_THREAD_GUARD(_var_)
int timesExcept() const noexcept
Definition: Worker.h:249
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:90
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:613
double f[11][100]
ModuleDescription const * description() const noexcept
Definition: Worker.h:201
def template(fileName, svg, replaceme="REPLACEME")
Definition: svgfig.py:521
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:180
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:614
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition) noexcept
Definition: Worker.h:1047
bool setPassed()
Definition: Worker.h:334
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:783
virtual void implEndJob()=0
virtual bool wantsGlobalLuminosityBlocks() const noexcept=0
virtual bool wantsStreamRuns() const noexcept=0
ServiceToken lock() const
Definition: ServiceToken.h:101
ConcurrencyTypes
Definition: Worker.h:96
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup) noexcept
Definition: Worker.h:411
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:702
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:979
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:103
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:692
StreamContext const * getStreamContext() const noexcept(false)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:833
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
virtual bool wantsStreamLuminosityBlocks() const noexcept=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:753
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext) noexcept
Definition: Worker.cc:458
virtual void implBeginJob()=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
Definition: Worker.h:1037
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:186
bool beginSucceeded_
Definition: Worker.h:631
#define DUMMY
void respondToCloseOutputFile()
Definition: Worker.h:188
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *) noexcept
Definition: Worker.cc:155
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:390
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:881
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:963
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:810
void addedToPath() noexcept
Definition: Worker.h:243
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:187
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:800
virtual void convertCurrentProcessAlias(std::string const &processName)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:928
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1255
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:375
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
Definition: Worker.h:519
edm::WaitingTaskList & waitingTaskList() noexcept
Definition: Worker.h:257
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:610
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1073
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:405
void beginJob()
Definition: Worker.cc:300
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:357
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:399
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:894
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:863
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:946
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:911
int timesPass() const noexcept
Definition: Worker.h:252
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &) noexcept
Definition: Worker.cc:219
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:624
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:608
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:1025
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:747
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:815
State state() const noexcept
Definition: Worker.h:250
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
Definition: Worker.cc:261
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const noexcept=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:767
BranchType const & branchType() const
Definition: Principal.h:175
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
bool shouldTryToContinue_
Definition: Worker.h:630
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0