CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
#include <array>
#include <atomic>
#include <cassert>
#include <exception>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
71 
72 namespace edm {
73  class EventPrincipal;
74  class EventSetupImpl;
75  class EarlyDeleteHelper;
76  class ModuleProcessName;
77  class ProductResolverIndexHelper;
78  class ProductResolverIndexAndSkipBit;
79  class StreamID;
80  class StreamContext;
81  class ProductRegistry;
82  class ThinnedAssociationsHelper;
83 
84  namespace workerhelper {
85  template <typename O>
86  class CallImpl;
87  }
88  namespace eventsetup {
90  }
91 
92  class Worker {
93  public:
94  enum State { Ready, Pass, Fail, Exception };
100 
101  TaskQueueAdaptor() = default;
103  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
104 
105  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
106 
107  template <class F>
108  void push(oneapi::tbb::task_group& iG, F&& iF) {
109  if (serial_) {
110  serial_->push(iG, iF);
111  } else {
112  limited_->push(iG, iF);
113  }
114  }
115  };
116 
117  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
118  virtual ~Worker();
119 
120  Worker(Worker const&) = delete; // Disallow copying and moving
121  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
122 
123  void clearModule() {
124  moduleValid_ = false;
125  doClearModule();
126  }
127 
128  virtual bool wantsProcessBlocks() const = 0;
129  virtual bool wantsInputProcessBlocks() const = 0;
130  virtual bool wantsGlobalRuns() const = 0;
131  virtual bool wantsGlobalLuminosityBlocks() const = 0;
132  virtual bool wantsStreamRuns() const = 0;
133  virtual bool wantsStreamLuminosityBlocks() const = 0;
134 
135  virtual SerialTaskQueue* globalRunsQueue() = 0;
137 
139  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
140 
142  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
143  assert(false);
144  }
145 
146  template <typename T>
148  typename T::TransitionInfoType const&,
149  ServiceToken const&,
150  StreamID,
151  ParentContext const&,
152  typename T::Context const*);
153 
154  template <typename T>
156  typename T::TransitionInfoType const&,
157  ServiceToken const&,
158  StreamID,
159  ParentContext const&,
160  typename T::Context const*);
161 
162  template <typename T>
163  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
164  StreamID,
165  ParentContext const&,
166  typename T::Context const*);
167 
168  virtual size_t transformIndex(edm::BranchDescription const&) const = 0;
170  size_t iTransformIndex,
171  EventPrincipal const&,
172  ServiceToken const&,
173  StreamID,
174  ModuleCallingContext const&,
175  StreamContext const*);
176 
178  void skipOnPath(EventPrincipal const& iEvent);
179  void beginJob();
180  void endJob();
181  void beginStream(StreamID id, StreamContext& streamContext);
182  void endStream(StreamID id, StreamContext& streamContext);
187 
188  void reset() {
189  cached_exception_ = std::exception_ptr();
190  state_ = Ready;
192  workStarted_ = false;
194  }
195 
196  void postDoEvent(EventPrincipal const&);
197 
199  if (moduleValid_) {
201  }
202  return nullptr;
203  }
206  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
207 
209 
210  //Used to make EDGetToken work
211  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
212  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
213  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
214  virtual void resolvePutIndicies(
215  BranchType iBranchType,
216  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
217  iIndicies) = 0;
218 
219  virtual void modulesWhoseProductsAreConsumed(
220  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
221  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
222  ProductRegistry const& preg,
223  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
224 
225  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
226 
227  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
228 
229  virtual Types moduleType() const = 0;
230  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
231 
232  void clearCounters() {
233  timesRun_.store(0, std::memory_order_release);
234  timesVisited_.store(0, std::memory_order_release);
235  timesPassed_.store(0, std::memory_order_release);
236  timesFailed_.store(0, std::memory_order_release);
237  timesExcept_.store(0, std::memory_order_release);
238  }
239 
241  //NOTE: calling state() is done to force synchronization across threads
242  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
243  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
244  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
245  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
246  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
247  State state() const { return state_; }
248 
249  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
250 
251  virtual bool hasAccumulator() const = 0;
252 
253  // Used in PuttableProductResolver
255 
256  protected:
257  template <typename O>
259 
260  virtual void doClearModule() = 0;
261 
262  virtual std::string workerType() const = 0;
263  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
264 
265  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
266  virtual bool implNeedToRunSelection() const = 0;
267 
268  virtual void implDoAcquire(EventTransitionInfo const&,
269  ModuleCallingContext const*,
271 
273  size_t iTransformIndex,
274  EventPrincipal const&,
275  ParentContext const&,
276  ServiceWeakToken const&) = 0;
277  virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;
278 
280  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
282  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
283  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
284  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
285  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
286  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
287  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
288  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
289  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
290  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
291  virtual void implBeginJob() = 0;
292  virtual void implEndJob() = 0;
293  virtual void implBeginStream(StreamID) = 0;
294  virtual void implEndStream(StreamID) = 0;
295 
297 
299 
300  private:
301  template <typename T>
302  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
303 
304  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
305  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
306 
307  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
308 
309  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
310  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
311 
313  ModuleCallingContext const& moduleCallingContext,
314  Principal const& iPrincipal) const = 0;
315 
316  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
317  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
318  virtual void implRespondToCloseOutputFile() = 0;
319 
321 
322  virtual TaskQueueAdaptor serializeRunModule() = 0;
323 
324  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
325 
326  template <bool IS_EVENT>
327  bool setPassed() {
328  if (IS_EVENT) {
329  timesPassed_.fetch_add(1, std::memory_order_relaxed);
330  }
331  state_ = Pass;
332  return true;
333  }
334 
335  template <bool IS_EVENT>
336  bool setFailed() {
337  if (IS_EVENT) {
338  timesFailed_.fetch_add(1, std::memory_order_relaxed);
339  }
340  state_ = Fail;
341  return false;
342  }
343 
344  template <bool IS_EVENT>
345  std::exception_ptr setException(std::exception_ptr iException) {
346  if (IS_EVENT) {
347  timesExcept_.fetch_add(1, std::memory_order_relaxed);
348  }
349  cached_exception_ = iException; // propagate_const<T> has no reset() function
350  state_ = Exception;
351  return cached_exception_;
352  }
353 
354  template <typename T>
356  ServiceToken const&,
357  ParentContext const&,
358  typename T::TransitionInfoType const&,
359  Transition);
360 
362  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
363 
364  bool needsESPrefetching(Transition iTrans) const noexcept {
365  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
366  }
367 
369  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
370  }
371 
373  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
375  }
376 
378  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
380  }
381 
382  virtual bool hasAcquire() const = 0;
383 
384  template <typename T>
385  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
386  typename T::TransitionInfoType const&,
387  StreamID,
388  ParentContext const&,
389  typename T::Context const*);
390 
392 
393  void runAcquireAfterAsyncPrefetch(std::exception_ptr,
394  EventTransitionInfo const&,
395  ParentContext const&,
397 
398  std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
399 
400  template <typename T>
401  class RunModuleTask : public WaitingTask {
402  public:
404  typename T::TransitionInfoType const& transitionInfo,
405  ServiceToken const& token,
406  StreamID streamID,
407  ParentContext const& parentContext,
408  typename T::Context const* context,
409  oneapi::tbb::task_group* iGroup)
410  : m_worker(worker),
411  m_transitionInfo(transitionInfo),
412  m_streamID(streamID),
413  m_parentContext(parentContext),
416  m_group(iGroup) {}
417 
421  EnableQueueGuard(EnableQueueGuard const&) = delete;
422  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
424  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
426  if (queue_) {
427  queue_->resume();
428  }
429  }
430  };
431 
432  void execute() final {
433  //Need to make the services available early so other services can see them
435 
436  //in case the emit causes an exception, we need a memory location
437  // to hold the exception_ptr
438  std::exception_ptr temp_excptr;
439  auto excptr = exceptionPtr();
440  if constexpr (T::isEvent_) {
441  if (!m_worker->hasAcquire()) {
442  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
443  CMS_SA_ALLOW try {
444  //pre was called in prefetchAsync
446  } catch (...) {
447  temp_excptr = std::current_exception();
448  if (not excptr) {
449  excptr = temp_excptr;
450  }
451  }
452  }
453  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
455  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
457  }
458 
459  if (not excptr) {
460  if (auto queue = m_worker->serializeRunModule()) {
461  auto f = [worker = m_worker,
463  streamID = m_streamID,
464  parentContext = m_parentContext,
465  sContext = m_context,
466  serviceToken = m_serviceToken]() {
467  //Need to make the services available
468  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
469 
470  //If needed, we pause the queue in begin transition and resume it
471  // at the end transition. This can guarantee that the module
472  // only processes one run or lumi at a time
474  std::exception_ptr ptr;
475  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
476  };
477  //keep another global transition from running if necessary
479  if (gQueue) {
480  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
481  gQueue->pause();
482  queue.push(*group, std::move(f));
483  });
484  } else {
485  queue.push(*m_group, std::move(f));
486  }
487  return;
488  }
489  }
490 
492  }
493 
494  private:
496  typename T::TransitionInfoType m_transitionInfo;
499  typename T::Context const* m_context;
501  oneapi::tbb::task_group* m_group;
502  };
503 
504  // AcquireTask is only used for the Event case, but we define
505  // it as a template so all cases will compile.
506  // DUMMY exists to work around the C++ Standard prohibition on
507  // fully specializing templates nested in other classes.
508  template <typename T, typename DUMMY = void>
509  class AcquireTask : public WaitingTask {
510  public:
512  typename T::TransitionInfoType const&,
513  ServiceToken const&,
514  ParentContext const&,
516  void execute() final {}
517  };
518 
519  template <typename DUMMY>
521  public:
523  EventTransitionInfo const& eventTransitionInfo,
524  ServiceToken const& token,
525  ParentContext const& parentContext,
527  : m_worker(worker),
528  m_eventTransitionInfo(eventTransitionInfo),
529  m_parentContext(parentContext),
530  m_holder(std::move(holder)),
531  m_serviceToken(token) {}
532 
533  void execute() final {
534  //Need to make the services available early so other services can see them
535  ServiceRegistry::Operate guard(m_serviceToken.lock());
536 
537  //in case the emit causes an exception, we need a memory location
538  // to hold the exception_ptr
539  std::exception_ptr temp_excptr;
540  auto excptr = exceptionPtr();
541  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
542  CMS_SA_ALLOW try {
543  //pre was called in prefetchAsync
544  m_worker->emitPostModuleEventPrefetchingSignal();
545  } catch (...) {
546  temp_excptr = std::current_exception();
547  if (not excptr) {
548  excptr = temp_excptr;
549  }
550  }
551 
552  if (not excptr) {
553  if (auto queue = m_worker->serializeRunModule()) {
554  queue.push(*m_holder.group(),
555  [worker = m_worker,
556  info = m_eventTransitionInfo,
557  parentContext = m_parentContext,
558  serviceToken = m_serviceToken,
559  holder = m_holder]() {
560  //Need to make the services available
561  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
562 
563  std::exception_ptr ptr;
564  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
565  });
566  return;
567  }
568  }
569 
570  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
571  }
572 
573  private:
579  };
580 
581  // This class does nothing unless there is an exception originating
582  // in an "External Worker". In that case, it handles converting the
583  // exception to a CMS exception and adding context to the exception.
585  public:
587  oneapi::tbb::task_group* group,
588  WaitingTask* runModuleTask,
589  ParentContext const& parentContext);
590 
591  void execute() final;
592 
593  private:
596  oneapi::tbb::task_group* m_group;
598  };
599 
600  std::atomic<int> timesRun_;
601  std::atomic<int> timesVisited_;
602  std::atomic<int> timesPassed_;
603  std::atomic<int> timesFailed_;
604  std::atomic<int> timesExcept_;
605  std::atomic<State> state_;
607  std::atomic<int> numberOfPathsLeftToRun_;
608 
610 
611  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
612  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
613 
614  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
615 
617 
619  std::atomic<bool> workStarted_;
621  bool moduleValid_ = true;
622  };
623 
// Only pointers to these two types are stored by the sentry, so a
// (re)declaration is all that is needed; it keeps this block self-contained.
class ActivityRegistry;
class ModuleCallingContext;

