CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <atomic>
62 #include <cassert>
63 #include <map>
64 #include <memory>
65 #include <sstream>
66 #include <string>
67 #include <vector>
68 #include <exception>
69 #include <unordered_map>
70 
71 namespace edm {
72  class EventPrincipal;
73  class EventSetupImpl;
74  class EarlyDeleteHelper;
75  class ModuleProcessName;
76  class ProductResolverIndexHelper;
77  class ProductResolverIndexAndSkipBit;
78  class StreamID;
79  class StreamContext;
80  class ProductRegistry;
81  class ThinnedAssociationsHelper;
82 
83  namespace workerhelper {
84  template <typename O>
85  class CallImpl;
86  }
87  namespace eventsetup {
89  }
90 
91  class Worker {
92  public:
93  enum State { Ready, Pass, Fail, Exception };
99 
100  TaskQueueAdaptor() = default;
102  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
103 
104  operator bool() const { return serial_ != nullptr or limited_ != nullptr; }  // true when a serial or a limited queue is attached; const because it only inspects the two pointers
105 
106  template <class F>
     // Hands the callable iF, together with the task group iG, to whichever
     // queue this adaptor wraps: serial_ when it is set, otherwise limited_.
     // iF is passed on as an lvalue (it is not std::forward-ed).
     // NOTE(review): the else branch dereferences limited_ without a null check;
     // presumably callers test the adaptor with operator bool() first - confirm.
107  void push(oneapi::tbb::task_group& iG, F&& iF) {
108  if (serial_) {
109  serial_->push(iG, iF);
110  } else {
111  limited_->push(iG, iF);
112  }
113  }
114  };
115 
116  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
117  virtual ~Worker();
118 
119  Worker(Worker const&) = delete; // Disallow copying and moving
120  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
121 
122  void clearModule() {
     // Flag the module as invalid first, then let the concrete worker do its
     // own cleanup through the pure-virtual doClearModule() hook.
123  moduleValid_ = false;
124  doClearModule();
125  }
126 
127  virtual bool wantsProcessBlocks() const = 0;
128  virtual bool wantsInputProcessBlocks() const = 0;
129  virtual bool wantsGlobalRuns() const = 0;
130  virtual bool wantsGlobalLuminosityBlocks() const = 0;
131  virtual bool wantsStreamRuns() const = 0;
132  virtual bool wantsStreamLuminosityBlocks() const = 0;
133 
134  virtual SerialTaskQueue* globalRunsQueue() = 0;
136 
138  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
139 
141  oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
142  assert(false);
143  }
144 
145  template <typename T>
147  typename T::TransitionInfoType const&,
148  ServiceToken const&,
149  StreamID,
150  ParentContext const&,
151  typename T::Context const*);
152 
153  template <typename T>
155  typename T::TransitionInfoType const&,
156  ServiceToken const&,
157  StreamID,
158  ParentContext const&,
159  typename T::Context const*);
160 
161  template <typename T>
162  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
163  StreamID,
164  ParentContext const&,
165  typename T::Context const*);
166 
168  void skipOnPath(EventPrincipal const& iEvent);
169  void beginJob();
170  void endJob();
171  void beginStream(StreamID id, StreamContext& streamContext);
172  void endStream(StreamID id, StreamContext& streamContext);
177 
178  void reset() {
     // Puts the worker back into the Ready state: the cached exception is
     // replaced by an empty exception_ptr, state_ becomes Ready and
     // workStarted_ is cleared.
     // NOTE(review): the embedded listing numbers jump 180 -> 182 -> 184, so
     // the statements on source lines 181 and 183 are not visible here; check
     // the real header before relying on this summary being complete.
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
182  workStarted_ = false;
184  }
185 
186  void postDoEvent(EventPrincipal const&);
187 
189  if (moduleValid_) {
191  }
192  return nullptr;
193  }
196  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
197 
199 
200  //Used to make EDGetToken work
201  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
202  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
203  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
204  virtual void resolvePutIndicies(
205  BranchType iBranchType,
206  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
207  iIndicies) = 0;
208 
209  virtual void modulesWhoseProductsAreConsumed(
210  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
211  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
212  ProductRegistry const& preg,
213  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
214 
215  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
216 
217  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
218 
219  virtual Types moduleType() const = 0;
220  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
221 
222  void clearCounters() {
     // Zeroes all five execution counters. Each store uses release ordering,
     // which pairs with the acquire loads in the timesXxx() accessors.
223  timesRun_.store(0, std::memory_order_release);
224  timesVisited_.store(0, std::memory_order_release);
225  timesPassed_.store(0, std::memory_order_release);
226  timesFailed_.store(0, std::memory_order_release);
227  timesExcept_.store(0, std::memory_order_release);
228  }
229 
231  //NOTE(review): the original note said calling state() is done to force synchronization across threads; the counter accessors below read with acquire loads and do not call state() - confirm whether the note still applies
     // Each timesXxx() accessor returns the current value of the matching atomic counter.
232  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
233  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
234  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
235  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
236  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
     // state_ is a std::atomic<State>; this is an implicit (default-ordered) atomic load.
237  State state() const { return state_; }
238 
     // Alias that simply forwards to timesPassed().
239  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
240 
241  virtual bool hasAccumulator() const = 0;
242 
243  protected:
244  template <typename O>
246 
247  virtual void doClearModule() = 0;
248 
249  virtual std::string workerType() const = 0;
250  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
251 
252  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
253  virtual bool implNeedToRunSelection() const = 0;
254 
255  virtual void implDoAcquire(EventTransitionInfo const&,
256  ModuleCallingContext const*,
258 
260  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
262  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
263  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
264  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
265  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
266  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
267  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
268  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
269  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
270  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
271  virtual void implBeginJob() = 0;
272  virtual void implEndJob() = 0;
273  virtual void implBeginStream(StreamID) = 0;
274  virtual void implEndStream(StreamID) = 0;
275 
277 
279 
280  private:
281  template <typename T>
282  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
283 
284  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
285  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
286 
287  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
288 
289  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
290  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
291  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
292 
294  ModuleCallingContext const& moduleCallingContext,
295  Principal const& iPrincipal) const = 0;
296 
297  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
298  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
299  virtual void implRespondToCloseOutputFile() = 0;
300 
302 
303  virtual TaskQueueAdaptor serializeRunModule() = 0;
304 
305  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
306 
307  template <bool IS_EVENT>
     // Records a successful run: sets state_ to Pass and returns true.
     // The passed counter is only incremented for event transitions; IS_EVENT
     // is a template parameter, so the branch is resolved at compile time.
308  bool setPassed() {
309  if constexpr (IS_EVENT) {
310  timesPassed_.fetch_add(1, std::memory_order_relaxed);
311  }
312  state_ = Pass;
313  return true;
314  }
315 
316  template <bool IS_EVENT>
     // Records a failed run: sets state_ to Fail and returns false.
     // The failed counter is only incremented for event transitions; IS_EVENT
     // is a template parameter, so the branch is resolved at compile time.
317  bool setFailed() {
318  if constexpr (IS_EVENT) {
319  timesFailed_.fetch_add(1, std::memory_order_relaxed);
320  }
321  state_ = Fail;
322  return false;
323  }
324 
325  template <bool IS_EVENT>
     // Caches iException in cached_exception_, sets state_ to Exception and
     // returns the cached exception_ptr.
     // The exception counter is only incremented for event transitions;
     // IS_EVENT is a template parameter, so the branch is resolved at compile time.
326  std::exception_ptr setException(std::exception_ptr iException) {
327  if constexpr (IS_EVENT) {
328  timesExcept_.fetch_add(1, std::memory_order_relaxed);
329  }
     // NOTE(review): the trailing remark about propagate_const does not match
     // the visible std::exception_ptr member - confirm it is still relevant.
330  cached_exception_ = iException; // propagate_const<T> has no reset() function
331  state_ = Exception;
332  return cached_exception_;
333  }
334 
335  template <typename T>
337  ServiceToken const&,
338  ParentContext const&,
339  typename T::TransitionInfoType const&,
340  Transition);
341 
343  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
344 
345  bool needsESPrefetching(Transition iTrans) const noexcept {
     // True only when iTrans is below Transition::NumberOfEventSetupTransitions
     // and esItemsToGetFrom(iTrans) is non-empty. Short-circuit evaluation keeps
     // esItemsToGetFrom() from being called for any other transition.
346  return (iTrans < edm::Transition::NumberOfEventSetupTransitions) and not esItemsToGetFrom(iTrans).empty();
347  }
348 
350  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
351  }
352 
353  virtual bool hasAcquire() const = 0;
354 
355  template <typename T>
356  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
357  typename T::TransitionInfoType const&,
358  StreamID,
359  ParentContext const&,
360  typename T::Context const*);
361 
363 
364  void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
365  EventTransitionInfo const&,
366  ParentContext const&,
368 
369  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
370 
371  template <typename T>
372  class RunModuleTask : public WaitingTask {
373  public:
375  typename T::TransitionInfoType const& transitionInfo,
376  ServiceToken const& token,
377  StreamID streamID,
378  ParentContext const& parentContext,
379  typename T::Context const* context,
380  oneapi::tbb::task_group* iGroup)
381  : m_worker(worker),
382  m_transitionInfo(transitionInfo),
383  m_streamID(streamID),
384  m_parentContext(parentContext),
385  m_context(context),
386  m_serviceToken(token),
387  m_group(iGroup) {}
388 
392  EnableQueueGuard(EnableQueueGuard const&) = delete;
393  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
395  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
397  if (queue_) {
398  queue_->resume();
399  }
400  }
401  };
402 
403  void execute() final {
404  //Need to make the services available early so other services can see them
406 
407  //in case the emit causes an exception, we need a memory location
408  // to hold the exception_ptr
409  std::exception_ptr temp_excptr;
410  auto excptr = exceptionPtr();
411  if constexpr (T::isEvent_) {
412  if (!m_worker->hasAcquire()) {
413  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
414  CMS_SA_ALLOW try {
415  //pre was called in prefetchAsync
417  } catch (...) {
418  temp_excptr = std::current_exception();
419  if (not excptr) {
420  excptr = &temp_excptr;
421  }
422  }
423  }
424  }
425 
426  if (not excptr) {
427  if (auto queue = m_worker->serializeRunModule()) {
428  auto f = [worker = m_worker,
430  streamID = m_streamID,
431  parentContext = m_parentContext,
432  sContext = m_context,
433  serviceToken = m_serviceToken]() {
434  //Need to make the services available
435  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
436 
437  //If needed, we pause the queue in begin transition and resume it
438  // at the end transition. This can guarantee that the module
439  // only processes one run or lumi at a time
441  std::exception_ptr* ptr = nullptr;
442  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
443  };
444  //keep another global transition from running if necessary
446  if (gQueue) {
447  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
448  gQueue->pause();
449  queue.push(*group, std::move(f));
450  });
451  } else {
452  queue.push(*m_group, std::move(f));
453  }
454  return;
455  }
456  }
457 
459  }
460 
461  private:
463  typename T::TransitionInfoType m_transitionInfo;
466  typename T::Context const* m_context;
468  oneapi::tbb::task_group* m_group;
469  };
470 
471  // AcquireTask is only used for the Event case, but we define
472  // it as a template so all cases will compile.
473  // DUMMY exists to work around the C++ Standard prohibition on
474  // fully specializing templates nested in other classes.
475  template <typename T, typename DUMMY = void>
476  class AcquireTask : public WaitingTask {
477  public:
479  typename T::TransitionInfoType const&,
480  ServiceToken const&,
481  ParentContext const&,
483  void execute() final {}
484  };
485 
486  template <typename DUMMY>
488  public:
490  EventTransitionInfo const& eventTransitionInfo,
491  ServiceToken const& token,
492  ParentContext const& parentContext,
494  : m_worker(worker),
495  m_eventTransitionInfo(eventTransitionInfo),
496  m_parentContext(parentContext),
497  m_holder(std::move(holder)),
498  m_serviceToken(token) {}
499 
500  void execute() final {
     // Runs after prefetching: emits the post-prefetch signal, then calls
     // Worker::runAcquireAfterAsyncPrefetch either directly or, when
     // serializeRunModule() yields a usable queue, from a lambda pushed onto it.
501  //Need to make the services available early so other services can see them
502  ServiceRegistry::Operate guard(m_serviceToken.lock());
503 
504  //in case the emit causes an exception, we need a memory location
505  // to hold the exception_ptr
506  std::exception_ptr temp_excptr;
507  auto excptr = exceptionPtr();
     // NOTE(review): the next comment names runModuleAfterAsyncPrefetch(), but
     // the calls in this method are to runAcquireAfterAsyncPrefetch() - confirm.
508  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
509  CMS_SA_ALLOW try {
510  //pre was called in prefetchAsync
511  m_worker->emitPostModuleEventPrefetchingSignal();
512  } catch (...) {
513  temp_excptr = std::current_exception();
     // An exception already reported by exceptionPtr() takes precedence over
     // one thrown while emitting the signal.
514  if (not excptr) {
515  excptr = &temp_excptr;
516  }
517  }
518 
     // Only the no-exception case goes through the queue.
519  if (not excptr) {
520  if (auto queue = m_worker->serializeRunModule()) {
     // The lambda copies everything it needs, including the holder, because
     // this method returns right after the push.
521  queue.push(*m_holder.group(),
522  [worker = m_worker,
523  info = m_eventTransitionInfo,
524  parentContext = m_parentContext,
525  serviceToken = m_serviceToken,
526  holder = m_holder]() {
527  //Need to make the services available
528  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
529 
     // A null pointer tells runAcquireAfterAsyncPrefetch there is no prior exception.
530  std::exception_ptr* ptr = nullptr;
531  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
532  });
533  return;
534  }
535  }
536 
     // Direct path: reached when there is an exception to hand over or when
     // serializeRunModule() returned an adaptor that converts to false.
537  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
538  }
539 
540  private:
546  };
547 
548  // This class does nothing unless there is an exception originating
549  // in an "External Worker". In that case, it handles converting the
550  // exception to a CMS exception and adding context to the exception.
552  public:
554  oneapi::tbb::task_group* group,
555  WaitingTask* runModuleTask,
556  ParentContext const& parentContext);
557 
558  void execute() final;
559 
560  private:
563  oneapi::tbb::task_group* m_group;
565  };
566 
567  std::atomic<int> timesRun_;
568  std::atomic<int> timesVisited_;
569  std::atomic<int> timesPassed_;
570  std::atomic<int> timesFailed_;
571  std::atomic<int> timesExcept_;
572  std::atomic<State> state_;
574  std::atomic<int> numberOfPathsLeftToRun_;
575 
577 
578  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
579  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
580 
581  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
582 
584 
586  std::atomic<bool> workStarted_;
589  };
590 
591  namespace {
592  template <typename T>
     // Scope guard around a module call: the constructor emits
     // T::preModuleSignal and the destructor emits T::postModuleSignal, both
     // only when the ActivityRegistry pointer is non-null.
     // NOTE(review): this class sits in an unnamed namespace inside a header, so
     // every translation unit that includes the header gets its own copy.
     // NOTE(review): the embedded listing numbers jump 610 -> 612; the
     // declaration of moduleCallingContext_ (used below) is not visible here.
593  class ModuleSignalSentry {
594  public:
595  ModuleSignalSentry(ActivityRegistry* a,
596  typename T::Context const* context,
597  ModuleCallingContext const* moduleCallingContext)
598  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
599  if (a_)
600  T::preModuleSignal(a_, context, moduleCallingContext_);
601  }
602 
     // Emits the matching post signal with the values stored at construction.
603  ~ModuleSignalSentry() {
604  if (a_)
605  T::postModuleSignal(a_, context_, moduleCallingContext_);
606  }
607 
608  private:
609  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
610  typename T::Context const* context_;
612  };
613 
614  } // namespace
615 
616  namespace workerhelper {
617  template <>
619  public:
621  static bool call(Worker* iWorker,
622  StreamID,
623  EventTransitionInfo const& info,
624  ActivityRegistry* /* actReg */,
625  ModuleCallingContext const* mcc,
626  Arg::Context const* /* context*/) {
627  //Signal sentry is handled by the module
628  return iWorker->implDo(info, mcc);
629  }
630  static void esPrefetchAsync(Worker* worker,
631  WaitingTaskHolder waitingTask,
632  ServiceToken const& token,
633  EventTransitionInfo const& info,
634  Transition transition) {
635  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
636  }
637  static bool wantsTransition(Worker const* iWorker) { return true; }
638  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
639 
640  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
641  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
642  };
643 
644  template <>
646  public:
648  static bool call(Worker* iWorker,
649  StreamID,
650  RunTransitionInfo const& info,
651  ActivityRegistry* actReg,
652  ModuleCallingContext const* mcc,
653  Arg::Context const* context) {
654  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
655  return iWorker->implDoBegin(info, mcc);
656  }
657  static void esPrefetchAsync(Worker* worker,
658  WaitingTaskHolder waitingTask,
659  ServiceToken const& token,
660  RunTransitionInfo const& info,
661  Transition transition) {
662  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
663  }
664  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
665  static bool needToRunSelection(Worker const* iWorker) { return false; }
666  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
667  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
668  };
669  template <>
671  public:
673  static bool call(Worker* iWorker,
674  StreamID id,
675  RunTransitionInfo const& info,
676  ActivityRegistry* actReg,
677  ModuleCallingContext const* mcc,
678  Arg::Context const* context) {
679  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
680  return iWorker->implDoStreamBegin(id, info, mcc);
681  }
682  static void esPrefetchAsync(Worker* worker,
683  WaitingTaskHolder waitingTask,
684  ServiceToken const& token,
685  RunTransitionInfo const& info,
686  Transition transition) {
687  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
688  }
689  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
690  static bool needToRunSelection(Worker const* iWorker) { return false; }
691  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
692  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
693  };
694  template <>
696  public:
698  static bool call(Worker* iWorker,
699  StreamID,
700  RunTransitionInfo const& info,
701  ActivityRegistry* actReg,
702  ModuleCallingContext const* mcc,
703  Arg::Context const* context) {
704  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
705  return iWorker->implDoEnd(info, mcc);
706  }
707  static void esPrefetchAsync(Worker* worker,
708  WaitingTaskHolder waitingTask,
709  ServiceToken const& token,
710  RunTransitionInfo const& info,
711  Transition transition) {
712  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
713  }
714  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
715  static bool needToRunSelection(Worker const* iWorker) { return false; }
716  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
717  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
718  };
719  template <>
721  public:
723  static bool call(Worker* iWorker,
724  StreamID id,
725  RunTransitionInfo const& info,
726  ActivityRegistry* actReg,
727  ModuleCallingContext const* mcc,
728  Arg::Context const* context) {
729  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
730  return iWorker->implDoStreamEnd(id, info, mcc);
731  }
732  static void esPrefetchAsync(Worker* worker,
733  WaitingTaskHolder waitingTask,
734  ServiceToken const& token,
735  RunTransitionInfo const& info,
736  Transition transition) {
737  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
738  }
739  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
740  static bool needToRunSelection(Worker const* iWorker) { return false; }
741  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
742  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
743  };
744 
745  template <>
747  public:
749  static bool call(Worker* iWorker,
750  StreamID,
751  LumiTransitionInfo const& info,
752  ActivityRegistry* actReg,
753  ModuleCallingContext const* mcc,
754  Arg::Context const* context) {
755  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
756  return iWorker->implDoBegin(info, mcc);
757  }
758  static void esPrefetchAsync(Worker* worker,
759  WaitingTaskHolder waitingTask,
760  ServiceToken const& token,
761  LumiTransitionInfo const& info,
762  Transition transition) {
763  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
764  }
765  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
766  static bool needToRunSelection(Worker const* iWorker) { return false; }
767  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
768  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
769  };
770  template <>
772  public:
774  static bool call(Worker* iWorker,
775  StreamID id,
776  LumiTransitionInfo const& info,
777  ActivityRegistry* actReg,
778  ModuleCallingContext const* mcc,
779  Arg::Context const* context) {
780  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
781  return iWorker->implDoStreamBegin(id, info, mcc);
782  }
783  static void esPrefetchAsync(Worker* worker,
784  WaitingTaskHolder waitingTask,
785  ServiceToken const& token,
786  LumiTransitionInfo const& info,
787  Transition transition) {
788  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
789  }
790  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
791  static bool needToRunSelection(Worker const* iWorker) { return false; }
792  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
793  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
794  };
795 
796  template <>
798  public:
800  static bool call(Worker* iWorker,
801  StreamID,
802  LumiTransitionInfo const& info,
803  ActivityRegistry* actReg,
804  ModuleCallingContext const* mcc,
805  Arg::Context const* context) {
806  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
807  return iWorker->implDoEnd(info, mcc);
808  }
809  static void esPrefetchAsync(Worker* worker,
810  WaitingTaskHolder waitingTask,
811  ServiceToken const& token,
812  LumiTransitionInfo const& info,
813  Transition transition) {
814  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
815  }
816  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
817  static bool needToRunSelection(Worker const* iWorker) { return false; }
818  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
819  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
820  };
821  template <>
823  public:
825  static bool call(Worker* iWorker,
826  StreamID id,
827  LumiTransitionInfo const& info,
828  ActivityRegistry* actReg,
829  ModuleCallingContext const* mcc,
830  Arg::Context const* context) {
831  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
832  return iWorker->implDoStreamEnd(id, info, mcc);
833  }
834  static void esPrefetchAsync(Worker* worker,
835  WaitingTaskHolder waitingTask,
836  ServiceToken const& token,
837  LumiTransitionInfo const& info,
838  Transition transition) {
839  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
840  }
841  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
842  static bool needToRunSelection(Worker const* iWorker) { return false; }
843  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
844  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
845  };
846  template <>
848  public:
850  static bool call(Worker* iWorker,
851  StreamID,
853  ActivityRegistry* actReg,
854  ModuleCallingContext const* mcc,
855  Arg::Context const* context) {
856  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
857  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
858  }
859  static void esPrefetchAsync(
861  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
862  static bool needToRunSelection(Worker const* iWorker) { return false; }
863  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
864  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
865  };
866  template <>
868  public:
870  static bool call(Worker* iWorker,
871  StreamID,
873  ActivityRegistry* actReg,
874  ModuleCallingContext const* mcc,
875  Arg::Context const* context) {
876  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
877  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
878  }
879  static void esPrefetchAsync(
881  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
882  static bool needToRunSelection(Worker const* iWorker) { return false; }
883  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
884  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
885  };
886  template <>
888  public:
890  static bool call(Worker* iWorker,
891  StreamID,
893  ActivityRegistry* actReg,
894  ModuleCallingContext const* mcc,
895  Arg::Context const* context) {
896  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
897  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
898  }
899  static void esPrefetchAsync(
901  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
902  static bool needToRunSelection(Worker const* iWorker) { return false; }
903  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
904  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
905  };
906  } // namespace workerhelper
907 
908  template <typename T>
910  ServiceToken const& token,
911  ParentContext const& parentContext,
912  typename T::TransitionInfoType const& transitionInfo,
913  Transition iTransition) {
914  Principal const& principal = transitionInfo.principal();
915 
917 
918  if (principal.branchType() == InEvent) {
919  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
920  }
921 
922  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
923  edPrefetchAsync(iTask, token, principal);
924 
925  if (principal.branchType() == InEvent) {
927  }
928  }
929 
930  template <typename T>
932  typename T::TransitionInfoType const& transitionInfo,
933  ServiceToken const& token,
934  StreamID streamID,
935  ParentContext const& parentContext,
936  typename T::Context const* context) {
938  return;
939  }
940 
941  //Need to check workStarted_ before adding to waitingTasks_
942  bool expected = false;
943  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
944 
945  waitingTasks_.add(task);
946  if constexpr (T::isEvent_) {
947  timesVisited_.fetch_add(1, std::memory_order_relaxed);
948  }
949 
950  if (workStarted) {
952 
953  //if have TriggerResults based selection we want to reject the event before doing prefetching
955  //We need to run the selection in a different task so that
956  // we can prefetch the data needed for the selection
957  WaitingTask* moduleTask =
958  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
959 
960  //make sure the task is either run or destroyed
961  struct DestroyTask {
     // Holds a WaitingTask pointer in an atomic until release() takes it back.
962  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
963 
     // If the task was never released, atomically take the pointer and hand it
     // to a TaskSentry (presumably disposing of the task - TaskSentry is not
     // defined in this listing, confirm).
964  ~DestroyTask() {
965  auto p = m_task.exchange(nullptr);
966  if (p) {
967  TaskSentry s{p};
968  }
969  }
970 
     // Returns the held pointer and leaves nullptr behind, so the destructor
     // and any later release() call see nothing to act on.
971  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
972 
973  private:
974  std::atomic<edm::WaitingTask*> m_task;
975  };
976  if constexpr (T::isEvent_) {
977  if (hasAcquire()) {
978  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
979  ServiceWeakToken weakToken = token;
980  auto* group = task.group();
981  moduleTask = make_waiting_task(
982  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
983  WaitingTaskWithArenaHolder runTaskHolder(
984  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
985  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
986  t.execute();
987  });
988  }
989  }
990  auto* group = task.group();
991  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
992  ServiceWeakToken weakToken = token;
993  auto selectionTask =
994  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
995  std::exception_ptr const*) mutable {
996  ServiceRegistry::Operate guard(weakToken.lock());
997  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
998  weakToken.lock(),
999  parentContext,
1000  info,
1001  T::transition_);
1002  });
1003  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1004  } else {
1005  WaitingTask* moduleTask =
1006  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1007  auto group = task.group();
1008  if constexpr (T::isEvent_) {
1009  if (hasAcquire()) {
1010  WaitingTaskWithArenaHolder runTaskHolder(
1011  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1012  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1013  }
1014  }
1015  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1016  }
1017  }
1018  }
1019 
// Finishes a transition after the asynchronous prefetch step has completed.
//
// iEPtr: pointer to an exception produced before the module could run, or
//        nullptr when there is none (runModuleDirectly passes nullptr because
//        no prefetching was done). When non-null it must hold an exception.
// Returns the exception that was recorded for this worker, or a null
// std::exception_ptr when nothing needs to be propagated. The same value is
// handed to waitingTasks_.doneWaiting() so that waiting tasks are released.
//
// NOTE(review): listing line 1035 is absent from this extract, so one
// statement at the end of the "iEPtr" branch is not visible here.
1020  template <typename T>
1021  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
1022  typename T::TransitionInfoType const& transitionInfo,
1023  StreamID streamID,
1024  ParentContext const& parentContext,
1025  typename T::Context const* context) {
1026  std::exception_ptr exceptionPtr;
1027  if (iEPtr) {
      // An earlier step failed: the module itself is not run in this branch.
1028  assert(*iEPtr);
1029  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
      // The exception must be propagated: record it on the worker and return it.
1030  exceptionPtr = *iEPtr;
1031  setException<T::isEvent_>(exceptionPtr);
1032  } else {
      // The exception is not to be rethrown: the worker is marked as passed.
1033  setPassed<T::isEvent_>();
1034  }
1036  } else {
1037  // Caught exception is propagated via WaitingTaskList
1038  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1039  exceptionPtr = std::current_exception();
1040  }
1041  }
      // Always notify the waiting tasks, with a null pointer on success.
