CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
59 
61 
62 #include <array>
63 #include <atomic>
64 #include <cassert>
65 #include <map>
66 #include <memory>
67 #include <sstream>
68 #include <string>
69 #include <vector>
70 #include <exception>
71 #include <unordered_map>
72 
73 namespace edm {
74  class EventPrincipal;
75  class EventSetupImpl;
76  class EarlyDeleteHelper;
77  class ModuleProcessName;
78  class ProductResolverIndexHelper;
79  class ProductResolverIndexAndSkipBit;
80  class ProductRegistry;
81  class ThinnedAssociationsHelper;
82 
83  namespace workerhelper {
84  template <typename O>
85  class CallImpl;
86  }
87  namespace eventsetup {
89  }
90 
91  class Worker {
92  public:
93  enum State { Ready, Pass, Fail, Exception };
99 
100  TaskQueueAdaptor() = default;
102  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
103 
// True when this adaptor wraps a queue, i.e. either the serial or the limited
// queue pointer is set. Declared const because the check only reads the two
// pointers, so it can also be used on a const adaptor.
104  operator bool() const { return serial_ != nullptr or limited_ != nullptr; }
105 
// Hands the task group iG and the callable iF to whichever queue this adaptor
// wraps: the serial queue when serial_ is set, otherwise the limited queue.
// limited_ is dereferenced without a null check in the else branch, so the
// adaptor must wrap a queue when push() is called (the callers visible in this
// file test the adaptor in an `if` before pushing).
106  template <class F>
107  void push(oneapi::tbb::task_group& iG, F&& iF) {
108  if (serial_) {
109  serial_->push(iG, iF);
110  } else {
111  limited_->push(iG, iF);
112  }
113  }
114  };
115 
116  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
117  virtual ~Worker();
118 
119  Worker(Worker const&) = delete; // Disallow copying and moving
120  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
121 
// Marks the module as no longer valid, then lets the concrete worker release
// it through the pure-virtual doClearModule(). The flag is cleared first, so
// moduleValid_ is already false while doClearModule() runs.
122  void clearModule() {
123  moduleValid_ = false;
124  doClearModule();
125  }
126 
127  virtual bool wantsProcessBlocks() const noexcept = 0;
128  virtual bool wantsInputProcessBlocks() const noexcept = 0;
129  virtual bool wantsGlobalRuns() const noexcept = 0;
130  virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
131  virtual bool wantsStreamRuns() const noexcept = 0;
132  virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
133 
134  virtual SerialTaskQueue* globalRunsQueue() = 0;
136 
137  void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
138  WaitingTask* task,
141  EventPrincipal const*) noexcept;
142 
144  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
145  assert(false);
146  }
147 
148  template <typename T>
150  typename T::TransitionInfoType const&,
151  ServiceToken const&,
152  StreamID,
153  ParentContext const&,
154  typename T::Context const*) noexcept;
155 
156  template <typename T>
158  typename T::TransitionInfoType const&,
159  ServiceToken const&,
160  StreamID,
161  ParentContext const&,
162  typename T::Context const*) noexcept;
163 
164  template <typename T>
165  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
166  StreamID,
167  ParentContext const&,
168  typename T::Context const*) noexcept;
169 
170  virtual size_t transformIndex(edm::BranchDescription const&) const noexcept = 0;
172  size_t iTransformIndex,
173  EventPrincipal const&,
174  ServiceToken const&,
175  StreamID,
176  ModuleCallingContext const&,
177  StreamContext const*) noexcept;
178 
180  void skipOnPath(EventPrincipal const& iEvent);
181  void beginJob(GlobalContext const&);
182  void endJob(GlobalContext const&);
183  void beginStream(StreamID, StreamContext const&);
184  void endStream(StreamID, StreamContext const&);
189 
// Re-arms the worker so the module can be run again: drops any cached
// exception, puts the state back to Ready and clears the workStarted_ flag.
190  void reset() {
191  cached_exception_ = std::exception_ptr();
192  state_ = Ready;
194  workStarted_ = false;
196  }
197 
198  void postDoEvent(EventPrincipal const&);
199 
200  ModuleDescription const* description() const noexcept {
201  if (moduleValid_) {
203  }
204  return nullptr;
205  }
208  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
209 
211 
212  //Used to make EDGetToken work
213  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
215  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
216  virtual void resolvePutIndicies(
217  BranchType iBranchType,
218  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
219  iIndicies) = 0;
220 
221  virtual void modulesWhoseProductsAreConsumed(
222  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
223  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
224  ProductRegistry const& preg,
225  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
226 
227  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
228 
229  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
230 
231  virtual Types moduleType() const = 0;
232  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
233 
// Zeroes all five execution counters (run, visited, passed, failed, except).
// Each store uses release ordering; the timesXxx() accessors read the same
// counters with acquire loads.
234  void clearCounters() noexcept {
235  timesRun_.store(0, std::memory_order_release);
236  timesVisited_.store(0, std::memory_order_release);
237  timesPassed_.store(0, std::memory_order_release);
238  timesFailed_.store(0, std::memory_order_release);
239  timesExcept_.store(0, std::memory_order_release);
240  }
241 
242  void addedToPath() noexcept { ++numberOfPathsOn_; }
243  //NOTE: calling state() is done to force synchronization across threads
// Read-only accessors for the execution statistics and the current state.
// The counter reads are acquire loads of the corresponding atomics; state()
// returns the value of the atomic state_ using its default load ordering.
244  int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
245  int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
246  int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
247  int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
248  int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
249  State state() const noexcept { return state_; }
250 
251  int timesPass() const noexcept { return timesPassed(); } // for backward compatibility only - to be removed soon
252 
253  virtual bool hasAccumulator() const noexcept = 0;
254 
255  // Used in PuttableProductResolver
257 
258  protected:
259  template <typename O>
261 
262  virtual void doClearModule() = 0;
263 
264  virtual std::string workerType() const = 0;
265  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
266 
267  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
268  virtual bool implNeedToRunSelection() const noexcept = 0;
269 
273 
275  size_t iTransformIndex,
278  ServiceWeakToken const&) noexcept = 0;
279  virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
280 
293  virtual void implBeginJob() = 0;
294  virtual void implEndJob() = 0;
295  virtual void implBeginStream(StreamID) = 0;
296  virtual void implEndStream(StreamID) = 0;
297 
299 
301 
302  private:
303  template <typename T>
304  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
305 
306  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
307  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
308 
309  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
310 
311  virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
312  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
313 
315  ModuleCallingContext const& moduleCallingContext,
316  Principal const& iPrincipal) const noexcept = 0;
317 
318  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
319  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
320  virtual void implRespondToCloseOutputFile() = 0;
321 
323 
324  virtual TaskQueueAdaptor serializeRunModule() = 0;
325 
326  bool shouldRethrowException(std::exception_ptr iPtr,
327  ParentContext const& parentContext,
328  bool isEvent,
329  bool isTryToContinue) const noexcept;
331 
// Records a Pass outcome: sets state_ to Pass and returns true. The passed
// counter is incremented only for event transitions. IS_EVENT is a template
// parameter, so the branch is selected at compile time with `if constexpr`.
332  template <bool IS_EVENT>
333  bool setPassed() {
334  if constexpr (IS_EVENT) {
335  timesPassed_.fetch_add(1, std::memory_order_relaxed);
336  }
337  state_ = Pass;
338  return true;
339  }
340 
// Records a Fail outcome: sets state_ to Fail and returns false. The failed
// counter is incremented only for event transitions. IS_EVENT is a template
// parameter, so the branch is selected at compile time with `if constexpr`.
341  template <bool IS_EVENT>
342  bool setFailed() {
343  if constexpr (IS_EVENT) {
344  timesFailed_.fetch_add(1, std::memory_order_relaxed);
345  }
346  state_ = Fail;
347  return false;
348  }
349 
// Records that the module threw: caches iException, sets state_ to Exception
// and returns the cached pointer. The exception counter is incremented only
// for event transitions. IS_EVENT is a template parameter, so the branch is
// selected at compile time with `if constexpr`.
350  template <bool IS_EVENT>
351  std::exception_ptr setException(std::exception_ptr iException) {
352  if constexpr (IS_EVENT) {
353  timesExcept_.fetch_add(1, std::memory_order_relaxed);
354  }
355  cached_exception_ = iException;  // stays cached until reset() replaces it with an empty exception_ptr
356  state_ = Exception;
357  return cached_exception_;
358  }
359 
360  template <typename T>
362  ServiceToken const&,
363  ParentContext const&,
364  typename T::TransitionInfoType const&,
365  Transition) noexcept;
366 
368  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
369 
// Tells whether EventSetup items must be prefetched for transition iTrans.
// Transitions at or beyond NumberOfEventSetupTransitions never need it; for
// the others the answer is whether esItemsToGetFrom(iTrans) is non-empty.
// The `and` short-circuits, so esItemsToGetFrom() is only consulted for
// transitions below that bound.
370  bool needsESPrefetching(Transition iTrans) const noexcept {
371  return (iTrans < edm::Transition::NumberOfEventSetupTransitions) and not esItemsToGetFrom(iTrans).empty();
372  }
373 
375  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
376  }
377 
379  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
381  }
382 
384  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
386  }
387 
388  virtual bool hasAcquire() const noexcept = 0;
389 
390  template <typename T>
391  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
392  typename T::TransitionInfoType const&,
393  StreamID,
395  typename T::Context const*) noexcept;
396 
398 
399  void runAcquireAfterAsyncPrefetch(std::exception_ptr,
402  WaitingTaskWithArenaHolder) noexcept;
403 
404  std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
405  ParentContext const& parentContext) noexcept;
406 
407  template <typename T>
408  class RunModuleTask : public WaitingTask {
409  public:
411  typename T::TransitionInfoType const& transitionInfo,
412  ServiceToken const& token,
413  StreamID streamID,
414  ParentContext const& parentContext,
415  typename T::Context const* context,
416  oneapi::tbb::task_group* iGroup) noexcept
417  : m_worker(worker),
418  m_transitionInfo(transitionInfo),
419  m_streamID(streamID),
420  m_parentContext(parentContext),
421  m_context(context),
422  m_serviceToken(token),
423  m_group(iGroup) {}
424 
427  EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
428  EnableQueueGuard(EnableQueueGuard const&) = delete;
429  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
430  EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
431  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
433  if (queue_) {
434  queue_->resume();
435  }
436  }
437  };
438 
// Task body: emits the matching post-prefetching signal and then runs the
// module, either directly or through the queue returned by
// Worker::serializeRunModule().
//
// Flow:
//  1. Install the services held by m_serviceToken for the duration of the call.
//  2. Start from the exception already attached to this task (exceptionPtr()).
//  3. Emit the post-prefetching signal. For event transitions this happens only
//     when the worker has no acquire step, and an exception thrown by the
//     signal is captured (it replaces excptr only if excptr was empty). The
//     stream and global branches emit without a try block.
//  4. If there is no exception and the worker supplies a queue, push the module
//     run onto that queue (going through the global queue first when
//     pauseGlobalQueue() returns one) and return.
//  5. Otherwise call runModuleAfterAsyncPrefetch() directly, handing it excptr.
439  void execute() final {
440  //Need to make the services available early so other services can see them
441  ServiceRegistry::Operate guard(m_serviceToken.lock());
442 
443  //in case the emit causes an exception, we need a memory location
444  // to hold the exception_ptr
445  std::exception_ptr temp_excptr;
446  auto excptr = exceptionPtr();
447  if constexpr (T::isEvent_) {
448  if (!m_worker->hasAcquire()) {
449  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
450  CMS_SA_ALLOW try {
451  //pre was called in prefetchAsync
452  m_worker->emitPostModuleEventPrefetchingSignal();
453  } catch (...) {
454  temp_excptr = std::current_exception();
455  if (not excptr) {
456  excptr = temp_excptr;
457  }
458  }
459  }
460  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
461  m_worker->emitPostModuleStreamPrefetchingSignal();
462  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
463  m_worker->emitPostModuleGlobalPrefetchingSignal();
464  }
465 
466  if (not excptr) {
467  if (auto queue = m_worker->serializeRunModule()) {
     // The lambda captures copies of everything it needs, so it does not
     // refer back to this task object when it eventually runs on the queue.
468  auto f = [worker = m_worker,
469  info = m_transitionInfo,
470  streamID = m_streamID,
471  parentContext = m_parentContext,
472  sContext = m_context,
473  serviceToken = m_serviceToken]() {
474  //Need to make the services available
475  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
476 
477  //If needed, we pause the queue in begin transition and resume it
478  // at the end transition. This can guarantee that the module
479  // only processes one run or lumi at a time
481  std::exception_ptr ptr;
482  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
483  };
484  //keep another global transition from running if necessary
485  auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
486  if (gQueue) {
     // Runs on the global queue: pause that queue first, then hand the
     // module run to the module's own queue.
487  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
488  gQueue->pause();
489  queue.push(*group, std::move(f));
490  });
491  } else {
492  queue.push(*m_group, std::move(f));
493  }
494  return;
495  }
496  }
497 
     // Reached when an exception is pending or the worker has no queue.