// NOTE(review): an unnamed namespace inside a header gives every including
// translation unit its own copy of this template - confirm this is intended.
namespace {
  /// Scope guard that brackets a module call with the pre/post module signals
  /// of the traits type T.
  ///
  /// If the registry pointer is non-null, the constructor calls
  /// T::preModuleSignal and the destructor calls T::postModuleSignal with the
  /// same registry, context and module calling context. With a null registry
  /// the sentry does nothing.
  ///
  /// The sentry cannot be copied: a copy would emit the post signal a second
  /// time for a single pre signal.
  ///
  /// NOTE(review): T::postModuleSignal runs inside the destructor; should it
  /// throw, the exception would escape a destructor - confirm that is acceptable.
  template <typename T>
  class ModuleSignalSentry {
  public:
    /// @param a                     registry used to emit the signals; may be null.
    /// @param context               context forwarded to both signals.
    /// @param moduleCallingContext  calling context forwarded to both signals.
    ModuleSignalSentry(ActivityRegistry* a,
                       typename T::Context const* context,
                       ModuleCallingContext const* moduleCallingContext)
        : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
      if (a_) {
        T::preModuleSignal(a_, context_, moduleCallingContext_);
      }
    }

    ModuleSignalSentry(ModuleSignalSentry const&) = delete;
    ModuleSignalSentry& operator=(ModuleSignalSentry const&) = delete;

