CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
57 
59 
60 #include <atomic>
61 #include <cassert>
62 #include <map>
63 #include <memory>
64 #include <sstream>
65 #include <string>
66 #include <vector>
67 #include <exception>
68 #include <unordered_map>
69 
70 namespace edm {
71  class EventPrincipal;
72  class EventSetupImpl;
73  class EarlyDeleteHelper;
74  class ModuleProcessName;
75  class ProductResolverIndexHelper;
76  class ProductResolverIndexAndSkipBit;
77  class StreamID;
78  class StreamContext;
79  class ProductRegistry;
80  class ThinnedAssociationsHelper;
81 
82  namespace workerhelper {
83  template <typename O>
84  class CallImpl;
85  }
86  namespace eventsetup {
88  }
89 
90  class Worker {
91  public:
// Cached execution status of the worker.
//  Ready     - set by reset(); per the file header, the module is only called in this state.
//  Pass      - set by setPassed().
//  Fail      - set by setFailed().
//  Exception - set by setException(), which also stores the exception in cached_exception_.
92  enum State { Ready, Pass, Fail, Exception };
97 
98  TaskQueueAdaptor() = default;
100  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
101 
102  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
103 
104  template <class F>
105  void push(tbb::task_group& iG, F&& iF) {
106  if (serial_) {
107  serial_->push(iG, iF);
108  } else {
109  limited_->push(iG, iF);
110  }
111  }
112  };
113 
114  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
115  virtual ~Worker();
116 
117  Worker(Worker const&) = delete; // Disallow copying and moving
118  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
119 
// Marks the module as no longer valid and then asks the derived class to clear it.
120  void clearModule() {
// The flag is cleared before the virtual hook runs; other code in this class
// guards on moduleValid_ before using the module.
121  moduleValid_ = false;
// doClearModule() is pure virtual; what "clearing" means is up to the derived worker.
122  doClearModule();
123  }
124 
125  virtual bool wantsProcessBlocks() const = 0;
126  virtual bool wantsInputProcessBlocks() const = 0;
127  virtual bool wantsGlobalRuns() const = 0;
128  virtual bool wantsGlobalLuminosityBlocks() const = 0;
129  virtual bool wantsStreamRuns() const = 0;
130  virtual bool wantsStreamLuminosityBlocks() const = 0;
131 
132  virtual SerialTaskQueue* globalRunsQueue() = 0;
134 
136  tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
137 
139  tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
140  assert(false);
141  }
142 
143  template <typename T>
145  typename T::TransitionInfoType const&,
146  ServiceToken const&,
147  StreamID,
148  ParentContext const&,
149  typename T::Context const*);
150 
151  template <typename T>
153  typename T::TransitionInfoType const&,
154  ServiceToken const&,
155  StreamID,
156  ParentContext const&,
157  typename T::Context const*);
158 
159  template <typename T>
160  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
161  StreamID,
162  ParentContext const&,
163  typename T::Context const*);
164 
166  void skipOnPath(EventPrincipal const& iEvent);
167  void beginJob();
168  void endJob();
169  void beginStream(StreamID id, StreamContext& streamContext);
170  void endStream(StreamID id, StreamContext& streamContext);
173 
175 
176  void reset() {
177  cached_exception_ = std::exception_ptr();
178  state_ = Ready;
180  workStarted_ = false;
182  }
183 
184  void postDoEvent(EventPrincipal const&);
185 
187  if (moduleValid_) {
189  }
190  return nullptr;
191  }
194  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
195 
197 
198  //Used to make EDGetToken work
199  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
200  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
201  virtual void resolvePutIndicies(
202  BranchType iBranchType,
203  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
204  iIndicies) = 0;
205 
206  virtual void modulesWhoseProductsAreConsumed(
207  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
208  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
209  ProductRegistry const& preg,
210  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
211 
212  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
213 
214  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
215 
216  virtual Types moduleType() const = 0;
217 
218  void clearCounters() {
219  timesRun_.store(0, std::memory_order_release);
220  timesVisited_.store(0, std::memory_order_release);
221  timesPassed_.store(0, std::memory_order_release);
222  timesFailed_.store(0, std::memory_order_release);
223  timesExcept_.store(0, std::memory_order_release);
224  }
225 
// Read-only accessors for the execution counters and the cached state.
// Each counter accessor performs an acquire load on the corresponding atomic.
227  //NOTE: calling state() is done to force synchronization across threads
// NOTE(review): the accessors below do not call state(); they load the counters
// directly with memory_order_acquire. The note above looks stale - confirm.
228  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
229  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
230  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
231  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
232  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
// Returns the current value of the atomic state_ member (default atomic load).
233  State state() const { return state_; }
234 
// Alias of timesPassed(), kept only for older callers.
235  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
236 
237  virtual bool hasAccumulator() const = 0;
238 
239  protected:
240  template <typename O>
242 
243  virtual void doClearModule() = 0;
244 
245  virtual std::string workerType() const = 0;
246  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
247 
248  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
249  virtual bool implNeedToRunSelection() const = 0;
250 
251  virtual void implDoAcquire(EventTransitionInfo const&,
252  ModuleCallingContext const*,
254 
256  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
258  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
259  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
260  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
261  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
262  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
263  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
264  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
265  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
266  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
267  virtual void implBeginJob() = 0;
268  virtual void implEndJob() = 0;
269  virtual void implBeginStream(StreamID) = 0;
270  virtual void implEndStream(StreamID) = 0;
271 
273 
275 
276  private:
277  template <typename T>
278  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
279 
280  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
281  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
282 
283  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
284 
285  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
286  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
287  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
288 
290  ModuleCallingContext const& moduleCallingContext,
291  Principal const& iPrincipal) const = 0;
292 
293  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
294  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
295 
297 
298  virtual TaskQueueAdaptor serializeRunModule() = 0;
299 
300  static void exceptionContext(cms::Exception& ex, ModuleCallingContext const* mcc);
301 
302  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
303 
304  template <bool IS_EVENT>
305  bool setPassed() {
306  if (IS_EVENT) {
307  timesPassed_.fetch_add(1, std::memory_order_relaxed);
308  }
309  state_ = Pass;
310  return true;
311  }
312 
313  template <bool IS_EVENT>
314  bool setFailed() {
315  if (IS_EVENT) {
316  timesFailed_.fetch_add(1, std::memory_order_relaxed);
317  }
318  state_ = Fail;
319  return false;
320  }
321 
322  template <bool IS_EVENT>
323  std::exception_ptr setException(std::exception_ptr iException) {
324  if (IS_EVENT) {
325  timesExcept_.fetch_add(1, std::memory_order_relaxed);
326  }
327  cached_exception_ = iException; // propagate_const<T> has no reset() function
328  state_ = Exception;
329  return cached_exception_;
330  }
331 
332  template <typename T>
334  ServiceToken const&,
335  ParentContext const&,
336  typename T::TransitionInfoType const&,
337  Transition);
338 
340  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
341 
342  bool needsESPrefetching(Transition iTrans) const noexcept {
343  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
344  }
345 
347  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
348  }
349 
350  virtual bool hasAcquire() const = 0;
351 
352  template <typename T>
353  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
354  typename T::TransitionInfoType const&,
355  StreamID,
356  ParentContext const&,
357  typename T::Context const*);
358 
360 
361  void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
362  EventTransitionInfo const&,
363  ParentContext const&,
365 
366  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
367 
368  template <typename T>
369  class RunModuleTask : public WaitingTask {
370  public:
372  typename T::TransitionInfoType const& transitionInfo,
373  ServiceToken const& token,
374  StreamID streamID,
375  ParentContext const& parentContext,
376  typename T::Context const* context,
377  tbb::task_group* iGroup)
378  : m_worker(worker),
379  m_transitionInfo(transitionInfo),
380  m_streamID(streamID),
381  m_parentContext(parentContext),
382  m_context(context),
384  m_group(iGroup) {}
385 
389  EnableQueueGuard(EnableQueueGuard const&) = delete;
390  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
392  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
394  if (queue_) {
395  queue_->resume();
396  }
397  }
398  };
399 
400  void execute() final {
401  //Need to make the services available early so other services can see them
403 
404  //in case the emit causes an exception, we need a memory location
405  // to hold the exception_ptr
406  std::exception_ptr temp_excptr;
407  auto excptr = exceptionPtr();
408  if constexpr (T::isEvent_) {
409  if (!m_worker->hasAcquire()) {
410  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
411  CMS_SA_ALLOW try {
412  //pre was called in prefetchAsync
414  } catch (...) {
415  temp_excptr = std::current_exception();
416  if (not excptr) {
417  excptr = &temp_excptr;
418  }
419  }
420  }
421  }
422 
423  if (not excptr) {
424  if (auto queue = m_worker->serializeRunModule()) {
425  auto f = [worker = m_worker,
427  streamID = m_streamID,
428  parentContext = m_parentContext,
429  sContext = m_context,
430  serviceToken = m_serviceToken]() {
431  //Need to make the services available
432  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
433 
434  //If needed, we pause the queue in begin transition and resume it
435  // at the end transition. This can guarantee that the module
436  // only processes one run or lumi at a time
438  std::exception_ptr* ptr = nullptr;
439  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
440  };
441  //keep another global transition from running if necessary
443  if (gQueue) {
444  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
445  gQueue->pause();
446  queue.push(*group, std::move(f));
447  });
448  } else {
449  queue.push(*m_group, std::move(f));
450  }
451  return;
452  }
453  }
454 
456  }
457 
458  private:
460  typename T::TransitionInfoType m_transitionInfo;
463  typename T::Context const* m_context;
465  tbb::task_group* m_group;
466  };
467 
468  // AcquireTask is only used for the Event case, but we define
469  // it as a template so all cases will compile.
470  // DUMMY exists to work around the C++ Standard prohibition on
471  // fully specializing templates nested in other classes.
472  template <typename T, typename DUMMY = void>
473  class AcquireTask : public WaitingTask {
474  public:
476  typename T::TransitionInfoType const&,
477  ServiceToken const&,
478  ParentContext const&,
480  void execute() final {}
481  };
482 
483  template <typename DUMMY>
485  public:
487  EventTransitionInfo const& eventTransitionInfo,
488  ServiceToken const& token,
489  ParentContext const& parentContext,
491  : m_worker(worker),
492  m_eventTransitionInfo(eventTransitionInfo),
493  m_parentContext(parentContext),
494  m_holder(std::move(holder)),
495  m_serviceToken(token) {}
496 
// Task body: emits the post-prefetching signal and then runs the worker's
// acquire step, either directly or through the queue returned by
// serializeRunModule().
497  void execute() final {
498  //Need to make the services available early so other services can see them
499  ServiceRegistry::Operate guard(m_serviceToken.lock());
500 
501  //in case the emit causes an exception, we need a memory location
502  // to hold the exception_ptr
503  std::exception_ptr temp_excptr;
// excptr starts as whatever exceptionPtr() reports for this task (presumably an
// exception recorded while prefetching - confirm against WaitingTask).
504  auto excptr = exceptionPtr();
505  // Caught exception is passed to Worker::runAcquireAfterAsyncPrefetch() (the call at the end of this function); per the original note it is then propagated via WaitingTaskWithArenaHolder
506  CMS_SA_ALLOW try {
507  //pre was called in prefetchAsync
508  m_worker->emitPostModuleEventPrefetchingSignal();
509  } catch (...) {
510  temp_excptr = std::current_exception();
// An exception that was already present takes precedence over one thrown by the signal.
511  if (not excptr) {
512  excptr = &temp_excptr;
513  }
514  }
515 
// Only when there is no exception so far is the work handed to a queue.
516  if (not excptr) {
517  if (auto queue = m_worker->serializeRunModule()) {
// The lambda captures copies of everything it needs, because this task object
// is not referenced from inside it.
518  queue.push(*m_holder.group(),
519  [worker = m_worker,
520  info = m_eventTransitionInfo,
521  parentContext = m_parentContext,
522  serviceToken = m_serviceToken,
523  holder = m_holder]() {
524  //Need to make the services available
525  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
526 
// A null pointer tells runAcquireAfterAsyncPrefetch that no exception occurred.
527  std::exception_ptr* ptr = nullptr;
528  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
529  });
530  return;
531  }
532  }
533 
// Reached when there is an exception to report or when no queue is needed:
// run the acquire step (or report the exception) directly.
534  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
535  }
536 
537  private:
543  };
544 
545  // This class does nothing unless there is an exception originating
546  // in an "External Worker". In that case, it handles converting the
547  // exception to a CMS exception and adding context to the exception.
549  public:
551  tbb::task_group* group,
552  WaitingTask* runModuleTask,
553  ParentContext const& parentContext);
554 
555  void execute() final;
556 
557  private:
560  tbb::task_group* m_group;
562  };
563 
564  std::atomic<int> timesRun_;
565  std::atomic<int> timesVisited_;
566  std::atomic<int> timesPassed_;
567  std::atomic<int> timesFailed_;
568  std::atomic<int> timesExcept_;
569  std::atomic<State> state_;
571  std::atomic<int> numberOfPathsLeftToRun_;
572 
574 
575  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
576  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
577 
578  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
579 
581 
583  std::atomic<bool> workStarted_;
585  bool moduleValid_ = true;
586  };
587 
588  namespace {
589  template <typename T>
590  class ModuleSignalSentry {
591  public:
592  ModuleSignalSentry(ActivityRegistry* a,
593  typename T::Context const* context,
594  ModuleCallingContext const* moduleCallingContext)
595  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
596  if (a_)
597  T::preModuleSignal(a_, context, moduleCallingContext_);
598  }
599 
600  ~ModuleSignalSentry() {
601  if (a_)
602  T::postModuleSignal(a_, context_, moduleCallingContext_);
603  }
604 
605  private:
606  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
607  typename T::Context const* context_;
608  ModuleCallingContext const* moduleCallingContext_;
609  };
610 
611  } // namespace
612 
613  namespace workerhelper {
614  template <>
616  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from the EventTransitionInfo parameter and the implDo() call it
// appears to be the event transition - TODO confirm.
//
// call: runs the module by forwarding to Worker::implDo. The registry and
// context arguments are unused here; no ModuleSignalSentry is created.
618  static bool call(Worker* iWorker,
619  StreamID,
620  EventTransitionInfo const& info,
621  ActivityRegistry* /* actReg */,
622  ModuleCallingContext const* mcc,
623  Arg::Context const* /* context*/) {
624  //Signal sentry is handled by the module
625  return iWorker->implDo(info, mcc);
626  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
627  static void esPrefetchAsync(Worker* worker,
628  WaitingTaskHolder waitingTask,
629  ServiceToken const& token,
630  EventTransitionInfo const& info,
631  Transition transition) {
632  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
633  }
// This transition is always wanted, regardless of the worker.
634  static bool wantsTransition(Worker const* iWorker) { return true; }
// Whether a selection step is needed is decided by the worker implementation.
635  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
636 
// No global queue is paused or re-enabled for this transition.
637  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
638  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
639  };
640 
641  template <>
643  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoBegin(RunTransitionInfo) and wantsGlobalRuns() it
// appears to be the global begin-run transition - TODO confirm.
//
// call: wraps Worker::implDoBegin in a ModuleSignalSentry, so the pre/post
// module signals are emitted around the call when actReg is non-null.
645  static bool call(Worker* iWorker,
646  StreamID,
647  RunTransitionInfo const& info,
648  ActivityRegistry* actReg,
649  ModuleCallingContext const* mcc,
650  Arg::Context const* context) {
651  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
652  return iWorker->implDoBegin(info, mcc);
653  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
654  static void esPrefetchAsync(Worker* worker,
655  WaitingTaskHolder waitingTask,
656  ServiceToken const& token,
657  RunTransitionInfo const& info,
658  Transition transition) {
659  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
660  }
661  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
// No selection step for this transition.
662  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Returns the worker's global-runs queue as the queue to pause; nothing is re-enabled here.
663  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
664  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
665  };
666  template <>
668  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoStreamBegin(RunTransitionInfo) and wantsStreamRuns() it
// appears to be the stream begin-run transition - TODO confirm.
//
// call: wraps Worker::implDoStreamBegin in a ModuleSignalSentry, so the
// pre/post module signals are emitted around the call when actReg is non-null.
670  static bool call(Worker* iWorker,
671  StreamID id,
672  RunTransitionInfo const& info,
673  ActivityRegistry* actReg,
674  ModuleCallingContext const* mcc,
675  Arg::Context const* context) {
676  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
677  return iWorker->implDoStreamBegin(id, info, mcc);
678  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
679  static void esPrefetchAsync(Worker* worker,
680  WaitingTaskHolder waitingTask,
681  ServiceToken const& token,
682  RunTransitionInfo const& info,
683  Transition transition) {
684  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
685  }
686  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
// No selection step for this transition.
687  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
688  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
689  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
690  };
691  template <>
693  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoEnd(RunTransitionInfo) and wantsGlobalRuns() it appears
// to be the global end-run transition - TODO confirm.
//
// call: wraps Worker::implDoEnd in a ModuleSignalSentry, so the pre/post module
// signals are emitted around the call when actReg is non-null.
695  static bool call(Worker* iWorker,
696  StreamID,
697  RunTransitionInfo const& info,
698  ActivityRegistry* actReg,
699  ModuleCallingContext const* mcc,
700  Arg::Context const* context) {
701  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
702  return iWorker->implDoEnd(info, mcc);
703  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
704  static void esPrefetchAsync(Worker* worker,
705  WaitingTaskHolder waitingTask,
706  ServiceToken const& token,
707  RunTransitionInfo const& info,
708  Transition transition) {
709  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
710  }
711  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
// No selection step for this transition.
712  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Nothing is paused here; the worker's global-runs queue is returned as the queue to re-enable.
713  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
714  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
715  };
716  template <>
718  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoStreamEnd(RunTransitionInfo) and wantsStreamRuns() it
// appears to be the stream end-run transition - TODO confirm.
//
// call: wraps Worker::implDoStreamEnd in a ModuleSignalSentry, so the pre/post
// module signals are emitted around the call when actReg is non-null.
720  static bool call(Worker* iWorker,
721  StreamID id,
722  RunTransitionInfo const& info,
723  ActivityRegistry* actReg,
724  ModuleCallingContext const* mcc,
725  Arg::Context const* context) {
726  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
727  return iWorker->implDoStreamEnd(id, info, mcc);
728  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
729  static void esPrefetchAsync(Worker* worker,
730  WaitingTaskHolder waitingTask,
731  ServiceToken const& token,
732  RunTransitionInfo const& info,
733  Transition transition) {
734  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
735  }
736  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
// No selection step for this transition.
737  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
738  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
739  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
740  };
741 
742  template <>
744  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoBegin(LumiTransitionInfo) and
// wantsGlobalLuminosityBlocks() it appears to be the global begin-lumi
// transition - TODO confirm.
//
// call: wraps Worker::implDoBegin in a ModuleSignalSentry, so the pre/post
// module signals are emitted around the call when actReg is non-null.
746  static bool call(Worker* iWorker,
747  StreamID,
748  LumiTransitionInfo const& info,
749  ActivityRegistry* actReg,
750  ModuleCallingContext const* mcc,
751  Arg::Context const* context) {
752  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
753  return iWorker->implDoBegin(info, mcc);
754  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
755  static void esPrefetchAsync(Worker* worker,
756  WaitingTaskHolder waitingTask,
757  ServiceToken const& token,
758  LumiTransitionInfo const& info,
759  Transition transition) {
760  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
761  }
762  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
// No selection step for this transition.
763  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Returns the worker's global-luminosity-blocks queue as the queue to pause; nothing is re-enabled here.
764  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
765  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
766  };
767  template <>
769  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoStreamBegin(LumiTransitionInfo) and
// wantsStreamLuminosityBlocks() it appears to be the stream begin-lumi
// transition - TODO confirm.
//
// call: wraps Worker::implDoStreamBegin in a ModuleSignalSentry, so the
// pre/post module signals are emitted around the call when actReg is non-null.
771  static bool call(Worker* iWorker,
772  StreamID id,
773  LumiTransitionInfo const& info,
774  ActivityRegistry* actReg,
775  ModuleCallingContext const* mcc,
776  Arg::Context const* context) {
777  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
778  return iWorker->implDoStreamBegin(id, info, mcc);
779  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
780  static void esPrefetchAsync(Worker* worker,
781  WaitingTaskHolder waitingTask,
782  ServiceToken const& token,
783  LumiTransitionInfo const& info,
784  Transition transition) {
785  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
786  }
787  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
// No selection step for this transition.
788  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
789  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
790  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
791  };
792 
793  template <>
795  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoEnd(LumiTransitionInfo) and
// wantsGlobalLuminosityBlocks() it appears to be the global end-lumi
// transition - TODO confirm.
//
// call: wraps Worker::implDoEnd in a ModuleSignalSentry, so the pre/post module
// signals are emitted around the call when actReg is non-null.
797  static bool call(Worker* iWorker,
798  StreamID,
799  LumiTransitionInfo const& info,
800  ActivityRegistry* actReg,
801  ModuleCallingContext const* mcc,
802  Arg::Context const* context) {
803  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
804  return iWorker->implDoEnd(info, mcc);
805  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
806  static void esPrefetchAsync(Worker* worker,
807  WaitingTaskHolder waitingTask,
808  ServiceToken const& token,
809  LumiTransitionInfo const& info,
810  Transition transition) {
811  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
812  }
813  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
// No selection step for this transition.
814  static bool needToRunSelection(Worker const* iWorker) { return false; }
// Nothing is paused here; the worker's global-luminosity-blocks queue is returned as the queue to re-enable.
815  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
816  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
817  };
818  template <>
820  public:
// Static hooks of a CallImpl specialization whose header line is not visible in
// this view; from implDoStreamEnd(LumiTransitionInfo) and
// wantsStreamLuminosityBlocks() it appears to be the stream end-lumi
// transition - TODO confirm.
//
// call: wraps Worker::implDoStreamEnd in a ModuleSignalSentry, so the pre/post
// module signals are emitted around the call when actReg is non-null.
822  static bool call(Worker* iWorker,
823  StreamID id,
824  LumiTransitionInfo const& info,
825  ActivityRegistry* actReg,
826  ModuleCallingContext const* mcc,
827  Arg::Context const* context) {
828  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
829  return iWorker->implDoStreamEnd(id, info, mcc);
830  }
// esPrefetchAsync: forwards to Worker::esPrefetchAsync using the EventSetupImpl
// taken from the transition info.
831  static void esPrefetchAsync(Worker* worker,
832  WaitingTaskHolder waitingTask,
833  ServiceToken const& token,
834  LumiTransitionInfo const& info,
835  Transition transition) {
836  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
837  }
838  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
// No selection step for this transition.
839  static bool needToRunSelection(Worker const* iWorker) { return false; }
// No global queue is paused or re-enabled for this transition.
840  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
841  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
842  };
843  template <>
845  public:
847  static bool call(Worker* iWorker,
848  StreamID,
850  ActivityRegistry* actReg,
851  ModuleCallingContext const* mcc,
852  Arg::Context const* context) {
853  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
854  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
855  }
856  static void esPrefetchAsync(
// Remaining hooks of the specialization whose call() forwards to
// implDoBeginProcessBlock: the transition is wanted when the worker wants
// process blocks, no selection step is run, and no global queue is paused or
// re-enabled.
858  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
859  static bool needToRunSelection(Worker const* iWorker) { return false; }
860  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
861  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
862  };
863  template <>
865  public:
867  static bool call(Worker* iWorker,
868  StreamID,
870  ActivityRegistry* actReg,
871  ModuleCallingContext const* mcc,
872  Arg::Context const* context) {
873  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
874  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
875  }
876  static void esPrefetchAsync(
// Remaining hooks of the specialization whose call() forwards to
// implDoAccessInputProcessBlock: the transition is wanted when the worker wants
// input process blocks, no selection step is run, and no global queue is
// paused or re-enabled.
878  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
879  static bool needToRunSelection(Worker const* iWorker) { return false; }
880  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
881  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
882  };
883  template <>
885  public:
887  static bool call(Worker* iWorker,
888  StreamID,
890  ActivityRegistry* actReg,
891  ModuleCallingContext const* mcc,
892  Arg::Context const* context) {
893  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
894  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
895  }
896  static void esPrefetchAsync(
// Remaining hooks of the specialization whose call() forwards to
// implDoEndProcessBlock: the transition is wanted when the worker wants process
// blocks, no selection step is run, and no global queue is paused or
// re-enabled.
898  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
899  static bool needToRunSelection(Worker const* iWorker) { return false; }
900  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
901  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
902  };
903  } // namespace workerhelper
904 
905  template <typename T>
907  ServiceToken const& token,
908  ParentContext const& parentContext,
909  typename T::TransitionInfoType const& transitionInfo,
910  Transition iTransition) {
911  Principal const& principal = transitionInfo.principal();
912 
914 
915  if (principal.branchType() == InEvent) {
916  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
917  }
918 
919  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
920  edPrefetchAsync(iTask, token, principal);
921 
922  if (principal.branchType() == InEvent) {
924  }
925  }
926 
927  template <typename T>
929  typename T::TransitionInfoType const& transitionInfo,
930  ServiceToken const& token,
931  StreamID streamID,
932  ParentContext const& parentContext,
933  typename T::Context const* context) {
935  return;
936  }
937 
938  //Need to check workStarted_ before adding to waitingTasks_
939  bool expected = false;
940  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
941 
943  if constexpr (T::isEvent_) {
944  timesVisited_.fetch_add(1, std::memory_order_relaxed);
945  }
946 
947  if (workStarted) {
949 
950  //if have TriggerResults based selection we want to reject the event before doing prefetching
952  //We need to run the selection in a different task so that
953  // we can prefetch the data needed for the selection
954  auto runTask =
955  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
956 
957  //make sure the task is either run or destroyed
958  struct DestroyTask {
959  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
960 
961  ~DestroyTask() {
962  auto p = m_task.exchange(nullptr);
963  if (p) {
964  TaskSentry s{p};
965  }
966  }
967 
968  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
969 
970  private:
971  std::atomic<edm::WaitingTask*> m_task;
972  };
973  auto* group = task.group();
974  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
975  ServiceWeakToken weakToken = token;
976  auto selectionTask =
977  make_waiting_task([ownRunTask, parentContext, info = transitionInfo, weakToken, group, this](
978  std::exception_ptr const*) mutable {
979  ServiceRegistry::Operate guard(weakToken.lock());
980  prefetchAsync<T>(WaitingTaskHolder(*group, ownRunTask->release()),
981  weakToken.lock(),
982  parentContext,
983  info,
984  T::transition_);
985  });
986  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
987  } else {
988  WaitingTask* moduleTask =
989  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
990  auto group = task.group();
991  if constexpr (T::isEvent_) {
992  if (hasAcquire()) {
993  WaitingTaskWithArenaHolder runTaskHolder(
994  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
995  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
996  }
997  }
998  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
999  }
1000  }
1001  }
1002 
// Completes a transition once the prefetch stage has finished.
//
//   iEPtr           null when prefetching succeeded; otherwise points to the
//                   exception raised while prefetching (asserted non-null).
//   transitionInfo, streamID, parentContext, context
//                   forwarded unchanged to runModule<T>().
//
// Returns the exception to report for this transition, or a null
// std::exception_ptr when there is none. The same value is passed to
// waitingTasks_.doneWaiting() before returning.
//
// NOTE(review): listing line 1018 is absent from this extract (the numbering
// jumps from 1017 to 1019) -- confirm against the original header.
1003  template <typename T>
1004  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
1005  typename T::TransitionInfoType const& transitionInfo,
1006  StreamID streamID,
1007  ParentContext const& parentContext,
1008  typename T::Context const* context) {
1009  std::exception_ptr exceptionPtr;
// A non-null iEPtr means prefetching failed; runModule<T>() is not called on this path.
1010  if (iEPtr) {
1011  assert(*iEPtr);
// Either record the prefetch exception as this worker's result and report it,
// or mark the transition as passed and report nothing.
1012  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1013  exceptionPtr = *iEPtr;
1014  setException<T::isEvent_>(exceptionPtr);
1015  } else {
1016  setPassed<T::isEvent_>();
1017  }
1019  } else {
1020  // Caught exception is propagated via WaitingTaskList
1021  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1022  exceptionPtr = std::current_exception();
1023  }
1024  }
// Signal the tasks waiting on this worker, handing them the exception (null on success).
1025  waitingTasks_.doneWaiting(exceptionPtr);
1026  return exceptionPtr;
1027  }
1028 
// Worker::doWorkNoPrefetchingAsync<T> -- the name is taken from the reference
// index further down this file, which places the definition at Worker.h:1030.
//
// NOTE(review): listing lines 1030, 1036, 1044 and 1075 are missing from this
// extract. Not visible here: the first line of the signature (which presumably
// declares the `task` used below), the condition guarding the early return,
// the statement between the compare-exchange and `if (workStarted)`, and the
// head of the call that receives `afterPrefetch`. Confirm against the original
// header before changing anything.
//
// What the visible code does: the call that flips workStarted_ from false to
// true builds a `toDo` functor that runs runModule<T>() and then calls
// waitingTasks_.doneWaiting(). That functor is scheduled either after an
// intermediate `afterPrefetch` waiting task (when needsESPrefetching() is true)
// or immediately.
1029  template <typename T>
1031  typename T::TransitionInfoType const& transitionInfo,
1032  ServiceToken const& serviceToken,
1033  StreamID streamID,
1034  ParentContext const& parentContext,
1035  typename T::Context const* context) {
1037  return;
1038  }
1039 
1040  //Need to check workStarted_ before adding to waitingTasks_
1041  bool expected = false;
// True only if workStarted_ was still false, i.e. this call is the one that starts the work.
1042  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1043 
1045  if (workStarted) {
// The functor captures a weak service token and copies of the arguments
// (the `context` pointer itself is copied, not what it points to).
1046  ServiceWeakToken weakToken = serviceToken;
1047  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1048  std::exception_ptr exceptionPtr;
1049  // Caught exception is propagated via WaitingTaskList
1050  CMS_SA_ALLOW try {
1051  //Need to make the services available
1052  ServiceRegistry::Operate guard(weakToken.lock());
1053 
1054  this->runModule<T>(info, streamID, parentContext, context);
1055  } catch (...) {
1056  exceptionPtr = std::current_exception();
1057  }
// Always signal the waiting tasks; exceptionPtr is null when runModule<T>() did not throw.
1058  this->waitingTasks_.doneWaiting(exceptionPtr);
1059  };
1060 
1061  if (needsESPrefetching(T::transition_)) {
1062  auto group = task.group();
// afterPrefetch: if it is handed an exception, forward that to the waiting
// tasks without running the module; otherwise schedule toDo.
1063  auto afterPrefetch =
1064  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1065  if (iExcept) {
1066  this->waitingTasks_.doneWaiting(*iExcept);
1067  } else {
// Use the queue returned by serializeRunModule() when it converts to true;
// otherwise run toDo directly in the task group.
1068  if (auto queue = this->serializeRunModule()) {
1069  queue.push(*group, toDo);
1070  } else {
1071  group->run(toDo);
1072  }
1073  }
1074  });
// NOTE(review): the callee on listing line 1075 is not shown; only its arguments appear below.
1076  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1077  } else {
// No intermediate waiting task on this path: schedule toDo right away, same queue-or-group choice.
1078  auto group = task.group();
1079  if (auto queue = this->serializeRunModule()) {
1080  queue.push(*group, toDo);
1081  } else {
1082  group->run(toDo);
1083  }
1084  }
1085  }
1086  }
1087 
// Runs the module for transition type T and records the outcome on the worker.
//
//   transitionInfo, streamID, context   passed to the call made inside the
//                                       wrapped lambda (see note below).
//   parentContext                       given to the ModuleContextSentry and to
//                                       shouldRethrowException().
//
// Returns `rc`. On the normal path setPassed/setFailed is chosen from rc.
// When a cms::Exception is caught, the function either rethrows
// (after setException) or returns the result of setPassed<T::isEvent_>().
//
// NOTE(review): listing lines 1105, 1115 and 1117 are missing from this
// extract. Line 1105 is presumably where `rc` is assigned from the call whose
// arguments appear on line 1106; the other two sit inside the catch block.
// Confirm against the original header.
1088  template <typename T>
1089  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1090  StreamID streamID,
1091  ParentContext const& parentContext,
1092  typename T::Context const* context) {
1093  //unscheduled producers should advance this
1094  //if (T::isEvent_) {
1095  // ++timesVisited_;
1096  //}
// presumably ties moduleCallingContext_ to parentContext for the duration of
// this call -- confirm in ModuleContextSentry.h
1097  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// The run counter is only advanced for event transitions.
1098  if constexpr (T::isEvent_) {
1099  timesRun_.fetch_add(1, std::memory_order_relaxed);
1100  }
1101 
1102  bool rc = true;
1103  try {
1104  convertException::wrap([&]() {
1106  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1107 
// Record pass/fail according to rc.
1108  if (rc) {
1109  setPassed<T::isEvent_>();
1110  } else {
1111  setFailed<T::isEvent_>();
1112  }
1113  });
1114  } catch (cms::Exception& ex) {
// Either store the exception on the worker and rethrow it, or treat the transition as passed.
1116  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1118  setException<T::isEvent_>(std::current_exception());
// presumably cached_exception_ was just set by setException() -- confirm at its definition
1119  std::rethrow_exception(cached_exception_);
1120  } else {
1121  rc = setPassed<T::isEvent_>();
1122  }
1123  }
1124 
1125  return rc;
1126  }
1127 
// Runs the module without a prefetch stage: increments timesVisited_ and then
// delegates to runModuleAfterAsyncPrefetch<T>() with a null prefetch exception.
//
//   transitionInfo, streamID, parentContext, context
//       forwarded unchanged to runModuleAfterAsyncPrefetch<T>().
//
// Returns whatever runModuleAfterAsyncPrefetch<T>() returns.
1128  template <typename T>
1129  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1130  StreamID streamID,
1131  ParentContext const& parentContext,
1132  typename T::Context const* context) {
// Relaxed ordering: no ordering with respect to other memory operations is requested for this counter.
1133  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1134  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1135  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1136  }
1137 } // namespace edm
1138 #endif
edm::Worker::RunModuleTask::EnableQueueGuard::operator=
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:898
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::Worker::RunModuleTask
Definition: Worker.h:369
edm::Worker::consumesInfo
virtual std::vector< ConsumesInfo > consumesInfo() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:670
edm::StreamID
Definition: StreamID.h:30
edm::Worker::itemsToGetForSelection
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::call
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:822
edm::Worker::implDoPrePrefetchSelection
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
ModuleCallingContext.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:356
electrons_cff.bool
bool
Definition: electrons_cff.py:366
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:694
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::execute
void execute() final
Definition: Worker.h:497
ServiceRegistry.h
edm::Worker::itemsMayGet
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
MessageLogger.h
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:397
edm::eventsetup::ESRecordsToProxyIndices
Definition: ESRecordsToProxyIndices.h:35
edm::ModuleCallingContext::State::kPrefetching
edm::ProcessBlockTransitionInfo
Definition: TransitionInfoTypes.h:80
edm::Worker::implDoAccessInputProcessBlock
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:901
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::Arg
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:617
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:664
edm::ServiceWeakToken
Definition: ServiceToken.h:86
edm::Worker::runModule
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1089
edm::Worker::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:578
edm::Worker::wantsGlobalRuns
virtual bool wantsGlobalRuns() const =0
BranchType.h
edm::Worker::implEndJob
virtual void implEndJob()=0
edm::Worker::convertCurrentProcessAlias
virtual void convertCurrentProcessAlias(std::string const &processName)=0
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:859
edm::ModuleContextSentry
Definition: ModuleContextSentry.h:11
edm::LumiTransitionInfo
Definition: TransitionInfoTypes.h:42
propagate_const.h
edm::EventSetupImpl
Definition: EventSetupImpl.h:49
edm::Worker::HandleExternalWorkExceptionTask::m_worker
Worker * m_worker
Definition: Worker.h:558
WaitingTaskHolder.h
PlaceInPathContext.h
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:71
edm::Worker::timesPassed_
std::atomic< int > timesPassed_
Definition: Worker.h:566
modules
Definition: MuonCleanerBySegments.cc:35
edm::Worker::Fail
Definition: Worker.h:92
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:831
edm::Worker::TaskQueueAdaptor::limited_
LimitedTaskQueue * limited_
Definition: Worker.h:96
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Worker::runModuleDirectly
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1129
edm::Worker::beginJob
void beginJob()
Definition: Worker.cc:347
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edm::Worker::timesVisited_
std::atomic< int > timesVisited_
Definition: Worker.h:565
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
edm::Worker::Exception
Definition: Worker.h:92
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:789
edm::Worker::wantsGlobalLuminosityBlocks
virtual bool wantsGlobalLuminosityBlocks() const =0
edm::Worker::resetModuleDescription
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:339
ProductResolverIndex.h
edm::Worker::wantsInputProcessBlocks
virtual bool wantsInputProcessBlocks() const =0
edm::Worker::postDoEvent
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:446
edm::Worker::actions_
ExceptionToActionTable const * actions_
Definition: Worker.h:575
FunctorTask.h
edm::Worker::HandleExternalWorkExceptionTask::execute
void execute() final
Definition: Worker.cc:512
edm::Worker::implBeginJob
virtual void implBeginJob()=0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:780
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:900
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:663
edm::Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask
HandleExternalWorkExceptionTask(Worker *worker, tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:506
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:738
edm::Worker::Worker
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:91
edm::Worker::RunModuleTask::RunModuleTask
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, tbb::task_group *iGroup)
Definition: Worker.h:371
cms::cuda::assert
assert(be >=bs)
edm::Worker::respondToOpenInputFile
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:171
edm::Worker::setException
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:323
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:274
edm::Worker::RunModuleTask::EnableQueueGuard
Definition: Worker.h:386
edm::Worker::itemsToGet
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::Principal
Definition: Principal.h:56
edm::Worker::numberOfPathsLeftToRun_
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:571
edm::Worker::registerThinnedAssociations
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:428
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
mps_check.array
array
Definition: mps_check.py:216
edm::Worker::timesVisited
int timesVisited() const
Definition: Worker.h:229
edm::Worker::kProducer
Definition: Worker.h:93
edm::Worker::State
State
Definition: Worker.h:92
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:861
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:797
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:679
edm::Worker::RunModuleTask::EnableQueueGuard::~EnableQueueGuard
~EnableQueueGuard()
Definition: Worker.h:393
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:713
edm::ModuleCallingContext::State::kInvalid
edm::ProductResolverIndexHelper
Definition: ProductResolverIndexHelper.h:89
edm::ModuleCallingContext::moduleDescription
ModuleDescription const * moduleDescription() const
Definition: ModuleCallingContext.h:50
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:540
edm::Worker::RunModuleTask::execute
void execute() final
Definition: Worker.h:400
edm::Worker::implDoBegin
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::timesExcept
int timesExcept() const
Definition: Worker.h:232
edm::Worker::RunModuleTask::m_serviceToken
ServiceWeakToken m_serviceToken
Definition: Worker.h:464
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:669
edm::TaskSentry
Definition: TaskBase.h:50
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:662
edm::RunTransitionInfo
Definition: TransitionInfoTypes.h:64
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:806
edm::Worker::wantsStreamLuminosityBlocks
virtual bool wantsStreamLuminosityBlocks() const =0
edm::BranchActionGlobalBegin
Definition: BranchActionType.h:12
ESIndices.h
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:878
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:471
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:712
edm::Worker::implEndStream
virtual void implEndStream(StreamID)=0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:856
edm::Worker::workStarted_
std::atomic< bool > workStarted_
Definition: Worker.h:583
edm::Worker::itemsShouldPutInEvent
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
edm::BranchType
BranchType
Definition: BranchType.h:11
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:689
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:840
edm::NumBranchTypes
Definition: BranchType.h:11
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:764
edm::Worker::reset
void reset()
Definition: Worker.h:176
edm::Worker::operator=
Worker & operator=(Worker const &)=delete
edm::Exception
Definition: EDMException.h:77
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:53
F
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:112
edm::WaitingTaskList
Definition: WaitingTaskList.h:84
edm::ProductRegistry
Definition: ProductRegistry.h:37
edm::Worker::implRespondToCloseInputFile
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
ModuleDescription.h
ActivityRegistry.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:841
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:813
edm::FileBlock
Definition: FileBlock.h:20
edm::Worker::description
ModuleDescription const * description() const
Definition: Worker.h:186
edm::Worker::AcquireTask
Definition: Worker.h:473
edm::ServiceToken
Definition: ServiceToken.h:42
edm::WaitingTaskWithArenaHolder
Definition: WaitingTaskWithArenaHolder.h:34
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::propagate_const
Definition: propagate_const.h:32
edm::Worker::implDoStreamEnd
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::kFilter
Definition: Worker.h:93
edm::Worker::doWorkAsync
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:928
edm::Worker::runAcquireAfterAsyncPrefetch
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:467
edm::Worker::implDoBeginProcessBlock
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::LimitedTaskQueue
Definition: LimitedTaskQueue.h:39
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:880
edm::WaitingTask::exceptionPtr
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:899
edm::EventPrincipal
Definition: EventPrincipal.h:46
edm::Worker::earlyDeleteHelper_
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:580
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:192
edm::Worker::setActivityRegistry
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:110
edm::StreamContext
Definition: StreamContext.h:31
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:879
edm::BranchActionProcessBlockInput
Definition: BranchActionType.h:16
edm::Worker::timesRun_
std::atomic< int > timesRun_
Definition: Worker.h:564
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:635
edm::Worker::AcquireTask::AcquireTask
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:475
edm::Worker::resolvePutIndicies
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:634
edm::Worker::kAnalyzer
Definition: Worker.h:93
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:711
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
edm::Worker::state_
std::atomic< State > state_
Definition: Worker.h:569
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
edm::Principal::branchType
BranchType const & branchType() const
Definition: Principal.h:181
ProductResolverIndexAndSkipBit.h
edm::Worker::HandleExternalWorkExceptionTask
Definition: Worker.h:548
edm::Worker::endJob
void endJob()
Definition: Worker.cc:363
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:38
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:212
edm::Worker::doClearModule
virtual void doClearModule()=0
edm::Worker::HandleExternalWorkExceptionTask::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:561
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:719
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:687
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::AcquireTask
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:486
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:788
edm::InEvent
Definition: BranchType.h:11
Transition.h
edm::OccurrenceTraits
Definition: OccurrenceTraits.h:35
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:896
edm::Worker::RunModuleTask::m_worker
Worker * m_worker
Definition: Worker.h:459
ConvertException.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:618
edm::Worker::Pass
Definition: Worker.h:92
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:152
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:867
LimitedTaskQueue.h
edm::Worker
Definition: Worker.h:90
edm::Worker::timesExcept_
std::atomic< int > timesExcept_
Definition: Worker.h:568
edm::Worker::TaskQueueAdaptor::push
void push(tbb::task_group &iG, F &&iF)
Definition: Worker.h:105
edm::Worker::implDoEnd
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
createBeamHaloJobs.queue
queue
Definition: createBeamHaloJobs.py:343
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:763
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::Worker::RunModuleTask::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:462
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:771
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:814
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:661
edm::Worker::clearModule
void clearModule()
Definition: Worker.h:120
edm::ThinnedAssociationsHelper
Definition: ThinnedAssociationsHelper.h:37
edm::LimitedTaskQueue::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:115
edm::Worker::runAcquire
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:452
edm::Worker::doWorkNoPrefetchingAsync
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1030
edm::GlobalContext
Definition: GlobalContext.h:29
edm::Worker::serializeRunModule
virtual TaskQueueAdaptor serializeRunModule()=0
edm::Worker::implDoAcquire
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:838
edm::Worker::wantsProcessBlocks
virtual bool wantsProcessBlocks() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
Definition: Worker.h:816
edm::Worker::waitingTasks_
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:582
edm::workerhelper::CallImpl
Definition: Worker.h:84
edm::Transition
Transition
Definition: Transition.h:12
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput >
Definition: OccurrenceTraits.h:434
a
double a
Definition: hdecay.h:119
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:746
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:233
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
WaitingTaskWithArenaHolder.h
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::Worker::RunModuleTask::m_group
tbb::task_group * m_group
Definition: Worker.h:465
edm::Worker::implRegisterThinnedAssociations
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
edm::ModuleCallingContext::getStreamContext
StreamContext const * getStreamContext() const
Definition: ModuleCallingContext.cc:32
ExceptionMessages.h
edm::Worker::timesRun
int timesRun() const
Definition: Worker.h:228
edm::Worker::RunModuleTask::m_transitionInfo
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:460
edm::BranchActionStreamBegin
Definition: BranchActionType.h:13
thread_safety_macros.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *)
Definition: Worker.h:637
edm::Worker::RunModuleTask::EnableQueueGuard::queue_
SerialTaskQueue * queue_
Definition: Worker.h:387
ESRecordsToProxyIndices
helper
Definition: helper.py:1
edm::Worker::HandleExternalWorkExceptionTask::m_runModuleTask
WaitingTask * m_runModuleTask
Definition: Worker.h:559
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:762
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:739
edm::ModuleCallingContext::setContext
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
Definition: ModuleCallingContext.cc:24
iEvent
int iEvent
Definition: GenABIO.cc:224
edm::Worker::RunModuleTask::m_streamID
StreamID m_streamID
Definition: Worker.h:461
edm::Worker::setEarlyDeleteHelper
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:337
edm::Worker::ranAcquireWithoutException_
bool ranAcquireWithoutException_
Definition: Worker.h:584
edm::Worker::exceptionContext
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:112
edm::Worker::implRespondToOpenInputFile
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
edm::Worker::numberOfPathsOn_
int numberOfPathsOn_
Definition: Worker.h:570
edm::Worker::~Worker
virtual ~Worker()
Definition: Worker.cc:108
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
edm::Worker::timesFailed
int timesFailed() const
Definition: Worker.h:231
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor()=default
edm::Worker::moduleCallingContext_
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:573
InternalContext.h
ExceptionActions.h
edm::Worker::setPassed
bool setPassed()
Definition: Worker.h:305
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::esPrefetchAsync
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:876
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:644
edm::ServiceWeakToken::lock
ServiceToken lock() const
Definition: ServiceToken.h:101
edm::Worker::respondToCloseInputFile
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:172
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:881
edm::Worker::callWhenDoneAsync
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:165
edm::Worker::timesFailed_
std::atomic< int > timesFailed_
Definition: Worker.h:567
edm::Worker::TaskQueueAdaptor
Definition: Worker.h:94
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:839
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:858
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
edm::Worker::esPrefetchAsync
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:253
edm::Worker::implDo
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::state
State state() const
Definition: Worker.h:233
edm::Worker::cached_exception_
std::exception_ptr cached_exception_
Definition: Worker.h:576
edm::Worker::addedToPath
void addedToPath()
Definition: Worker.h:226
PathContext.h
edm::Worker::HandleExternalWorkExceptionTask::m_group
tbb::task_group * m_group
Definition: Worker.h:560
edm::EarlyDeleteHelper
Definition: EarlyDeleteHelper.h:40
edm::Worker::setFailed
bool setFailed()
Definition: Worker.h:314
edm::Worker::needsESPrefetching
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:342
edm::Worker::esRecordsToGetFrom
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
WaitingTask.h
edm::Worker::moduleType
virtual Types moduleType() const =0
edm::Worker::timesPassed
int timesPassed() const
Definition: Worker.h:230
edm::Worker::runModuleAfterAsyncPrefetch
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1004
SimL1EmulatorRepack_CalouGT_cff.processName
processName
Definition: SimL1EmulatorRepack_CalouGT_cff.py:17
edm::Worker::RunModuleTask::EnableQueueGuard::EnableQueueGuard
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:392
edm::Worker::wantsStreamRuns
virtual bool wantsStreamRuns() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:627
Types
Definition: Types.py:1
edm::Worker::itemsToGetFrom
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
edm::Worker::hasAcquire
virtual bool hasAcquire() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:860
OccurrenceTraits.h
EarlyDeleteHelper
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:736
edm::WaitingTaskList::add
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:125
ParentContext.h
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:218
edm::WaitingTask
Definition: WaitingTask.h:36
WorkerParams.h
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:887
edm::Worker::globalRunsQueue
virtual SerialTaskQueue * globalRunsQueue()=0
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:654
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:790
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:704
edm::Worker::prefetchAsync
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:906
edm::BranchActionGlobalEnd
Definition: BranchActionType.h:15
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::call
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:720
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:695
Frameworkfwd.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:765
T
long double T
Definition: Basic3DVectorLD.h:48
edm::Worker::updateLookup
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:99
edm::Worker::hasAccumulator
virtual bool hasAccumulator() const =0
edm::Worker::preActionBeforeRunEventAsync
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
edm::Worker::skipOnPath
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:437
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_worker
Worker * m_worker
Definition: Worker.h:538
edm::Worker::activityRegistry
ActivityRegistry * activityRegistry()
Definition: Worker.h:274
edm::Worker::modulesWhoseProductsAreConsumed
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const * > *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
edm::Worker::RunModuleTask::EnableQueueGuard::EnableQueueGuard
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:388
edm::Worker::edPrefetchAsync
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:324
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
Definition: Worker.h:714
edm::Worker::esItemsToGetFrom
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
edm::Worker::TaskQueueAdaptor::serial_
SerialTaskQueueChain * serial_
Definition: Worker.h:95
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:204
edm::Worker::globalLuminosityBlocksQueue
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::Worker::handleExternalWorkException
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:492
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:314
edm::Worker::implBeginStream
virtual void implBeginStream(StreamID)=0
edm::Worker::Ready
Definition: Worker.h:92
DUMMY
#define DUMMY
Definition: DMRtrends.cc:34
edm::Worker::AcquireTask::execute
void execute() final
Definition: Worker.h:480
edm::Worker::RunModuleTask::m_context
T::Context const * m_context
Definition: Worker.h:463
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:645
edm::Worker::timesPass
int timesPass() const
Definition: Worker.h:235
edm::Worker::emitPostModuleEventPrefetchingSignal
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:346
SerialTaskQueueChain.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:638
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:729
WaitingTaskList.h
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_holder
WaitingTaskWithArenaHolder m_holder
Definition: Worker.h:541
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:138
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:737
edm::SerialTaskQueueChain
Definition: SerialTaskQueueChain.h:32
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:686
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:815
ConsumesInfo.h
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_eventTransitionInfo
EventTransitionInfo m_eventTransitionInfo
Definition: Worker.h:539
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::BranchActionStreamEnd
Definition: BranchActionType.h:14
CMS_THREAD_GUARD
#define CMS_THREAD_GUARD(_var_)
Definition: thread_safety_macros.h:6
edm::Worker::kOutputModule
Definition: Worker.h:93
ModuleContextSentry.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:755
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:787
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:688
edm::Worker::beginStream
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:380
edm::Worker::moduleValid_
bool moduleValid_
Definition: Worker.h:585
benchmark_cfg.fb
fb
Definition: benchmark_cfg.py:14
edm::Worker::implNeedToRunSelection
virtual bool implNeedToRunSelection() const =0
edm::Worker::implDoStreamBegin
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_serviceToken
ServiceWeakToken m_serviceToken
Definition: Worker.h:542
edm::Worker::endStream
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:404
edm::Worker::workerType
virtual std::string workerType() const =0
edm::Worker::implDoEndProcessBlock
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:100
edm::Worker::shouldRethrowException
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:164
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:847
watchdog.group
group
Definition: watchdog.py:82
edm::Transition::NumberOfEventSetupTransitions
edm::ModuleCallingContext
Definition: ModuleCallingContext.h:29
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316