CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <array>
62 #include <atomic>
63 #include <cassert>
64 #include <map>
65 #include <memory>
66 #include <sstream>
67 #include <string>
68 #include <vector>
69 #include <exception>
70 #include <unordered_map>
71 
72 namespace edm {
73  class EventPrincipal;
74  class EventSetupImpl;
75  class EarlyDeleteHelper;
76  class ModuleProcessName;
77  class ProductResolverIndexHelper;
78  class ProductResolverIndexAndSkipBit;
79  class StreamID;
80  class StreamContext;
81  class ProductRegistry;
82  class ThinnedAssociationsHelper;
83 
84  namespace workerhelper {
85  template <typename O>
86  class CallImpl;
87  }
88  namespace eventsetup {
90  }
91 
92  class Worker {
93  public:
// Cached outcome of the most recent execution. setPassed(), setFailed() and
// setException() store Pass, Fail and Exception respectively; reset() puts the
// worker back into Ready.
94  enum State { Ready, Pass, Fail, Exception };
100 
101  TaskQueueAdaptor() = default;
103  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
104 
105  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
106 
107  template <class F>
108  void push(oneapi::tbb::task_group& iG, F&& iF) {
109  if (serial_) {
110  serial_->push(iG, iF);
111  } else {
112  limited_->push(iG, iF);
113  }
114  }
115  };
116 
117  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
118  virtual ~Worker();
119 
120  Worker(Worker const&) = delete; // Disallow copying and moving
121  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
122 
// Marks the module as no longer valid, then asks the derived class to drop it
// through the pure virtual doClearModule().
123  void clearModule() {
124  moduleValid_ = false;
125  doClearModule();
126  }
127 
128  virtual bool wantsProcessBlocks() const = 0;
129  virtual bool wantsInputProcessBlocks() const = 0;
130  virtual bool wantsGlobalRuns() const = 0;
131  virtual bool wantsGlobalLuminosityBlocks() const = 0;
132  virtual bool wantsStreamRuns() const = 0;
133  virtual bool wantsStreamLuminosityBlocks() const = 0;
134 
135  virtual SerialTaskQueue* globalRunsQueue() = 0;
137 
139  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
140 
142  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
143  assert(false);
144  }
145 
146  template <typename T>
148  typename T::TransitionInfoType const&,
149  ServiceToken const&,
150  StreamID,
151  ParentContext const&,
152  typename T::Context const*);
153 
154  template <typename T>
156  typename T::TransitionInfoType const&,
157  ServiceToken const&,
158  StreamID,
159  ParentContext const&,
160  typename T::Context const*);
161 
162  template <typename T>
163  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
164  StreamID,
165  ParentContext const&,
166  typename T::Context const*);
167 
168  virtual size_t transformIndex(edm::BranchDescription const&) const = 0;
170  size_t iTransformIndex,
171  EventPrincipal const&,
172  ServiceToken const&,
173  StreamID,
174  ModuleCallingContext const&,
175  StreamContext const*);
176 
178  void skipOnPath(EventPrincipal const& iEvent);
179  void beginJob();
180  void endJob();
181  void beginStream(StreamID id, StreamContext& streamContext);
182  void endStream(StreamID id, StreamContext& streamContext);
187 
188  void reset() {
189  cached_exception_ = std::exception_ptr();
190  state_ = Ready;
192  workStarted_ = false;
194  }
195 
196  void postDoEvent(EventPrincipal const&);
197 
199  if (moduleValid_) {
201  }
202  return nullptr;
203  }
206  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
207 
209 
210  //Used to make EDGetToken work
211  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
213  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
214  virtual void resolvePutIndicies(
215  BranchType iBranchType,
216  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
217  iIndicies) = 0;
218 
219  virtual void modulesWhoseProductsAreConsumed(
220  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
221  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
222  ProductRegistry const& preg,
223  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
224 
225  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
226 
227  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
228 
229  virtual Types moduleType() const = 0;
230  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
231 
232  void clearCounters() {
233  timesRun_.store(0, std::memory_order_release);
234  timesVisited_.store(0, std::memory_order_release);
235  timesPassed_.store(0, std::memory_order_release);
236  timesFailed_.store(0, std::memory_order_release);
237  timesExcept_.store(0, std::memory_order_release);
238  }
239 
// Read accessors for the execution statistics and the cached state. Each counter
// is read with an acquire load of the corresponding atomic member.
241  //NOTE: calling state() is done to force synchronization across threads
// NOTE(review): none of the accessors below call state(); the note above may be stale - confirm.
242  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
243  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
244  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
245  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
246  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
247  State state() const { return state_; }
248 
// Alias of timesPassed(), kept under the old name.
249  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
250 
251  virtual bool hasAccumulator() const = 0;
252 
253  // Used in PuttableProductResolver
255 
256  protected:
257  template <typename O>
259 
260  virtual void doClearModule() = 0;
261 
262  virtual std::string workerType() const = 0;
263  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
264 
265  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
266  virtual bool implNeedToRunSelection() const = 0;
267 
268  virtual void implDoAcquire(EventTransitionInfo const&,
269  ModuleCallingContext const*,
271 
273  size_t iTransformIndex,
274  EventPrincipal const&,
275  ParentContext const&,
276  ServiceWeakToken const&) = 0;
277  virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;
278 
280  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
282  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
283  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
284  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
285  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
286  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
287  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
288  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
289  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
290  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
291  virtual void implBeginJob() = 0;
292  virtual void implEndJob() = 0;
293  virtual void implBeginStream(StreamID) = 0;
294  virtual void implEndStream(StreamID) = 0;
295 
297 
299 
300  private:
301  template <typename T>
302  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
303 
304  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
305  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
306 
307  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
308 
309  virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
310  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
311 
313  ModuleCallingContext const& moduleCallingContext,
314  Principal const& iPrincipal) const = 0;
315 
316  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
317  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
318  virtual void implRespondToCloseOutputFile() = 0;
319 
321 
322  virtual TaskQueueAdaptor serializeRunModule() = 0;
323 
324  bool shouldRethrowException(std::exception_ptr iPtr,
325  ParentContext const& parentContext,
326  bool isEvent,
327  bool isTryToContinue) const;
329 
330  template <bool IS_EVENT>
331  bool setPassed() {
332  if (IS_EVENT) {
333  timesPassed_.fetch_add(1, std::memory_order_relaxed);
334  }
335  state_ = Pass;
336  return true;
337  }
338 
339  template <bool IS_EVENT>
340  bool setFailed() {
341  if (IS_EVENT) {
342  timesFailed_.fetch_add(1, std::memory_order_relaxed);
343  }
344  state_ = Fail;
345  return false;
346  }
347 
348  template <bool IS_EVENT>
349  std::exception_ptr setException(std::exception_ptr iException) {
350  if (IS_EVENT) {
351  timesExcept_.fetch_add(1, std::memory_order_relaxed);
352  }
353  cached_exception_ = iException; // propagate_const<T> has no reset() function
354  state_ = Exception;
355  return cached_exception_;
356  }
357 
358  template <typename T>
360  ServiceToken const&,
361  ParentContext const&,
362  typename T::TransitionInfoType const&,
363  Transition);
364 
366  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
367 
368  bool needsESPrefetching(Transition iTrans) const noexcept {
369  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
370  }
371 
373  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
374  }
375 
377  actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
379  }
380 
382  actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
384  }
385 
386  virtual bool hasAcquire() const = 0;
387 
388  template <typename T>
389  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
390  typename T::TransitionInfoType const&,
391  StreamID,
392  ParentContext const&,
393  typename T::Context const*);
394 
396 
397  void runAcquireAfterAsyncPrefetch(std::exception_ptr,
398  EventTransitionInfo const&,
399  ParentContext const&,
401 
402  std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
403 
404  template <typename T>
405  class RunModuleTask : public WaitingTask {
406  public:
408  typename T::TransitionInfoType const& transitionInfo,
409  ServiceToken const& token,
410  StreamID streamID,
411  ParentContext const& parentContext,
412  typename T::Context const* context,
413  oneapi::tbb::task_group* iGroup)
414  : m_worker(worker),
415  m_transitionInfo(transitionInfo),
416  m_streamID(streamID),
417  m_parentContext(parentContext),
420  m_group(iGroup) {}
421 
425  EnableQueueGuard(EnableQueueGuard const&) = delete;
426  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
428  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
430  if (queue_) {
431  queue_->resume();
432  }
433  }
434  };
435 
436  void execute() final {
437  //Need to make the services available early so other services can see them
439 
440  //in case the emit causes an exception, we need a memory location
441  // to hold the exception_ptr
442  std::exception_ptr temp_excptr;
443  auto excptr = exceptionPtr();
444  if constexpr (T::isEvent_) {
445  if (!m_worker->hasAcquire()) {
446  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
447  CMS_SA_ALLOW try {
448  //pre was called in prefetchAsync
450  } catch (...) {
451  temp_excptr = std::current_exception();
452  if (not excptr) {
453  excptr = temp_excptr;
454  }
455  }
456  }
457  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
459  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
461  }
462 
463  if (not excptr) {
464  if (auto queue = m_worker->serializeRunModule()) {
465  auto f = [worker = m_worker,
467  streamID = m_streamID,
468  parentContext = m_parentContext,
469  sContext = m_context,
470  serviceToken = m_serviceToken]() {
471  //Need to make the services available
472  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
473 
474  //If needed, we pause the queue in begin transition and resume it
475  // at the end transition. This can guarantee that the module
476  // only processes one run or lumi at a time
478  std::exception_ptr ptr;
479  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
480  };
481  //keep another global transition from running if necessary
483  if (gQueue) {
484  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
485  gQueue->pause();
486  queue.push(*group, std::move(f));
487  });
488  } else {
489  queue.push(*m_group, std::move(f));
490  }
491  return;
492  }
493  }
494 
496  }
497 
498  private:
500  typename T::TransitionInfoType m_transitionInfo;
503  typename T::Context const* m_context;
505  oneapi::tbb::task_group* m_group;
506  };
507 
508  // AcquireTask is only used for the Event case, but we define
509  // it as a template so all cases will compile.
510  // DUMMY exists to work around the C++ Standard prohibition on
511  // fully specializing templates nested in other classes.
512  template <typename T, typename DUMMY = void>
513  class AcquireTask : public WaitingTask {
514  public:
516  typename T::TransitionInfoType const&,
517  ServiceToken const&,
518  ParentContext const&,
520  void execute() final {}
521  };
522 
523  template <typename DUMMY>
525  public:
527  EventTransitionInfo const& eventTransitionInfo,
528  ServiceToken const& token,
529  ParentContext const& parentContext,
531  : m_worker(worker),
532  m_eventTransitionInfo(eventTransitionInfo),
533  m_parentContext(parentContext),
534  m_holder(std::move(holder)),
535  m_serviceToken(token) {}
536 
// Emits the post-prefetching signal for the event and then runs the acquire step,
// either directly or through the queue returned by serializeRunModule().
537  void execute() final {
538  //Need to make the services available early so other services can see them
539  ServiceRegistry::Operate guard(m_serviceToken.lock());
540 
541  //in case the emit causes an exception, we need a memory location
542  // to hold the exception_ptr
543  std::exception_ptr temp_excptr;
// exceptionPtr() is not defined in this view; presumably the exception recorded for this task - confirm.
544  auto excptr = exceptionPtr();
545  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
// NOTE(review): the code below hands the exception to runAcquireAfterAsyncPrefetch() - confirm the comment above.
546  CMS_SA_ALLOW try {
547  //pre was called in prefetchAsync
548  m_worker->emitPostModuleEventPrefetchingSignal();
549  } catch (...) {
// An exception already held in excptr takes precedence over one thrown by the emit.
550  temp_excptr = std::current_exception();
551  if (not excptr) {
552  excptr = temp_excptr;
553  }
554  }
555 
556  if (not excptr) {
// With no pending exception and a non-empty queue, the acquire step is pushed onto that queue.
557  if (auto queue = m_worker->serializeRunModule()) {
558  queue.push(*m_holder.group(),
559  [worker = m_worker,
560  info = m_eventTransitionInfo,
561  parentContext = m_parentContext,
562  serviceToken = m_serviceToken,
563  holder = m_holder]() {
564  //Need to make the services available
565  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
566 
// This branch is only reached when excptr is empty, so an empty exception_ptr is passed on.
567  std::exception_ptr ptr;
568  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
569  });
570  return;
571  }
572  }
573 
// Reached when an exception is pending or no queue was returned: call directly and give up the holder.
574  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
575  }
576 
577  private:
583  };
584 
585  // This class does nothing unless there is an exception originating
586  // in an "External Worker". In that case, it handles converting the
587  // exception to a CMS exception and adding context to the exception.
589  public:
591  oneapi::tbb::task_group* group,
592  WaitingTask* runModuleTask,
593  ParentContext const& parentContext);
594 
595  void execute() final;
596 
597  private:
600  oneapi::tbb::task_group* m_group;
602  };
603 
604  std::atomic<int> timesRun_;
605  std::atomic<int> timesVisited_;
606  std::atomic<int> timesPassed_;
607  std::atomic<int> timesFailed_;
608  std::atomic<int> timesExcept_;
609  std::atomic<State> state_;
611  std::atomic<int> numberOfPathsLeftToRun_;
612 
614 
615  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
616  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
617 
618  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
619 
621 
623  std::atomic<bool> workStarted_;
625  bool moduleValid_ = true;
626  bool shouldTryToContinue_ = false;
627  };
628 
629  namespace {
630  template <typename T>
631  class ModuleSignalSentry {
632  public:
633  ModuleSignalSentry(ActivityRegistry* a,
634  typename T::Context const* context,
635  ModuleCallingContext const* moduleCallingContext)
636  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
637  if (a_)
638  T::preModuleSignal(a_, context, moduleCallingContext_);
639  }
640 
641  ~ModuleSignalSentry() {
642  if (a_)
643  T::postModuleSignal(a_, context_, moduleCallingContext_);
644  }
645 
646  private:
647  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
648  typename T::Context const* context_;
649  ModuleCallingContext const* moduleCallingContext_;
650  };
651 
652  } // namespace
653 
654  namespace workerhelper {
655  template <>
657  public:
// Visible members of a CallImpl specialization whose call() takes an EventTransitionInfo.
// The specialization header and its Arg typedef are on lines not visible here.
// call(): forwards to Worker::implDo(); no ModuleSignalSentry is created at this level.
659  static bool call(Worker* iWorker,
660  StreamID,
661  EventTransitionInfo const& info,
662  ActivityRegistry* /* actReg */,
663  ModuleCallingContext const* mcc,
664  Arg::Context const* /* context*/) {
665  //Signal sentry is handled by the module
666  return iWorker->implDo(info, mcc);
667  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
668  static void esPrefetchAsync(Worker* worker,
669  WaitingTaskHolder waitingTask,
670  ServiceToken const& token,
671  EventTransitionInfo const& info,
672  Transition transition) {
673  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
674  }
// The transition is always wanted; selection is needed when implNeedToRunSelection() says so.
675  static bool wantsTransition(Worker const* iWorker) { return true; }
676  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
677 
// Both queue hooks return nullptr for this specialization.
678  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
679  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
680  };
681 
682  template <>
684  public:
// Visible members of a CallImpl specialization that dispatches to implDoBegin() for a
// RunTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoBegin() call.
686  static bool call(Worker* iWorker,
687  StreamID,
688  RunTransitionInfo const& info,
689  ActivityRegistry* actReg,
690  ModuleCallingContext const* mcc,
691  Arg::Context const* context) {
692  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
693  return iWorker->implDoBegin(info, mcc);
694  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
695  static void esPrefetchAsync(Worker* worker,
696  WaitingTaskHolder waitingTask,
697  ServiceToken const& token,
698  RunTransitionInfo const& info,
699  Transition transition) {
700  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
701  }
// Wanted only when the worker reports wantsGlobalRuns(); never needs selection.
702  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
703  static bool needToRunSelection(Worker const* iWorker) { return false; }
// pauseGlobalQueue() returns the worker's globalRunsQueue(); enableGlobalQueue() returns nullptr.
704  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
705  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
706  };
707  template <>
709  public:
// Visible members of a CallImpl specialization that dispatches to implDoStreamBegin() for a
// RunTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoStreamBegin() call.
711  static bool call(Worker* iWorker,
712  StreamID id,
713  RunTransitionInfo const& info,
714  ActivityRegistry* actReg,
715  ModuleCallingContext const* mcc,
716  Arg::Context const* context) {
717  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
718  return iWorker->implDoStreamBegin(id, info, mcc);
719  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
720  static void esPrefetchAsync(Worker* worker,
721  WaitingTaskHolder waitingTask,
722  ServiceToken const& token,
723  RunTransitionInfo const& info,
724  Transition transition) {
725  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
726  }
// Wanted only when the worker reports wantsStreamRuns(); never needs selection.
727  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
728  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Both queue hooks return nullptr for this specialization.
729  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
730  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
731  };
732  template <>
734  public:
// Visible members of a CallImpl specialization that dispatches to implDoEnd() for a
// RunTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoEnd() call.
736  static bool call(Worker* iWorker,
737  StreamID,
738  RunTransitionInfo const& info,
739  ActivityRegistry* actReg,
740  ModuleCallingContext const* mcc,
741  Arg::Context const* context) {
742  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
743  return iWorker->implDoEnd(info, mcc);
744  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
745  static void esPrefetchAsync(Worker* worker,
746  WaitingTaskHolder waitingTask,
747  ServiceToken const& token,
748  RunTransitionInfo const& info,
749  Transition transition) {
750  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
751  }
// Wanted only when the worker reports wantsGlobalRuns(); never needs selection.
752  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
753  static bool needToRunSelection(Worker const* iWorker) { return false; }
// pauseGlobalQueue() returns nullptr; enableGlobalQueue() returns the worker's globalRunsQueue().
754  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
755  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
756  };
757  template <>
759  public:
// Visible members of a CallImpl specialization that dispatches to implDoStreamEnd() for a
// RunTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoStreamEnd() call.
761  static bool call(Worker* iWorker,
762  StreamID id,
763  RunTransitionInfo const& info,
764  ActivityRegistry* actReg,
765  ModuleCallingContext const* mcc,
766  Arg::Context const* context) {
767  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
768  return iWorker->implDoStreamEnd(id, info, mcc);
769  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
770  static void esPrefetchAsync(Worker* worker,
771  WaitingTaskHolder waitingTask,
772  ServiceToken const& token,
773  RunTransitionInfo const& info,
774  Transition transition) {
775  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
776  }
// Wanted only when the worker reports wantsStreamRuns(); never needs selection.
777  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
778  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Both queue hooks return nullptr for this specialization.
779  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
780  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
781  };
782 
783  template <>
785  public:
// Visible members of a CallImpl specialization that dispatches to implDoBegin() for a
// LumiTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoBegin() call.
787  static bool call(Worker* iWorker,
788  StreamID,
789  LumiTransitionInfo const& info,
790  ActivityRegistry* actReg,
791  ModuleCallingContext const* mcc,
792  Arg::Context const* context) {
793  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
794  return iWorker->implDoBegin(info, mcc);
795  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
796  static void esPrefetchAsync(Worker* worker,
797  WaitingTaskHolder waitingTask,
798  ServiceToken const& token,
799  LumiTransitionInfo const& info,
800  Transition transition) {
801  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
802  }
// Wanted only when the worker reports wantsGlobalLuminosityBlocks(); never needs selection.
803  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
804  static bool needToRunSelection(Worker const* iWorker) { return false; }
// pauseGlobalQueue() returns the worker's globalLuminosityBlocksQueue(); enableGlobalQueue() returns nullptr.
805  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
806  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
807  };
808  template <>
810  public:
// Visible members of a CallImpl specialization that dispatches to implDoStreamBegin() for a
// LumiTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoStreamBegin() call.
812  static bool call(Worker* iWorker,
813  StreamID id,
814  LumiTransitionInfo const& info,
815  ActivityRegistry* actReg,
816  ModuleCallingContext const* mcc,
817  Arg::Context const* context) {
818  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
819  return iWorker->implDoStreamBegin(id, info, mcc);
820  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
821  static void esPrefetchAsync(Worker* worker,
822  WaitingTaskHolder waitingTask,
823  ServiceToken const& token,
824  LumiTransitionInfo const& info,
825  Transition transition) {
826  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
827  }
// Wanted only when the worker reports wantsStreamLuminosityBlocks(); never needs selection.
828  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
829  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Both queue hooks return nullptr for this specialization.
830  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
831  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
832  };
833 
834  template <>
836  public:
// Visible members of a CallImpl specialization that dispatches to implDoEnd() for a
// LumiTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoEnd() call.
838  static bool call(Worker* iWorker,
839  StreamID,
840  LumiTransitionInfo const& info,
841  ActivityRegistry* actReg,
842  ModuleCallingContext const* mcc,
843  Arg::Context const* context) {
844  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
845  return iWorker->implDoEnd(info, mcc);
846  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
847  static void esPrefetchAsync(Worker* worker,
848  WaitingTaskHolder waitingTask,
849  ServiceToken const& token,
850  LumiTransitionInfo const& info,
851  Transition transition) {
852  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
853  }
// Wanted only when the worker reports wantsGlobalLuminosityBlocks(); never needs selection.
854  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
855  static bool needToRunSelection(Worker const* iWorker) { return false; }
// pauseGlobalQueue() returns nullptr; enableGlobalQueue() returns the worker's globalLuminosityBlocksQueue().
856  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
857  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
858  };
859  template <>
861  public:
// Visible members of a CallImpl specialization that dispatches to implDoStreamEnd() for a
// LumiTransitionInfo. The specialization header and Arg typedef are on lines not visible here.
// call(): keeps a ModuleSignalSentry<Arg> alive around the implDoStreamEnd() call.
863  static bool call(Worker* iWorker,
864  StreamID id,
865  LumiTransitionInfo const& info,
866  ActivityRegistry* actReg,
867  ModuleCallingContext const* mcc,
868  Arg::Context const* context) {
869  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
870  return iWorker->implDoStreamEnd(id, info, mcc);
871  }
// Forwards to Worker::esPrefetchAsync() with the EventSetupImpl taken from info.
872  static void esPrefetchAsync(Worker* worker,
873  WaitingTaskHolder waitingTask,
874  ServiceToken const& token,
875  LumiTransitionInfo const& info,
876  Transition transition) {
877  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
878  }
// Wanted only when the worker reports wantsStreamLuminosityBlocks(); never needs selection.
879  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
880  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Both queue hooks return nullptr for this specialization.
881  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
882  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
883  };
884  template <>
886  public:
888  static bool call(Worker* iWorker,
889  StreamID,
891  ActivityRegistry* actReg,
892  ModuleCallingContext const* mcc,
893  Arg::Context const* context) {
894  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
895  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
896  }
897  static void esPrefetchAsync(
899  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
900  static bool needToRunSelection(Worker const* iWorker) { return false; }
901  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
902  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
903  };
904  template <>
906  public:
908  static bool call(Worker* iWorker,
909  StreamID,
911  ActivityRegistry* actReg,
912  ModuleCallingContext const* mcc,
913  Arg::Context const* context) {
914  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
915  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
916  }
917  static void esPrefetchAsync(
919  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
920  static bool needToRunSelection(Worker const* iWorker) { return false; }
921  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
922  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
923  };
924  template <>
926  public:
928  static bool call(Worker* iWorker,
929  StreamID,
931  ActivityRegistry* actReg,
932  ModuleCallingContext const* mcc,
933  Arg::Context const* context) {
934  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
935  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
936  }
937  static void esPrefetchAsync(
939  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
940  static bool needToRunSelection(Worker const* iWorker) { return false; }
941  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
942  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
943  };
944  } // namespace workerhelper
945 
946  template <typename T>
948  ServiceToken const& token,
949  ParentContext const& parentContext,
950  typename T::TransitionInfoType const& transitionInfo,
951  Transition iTransition) {
952  Principal const& principal = transitionInfo.principal();
953 
955 
956  if constexpr (T::isEvent_) {
957  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
958  } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
959  actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
960  } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
961  actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
962  }
963 
964  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
965  edPrefetchAsync(iTask, token, principal);
966 
967  if (principal.branchType() == InEvent) {
969  }
970  }
971 
// Worker::doWorkAsync<T>: sets up the asynchronous task chain that prefetches data and
// then runs this worker's module for transition T.
// NOTE(review): this listing is missing original lines 973 (start of the signature;
// presumably it declares the WaitingTaskHolder referred to below as `task`), 979, 987,
// 993 and 996, so the conditions guarding the early return and the if/else split inside
// `if (workStarted)` cannot be confirmed from this extract.
//
// Visible behavior:
//  - workStarted_ is flipped false->true with compare_exchange_strong; only the caller
//    that wins the exchange builds and launches the tasks.
//  - For event transitions timesVisited_ is incremented on every call, winner or not.
//  - The winner creates a RunModuleTask<T>; for event transitions on a worker whose
//    hasAcquire() is true, an acquire step is placed in front of it.
972  template <typename T>
974  typename T::TransitionInfoType const& transitionInfo,
975  ServiceToken const& token,
976  StreamID streamID,
977  ParentContext const& parentContext,
978  typename T::Context const* context) {
// NOTE(review): the condition for this early return is on missing line 979.
980  return;
981  }
982 
983  //Need to check workStarted_ before adding to waitingTasks_
984  bool expected = false;
985  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
986 
// Visit counting applies to event transitions only.
988  if constexpr (T::isEvent_) {
989  timesVisited_.fetch_add(1, std::memory_order_relaxed);
990  }
991 
992  if (workStarted) {
994 
995  //if have TriggerResults based selection we want to reject the event before doing prefetching
997  //We need to run the selection in a different task so that
998  // we can prefetch the data needed for the selection
999  WaitingTask* moduleTask =
1000  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1001 
1002  //make sure the task is either run or destroyed
1003  struct DestroyTask {
1004  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1005 
// If release() was never called, the still-held task is handed to a TaskSentry
// (presumably so that it gets destroyed - confirm against TaskSentry).
1006  ~DestroyTask() {
1007  auto p = m_task.exchange(nullptr);
1008  if (p) {
1009  TaskSentry s{p};
1010  }
1011  }
1012 
// Hands the pointer to the caller; the atomic exchange leaves nullptr behind, so the
// task is given out at most once.
1013  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1014 
1015  private:
1016  std::atomic<edm::WaitingTask*> m_task;
1017  };
// Acquire step: moduleTask is replaced by a task that executes an AcquireTask<T>; the
// original run task is released into a HandleExternalWorkExceptionTask.
1018  if constexpr (T::isEvent_) {
1019  if (hasAcquire()) {
1020  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1021  ServiceWeakToken weakToken = token;
1022  auto* group = task.group();
1023  moduleTask = make_waiting_task(
1024  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1025  WaitingTaskWithArenaHolder runTaskHolder(
1026  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1027  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1028  t.execute();
1029  });
1030  }
1031  }
1032  auto* group = task.group();
1033  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1034  ServiceWeakToken weakToken = token;
// selectionTask starts prefetching with the module task as the waiting task; it is
// handed to prePrefetchSelectionAsync below.
1035  auto selectionTask =
1036  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1037  std::exception_ptr const*) mutable {
1038  ServiceRegistry::Operate guard(weakToken.lock());
1039  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1040  weakToken.lock(),
1041  parentContext,
1042  info,
1043  T::transition_);
1044  });
1045  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1046  } else {
// Other branch (its condition is on missing line 996): the same run/acquire tasks are
// built and prefetchAsync is called directly, without a selection task.
1047  WaitingTask* moduleTask =
1048  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1049  auto group = task.group();
1050  if constexpr (T::isEvent_) {
1051  if (hasAcquire()) {
1052  WaitingTaskWithArenaHolder runTaskHolder(
1053  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1054  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1055  }
1056  }
1057  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1058  }
1059  }
1060  }
1061 
// Worker::runModuleAfterAsyncPrefetch<T>: called once prefetching has finished; decides
// whether the module should run, runs it if so, and then signals waitingTasks_.
//
// iEPtr            exception produced during prefetching (null when there was none)
// transitionInfo, streamID, parentContext, context
//                  forwarded unchanged to runModule<T>
// Returns the exception pointer that was passed to waitingTasks_.doneWaiting: the
// prefetch exception when it must be rethrown, the exception thrown by runModule<T>
// if any, otherwise null.
1062  template <typename T>
1063  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1064  typename T::TransitionInfoType const& transitionInfo,
1065  StreamID streamID,
1066  ParentContext const& parentContext,
1067  typename T::Context const* context) {
1068  std::exception_ptr exceptionPtr;
1069  bool shouldRun = true;
// Prefetching failed. Either record and propagate the exception, or, when it is not
// to be rethrown and shouldTryToContinue_ is false, mark the worker as passed without
// running. If shouldTryToContinue_ is true the module is still run.
1070  if (iEPtr) {
1071  if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1072  exceptionPtr = iEPtr;
1073  setException<T::isEvent_>(exceptionPtr);
1074  shouldRun = false;
1075  } else {
1076  if (not shouldTryToContinue_) {
1077  setPassed<T::isEvent_>();
1078  shouldRun = false;
1079  }
1080  }
1081  }
1082  if (shouldRun) {
1083  // Caught exception is propagated via WaitingTaskList
1084  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1085  exceptionPtr = std::current_exception();
1086  }
1087  } else {
// NOTE(review): the statement of this branch (listing line 1088) is missing from this extract.
1089  }
// Always notify the waiting tasks, passing along any exception collected above.
1090  waitingTasks_.doneWaiting(exceptionPtr);
1091  return exceptionPtr;
1092  }
1093 
// Worker::doWorkNoPrefetchingAsync<T>: runs the module for transition T without
// calling prefetchAsync; at most an EventSetup prefetch is requested first.
// NOTE(review): this listing is missing original lines 1095 (start of the signature;
// presumably it declares the WaitingTaskHolder referred to below as `task`), 1101,
// 1109, 1140 and 1141, so the early-return condition and the start of the call that
// ends on line 1142 cannot be confirmed from this extract.
//
// Visible behavior:
//  - workStarted_ is flipped false->true with compare_exchange_strong; only the caller
//    that wins the exchange schedules the work.
//  - The work itself (`toDo`) makes the services available, calls runModule<T>, and
//    passes any caught exception to waitingTasks_.doneWaiting.
//  - `toDo` is pushed onto the queue returned by serializeRunModule() when that queue
//    converts to true, otherwise it is run on the task group.
1094  template <typename T>
1096  typename T::TransitionInfoType const& transitionInfo,
1097  ServiceToken const& serviceToken,
1098  StreamID streamID,
1099  ParentContext const& parentContext,
1100  typename T::Context const* context) {
// NOTE(review): the condition for this early return is on missing line 1101.
1102  return;
1103  }
1104 
1105  //Need to check workStarted_ before adding to waitingTasks_
1106  bool expected = false;
1107  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1108 
1110  if (workStarted) {
1111  ServiceWeakToken weakToken = serviceToken;
// transitionInfo is copied into the closure as `info`, and the service token is held
// as a weak token that is locked only while the module runs.
1112  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1113  std::exception_ptr exceptionPtr;
1114  // Caught exception is propagated via WaitingTaskList
1115  CMS_SA_ALLOW try {
1116  //Need to make the services available
1117  ServiceRegistry::Operate guard(weakToken.lock());
1118 
1119  this->runModule<T>(info, streamID, parentContext, context);
1120  } catch (...) {
1121  exceptionPtr = std::current_exception();
1122  }
1123  this->waitingTasks_.doneWaiting(exceptionPtr);
1124  };
1125 
1126  if (needsESPrefetching(T::transition_)) {
1127  auto group = task.group();
// Continuation for the EventSetup prefetch: on failure the exception goes straight to
// the waiting tasks and the module is not run; on success `toDo` is scheduled.
1128  auto afterPrefetch =
1129  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1130  if (iExcept) {
1131  this->waitingTasks_.doneWaiting(*iExcept);
1132  } else {
1133  if (auto queue = this->serializeRunModule()) {
1134  queue.push(*group, toDo);
1135  } else {
1136  group->run(toDo);
1137  }
1138  }
1139  });
// NOTE(review): the line below is the argument list of a call whose beginning (listing
// lines 1140-1141) is missing; presumably an esPrefetchAsync call - confirm.
1142  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1143  } else {
// No EventSetup prefetching needed: schedule `toDo` immediately.
1144  auto group = task.group();
1145  if (auto queue = this->serializeRunModule()) {
1146  queue.push(*group, toDo);
1147  } else {
1148  group->run(toDo);
1149  }
1150  }
1151  }
1152  }
1153 
// Worker::runModule<T>: performs the module call for transition T and updates the
// pass/fail/exception bookkeeping.
// NOTE(review): this listing is missing original lines 1171, 1181 and 1183. Line 1171
// presumably starts the call whose arguments appear on line 1172 and assigns its result
// to `rc` - confirm against the full source.
//
// Returns `rc`, which is initialised to true; when a cms::Exception is caught and not
// rethrown, `rc` is the value returned by setPassed<T::isEvent_>().
// Throws: when shouldRethrowException(...) is true for a caught cms::Exception, the
// exception is recorded with setException and cached_exception_ is rethrown.
1154  template <typename T>
1155  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1156  StreamID streamID,
1157  ParentContext const& parentContext,
1158  typename T::Context const* context) {
1159  //unscheduled producers should advance this
1160  //if (T::isEvent_) {
1161  // ++timesVisited_;
1162  //}
1163  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// Run counting applies to event transitions only.
1164  if constexpr (T::isEvent_) {
1165  timesRun_.fetch_add(1, std::memory_order_relaxed);
1166  }
1167 
1168  bool rc = true;
1169  try {
// The call is made inside convertException::wrap; the handler below catches
// cms::Exception (presumably wrap converts other exception types - confirm).
1170  convertException::wrap([&]() {
1172  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1173 
1174  if (rc) {
1175  setPassed<T::isEvent_>();
1176  } else {
1177  setFailed<T::isEvent_>();
1178  }
1179  });
1180  } catch (cms::Exception& ex) {
// NOTE(review): listing lines 1181 and 1183 are missing from this handler.
1182  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1184  setException<T::isEvent_>(std::current_exception());
1185  std::rethrow_exception(cached_exception_);
1186  } else {
// Exception is not to be rethrown: count the worker as passed instead.
1187  rc = setPassed<T::isEvent_>();
1188  }
1189  }
1190 
1191  return rc;
1192  }
1193 
// Worker::runModuleDirectly<T>: runs the module with no prefetching step.
// Increments timesVisited_ and then delegates to runModuleAfterAsyncPrefetch<T> with a
// null prefetching exception.
// NOTE(review): the visit counter is incremented here regardless of T::isEvent_, unlike
// doWorkAsync which guards it with `if constexpr (T::isEvent_)` - confirm this is intended.
//
// Returns whatever runModuleAfterAsyncPrefetch<T> returns.
1194  template <typename T>
1195  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1196  StreamID streamID,
1197  ParentContext const& parentContext,
1198  typename T::Context const* context) {
1199  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1200  std::exception_ptr prefetchingException; // null because there was no prefetching to do
1201  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1202  }
1203 } // namespace edm
1204 #endif
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1195
virtual bool hasAcquire() const =0
void clearModule()
Definition: Worker.h:123
std::atomic< int > timesVisited_
Definition: Worker.h:605
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
virtual bool wantsStreamRuns() const =0
ModuleDescription const * moduleDescription() const
void execute() final
Definition: Worker.h:436
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:270
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1063
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:108
T::Context const * m_context
Definition: Worker.h:503
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:659
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
unsigned int ProductResolverIndex
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:349
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:787
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:242
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:526
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:686
void execute() final
Definition: Worker.h:520
virtual ~Worker()
Definition: Worker.cc:109
void endJob()
Definition: Worker.cc:296
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:796
ParentContext const m_parentContext
Definition: Worker.h:502
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:908
std::atomic< int > timesExcept_
Definition: Worker.h:608
oneapi::tbb::task_group * m_group
Definition: Worker.h:505
std::atomic< bool > workStarted_
Definition: Worker.h:623
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:735
std::atomic< int > timesFailed_
Definition: Worker.h:607
void emitPostModuleStreamPrefetchingSignal()
Definition: Worker.h:376
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:368
void addedToPath()
Definition: Worker.h:240
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
void clearCounters()
Definition: Worker.h:232
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:973
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:102
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:400
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:298
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:711
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:141
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:424
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:313
bool moduleValid_
Definition: Worker.h:625
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:98
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:761
virtual std::string workerType() const =0
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:615
LimitedTaskQueue * limited_
Definition: Worker.h:99
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:624
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:111
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:600
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
BranchType
Definition: BranchType.h:11
int timesPass() const
Definition: Worker.h:249
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:515
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:155
std::exception_ptr cached_exception_
Definition: Worker.h:616
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
void emitPostModuleGlobalPrefetchingSignal()
Definition: Worker.h:381
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:340
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:245
std::exception_ptr exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
int timesPassed() const
Definition: Worker.h:244
int iEvent
Definition: GenABIO.cc:224
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:428
Worker & operator=(Worker const &)=delete
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:500
void reset()
Definition: Worker.h:188
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:770
ServiceWeakToken m_serviceToken
Definition: Worker.h:504
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:872
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:937
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:685
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:695
int timesFailed() const
Definition: Worker.h:245
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:424
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:361
#define CMS_THREAD_GUARD(_var_)
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:90
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:609
double f[11][100]
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:177
Definition: Types.py:1
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:204
int timesVisited() const
Definition: Worker.h:243
int numberOfPathsOn_
Definition: Worker.h:610
int timesExcept() const
Definition: Worker.h:246
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
bool setPassed()
Definition: Worker.h:331
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:736
virtual void implEndJob()=0
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:244
ConcurrencyTypes
Definition: Worker.h:96
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:437
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:888
ModuleDescription const * description() const
Definition: Worker.h:198
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:103
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:658
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1095
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:710
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:183
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:229
#define DUMMY
void respondToCloseOutputFile()
Definition: Worker.h:185
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:370
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:812
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
virtual void implEndStream(StreamID)=0
virtual void doClearModule()=0
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:184
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1155
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:372
State state() const
Definition: Worker.h:247
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:821
StreamContext const * getStreamContext() const
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
double a
Definition: hdecay.h:121
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:668
std::atomic< int > timesPassed_
Definition: Worker.h:606
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:385
void beginJob()
Definition: Worker.cc:280
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:337
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:379
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:745
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:863
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:838
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:847
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:947
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:604
int timesRun() const
Definition: Worker.h:242
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:928
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
GlobalContext const * getGlobalContext() const
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:897
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:760
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:917
def move(src, dest)
Definition: eostools.py:511
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::WaitingTaskList & waitingTaskList()
Definition: Worker.h:254
virtual ConcurrencyTypes moduleConcurrencyType() const =0
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:720
BranchType const & branchType() const
Definition: Principal.h:175
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
bool shouldTryToContinue_
Definition: Worker.h:626
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
Definition: Worker.h:407
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const *> *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const *> const &labelsToDesc) const =0
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0