    ~ModuleSignalSentry() {
      if (a_) {
        T::postModuleSignal(a_, context_, moduleCallingContext_);
      }
    }

  private:
    ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
    typename T::Context const* context_;
    ModuleCallingContext const* moduleCallingContext_;
  };

}  // namespace
648 
649  namespace workerhelper {
650  template <>
652  public:
654  static bool call(Worker* iWorker,
655  StreamID,
656  EventTransitionInfo const& info,
657  ActivityRegistry* /* actReg */,
658  ModuleCallingContext const* mcc,
659  Arg::Context const* /* context*/) {
660  //Signal sentry is handled by the module
661  return iWorker->implDo(info, mcc);
662  }
663  static void esPrefetchAsync(Worker* worker,
664  WaitingTaskHolder waitingTask,
665  ServiceToken const& token,
666  EventTransitionInfo const& info,
667  Transition transition) {
668  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
669  }
670  static bool wantsTransition(Worker const* iWorker) { return true; }
671  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
672 
673  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
674  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
675  };
676 
677  template <>
679  public:
681  static bool call(Worker* iWorker,
682  StreamID,
683  RunTransitionInfo const& info,
684  ActivityRegistry* actReg,
685  ModuleCallingContext const* mcc,
686  Arg::Context const* context) {
687  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
688  return iWorker->implDoBegin(info, mcc);
689  }
690  static void esPrefetchAsync(Worker* worker,
691  WaitingTaskHolder waitingTask,
692  ServiceToken const& token,
693  RunTransitionInfo const& info,
694  Transition transition) {
695  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
696  }
697  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
698  static bool needToRunSelection(Worker const* iWorker) { return false; }
699  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
700  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
701  };
702  template <>
704  public:
706  static bool call(Worker* iWorker,
707  StreamID id,
708  RunTransitionInfo const& info,
709  ActivityRegistry* actReg,
710  ModuleCallingContext const* mcc,
711  Arg::Context const* context) {
712  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
713  return iWorker->implDoStreamBegin(id, info, mcc);
714  }
715  static void esPrefetchAsync(Worker* worker,
716  WaitingTaskHolder waitingTask,
717  ServiceToken const& token,
718  RunTransitionInfo const& info,
719  Transition transition) {
720  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
721  }
722  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
723  static bool needToRunSelection(Worker const* iWorker) { return false; }
724  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
725  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
726  };
727  template <>
729  public:
731  static bool call(Worker* iWorker,
732  StreamID,
733  RunTransitionInfo const& info,
734  ActivityRegistry* actReg,
735  ModuleCallingContext const* mcc,
736  Arg::Context const* context) {
737  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
738  return iWorker->implDoEnd(info, mcc);
739  }
740  static void esPrefetchAsync(Worker* worker,
741  WaitingTaskHolder waitingTask,
742  ServiceToken const& token,
743  RunTransitionInfo const& info,
744  Transition transition) {
745  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
746  }
747  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
748  static bool needToRunSelection(Worker const* iWorker) { return false; }
749  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
750  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
751  };
752  template <>
754  public:
756  static bool call(Worker* iWorker,
757  StreamID id,
758  RunTransitionInfo const& info,
759  ActivityRegistry* actReg,
760  ModuleCallingContext const* mcc,
761  Arg::Context const* context) {
762  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
763  return iWorker->implDoStreamEnd(id, info, mcc);
764  }
765  static void esPrefetchAsync(Worker* worker,
766  WaitingTaskHolder waitingTask,
767  ServiceToken const& token,
768  RunTransitionInfo const& info,
769  Transition transition) {
770  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
771  }
772  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
773  static bool needToRunSelection(Worker const* iWorker) { return false; }
774  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
775  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
776  };
777 
778  template <>
780  public:
782  static bool call(Worker* iWorker,
783  StreamID,
784  LumiTransitionInfo const& info,
785  ActivityRegistry* actReg,
786  ModuleCallingContext const* mcc,
787  Arg::Context const* context) {
788  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
789  return iWorker->implDoBegin(info, mcc);
790  }
791  static void esPrefetchAsync(Worker* worker,
792  WaitingTaskHolder waitingTask,
793  ServiceToken const& token,
794  LumiTransitionInfo const& info,
795  Transition transition) {
796  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
797  }
798  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
799  static bool needToRunSelection(Worker const* iWorker) { return false; }
800  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
801  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
802  };
803  template <>
805  public:
807  static bool call(Worker* iWorker,
808  StreamID id,
809  LumiTransitionInfo const& info,
810  ActivityRegistry* actReg,
811  ModuleCallingContext const* mcc,
812  Arg::Context const* context) {
813  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
814  return iWorker->implDoStreamBegin(id, info, mcc);
815  }
816  static void esPrefetchAsync(Worker* worker,
817  WaitingTaskHolder waitingTask,
818  ServiceToken const& token,
819  LumiTransitionInfo const& info,
820  Transition transition) {
821  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
822  }
823  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
824  static bool needToRunSelection(Worker const* iWorker) { return false; }
825  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
826  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
827  };
828 
829  template <>
831  public:
833  static bool call(Worker* iWorker,
834  StreamID,
835  LumiTransitionInfo const& info,
836  ActivityRegistry* actReg,
837  ModuleCallingContext const* mcc,
838  Arg::Context const* context) {
839  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
840  return iWorker->implDoEnd(info, mcc);
841  }
842  static void esPrefetchAsync(Worker* worker,
843  WaitingTaskHolder waitingTask,
844  ServiceToken const& token,
845  LumiTransitionInfo const& info,
846  Transition transition) {
847  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
848  }
849  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
850  static bool needToRunSelection(Worker const* iWorker) { return false; }
851  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
852  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
853  };
854  template <>
856  public:
858  static bool call(Worker* iWorker,
859  StreamID id,
860  LumiTransitionInfo const& info,
861  ActivityRegistry* actReg,
862  ModuleCallingContext const* mcc,
863  Arg::Context const* context) {
864  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
865  return iWorker->implDoStreamEnd(id, info, mcc);
866  }
867  static void esPrefetchAsync(Worker* worker,
868  WaitingTaskHolder waitingTask,
869  ServiceToken const& token,
870  LumiTransitionInfo const& info,
871  Transition transition) {
872  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
873  }
874  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
875  static bool needToRunSelection(Worker const* iWorker) { return false; }
876  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
877  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
878  };
879  template <>
881  public:
883  static bool call(Worker* iWorker,
884  StreamID,
886  ActivityRegistry* actReg,
887  ModuleCallingContext const* mcc,
888  Arg::Context const* context) {
889  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
890  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
891  }
892  static void esPrefetchAsync(
894  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
895  static bool needToRunSelection(Worker const* iWorker) { return false; }
896  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
897  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
898  };
899  template <>
901  public:
903  static bool call(Worker* iWorker,
904  StreamID,
906  ActivityRegistry* actReg,
907  ModuleCallingContext const* mcc,
908  Arg::Context const* context) {
909  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
910  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
911  }
912  static void esPrefetchAsync(
914  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
915  static bool needToRunSelection(Worker const* iWorker) { return false; }
916  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
917  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
918  };
919  template <>
921  public:
923  static bool call(Worker* iWorker,
924  StreamID,
926  ActivityRegistry* actReg,
927  ModuleCallingContext const* mcc,
928  Arg::Context const* context) {
929  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
930  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
931  }
932  static void esPrefetchAsync(
934  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
935  static bool needToRunSelection(Worker const* iWorker) { return false; }
936  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
937  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
938  };
939  } // namespace workerhelper
940 
941  template <typename T>
943  ServiceToken const& token,
944  ParentContext const& parentContext,
945  typename T::TransitionInfoType const& transitionInfo,
946  Transition iTransition) {
947  Principal const& principal = transitionInfo.principal();
948 
950 
951  if constexpr (T::isEvent_) {
952  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
953  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
954  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
955  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
956  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
957  }
958 
959  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
960  edPrefetchAsync(iTask, token, principal);
961 
962  if (principal.branchType() == InEvent) {
964  }
965  }
966 
967  template <typename T>
969  typename T::TransitionInfoType const& transitionInfo,
970  ServiceToken const& token,
971  StreamID streamID,
972  ParentContext const& parentContext,
973  typename T::Context const* context) {
975  return;
976  }
977 
978  //Need to check workStarted_ before adding to waitingTasks_
979  bool expected = false;
980  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
981 
983  if constexpr (T::isEvent_) {
984  timesVisited_.fetch_add(1, std::memory_order_relaxed);
985  }
986 
987  if (workStarted) {
989 
990  //if have TriggerResults based selection we want to reject the event before doing prefetching
992  //We need to run the selection in a different task so that
993  // we can prefetch the data needed for the selection
994  WaitingTask* moduleTask =
995  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
996 
997  //make sure the task is either run or destroyed
998  struct DestroyTask {
999  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1000 
1001  ~DestroyTask() {
1002  auto p = m_task.exchange(nullptr);
1003  if (p) {
1004  TaskSentry s{p};
1005  }
1006  }
1007 
1008  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1009 
1010  private:
1011  std::atomic<edm::WaitingTask*> m_task;
1012  };
1013  if constexpr (T::isEvent_) {
1014  if (hasAcquire()) {
1015  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1016  ServiceWeakToken weakToken = token;
1017  auto* group = task.group();
1018  moduleTask = make_waiting_task(
1019  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1020  WaitingTaskWithArenaHolder runTaskHolder(
1021  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1022  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1023  t.execute();
1024  });
1025  }
1026  }
1027  auto* group = task.group();
1028  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1029  ServiceWeakToken weakToken = token;
1030  auto selectionTask =
1031  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1032  std::exception_ptr const*) mutable {
1033  ServiceRegistry::Operate guard(weakToken.lock());
1034  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1035  weakToken.lock(),
1036  parentContext,
1037  info,
1038  T::transition_);
1039  });
1040  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1041  } else {
1042  WaitingTask* moduleTask =
1043  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1044  auto group = task.group();
1045  if constexpr (T::isEvent_) {
1046  if (hasAcquire()) {
1047  WaitingTaskWithArenaHolder runTaskHolder(
1048  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1049  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1050  }
1051  }
1052  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1053  }
1054  }
1055  }
1056 
  // Completes one transition for this worker once the prefetch stage is over.
  //
  //  iEPtr  - exception handed in by the caller (runModuleDirectly passes a null
  //           pointer "because there was no prefetching to do"). A null pointer
  //           selects the branch that actually runs the module.
  //  transitionInfo, streamID, parentContext, context - forwarded unchanged to
  //           runModule<T>().
  //
  // Returns the same exception pointer that is passed to waitingTasks_.doneWaiting();
  // it stays null unless one of the branches below assigns it.
  //
  // NOTE(review): listing line 1071 (between the end of the inner if/else and the
  // outer "else") is absent from this extract, so one statement of the iEPtr
  // branch is not visible here -- consult the original header before editing.
