CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <array>
62 #include <atomic>
63 #include <cassert>
64 #include <map>
65 #include <memory>
66 #include <sstream>
67 #include <string>
68 #include <vector>
69 #include <exception>
70 #include <unordered_map>
71 
72 namespace edm {
73  class EventPrincipal;
74  class EventSetupImpl;
75  class EarlyDeleteHelper;
76  class ModuleProcessName;
77  class ProductResolverIndexHelper;
78  class ProductResolverIndexAndSkipBit;
79  class StreamID;
80  class StreamContext;
81  class ProductRegistry;
82  class ThinnedAssociationsHelper;
83 
84  namespace workerhelper {
85  template <typename O>
86  class CallImpl;
87  }
88  namespace eventsetup {
90  }
91 
92  class Worker {
93  public:
94  enum State { Ready, Pass, Fail, Exception };
100 
101  TaskQueueAdaptor() = default;
103  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
104 
105  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
106 
107  template <class F>
108  void push(oneapi::tbb::task_group& iG, F&& iF) {
109  if (serial_) {
110  serial_->push(iG, iF);
111  } else {
112  limited_->push(iG, iF);
113  }
114  }
115  };
116 
117  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
118  virtual ~Worker();
119 
120  Worker(Worker const&) = delete; // Disallow copying and moving
121  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
122 
123  void clearModule() {
124  moduleValid_ = false;
125  doClearModule();
126  }
127 
128  virtual bool wantsProcessBlocks() const = 0;
129  virtual bool wantsInputProcessBlocks() const = 0;
130  virtual bool wantsGlobalRuns() const = 0;
131  virtual bool wantsGlobalLuminosityBlocks() const = 0;
132  virtual bool wantsStreamRuns() const = 0;
133  virtual bool wantsStreamLuminosityBlocks() const = 0;
134 
135  virtual SerialTaskQueue* globalRunsQueue() = 0;
137 
139  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
140 
142  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
143  assert(false);
144  }
145 
146  template <typename T>
148  typename T::TransitionInfoType const&,
149  ServiceToken const&,
150  StreamID,
151  ParentContext const&,
152  typename T::Context const*);
153 
154  template <typename T>
156  typename T::TransitionInfoType const&,
157  ServiceToken const&,
158  StreamID,
159  ParentContext const&,
160  typename T::Context const*);
161 
162  template <typename T>
163  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
164  StreamID,
165  ParentContext const&,
166  typename T::Context const*);
167 
168  virtual size_t transformIndex(edm::BranchDescription const&) const = 0;
170  size_t iTransformIndex,
171  EventPrincipal const&,
172  ServiceToken const&,
173  StreamID,
174  ModuleCallingContext const&,
175  StreamContext const*);
176 
178  void skipOnPath(EventPrincipal const& iEvent);
179  void beginJob();
180  void endJob();
181  void beginStream(StreamID id, StreamContext& streamContext);
182  void endStream(StreamID id, StreamContext& streamContext);
187 
// Puts the worker back into the Ready state.
// NOTE(review): listing lines 191 and 193 are absent here, so the real
// reset() may clear more state than is shown - confirm against the header.
188  void reset() {
     // Forget any exception stored by setException().
189  cached_exception_ = std::exception_ptr();
190  state_ = Ready;
     // Clear the atomic "work already started" flag.
192  workStarted_ = false;
194  }
195 
196  void postDoEvent(EventPrincipal const&);
197 
199  if (moduleValid_) {
201  }
202  return nullptr;
203  }
206  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
207 
209 
210  //Used to make EDGetToken work
211  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
213  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
214  virtual void resolvePutIndicies(
215  BranchType iBranchType,
216  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
217  iIndicies) = 0;
218 
219  virtual void modulesWhoseProductsAreConsumed(
220  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
221  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
222  ProductRegistry const& preg,
223  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
224 
225  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
226 
227  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
228 
229  virtual Types moduleType() const = 0;
230  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
231 
232  void clearCounters() {
233  timesRun_.store(0, std::memory_order_release);
234  timesVisited_.store(0, std::memory_order_release);
235  timesPassed_.store(0, std::memory_order_release);
236  timesFailed_.store(0, std::memory_order_release);
237  timesExcept_.store(0, std::memory_order_release);
238  }
239 
241  //NOTE: calling state() is done to force synchronization across threads
242  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
243  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
244  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
245  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
246  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
247  State state() const { return state_; }
248 
249  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
250 
251  virtual bool hasAccumulator() const = 0;
252 
253  // Used in PuttableProductResolver
255 
256  protected:
257  template <typename O>
259 
260  virtual void doClearModule() = 0;
261 
262  virtual std::string workerType() const = 0;
263  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
264 
265  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
266  virtual bool implNeedToRunSelection() const = 0;
267 
268  virtual void implDoAcquire(EventTransitionInfo const&,
269  ModuleCallingContext const*,
271 
273  size_t iTransformIndex,
274  EventPrincipal const&,
275  ParentContext const&,
276  ServiceWeakToken const&) = 0;
277  virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;
278 
280  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
282  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
283  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
284  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
285  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
286  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
287  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
288  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
289  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
290  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
291  virtual void implBeginJob() = 0;
292  virtual void implEndJob() = 0;
293  virtual void implBeginStream(StreamID) = 0;
294  virtual void implEndStream(StreamID) = 0;
295 
297 
299 
300  private:
301  template <typename T>
302  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
303 
304  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
305  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
306 
307  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
308 
309  virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
310  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
311 
313  ModuleCallingContext const& moduleCallingContext,
314  Principal const& iPrincipal) const = 0;
315 
316  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
317  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
318  virtual void implRespondToCloseOutputFile() = 0;
319 
321 
322  virtual TaskQueueAdaptor serializeRunModule() = 0;
323 
324  bool shouldRethrowException(std::exception_ptr iPtr,
325  ParentContext const& parentContext,
326  bool isEvent,
327  bool isTryToContinue) const;
329 
330  template <bool IS_EVENT>
331  bool setPassed() {
332  if (IS_EVENT) {
333  timesPassed_.fetch_add(1, std::memory_order_relaxed);
334  }
335  state_ = Pass;
336  return true;
337  }
338 
339  template <bool IS_EVENT>
340  bool setFailed() {
341  if (IS_EVENT) {
342  timesFailed_.fetch_add(1, std::memory_order_relaxed);
343  }
344  state_ = Fail;
345  return false;
346  }
347 
348  template <bool IS_EVENT>
349  std::exception_ptr setException(std::exception_ptr iException) {
350  if (IS_EVENT) {
351  timesExcept_.fetch_add(1, std::memory_order_relaxed);
352  }
353  cached_exception_ = iException; // propagate_const<T> has no reset() function
354  state_ = Exception;
355  return cached_exception_;
356  }
357 
358  template <typename T>
360  ServiceToken const&,
361  ParentContext const&,
362  typename T::TransitionInfoType const&,
363  Transition);
364 
366  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
367 
368  bool needsESPrefetching(Transition iTrans) const noexcept {
369  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
370  }
371 
373  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
374  }
375 
377  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
379  }
380 
382  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
384  }
385 
386  virtual bool hasAcquire() const = 0;
387 
388  template <typename T>
389  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
390  typename T::TransitionInfoType const&,
391  StreamID,
392  ParentContext const&,
393  typename T::Context const*);
394 
396 
397  void runAcquireAfterAsyncPrefetch(std::exception_ptr,
398  EventTransitionInfo const&,
399  ParentContext const&,
401 
402  std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
403 
// WaitingTask that runs once prefetching for transition type T is done and
// then gets the module run, either through the worker's serializing queue or
// through the fall-through path at the end of execute().
// NOTE(review): this listing omits several source lines (e.g. 407, 418-419,
// 422-424, 427, 429, 438, 449, 495, 499, 501-502, 504), so the constructor
// head, the EnableQueueGuard head and a few statements are not visible here.
404  template <typename T>
405  class RunModuleTask : public WaitingTask {
406  public:
408  typename T::TransitionInfoType const& transitionInfo,
409  ServiceToken const& token,
410  StreamID streamID,
411  ParentContext const& parentContext,
412  typename T::Context const* context,
413  oneapi::tbb::task_group* iGroup)
414  : m_worker(worker),
415  m_transitionInfo(transitionInfo),
416  m_streamID(streamID),
417  m_parentContext(parentContext),
420  m_group(iGroup) {}
421 
     // EnableQueueGuard members: copying is disabled, moving steals queue_ and
     // nulls the source, and a non-null queue_ is resumed on clean-up.
425  EnableQueueGuard(EnableQueueGuard const&) = delete;
426  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
428  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
430  if (queue_) {
431  queue_->resume();
432  }
433  }
434  };
435 
436  void execute() final {
437  //Need to make the services available early so other services can see them
439 
440  //in case the emit causes an exception, we need a memory location
441  // to hold the exception_ptr
442  std::exception_ptr temp_excptr;
443  auto excptr = exceptionPtr();
444  if constexpr (T::isEvent_) {
445  if (!m_worker->hasAcquire()) {
446  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
447  CMS_SA_ALLOW try {
448  //pre was called in prefetchAsync
450  } catch (...) {
451  temp_excptr = std::current_exception();
     // An exception already held in excptr takes precedence over this one.
452  if (not excptr) {
453  excptr = temp_excptr;
454  }
455  }
456  }
457  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
459  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
461  }
462 
     // Only when no exception is pending is the work handed to the
     // serializing queue (if the worker supplies one).
463  if (not excptr) {
464  if (auto queue = m_worker->serializeRunModule()) {
     // Everything the deferred call needs is captured by value.
465  auto f = [worker = m_worker,
467  streamID = m_streamID,
468  parentContext = m_parentContext,
469  sContext = m_context,
470  serviceToken = m_serviceToken]() {
471  //Need to make the services available
472  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
473 
474  //If needed, we pause the queue in begin transition and resume it
475  // at the end transition. This can guarantee that the module
476  // only processes one run or lumi at a time
     // ptr is empty: this branch is reached only when excptr is null.
478  std::exception_ptr ptr;
479  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
480  };
481  //keep another global transition from running if necessary
483  if (gQueue) {
     // Pause gQueue from inside its own task, then push the real work.
484  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
485  gQueue->pause();
486  queue.push(*group, std::move(f));
487  });
488  } else {
489  queue.push(*m_group, std::move(f));
490  }
491  return;
492  }
493  }
494 
496  }
497 
498  private:
500  typename T::TransitionInfoType m_transitionInfo;
503  typename T::Context const* m_context;
505  oneapi::tbb::task_group* m_group;
506  };
507 
508  // AcquireTask is only used for the Event case, but we define
509  // it as a template so all cases will compile.
510  // DUMMY exists to work around the C++ Standard prohibition on
511  // fully specializing templates nested in other classes.
// Primary AcquireTask template: its execute() is an empty no-op.
// NOTE(review): the constructor's first line (515) and line 519 are missing
// from this listing.
512  template <typename T, typename DUMMY = void>
513  class AcquireTask : public WaitingTask {
514  public:
516  typename T::TransitionInfoType const&,
517  ServiceToken const&,
518  ParentContext const&,
520  void execute() final {}
521  };
522 
// Partial specialization of AcquireTask that does real work: after the
// post-prefetching signal it calls Worker::runAcquireAfterAsyncPrefetch(),
// through the worker's serializing queue when one is supplied.
// NOTE(review): the class head (524), constructor head (526, 530) and the
// data members (578-582) are missing from this listing.
523  template <typename DUMMY>
525  public:
527  EventTransitionInfo const& eventTransitionInfo,
528  ServiceToken const& token,
529  ParentContext const& parentContext,
531  : m_worker(worker),
532  m_eventTransitionInfo(eventTransitionInfo),
533  m_parentContext(parentContext),
534  m_holder(std::move(holder)),
535  m_serviceToken(token) {}
536 
537  void execute() final {
538  //Need to make the services available early so other services can see them
539  ServiceRegistry::Operate guard(m_serviceToken.lock());
540 
541  //in case the emit causes an exception, we need a memory location
542  // to hold the exception_ptr
543  std::exception_ptr temp_excptr;
544  auto excptr = exceptionPtr();
545  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
546  CMS_SA_ALLOW try {
547  //pre was called in prefetchAsync
548  m_worker->emitPostModuleEventPrefetchingSignal();
549  } catch (...) {
550  temp_excptr = std::current_exception();
     // An exception already held in excptr takes precedence over this one.
551  if (not excptr) {
552  excptr = temp_excptr;
553  }
554  }
555 
     // Serialized path: taken only when no exception is pending and the
     // worker supplies a queue; the lambda captures everything by value.
556  if (not excptr) {
557  if (auto queue = m_worker->serializeRunModule()) {
558  queue.push(*m_holder.group(),
559  [worker = m_worker,
560  info = m_eventTransitionInfo,
561  parentContext = m_parentContext,
562  serviceToken = m_serviceToken,
563  holder = m_holder]() {
564  //Need to make the services available
565  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
566 
     // ptr is empty: this branch is reached only when excptr is null.
567  std::exception_ptr ptr;
568  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
569  });
570  return;
571  }
572  }
573 
     // Direct path: no queue, or an exception is pending and is passed along.
574  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
575  }
576 
577  private:
583  };
584 
585  // This class does nothing unless there is an exception originating
586  // in an "External Worker". In that case, it handles converting the
587  // exception to a CMS exception and adding context to the exception.
589  public:
591  oneapi::tbb::task_group* group,
592  WaitingTask* runModuleTask,
593  ParentContext const& parentContext);
594 
595  void execute() final;
596 
597  private:
600  oneapi::tbb::task_group* m_group;
602  };
603 
604  std::atomic<int> timesRun_;
605  std::atomic<int> timesVisited_;
606  std::atomic<int> timesPassed_;
607  std::atomic<int> timesFailed_;
608  std::atomic<int> timesExcept_;
609  std::atomic<State> state_;
611  std::atomic<int> numberOfPathsLeftToRun_;
612 
614 
615  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
616  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
617 
618  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
619 
621 
623  std::atomic<bool> workStarted_;
625  bool moduleValid_ = true;
626  bool shouldTryToContinue_ = false;
627  bool beginSucceeded_ = false;
628  };
629 
630  namespace {
631  template <typename T>
632  class ModuleSignalSentry {
633  public:
634  ModuleSignalSentry(ActivityRegistry* a,
635  typename T::Context const* context,
636  ModuleCallingContext const* moduleCallingContext)
637  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
638 
639  ~ModuleSignalSentry() {
640  // This destructor does nothing unless we are unwinding the
641  // the stack from an earlier exception (a_ will be null if we are
642  // are not). We want to report the earlier exception and ignore any
643  // addition exceptions from the post module signal.
644  CMS_SA_ALLOW try {
645  if (a_) {
646  T::postModuleSignal(a_, context_, moduleCallingContext_);
647  }
648  } catch (...) {
649  }
650  }
651  void preModuleSignal() {
652  if (a_) {
653  T::preModuleSignal(a_, context_, moduleCallingContext_);
654  }
655  }
656  void postModuleSignal() {
657  if (a_) {
658  auto temp = a_;
659  // Setting a_ to null informs the destructor that the signal
660  // was already run and that it should do nothing.
661  a_ = nullptr;
662  T::postModuleSignal(temp, context_, moduleCallingContext_);
663  }
664  }
665 
666  private:
667  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
668  typename T::Context const* context_;
669  ModuleCallingContext const* moduleCallingContext_;
670  };
671 
672  } // namespace
673 
674  namespace workerhelper {
// CallImpl specialization taking EventTransitionInfo: call() forwards
// straight to Worker::implDo() without a ModuleSignalSentry.
// NOTE(review): the class head and Arg typedef lines (676, 678) are missing
// from this listing.
675  template <>
677  public:
679  static bool call(Worker* iWorker,
680  StreamID,
681  EventTransitionInfo const& info,
682  ActivityRegistry* /* actReg */,
683  ModuleCallingContext const* mcc,
684  Arg::Context const* /* context*/) {
685  //Signal sentry is handled by the module
686  return iWorker->implDo(info, mcc);
687  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
688  static void esPrefetchAsync(Worker* worker,
689  WaitingTaskHolder waitingTask,
690  ServiceToken const& token,
691  EventTransitionInfo const& info,
692  Transition transition) {
693  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
694  }
     // This transition is always wanted; selection is decided by the worker.
695  static bool wantsTransition(Worker const* iWorker) { return true; }
696  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
697 
     // No global queue is paused or enabled for this transition.
698  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
699  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
700  };
701 
// CallImpl specialization whose call() runs Worker::implDoBegin() for a
// RunTransitionInfo between the pre/post module signals; it reports
// globalRunsQueue() as the queue to pause.
// NOTE(review): the class head and Arg typedef lines (703, 705) are missing
// from this listing.
702  template <>
704  public:
706  static bool call(Worker* iWorker,
707  StreamID,
708  RunTransitionInfo const& info,
709  ActivityRegistry* actReg,
710  ModuleCallingContext const* mcc,
711  Arg::Context const* context) {
712  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
713  // If preModuleSignal() throws, implDoBegin() is not called, and the
714  // cpp destructor calls postModuleSignal (ignoring additional exceptions)
715  cpp.preModuleSignal();
716  // If implDoBegin() throws, the cpp destructor calls postModuleSignal
717  // (ignoring additional exceptions)
718  auto returnValue = iWorker->implDoBegin(info, mcc);
719  // If postModuleSignal() throws, the exception will propagate to the framework
720  cpp.postModuleSignal();
721  return returnValue;
722  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
723  static void esPrefetchAsync(Worker* worker,
724  WaitingTaskHolder waitingTask,
725  ServiceToken const& token,
726  RunTransitionInfo const& info,
727  Transition transition) {
728  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
729  }
730  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
731  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // The global runs queue is the one to pause; nothing is enabled here.
732  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
733  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
734  };
// CallImpl specialization whose call() runs Worker::implDoStreamBegin() for a
// RunTransitionInfo between the pre/post module signals.
// NOTE(review): the class head and Arg typedef lines (736, 738) are missing
// from this listing.
735  template <>
737  public:
739  static bool call(Worker* iWorker,
740  StreamID id,
741  RunTransitionInfo const& info,
742  ActivityRegistry* actReg,
743  ModuleCallingContext const* mcc,
744  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
745  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
746  cpp.preModuleSignal();
747  auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
748  cpp.postModuleSignal();
749  return returnValue;
750  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
751  static void esPrefetchAsync(Worker* worker,
752  WaitingTaskHolder waitingTask,
753  ServiceToken const& token,
754  RunTransitionInfo const& info,
755  Transition transition) {
756  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
757  }
758  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
759  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
760  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
761  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
762  };
// CallImpl specialization whose call() runs Worker::implDoEnd() for a
// RunTransitionInfo between the pre/post module signals; it reports
// globalRunsQueue() as the queue to enable.
// NOTE(review): the class head and Arg typedef lines (764, 766) are missing
// from this listing.
763  template <>
765  public:
767  static bool call(Worker* iWorker,
768  StreamID,
769  RunTransitionInfo const& info,
770  ActivityRegistry* actReg,
771  ModuleCallingContext const* mcc,
772  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
773  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
774  cpp.preModuleSignal();
775  auto returnValue = iWorker->implDoEnd(info, mcc);
776  cpp.postModuleSignal();
777  return returnValue;
778  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
779  static void esPrefetchAsync(Worker* worker,
780  WaitingTaskHolder waitingTask,
781  ServiceToken const& token,
782  RunTransitionInfo const& info,
783  Transition transition) {
784  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
785  }
786  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
787  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // Nothing is paused here; the global runs queue is the one to enable.
788  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
789  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
790  };
// CallImpl specialization whose call() runs Worker::implDoStreamEnd() for a
// RunTransitionInfo between the pre/post module signals.
// NOTE(review): the class head and Arg typedef lines (792, 794) are missing
// from this listing.
791  template <>
793  public:
795  static bool call(Worker* iWorker,
796  StreamID id,
797  RunTransitionInfo const& info,
798  ActivityRegistry* actReg,
799  ModuleCallingContext const* mcc,
800  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
801  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
802  cpp.preModuleSignal();
803  auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
804  cpp.postModuleSignal();
805  return returnValue;
806  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
807  static void esPrefetchAsync(Worker* worker,
808  WaitingTaskHolder waitingTask,
809  ServiceToken const& token,
810  RunTransitionInfo const& info,
811  Transition transition) {
812  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
813  }
814  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
815  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
816  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
817  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
818  };
819 
// CallImpl specialization whose call() runs Worker::implDoBegin() for a
// LumiTransitionInfo between the pre/post module signals; it reports
// globalLuminosityBlocksQueue() as the queue to pause.
// NOTE(review): the class head and Arg typedef lines (821, 823) are missing
// from this listing.
820  template <>
822  public:
824  static bool call(Worker* iWorker,
825  StreamID,
826  LumiTransitionInfo const& info,
827  ActivityRegistry* actReg,
828  ModuleCallingContext const* mcc,
829  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
830  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
831  cpp.preModuleSignal();
832  auto returnValue = iWorker->implDoBegin(info, mcc);
833  cpp.postModuleSignal();
834  return returnValue;
835  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
836  static void esPrefetchAsync(Worker* worker,
837  WaitingTaskHolder waitingTask,
838  ServiceToken const& token,
839  LumiTransitionInfo const& info,
840  Transition transition) {
841  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
842  }
843  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
844  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // The global luminosity-block queue is the one to pause; nothing is enabled here.
845  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
846  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
847  };
// CallImpl specialization whose call() runs Worker::implDoStreamBegin() for a
// LumiTransitionInfo between the pre/post module signals and then sets the
// worker's beginSucceeded_ flag.
// NOTE(review): the class head and Arg typedef lines (849, 851) are missing
// from this listing.
848  template <>
850  public:
852  static bool call(Worker* iWorker,
853  StreamID id,
854  LumiTransitionInfo const& info,
855  ActivityRegistry* actReg,
856  ModuleCallingContext const* mcc,
857  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
858  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
859  cpp.preModuleSignal();
860  auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
861  cpp.postModuleSignal();
     // Reached only when nothing above threw.
862  iWorker->beginSucceeded_ = true;
863  return returnValue;
864  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
865  static void esPrefetchAsync(Worker* worker,
866  WaitingTaskHolder waitingTask,
867  ServiceToken const& token,
868  LumiTransitionInfo const& info,
869  Transition transition) {
870  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
871  }
872  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
873  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
874  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
875  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
876  };
877 
// CallImpl specialization whose call() runs Worker::implDoEnd() for a
// LumiTransitionInfo between the pre/post module signals; it reports
// globalLuminosityBlocksQueue() as the queue to enable.
// NOTE(review): the class head and Arg typedef lines (879, 881) are missing
// from this listing.
878  template <>
880  public:
882  static bool call(Worker* iWorker,
883  StreamID,
884  LumiTransitionInfo const& info,
885  ActivityRegistry* actReg,
886  ModuleCallingContext const* mcc,
887  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
888  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
889  cpp.preModuleSignal();
890  auto returnValue = iWorker->implDoEnd(info, mcc);
891  cpp.postModuleSignal();
892  return returnValue;
893  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
894  static void esPrefetchAsync(Worker* worker,
895  WaitingTaskHolder waitingTask,
896  ServiceToken const& token,
897  LumiTransitionInfo const& info,
898  Transition transition) {
899  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
900  }
901  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
902  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // Nothing is paused here; the global luminosity-block queue is the one to enable.
903  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
904  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
905  };
// CallImpl specialization whose call() runs Worker::implDoStreamEnd() for a
// LumiTransitionInfo, but only when the worker's beginSucceeded_ flag is set;
// otherwise it returns true without touching the module or the signals.
// NOTE(review): the class head and Arg typedef lines (907, 909) are missing
// from this listing.
906  template <>
908  public:
910  static bool call(Worker* iWorker,
911  StreamID id,
912  LumiTransitionInfo const& info,
913  ActivityRegistry* actReg,
914  ModuleCallingContext const* mcc,
915  Arg::Context const* context) {
916  if (iWorker->beginSucceeded_) {
     // Clear the flag before running so it is reset even if the call throws.
917  iWorker->beginSucceeded_ = false;
918 
919  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
920  cpp.preModuleSignal();
921  auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
922  cpp.postModuleSignal();
923  return returnValue;
924  }
925  return true;
926  }
     // Forwards to Worker::esPrefetchAsync with the EventSetupImpl taken from info.
927  static void esPrefetchAsync(Worker* worker,
928  WaitingTaskHolder waitingTask,
929  ServiceToken const& token,
930  LumiTransitionInfo const& info,
931  Transition transition) {
932  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
933  }
934  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
935  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
936  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
937  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
938  };
// CallImpl specialization whose call() runs Worker::implDoBeginProcessBlock()
// on info.principal() between the pre/post module signals.
// NOTE(review): the class head, Arg typedef, the `info` parameter line and the
// esPrefetchAsync parameter list (940, 942, 945, 956) are missing from this
// listing.
939  template <>
941  public:
943  static bool call(Worker* iWorker,
944  StreamID,
946  ActivityRegistry* actReg,
947  ModuleCallingContext const* mcc,
948  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
949  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
950  cpp.preModuleSignal();
951  auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
952  cpp.postModuleSignal();
953  return returnValue;
954  }
955  static void esPrefetchAsync(
957  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
958  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
959  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
960  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
961  };
// CallImpl specialization whose call() runs
// Worker::implDoAccessInputProcessBlock() on info.principal() between the
// pre/post module signals.
// NOTE(review): the class head, Arg typedef, the `info` parameter line and the
// esPrefetchAsync parameter list (963, 965, 968, 979) are missing from this
// listing.
962  template <>
964  public:
966  static bool call(Worker* iWorker,
967  StreamID,
969  ActivityRegistry* actReg,
970  ModuleCallingContext const* mcc,
971  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
972  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
973  cpp.preModuleSignal();
974  auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
975  cpp.postModuleSignal();
976  return returnValue;
977  }
978  static void esPrefetchAsync(
980  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
981  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
982  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
983  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
984  };
// CallImpl specialization whose call() runs Worker::implDoEndProcessBlock()
// on info.principal() between the pre/post module signals.
// NOTE(review): the class head, Arg typedef, the `info` parameter line and the
// esPrefetchAsync parameter list (986, 988, 991, 1002) are missing from this
// listing.
985  template <>
987  public:
989  static bool call(Worker* iWorker,
990  StreamID,
992  ActivityRegistry* actReg,
993  ModuleCallingContext const* mcc,
994  Arg::Context const* context) {
     // The sentry's destructor emits the post signal if anything below throws.
995  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
996  cpp.preModuleSignal();
997  auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
998  cpp.postModuleSignal();
999  return returnValue;
1000  }
1001  static void esPrefetchAsync(
1003  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
1004  static bool needToRunSelection(Worker const* iWorker) { return false; }
     // No global queue is paused or enabled for this transition.
1005  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
1006  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
1007  };
1008  } // namespace workerhelper
1009 
// Out-of-class template definition that starts prefetching for one
// transition: it emits the pre-prefetching signal that matches T, then passes
// iTask to both the EventSetup prefetch and the event-data prefetch.
// NOTE(review): the line carrying the function name and first parameter
// (1011) and lines 1018 and 1032 are missing from this listing; the name
// "prefetchAsync" is presumed from the "pre was called in prefetchAsync"
// comments elsewhere in this file - confirm against the real header.
1010  template <typename T>
1012  ServiceToken const& token,
1013  ParentContext const& parentContext,
1014  typename T::TransitionInfoType const& transitionInfo,
1015  Transition iTransition) {
1016  Principal const& principal = transitionInfo.principal();
1017 
1019 
     // Exactly one pre-prefetching signal is chosen at compile time from T.
1020  if constexpr (T::isEvent_) {
1021  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1022  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1023  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1024  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1025  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1026  }
1027 
1028  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1029  edPrefetchAsync(iTask, token, principal);
1030 
     // Event-only step; its body (line 1032) is not visible in this listing.
1031  if (principal.branchType() == InEvent) {
1033  }
1034  }
1035 
// Worker::doWorkAsync<T>: schedules prefetching followed by execution of the module
// for transition T. Only the caller that wins the compare-exchange on workStarted_
// builds the task chain.
// NOTE(review): several lines are missing from this listing: 1037 (first signature
// line, which declares `task`), 1043 (the condition guarding the early return),
// 1051, 1057 and 1060 (the condition that opens the "selection" branch below).
// The index at the end of the page gives the parameter list as (WaitingTaskHolder,
// typename T::TransitionInfoType const&, ServiceToken const&, StreamID,
// ParentContext const&, typename T::Context const*).
1036  template <typename T>
1038  typename T::TransitionInfoType const& transitionInfo,
1039  ServiceToken const& token,
1040  StreamID streamID,
1041  ParentContext const& parentContext,
1042  typename T::Context const* context) {
1044  return;
1045  }
1046 
1047  //Need to check workStarted_ before adding to waitingTasks_
// Atomically flips workStarted_ from false to true; `workStarted` is true only for the
// single caller that performed the flip.
1048  bool expected = false;
1049  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1050 
// Visit counter is only advanced for Event transitions.
1052  if constexpr (T::isEvent_) {
1053  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1054  }
1055 
1056  if (workStarted) {
1058 
1059  //if have TriggerResults based selection we want to reject the event before doing prefetching
1061  //We need to run the selection in a different task so that
1062  // we can prefetch the data needed for the selection
1063  WaitingTask* moduleTask =
1064  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1065 
1066  //make sure the task is either run or destroyed
// Holder for a raw WaitingTask*: release() hands the pointer out exactly once and
// leaves nullptr behind; if nobody released it, the destructor passes it to a
// TaskSentry (presumably disposing of the task -- TaskSentry is not visible here).
1067  struct DestroyTask {
1068  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1069 
1070  ~DestroyTask() {
1071  auto p = m_task.exchange(nullptr);
1072  if (p) {
1073  TaskSentry s{p};
1074  }
1075  }
1076 
1077  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1078 
1079  private:
1080  std::atomic<edm::WaitingTask*> m_task;
1081  };
// For Event transitions of modules with an acquire step, replace moduleTask by a task
// that executes an AcquireTask inline; the original run task is handed to a
// HandleExternalWorkExceptionTask wrapped in a WaitingTaskWithArenaHolder.
1082  if constexpr (T::isEvent_) {
1083  if (hasAcquire()) {
1084  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1085  ServiceWeakToken weakToken = token;
1086  auto* group = task.group();
1087  moduleTask = make_waiting_task(
1088  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1089  WaitingTaskWithArenaHolder runTaskHolder(
1090  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1091  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1092  t.execute();
1093  });
1094  }
1095  }
// selectionTask starts the regular prefetch once it is run; it is passed to
// prePrefetchSelectionAsync below. The lambda captures a ServiceWeakToken and
// re-locks it before use.
1096  auto* group = task.group();
1097  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1098  ServiceWeakToken weakToken = token;
1099  auto selectionTask =
1100  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1101  std::exception_ptr const*) mutable {
1102  ServiceRegistry::Operate guard(weakToken.lock());
1103  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1104  weakToken.lock(),
1105  parentContext,
1106  info,
1107  T::transition_);
1108  });
1109  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1110  } else {
// No selection step: go straight to prefetching, with the run task (optionally
// preceded by an AcquireTask for Event transitions with an acquire step) as the
// continuation.
1111  WaitingTask* moduleTask =
1112  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1113  auto group = task.group();
1114  if constexpr (T::isEvent_) {
1115  if (hasAcquire()) {
1116  WaitingTaskWithArenaHolder runTaskHolder(
1117  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1118  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1119  }
1120  }
1121  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1122  }
1123  }
1124  }
1125 
// Decides, from the outcome of prefetching, whether the module should run, runs it if
// so, and then releases everything waiting on this worker.
//   iEPtr          exception produced during prefetching, or null if there was none
//   transitionInfo / streamID / parentContext / context   forwarded to runModule<T>
// Returns the exception handed to waitingTasks_.doneWaiting (null on success).
// NOTE(review): line 1152, the body of the final `else`, is missing from this listing.
1126  template <typename T>
1127  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1128  typename T::TransitionInfoType const& transitionInfo,
1129  StreamID streamID,
1130  ParentContext const& parentContext,
1131  typename T::Context const* context) {
1132  std::exception_ptr exceptionPtr;
1133  bool shouldRun = true;
1134  if (iEPtr) {
// A prefetch exception that must be rethrown is recorded and the module is skipped.
1135  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1136  exceptionPtr = iEPtr;
1137  setException<T::isEvent_>(exceptionPtr);
1138  shouldRun = false;
1139  } else {
// Exception is being ignored: unless the module asked to try to continue, mark it
// as passed without running it. With shouldTryToContinue_ set, shouldRun stays true.
1140  if (not shouldTryToContinue_) {
1141  setPassed<T::isEvent_>();
1142  shouldRun = false;
1143  }
1144  }
1145  }
1146  if (shouldRun) {
1147  // Caught exception is propagated via WaitingTaskList
1148  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1149  exceptionPtr = std::current_exception();
1150  }
1151  } else {
1153  }
// Always signal completion, passing along whatever exception was recorded (may be null).
1154  waitingTasks_.doneWaiting(exceptionPtr);
1155  return exceptionPtr;
1156  }
1157 
// Worker::doWorkNoPrefetchingAsync<T>: schedules the module to run without going
// through prefetchAsync; at most an EventSetup prefetch precedes the run.
// NOTE(review): missing from this listing are line 1159 (first signature line, which
// declares `task`), 1165 (the condition guarding the early return), 1173, and
// 1204-1205 (the start of the call that receives the WaitingTaskHolder at line 1206).
// The index at the end of the page gives the parameter list as (WaitingTaskHolder,
// typename T::TransitionInfoType const&, ServiceToken const&, StreamID,
// ParentContext const&, typename T::Context const*).
1158  template <typename T>
1160  typename T::TransitionInfoType const& transitionInfo,
1161  ServiceToken const& serviceToken,
1162  StreamID streamID,
1163  ParentContext const& parentContext,
1164  typename T::Context const* context) {
1166  return;
1167  }
1168 
1169  //Need to check workStarted_ before adding to waitingTasks_
// Only the caller that flips workStarted_ from false to true schedules the work.
1170  bool expected = false;
1171  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1172 
1174  if (workStarted) {
1175  ServiceWeakToken weakToken = serviceToken;
// The unit of work: run the module with the services made available, convert any
// thrown exception to an exception_ptr, and always notify waitingTasks_.
// transitionInfo is copied into the closure (`info = transitionInfo`).
1176  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1177  std::exception_ptr exceptionPtr;
1178  // Caught exception is propagated via WaitingTaskList
1179  CMS_SA_ALLOW try {
1180  //Need to make the services available
1181  ServiceRegistry::Operate guard(weakToken.lock());
1182 
1183  this->runModule<T>(info, streamID, parentContext, context);
1184  } catch (...) {
1185  exceptionPtr = std::current_exception();
1186  }
1187  this->waitingTasks_.doneWaiting(exceptionPtr);
1188  };
1189 
1190  if (needsESPrefetching(T::transition_)) {
1191  auto group = task.group();
// Continuation run after the EventSetup prefetch: a prefetch exception is reported to
// the waiters without running the module; otherwise toDo is pushed onto the queue from
// serializeRunModule() when that adaptor holds a queue, or run directly in the group.
1192  auto afterPrefetch =
1193  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1194  if (iExcept) {
1195  this->waitingTasks_.doneWaiting(*iExcept);
1196  } else {
1197  if (auto queue = this->serializeRunModule()) {
1198  queue.push(*group, toDo);
1199  } else {
1200  group->run(toDo);
1201  }
1202  }
1203  });
// NOTE(review): the callee name is on a missing line; the argument list matches
// esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition,
// ServiceToken const&) from the index -- confirm.
1206  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1207  } else {
// No EventSetup prefetch needed: dispatch toDo immediately, same queue-or-group choice.
1208  auto group = task.group();
1209  if (auto queue = this->serializeRunModule()) {
1210  queue.push(*group, toDo);
1211  } else {
1212  group->run(toDo);
1213  }
1214  }
1215  }
1216  }
1217 
// Runs the module for transition T and records the outcome on the worker.
// Returns the module's result `rc` (initialised to true). If a cms::Exception escapes
// and must be rethrown, the exception is recorded and rethrown instead of returning.
// NOTE(review): lines 1235 (the start of the call whose arguments appear on line 1236,
// which presumably assigns `rc`), 1245 and 1247 are missing from this listing.
1218  template <typename T>
1219  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1220  StreamID streamID,
1221  ParentContext const& parentContext,
1222  typename T::Context const* context) {
1223  //unscheduled producers should advance this
1224  //if (T::isEvent_) {
1225  // ++timesVisited_;
1226  //}
1227  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// Run counter is only advanced for Event transitions.
1228  if constexpr (T::isEvent_) {
1229  timesRun_.fetch_add(1, std::memory_order_relaxed);
1230  }
1231 
1232  bool rc = true;
1233  try {
// The module call and the pass/fail bookkeeping both happen inside
// convertException::wrap, so exceptions from either reach the handler below.
1234  convertException::wrap([&]() {
1236  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1237 
1238  if (rc) {
1239  setPassed<T::isEvent_>();
1240  } else {
1241  setFailed<T::isEvent_>();
1242  }
1243  });
1244  } catch (cms::Exception& ex) {
1246  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
// NOTE(review): rethrows cached_exception_, which setException presumably just
// stored -- confirm against setException's definition.
1248  setException<T::isEvent_>(std::current_exception());
1249  std::rethrow_exception(cached_exception_);
1250  } else {
// Exception is ignored: the module is recorded as passed and rc takes setPassed's result.
1251  rc = setPassed<T::isEvent_>();
1252  }
1253  }
1254 
1255  return rc;
1256  }
1257 
// Runs the module immediately, without scheduling any prefetching.
// Increments timesVisited_ unconditionally (doWorkAsync does so only for Event
// transitions), then delegates to runModuleAfterAsyncPrefetch<T> with a null
// prefetch exception. Returns that function's result: the exception produced while
// running the module, or null if there was none.
1258  template <typename T>
1259  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1260  StreamID streamID,
1261  ParentContext const& parentContext,
1262  typename T::Context const* context) {
1263  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1264  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1265  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1266  }
1267 } // namespace edm
1268 #endif
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1259
virtual bool hasAcquire() const =0
void clearModule()
Definition: Worker.h:123
std::atomic< int > timesVisited_
Definition: Worker.h:605
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
virtual bool wantsStreamRuns() const =0
ModuleDescription const * moduleDescription() const
void execute() final
Definition: Worker.h:436
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:272
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1127
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:108
T::Context const * m_context
Definition: Worker.h:503
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:679
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
unsigned int ProductResolverIndex
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:349
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:824
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:242
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:526
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:706
void execute() final
Definition: Worker.h:520
virtual ~Worker()
Definition: Worker.cc:109
void endJob()
Definition: Worker.cc:299
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:836
ParentContext const m_parentContext
Definition: Worker.h:502
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:966
std::atomic< int > timesExcept_
Definition: Worker.h:608
oneapi::tbb::task_group * m_group
Definition: Worker.h:505
std::atomic< bool > workStarted_
Definition: Worker.h:623
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:766
std::atomic< int > timesFailed_
Definition: Worker.h:607
void emitPostModuleStreamPrefetchingSignal()
Definition: Worker.h:376
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:368
void addedToPath()
Definition: Worker.h:240
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
void clearCounters()
Definition: Worker.h:232
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1037
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:102
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:403
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:298
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:739
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:141
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:427
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:316
bool moduleValid_
Definition: Worker.h:625
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:98
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:795
virtual std::string workerType() const =0
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:615
LimitedTaskQueue * limited_
Definition: Worker.h:99
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:624
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:111
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:600
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
BranchType
Definition: BranchType.h:11
int timesPass() const
Definition: Worker.h:249
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:515
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:155
std::exception_ptr cached_exception_
Definition: Worker.h:616
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
Definition: Worker.h:381
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:340
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:245
std::exception_ptr exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
int timesPassed() const
Definition: Worker.h:244
int iEvent
Definition: GenABIO.cc:224
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:428
Worker & operator=(Worker const &)=delete
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:500
void reset()
Definition: Worker.h:188
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:807
ServiceWeakToken m_serviceToken
Definition: Worker.h:504
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:927
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:1001
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:705
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:723
int timesFailed() const
Definition: Worker.h:245
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:424
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:364
#define CMS_THREAD_GUARD(_var_)
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:90
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:609
double f[11][100]
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
Definition: Types.py:1
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:204
int timesVisited() const
Definition: Worker.h:243
int numberOfPathsOn_
Definition: Worker.h:610
int timesExcept() const
Definition: Worker.h:246
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
bool setPassed()
Definition: Worker.h:331
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:767
virtual void implEndJob()=0
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:244
ConcurrencyTypes
Definition: Worker.h:96
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:440
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:943
ModuleDescription const * description() const
Definition: Worker.h:198
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:103
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:678
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1159
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:738
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:183
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:229
bool beginSucceeded_
Definition: Worker.h:627
#define DUMMY
void respondToCloseOutputFile()
Definition: Worker.h:185
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:373
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:852
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:184
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1219
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:372
State state() const
Definition: Worker.h:247
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:865
StreamContext const * getStreamContext() const
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
double a
Definition: hdecay.h:121
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:688
std::atomic< int > timesPassed_
Definition: Worker.h:606
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:388
void beginJob()
Definition: Worker.cc:283
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:340
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:382
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:779
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:910
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:882
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:894
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:1011
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:604
int timesRun() const
Definition: Worker.h:242
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:989
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
GlobalContext const * getGlobalContext() const
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:955
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:794
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:978
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::WaitingTaskList & waitingTaskList()
Definition: Worker.h:254
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:751
BranchType const & branchType() const
Definition: Principal.h:175
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
bool shouldTryToContinue_
Definition: Worker.h:626
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
Definition: Worker.h:407
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0