1042  waitingTasks_.doneWaiting(exceptionPtr);
1043  return exceptionPtr;
1044  }
1045 
// Worker::doWorkNoPrefetchingAsync (name taken from the member index of this
// listing, which places its definition at Worker.h:1047).
//
// Registers `task` in waitingTasks_ and, for the first caller only, schedules
// runModule<T> to be executed. The visible code does not call prefetchAsync;
// only the optional EventSetup prefetch step is performed before the module
// is run.
//
// NOTE(review): listing lines 1047 (the line carrying the function name and
// the `task` parameter), 1053 (the condition guarding the early return) and
// 1092 (the start of the call whose arguments are on line 1093) are absent
// from this extract.
1046  template <typename T>
1048  typename T::TransitionInfoType const& transitionInfo,
1049  ServiceToken const& serviceToken,
1050  StreamID streamID,
1051  ParentContext const& parentContext,
1052  typename T::Context const* context) {
1054  return;
1055  }
1056 
1057  //Need to check workStarted_ before adding to waitingTasks_
      // Only the caller that flips workStarted_ from false to true gets
      // workStarted == true and therefore schedules the work below.
1058  bool expected = false;
1059  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1060 
      // Every caller's task is added, whether or not it started the work.
1061  waitingTasks_.add(task);
1062  if (workStarted) {
      // The lambda keeps a weak service token and copies of the other arguments
      // because it is executed later, outside this call.
1063  ServiceWeakToken weakToken = serviceToken;
1064  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1065  std::exception_ptr exceptionPtr;
1066  // Caught exception is propagated via WaitingTaskList
1067  CMS_SA_ALLOW try {
1068  //Need to make the services available
1069  ServiceRegistry::Operate guard(weakToken.lock());
1070 
1071  this->runModule<T>(info, streamID, parentContext, context);
1072  } catch (...) {
1073  exceptionPtr = std::current_exception();
1074  }
      // Release the waiting tasks, passing along any exception that was caught.
1075  this->waitingTasks_.doneWaiting(exceptionPtr);
1076  };
1077 
1078  if (needsESPrefetching(T::transition_)) {
      // Run `toDo` only after the prefetch step signals completion.
1079  auto group = task.group();
1080  auto afterPrefetch =
1081  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1082  if (iExcept) {
      // The prefetch step failed: forward its exception without running the module.
1083  this->waitingTasks_.doneWaiting(*iExcept);
1084  } else {
      // Go through the queue returned by serializeRunModule() when it converts
      // to true; otherwise run directly in the task group.
1085  if (auto queue = this->serializeRunModule()) {
1086  queue.push(*group, toDo);
1087  } else {
1088  group->run(toDo);
1089  }
1090  }
1091  });
      // NOTE(review): presumably the arguments of an esPrefetchAsync(...) call whose
      // first line (1092) is missing from this extract - confirm against the header.