1057  template <typename T>
1058  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1059  typename T::TransitionInfoType const& transitionInfo,
1060  StreamID streamID,
1061  ParentContext const& parentContext,
1062  typename T::Context const* context) {
1063  std::exception_ptr exceptionPtr;
1064  if (iEPtr) {
  // An exception was handed in: shouldRethrowException() decides whether it is
  // recorded on the worker and reported, or dropped with the worker marked passed.
  // The module itself is not run on this branch.
1065  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_)) {
1066  exceptionPtr = iEPtr;
1067  setException<T::isEvent_>(exceptionPtr);
1068  } else {
1069  setPassed<T::isEvent_>();
1070  }
1072  } else {
1073  // Caught exception is propagated via WaitingTaskList
  // Anything thrown out of runModule<T>() is captured here instead of escaping.
1074  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1075  exceptionPtr = std::current_exception();
1076  }
1077  }
  // Reached on every path: hand the (possibly null) exception to waitingTasks_.
1078  waitingTasks_.doneWaiting(exceptionPtr);
1079  return exceptionPtr;
1080  }
1081 
1082  template <typename T>
1084  typename T::TransitionInfoType const& transitionInfo,
1085  ServiceToken const& serviceToken,
1086  StreamID streamID,
1087  ParentContext const& parentContext,
1088  typename T::Context const* context) {
1090  return;
1091  }
1092 
1093  //Need to check workStarted_ before adding to waitingTasks_
1094  bool expected = false;
1095  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1096 
1098  if (workStarted) {
1099  ServiceWeakToken weakToken = serviceToken;
1100  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1101  std::exception_ptr exceptionPtr;
1102  // Caught exception is propagated via WaitingTaskList
1103  CMS_SA_ALLOW try {
1104  //Need to make the services available
1105  ServiceRegistry::Operate guard(weakToken.lock());
1106 
1107  this->runModule<T>(info, streamID, parentContext, context);
1108  } catch (...) {
1109  exceptionPtr = std::current_exception();
1110  }
1111  this->waitingTasks_.doneWaiting(exceptionPtr);
1112  };
1113 
1114  if (needsESPrefetching(T::transition_)) {
1115  auto group = task.group();
1116  auto afterPrefetch =
1117  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1118  if (iExcept) {
1119  this->waitingTasks_.doneWaiting(*iExcept);
1120  } else {
1121  if (auto queue = this->serializeRunModule()) {
1122  queue.push(*group, toDo);
1123  } else {
1124  group->run(toDo);
1125  }
1126  }
1127  });
1129  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1130  } else {
1131  auto group = task.group();
1132  if (auto queue = this->serializeRunModule()) {
1133  queue.push(*group, toDo);
1134  } else {
1135  group->run(toDo);
1136  }
1137  }
1138  }
1139  }
1140 
  // Invokes the module for one transition and records the outcome on the worker.
  //
  // Steps visible in this listing:
  //  - a ModuleContextSentry is constructed from &moduleCallingContext_ and
  //    parentContext and lives until the function exits;
  //  - for event transitions (T::isEvent_) timesRun_ is incremented;
  //  - the module call is issued inside convertException::wrap(); the boolean rc
  //    then selects setPassed() or setFailed();
  //  - a cms::Exception leaving the wrapper is either stored via setException()
  //    and rethrown, or dropped with the worker marked passed, as decided by
  //    shouldRethrowException().
  //
  // Returns rc. Throws (via std::rethrow_exception) when shouldRethrowException()
  // returns true for the caught exception.
  //
  // NOTE(review): listing lines 1158, 1168 and 1170 are absent from this extract.
  // Line 1158 presumably begins the call whose result is stored in rc (only its
  // argument list on line 1159 is visible); the contents of 1168 and 1170 cannot
  // be determined from here -- consult the original header before editing.
