CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
Worker: this is a basic scheduling unit - an abstract base class for
something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
56 
58 
#include <atomic>
#include <cassert>
#include <exception>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
68 
69 namespace edm {
70  class EventPrincipal;
71  class EventSetupImpl;
72  class EarlyDeleteHelper;
73  class ProductResolverIndexHelper;
74  class ProductResolverIndexAndSkipBit;
75  class StreamID;
76  class StreamContext;
77  class ProductRegistry;
78  class ThinnedAssociationsHelper;
79 
80  namespace workerhelper {
81  template <typename O>
82  class CallImpl;
83  }
84  namespace eventsetup {
86  }
87 
88  class Worker {
89  public:
90  enum State { Ready, Pass, Fail, Exception };
95 
96  TaskQueueAdaptor() = default;
98  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
99 
100  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
101 
102  template <class F>
103  void push(F&& iF) {
104  if (serial_) {
105  serial_->push(iF);
106  } else {
107  limited_->push(iF);
108  }
109  }
110  template <class F>
111  void pushAndWait(F&& iF) {
112  if (serial_) {
113  serial_->pushAndWait(iF);
114  } else {
115  limited_->pushAndWait(iF);
116  }
117  }
118  };
119 
120  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
121  virtual ~Worker();
122 
123  Worker(Worker const&) = delete; // Disallow copying and moving
124  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
125 
126  virtual bool wantsProcessBlocks() const = 0;
127  virtual bool wantsInputProcessBlocks() const = 0;
128  virtual bool wantsGlobalRuns() const = 0;
129  virtual bool wantsGlobalLuminosityBlocks() const = 0;
130  virtual bool wantsStreamRuns() const = 0;
131  virtual bool wantsStreamLuminosityBlocks() const = 0;
132 
133  virtual SerialTaskQueue* globalRunsQueue() = 0;
135 
136  template <typename T>
137  bool doWork(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
138 
140 
142  assert(false);
143  }
144 
145  template <typename T>
146  void doWorkAsync(WaitingTask*,
147  typename T::TransitionInfoType const&,
148  ServiceToken const&,
149  StreamID,
150  ParentContext const&,
151  typename T::Context const*);
152 
153  template <typename T>
155  typename T::TransitionInfoType const&,
156  ServiceToken const&,
157  StreamID,
158  ParentContext const&,
159  typename T::Context const*);
160 
161  template <typename T>
162  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
163  StreamID,
164  ParentContext const&,
165  typename T::Context const*);
166 
168  void skipOnPath(EventPrincipal const& iEvent);
169  void beginJob();
170  void endJob();
171  void beginStream(StreamID id, StreamContext& streamContext);
172  void endStream(StreamID id, StreamContext& streamContext);
175 
177 
178  void reset() {
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
182  workStarted_ = false;
184  }
185 
186  void postDoEvent(EventPrincipal const&);
187 
192  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
193 
195 
196  //Used to make EDGetToken work
197  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
198  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
199  virtual void resolvePutIndicies(
200  BranchType iBranchType,
201  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
202  iIndicies) = 0;
203 
204  virtual void modulesWhoseProductsAreConsumed(
205  std::vector<ModuleDescription const*>& modules,
206  ProductRegistry const& preg,
207  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
208 
209  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
210 
211  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
212 
213  virtual Types moduleType() const = 0;
214 
215  void clearCounters() {
216  timesRun_.store(0, std::memory_order_release);
217  timesVisited_.store(0, std::memory_order_release);
218  timesPassed_.store(0, std::memory_order_release);
219  timesFailed_.store(0, std::memory_order_release);
220  timesExcept_.store(0, std::memory_order_release);
221  }
222 
224  //NOTE: calling state() is done to force synchronization across threads
225  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
226  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
227  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
228  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
229  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
230  State state() const { return state_; }
231 
232  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
233 
234  virtual bool hasAccumulator() const = 0;
235 
236  protected:
237  template <typename O>
239  virtual std::string workerType() const = 0;
240  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
241 
242  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
243  virtual bool implNeedToRunSelection() const = 0;
244 
245  virtual void implDoAcquire(EventTransitionInfo const&,
246  ModuleCallingContext const*,
248 
250  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
252  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
253  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
254  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
255  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
256  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
257  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
258  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
259  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
260  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
261  virtual void implBeginJob() = 0;
262  virtual void implEndJob() = 0;
263  virtual void implBeginStream(StreamID) = 0;
264  virtual void implEndStream(StreamID) = 0;
265 
267 
269 
270  private:
271  template <typename T>
272  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
273 
274  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
275  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
276 
277  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
278 
279  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
280  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
281  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
282 
283  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask,
284  ModuleCallingContext const& moduleCallingContext,
285  Principal const& iPrincipal) const = 0;
286 
287  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
288  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
289 
291 
292  virtual TaskQueueAdaptor serializeRunModule() = 0;
293 
294  static void exceptionContext(cms::Exception& ex, ModuleCallingContext const* mcc);
295 
296  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
297 
298  template <bool IS_EVENT>
299  bool setPassed() {
300  if (IS_EVENT) {
301  timesPassed_.fetch_add(1, std::memory_order_relaxed);
302  }
303  state_ = Pass;
304  return true;
305  }
306 
307  template <bool IS_EVENT>
308  bool setFailed() {
309  if (IS_EVENT) {
310  timesFailed_.fetch_add(1, std::memory_order_relaxed);
311  }
312  state_ = Fail;
313  return false;
314  }
315 
316  template <bool IS_EVENT>
317  std::exception_ptr setException(std::exception_ptr iException) {
318  if (IS_EVENT) {
319  timesExcept_.fetch_add(1, std::memory_order_relaxed);
320  }
321  cached_exception_ = iException; // propagate_const<T> has no reset() function
322  state_ = Exception;
323  return cached_exception_;
324  }
325 
326  template <typename T>
327  void prefetchAsync(
328  WaitingTask*, ServiceToken const&, ParentContext const&, typename T::TransitionInfoType const&, Transition);
329 
331  void edPrefetchAsync(WaitingTask*, ServiceToken const&, Principal const&) const;
332 
333  bool needsESPrefetching(Transition iTrans) const noexcept {
334  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
335  }
336 
338  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
339  }
340 
341  virtual bool hasAcquire() const = 0;
342 
343  template <typename T>
344  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
345  typename T::TransitionInfoType const&,
346  StreamID,
347  ParentContext const&,
348  typename T::Context const*);
349 
351 
352  void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
353  EventTransitionInfo const&,
354  ParentContext const&,
356 
357  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
358 
359  template <typename T>
360  class RunModuleTask : public WaitingTask {
361  public:
363  typename T::TransitionInfoType const& transitionInfo,
364  ServiceToken const& token,
365  StreamID streamID,
366  ParentContext const& parentContext,
367  typename T::Context const* context)
368  : m_worker(worker),
369  m_transitionInfo(transitionInfo),
370  m_streamID(streamID),
371  m_parentContext(parentContext),
372  m_context(context),
374 
378  EnableQueueGuard(EnableQueueGuard const&) = delete;
379  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
381  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
383  if (queue_) {
384  queue_->resume();
385  }
386  }
387  };
388 
389  tbb::task* execute() override {
390  //Need to make the services available early so other services can see them
392 
 //In case the emit causes an exception, we need a memory location
 // to hold the exception_ptr
395  std::exception_ptr temp_excptr;
396  auto excptr = exceptionPtr();
397  if constexpr (T::isEvent_) {
398  if (!m_worker->hasAcquire()) {
399  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
400  CMS_SA_ALLOW try {
401  //pre was called in prefetchAsync
403  } catch (...) {
404  temp_excptr = std::current_exception();
405  if (not excptr) {
406  excptr = &temp_excptr;
407  }
408  }
409  }
410  }
411 
412  if (not excptr) {
413  if (auto queue = m_worker->serializeRunModule()) {
414  auto f = [worker = m_worker,
416  streamID = m_streamID,
417  parentContext = m_parentContext,
418  sContext = m_context,
419  serviceToken = m_serviceToken]() {
420  //Need to make the services available
421  ServiceRegistry::Operate operateRunModule(serviceToken);
422 
423  //If needed, we pause the queue in begin transition and resume it
424  // at the end transition. This can guarantee that the module
425  // only processes one run or lumi at a time
427  std::exception_ptr* ptr = nullptr;
428  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
429  };
430  //keep another global transition from running if necessary
432  if (gQueue) {
433  gQueue->push([queue, gQueue, f]() mutable {
434  gQueue->pause();
435  queue.push(std::move(f));
436  });
437  } else {
438  queue.push(std::move(f));
439  }
440  return nullptr;
441  }
442  }
443 
445  return nullptr;
446  }
447 
448  private:
450  typename T::TransitionInfoType m_transitionInfo;
453  typename T::Context const* m_context;
455  };
456 
457  // AcquireTask is only used for the Event case, but we define
458  // it as a template so all cases will compile.
459  // DUMMY exists to work around the C++ Standard prohibition on
460  // fully specializing templates nested in other classes.
461  template <typename T, typename DUMMY = void>
462  class AcquireTask : public WaitingTask {
463  public:
465  typename T::TransitionInfoType const&,
466  ServiceToken const&,
467  ParentContext const&,
469  tbb::task* execute() override { return nullptr; }
470  };
471 
472  template <typename DUMMY>
474  public:
476  EventTransitionInfo const& eventTransitionInfo,
477  ServiceToken const& token,
478  ParentContext const& parentContext,
480  : m_worker(worker),
481  m_eventTransitionInfo(eventTransitionInfo),
482  m_parentContext(parentContext),
483  m_holder(std::move(holder)),
484  m_serviceToken(token) {}
485 
486  tbb::task* execute() override {
487  //Need to make the services available early so other services can see them
488  ServiceRegistry::Operate guard(m_serviceToken);
489 
490  //incase the emit causes an exception, we need a memory location
491  // to hold the exception_ptr
492  std::exception_ptr temp_excptr;
493  auto excptr = exceptionPtr();
494  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
495  CMS_SA_ALLOW try {
496  //pre was called in prefetchAsync
497  m_worker->emitPostModuleEventPrefetchingSignal();
498  } catch (...) {
499  temp_excptr = std::current_exception();
500  if (not excptr) {
501  excptr = &temp_excptr;
502  }
503  }
504 
505  if (not excptr) {
506  if (auto queue = m_worker->serializeRunModule()) {
507  queue.push([worker = m_worker,
508  info = m_eventTransitionInfo,
509  parentContext = m_parentContext,
510  serviceToken = m_serviceToken,
511  holder = m_holder]() {
512  //Need to make the services available
513  ServiceRegistry::Operate operateRunAcquire(serviceToken);
514 
515  std::exception_ptr* ptr = nullptr;
516  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
517  });
518  return nullptr;
519  }
520  }
521 
522  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
523  return nullptr;
524  }
525 
526  private:
532  };
533 
534  // This class does nothing unless there is an exception originating
535  // in an "External Worker". In that case, it handles converting the
536  // exception to a CMS exception and adding context to the exception.
538  public:
539  HandleExternalWorkExceptionTask(Worker* worker, WaitingTask* runModuleTask, ParentContext const& parentContext);
540 
541  tbb::task* execute() override;
542 
543  private:
547  };
548 
549  std::atomic<int> timesRun_;
550  std::atomic<int> timesVisited_;
551  std::atomic<int> timesPassed_;
552  std::atomic<int> timesFailed_;
553  std::atomic<int> timesExcept_;
554  std::atomic<State> state_;
556  std::atomic<int> numberOfPathsLeftToRun_;
557 
559 
560  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
561  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
562 
563  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
564 
566 
568  std::atomic<bool> workStarted_;
570  };
571 
572  namespace {
573  template <typename T>
574  class ModuleSignalSentry {
575  public:
576  ModuleSignalSentry(ActivityRegistry* a,
577  typename T::Context const* context,
578  ModuleCallingContext const* moduleCallingContext)
579  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
580  if (a_)
581  T::preModuleSignal(a_, context, moduleCallingContext_);
582  }
583 
584  ~ModuleSignalSentry() {
585  if (a_)
586  T::postModuleSignal(a_, context_, moduleCallingContext_);
587  }
588 
589  private:
590  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
591  typename T::Context const* context_;
592  ModuleCallingContext const* moduleCallingContext_;
593  };
594 
595  } // namespace
596 
597  namespace workerhelper {
598  template <>
600  public:
602  static bool call(Worker* iWorker,
603  StreamID,
604  EventTransitionInfo const& info,
605  ActivityRegistry* /* actReg */,
606  ModuleCallingContext const* mcc,
607  Arg::Context const* /* context*/) {
608  //Signal sentry is handled by the module
609  return iWorker->implDo(info, mcc);
610  }
611  static void esPrefetchAsync(Worker* worker,
612  WaitingTask* waitingTask,
613  ServiceToken const& token,
614  EventTransitionInfo const& info,
615  Transition transition) {
616  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
617  }
618  static bool wantsTransition(Worker const* iWorker) { return true; }
619  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
620 
621  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
622  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
623  };
624 
625  template <>
627  public:
629  static bool call(Worker* iWorker,
630  StreamID,
631  RunTransitionInfo const& info,
632  ActivityRegistry* actReg,
633  ModuleCallingContext const* mcc,
634  Arg::Context const* context) {
635  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
636  return iWorker->implDoBegin(info, mcc);
637  }
638  static void esPrefetchAsync(Worker* worker,
639  WaitingTask* waitingTask,
640  ServiceToken const& token,
641  RunTransitionInfo const& info,
642  Transition transition) {
643  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
644  }
645  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
646  static bool needToRunSelection(Worker const* iWorker) { return false; }
647  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
648  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
649  };
650  template <>
652  public:
654  static bool call(Worker* iWorker,
655  StreamID id,
656  RunTransitionInfo const& info,
657  ActivityRegistry* actReg,
658  ModuleCallingContext const* mcc,
659  Arg::Context const* context) {
660  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
661  return iWorker->implDoStreamBegin(id, info, mcc);
662  }
663  static void esPrefetchAsync(Worker* worker,
664  WaitingTask* waitingTask,
665  ServiceToken const& token,
666  RunTransitionInfo const& info,
667  Transition transition) {
668  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
669  }
670  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
671  static bool needToRunSelection(Worker const* iWorker) { return false; }
672  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
673  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
674  };
675  template <>
677  public:
679  static bool call(Worker* iWorker,
680  StreamID,
681  RunTransitionInfo const& info,
682  ActivityRegistry* actReg,
683  ModuleCallingContext const* mcc,
684  Arg::Context const* context) {
685  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
686  return iWorker->implDoEnd(info, mcc);
687  }
688  static void esPrefetchAsync(Worker* worker,
689  WaitingTask* waitingTask,
690  ServiceToken const& token,
691  RunTransitionInfo const& info,
692  Transition transition) {
693  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
694  }
695  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
696  static bool needToRunSelection(Worker const* iWorker) { return false; }
697  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
698  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
699  };
700  template <>
702  public:
704  static bool call(Worker* iWorker,
705  StreamID id,
706  RunTransitionInfo const& info,
707  ActivityRegistry* actReg,
708  ModuleCallingContext const* mcc,
709  Arg::Context const* context) {
710  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
711  return iWorker->implDoStreamEnd(id, info, mcc);
712  }
713  static void esPrefetchAsync(Worker* worker,
714  WaitingTask* waitingTask,
715  ServiceToken const& token,
716  RunTransitionInfo const& info,
717  Transition transition) {
718  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
719  }
720  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
721  static bool needToRunSelection(Worker const* iWorker) { return false; }
722  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
723  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
724  };
725 
726  template <>
728  public:
730  static bool call(Worker* iWorker,
731  StreamID,
732  LumiTransitionInfo const& info,
733  ActivityRegistry* actReg,
734  ModuleCallingContext const* mcc,
735  Arg::Context const* context) {
736  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
737  return iWorker->implDoBegin(info, mcc);
738  }
739  static void esPrefetchAsync(Worker* worker,
740  WaitingTask* waitingTask,
741  ServiceToken const& token,
742  LumiTransitionInfo const& info,
743  Transition transition) {
744  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
745  }
746  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
747  static bool needToRunSelection(Worker const* iWorker) { return false; }
748  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
749  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
750  };
751  template <>
753  public:
755  static bool call(Worker* iWorker,
756  StreamID id,
757  LumiTransitionInfo const& info,
758  ActivityRegistry* actReg,
759  ModuleCallingContext const* mcc,
760  Arg::Context const* context) {
761  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
762  return iWorker->implDoStreamBegin(id, info, mcc);
763  }
764  static void esPrefetchAsync(Worker* worker,
765  WaitingTask* waitingTask,
766  ServiceToken const& token,
767  LumiTransitionInfo const& info,
768  Transition transition) {
769  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
770  }
771  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
772  static bool needToRunSelection(Worker const* iWorker) { return false; }
773  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
774  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
775  };
776 
777  template <>
779  public:
781  static bool call(Worker* iWorker,
782  StreamID,
783  LumiTransitionInfo const& info,
784  ActivityRegistry* actReg,
785  ModuleCallingContext const* mcc,
786  Arg::Context const* context) {
787  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
788  return iWorker->implDoEnd(info, mcc);
789  }
790  static void esPrefetchAsync(Worker* worker,
791  WaitingTask* waitingTask,
792  ServiceToken const& token,
793  LumiTransitionInfo const& info,
794  Transition transition) {
795  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
796  }
797  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
798  static bool needToRunSelection(Worker const* iWorker) { return false; }
799  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
800  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
801  };
802  template <>
804  public:
806  static bool call(Worker* iWorker,
807  StreamID id,
808  LumiTransitionInfo const& info,
809  ActivityRegistry* actReg,
810  ModuleCallingContext const* mcc,
811  Arg::Context const* context) {
812  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
813  return iWorker->implDoStreamEnd(id, info, mcc);
814  }
815  static void esPrefetchAsync(Worker* worker,
816  WaitingTask* waitingTask,
817  ServiceToken const& token,
818  LumiTransitionInfo const& info,
819  Transition transition) {
820  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
821  }
822  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
823  static bool needToRunSelection(Worker const* iWorker) { return false; }
824  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
825  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
826  };
827  template <>
829  public:
831  static bool call(Worker* iWorker,
832  StreamID,
834  ActivityRegistry* actReg,
835  ModuleCallingContext const* mcc,
836  Arg::Context const* context) {
837  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
838  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
839  }
840  static constexpr void esPrefetchAsync(
842  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
843  static bool needToRunSelection(Worker const* iWorker) { return false; }
844  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
845  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
846  };
847  template <>
849  public:
851  static bool call(Worker* iWorker,
852  StreamID,
854  ActivityRegistry* actReg,
855  ModuleCallingContext const* mcc,
856  Arg::Context const* context) {
857  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
858  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
859  }
860  static constexpr void esPrefetchAsync(
862  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
863  static bool needToRunSelection(Worker const* iWorker) { return false; }
864  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
865  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
866  };
867  template <>
869  public:
871  static bool call(Worker* iWorker,
872  StreamID,
874  ActivityRegistry* actReg,
875  ModuleCallingContext const* mcc,
876  Arg::Context const* context) {
877  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
878  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
879  }
880  static constexpr void esPrefetchAsync(
882  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
883  static bool needToRunSelection(Worker const* iWorker) { return false; }
884  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
885  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
886  };
887  } // namespace workerhelper
888 
889  template <typename T>
891  ServiceToken const& token,
892  ParentContext const& parentContext,
893  typename T::TransitionInfoType const& transitionInfo,
894  Transition iTransition) {
895  Principal const& principal = transitionInfo.principal();
896 
898 
899  if (principal.branchType() == InEvent) {
900  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
901  }
902 
903  //Need to be sure the ref count isn't set to 0 immediately
904  iTask->increment_ref_count();
905 
906  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
907  edPrefetchAsync(iTask, token, principal);
908 
909  if (principal.branchType() == InEvent) {
911  }
912 
913  if (0 == iTask->decrement_ref_count()) {
914  //if everything finishes before we leave this routine, we need to launch the task
915  tbb::task::spawn(*iTask);
916  }
917  }
918 
919  template <typename T>
921  typename T::TransitionInfoType const& transitionInfo,
922  ServiceToken const& token,
923  StreamID streamID,
924  ParentContext const& parentContext,
925  typename T::Context const* context) {
927  return;
928  }
929 
930  //Need to check workStarted_ before adding to waitingTasks_
931  bool expected = false;
932  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
933 
935  if constexpr (T::isEvent_) {
936  timesVisited_.fetch_add(1, std::memory_order_relaxed);
937  }
938 
939  if (workStarted) {
941 
942  //if have TriggerResults based selection we want to reject the event before doing prefetching
944  //We need to run the selection in a different task so that
945  // we can prefetch the data needed for the selection
946  auto runTask = new (tbb::task::allocate_root())
947  RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context);
948 
949  //make sure the task is either run or destroyed
950  struct DestroyTask {
951  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
952 
953  ~DestroyTask() {
954  auto p = m_task.load();
955  if (p) {
956  tbb::task::destroy(*p);
957  }
958  }
959 
961  auto t = m_task.load();
962  m_task.store(nullptr);
963  return t;
964  }
965 
966  std::atomic<edm::WaitingTask*> m_task;
967  };
968 
969  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
970  auto selectionTask = make_waiting_task(
971  tbb::task::allocate_root(),
972  [ownRunTask, parentContext, info = transitionInfo, token, this](std::exception_ptr const*) mutable {
974  prefetchAsync<T>(ownRunTask->release(), token, parentContext, info, T::transition_);
975  });
976  prePrefetchSelectionAsync(selectionTask, token, streamID, &transitionInfo.principal());
977  } else {
978  WaitingTask* moduleTask = new (tbb::task::allocate_root())
979  RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context);
980  if constexpr (T::isEvent_) {
981  if (hasAcquire()) {
982  WaitingTaskWithArenaHolder runTaskHolder(
983  new (tbb::task::allocate_root()) HandleExternalWorkExceptionTask(this, moduleTask, parentContext));
984  moduleTask = new (tbb::task::allocate_root())
985  AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
986  }
987  }
988  prefetchAsync<T>(moduleTask, token, parentContext, transitionInfo, T::transition_);
989  }
990  }
991  }
992 
// Completes a transition once asynchronous prefetching has finished.
//
// iEPtr is null when there was no prefetching failure (runModuleDirectly
// passes null); otherwise it must point to a non-null exception (asserted).
//
// Prefetch failure: the module is not run. If shouldRethrowException(...) is
// true the exception is recorded with setException and returned; otherwise the
// worker is marked as passed and an empty exception_ptr is returned.
// No prefetch failure: runModule<T> is called and anything it throws is
// captured into the returned exception_ptr instead of propagating.
//
// On every path waitingTasks_.doneWaiting() is called with the same
// exception_ptr that is returned.
993  template <typename T>
994  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
995  typename T::TransitionInfoType const& transitionInfo,
996  StreamID streamID,
997  ParentContext const& parentContext,
998  typename T::Context const* context) {
999  std::exception_ptr exceptionPtr;
1000  if (iEPtr) {
1001  assert(*iEPtr);
1002  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1003  exceptionPtr = *iEPtr;
1004  setException<T::isEvent_>(exceptionPtr);
1005  } else {
// The exception is not to be rethrown: treat the module as having passed.
1006  setPassed<T::isEvent_>();
1007  }
// NOTE(review): listing line 1008 is absent from this extract; one statement
// of the failure branch is therefore not shown here -- confirm against the
// original header.
1009  } else {
1010  // Caught exception is propagated via WaitingTaskList
1011  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1012  exceptionPtr = std::current_exception();
1013  }
1014  }
1015  waitingTasks_.doneWaiting(exceptionPtr);
1016  return exceptionPtr;
1017  }
1018 
// Asynchronously schedules runModule<T> for this worker. In the visible code
// the only prefetching performed is esPrefetchAsync, and only when
// needsESPrefetching(T::transition_) is true.
//
// NOTE(review): listing line 1020 (return type, function name and first
// parameter) is absent from this extract; upstream this is presumably
// Worker::doWorkNoPrefetchingAsync -- confirm against the original header.
//
// Only the caller that flips workStarted_ from false to true schedules the
// work. The result is reported through waitingTasks_.doneWaiting(), never by
// throwing from the scheduled closure.
1019  template <typename T>
1021  typename T::TransitionInfoType const& transitionInfo,
1022  ServiceToken const& serviceToken,
1023  StreamID streamID,
1024  ParentContext const& parentContext,
1025  typename T::Context const* context) {
// NOTE(review): listing line 1026 is absent; it presumably holds the condition
// guarding this early return -- confirm.
1027  return;
1028  }
1029 
1030  //Need to check workStarted_ before adding to waitingTasks_
1031  bool expected = false;
1032  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1033 
// NOTE(review): listing line 1034 is absent; given the comment above it
// presumably adds the caller's task to waitingTasks_ -- confirm.
1035  if (workStarted) {
// Everything is captured by value (transitionInfo as a copy named info) because
// the closure is handed to a queue or a spawned task instead of being called
// here.
1036  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, serviceToken]() {
1037  std::exception_ptr exceptionPtr;
1038  // Caught exception is propagated via WaitingTaskList
1039  CMS_SA_ALLOW try {
1040  //Need to make the services available
1041  ServiceRegistry::Operate guard(serviceToken);
1042 
1043  this->runModule<T>(info, streamID, parentContext, context);
1044  } catch (...) {
1045  exceptionPtr = std::current_exception();
1046  }
// Release the waiting tasks, passing along any exception that was captured.
1047  this->waitingTasks_.doneWaiting(exceptionPtr);
1048  };
1049 
1050  if (needsESPrefetching(T::transition_)) {
// Run toDo only after esPrefetchAsync has signalled afterPrefetch. If that
// signal carries an exception, the waiting tasks are released with it and the
// module is not run.
1051  auto afterPrefetch = edm::make_waiting_task(
1052  tbb::task::allocate_root(), [toDo = std::move(toDo), this](std::exception_ptr const* iExcept) {
1053  if (iExcept) {
1054  this->waitingTasks_.doneWaiting(*iExcept);
1055  } else {
// The adaptor converts to true when it wraps a serial or limited queue; in
// that case toDo goes through the queue, otherwise it is spawned as a new
// root task.
1056  if (auto queue = this->serializeRunModule()) {
1057  queue.push(toDo);
1058  } else {
1059  auto taskToDo = make_functor_task(tbb::task::allocate_root(), toDo);
1060  tbb::task::spawn(*taskToDo);
1061  }
1062  }
1063  });
1064  esPrefetchAsync(afterPrefetch, transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1065  } else {
// No ES prefetching needed: same queue-or-spawn dispatch, done immediately.
1066  if (auto queue = this->serializeRunModule()) {
1067  queue.push(toDo);
1068  } else {
1069  auto taskToDo = make_functor_task(tbb::task::allocate_root(), toDo);
1070  tbb::task::spawn(*taskToDo);
1071  }
1072  }
1073  }
1074  }
1075 
// Synchronous execution of this worker for one transition.
//
// Returns the cached result when state_ is Pass (true) or Fail (false), and
// rethrows cached_exception_ when state_ is Exception. Otherwise the caller
// that flips workStarted_ from false to true performs the work; any other
// caller blocks on waitingTasks_ and then reports the state reached.
//
// The working caller prefetches (event transitions only), then calls
// runModule<T> either directly or through the queue returned by
// serializeRunModule(), and returns runModule's result. If state_ ends up as
// Exception, cached_exception_ is rethrown instead.
//
// NOTE(review): listing lines 1122, 1131, 1134, 1163 and 1190 are absent from
// this extract; each gap is marked below. Confirm against the original header.
1076  template <typename T>
1077  bool Worker::doWork(typename T::TransitionInfoType const& transitionInfo,
1078  StreamID streamID,
1079  ParentContext const& parentContext,
1080  typename T::Context const* context) {
// Visits are only counted for event transitions.
1081  if constexpr (T::isEvent_) {
1082  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1083  }
1084  bool rc = false;
1085 
// Anything other than Ready means a result is already cached: reuse it.
1086  switch (state_) {
1087  case Ready:
1088  break;
1089  case Pass:
1090  return true;
1091  case Fail:
1092  return false;
1093  case Exception: {
1094  std::rethrow_exception(cached_exception_);
1095  }
1096  }
1097 
1098  bool expected = false;
1099  if (not workStarted_.compare_exchange_strong(expected, true)) {
1100  //another thread beat us here
// Block until the working caller signals waitingTasks_, then report the state
// it reached. Ready is not expected at that point (asserted).
1101  auto waitTask = edm::make_empty_waiting_task();
1102  waitTask->increment_ref_count();
1103 
1104  waitingTasks_.add(waitTask.get());
1105 
1106  waitTask->wait_for_all();
1107 
1108  switch (state_) {
1109  case Ready:
1110  assert(false);
1111  case Pass:
1112  return true;
1113  case Fail:
1114  return false;
1115  case Exception: {
1116  std::rethrow_exception(cached_exception_);
1117  }
1118  }
1119  }
1120 
1121  //Need the context to be set until after any exception is resolved
// NOTE(review): listing line 1122 is absent; it presumably sets the module
// calling context referred to by the comment above -- confirm.
1123 
// Scope guard: unless released below, leaving this function (return or throw)
// resets moduleCallingContext_ to kInvalid.
1124  auto resetContext = [](ModuleCallingContext* iContext) {
1125  iContext->setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1126  };
1127  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_, resetContext);
1128 
1129  if constexpr (T::isEvent_) {
1130  //if have TriggerResults based selection we want to reject the event before doing prefetching
// NOTE(review): listing lines 1131 and 1134 are absent. Line 1131 presumably
// opens the conditional closed at listing line 1143, and line 1134 presumably
// starts the call whose arguments appear on line 1135 -- confirm.
1132  auto waitTask = edm::make_empty_waiting_task();
1133  waitTask->set_ref_count(2);
1135  waitTask.get(), ServiceRegistry::instance().presentToken(), streamID, &transitionInfo.principal());
1136  waitTask->decrement_ref_count();
1137  waitTask->wait_for_all();
1138 
1139  if (state() != Ready) {
1140  //The selection must have rejected this event
1141  return true;
1142  }
1143  }
1144  auto waitTask = edm::make_empty_waiting_task();
1145  {
1146  //Make sure signal is sent once the prefetching is done
1147  // [the 'pre' signal was sent in prefetchAsync]
1148  //The purpose of this block is to send the signal after wait_for_all
// unique_ptr used as a scope guard: its deleter emits the signal when this
// block exits. A unique_ptr does not invoke its deleter on a null pointer, so
// nothing is emitted if actReg_.get() is null.
1149  auto sentryFunc = [this](void*) { emitPostModuleEventPrefetchingSignal(); };
1150  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(), sentryFunc);
1151 
1152  //set count to 2 since wait_for_all requires value to not go to 0
1153  waitTask->set_ref_count(2);
1154 
1155  prefetchAsync<T>(
1156  waitTask.get(), ServiceRegistry::instance().presentToken(), parentContext, transitionInfo, T::transition_);
1157  waitTask->decrement_ref_count();
1158  waitTask->wait_for_all();
1159  }
// Prefetching failed: either cache and rethrow the exception, or mark the
// worker as passed and release the waiting tasks.
1160  if (waitTask->exceptionPtr() != nullptr) {
1161  if (shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_)) {
1162  setException<T::isEvent_>(*waitTask->exceptionPtr());
// NOTE(review): listing line 1163 is absent; the other exits of this function
// call waitingTasks_.doneWaiting(...) before leaving, so this line presumably
// does the same with the cached exception -- confirm.
1164  std::rethrow_exception(cached_exception_);
1165  } else {
1166  setPassed<T::isEvent_>();
1167  waitingTasks_.doneWaiting(nullptr);
1168  return true;
1169  }
1170  }
1171  }
1172 
1173  //successful prefetch so no reset necessary
1174  prefetchSentry.release();
// Run through the queue when serializeRunModule() provides one, otherwise run
// inline. In both cases an exception escaping runModule is swallowed here; the
// state_ check further down rethrows cached_exception_ when state_ is Exception.
1175  if (auto queue = serializeRunModule()) {
1176  auto serviceToken = ServiceRegistry::instance().presentToken();
1177  queue.pushAndWait([&]() {
1178  //Need to make the services available
1179  ServiceRegistry::Operate guard(serviceToken);
1180  // This try-catch is primarily for paranoia: runModule() deals internally with exceptions, except for those coming from Service signal actions, which are not supposed to throw exceptions
1181  CMS_SA_ALLOW try { rc = runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1182  }
1183  });
1184  } else {
1185  // This try-catch is primarily for paranoia: runModule() deals internally with exceptions, except for those coming from Service signal actions, which are not supposed to throw exceptions
1186  CMS_SA_ALLOW try { rc = runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1187  }
1188  }
1189  if (state_ == Exception) {
// NOTE(review): listing line 1190 is absent; presumably it releases
// waitingTasks_ with the cached exception before the rethrow -- confirm.
1191  std::rethrow_exception(cached_exception_);
1192  }
1193 
// Normal completion: release waiting tasks with no exception.
1194  waitingTasks_.doneWaiting(nullptr);
1195  return rc;
1196  }
1197 
// Invokes the module for one transition and records the outcome in the
// worker's state.
//
// Returns rc as set inside the wrapped call (setPassed is called when it is
// true, setFailed when false). If a cms::Exception is caught and
// shouldRethrowException(...) is true, the exception is stored with
// setException and cached_exception_ is rethrown; otherwise rc takes the
// return value of setPassed.
//
// NOTE(review): listing lines 1215, 1225 and 1227 are absent from this
// extract; each gap is marked below. Confirm against the original header.
1198  template <typename T>
1199  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1200  StreamID streamID,
1201  ParentContext const& parentContext,
1202  typename T::Context const* context) {
1203  //unscheduled producers should advance this
1204  //if (T::isEvent_) {
1205  // ++timesVisited_;
1206  //}
1207  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// Runs are only counted for event transitions.
1208  if constexpr (T::isEvent_) {
1209  timesRun_.fetch_add(1, std::memory_order_relaxed);
1210  }
1211 
1212  bool rc = true;
1213  try {
1214  convertException::wrap([&]() {
// NOTE(review): listing line 1215 is absent; it presumably assigns rc from the
// call whose arguments appear on line 1216 -- confirm.
1216  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1217 
1218  if (rc) {
1219  setPassed<T::isEvent_>();
1220  } else {
1221  setFailed<T::isEvent_>();
1222  }
1223  });
// Only cms::Exception is handled here. Presumably convertException::wrap
// converts other exception types to cms::Exception -- confirm.
1224  } catch (cms::Exception& ex) {
// NOTE(review): listing lines 1225 and 1227 are absent; the statements that
// use ex are therefore not shown in this extract.
1226  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1228  setException<T::isEvent_>(std::current_exception());
1229  std::rethrow_exception(cached_exception_);
1230  } else {
1231  rc = setPassed<T::isEvent_>();
1232  }
1233  }
1234 
1235  return rc;
1236  }
1237 
// Runs the module without a prefetching step: counts the visit, then delegates
// to runModuleAfterAsyncPrefetch<T> with a null prefetch-exception pointer.
// Returns whatever exception_ptr that call returns.
// Unlike doWork, timesVisited_ is incremented here without a T::isEvent_ check.
1238  template <typename T>
1239  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1240  StreamID streamID,
1241  ParentContext const& parentContext,
1242  typename T::Context const* context) {
1243  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1244  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1245  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1246  }
1247 } // namespace edm
1248 #endif
edm::Worker::RunModuleTask::EnableQueueGuard::operator=
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:882
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:739
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::Worker::RunModuleTask
Definition: Worker.h:360
edm::Worker::consumesInfo
virtual std::vector< ConsumesInfo > consumesInfo() const =0
edm::Worker::HandleExternalWorkExceptionTask::execute
tbb::task * execute() override
Definition: Worker.cc:510
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:654
edm::StreamID
Definition: StreamID.h:30
edm::Worker::TaskQueueAdaptor::push
void push(F &&iF)
Definition: Worker.h:103
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static constexpr void esPrefetchAsync(Worker *, WaitingTask *, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:840
edm::Worker::itemsToGetForSelection
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::call
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:806
edm::Worker::implDoPrePrefetchSelection
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
ModuleCallingContext.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:356
edm::Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:505
electrons_cff.bool
bool
Definition: electrons_cff.py:393
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:678
ServiceRegistry.h
edm::Worker::esPrefetchAsync
void esPrefetchAsync(WaitingTask *, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:247
edm::Worker::itemsMayGet
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
MessageLogger.h
edm::Worker::doWorkAsync
void doWorkAsync(WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:920
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:397
edm::eventsetup::ESRecordsToProxyIndices
Definition: ESRecordsToProxyIndices.h:35
edm::ModuleCallingContext::State::kPrefetching
edm::ProcessBlockTransitionInfo
Definition: TransitionInfoTypes.h:80
edm::Worker::AcquireTask::execute
tbb::task * execute() override
Definition: Worker.h:469
edm::Worker::implDoAccessInputProcessBlock
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:885
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::Arg
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:601
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:648
edm::Worker::runModule
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1199
edm::Worker::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:563
edm::Worker::wantsGlobalRuns
virtual bool wantsGlobalRuns() const =0
BranchType.h
edm::Worker::implEndJob
virtual void implEndJob()=0
edm::Worker::convertCurrentProcessAlias
virtual void convertCurrentProcessAlias(std::string const &processName)=0
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:843
edm::ModuleContextSentry
Definition: ModuleContextSentry.h:11
edm::LumiTransitionInfo
Definition: TransitionInfoTypes.h:42
propagate_const.h
edm::EventSetupImpl
Definition: EventSetupImpl.h:48
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:141
edm::Worker::HandleExternalWorkExceptionTask::m_worker
Worker * m_worker
Definition: Worker.h:544
PlaceInPathContext.h
edm::Worker::TaskQueueAdaptor::pushAndWait
void pushAndWait(F &&iF)
Definition: Worker.h:111
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:71
edm::Worker::timesPassed_
std::atomic< int > timesPassed_
Definition: Worker.h:551
modules
Definition: ZHLTMatchFilter.cc:17
edm::Worker::Fail
Definition: Worker.h:90
edm::Worker::TaskQueueAdaptor::limited_
LimitedTaskQueue * limited_
Definition: Worker.h:94
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Worker::runModuleDirectly
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1239
edm::Worker::beginJob
void beginJob()
Definition: Worker.cc:349
edm::Worker::preActionBeforeRunEventAsync
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:35
edm::Worker::timesVisited_
std::atomic< int > timesVisited_
Definition: Worker.h:550
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
edm::Worker::Exception
Definition: Worker.h:90
edm::WaitingTaskList::add
void add(WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:90
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:773
edm::Worker::wantsGlobalLuminosityBlocks
virtual bool wantsGlobalLuminosityBlocks() const =0
edm::Worker::RunModuleTask::m_serviceToken
ServiceToken m_serviceToken
Definition: Worker.h:454
edm::Worker::resetModuleDescription
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:341
ProductResolverIndex.h
edm::Worker::wantsInputProcessBlocks
virtual bool wantsInputProcessBlocks() const =0
edm::Worker::postDoEvent
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:445
edm::Worker::actions_
ExceptionToActionTable const * actions_
Definition: Worker.h:560
FunctorTask.h
edm::Worker::implBeginJob
virtual void implBeginJob()=0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:884
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:647
edm::make_functor_task
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
edm::Worker::prefetchAsync
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:890
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::esPrefetchAsync
static constexpr void esPrefetchAsync(Worker *, WaitingTask *, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:860
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:722
edm::Worker::Worker
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:91
cms::cuda::assert
assert(be >=bs)
edm::Worker::respondToOpenInputFile
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
edm::Worker::setException
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:317
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:274
edm::Worker::RunModuleTask::EnableQueueGuard
Definition: Worker.h:375
edm::Worker::itemsToGet
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::Principal
Definition: Principal.h:57
edm::Worker::numberOfPathsLeftToRun_
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:556
edm::Worker::registerThinnedAssociations
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:427
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_serviceToken
ServiceToken m_serviceToken
Definition: Worker.h:531
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
edm::Worker::RunModuleTask::execute
tbb::task * execute() override
Definition: Worker.h:389
edm::Worker::timesVisited
int timesVisited() const
Definition: Worker.h:226
edm::Worker::kProducer
Definition: Worker.h:91
edm::Worker::State
State
Definition: Worker.h:90
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:845
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:781
edm::Worker::RunModuleTask::EnableQueueGuard::~EnableQueueGuard
~EnableQueueGuard()
Definition: Worker.h:382
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:697
edm::ModuleCallingContext::State::kInvalid
edm::ProductResolverIndexHelper
Definition: ProductResolverIndexHelper.h:89
edm::ModuleCallingContext::moduleDescription
ModuleDescription const * moduleDescription() const
Definition: ModuleCallingContext.h:50
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:529
edm::Worker::implDoBegin
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::timesExcept
int timesExcept() const
Definition: Worker.h:229
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:653
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:646
edm::RunTransitionInfo
Definition: TransitionInfoTypes.h:64
edm::Worker::wantsStreamLuminosityBlocks
virtual bool wantsStreamLuminosityBlocks() const =0
edm::BranchActionGlobalBegin
Definition: BranchActionType.h:12
ESIndices.h
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:862
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:471
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:696
edm::Worker::implEndStream
virtual void implEndStream(StreamID)=0
edm::Worker::workStarted_
std::atomic< bool > workStarted_
Definition: Worker.h:568
edm::Worker::itemsShouldPutInEvent
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
edm::BranchType
BranchType
Definition: BranchType.h:11
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:673
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:824
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:748
edm::Worker::reset
void reset()
Definition: Worker.h:178
edm::Worker::operator=
Worker & operator=(Worker const &)=delete
edm::Exception
Definition: EDMException.h:77
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:52
F
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:112
edm::WaitingTaskList
Definition: WaitingTaskList.h:101
edm::ProductRegistry
Definition: ProductRegistry.h:37
edm::Worker::implRespondToCloseInputFile
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
ModuleDescription.h
ActivityRegistry.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:825
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:797
edm::FileBlock
Definition: FileBlock.h:20
edm::Worker::AcquireTask
Definition: Worker.h:462
edm::ServiceToken
Definition: ServiceToken.h:40
edm::WaitingTaskWithArenaHolder
Definition: WaitingTaskWithArenaHolder.h:31
edm::propagate_const
Definition: propagate_const.h:32
edm::Worker::implDoStreamEnd
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::kFilter
Definition: Worker.h:91
edm::SerialTaskQueueChain::pushAndWait
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:97
edm::Worker::runAcquireAfterAsyncPrefetch
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:466
edm::Worker::implDoBeginProcessBlock
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::LimitedTaskQueue
Definition: LimitedTaskQueue.h:39
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:864
edm::WaitingTask::exceptionPtr
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:883
edm::Worker::edPrefetchAsync
void edPrefetchAsync(WaitingTask *, ServiceToken const &, Principal const &) const
Definition: Worker.cc:326
edm::EventPrincipal
Definition: EventPrincipal.h:46
edm::Worker::earlyDeleteHelper_
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:565
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:192
edm::Worker::setActivityRegistry
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:110
edm::StreamContext
Definition: StreamContext.h:31
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:863
edm::BranchActionProcessBlockInput
Definition: BranchActionType.h:16
edm::Worker::timesRun_
std::atomic< int > timesRun_
Definition: Worker.h:549
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:619
edm::SerialTaskQueueChain::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:86
edm::Worker::callWhenDoneAsync
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:167
edm::Worker::AcquireTask::AcquireTask
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:464
edm::Worker::resolvePutIndicies
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:618
edm::Worker::kAnalyzer
Definition: Worker.h:91
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:695
edm::Worker::modulesWhoseProductsAreConsumed
virtual void modulesWhoseProductsAreConsumed(std::vector< ModuleDescription const * > &modules, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::Worker::state_
std::atomic< State > state_
Definition: Worker.h:554
edm::ActivityRegistry
Definition: ActivityRegistry.h:133
edm::Principal::branchType
BranchType const & branchType() const
Definition: Principal.h:181
edm::LimitedTaskQueue::pushAndWait
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:139
ProductResolverIndexAndSkipBit.h
edm::Worker::HandleExternalWorkExceptionTask
Definition: Worker.h:537
edm::Worker::endJob
void endJob()
Definition: Worker.cc:364
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:38
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:170
edm::Worker::descPtr
ModuleDescription const * descPtr() const
Definition: Worker.h:189
edm::Worker::HandleExternalWorkExceptionTask::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:546
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:703
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:671
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::AcquireTask
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:475
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:772
edm::InEvent
Definition: BranchType.h:11
Transition.h
edm::OccurrenceTraits
Definition: OccurrenceTraits.h:35
edm::Worker::RunModuleTask::m_worker
Worker * m_worker
Definition: Worker.h:449
ConvertException.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:602
edm::Worker::Pass
Definition: Worker.h:90
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:152
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:851
LimitedTaskQueue.h
edm::Worker
Definition: Worker.h:88
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::Worker::timesExcept_
std::atomic< int > timesExcept_
Definition: Worker.h:553
edm::Worker::implDoEnd
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
createBeamHaloJobs.queue
queue
Definition: createBeamHaloJobs.py:343
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:638
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:747
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::Worker::RunModuleTask::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:452
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:755
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:798
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:645
edm::ThinnedAssociationsHelper
Definition: ThinnedAssociationsHelper.h:37
edm::Worker::runAcquire
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:451
edm::GlobalContext
Definition: GlobalContext.h:29
edm::Worker::serializeRunModule
virtual TaskQueueAdaptor serializeRunModule()=0
edm::Worker::implDoAcquire
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:822
edm::Worker::wantsProcessBlocks
virtual bool wantsProcessBlocks() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
Definition: Worker.h:800
edm::Worker::waitingTasks_
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:567
edm::workerhelper::CallImpl
Definition: Worker.h:82
edm::Transition
Transition
Definition: Transition.h:12
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput >
Definition: OccurrenceTraits.h:434
a
double a
Definition: hdecay.h:119
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:730
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:233
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
WaitingTaskWithArenaHolder.h
edm::Worker::implRegisterThinnedAssociations
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
edm::ServiceRegistry::presentToken
ServiceToken presentToken() const
Definition: ServiceRegistry.cc:63
edm::ModuleCallingContext::getStreamContext
StreamContext const * getStreamContext() const
Definition: ModuleCallingContext.cc:32
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:688
ExceptionMessages.h
edm::Worker::timesRun
int timesRun() const
Definition: Worker.h:225
edm::Worker::RunModuleTask::m_transitionInfo
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:450
edm::BranchActionStreamBegin
Definition: BranchActionType.h:13
thread_safety_macros.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *)
Definition: Worker.h:621
edm::Worker::RunModuleTask::EnableQueueGuard::queue_
SerialTaskQueue * queue_
Definition: Worker.h:376
ESRecordsToProxyIndices
helper
Definition: helper.py:1
edm::Worker::HandleExternalWorkExceptionTask::m_runModuleTask
WaitingTask * m_runModuleTask
Definition: Worker.h:545
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:204
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:746
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:723
edm::ModuleCallingContext::setContext
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
Definition: ModuleCallingContext.cc:24
iEvent
int iEvent
Definition: GenABIO.cc:224
edm::Worker::RunModuleTask::RunModuleTask
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:362
edm::Worker::RunModuleTask::m_streamID
StreamID m_streamID
Definition: Worker.h:451
edm::Worker::setEarlyDeleteHelper
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:339
edm::Worker::ranAcquireWithoutException_
bool ranAcquireWithoutException_
Definition: Worker.h:569
edm::Worker::exceptionContext
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:112
edm::Worker::doWorkNoPrefetchingAsync
void doWorkNoPrefetchingAsync(WaitingTask *, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1020
edm::Worker::implRespondToOpenInputFile
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
edm::Worker::numberOfPathsOn_
int numberOfPathsOn_
Definition: Worker.h:555
edm::Worker::~Worker
virtual ~Worker()
Definition: Worker.cc:108
edm::Worker::timesFailed
int timesFailed() const
Definition: Worker.h:228
edm::Worker::doWork
bool doWork(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1077
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor()=default
edm::Worker::moduleCallingContext_
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:558
InternalContext.h
ExceptionActions.h
edm::Worker::setPassed
bool setPassed()
Definition: Worker.h:299
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:628
edm::Worker::respondToCloseInputFile
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:865
edm::Worker::timesFailed_
std::atomic< int > timesFailed_
Definition: Worker.h:552
edm::Worker::TaskQueueAdaptor
Definition: Worker.h:92
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:823
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:842
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:663
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
edm::Worker::description
ModuleDescription const & description() const
Definition: Worker.h:188
edm::Worker::implDo
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::state
State state() const
Definition: Worker.h:230
edm::Worker::cached_exception_
std::exception_ptr cached_exception_
Definition: Worker.h:561
edm::Worker::addedToPath
void addedToPath()
Definition: Worker.h:223
PathContext.h
edm::ServiceRegistry::instance
static ServiceRegistry & instance()
Definition: ServiceRegistry.cc:90
edm::EarlyDeleteHelper
Definition: EarlyDeleteHelper.h:40
edm::Worker::setFailed
bool setFailed()
Definition: Worker.h:308
edm::Worker::needsESPrefetching
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:333
edm::Worker::esRecordsToGetFrom
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
WaitingTask.h
edm::make_empty_waiting_task
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
Definition: WaitingTaskList.h:96
edm::Worker::moduleType
virtual Types moduleType() const =0
edm::Worker::timesPassed
int timesPassed() const
Definition: Worker.h:227
edm::Worker::runModuleAfterAsyncPrefetch
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:994
SimL1EmulatorRepack_CalouGT_cff.processName
processName
Definition: SimL1EmulatorRepack_CalouGT_cff.py:17
edm::Worker::RunModuleTask::EnableQueueGuard::EnableQueueGuard
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:381
edm::Worker::wantsStreamRuns
virtual bool wantsStreamRuns() const =0
Types
Definition: Types.py:1
edm::Worker::itemsToGetFrom
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
edm::Worker::hasAcquire
virtual bool hasAcquire() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:844
OccurrenceTraits.h
EarlyDeleteHelper
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:720
ParentContext.h
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:215
edm::WaitingTask
Definition: WaitingTask.h:36
WorkerParams.h
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:871
edm::Worker::globalRunsQueue
virtual SerialTaskQueue * globalRunsQueue()=0
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:774
edm::BranchActionGlobalEnd
Definition: BranchActionType.h:15
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::call
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:704
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:679
Frameworkfwd.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:749
T
long double T
Definition: Basic3DVectorLD.h:48
edm::Worker::updateLookup
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:97
edm::Worker::hasAccumulator
virtual bool hasAccumulator() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:815
edm::Worker::skipOnPath
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:436
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_worker
Worker * m_worker
Definition: Worker.h:527
edm::Worker::activityRegistry
ActivityRegistry * activityRegistry()
Definition: Worker.h:268
edm::Worker::RunModuleTask::EnableQueueGuard::EnableQueueGuard
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:377
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
Definition: Worker.h:698
edm::Worker::esItemsToGetFrom
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
edm::Worker::TaskQueueAdaptor::serial_
SerialTaskQueueChain * serial_
Definition: Worker.h:93
edm::Worker::globalLuminosityBlocksQueue
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static constexpr void esPrefetchAsync(Worker *, WaitingTask *, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:880
edm::Worker::handleExternalWorkException
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:491
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:314
edm::Worker::implBeginStream
virtual void implBeginStream(StreamID)=0
edm::Worker::Ready
Definition: Worker.h:90
DUMMY
#define DUMMY
Definition: DMRtrends.cc:34
edm::Worker::RunModuleTask::m_context
T::Context const * m_context
Definition: Worker.h:453
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:629
edm::Worker::timesPass
int timesPass() const
Definition: Worker.h:232
edm::Worker::emitPostModuleEventPrefetchingSignal
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:337
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:611
SerialTaskQueueChain.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:622
WaitingTaskList.h
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::execute
tbb::task * execute() override
Definition: Worker.h:486
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:790
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_holder
WaitingTaskWithArenaHolder m_holder
Definition: Worker.h:530
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:721
edm::SerialTaskQueueChain
Definition: SerialTaskQueueChain.h:32
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:670
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:799
ConsumesInfo.h
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_eventTransitionInfo
EventTransitionInfo m_eventTransitionInfo
Definition: Worker.h:528
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::BranchActionStreamEnd
Definition: BranchActionType.h:14
CMS_THREAD_GUARD
#define CMS_THREAD_GUARD(_var_)
Definition: thread_safety_macros.h:6
edm::Worker::kOutputModule
Definition: Worker.h:91
ModuleContextSentry.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:771
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:672
edm::Worker::beginStream
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:379
benchmark_cfg.fb
fb
Definition: benchmark_cfg.py:14
edm::Worker::implNeedToRunSelection
virtual bool implNeedToRunSelection() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:713
edm::LimitedTaskQueue::push
void push(T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:126
edm::Worker::implDoStreamBegin
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::endStream
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:403
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTask *waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:764
edm::Worker::workerType
virtual std::string workerType() const =0
edm::Worker::implDoEndProcessBlock
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:98
edm::Worker::shouldRethrowException
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:164
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:831
edm::Transition::NumberOfEventSetupTransitions
edm::ModuleCallingContext
Definition: ModuleCallingContext.h:29
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:318