498  m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
499  }
500 
501  private:
503  typename T::TransitionInfoType m_transitionInfo;
506  typename T::Context const* m_context;
508  oneapi::tbb::task_group* m_group;
509  };
510 
511  // AcquireTask is only used for the Event case, but we define
512  // it as a template so all cases will compile.
513  // DUMMY exists to work around the C++ Standard prohibition on
514  // fully specializing templates nested in other classes.
515  template <typename T, typename DUMMY = void>
516  class AcquireTask : public WaitingTask {
517  public:
519  typename T::TransitionInfoType const&,
520  ServiceToken const&,
521  ParentContext const&,
522  WaitingTaskWithArenaHolder) noexcept {}
523  void execute() final {}
524  };
525 
526  template <typename DUMMY>
528  public:
530  EventTransitionInfo const& eventTransitionInfo,
531  ServiceToken const& token,
532  ParentContext const& parentContext,
533  WaitingTaskWithArenaHolder holder) noexcept
534  : m_worker(worker),
535  m_eventTransitionInfo(eventTransitionInfo),
536  m_parentContext(parentContext),
537  m_holder(std::move(holder)),
538  m_serviceToken(token) {}
539 
// Task body for the acquire step of an event: emits the post event-prefetching
// signal and then runs the worker's acquire, either directly or through the
// queue returned by Worker::serializeRunModule().
//
// An exception thrown by the signal is captured and replaces excptr only if
// the task did not already carry one. With no exception and a queue available,
// the acquire call is pushed onto that queue with copies of the needed state
// (including the holder) and execute() returns. Otherwise
// runAcquireAfterAsyncPrefetch() is called directly with excptr, and m_holder
// is moved into the call.
540  void execute() final {
541  //Need to make the services available early so other services can see them
542  ServiceRegistry::Operate guard(m_serviceToken.lock());
543 
544  //in case the emit causes an exception, we need a memory location
545  // to hold the exception_ptr
546  std::exception_ptr temp_excptr;
547  auto excptr = exceptionPtr();
548  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
549  CMS_SA_ALLOW try {
550  //pre was called in prefetchAsync
551  m_worker->emitPostModuleEventPrefetchingSignal();
552  } catch (...) {
553  temp_excptr = std::current_exception();
554  if (not excptr) {
555  excptr = temp_excptr;
556  }
557  }
558 
559  if (not excptr) {
560  if (auto queue = m_worker->serializeRunModule()) {
561  queue.push(*m_holder.group(),
562  [worker = m_worker,
563  info = m_eventTransitionInfo,
564  parentContext = m_parentContext,
565  serviceToken = m_serviceToken,
566  holder = m_holder]() {
567  //Need to make the services available
568  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
569 
570  std::exception_ptr ptr;
571  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
572  });
573  return;
574  }
575  }
576 
577  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
578  }
579 
580  private:
586  };
587 
588  // This class does nothing unless there is an exception originating
589  // in an "External Worker". In that case, it handles converting the
590  // exception to a CMS exception and adding context to the exception.
592  public:
594  oneapi::tbb::task_group* group,
595  WaitingTask* runModuleTask,
596  ParentContext const& parentContext) noexcept;
597 
598  void execute() final;
599 
600  private:
603  oneapi::tbb::task_group* m_group;
605  };
606 
607  std::atomic<int> timesRun_;
608  std::atomic<int> timesVisited_;
609  std::atomic<int> timesPassed_;
610  std::atomic<int> timesFailed_;
611  std::atomic<int> timesExcept_;
612  std::atomic<State> state_;
614  std::atomic<int> numberOfPathsLeftToRun_;
615 
617 
618  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
619  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
620 
621  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
622 
624 
626  std::atomic<bool> workStarted_;
628  bool moduleValid_ = true;
629  bool shouldTryToContinue_ = false;
630  bool beginSucceeded_ = false;
631  };
632 
633  namespace {
// Scope guard around the pre/post module signals of transition traits T.
//
// Usage in this file: construct, call preModuleSignal(), run the module, call
// postModuleSignal(). If the scope is left before postModuleSignal() has been
// called (for example because an exception is propagating), the destructor
// emits the post signal itself and swallows anything it throws. A null
// registry pointer turns every member into a no-op.
//
// NOTE(review): the class is copyable, and a copy made before
// postModuleSignal() would emit the post signal a second time from its own
// destructor. No copy is made in the code visible here; consider deleting the
// copy operations.
634  template <typename T>
635  class ModuleSignalSentry {
636  public:
637  ModuleSignalSentry(ActivityRegistry* a,
638  typename T::Context const* context,
639  ModuleCallingContext const* moduleCallingContext)
640  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
641 
642  ~ModuleSignalSentry() {
643  // This destructor does nothing unless we are unwinding
644  // the stack from an earlier exception (a_ will be null if we
645  // are not). We want to report the earlier exception and ignore any
646  // additional exceptions from the post module signal.
647  CMS_SA_ALLOW try {
648  if (a_) {
649  T::postModuleSignal(a_, context_, moduleCallingContext_);
650  }
651  } catch (...) {
652  }
653  }
     // Emits the pre-module signal through convertException::wrap. A
     // cms::Exception coming out of it gets an extra context line and is
     // rethrown. a_ is left set, so the destructor still emits the post signal.
654  void preModuleSignal() {
655  if (a_) {
656  try {
657  convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
658  } catch (cms::Exception& ex) {
659  ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
660  throw;
661  }
662  }
663  }
     // Emits the post-module signal at most once. a_ is cleared before the
     // signal is emitted, so even if the signal throws the destructor will
     // not emit it again.
664  void postModuleSignal() {
665  if (a_) {
666  auto temp = a_;
667  // Setting a_ to null informs the destructor that the signal
668  // was already run and that it should do nothing.
669  a_ = nullptr;
670  try {
671  convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
672  } catch (cms::Exception& ex) {
673  ex.addContext("Handling post module signal, likely in a service function immediately after module method");
674  throw;
675  }
676  }
677  }
678 
679  private:
680  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
681  typename T::Context const* context_;
682  ModuleCallingContext const* moduleCallingContext_;
683  };
684 
685  } // namespace
686 
687  namespace workerhelper {
688  template <>
690  public:
// Runs the module for an event by delegating to Worker::implDo() and returns
// its result. The stream id, activity registry and context arguments are not
// used, and no ModuleSignalSentry is created in this overload.
692  static bool call(Worker* iWorker,
693  StreamID,
694  EventTransitionInfo const& info,
695  ActivityRegistry* /* actReg */,
696  ModuleCallingContext const* mcc,
697  Arg::Context const* /* context*/) {
698  //Signal sentry is handled by the module
699  return iWorker->implDo(info, mcc);
700  }
701  static void esPrefetchAsync(Worker* worker,
702  WaitingTaskHolder waitingTask,
703  ServiceToken const& token,
704  EventTransitionInfo const& info,
705  Transition transition) noexcept {
706  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
707  }
708  static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
709  static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
710 
711  static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
712  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
713  };
714 
715  template <>
717  public:
// Runs the worker's begin-run implementation (implDoBegin) bracketed by the
// pre/post module signals and returns its result. beginSucceeded_ is set only
// after the post signal has returned, i.e. when nothing in the sequence threw.
// The stream id argument is unused.
719  static bool call(Worker* iWorker,
720  StreamID,
721  RunTransitionInfo const& info,
722  ActivityRegistry* actReg,
723  ModuleCallingContext const* mcc,
724  Arg::Context const* context) {
725  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
726  // If preModuleSignal() throws, implDoBegin() is not called, and the
727  // cpp destructor calls postModuleSignal (ignoring additional exceptions)
728  cpp.preModuleSignal();
729  // If implDoBegin() throws, the cpp destructor calls postModuleSignal
730  // (ignoring additional exceptions)
731  auto returnValue = iWorker->implDoBegin(info, mcc);
732  // If postModuleSignal() throws, the exception will propagate to the framework
733  cpp.postModuleSignal();
734  iWorker->beginSucceeded_ = true;
735  return returnValue;
736  }
737  static void esPrefetchAsync(Worker* worker,
738  WaitingTaskHolder waitingTask,
739  ServiceToken const& token,
740  RunTransitionInfo const& info,
741  Transition transition) noexcept {
742  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
743  }
744  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
745  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
746  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
747  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
748  };
749  template <>
751  public:
// Runs the worker's per-stream begin-run implementation (implDoStreamBegin)
// for stream `id`, bracketed by the pre/post module signals, and returns its
// result. If the pre signal or the module throws, the sentry's destructor
// emits the post signal; beginSucceeded_ is set only when nothing threw.
753  static bool call(Worker* iWorker,
754  StreamID id,
755  RunTransitionInfo const& info,
756  ActivityRegistry* actReg,
757  ModuleCallingContext const* mcc,
758  Arg::Context const* context) {
759  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
760  cpp.preModuleSignal();
761  auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
762  cpp.postModuleSignal();
763  iWorker->beginSucceeded_ = true;
764  return returnValue;
765  }
766  static void esPrefetchAsync(Worker* worker,
767  WaitingTaskHolder waitingTask,
768  ServiceToken const& token,
769  RunTransitionInfo const& info,
770  Transition transition) noexcept {
771  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
772  }
773  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
774  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
775  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
776  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
777  };
778  template <>
780  public:
// Runs the worker's end-run implementation (implDoEnd), but only when
// beginSucceeded_ is set. The flag is cleared before the module is called, so
// it stays cleared even if the end call throws. When the flag is not set the
// module is not called, no signals are emitted, and true is returned.
// The stream id argument is unused.
// NOTE(review): beginSucceeded_ is one plain bool per Worker that every
// begin/end overload in this file reads or writes; confirm a single Worker
// never has two begin/end pairs open at once.
782  static bool call(Worker* iWorker,
783  StreamID,
784  RunTransitionInfo const& info,
785  ActivityRegistry* actReg,
786  ModuleCallingContext const* mcc,
787  Arg::Context const* context) {
788  if (iWorker->beginSucceeded_) {
789  iWorker->beginSucceeded_ = false;
790 
791  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
792  cpp.preModuleSignal();
793  auto returnValue = iWorker->implDoEnd(info, mcc);
794  cpp.postModuleSignal();
795  return returnValue;
796  }
797  return true;
798  }
799  static void esPrefetchAsync(Worker* worker,
800  WaitingTaskHolder waitingTask,
801  ServiceToken const& token,
802  RunTransitionInfo const& info,
803  Transition transition) noexcept {
804  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
805  }
806  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
807  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
808  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
809  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
810  };
811  template <>
813  public:
// Runs the worker's per-stream end-run implementation (implDoStreamEnd) for
// stream `id`, but only when beginSucceeded_ is set. The flag is cleared
// before the module is called. When the flag is not set the module is not
// called, no signals are emitted, and true is returned.
// NOTE(review): beginSucceeded_ is one plain bool per Worker that the run and
// luminosity-block begin/end overloads all share. If the same Worker sees a
// lumi begin/end pair nested inside a run, the lumi end clears the flag and
// this end-run call would then be skipped. Verify against the scheduler.
815  static bool call(Worker* iWorker,
816  StreamID id,
817  RunTransitionInfo const& info,
818  ActivityRegistry* actReg,
819  ModuleCallingContext const* mcc,
820  Arg::Context const* context) {
821  if (iWorker->beginSucceeded_) {
822  iWorker->beginSucceeded_ = false;
823 
824  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
825  cpp.preModuleSignal();
826  auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
827  cpp.postModuleSignal();
828  return returnValue;
829  }
830  return true;
831  }
832  static void esPrefetchAsync(Worker* worker,
833  WaitingTaskHolder waitingTask,
834  ServiceToken const& token,
835  RunTransitionInfo const& info,
836  Transition transition) noexcept {
837  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
838  }
839  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
840  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
841  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
842  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
843  };
844 
845  template <>
847  public:
// Runs the worker's begin-luminosity-block implementation (implDoBegin with a
// LumiTransitionInfo) bracketed by the pre/post module signals and returns its
// result. If the pre signal or the module throws, the sentry's destructor
// emits the post signal; beginSucceeded_ is set only when nothing threw.
// The stream id argument is unused.
849  static bool call(Worker* iWorker,
850  StreamID,
851  LumiTransitionInfo const& info,
852  ActivityRegistry* actReg,
853  ModuleCallingContext const* mcc,
854  Arg::Context const* context) {
855  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
856  cpp.preModuleSignal();
857  auto returnValue = iWorker->implDoBegin(info, mcc);
858  cpp.postModuleSignal();
859  iWorker->beginSucceeded_ = true;
860  return returnValue;
861  }
862  static void esPrefetchAsync(Worker* worker,
863  WaitingTaskHolder waitingTask,
864  ServiceToken const& token,
865  LumiTransitionInfo const& info,
866  Transition transition) noexcept {
867  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
868  }
869  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
870  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
871  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
872  return iWorker->globalLuminosityBlocksQueue();
873  }
874  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
875  };
876  template <>
878  public:
// Runs the worker's per-stream begin-luminosity-block implementation
// (implDoStreamBegin with a LumiTransitionInfo) for stream `id`, bracketed by
// the pre/post module signals, and returns its result. beginSucceeded_ is set
// only after the post signal has returned without throwing.
880  static bool call(Worker* iWorker,
881  StreamID id,
882  LumiTransitionInfo const& info,
883  ActivityRegistry* actReg,
884  ModuleCallingContext const* mcc,
885  Arg::Context const* context) {
886  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
887  cpp.preModuleSignal();
888  auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
889  cpp.postModuleSignal();
890  iWorker->beginSucceeded_ = true;
891  return returnValue;
892  }
893  static void esPrefetchAsync(Worker* worker,
894  WaitingTaskHolder waitingTask,
895  ServiceToken const& token,
896  LumiTransitionInfo const& info,
897  Transition transition) noexcept {
898  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
899  }
900  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
901  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
902  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
903  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
904  };
905 
906  template <>
908  public:
// Runs the worker's end-luminosity-block implementation (implDoEnd with a
// LumiTransitionInfo), but only when beginSucceeded_ is set. The flag is
// cleared before the module is called. When the flag is not set the module is
// not called, no signals are emitted, and true is returned.
// The stream id argument is unused.
910  static bool call(Worker* iWorker,
911  StreamID,
912  LumiTransitionInfo const& info,
913  ActivityRegistry* actReg,
914  ModuleCallingContext const* mcc,
915  Arg::Context const* context) {
916  if (iWorker->beginSucceeded_) {
917  iWorker->beginSucceeded_ = false;
918 
919  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
920  cpp.preModuleSignal();
921  auto returnValue = iWorker->implDoEnd(info, mcc);
922  cpp.postModuleSignal();
923  return returnValue;
924  }
925  return true;
926  }
927  static void esPrefetchAsync(Worker* worker,
928  WaitingTaskHolder waitingTask,
929  ServiceToken const& token,
930  LumiTransitionInfo const& info,
931  Transition transition) noexcept {
932  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
933  }
934  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
935  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
936  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
937  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
938  return iWorker->globalLuminosityBlocksQueue();
939  }
940  };
941  template <>
943  public:
// Runs the worker's per-stream end-luminosity-block implementation
// (implDoStreamEnd with a LumiTransitionInfo) for stream `id`, but only when
// beginSucceeded_ is set. The flag is cleared before the module is called.
// When the flag is not set the module is not called, no signals are emitted,
// and true is returned.
945  static bool call(Worker* iWorker,
946  StreamID id,
947  LumiTransitionInfo const& info,
948  ActivityRegistry* actReg,
949  ModuleCallingContext const* mcc,
950  Arg::Context const* context) {
951  if (iWorker->beginSucceeded_) {
952  iWorker->beginSucceeded_ = false;
953 
954  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
955  cpp.preModuleSignal();
956  auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
957  cpp.postModuleSignal();
958  return returnValue;
959  }
960  return true;
961  }
962  static void esPrefetchAsync(Worker* worker,
963  WaitingTaskHolder waitingTask,
964  ServiceToken const& token,
965  LumiTransitionInfo const& info,
966  Transition transition) noexcept {
967  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
968  }
969  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
970  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
971  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
972  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
973  };
974  template <>
976  public:
978  static bool call(Worker* iWorker,
979  StreamID,
981  ActivityRegistry* actReg,
982  ModuleCallingContext const* mcc,
983  Arg::Context const* context) {
984  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
985  cpp.preModuleSignal();
986  auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
987  cpp.postModuleSignal();
988  iWorker->beginSucceeded_ = true;
989  return returnValue;
990  }
991  static void esPrefetchAsync(
993  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
994  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
995  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
996  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
997  };
// CallImpl specialization for the access-input-ProcessBlock transition: call() forwards to
// Worker::implDoAccessInputProcessBlock.
// NOTE(review): listing lines 999, 1001, 1004 and 1015 are absent from this extract, so the class
// header, the `Arg` alias, the declaration of the `info` parameter and the parameter list/body of
// esPrefetchAsync are not visible here.
998  template <>
1000  public:
// Runs the module's access-input-ProcessBlock step between explicit pre/post module signals and
// returns the impl's result. Unlike the begin-ProcessBlock specialization, this one does not
// touch beginSucceeded_.
1002  static bool call(Worker* iWorker,
1003  StreamID,
1005  ActivityRegistry* actReg,
1006  ModuleCallingContext const* mcc,
1007  Arg::Context const* context) {
1008  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1009  cpp.preModuleSignal();
1010  auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
1011  cpp.postModuleSignal();
1012  return returnValue;
1013  }
// NOTE(review): the remainder of this declaration (listing line 1015) is missing from the extract.
1014  static void esPrefetchAsync(
// The transition is wanted only when the worker reports wantsInputProcessBlocks().
1016  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
// Selection is never run for this transition.
1017  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
// Both queue accessors return nullptr for this transition.
1018  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1019  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1020  };
// CallImpl specialization for the end-ProcessBlock transition: call() forwards to
// Worker::implDoEndProcessBlock, but only if the matching begin step succeeded.
// NOTE(review): listing lines 1022, 1024, 1027 and 1043 are absent from this extract, so the class
// header, the `Arg` alias, the declaration of the `info` parameter and the parameter list/body of
// esPrefetchAsync are not visible here.
1021  template <>
1023  public:
// Returns the impl's result when the module is run; returns true without running the module
// (and without emitting pre/post module signals) when beginSucceeded_ is false.
1025  static bool call(Worker* iWorker,
1026  StreamID,
1028  ActivityRegistry* actReg,
1029  ModuleCallingContext const* mcc,
1030  Arg::Context const* context) {
1031  if (iWorker->beginSucceeded_) {
// Clear the flag before calling the module, so it is reset even if the impl call throws.
1032  iWorker->beginSucceeded_ = false;
1033 
1034  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1035  cpp.preModuleSignal();
1036  auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1037  cpp.postModuleSignal();
1038  return returnValue;
1039  }
1040  return true;
1041  }
// NOTE(review): the remainder of this declaration (listing line 1043) is missing from the extract.
1042  static void esPrefetchAsync(
// The transition is wanted only when the worker reports wantsProcessBlocks().
1044  static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
// Selection is never run for this transition.
1045  static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
// Both queue accessors return nullptr for this transition.
1046  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1047  static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1048  };
1049  } // namespace workerhelper
1050 
// Worker::prefetchAsync<T>: marks the module as prefetching, emits the matching pre-prefetching
// signal, and starts the EventSetup and event-data prefetches with iTask as the waiting task.
// NOTE(review): listing line 1052 (the start of the signature, which declares `iTask`) is missing
// from this extract.
1051  template <typename T>
1053  ServiceToken const& token,
1054  ParentContext const& parentContext,
1055  typename T::TransitionInfoType const& transitionInfo,
1056  Transition iTransition) noexcept {
1057  Principal const& principal = transitionInfo.principal();
1058 
1059  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1060 
// Pick the signal at compile time: event transitions first, then by the context type of T.
// No signal is emitted if T matches none of the three cases.
1061  if constexpr (T::isEvent_) {
1062  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1063  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1064  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1065  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1066  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1067  }
1068 
// EventSetup prefetch is dispatched through the transition-specific CallImpl; event-data
// prefetch goes through edPrefetchAsync. Both receive the same iTask.
1069  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1070  edPrefetchAsync(iTask, token, principal);
1071 
// The extra pre-action hook is invoked only for principals whose branch type is InEvent.
1072  if (principal.branchType() == InEvent) {
1073  preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1074  }
1075  }
1076 
// Worker::doWorkAsync<T>: registers `task` to be notified when this worker finishes and, for the
// first caller only, builds the chain of tasks (optional selection, prefetch, optional acquire,
// run module) that performs the work.
// NOTE(review): listing lines 1078 (start of the signature, declaring `task`), 1084 (the
// condition guarding the early return below) and 1101 (the `if` that opens the selection branch)
// are missing from this extract.
1077  template <typename T>
1079  typename T::TransitionInfoType const& transitionInfo,
1080  ServiceToken const& token,
1081  StreamID streamID,
1082  ParentContext const& parentContext,
1083  typename T::Context const* context) noexcept {
// Body of the early-exit guard whose condition is on the missing listing line 1084.
1085  return;
1086  }
1087 
1088  //Need to check workStarted_ before adding to waitingTasks_
// compare_exchange_strong succeeds only for the caller that flips workStarted_ from false to
// true, so `workStarted` is true for exactly one caller until the flag is cleared elsewhere.
1089  bool expected = false;
1090  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1091 
// Every caller is added to the waiting list, not just the one that starts the work.
1092  waitingTasks_.add(task);
1093  if constexpr (T::isEvent_) {
1094  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1095  }
1096 
1097  if (workStarted) {
1098  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1099 
1100  //if have TriggerResults based selection we want to reject the event before doing prefetching
1102  //We need to run the selection in a different task so that
1103  // we can prefetch the data needed for the selection
1104  WaitingTask* moduleTask =
1105  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1106 
1107  //make sure the task is either run or destroyed
// DestroyTask holds a raw WaitingTask*. release() hands the pointer out and nulls the slot;
// if it was never released, the destructor passes it to a TaskSentry
// (presumably that disposes of the task — confirm against TaskSentry).
1108  struct DestroyTask {
1109  DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1110 
1111  ~DestroyTask() noexcept {
1112  auto p = m_task.exchange(nullptr);
1113  if (p) {
1114  TaskSentry s{p};
1115  }
1116  }
1117 
1118  edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1119 
1120  private:
1121  std::atomic<edm::WaitingTask*> m_task;
1122  };
// For event transitions of modules with an acquire step, replace moduleTask with a task that
// runs the acquire first; the original run task becomes the continuation, wrapped in a
// HandleExternalWorkExceptionTask.
1123  if constexpr (T::isEvent_) {
1124  if (hasAcquire()) {
1125  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1126  ServiceWeakToken weakToken = token;
1127  auto* group = task.group();
1128  moduleTask = make_waiting_task(
1129  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1130  WaitingTaskWithArenaHolder runTaskHolder(
1131  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1132  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1133  t.execute();
1134  });
1135  }
1136  }
// selectionTask is handed to prePrefetchSelectionAsync below; when it runs it makes the
// services available and starts prefetching, with the (released) module task as the
// continuation.
1137  auto* group = task.group();
1138  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1139  ServiceWeakToken weakToken = token;
1140  auto selectionTask =
1141  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1142  std::exception_ptr const*) mutable {
1143  ServiceRegistry::Operate guard(weakToken.lock());
1144  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1145  weakToken.lock(),
1146  parentContext,
1147  info,
1148  T::transition_);
1149  });
1150  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1151  } else {
// No selection step: build the run task (preceded by an acquire task when applicable) and
// start prefetching immediately.
1152  WaitingTask* moduleTask =
1153  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1154  auto group = task.group();
1155  if constexpr (T::isEvent_) {
1156  if (hasAcquire()) {
1157  WaitingTaskWithArenaHolder runTaskHolder(
1158  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1159  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1160  }
1161  }
1162  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1163  }
1164  }
1165  }
1166 
// Runs the module once prefetching has finished, or records the prefetch failure instead.
//   iEPtr          exception raised during prefetching; null if prefetching succeeded.
//   transitionInfo, streamID, parentContext, context   forwarded unchanged to runModule<T>.
// Returns the exception that ended this worker's processing (the prefetch exception when it must
// be rethrown, or whatever runModule<T> threw), or a null pointer otherwise. The same value is
// passed to waitingTasks_.doneWaiting() before returning.
1167  template <typename T>
1168  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1169  typename T::TransitionInfoType const& transitionInfo,
1170  StreamID streamID,
1171  ParentContext const& parentContext,
1172  typename T::Context const* context) noexcept {
1173  std::exception_ptr exceptionPtr;
1174  bool shouldRun = true;
1175  if (iEPtr) {
// A prefetch exception that must be rethrown is recorded via setException and the module is
// skipped. Otherwise the module is skipped and marked passed, unless shouldTryToContinue_ is
// set, in which case the module is still run.
1176  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1177  exceptionPtr = iEPtr;
1178  setException<T::isEvent_>(exceptionPtr);
1179  shouldRun = false;
1180  } else {
1181  if (not shouldTryToContinue_) {
1182  setPassed<T::isEvent_>();
1183  shouldRun = false;
1184  }
1185  }
1186  }
1187  if (shouldRun) {
1188  // Caught exception is propagated via WaitingTaskList
1189  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1190  exceptionPtr = std::current_exception();
1191  }
1192  } else {
// The module was not run, so the calling context set up for prefetching is reset to kInvalid.
1193  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1194  }
1195  waitingTasks_.doneWaiting(exceptionPtr);
1196  return exceptionPtr;
1197  }
1198 
// Worker::doWorkNoPrefetchingAsync<T>: registers `task` to be notified when this worker finishes
// and, for the first caller only, schedules the module to run without event-data prefetching
// (EventSetup prefetching is still done when needsESPrefetching(T::transition_) is true).
// NOTE(review): listing lines 1200 (start of the signature, declaring `task`) and 1206 (the
// condition guarding the early return below) are missing from this extract.
1199  template <typename T>
1201  typename T::TransitionInfoType const& transitionInfo,
1202  ServiceToken const& serviceToken,
1203  StreamID streamID,
1204  ParentContext const& parentContext,
1205  typename T::Context const* context) noexcept {
// Body of the early-exit guard whose condition is on the missing listing line 1206.
1207  return;
1208  }
1209 
1210  //Need to check workStarted_ before adding to waitingTasks_
// Only the caller that flips workStarted_ from false to true schedules the work; every caller
// is still added to the waiting list.
1211  bool expected = false;
1212  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1213 
1214  waitingTasks_.add(task);
1215  if (workStarted) {
1216  ServiceWeakToken weakToken = serviceToken;
// toDo runs the module with services available and then releases the waiting tasks, passing
// along any exception thrown. transitionInfo is copied into the closure as `info`.
1217  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1218  std::exception_ptr exceptionPtr;
1219  // Caught exception is propagated via WaitingTaskList
1220  CMS_SA_ALLOW try {
1221  //Need to make the services available
1222  ServiceRegistry::Operate guard(weakToken.lock());
1223 
1224  this->runModule<T>(info, streamID, parentContext, context);
1225  } catch (...) {
1226  exceptionPtr = std::current_exception();
1227  }
1228  this->waitingTasks_.doneWaiting(exceptionPtr);
1229  };
1230 
1231  if (needsESPrefetching(T::transition_)) {
// Run toDo only after the EventSetup prefetch completes. A prefetch failure is forwarded
// straight to the waiting tasks and the module is not run.
1232  auto group = task.group();
1233  auto afterPrefetch =
1234  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1235  if (iExcept) {
1236  this->waitingTasks_.doneWaiting(*iExcept);
1237  } else {
// Push onto the module's queue when serializeRunModule() yields one; otherwise run
// directly in the task group.
1238  if (auto queue = this->serializeRunModule()) {
1239  queue.push(*group, toDo);
1240  } else {
1241  group->run(toDo);
1242  }
1243  }
1244  });
1245  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1246  esPrefetchAsync(
1247  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1248  } else {
// Nothing to prefetch: schedule toDo immediately, using the same queue-or-group choice.
1249  auto group = task.group();
1250  if (auto queue = this->serializeRunModule()) {
1251  queue.push(*group, toDo);
1252  } else {
1253  group->run(toDo);
1254  }
1255  }
1256  }
1257  }
1258 
// Invokes the module for transition T and records the outcome in the worker's state.
// Returns the pass/fail flag `rc`. A cms::Exception that shouldRethrowException() says must
// propagate is recorded with setException and rethrown; otherwise the worker is marked passed.
// NOTE(review): listing lines 1276, 1286 and 1288 are missing from this extract; 1276 is the
// start of the statement whose argument list appears on 1277, and 1286/1288 sit inside the catch
// block.
1259  template <typename T>
1260  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1261  StreamID streamID,
1262  ParentContext const& parentContext,
1263  typename T::Context const* context) {
1264  //unscheduled producers should advance this
1265  //if (T::isEvent_) {
1266  // ++timesVisited_;
1267  //}
1268  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// timesRun_ is counted for event transitions only.
1269  if constexpr (T::isEvent_) {
1270  timesRun_.fetch_add(1, std::memory_order_relaxed);
1271  }
1272 
1273  bool rc = true;
1274  try {
// The module call is made inside convertException::wrap.
// Presumably the statement starting on the missing line 1276 assigns its result to rc,
// which is tested just below — confirm against the full source.
1275  convertException::wrap([&]() {
1277  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1278 
1279  if (rc) {
1280  setPassed<T::isEvent_>();
1281  } else {
1282  setFailed<T::isEvent_>();
1283  }
1284  });
1285  } catch (cms::Exception& ex) {
1287  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
// Presumably setException stores the exception in cached_exception_, which is what gets
// rethrown on the next line — confirm against setException's definition.
1289  setException<T::isEvent_>(std::current_exception());
1290  std::rethrow_exception(cached_exception_);
1291  } else {
1292  rc = setPassed<T::isEvent_>();
1293  }
1294  }
1295 
1296  return rc;
1297  }
1298 
// Runs the module with no prefetching step by delegating to runModuleAfterAsyncPrefetch<T> with
// a null prefetch exception. Returns whatever that call returns.
// Note: timesVisited_ is incremented here unconditionally, whereas doWorkAsync increments it only
// when T::isEvent_ is true.
1299  template <typename T>
1300  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1301  StreamID streamID,
1302  ParentContext const& parentContext,
1303  typename T::Context const* context) noexcept {
1304  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1305  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1306  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1307  }
1308 } // namespace edm
1309 #endif
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:737
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const noexcept
Definition: Worker.cc:112
void clearModule()
Definition: Worker.h:122
std::atomic< int > timesVisited_
Definition: Worker.h:608
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:808
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void execute() final
Definition: Worker.h:439
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:281
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:107
T::Context const * m_context
Definition: Worker.h:506
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
Definition: Worker.h:1014
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:692
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:614
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:351
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:849
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1200
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:251
int timesPassed() const noexcept
Definition: Worker.h:246
std::exception_ptr exceptionPtr() const noexcept
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:719
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder) noexcept
Definition: Worker.h:529
void execute() final
Definition: Worker.h:523
int timesVisited() const noexcept
Definition: Worker.h:245
int timesFailed() const noexcept
Definition: Worker.h:247
virtual ~Worker()
Definition: Worker.cc:101
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
Definition: Worker.h:991
ParentContext const m_parentContext
Definition: Worker.h:505
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:1002
std::atomic< int > timesExcept_
Definition: Worker.h:611
oneapi::tbb::task_group * m_group
Definition: Worker.h:508
std::atomic< bool > workStarted_
Definition: Worker.h:626
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:781
std::atomic< int > timesFailed_
Definition: Worker.h:610
void emitPostModuleStreamPrefetchingSignal()
Definition: Worker.h:378
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:370
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *) noexcept
Definition: Worker.cc:254
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:621
void clearCounters() noexcept
Definition: Worker.h:234
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:775
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
int timesRun() const noexcept
Definition: Worker.h:244
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:101
virtual bool hasAcquire() const noexcept=0
void beginStream(StreamID, StreamContext const &)
Definition: Worker.cc:332
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:300
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:753
bool moduleValid_
Definition: Worker.h:628
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:97
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:815
virtual std::string workerType() const =0
ModuleDescription const * moduleDescription() const noexcept
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:618
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
Definition: Worker.cc:411
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const noexcept=0
LimitedTaskQueue * limited_
Definition: Worker.h:98
bool ranAcquireWithoutException_
Definition: Worker.h:627
virtual bool hasAccumulator() const noexcept=0
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:103
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:603
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:616
BranchType
Definition: BranchType.h:11
virtual bool wantsInputProcessBlocks() const noexcept=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const noexcept
Definition: Worker.cc:236
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext) noexcept
Definition: Worker.cc:435
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &) noexcept=0
std::exception_ptr cached_exception_
Definition: Worker.h:619
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
Definition: Worker.h:383
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:342
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
bool resume()
Resumes processing if the queue was paused.
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1300
int iEvent
Definition: GenABIO.cc:224
virtual bool wantsProcessBlocks() const noexcept=0
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:431
Worker & operator=(Worker const &)=delete
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:841
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:625
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:503
void reset()
Definition: Worker.h:190
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
ServiceWeakToken m_serviceToken
Definition: Worker.h:507
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1168
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:718
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void endStream(StreamID, StreamContext const &)
Definition: Worker.cc:351
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:427
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
GlobalContext const * getGlobalContext() const noexcept(false)
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:372
virtual bool implNeedToRunSelection() const noexcept=0
#define CMS_THREAD_GUARD(_var_)
int timesExcept() const noexcept
Definition: Worker.h:248
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:82
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:612
double f[11][100]
ModuleDescription const * description() const noexcept
Definition: Worker.h:200
def template(fileName, svg, replaceme="REPLACEME")
Definition: svgfig.py:521
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:179
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:613
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition) noexcept
Definition: Worker.h:1052
bool setPassed()
Definition: Worker.h:333
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:782
virtual void implEndJob()=0
virtual bool wantsGlobalLuminosityBlocks() const noexcept=0
virtual bool wantsStreamRuns() const noexcept=0
ServiceToken lock() const
Definition: ServiceToken.h:101
ConcurrencyTypes
Definition: Worker.h:95
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup) noexcept
Definition: Worker.h:410
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:701
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:978
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:102
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:691
StreamContext const * getStreamContext() const noexcept(false)
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:832
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
virtual bool wantsStreamLuminosityBlocks() const noexcept=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:752
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext) noexcept
Definition: Worker.cc:449
virtual void implBeginJob()=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition) noexcept
Definition: Worker.h:1042
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:185
bool beginSucceeded_
Definition: Worker.h:630
#define DUMMY
void respondToCloseOutputFile()
Definition: Worker.h:187
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *) noexcept
Definition: Worker.cc:147
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:381
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:880
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:962
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:809
void addedToPath() noexcept
Definition: Worker.h:242
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:186
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:799
virtual void convertCurrentProcessAlias(std::string const &processName)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:927
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1260
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:374
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
Definition: Worker.h:518
edm::WaitingTaskList & waitingTaskList() noexcept
Definition: Worker.h:256
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:105
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:609
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *) noexcept
Definition: Worker.h:1078
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:396
void endJob(GlobalContext const &)
Definition: Worker.cc:311
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:390
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:893
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:862
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:945
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:910
int timesPass() const noexcept
Definition: Worker.h:251
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &) noexcept
Definition: Worker.cc:211
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:623
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:607
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:1025
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker) noexcept
Definition: Worker.h:746
void beginJob(GlobalContext const &)
Definition: Worker.cc:292
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:814
State state() const noexcept
Definition: Worker.h:249
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
Definition: Worker.cc:253
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const noexcept=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition) noexcept
Definition: Worker.h:766
BranchType const & branchType() const
Definition: Principal.h:175
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
bool shouldTryToContinue_
Definition: Worker.h:629
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0