CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
58 
60 
61 #include <atomic>
62 #include <cassert>
63 #include <map>
64 #include <memory>
65 #include <sstream>
66 #include <string>
67 #include <vector>
68 #include <exception>
69 #include <unordered_map>
70 
71 namespace edm {
72  class EventPrincipal;
73  class EventSetupImpl;
74  class EarlyDeleteHelper;
75  class ModuleProcessName;
76  class ProductResolverIndexHelper;
77  class ProductResolverIndexAndSkipBit;
78  class StreamID;
79  class StreamContext;
80  class ProductRegistry;
81  class ThinnedAssociationsHelper;
82 
83  namespace workerhelper {
84  template <typename O>
85  class CallImpl;
86  }
87  namespace eventsetup {
89  }
90 
91  class Worker {
92  public:
93  enum State { Ready, Pass, Fail, Exception };
99 
100  TaskQueueAdaptor() = default;
102  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
103 
104  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
105 
106  template <class F>
107  void push(tbb::task_group& iG, F&& iF) {
108  if (serial_) {
109  serial_->push(iG, iF);
110  } else {
111  limited_->push(iG, iF);
112  }
113  }
114  };
115 
116  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
117  virtual ~Worker();
118 
119  Worker(Worker const&) = delete; // Disallow copying and moving
120  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
121 
122  void clearModule() {
123  moduleValid_ = false;
124  doClearModule();
125  }
126 
127  virtual bool wantsProcessBlocks() const = 0;
128  virtual bool wantsInputProcessBlocks() const = 0;
129  virtual bool wantsGlobalRuns() const = 0;
130  virtual bool wantsGlobalLuminosityBlocks() const = 0;
131  virtual bool wantsStreamRuns() const = 0;
132  virtual bool wantsStreamLuminosityBlocks() const = 0;
133 
134  virtual SerialTaskQueue* globalRunsQueue() = 0;
136 
138  tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
139 
141  tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
142  assert(false);
143  }
144 
145  template <typename T>
147  typename T::TransitionInfoType const&,
148  ServiceToken const&,
149  StreamID,
150  ParentContext const&,
151  typename T::Context const*);
152 
153  template <typename T>
155  typename T::TransitionInfoType const&,
156  ServiceToken const&,
157  StreamID,
158  ParentContext const&,
159  typename T::Context const*);
160 
161  template <typename T>
162  std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
163  StreamID,
164  ParentContext const&,
165  typename T::Context const*);
166 
168  void skipOnPath(EventPrincipal const& iEvent);
169  void beginJob();
170  void endJob();
171  void beginStream(StreamID id, StreamContext& streamContext);
172  void endStream(StreamID id, StreamContext& streamContext);
177 
178  void reset() {
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
182  workStarted_ = false;
184  }
185 
186  void postDoEvent(EventPrincipal const&);
187 
189  if (moduleValid_) {
191  }
192  return nullptr;
193  }
196  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
197 
199 
200  //Used to make EDGetToken work
201  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
202  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
203  virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
204  virtual void resolvePutIndicies(
205  BranchType iBranchType,
206  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
207  iIndicies) = 0;
208 
209  virtual void modulesWhoseProductsAreConsumed(
210  std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
211  std::vector<ModuleProcessName>& modulesInPreviousProcesses,
212  ProductRegistry const& preg,
213  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
214 
215  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
216 
217  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
218 
219  virtual Types moduleType() const = 0;
220  virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
221 
222  void clearCounters() {
223  timesRun_.store(0, std::memory_order_release);
224  timesVisited_.store(0, std::memory_order_release);
225  timesPassed_.store(0, std::memory_order_release);
226  timesFailed_.store(0, std::memory_order_release);
227  timesExcept_.store(0, std::memory_order_release);
228  }
229 
231  //NOTE: calling state() is done to force synchronization across threads
232  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
233  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
234  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
235  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
236  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
237  State state() const { return state_; }
238 
239  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
240 
241  virtual bool hasAccumulator() const = 0;
242 
243  protected:
244  template <typename O>
246 
247  virtual void doClearModule() = 0;
248 
249  virtual std::string workerType() const = 0;
250  virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
251 
252  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
253  virtual bool implNeedToRunSelection() const = 0;
254 
255  virtual void implDoAcquire(EventTransitionInfo const&,
256  ModuleCallingContext const*,
258 
260  virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
262  virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
263  virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
264  virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
265  virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
266  virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
267  virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
268  virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
269  virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
270  virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
271  virtual void implBeginJob() = 0;
272  virtual void implEndJob() = 0;
273  virtual void implBeginStream(StreamID) = 0;
274  virtual void implEndStream(StreamID) = 0;
275 
277 
279 
280  private:
281  template <typename T>
282  bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
283 
284  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
285  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
286 
287  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
288 
289  virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
290  virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
291  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
292 
294  ModuleCallingContext const& moduleCallingContext,
295  Principal const& iPrincipal) const = 0;
296 
297  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
298  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
299  virtual void implRespondToCloseOutputFile() = 0;
300 
302 
303  virtual TaskQueueAdaptor serializeRunModule() = 0;
304 
305  static void exceptionContext(cms::Exception& ex, ModuleCallingContext const* mcc);
306 
307  bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
308 
309  template <bool IS_EVENT>
310  bool setPassed() {
311  if (IS_EVENT) {
312  timesPassed_.fetch_add(1, std::memory_order_relaxed);
313  }
314  state_ = Pass;
315  return true;
316  }
317 
318  template <bool IS_EVENT>
319  bool setFailed() {
320  if (IS_EVENT) {
321  timesFailed_.fetch_add(1, std::memory_order_relaxed);
322  }
323  state_ = Fail;
324  return false;
325  }
326 
327  template <bool IS_EVENT>
328  std::exception_ptr setException(std::exception_ptr iException) {
329  if (IS_EVENT) {
330  timesExcept_.fetch_add(1, std::memory_order_relaxed);
331  }
332  cached_exception_ = iException; // propagate_const<T> has no reset() function
333  state_ = Exception;
334  return cached_exception_;
335  }
336 
337  template <typename T>
339  ServiceToken const&,
340  ParentContext const&,
341  typename T::TransitionInfoType const&,
342  Transition);
343 
345  void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
346 
347  bool needsESPrefetching(Transition iTrans) const noexcept {
348  return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
349  }
350 
352  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
353  }
354 
355  virtual bool hasAcquire() const = 0;
356 
357  template <typename T>
358  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
359  typename T::TransitionInfoType const&,
360  StreamID,
361  ParentContext const&,
362  typename T::Context const*);
363 
365 
366  void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
367  EventTransitionInfo const&,
368  ParentContext const&,
370 
371  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
372 
373  template <typename T>
374  class RunModuleTask : public WaitingTask {
375  public:
377  typename T::TransitionInfoType const& transitionInfo,
378  ServiceToken const& token,
379  StreamID streamID,
380  ParentContext const& parentContext,
381  typename T::Context const* context,
382  tbb::task_group* iGroup)
383  : m_worker(worker),
384  m_transitionInfo(transitionInfo),
385  m_streamID(streamID),
386  m_parentContext(parentContext),
389  m_group(iGroup) {}
390 
394  EnableQueueGuard(EnableQueueGuard const&) = delete;
395  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
397  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
399  if (queue_) {
400  queue_->resume();
401  }
402  }
403  };
404 
405  void execute() final {
406  //Need to make the services available early so other services can see them
408 
409  //incase the emit causes an exception, we need a memory location
410  // to hold the exception_ptr
411  std::exception_ptr temp_excptr;
412  auto excptr = exceptionPtr();
413  if constexpr (T::isEvent_) {
414  if (!m_worker->hasAcquire()) {
415  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
416  CMS_SA_ALLOW try {
417  //pre was called in prefetchAsync
419  } catch (...) {
420  temp_excptr = std::current_exception();
421  if (not excptr) {
422  excptr = &temp_excptr;
423  }
424  }
425  }
426  }
427 
428  if (not excptr) {
429  if (auto queue = m_worker->serializeRunModule()) {
430  auto f = [worker = m_worker,
432  streamID = m_streamID,
433  parentContext = m_parentContext,
434  sContext = m_context,
435  serviceToken = m_serviceToken]() {
436  //Need to make the services available
437  ServiceRegistry::Operate operateRunModule(serviceToken.lock());
438 
439  //If needed, we pause the queue in begin transition and resume it
440  // at the end transition. This can guarantee that the module
441  // only processes one run or lumi at a time
443  std::exception_ptr* ptr = nullptr;
444  worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
445  };
446  //keep another global transition from running if necessary
448  if (gQueue) {
449  gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
450  gQueue->pause();
451  queue.push(*group, std::move(f));
452  });
453  } else {
454  queue.push(*m_group, std::move(f));
455  }
456  return;
457  }
458  }
459 
461  }
462 
463  private:
465  typename T::TransitionInfoType m_transitionInfo;
468  typename T::Context const* m_context;
470  tbb::task_group* m_group;
471  };
472 
473  // AcquireTask is only used for the Event case, but we define
474  // it as a template so all cases will compile.
475  // DUMMY exists to work around the C++ Standard prohibition on
476  // fully specializing templates nested in other classes.
477  template <typename T, typename DUMMY = void>
478  class AcquireTask : public WaitingTask {
479  public:
481  typename T::TransitionInfoType const&,
482  ServiceToken const&,
483  ParentContext const&,
485  void execute() final {}
486  };
487 
488  template <typename DUMMY>
490  public:
492  EventTransitionInfo const& eventTransitionInfo,
493  ServiceToken const& token,
494  ParentContext const& parentContext,
496  : m_worker(worker),
497  m_eventTransitionInfo(eventTransitionInfo),
498  m_parentContext(parentContext),
499  m_holder(std::move(holder)),
500  m_serviceToken(token) {}
501 
502  void execute() final {
503  //Need to make the services available early so other services can see them
504  ServiceRegistry::Operate guard(m_serviceToken.lock());
505 
506  //incase the emit causes an exception, we need a memory location
507  // to hold the exception_ptr
508  std::exception_ptr temp_excptr;
509  auto excptr = exceptionPtr();
510  // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
511  CMS_SA_ALLOW try {
512  //pre was called in prefetchAsync
513  m_worker->emitPostModuleEventPrefetchingSignal();
514  } catch (...) {
515  temp_excptr = std::current_exception();
516  if (not excptr) {
517  excptr = &temp_excptr;
518  }
519  }
520 
521  if (not excptr) {
522  if (auto queue = m_worker->serializeRunModule()) {
523  queue.push(*m_holder.group(),
524  [worker = m_worker,
525  info = m_eventTransitionInfo,
526  parentContext = m_parentContext,
527  serviceToken = m_serviceToken,
528  holder = m_holder]() {
529  //Need to make the services available
530  ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
531 
532  std::exception_ptr* ptr = nullptr;
533  worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
534  });
535  return;
536  }
537  }
538 
539  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
540  }
541 
542  private:
548  };
549 
550  // This class does nothing unless there is an exception originating
551  // in an "External Worker". In that case, it handles converting the
552  // exception to a CMS exception and adding context to the exception.
554  public:
556  tbb::task_group* group,
557  WaitingTask* runModuleTask,
558  ParentContext const& parentContext);
559 
560  void execute() final;
561 
562  private:
565  tbb::task_group* m_group;
567  };
568 
569  std::atomic<int> timesRun_;
570  std::atomic<int> timesVisited_;
571  std::atomic<int> timesPassed_;
572  std::atomic<int> timesFailed_;
573  std::atomic<int> timesExcept_;
574  std::atomic<State> state_;
576  std::atomic<int> numberOfPathsLeftToRun_;
577 
579 
580  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
581  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
582 
583  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
584 
586 
588  std::atomic<bool> workStarted_;
590  bool moduleValid_ = true;
591  };
592 
593  namespace {
594  template <typename T>
595  class ModuleSignalSentry {
596  public:
597  ModuleSignalSentry(ActivityRegistry* a,
598  typename T::Context const* context,
599  ModuleCallingContext const* moduleCallingContext)
600  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
601  if (a_)
602  T::preModuleSignal(a_, context, moduleCallingContext_);
603  }
604 
605  ~ModuleSignalSentry() {
606  if (a_)
607  T::postModuleSignal(a_, context_, moduleCallingContext_);
608  }
609 
610  private:
611  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
612  typename T::Context const* context_;
613  ModuleCallingContext const* moduleCallingContext_;
614  };
615 
616  } // namespace
617 
618  namespace workerhelper {
619  template <>
621  public:
623  static bool call(Worker* iWorker,
624  StreamID,
625  EventTransitionInfo const& info,
626  ActivityRegistry* /* actReg */,
627  ModuleCallingContext const* mcc,
628  Arg::Context const* /* context*/) {
629  //Signal sentry is handled by the module
630  return iWorker->implDo(info, mcc);
631  }
632  static void esPrefetchAsync(Worker* worker,
633  WaitingTaskHolder waitingTask,
634  ServiceToken const& token,
635  EventTransitionInfo const& info,
636  Transition transition) {
637  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
638  }
639  static bool wantsTransition(Worker const* iWorker) { return true; }
640  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
641 
642  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
643  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
644  };
645 
646  template <>
648  public:
650  static bool call(Worker* iWorker,
651  StreamID,
652  RunTransitionInfo const& info,
653  ActivityRegistry* actReg,
654  ModuleCallingContext const* mcc,
655  Arg::Context const* context) {
656  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
657  return iWorker->implDoBegin(info, mcc);
658  }
659  static void esPrefetchAsync(Worker* worker,
660  WaitingTaskHolder waitingTask,
661  ServiceToken const& token,
662  RunTransitionInfo const& info,
663  Transition transition) {
664  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
665  }
666  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
667  static bool needToRunSelection(Worker const* iWorker) { return false; }
668  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
669  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
670  };
671  template <>
673  public:
675  static bool call(Worker* iWorker,
676  StreamID id,
677  RunTransitionInfo const& info,
678  ActivityRegistry* actReg,
679  ModuleCallingContext const* mcc,
680  Arg::Context const* context) {
681  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
682  return iWorker->implDoStreamBegin(id, info, mcc);
683  }
684  static void esPrefetchAsync(Worker* worker,
685  WaitingTaskHolder waitingTask,
686  ServiceToken const& token,
687  RunTransitionInfo const& info,
688  Transition transition) {
689  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
690  }
691  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
692  static bool needToRunSelection(Worker const* iWorker) { return false; }
693  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
694  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
695  };
696  template <>
698  public:
700  static bool call(Worker* iWorker,
701  StreamID,
702  RunTransitionInfo const& info,
703  ActivityRegistry* actReg,
704  ModuleCallingContext const* mcc,
705  Arg::Context const* context) {
706  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
707  return iWorker->implDoEnd(info, mcc);
708  }
709  static void esPrefetchAsync(Worker* worker,
710  WaitingTaskHolder waitingTask,
711  ServiceToken const& token,
712  RunTransitionInfo const& info,
713  Transition transition) {
714  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
715  }
716  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
717  static bool needToRunSelection(Worker const* iWorker) { return false; }
718  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
719  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
720  };
721  template <>
723  public:
725  static bool call(Worker* iWorker,
726  StreamID id,
727  RunTransitionInfo const& info,
728  ActivityRegistry* actReg,
729  ModuleCallingContext const* mcc,
730  Arg::Context const* context) {
731  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
732  return iWorker->implDoStreamEnd(id, info, mcc);
733  }
734  static void esPrefetchAsync(Worker* worker,
735  WaitingTaskHolder waitingTask,
736  ServiceToken const& token,
737  RunTransitionInfo const& info,
738  Transition transition) {
739  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
740  }
741  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
742  static bool needToRunSelection(Worker const* iWorker) { return false; }
743  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
744  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
745  };
746 
747  template <>
749  public:
751  static bool call(Worker* iWorker,
752  StreamID,
753  LumiTransitionInfo const& info,
754  ActivityRegistry* actReg,
755  ModuleCallingContext const* mcc,
756  Arg::Context const* context) {
757  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
758  return iWorker->implDoBegin(info, mcc);
759  }
760  static void esPrefetchAsync(Worker* worker,
761  WaitingTaskHolder waitingTask,
762  ServiceToken const& token,
763  LumiTransitionInfo const& info,
764  Transition transition) {
765  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
766  }
767  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
768  static bool needToRunSelection(Worker const* iWorker) { return false; }
769  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
770  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
771  };
772  template <>
774  public:
776  static bool call(Worker* iWorker,
777  StreamID id,
778  LumiTransitionInfo const& info,
779  ActivityRegistry* actReg,
780  ModuleCallingContext const* mcc,
781  Arg::Context const* context) {
782  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
783  return iWorker->implDoStreamBegin(id, info, mcc);
784  }
785  static void esPrefetchAsync(Worker* worker,
786  WaitingTaskHolder waitingTask,
787  ServiceToken const& token,
788  LumiTransitionInfo const& info,
789  Transition transition) {
790  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
791  }
792  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
793  static bool needToRunSelection(Worker const* iWorker) { return false; }
794  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
795  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
796  };
797 
798  template <>
800  public:
802  static bool call(Worker* iWorker,
803  StreamID,
804  LumiTransitionInfo const& info,
805  ActivityRegistry* actReg,
806  ModuleCallingContext const* mcc,
807  Arg::Context const* context) {
808  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
809  return iWorker->implDoEnd(info, mcc);
810  }
811  static void esPrefetchAsync(Worker* worker,
812  WaitingTaskHolder waitingTask,
813  ServiceToken const& token,
814  LumiTransitionInfo const& info,
815  Transition transition) {
816  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
817  }
818  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
819  static bool needToRunSelection(Worker const* iWorker) { return false; }
820  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
821  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
822  };
823  template <>
825  public:
827  static bool call(Worker* iWorker,
828  StreamID id,
829  LumiTransitionInfo const& info,
830  ActivityRegistry* actReg,
831  ModuleCallingContext const* mcc,
832  Arg::Context const* context) {
833  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
834  return iWorker->implDoStreamEnd(id, info, mcc);
835  }
836  static void esPrefetchAsync(Worker* worker,
837  WaitingTaskHolder waitingTask,
838  ServiceToken const& token,
839  LumiTransitionInfo const& info,
840  Transition transition) {
841  worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
842  }
843  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
844  static bool needToRunSelection(Worker const* iWorker) { return false; }
845  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
846  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
847  };
848  template <>
850  public:
852  static bool call(Worker* iWorker,
853  StreamID,
855  ActivityRegistry* actReg,
856  ModuleCallingContext const* mcc,
857  Arg::Context const* context) {
858  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
859  return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
860  }
861  static void esPrefetchAsync(
863  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
864  static bool needToRunSelection(Worker const* iWorker) { return false; }
865  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
866  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
867  };
868  template <>
870  public:
872  static bool call(Worker* iWorker,
873  StreamID,
875  ActivityRegistry* actReg,
876  ModuleCallingContext const* mcc,
877  Arg::Context const* context) {
878  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
879  return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
880  }
881  static void esPrefetchAsync(
883  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
884  static bool needToRunSelection(Worker const* iWorker) { return false; }
885  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
886  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
887  };
888  template <>
890  public:
892  static bool call(Worker* iWorker,
893  StreamID,
895  ActivityRegistry* actReg,
896  ModuleCallingContext const* mcc,
897  Arg::Context const* context) {
898  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
899  return iWorker->implDoEndProcessBlock(info.principal(), mcc);
900  }
901  static void esPrefetchAsync(
903  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
904  static bool needToRunSelection(Worker const* iWorker) { return false; }
905  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
906  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
907  };
908  } // namespace workerhelper
909 
910  template <typename T>
912  ServiceToken const& token,
913  ParentContext const& parentContext,
914  typename T::TransitionInfoType const& transitionInfo,
915  Transition iTransition) {
916  Principal const& principal = transitionInfo.principal();
917 
919 
920  if (principal.branchType() == InEvent) {
921  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
922  }
923 
924  workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
925  edPrefetchAsync(iTask, token, principal);
926 
927  if (principal.branchType() == InEvent) {
929  }
930  }
931 
932  template <typename T>
934  typename T::TransitionInfoType const& transitionInfo,
935  ServiceToken const& token,
936  StreamID streamID,
937  ParentContext const& parentContext,
938  typename T::Context const* context) {
940  return;
941  }
942 
943  //Need to check workStarted_ before adding to waitingTasks_
944  bool expected = false;
945  bool workStarted = workStarted_.compare_exchange_strong(expected, true);
946 
948  if constexpr (T::isEvent_) {
949  timesVisited_.fetch_add(1, std::memory_order_relaxed);
950  }
951 
952  if (workStarted) {
954 
955  //if have TriggerResults based selection we want to reject the event before doing prefetching
957  //We need to run the selection in a different task so that
958  // we can prefetch the data needed for the selection
959  WaitingTask* moduleTask =
960  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
961 
962  //make sure the task is either run or destroyed
963  struct DestroyTask {
964  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
965 
966  ~DestroyTask() {
967  auto p = m_task.exchange(nullptr);
968  if (p) {
969  TaskSentry s{p};
970  }
971  }
972 
973  edm::WaitingTask* release() { return m_task.exchange(nullptr); }
974 
975  private:
976  std::atomic<edm::WaitingTask*> m_task;
977  };
978  if constexpr (T::isEvent_) {
979  if (hasAcquire()) {
980  auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
981  ServiceWeakToken weakToken = token;
982  auto* group = task.group();
983  moduleTask = make_waiting_task(
984  [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
985  WaitingTaskWithArenaHolder runTaskHolder(
986  *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
987  AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
988  t.execute();
989  });
990  }
991  }
992  auto* group = task.group();
993  auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
994  ServiceWeakToken weakToken = token;
995  auto selectionTask =
996  make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
997  std::exception_ptr const*) mutable {
998  ServiceRegistry::Operate guard(weakToken.lock());
999  prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1000  weakToken.lock(),
1001  parentContext,
1002  info,
1003  T::transition_);
1004  });
1005  prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1006  } else {
1007  WaitingTask* moduleTask =
1008  new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1009  auto group = task.group();
1010  if constexpr (T::isEvent_) {
1011  if (hasAcquire()) {
1012  WaitingTaskWithArenaHolder runTaskHolder(
1013  *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1014  moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1015  }
1016  }
1017  prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1018  }
1019  }
1020  }
1021 
1022  template <typename T>
1023  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
1024  typename T::TransitionInfoType const& transitionInfo,
1025  StreamID streamID,
1026  ParentContext const& parentContext,
1027  typename T::Context const* context) {
1028  std::exception_ptr exceptionPtr;
1029  if (iEPtr) {
1030  assert(*iEPtr);
1031  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1032  exceptionPtr = *iEPtr;
1033  setException<T::isEvent_>(exceptionPtr);
1034  } else {
1035  setPassed<T::isEvent_>();
1036  }
1038  } else {
1039  // Caught exception is propagated via WaitingTaskList
1040  CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1041  exceptionPtr = std::current_exception();
1042  }
1043  }
1044  waitingTasks_.doneWaiting(exceptionPtr);
1045  return exceptionPtr;
1046  }
1047 
1048  template <typename T>
1050  typename T::TransitionInfoType const& transitionInfo,
1051  ServiceToken const& serviceToken,
1052  StreamID streamID,
1053  ParentContext const& parentContext,
1054  typename T::Context const* context) {
1056  return;
1057  }
1058 
1059  //Need to check workStarted_ before adding to waitingTasks_
1060  bool expected = false;
1061  auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1062 
1064  if (workStarted) {
1065  ServiceWeakToken weakToken = serviceToken;
1066  auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1067  std::exception_ptr exceptionPtr;
1068  // Caught exception is propagated via WaitingTaskList
1069  CMS_SA_ALLOW try {
1070  //Need to make the services available
1071  ServiceRegistry::Operate guard(weakToken.lock());
1072 
1073  this->runModule<T>(info, streamID, parentContext, context);
1074  } catch (...) {
1075  exceptionPtr = std::current_exception();
1076  }
1077  this->waitingTasks_.doneWaiting(exceptionPtr);
1078  };
1079 
1080  if (needsESPrefetching(T::transition_)) {
1081  auto group = task.group();
1082  auto afterPrefetch =
1083  edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1084  if (iExcept) {
1085  this->waitingTasks_.doneWaiting(*iExcept);
1086  } else {
1087  if (auto queue = this->serializeRunModule()) {
1088  queue.push(*group, toDo);
1089  } else {
1090  group->run(toDo);
1091  }
1092  }
1093  });
1095  WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1096  } else {
1097  auto group = task.group();
1098  if (auto queue = this->serializeRunModule()) {
1099  queue.push(*group, toDo);
1100  } else {
1101  group->run(toDo);
1102  }
1103  }
1104  }
1105  }
1106 
1107  template <typename T>
1108  bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1109  StreamID streamID,
1110  ParentContext const& parentContext,
1111  typename T::Context const* context) {
1112  //unscheduled producers should advance this
1113  //if (T::isEvent_) {
1114  // ++timesVisited_;
1115  //}
1116  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1117  if constexpr (T::isEvent_) {
1118  timesRun_.fetch_add(1, std::memory_order_relaxed);
1119  }
1120 
1121  bool rc = true;
1122  try {
1123  convertException::wrap([&]() {
1125  this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1126 
1127  if (rc) {
1128  setPassed<T::isEvent_>();
1129  } else {
1130  setFailed<T::isEvent_>();
1131  }
1132  });
1133  } catch (cms::Exception& ex) {
1135  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1137  setException<T::isEvent_>(std::current_exception());
1138  std::rethrow_exception(cached_exception_);
1139  } else {
1140  rc = setPassed<T::isEvent_>();
1141  }
1142  }
1143 
1144  return rc;
1145  }
1146 
1147  template <typename T>
1148  std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1149  StreamID streamID,
1150  ParentContext const& parentContext,
1151  typename T::Context const* context) {
1152  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1153  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1154  return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1155  }
1156 } // namespace edm
1157 #endif
edm::Worker::RunModuleTask::EnableQueueGuard::operator=
EnableQueueGuard & operator=(EnableQueueGuard const &)=delete
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:903
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::Worker::RunModuleTask
Definition: Worker.h:374
edm::Worker::consumesInfo
virtual std::vector< ConsumesInfo > consumesInfo() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:675
edm::StreamID
Definition: StreamID.h:30
edm::Worker::itemsToGetForSelection
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::call
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:827
edm::Worker::implDoPrePrefetchSelection
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
ModuleCallingContext.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:356
electrons_cff.bool
bool
Definition: electrons_cff.py:366
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:699
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::execute
void execute() final
Definition: Worker.h:502
ServiceRegistry.h
edm::Worker::itemsMayGet
virtual void itemsMayGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
MessageLogger.h
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:397
edm::eventsetup::ESRecordsToProxyIndices
Definition: ESRecordsToProxyIndices.h:35
edm::ModuleCallingContext::State::kPrefetching
edm::ProcessBlockTransitionInfo
Definition: TransitionInfoTypes.h:80
edm::Worker::implDoAccessInputProcessBlock
virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::Worker::selectInputProcessBlocks
virtual void selectInputProcessBlocks(ProductRegistry const &, ProcessBlockHelperBase const &)=0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:906
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::Arg
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:622
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:669
edm::ServiceWeakToken
Definition: ServiceToken.h:86
edm::Worker::runModule
bool runModule(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1108
edm::Worker::actReg_
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:583
edm::Worker::wantsGlobalRuns
virtual bool wantsGlobalRuns() const =0
BranchType.h
edm::Worker::implEndJob
virtual void implEndJob()=0
edm::Worker::convertCurrentProcessAlias
virtual void convertCurrentProcessAlias(std::string const &processName)=0
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:864
edm::ModuleContextSentry
Definition: ModuleContextSentry.h:11
edm::LumiTransitionInfo
Definition: TransitionInfoTypes.h:42
propagate_const.h
edm::EventSetupImpl
Definition: EventSetupImpl.h:49
edm::Worker::HandleExternalWorkExceptionTask::m_worker
Worker * m_worker
Definition: Worker.h:563
WaitingTaskHolder.h
PlaceInPathContext.h
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:71
edm::Worker::timesPassed_
std::atomic< int > timesPassed_
Definition: Worker.h:571
modules
Definition: MuonCleanerBySegments.cc:35
edm::Worker::Fail
Definition: Worker.h:93
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:836
edm::Worker::kOne
Definition: Worker.h:95
edm::Worker::TaskQueueAdaptor::limited_
LimitedTaskQueue * limited_
Definition: Worker.h:98
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Worker::runModuleDirectly
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1148
edm::Worker::beginJob
void beginJob()
Definition: Worker.cc:347
edm::SerialTaskQueue::resume
bool resume()
Resumes processing if the queue was paused.
Definition: SerialTaskQueue.cc:58
edm::Worker::timesVisited_
std::atomic< int > timesVisited_
Definition: Worker.h:570
edm::Worker::Exception
Definition: Worker.h:93
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:794
edm::Worker::wantsGlobalLuminosityBlocks
virtual bool wantsGlobalLuminosityBlocks() const =0
edm::Worker::resetModuleDescription
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:339
ProductResolverIndex.h
edm::Worker::wantsInputProcessBlocks
virtual bool wantsInputProcessBlocks() const =0
edm::Worker::postDoEvent
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:446
edm::Worker::actions_
ExceptionToActionTable const * actions_
Definition: Worker.h:580
FunctorTask.h
edm::Worker::HandleExternalWorkExceptionTask::execute
void execute() final
Definition: Worker.cc:512
edm::Worker::implBeginJob
virtual void implBeginJob()=0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:785
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:905
edm::LuminosityBlockPrincipal
Definition: LuminosityBlockPrincipal.h:31
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:668
cms::cuda::stream
uint32_t const T *__restrict__ const uint32_t *__restrict__ int32_t int Histo::index_type cudaStream_t stream
Definition: HistoContainer.h:51
edm::Worker::respondToCloseOutputFile
void respondToCloseOutputFile()
Definition: Worker.h:175
edm::Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask
HandleExternalWorkExceptionTask(Worker *worker, tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:506
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:743
edm::Worker::Worker
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:91
edm::Worker::RunModuleTask::RunModuleTask
RunModuleTask(Worker *worker, typename T::TransitionInfoType const &transitionInfo, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context, tbb::task_group *iGroup)
Definition: Worker.h:376
cms::cuda::assert
assert(be >=bs)
edm::Worker::respondToOpenInputFile
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
edm::Worker::setException
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:328
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:274
edm::Worker::kGlobal
Definition: Worker.h:95
edm::Worker::RunModuleTask::EnableQueueGuard
Definition: Worker.h:391
edm::Worker::itemsToGet
virtual void itemsToGet(BranchType, std::vector< ProductResolverIndexAndSkipBit > &) const =0
edm::Principal
Definition: Principal.h:56
edm::Worker::numberOfPathsLeftToRun_
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:576
edm::Worker::registerThinnedAssociations
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:428
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::SerialTaskQueue
Definition: SerialTaskQueue.h:67
mps_check.array
array
Definition: mps_check.py:216
edm::Worker::timesVisited
int timesVisited() const
Definition: Worker.h:233
edm::Worker::kProducer
Definition: Worker.h:94
edm::Worker::State
State
Definition: Worker.h:93
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:866
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:802
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:684
edm::Worker::RunModuleTask::EnableQueueGuard::~EnableQueueGuard
~EnableQueueGuard()
Definition: Worker.h:398
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:718
edm::ModuleCallingContext::State::kInvalid
edm::ProductResolverIndexHelper
Definition: ProductResolverIndexHelper.h:89
edm::ModuleCallingContext::moduleDescription
ModuleDescription const * moduleDescription() const
Definition: ModuleCallingContext.h:50
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:545
edm::Worker::RunModuleTask::execute
void execute() final
Definition: Worker.h:405
edm::Worker::implDoBegin
virtual bool implDoBegin(RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::timesExcept
int timesExcept() const
Definition: Worker.h:236
edm::Worker::RunModuleTask::m_serviceToken
ServiceWeakToken m_serviceToken
Definition: Worker.h:469
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:674
edm::TaskSentry
Definition: TaskBase.h:50
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:667
edm::RunTransitionInfo
Definition: TransitionInfoTypes.h:64
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:811
edm::Worker::kLegacy
Definition: Worker.h:95
edm::Worker::wantsStreamLuminosityBlocks
virtual bool wantsStreamLuminosityBlocks() const =0
edm::BranchActionGlobalBegin
Definition: BranchActionType.h:12
ESIndices.h
edm::ProcessBlockPrincipal
Definition: ProcessBlockPrincipal.h:22
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:883
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:471
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:717
edm::Worker::implEndStream
virtual void implEndStream(StreamID)=0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:861
edm::Worker::workStarted_
std::atomic< bool > workStarted_
Definition: Worker.h:588
edm::Worker::itemsShouldPutInEvent
virtual std::vector< ProductResolverIndex > const & itemsShouldPutInEvent() const =0
edm::BranchType
BranchType
Definition: BranchType.h:11
edm::Worker::ConcurrencyTypes
ConcurrencyTypes
Definition: Worker.h:95
edm::Worker::moduleConcurrencyType
virtual ConcurrencyTypes moduleConcurrencyType() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:694
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:845
edm::NumBranchTypes
Definition: BranchType.h:11
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:769
edm::Worker::reset
void reset()
Definition: Worker.h:178
edm::Worker::operator=
Worker & operator=(Worker const &)=delete
edm::Exception
Definition: EDMException.h:77
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:53
F
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:163
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:112
edm::WaitingTaskList
Definition: WaitingTaskList.h:84
edm::ProductRegistry
Definition: ProductRegistry.h:37
edm::Worker::implRespondToCloseInputFile
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
ModuleDescription.h
ActivityRegistry.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:846
visDQMUpload.context
context
Definition: visDQMUpload.py:37
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:818
edm::FileBlock
Definition: FileBlock.h:22
edm::Worker::description
ModuleDescription const * description() const
Definition: Worker.h:188
edm::Worker::AcquireTask
Definition: Worker.h:478
edm::ServiceToken
Definition: ServiceToken.h:42
edm::Worker::kStream
Definition: Worker.h:95
edm::WaitingTaskWithArenaHolder
Definition: WaitingTaskWithArenaHolder.h:34
alignCSCRings.s
s
Definition: alignCSCRings.py:92
edm::propagate_const
Definition: propagate_const.h:32
edm::Worker::implDoStreamEnd
virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::kFilter
Definition: Worker.h:94
edm::Worker::doWorkAsync
void doWorkAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:933
edm::Worker::runAcquireAfterAsyncPrefetch
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:467
edm::Worker::implDoBeginProcessBlock
virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
edm::LimitedTaskQueue
Definition: LimitedTaskQueue.h:39
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:885
edm::WaitingTask::exceptionPtr
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:904
edm::EventPrincipal
Definition: EventPrincipal.h:48
edm::Worker::earlyDeleteHelper_
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:585
edm::OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd >
Definition: OccurrenceTraits.h:192
edm::Worker::setActivityRegistry
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:110
edm::StreamContext
Definition: StreamContext.h:31
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:884
edm::BranchActionProcessBlockInput
Definition: BranchActionType.h:16
edm::Worker::timesRun_
std::atomic< int > timesRun_
Definition: Worker.h:569
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:640
edm::Worker::kLimited
Definition: Worker.h:95
edm::Worker::AcquireTask::AcquireTask
AcquireTask(Worker *, typename T::TransitionInfoType const &, ServiceToken const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.h:480
edm::Worker::resolvePutIndicies
virtual void resolvePutIndicies(BranchType iBranchType, std::unordered_multimap< std::string, std::tuple< TypeID const *, const char *, edm::ProductResolverIndex >> const &iIndicies)=0
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:639
edm::Worker::kAnalyzer
Definition: Worker.h:94
edm::convertException::wrap
auto wrap(F iFunc) -> decltype(iFunc())
Definition: ConvertException.h:19
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:716
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:253
edm::Worker::state_
std::atomic< State > state_
Definition: Worker.h:574
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
edm::Principal::branchType
BranchType const & branchType() const
Definition: Principal.h:181
ProductResolverIndexAndSkipBit.h
edm::Worker::HandleExternalWorkExceptionTask
Definition: Worker.h:553
edm::Worker::endJob
void endJob()
Definition: Worker.cc:363
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:38
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:212
edm::Worker::doClearModule
virtual void doClearModule()=0
edm::Worker::HandleExternalWorkExceptionTask::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:566
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:724
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:692
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::AcquireTask
AcquireTask(Worker *worker, EventTransitionInfo const &eventTransitionInfo, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:491
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:793
edm::InEvent
Definition: BranchType.h:11
Transition.h
edm::OccurrenceTraits
Definition: OccurrenceTraits.h:35
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:901
edm::Worker::RunModuleTask::m_worker
Worker * m_worker
Definition: Worker.h:464
ConvertException.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID, EventTransitionInfo const &info, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:623
edm::Worker::Pass
Definition: Worker.h:93
edm::OccurrenceTraits< RunPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:152
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:872
LimitedTaskQueue.h
edm::Worker
Definition: Worker.h:91
edm::Worker::timesExcept_
std::atomic< int > timesExcept_
Definition: Worker.h:573
edm::Worker::TaskQueueAdaptor::push
void push(tbb::task_group &iG, F &&iF)
Definition: Worker.h:107
edm::Worker::implDoEnd
virtual bool implDoEnd(RunTransitionInfo const &, ModuleCallingContext const *)=0
createBeamHaloJobs.queue
queue
Definition: createBeamHaloJobs.py:343
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:768
edm::ParentContext
Definition: ParentContext.h:27
edm::Worker::RunModuleTask::m_parentContext
const ParentContext m_parentContext
Definition: Worker.h:467
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::call
static bool call(Worker *iWorker, StreamID id, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:776
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:819
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:666
edm::Worker::clearModule
void clearModule()
Definition: Worker.h:122
edm::ThinnedAssociationsHelper
Definition: ThinnedAssociationsHelper.h:37
edm::LimitedTaskQueue::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: LimitedTaskQueue.h:115
edm::Worker::runAcquire
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:452
edm::Worker::doWorkNoPrefetchingAsync
void doWorkNoPrefetchingAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1049
edm::GlobalContext
Definition: GlobalContext.h:29
edm::Worker::serializeRunModule
virtual TaskQueueAdaptor serializeRunModule()=0
edm::Worker::implDoAcquire
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:843
edm::Worker::wantsProcessBlocks
virtual bool wantsProcessBlocks() const =0
edm::ProcessBlockHelperBase
Definition: ProcessBlockHelperBase.h:18
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
Definition: Worker.h:821
edm::Worker::waitingTasks_
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:587
edm::workerhelper::CallImpl
Definition: Worker.h:85
edm::Transition
Transition
Definition: Transition.h:12
edm::OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput >
Definition: OccurrenceTraits.h:434
a
double a
Definition: hdecay.h:119
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, LumiTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:751
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin >
Definition: OccurrenceTraits.h:233
fetchall_from_DQM_v2.release
release
Definition: fetchall_from_DQM_v2.py:92
WaitingTaskWithArenaHolder.h
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::Worker::RunModuleTask::m_group
tbb::task_group * m_group
Definition: Worker.h:470
edm::Worker::implRegisterThinnedAssociations
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
edm::ModuleCallingContext::getStreamContext
StreamContext const * getStreamContext() const
Definition: ModuleCallingContext.cc:32
ExceptionMessages.h
edm::Worker::timesRun
int timesRun() const
Definition: Worker.h:232
edm::Worker::RunModuleTask::m_transitionInfo
T::TransitionInfoType m_transitionInfo
Definition: Worker.h:465
edm::BranchActionStreamBegin
Definition: BranchActionType.h:13
thread_safety_macros.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *)
Definition: Worker.h:642
edm::Worker::RunModuleTask::EnableQueueGuard::queue_
SerialTaskQueue * queue_
Definition: Worker.h:392
ESRecordsToProxyIndices
helper
Definition: helper.py:1
edm::Worker::HandleExternalWorkExceptionTask::m_runModuleTask
WaitingTask * m_runModuleTask
Definition: Worker.h:564
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:767
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:744
edm::ModuleCallingContext::setContext
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
Definition: ModuleCallingContext.cc:24
iEvent
int iEvent
Definition: GenABIO.cc:224
edm::Worker::RunModuleTask::m_streamID
StreamID m_streamID
Definition: Worker.h:466
edm::Worker::setEarlyDeleteHelper
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:337
edm::Worker::ranAcquireWithoutException_
bool ranAcquireWithoutException_
Definition: Worker.h:589
edm::Worker::exceptionContext
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:112
edm::Worker::implRespondToOpenInputFile
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
edm::Worker::numberOfPathsOn_
int numberOfPathsOn_
Definition: Worker.h:575
edm::Worker::~Worker
virtual ~Worker()
Definition: Worker.cc:108
edm::SerialTaskQueueChain::push
void push(tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue
Definition: SerialTaskQueueChain.h:75
edm::Worker::timesFailed
int timesFailed() const
Definition: Worker.h:235
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor()=default
edm::Worker::moduleCallingContext_
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:578
InternalContext.h
ExceptionActions.h
edm::Worker::setPassed
bool setPassed()
Definition: Worker.h:310
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::esPrefetchAsync
static void esPrefetchAsync(Worker *, WaitingTaskHolder, ServiceToken const &, ProcessBlockTransitionInfo const &, Transition)
Definition: Worker.h:881
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::Arg
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:649
edm::ServiceWeakToken::lock
ServiceToken lock() const
Definition: ServiceToken.h:101
edm::Worker::respondToCloseInputFile
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionProcessBlockInput > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:886
edm::Worker::callWhenDoneAsync
void callWhenDoneAsync(WaitingTaskHolder task)
Definition: Worker.h:167
edm::Worker::timesFailed_
std::atomic< int > timesFailed_
Definition: Worker.h:572
edm::Worker::TaskQueueAdaptor
Definition: Worker.h:96
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:844
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:863
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
edm::Worker::esPrefetchAsync
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:253
edm::Worker::implDo
virtual bool implDo(EventTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::state
State state() const
Definition: Worker.h:237
edm::Worker::cached_exception_
std::exception_ptr cached_exception_
Definition: Worker.h:581
edm::Worker::addedToPath
void addedToPath()
Definition: Worker.h:230
PathContext.h
edm::Worker::HandleExternalWorkExceptionTask::m_group
tbb::task_group * m_group
Definition: Worker.h:565
edm::EarlyDeleteHelper
Definition: EarlyDeleteHelper.h:40
edm::Worker::setFailed
bool setFailed()
Definition: Worker.h:319
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::Worker::needsESPrefetching
bool needsESPrefetching(Transition iTrans) const noexcept
Definition: Worker.h:347
edm::Worker::esRecordsToGetFrom
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
WaitingTask.h
edm::Worker::moduleType
virtual Types moduleType() const =0
edm::Worker::timesPassed
int timesPassed() const
Definition: Worker.h:234
edm::Worker::runModuleAfterAsyncPrefetch
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *, typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1023
SimL1EmulatorRepack_CalouGT_cff.processName
processName
Definition: SimL1EmulatorRepack_CalouGT_cff.py:17
edm::Worker::RunModuleTask::EnableQueueGuard::EnableQueueGuard
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:397
edm::Worker::wantsStreamRuns
virtual bool wantsStreamRuns() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, EventTransitionInfo const &info, Transition transition)
Definition: Worker.h:632
Types
Definition: Types.py:1
edm::Worker::itemsToGetFrom
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
edm::Worker::hasAcquire
virtual bool hasAcquire() const =0
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:865
OccurrenceTraits.h
EarlyDeleteHelper
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:741
edm::WaitingTaskList::add
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:125
ParentContext.h
edm::Worker::clearCounters
void clearCounters()
Definition: Worker.h:222
edm::Worker::implRespondToCloseOutputFile
virtual void implRespondToCloseOutputFile()=0
edm::WaitingTask
Definition: WaitingTask.h:36
WorkerParams.h
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:892
edm::Worker::globalRunsQueue
virtual SerialTaskQueue * globalRunsQueue()=0
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:659
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:795
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:709
edm::Worker::prefetchAsync
void prefetchAsync(WaitingTaskHolder, ServiceToken const &, ParentContext const &, typename T::TransitionInfoType const &, Transition)
Definition: Worker.h:911
edm::BranchActionGlobalEnd
Definition: BranchActionType.h:15
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::call
static bool call(Worker *iWorker, StreamID id, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:725
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::call
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:700
Frameworkfwd.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:770
T
long double T
Definition: Basic3DVectorLD.h:48
edm::Worker::updateLookup
virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &)=0
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:101
edm::Worker::hasAccumulator
virtual bool hasAccumulator() const =0
edm::Worker::preActionBeforeRunEventAsync
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
edm::Worker::skipOnPath
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:437
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_worker
Worker * m_worker
Definition: Worker.h:543
edm::Worker::activityRegistry
ActivityRegistry * activityRegistry()
Definition: Worker.h:278
edm::Worker::modulesWhoseProductsAreConsumed
virtual void modulesWhoseProductsAreConsumed(std::array< std::vector< ModuleDescription const * > *, NumBranchTypes > &modules, std::vector< ModuleProcessName > &modulesInPreviousProcesses, ProductRegistry const &preg, std::map< std::string, ModuleDescription const * > const &labelsToDesc) const =0
edm::Worker::RunModuleTask::EnableQueueGuard::EnableQueueGuard
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:393
edm::Worker::edPrefetchAsync
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:324
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *iWorker)
Definition: Worker.h:719
edm::Worker::esItemsToGetFrom
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
edm::Worker::TaskQueueAdaptor::serial_
SerialTaskQueueChain * serial_
Definition: Worker.h:97
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:204
edm::Worker::globalLuminosityBlocksQueue
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
edm::Worker::handleExternalWorkException
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:492
Exception.h
edm::OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd >
Definition: OccurrenceTraits.h:314
edm::Worker::implBeginStream
virtual void implBeginStream(StreamID)=0
edm::Worker::Ready
Definition: Worker.h:93
DUMMY
#define DUMMY
Definition: DMRtrends.cc:34
edm::Worker::AcquireTask::execute
void execute() final
Definition: Worker.h:485
edm::Worker::RunModuleTask::m_context
T::Context const * m_context
Definition: Worker.h:468
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, RunTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:650
edm::Worker::timesPass
int timesPass() const
Definition: Worker.h:239
edm::Worker::emitPostModuleEventPrefetchingSignal
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:351
SerialTaskQueueChain.h
edm::workerhelper::CallImpl< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > >::enableGlobalQueue
static SerialTaskQueue * enableGlobalQueue(Worker *)
Definition: Worker.h:643
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, RunTransitionInfo const &info, Transition transition)
Definition: Worker.h:734
WaitingTaskList.h
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::RunPrincipal
Definition: RunPrincipal.h:34
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_holder
WaitingTaskWithArenaHolder m_holder
Definition: Worker.h:546
edm::Worker::prePrefetchSelectionAsync
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:140
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > >::needToRunSelection
static bool needToRunSelection(Worker const *iWorker)
Definition: Worker.h:742
edm::SerialTaskQueueChain
Definition: SerialTaskQueueChain.h:32
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:691
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:820
ConsumesInfo.h
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_eventTransitionInfo
EventTransitionInfo m_eventTransitionInfo
Definition: Worker.h:544
StreamID.h
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::BranchActionStreamEnd
Definition: BranchActionType.h:14
CMS_THREAD_GUARD
#define CMS_THREAD_GUARD(_var_)
Definition: thread_safety_macros.h:6
edm::Worker::kOutputModule
Definition: Worker.h:94
ModuleContextSentry.h
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > >::esPrefetchAsync
static void esPrefetchAsync(Worker *worker, WaitingTaskHolder waitingTask, ServiceToken const &token, LumiTransitionInfo const &info, Transition transition)
Definition: Worker.h:760
edm::workerhelper::CallImpl< OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > >::wantsTransition
static bool wantsTransition(Worker const *iWorker)
Definition: Worker.h:792
edm::workerhelper::CallImpl< OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > >::pauseGlobalQueue
static SerialTaskQueue * pauseGlobalQueue(Worker *iWorker)
Definition: Worker.h:693
edm::Worker::beginStream
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:380
edm::Worker::moduleValid_
bool moduleValid_
Definition: Worker.h:590
edm::Worker::implNeedToRunSelection
virtual bool implNeedToRunSelection() const =0
edm::Worker::implDoStreamBegin
virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const &, ModuleCallingContext const *)=0
edm::Worker::AcquireTask< OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >, DUMMY >::m_serviceToken
ServiceWeakToken m_serviceToken
Definition: Worker.h:547
edm::Worker::endStream
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:404
edm::Worker::workerType
virtual std::string workerType() const =0
edm::Worker::implDoEndProcessBlock
virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const &, ModuleCallingContext const *)=0
FWCoreCommonFwd.h
edm::Worker::TaskQueueAdaptor::TaskQueueAdaptor
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:102
edm::Worker::shouldRethrowException
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:164
edm::workerhelper::CallImpl< OccurrenceTraits< ProcessBlockPrincipal, BranchActionGlobalBegin > >::call
static bool call(Worker *iWorker, StreamID, ProcessBlockTransitionInfo const &info, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:852
watchdog.group
group
Definition: watchdog.py:82
edm::Transition::NumberOfEventSetupTransitions
edm::ModuleCallingContext
Definition: ModuleCallingContext.h:29
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316