1141  template <typename T>
1142  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1143  StreamID streamID,
1144  ParentContext const& parentContext,
1145  typename T::Context const* context) {
1146  //unscheduled producers should advance this
1147  //if (T::isEvent_) {
1148  // ++timesVisited_;
1149  //}
1150  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1151  if constexpr (T::isEvent_) {
1152  timesRun_.fetch_add(1, std::memory_order_relaxed);
1153  }
1154 
1155  bool rc = true;
1156  try {
1157  convertException::wrap([&]() {
1159  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1160 
  // Record pass/fail on the worker according to the result of the call above.
1161  if (rc) {
1162  setPassed<T::isEvent_>();
1163  } else {
1164  setFailed<T::isEvent_>();
1165  }
1166  });
1167  } catch (cms::Exception& ex) {
1169  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
  // Store the in-flight exception on the worker, then rethrow cached_exception_
  // (presumably the pointer setException() just stored -- confirm against setException).
1171  setException<T::isEvent_>(std::current_exception());
1172  std::rethrow_exception(cached_exception_);
1173  } else {
  // Exception is not propagated: rc takes whatever setPassed() returns.
1174  rc = setPassed<T::isEvent_>();
1175  }
1176  }
1177 
1178  return rc;
1179  }
1180 
1181  template <typename T>
1182  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1183  StreamID streamID,
1184  ParentContext const& parentContext,
1185  typename T::Context const* context) {
1186  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1187  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1188  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1189  }
1190 } // namespace edm
1191 #endif
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1182
virtual bool hasAcquire() const =0
void clearModule()
Definition: Worker.h:123
std::atomic< int > timesVisited_
Definition: Worker.h:601
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
virtual bool wantsStreamRuns() const =0
ModuleDescription const * moduleDescription() const
void execute() final
Definition: Worker.h:432
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:265
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1058
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:108
T::Context const * m_context
Definition: Worker.h:499
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:654
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
unsigned int ProductResolverIndex
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:345
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:782
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:237
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:522
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:681
void execute() final
Definition: Worker.h:516
virtual ~Worker()
Definition: Worker.cc:106
void endJob()
Definition: Worker.cc:289
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:791
ParentContext const m_parentContext
Definition: Worker.h:498
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:903
std::atomic< int > timesExcept_
Definition: Worker.h:604
oneapi::tbb::task_group * m_group
Definition: Worker.h:501
std::atomic< bool > workStarted_
Definition: Worker.h:619
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:730
std::atomic< int > timesFailed_
Definition: Worker.h:603
void emitPostModuleStreamPrefetchingSignal()
Definition: Worker.h:372
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:364
void addedToPath()
Definition: Worker.h:240
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
void clearCounters()
Definition: Worker.h:232
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:968
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:102
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:393
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:298
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:706
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:141
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:417
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:306
bool moduleValid_
Definition: Worker.h:621
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:98
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:756
virtual std::string workerType() const =0
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:611
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
LimitedTaskQueue * limited_
Definition: Worker.h:99
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:620
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:108
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:596
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
BranchType
Definition: BranchType.h:11
int timesPass() const
Definition: Worker.h:249
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:511
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
std::exception_ptr cached_exception_
Definition: Worker.h:612
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
Definition: Worker.h:377
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:336
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:240
std::exception_ptr exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
int timesPassed() const
Definition: Worker.h:244
int iEvent
Definition: GenABIO.cc:224
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:424
Worker & operator=(Worker const &)=delete
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:496
void reset()
Definition: Worker.h:188
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:765
ServiceWeakToken m_serviceToken
Definition: Worker.h:500
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:867
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:932
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:680
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:690
int timesFailed() const
Definition: Worker.h:245
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:420
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:354
#define CMS_THREAD_GUARD(_var_)
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:89
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:605
double f[11][100]
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
Definition: Types.py:1
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
int timesVisited() const
Definition: Worker.h:243
int numberOfPathsOn_
Definition: Worker.h:606
int timesExcept() const
Definition: Worker.h:246
bool setPassed()
Definition: Worker.h:327
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:731
virtual void implEndJob()=0
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:239
ConcurrencyTypes
Definition: Worker.h:96
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:430
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:883
ModuleDescription const * description() const
Definition: Worker.h:198
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:103
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:653
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1083
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:705
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:183
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
#define DUMMY
void respondToCloseOutputFile()
Definition: Worker.h:185
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:363
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:807
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:184
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1142
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:368
State state() const
Definition: Worker.h:247
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:816
StreamContext const * getStreamContext() const
double a
Definition: hdecay.h:119
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:663
std::atomic< int > timesPassed_
Definition: Worker.h:602
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:378
void beginJob()
Definition: Worker.cc:273
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:330
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:372
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:740
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:858
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:833
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:842
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:942
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:600
int timesRun() const
Definition: Worker.h:242
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:923
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
GlobalContext const * getGlobalContext() const
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:892
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:755
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:912
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::WaitingTaskList & waitingTaskList()
Definition: Worker.h:254
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:715
BranchType const & branchType() const
Definition: Principal.h:181
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
Definition: Worker.h:403
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0