1093  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1094  } else {
      // No EventSetup prefetching needed: schedule `toDo` right away.
1095  auto group = task.group();
1096  if (auto queue = this->serializeRunModule()) {
1097  queue.push(*group, toDo);
1098  } else {
1099  group->run(toDo);
1100  }
1101  }
1102  }
1103  }
1104 
// Runs the module for transition type T and records the outcome on the worker.
//
// Returns the value stored in `rc`: the result of the module call, or the
// result of setPassed() when a cms::Exception was caught and is not to be
// rethrown. When the exception must be rethrown, it is recorded with
// setException() and cached_exception_ is rethrown to the caller.
//
// NOTE(review): listing lines 1122, 1132 and 1134 are absent from this
// extract, so the start of the call inside the lambda and two statements in
// the catch handler are not visible here.
1105  template <typename T>
1106  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1107  StreamID streamID,
1108  ParentContext const& parentContext,
1109  typename T::Context const* context) {
1110  //unscheduled producers should advance this
1111  //if (T::isEvent_) {
1112  // ++timesVisited_;
1113  //}
1114  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
      // The run counter is only advanced for event transitions.
1115  if constexpr (T::isEvent_) {
1116  timesRun_.fetch_add(1, std::memory_order_relaxed);
1117  }
1118 
1119  bool rc = true;
1120  try {
1121  convertException::wrap([&]() {
      // NOTE(review): presumably the arguments of the CallImpl<T>::call(...) invocation
      // that assigns rc; its first line (1122) is missing from this extract - confirm.
1123  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1124 
      // Record pass/fail according to the value of rc.
1125  if (rc) {
1126  setPassed<T::isEvent_>();
1127  } else {
1128  setFailed<T::isEvent_>();
1129  }
1130  });
1131  } catch (cms::Exception& ex) {
1133  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
      // Record the exception on the worker, then propagate it to the caller.
1135  setException<T::isEvent_>(std::current_exception());
1136  std::rethrow_exception(cached_exception_);
1137  } else {
      // The exception is not to be rethrown: the module is treated as passed.
1138  rc = setPassed<T::isEvent_>();
1139  }
1140  }
1141 
1142  return rc;
1143  }
1144 
// Runs the module without any prefetch step.
//
// Increments timesVisited_, then delegates to runModuleAfterAsyncPrefetch<T>
// with a null prefetch-exception pointer, and returns the exception pointer
// that call produces.
//
// NOTE(review): the timesVisited_ increment here is unconditional, whereas
// doWorkAsync only increments it inside `if constexpr (T::isEvent_)` -
// confirm that the difference is intended.
1145  template <typename T>
1146  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1147  StreamID streamID,
1148  ParentContext const& parentContext,
1149  typename T::Context const* context) {
1150  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1151  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1152  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1153  }
1154 } // namespace edm
1155 #endif
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1146
virtual bool hasAcquire() const =0
void clearModule()
Definition: Worker.h:122
std::atomic< int > timesVisited_
Definition: Worker.h:568
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual bool wantsStreamRuns() const =0
void execute() final
Definition: Worker.h:403
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
tuple array
Definition: mps_check.py:216
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:239
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
void push(oneapi::tbb::task_group &iG, F &&iF)
Definition: Worker.h:107
T::Context const * m_context
Definition: Worker.h:466
ModuleDescription const * description() const
Definition: Worker.h:188
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:621
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:574
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
virtual bool hasAccumulator() const =0
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
StreamContext const * getStreamContext() const
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:326
#define DUMMY
Definition: DMRtrends.cc:34
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:749
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:237
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:489
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:648
void execute() final
Definition: Worker.h:483
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
virtual ~Worker()
Definition: Worker.cc:106
void endJob()
Definition: Worker.cc:263
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:758
ParentContext const m_parentContext
Definition: Worker.h:465
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:870
std::atomic< int > timesExcept_
Definition: Worker.h:571
oneapi::tbb::task_group * m_group
Definition: Worker.h:468
std::atomic< bool > workStarted_
Definition: Worker.h:586
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:697
std::atomic< int > timesFailed_
Definition: Worker.h:570
int timesPassed() const
Definition: Worker.h:234
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:345
void addedToPath()
Definition: Worker.h:230
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:581
State state() const
Definition: Worker.h:237
void clearCounters()
Definition: Worker.h:222
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:931
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:367
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:101
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
virtual void implRespondToCloseOutputFile()=0
ActivityRegistry * activityRegistry()
Definition: Worker.h:278
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:673
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:140
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:280
bool moduleValid_
Definition: Worker.h:588
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
void reset()
Resets access to the resource so that added tasks will wait.
SerialTaskQueueChain * serial_
Definition: Worker.h:97
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:723
virtual std::string workerType() const =0
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:578
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
LimitedTaskQueue * limited_
Definition: Worker.h:98
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:587
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:108
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
oneapi::tbb::task_group * m_group
Definition: Worker.h:563
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:576
BranchType
Definition: BranchType.h:11
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:478
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
std::exception_ptr cached_exception_
Definition: Worker.h:579
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
oneapi::tbb::task_group * group() const noexcept
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
bool setFailed()
Definition: Worker.h:317
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
int timesExcept() const
Definition: Worker.h:236
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
bool resume()
Resumes processing if the queue was paused.
int iEvent
Definition: GenABIO.cc:224
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:395
Worker & operator=(Worker const &)=delete
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:585
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:463
void reset()
Definition: Worker.h:178
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:732
int timesVisited() const
Definition: Worker.h:233
ServiceWeakToken m_serviceToken
Definition: Worker.h:467
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual bool wantsInputProcessBlocks() const =0
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:834
BranchType const & branchType() const
Definition: Principal.h:181
virtual bool wantsProcessBlocks() const =0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:899
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:647
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:657
def move
Definition: eostools.py:511
ModuleDescription const * moduleDescription() const
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:391
virtual std::vector< ConsumesInfo > consumesInfo() const =0
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:328
#define CMS_THREAD_GUARD(_var_)
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:89
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::atomic< State > state_
Definition: Worker.h:572
int timesRun() const
Definition: Worker.h:232
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:167
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
int numberOfPathsOn_
Definition: Worker.h:573
bool setPassed()
Definition: Worker.h:308
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:698
virtual void implEndJob()=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
areg
Definition: Schedule.cc:687
ConcurrencyTypes
Definition: Worker.h:95
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:406
virtual Types moduleType() const =0
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:850
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1021
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:102
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:620
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1047
tuple group
Definition: watchdog.py:82
virtual bool implNeedToRunSelection() const =0
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const * > *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
virtual TaskQueueAdaptor serializeRunModule()=0
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:672
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
int timesPass() const
Definition: Worker.h:239
void respondToCloseOutputFile()
Definition: Worker.h:175
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:337
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:774
virtual void implBeginStream(StreamID)=0
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
ProcessBlockPrincipal & principal()
EventSetupImpl const & eventSetupImpl() const
virtual void implEndStream(StreamID)=0
int timesFailed() const
Definition: Worker.h:235
virtual void doClearModule()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
virtual bool wantsStreamLuminosityBlocks() const =0
virtual void convertCurrentProcessAlias(std::string const &processName)=0
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1106
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:349
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:783
double a
Definition: hdecay.h:119
EventSetupImpl const & eventSetupImpl() const
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:630
std::atomic< int > timesPassed_
Definition: Worker.h:569
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:352
void beginJob()
Definition: Worker.cc:247
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:304
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:346
virtual bool wantsGlobalLuminosityBlocks() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:707
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:825
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:800
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:583
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:809
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:909
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
std::atomic< int > timesRun_
Definition: Worker.h:567
preg
Definition: Schedule.cc:687
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:890
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:859
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:722
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:879
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
virtual ConcurrencyTypes moduleConcurrencyType() const =0
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:392
EventSetupImpl const & eventSetupImpl() const
virtual bool wantsGlobalRuns() const =0
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:682
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, oneapi::tbb::task_group *iGroup)
Definition: Worker.h:374
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0