CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
53 
55 
56 #include <atomic>
57 #include <map>
58 #include <memory>
59 #include <sstream>
60 #include <string>
61 #include <vector>
62 #include <exception>
63 #include <unordered_map>
64 
65 namespace edm {
66  class EventPrincipal;
67  class EarlyDeleteHelper;
68  class ProductResolverIndexHelper;
69  class ProductResolverIndexAndSkipBit;
70  class StreamID;
71  class StreamContext;
72  class ProductRegistry;
73  class ThinnedAssociationsHelper;
74 
75  namespace workerhelper {
76  template <typename O>
77  class CallImpl;
78  }
79  namespace eventsetup {
81  }
82 
83  class Worker {
84  public:
85  enum State { Ready, Pass, Fail, Exception };
86  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
88  SerialTaskQueueChain* serial_ = nullptr;
89  LimitedTaskQueue* limited_ = nullptr;
90 
91  TaskQueueAdaptor() = default;
92  TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
93  TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
94 
95  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
96 
97  template <class F>
98  void push(F&& iF) {
99  if (serial_) {
100  serial_->push(iF);
101  } else {
102  limited_->push(iF);
103  }
104  }
105  template <class F>
106  void pushAndWait(F&& iF) {
107  if (serial_) {
108  serial_->pushAndWait(iF);
109  } else {
110  limited_->pushAndWait(iF);
111  }
112  }
113  };
114 
115  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
116  virtual ~Worker();
117 
118  Worker(Worker const&) = delete; // Disallow copying and moving
119  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
120 
121  virtual bool wantsGlobalRuns() const = 0;
122  virtual bool wantsGlobalLuminosityBlocks() const = 0;
123  virtual bool wantsStreamRuns() const = 0;
124  virtual bool wantsStreamLuminosityBlocks() const = 0;
125 
126  virtual SerialTaskQueue* globalRunsQueue() = 0;
127  virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
128 
129  template <typename T>
130  bool doWork(typename T::MyPrincipal const&,
131  EventSetupImpl const& c,
132  StreamID stream,
133  ParentContext const& parentContext,
134  typename T::Context const* context);
135 
136  void prePrefetchSelectionAsync(WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
137 
// Overload selected when the last argument is a pointer other than
// EventPrincipal const*. The body only asserts, i.e. it is not meant to
// be executed.
// NOTE(review): presumably exists so the templated callers still compile
// for non-event principals - confirm. When NDEBUG is defined the assert
// is compiled out and the function returns without touching `task`.
138  void prePrefetchSelectionAsync(WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
139  assert(false);
140  }
141 
142  template <typename T>
143  void doWorkAsync(WaitingTask* task,
144  typename T::MyPrincipal const&,
145  EventSetupImpl const& c,
146  ServiceToken const& token,
147  StreamID stream,
148  ParentContext const& parentContext,
149  typename T::Context const* context);
150 
151  template <typename T>
152  void doWorkNoPrefetchingAsync(WaitingTask* task,
153  typename T::MyPrincipal const&,
154  EventSetupImpl const& c,
155  ServiceToken const& token,
156  StreamID stream,
157  ParentContext const& parentContext,
158  typename T::Context const* context);
159 
160  template <typename T>
161  std::exception_ptr runModuleDirectly(typename T::MyPrincipal const& ep,
162  EventSetupImpl const& es,
163  StreamID streamID,
164  ParentContext const& parentContext,
165  typename T::Context const* context);
166 
167  void callWhenDoneAsync(WaitingTask* task) { waitingTasks_.add(task); }
168  void skipOnPath();
169  void beginJob();
170  void endJob();
171  void beginStream(StreamID id, StreamContext& streamContext);
172  void endStream(StreamID id, StreamContext& streamContext);
173  void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
174  void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
175 
177  implRegisterThinnedAssociations(registry, helper);
178  }
179 
180  void reset() {
181  cached_exception_ = std::exception_ptr();
182  state_ = Ready;
183  waitingTasks_.reset();
184  workStarted_ = false;
185  numberOfPathsLeftToRun_ = numberOfPathsOn_;
186  }
187 
188  void postDoEvent(EventPrincipal const&);
189 
190  ModuleDescription const& description() const { return *(moduleCallingContext_.moduleDescription()); }
191  ModuleDescription const* descPtr() const { return moduleCallingContext_.moduleDescription(); }
194  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
195 
196  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
197 
198  //Used to make EDGetToken work
199  virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
200  virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
201  virtual void resolvePutIndicies(
202  BranchType iBranchType,
203  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
204  iIndicies) = 0;
205 
206  virtual void modulesWhoseProductsAreConsumed(
207  std::vector<ModuleDescription const*>& modules,
208  ProductRegistry const& preg,
209  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
210 
211  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
212 
213  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
214 
215  virtual Types moduleType() const = 0;
216 
217  void clearCounters() {
218  timesRun_.store(0, std::memory_order_release);
219  timesVisited_.store(0, std::memory_order_release);
220  timesPassed_.store(0, std::memory_order_release);
221  timesFailed_.store(0, std::memory_order_release);
222  timesExcept_.store(0, std::memory_order_release);
223  }
224 
225  void addedToPath() { ++numberOfPathsOn_; }
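226  //NOTE(review): the counter accessors below read with acquire loads; none of them calls state(), so the older remark that state() is called to force synchronization across threads looks stale - confirm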
227  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
228  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
229  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
230  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
231  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
232  State state() const { return state_; }
233 
234  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
235 
236  virtual bool hasAccumulator() const = 0;
237 
238  protected:
239  template <typename O>
241  virtual std::string workerType() const = 0;
242  virtual bool implDo(EventPrincipal const&, EventSetupImpl const& c, ModuleCallingContext const* mcc) = 0;
243 
244  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
245  virtual bool implNeedToRunSelection() const = 0;
246 
247  virtual void implDoAcquire(EventPrincipal const&,
248  EventSetupImpl const& c,
249  ModuleCallingContext const* mcc,
250  WaitingTaskWithArenaHolder& holder) = 0;
251 
252  virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const& ep, ModuleCallingContext const* mcc) = 0;
253  virtual bool implDoBegin(RunPrincipal const& rp, EventSetupImpl const& c, ModuleCallingContext const* mcc) = 0;
254  virtual bool implDoStreamBegin(StreamID id,
255  RunPrincipal const& rp,
256  EventSetupImpl const& c,
257  ModuleCallingContext const* mcc) = 0;
258  virtual bool implDoStreamEnd(StreamID id,
259  RunPrincipal const& rp,
260  EventSetupImpl const& c,
261  ModuleCallingContext const* mcc) = 0;
262  virtual bool implDoEnd(RunPrincipal const& rp, EventSetupImpl const& c, ModuleCallingContext const* mcc) = 0;
263  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp,
264  EventSetupImpl const& c,
265  ModuleCallingContext const* mcc) = 0;
266  virtual bool implDoStreamBegin(StreamID id,
267  LuminosityBlockPrincipal const& lbp,
268  EventSetupImpl const& c,
269  ModuleCallingContext const* mcc) = 0;
270  virtual bool implDoStreamEnd(StreamID id,
271  LuminosityBlockPrincipal const& lbp,
272  EventSetupImpl const& c,
273  ModuleCallingContext const* mcc) = 0;
274  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp,
275  EventSetupImpl const& c,
276  ModuleCallingContext const* mcc) = 0;
277  virtual void implBeginJob() = 0;
278  virtual void implEndJob() = 0;
279  virtual void implBeginStream(StreamID) = 0;
280  virtual void implEndStream(StreamID) = 0;
281 
282  void resetModuleDescription(ModuleDescription const*);
283 
284  ActivityRegistry* activityRegistry() { return actReg_.get(); }
285 
286  private:
287  template <typename T>
288  bool runModule(typename T::MyPrincipal const&,
289  EventSetupImpl const& c,
290  StreamID stream,
291  ParentContext const& parentContext,
292  typename T::Context const* context);
293 
294  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
295  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
296 
297  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
298 
299  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
300 
301  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask,
302  ModuleCallingContext const& moduleCallingContext,
303  Principal const& iPrincipal) const = 0;
304 
305  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
306  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
307 
308  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
309 
310  virtual TaskQueueAdaptor serializeRunModule() = 0;
311 
312  static void exceptionContext(cms::Exception& ex, ModuleCallingContext const* mcc);
313 
314  /*This base class is used to hide the differences between the ID used
315  for Event, LuminosityBlock and Run. Using the base class allows us
316  to only convert the ID to string form if it is actually needed in
317  the call to shouldRethrowException.
318  */
320  public:
321  virtual std::string value() const = 0;
323  };
324 
325  template <typename T>
327  public:
328  TransitionIDValue(T const& iP) : p_(iP) {}
329  std::string value() const override {
330  std::ostringstream iost;
331  iost << p_.id();
332  return iost.str();
333  }
334 
335  private:
336  T const& p_;
337  };
338 
339  bool shouldRethrowException(std::exception_ptr iPtr,
340  ParentContext const& parentContext,
341  bool isEvent,
342  TransitionIDValueBase const& iID) const;
343 
344  template <bool IS_EVENT>
345  bool setPassed() {
346  if (IS_EVENT) {
347  timesPassed_.fetch_add(1, std::memory_order_relaxed);
348  }
349  state_ = Pass;
350  return true;
351  }
352 
353  template <bool IS_EVENT>
354  bool setFailed() {
355  if (IS_EVENT) {
356  timesFailed_.fetch_add(1, std::memory_order_relaxed);
357  }
358  state_ = Fail;
359  return false;
360  }
361 
362  template <bool IS_EVENT>
363  std::exception_ptr setException(std::exception_ptr iException) {
364  if (IS_EVENT) {
365  timesExcept_.fetch_add(1, std::memory_order_relaxed);
366  }
367  cached_exception_ = iException; // propagate_const<T> has no reset() function
368  state_ = Exception;
369  return cached_exception_;
370  }
371 
372  void prefetchAsync(WaitingTask*, ServiceToken const&, ParentContext const& parentContext, Principal const&);
373 
375  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
376  }
377 
378  virtual bool hasAcquire() const = 0;
379 
380  template <typename T>
381  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
382  typename T::MyPrincipal const& ep,
383  EventSetupImpl const& es,
384  StreamID streamID,
385  ParentContext const& parentContext,
386  typename T::Context const* context);
387 
388  void runAcquire(EventPrincipal const& ep,
389  EventSetupImpl const& es,
390  ParentContext const& parentContext,
392 
393  void runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
394  EventPrincipal const& ep,
395  EventSetupImpl const& es,
396  ParentContext const& parentContext,
398 
399  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
400 
401  template <typename T>
402  class RunModuleTask : public WaitingTask {
403  public:
405  typename T::MyPrincipal const& ep,
406  EventSetupImpl const& es,
407  ServiceToken const& token,
408  StreamID streamID,
409  ParentContext const& parentContext,
410  typename T::Context const* context)
411  : m_worker(worker),
412  m_principal(ep),
413  m_es(es),
414  m_streamID(streamID),
415  m_parentContext(parentContext),
416  m_context(context),
417  m_serviceToken(token) {}
418 
421  EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
422  EnableQueueGuard(EnableQueueGuard const&) = delete;
423  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
424  EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
425  EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
427  if (queue_) {
428  queue_->resume();
429  }
430  }
431  };
432 
// Task body of RunModuleTask.
// Steps, as written below:
//  1. make the services of m_serviceToken available for this scope;
//  2. for event transitions on workers without acquire, emit the
//     post-prefetching signal and remember an exception it throws
//     (unless an exception is already pending from exceptionPtr());
//  3. if nothing is pending and serializeRunModule() yields a queue,
//     push the module run onto that queue - going through the queue
//     returned by pauseGlobalQueue() first when it is non-null;
//  4. otherwise call runModuleAfterAsyncPrefetch directly, passing the
//     pending exception pointer (which may be null).
// Always returns nullptr.
433  tbb::task* execute() override {
434  //Need to make the services available early so other services can see them
435  ServiceRegistry::Operate guard(m_serviceToken);
436 
437  //in case the emit causes an exception, we need a memory location
438  // to hold the exception_ptr
439  std::exception_ptr temp_excptr;
440  auto excptr = exceptionPtr();
441  if (T::isEvent_ && !m_worker->hasAcquire()) {
442  try {
443  //pre was called in prefetchAsync
444  m_worker->emitPostModuleEventPrefetchingSignal();
445  } catch (...) {
446  temp_excptr = std::current_exception();
     // an exception that was already pending takes precedence over the
     // one thrown by the signal
447  if (not excptr) {
448  excptr = &temp_excptr;
449  }
450  }
451  }
452 
453  if (not excptr) {
454  if (auto queue = m_worker->serializeRunModule()) {
     // NOTE(review): principal and es are captured by reference while the
     // other values are copied into the closure; presumably both objects
     // outlive the queued work - confirm.
455  auto const& principal = m_principal;
456  auto& es = m_es;
457  auto f = [worker = m_worker,
458  &principal,
459  &es,
460  streamID = m_streamID,
461  parentContext = m_parentContext,
462  sContext = m_context,
463  serviceToken = m_serviceToken]() {
464  //Need to make the services available
465  ServiceRegistry::Operate guard(serviceToken);
466 
467  //If needed, we pause the queue in begin transition and resume it
468  // at the end transition. This guarantees that the module
469  // only processes one transition at a time
     // NOTE(review): the listing jumps from 469 to 471 here, so one line of
     // the original file is not visible at this point.
471  std::exception_ptr* ptr = nullptr;
472  worker->template runModuleAfterAsyncPrefetch<T>(ptr, principal, es, streamID, parentContext, sContext);
473  };
474  //keep another global transition from running if necessary
475  auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
476  if (gQueue) {
     // the global queue is paused from inside its own task, and only then
     // is the module work handed to the module's queue
477  gQueue->push([queue, gQueue, f]() mutable {
478  gQueue->pause();
479  queue.push(std::move(f));
480  });
481  } else {
482  queue.push(std::move(f));
483  }
484  return nullptr;
485  }
486  }
487 
     // reached when an exception is pending or when no queue was returned
488  m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_principal, m_es, m_streamID, m_parentContext, m_context);
489  return nullptr;
490  }
491 
492  private:
494  typename T::MyPrincipal const& m_principal;
498  typename T::Context const* m_context;
500  };
501 
502  // AcquireTask is only used for the Event case, but we define
503  // it as a template so all cases will compile.
504  // DUMMY exists to work around the C++ Standard prohibition on
505  // fully specializing templates nested in other classes.
506  template <typename T, typename DUMMY = void>
507  class AcquireTask : public WaitingTask {
508  public:
510  typename T::MyPrincipal const& ep,
511  EventSetupImpl const& es,
512  ServiceToken const& token,
513  ParentContext const& parentContext,
514  WaitingTaskWithArenaHolder holder) {}
515  tbb::task* execute() override { return nullptr; }
516  };
517 
518  template <typename DUMMY>
520  public:
522  EventPrincipal const& ep,
523  EventSetupImpl const& es,
524  ServiceToken const& token,
525  ParentContext const& parentContext,
527  : m_worker(worker),
528  m_principal(ep),
529  m_es(es),
530  m_parentContext(parentContext),
531  m_holder(std::move(holder)),
532  m_serviceToken(token) {}
533 
534  tbb::task* execute() override {
535  //Need to make the services available early so other services can see them
536  ServiceRegistry::Operate guard(m_serviceToken);
537 
538  //incase the emit causes an exception, we need a memory location
539  // to hold the exception_ptr
540  std::exception_ptr temp_excptr;
541  auto excptr = exceptionPtr();
542  try {
543  //pre was called in prefetchAsync
544  m_worker->emitPostModuleEventPrefetchingSignal();
545  } catch (...) {
546  temp_excptr = std::current_exception();
547  if (not excptr) {
548  excptr = &temp_excptr;
549  }
550  }
551 
552  if (not excptr) {
553  if (auto queue = m_worker->serializeRunModule()) {
554  auto const& principal = m_principal;
555  auto& es = m_es;
556  queue.push([worker = m_worker,
557  &principal,
558  &es,
559  parentContext = m_parentContext,
560  serviceToken = m_serviceToken,
561  holder = m_holder]() {
562  //Need to make the services available
563  ServiceRegistry::Operate guard(serviceToken);
564 
565  std::exception_ptr* ptr = nullptr;
566  worker->runAcquireAfterAsyncPrefetch(ptr, principal, es, parentContext, holder);
567  });
568  return nullptr;
569  }
570  }
571 
572  m_worker->runAcquireAfterAsyncPrefetch(excptr, m_principal, m_es, m_parentContext, std::move(m_holder));
573  return nullptr;
574  }
575 
576  private:
583  };
584 
585  // This class does nothing unless there is an exception originating
586  // in an "External Worker". In that case, it handles converting the
587  // exception to a CMS exception and adding context to the exception.
589  public:
590  HandleExternalWorkExceptionTask(Worker* worker, WaitingTask* runModuleTask, ParentContext const& parentContext);
591 
592  tbb::task* execute() override;
593 
594  private:
598  };
599 
600  std::atomic<int> timesRun_;
601  std::atomic<int> timesVisited_;
602  std::atomic<int> timesPassed_;
603  std::atomic<int> timesFailed_;
604  std::atomic<int> timesExcept_;
605  std::atomic<State> state_;
607  std::atomic<int> numberOfPathsLeftToRun_;
608 
610 
611  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
612  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
613 
614  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
615 
616  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
617 
618  edm::WaitingTaskList waitingTasks_;
619  std::atomic<bool> workStarted_;
620  bool ranAcquireWithoutException_;
621  };
622 
623  namespace {
624  template <typename T>
625  class ModuleSignalSentry {
626  public:
627  ModuleSignalSentry(ActivityRegistry* a,
628  typename T::Context const* context,
629  ModuleCallingContext const* moduleCallingContext)
630  : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
631  if (a_)
632  T::preModuleSignal(a_, context, moduleCallingContext_);
633  }
634 
635  ~ModuleSignalSentry() {
636  if (a_)
637  T::postModuleSignal(a_, context_, moduleCallingContext_);
638  }
639 
640  private:
641  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
642  typename T::Context const* context_;
643  ModuleCallingContext const* moduleCallingContext_;
644  };
645 
646  } // namespace
647 
648  namespace workerhelper {
649  template <>
651  public:
653  static bool call(Worker* iWorker,
654  StreamID,
655  EventPrincipal const& ep,
656  EventSetupImpl const& es,
657  ActivityRegistry* /* actReg */,
658  ModuleCallingContext const* mcc,
659  Arg::Context const* /* context*/) {
660  //Signal sentry is handled by the module
661  return iWorker->implDo(ep, es, mcc);
662  }
663  static bool wantsTransition(Worker const* iWorker) { return true; }
664  static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
665 
666  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
667  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
668  };
669 
670  template <>
672  public:
674  static bool call(Worker* iWorker,
675  StreamID,
676  RunPrincipal const& ep,
677  EventSetupImpl const& es,
678  ActivityRegistry* actReg,
679  ModuleCallingContext const* mcc,
680  Arg::Context const* context) {
681  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
682  return iWorker->implDoBegin(ep, es, mcc);
683  }
684  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
685  static bool needToRunSelection(Worker const* iWorker) { return false; }
686  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
687  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
688  };
689  template <>
691  public:
693  static bool call(Worker* iWorker,
694  StreamID id,
695  RunPrincipal const& ep,
696  EventSetupImpl const& es,
697  ActivityRegistry* actReg,
698  ModuleCallingContext const* mcc,
699  Arg::Context const* context) {
700  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
701  return iWorker->implDoStreamBegin(id, ep, es, mcc);
702  }
703  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
704  static bool needToRunSelection(Worker const* iWorker) { return false; }
705  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
706  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
707  };
708  template <>
710  public:
712  static bool call(Worker* iWorker,
713  StreamID,
714  RunPrincipal const& ep,
715  EventSetupImpl const& es,
716  ActivityRegistry* actReg,
717  ModuleCallingContext const* mcc,
718  Arg::Context const* context) {
719  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
720  return iWorker->implDoEnd(ep, es, mcc);
721  }
722  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
723  static bool needToRunSelection(Worker const* iWorker) { return false; }
724  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
725  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
726  };
727  template <>
729  public:
731  static bool call(Worker* iWorker,
732  StreamID id,
733  RunPrincipal const& ep,
734  EventSetupImpl const& es,
735  ActivityRegistry* actReg,
736  ModuleCallingContext const* mcc,
737  Arg::Context const* context) {
738  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
739  return iWorker->implDoStreamEnd(id, ep, es, mcc);
740  }
741  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
742  static bool needToRunSelection(Worker const* iWorker) { return false; }
743  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
744  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
745  };
746 
747  template <>
749  public:
751  static bool call(Worker* iWorker,
752  StreamID,
753  LuminosityBlockPrincipal const& ep,
754  EventSetupImpl const& es,
755  ActivityRegistry* actReg,
756  ModuleCallingContext const* mcc,
757  Arg::Context const* context) {
758  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
759  return iWorker->implDoBegin(ep, es, mcc);
760  }
761  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
762  static bool needToRunSelection(Worker const* iWorker) { return false; }
763  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
764  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
765  };
766  template <>
768  public:
770  static bool call(Worker* iWorker,
771  StreamID id,
772  LuminosityBlockPrincipal const& ep,
773  EventSetupImpl const& es,
774  ActivityRegistry* actReg,
775  ModuleCallingContext const* mcc,
776  Arg::Context const* context) {
777  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
778  return iWorker->implDoStreamBegin(id, ep, es, mcc);
779  }
780  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
781  static bool needToRunSelection(Worker const* iWorker) { return false; }
782  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
783  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
784  };
785 
786  template <>
788  public:
790  static bool call(Worker* iWorker,
791  StreamID,
792  LuminosityBlockPrincipal const& ep,
793  EventSetupImpl const& es,
794  ActivityRegistry* actReg,
795  ModuleCallingContext const* mcc,
796  Arg::Context const* context) {
797  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
798  return iWorker->implDoEnd(ep, es, mcc);
799  }
800  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
801  static bool needToRunSelection(Worker const* iWorker) { return false; }
802  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
803  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
804  };
805  template <>
807  public:
809  static bool call(Worker* iWorker,
810  StreamID id,
811  LuminosityBlockPrincipal const& ep,
812  EventSetupImpl const& es,
813  ActivityRegistry* actReg,
814  ModuleCallingContext const* mcc,
815  Arg::Context const* context) {
816  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
817  return iWorker->implDoStreamEnd(id, ep, es, mcc);
818  }
819  static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
820  static bool needToRunSelection(Worker const* iWorker) { return false; }
821  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
822  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
823  };
824  } // namespace workerhelper
825 
826  template <typename T>
828  typename T::MyPrincipal const& ep,
829  EventSetupImpl const& es,
830  ServiceToken const& token,
831  StreamID streamID,
832  ParentContext const& parentContext,
833  typename T::Context const* context) {
835  return;
836  }
837 
838  waitingTasks_.add(task);
839  if (T::isEvent_) {
840  timesVisited_.fetch_add(1, std::memory_order_relaxed);
841  }
842 
843  bool expected = false;
844  if (workStarted_.compare_exchange_strong(expected, true)) {
845  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
846 
847  //if have TriggerResults based selection we want to reject the event before doing prefetching
849  //We need to run the selection in a different task so that
850  // we can prefetch the data needed for the selection
851  auto runTask =
852  new (tbb::task::allocate_root()) RunModuleTask<T>(this, ep, es, token, streamID, parentContext, context);
853 
854  //make sure the task is either run or destroyed
855  struct DestroyTask {
856  DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
857 
858  ~DestroyTask() {
859  auto p = m_task.load();
860  if (p) {
862  }
863  }
864 
866  auto t = m_task.load();
867  m_task.store(nullptr);
868  return t;
869  }
870 
871  std::atomic<edm::WaitingTask*> m_task;
872  };
873 
874  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
875  auto selectionTask =
876  make_waiting_task(tbb::task::allocate_root(),
877  [ownRunTask, parentContext, &ep, token, this](std::exception_ptr const*) mutable {
878  ServiceRegistry::Operate guard(token);
879  prefetchAsync(ownRunTask->release(), token, parentContext, ep);
880  });
881  prePrefetchSelectionAsync(selectionTask, token, streamID, &ep);
882  } else {
883  WaitingTask* moduleTask =
884  new (tbb::task::allocate_root()) RunModuleTask<T>(this, ep, es, token, streamID, parentContext, context);
885  if (T::isEvent_ && hasAcquire()) {
886  WaitingTaskWithArenaHolder runTaskHolder(
887  new (tbb::task::allocate_root()) HandleExternalWorkExceptionTask(this, moduleTask, parentContext));
888  moduleTask = new (tbb::task::allocate_root())
889  AcquireTask<T>(this, ep, es, token, parentContext, std::move(runTaskHolder));
890  }
891  prefetchAsync(moduleTask, token, parentContext, ep);
892  }
893  }
894  }
895 
// Finishes a transition once prefetching is over.
//  iEPtr  null when prefetching succeeded; otherwise points to the
//         exception raised earlier (asserted to be non-empty).
// With a pending exception the module is not run: shouldRethrowException
// decides whether the exception is cached via setException (and returned)
// or the worker is simply marked as passed. Without one, runModule<T> is
// called and anything it throws is captured.
// In every case the waiting tasks are notified with the resulting
// exception pointer, which is also the return value (empty on success).
896  template <typename T>
897  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
898  typename T::MyPrincipal const& ep,
899  EventSetupImpl const& es,
900  StreamID streamID,
901  ParentContext const& parentContext,
902  typename T::Context const* context) {
903  std::exception_ptr exceptionPtr;
904  if (iEPtr) {
905  assert(*iEPtr);
     // NOTE(review): listing line 906 is not visible here; idValue used on
     // the next line is presumably declared there - confirm.
907  if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
908  exceptionPtr = *iEPtr;
909  setException<T::isEvent_>(exceptionPtr);
910  } else {
     // the exception is not rethrown: exceptionPtr stays empty and the
     // worker is marked as passed
911  setPassed<T::isEvent_>();
912  }
     // runModule is not called on this branch, so the calling context is
     // reset to kInvalid here
913  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
914  } else {
915  try {
916  runModule<T>(ep, es, streamID, parentContext, context);
917  } catch (...) {
918  exceptionPtr = std::current_exception();
919  }
920  }
921  waitingTasks_.doneWaiting(exceptionPtr);
922  return exceptionPtr;
923  }
924 
// Runs the module asynchronously without a prefetching step.
// NOTE(review): listing lines 926 (start of the signature) and 933 (the
// condition guarding the early return) are not visible in this view.
// The supplied task is added to waitingTasks_. Only the caller that flips
// workStarted_ from false to true schedules the work; the work calls
// runModule<T>, captures any exception and passes it to
// waitingTasks_.doneWaiting.
925  template <typename T>
927  typename T::MyPrincipal const& principal,
928  EventSetupImpl const& es,
929  ServiceToken const& serviceToken,
930  StreamID streamID,
931  ParentContext const& parentContext,
932  typename T::Context const* context) {
934  return;
935  }
936  waitingTasks_.add(task);
937  bool expected = false;
     // compare_exchange_strong succeeds for one caller only
938  if (workStarted_.compare_exchange_strong(expected, true)) {
     // NOTE(review): principal and es are captured by reference; presumably
     // both outlive the scheduled work - confirm.
939  auto toDo = [this, &principal, &es, streamID, parentContext, context, serviceToken]() {
940  std::exception_ptr exceptionPtr;
941  try {
942  //Need to make the services available
943  ServiceRegistry::Operate guard(serviceToken);
944 
945  this->runModule<T>(principal, es, streamID, parentContext, context);
946  } catch (...) {
947  exceptionPtr = std::current_exception();
948  }
949  this->waitingTasks_.doneWaiting(exceptionPtr);
950  };
     // use the module's queue when serializeRunModule() returns one,
     // otherwise spawn a new root task
951  if (auto queue = this->serializeRunModule()) {
952  queue.push(toDo);
953  } else {
     // NOTE(review): this local is named `task`, the same name that is passed
     // to waitingTasks_.add above; if that is a function parameter, it is
     // shadowed here - confirm.
954  auto task = make_functor_task(tbb::task::allocate_root(), toDo);
955  tbb::task::spawn(*task);
956  }
957  }
958  }
959 
// Synchronous entry point: prefetches (for event transitions) and runs the
// module for transition T, blocking the calling thread until a result exists.
//
// Returns true/false for a passed/failed module, or rethrows cached_exception_.
// If state_ is already Pass, Fail or Exception the cached outcome is reported
// without running anything. If another thread has already set workStarted_,
// this call waits for that thread and then reports the resulting state_.
//
// NOTE(review): listing lines 1016 and 1044 are absent from this extract.
// Line 1016 presumably opens the conditional block closed at listing line 1027,
// and line 1044 presumably declares `idValue` used on listing line 1045 -
// confirm against the real header.
960  template <typename T>
961  bool Worker::doWork(typename T::MyPrincipal const& ep,
962  EventSetupImpl const& es,
963  StreamID streamID,
964  ParentContext const& parentContext,
965  typename T::Context const* context) {
      // Visits are only counted for event transitions.
966  if (T::isEvent_) {
967  timesVisited_.fetch_add(1, std::memory_order_relaxed);
968  }
969  bool rc = false;
970 
      // Report the cached outcome unless the worker is still Ready.
971  switch (state_) {
972  case Ready:
973  break;
974  case Pass:
975  return true;
976  case Fail:
977  return false;
978  case Exception: {
979  std::rethrow_exception(cached_exception_);
980  }
981  }
982 
      // Try to claim the work; the exchange fails if workStarted_ is already true.
983  bool expected = false;
984  if (not workStarted_.compare_exchange_strong(expected, true)) {
985  //another thread beat us here
986  auto waitTask = edm::make_empty_waiting_task();
987  waitTask->increment_ref_count();
988 
989  waitingTasks_.add(waitTask.get());
990 
991  waitTask->wait_for_all();
992 
      // NOTE(review): `case Ready` has no break/return after assert(false), so
      // when assert is compiled out (NDEBUG) it falls through to `case Pass`
      // and returns true.
993  switch (state_) {
994  case Ready:
995  assert(false);
996  case Pass:
997  return true;
998  case Fail:
999  return false;
1000  case Exception: {
1001  std::rethrow_exception(cached_exception_);
1002  }
1003  }
1004  }
1005 
1006  //Need the context to be set until after any exception is resolved
1007  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1008 
      // The unique_ptr is used only for its deleter: unless release() is called
      // (see "successful prefetch" below), leaving this function sets the
      // context back to kInvalid.
1009  auto resetContext = [](ModuleCallingContext* iContext) {
1010  iContext->setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1011  };
1012  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_, resetContext);
1013 
1014  if (T::isEvent_) {
1015  //if have TriggerResults based selection we want to reject the event before doing prefetching
1017  auto waitTask = edm::make_empty_waiting_task();
      // Same ref-count pattern as the prefetch below: start at 2, start the
      // asynchronous work, drop one count, then block in wait_for_all().
1018  waitTask->set_ref_count(2);
1019  prePrefetchSelectionAsync(waitTask.get(), ServiceRegistry::instance().presentToken(), streamID, &ep);
1020  waitTask->decrement_ref_count();
1021  waitTask->wait_for_all();
1022 
1023  if (state() != Ready) {
1024  //The selection must have rejected this event
1025  return true;
1026  }
1027  }
1028  auto waitTask = edm::make_empty_waiting_task();
1029  {
1030  //Make sure signal is sent once the prefetching is done
1031  // [the 'pre' signal was sent in prefetchAsync]
1032  //The purpose of this block is to send the signal after wait_for_all
      // signalSentry's deleter ignores its pointer argument and just emits the
      // signal when this block is left.
1033  auto sentryFunc = [this](void*) { emitPostModuleEventPrefetchingSignal(); };
1034  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(), sentryFunc);
1035 
1036  //set count to 2 since wait_for_all requires value to not go to 0
1037  waitTask->set_ref_count(2);
1038 
1039  prefetchAsync(waitTask.get(), ServiceRegistry::instance().presentToken(), parentContext, ep);
1040  waitTask->decrement_ref_count();
1041  waitTask->wait_for_all();
1042  }
      // A prefetch failure is either cached and rethrown, or the worker is
      // marked as passed; waitingTasks_.doneWaiting() is called on both paths.
1043  if (waitTask->exceptionPtr() != nullptr) {
1045  if (shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1046  setException<T::isEvent_>(*waitTask->exceptionPtr());
1047  waitingTasks_.doneWaiting(cached_exception_);
1048  std::rethrow_exception(cached_exception_);
1049  } else {
1050  setPassed<T::isEvent_>();
1051  waitingTasks_.doneWaiting(nullptr);
1052  return true;
1053  }
1054  }
1055  }
1056 
1057  //successful prefetch so no reset necessary
1058  prefetchSentry.release();
      // Run the module, through the queue from serializeRunModule() when there
      // is one. Both catch blocks are intentionally empty: a failure is picked
      // up below by checking state_ == Exception.
      // NOTE(review): presumably runModule has already stored the exception via
      // setException before it propagates - confirm.
1059  if (auto queue = serializeRunModule()) {
1060  auto serviceToken = ServiceRegistry::instance().presentToken();
1061  queue.pushAndWait([&]() {
1062  //Need to make the services available
1063  ServiceRegistry::Operate guard(serviceToken);
1064  try {
1065  rc = runModule<T>(ep, es, streamID, parentContext, context);
1066  } catch (...) {
1067  }
1068  });
1069  } else {
1070  try {
1071  rc = runModule<T>(ep, es, streamID, parentContext, context);
1072  } catch (...) {
1073  }
1074  }
      // Call doneWaiting() on waitingTasks_ with the outcome before returning or rethrowing.
1075  if (state_ == Exception) {
1076  waitingTasks_.doneWaiting(cached_exception_);
1077  std::rethrow_exception(cached_exception_);
1078  }
1079 
1080  waitingTasks_.doneWaiting(nullptr);
1081  return rc;
1082  }
1083 
// Invokes the module for transition T through workerhelper::CallImpl<T>::call
// and records the outcome with setPassed / setFailed / setException.
//
// Returns the value produced by the module call, or the value of setPassed()
// when a cms::Exception was caught but shouldRethrowException() returned false.
// Otherwise the exception is stored via setException and cached_exception_ is
// rethrown.
//
// NOTE(review): listing line 1112 is absent from this extract; it presumably
// declares `idValue`, which is used on listing line 1113 - confirm against the
// real header.
1084  template <typename T>
1085  bool Worker::runModule(typename T::MyPrincipal const& ep,
1086  EventSetupImpl const& es,
1087  StreamID streamID,
1088  ParentContext const& parentContext,
1089  typename T::Context const* context) {
1090  //unscheduled producers should advance this
1091  //if (T::isEvent_) {
1092  // ++timesVisited_;
1093  //}
      // Scoped object built from the calling context and its parent; it lives
      // until this function returns or throws.
1094  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
      // Runs are only counted for event transitions.
1095  if (T::isEvent_) {
1096  timesRun_.fetch_add(1, std::memory_order_relaxed);
1097  }
1098 
1099  bool rc = true;
1100  try {
      // The call and the pass/fail bookkeeping both happen inside the functor
      // given to convertException::wrap.
      // NOTE(review): presumably wrap converts other exception types to
      // cms::Exception, since only cms::Exception is caught below - confirm.
1101  convertException::wrap([&]() {
1102  rc = workerhelper::CallImpl<T>::call(this, streamID, ep, es, actReg_.get(), &moduleCallingContext_, context);
1103 
1104  if (rc) {
1105  setPassed<T::isEvent_>();
1106  } else {
1107  setFailed<T::isEvent_>();
1108  }
1109  });
1110  } catch (cms::Exception& ex) {
      // exceptionContext is handed the exception and the calling context
      // before the rethrow-or-ignore decision is made.
1111  exceptionContext(ex, &moduleCallingContext_);
1113  if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
      // An exception must not already be cached when a new one is stored.
1114  assert(not cached_exception_);
1115  setException<T::isEvent_>(std::current_exception());
1116  std::rethrow_exception(cached_exception_);
1117  } else {
      // The exception is not propagated; the worker is marked as passed.
1118  rc = setPassed<T::isEvent_>();
1119  }
1120  }
1121 
1122  return rc;
1123  }
1124 
1125  template <typename T>
1126  std::exception_ptr Worker::runModuleDirectly(typename T::MyPrincipal const& ep,
1127  EventSetupImpl const& es,
1128  StreamID streamID,
1129  ParentContext const& parentContext,
1130  typename T::Context const* context) {
1131  timesVisited_.fetch_add(1, std::memory_order_relaxed);
1132  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1133  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
1134  }
1135 } // namespace edm
1136 #endif
void push(T &&iAction)
asynchronously pushes functor iAction into queue
tbb::task * execute() override
Definition: Worker.h:433
std::atomic< int > timesVisited_
Definition: Worker.h:601
ModuleDescription const & description() const
Definition: Worker.h:190
T::Context const * m_context
Definition: Worker.h:498
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:167
T::MyPrincipal const & m_principal
Definition: Worker.h:494
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:363
virtual bool wantsGlobalRuns() const =0
#define DUMMY
Definition: DMRtrends.cc:33
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:653
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:790
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:769
Definition: hltDiff.cc:290
virtual bool implDo(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
ParentContext const m_parentContext
Definition: Worker.h:497
def destroy(e)
Definition: pyrootRender.py:15
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:604
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:711
std::atomic< int > timesFailed_
Definition: Worker.h:603
int timesPassed() const
Definition: Worker.h:229
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1126
void addedToPath()
Definition: Worker.h:225
State state() const
Definition: Worker.h:232
void clearCounters()
Definition: Worker.h:217
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:92
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:138
void push(T &&iAction)
asynchronously pushes functor iAction into queue
ActivityRegistry * activityRegistry()
Definition: Worker.h:284
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
void exceptionContext(std::ostream &, GlobalContext const &)
virtual bool implDoBegin(RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
ExceptionToActionTable const * actions_
Definition: Worker.h:611
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:712
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:674
ServiceToken presentToken() const
bool runModule(typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1085
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
BranchType
Definition: BranchType.h:11
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:926
void beginJob()
Definition: Breakpoints.cc:14
tbb::task * execute() override
Definition: Worker.h:515
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:897
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:521
bool setFailed()
Definition: Worker.h:354
int timesExcept() const
Definition: Worker.h:231
void pushAndWait(F &&iF)
Definition: Worker.h:106
bool resume()
Resumes processing if the queue was paused.
def principal(options)
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:425
void reset()
Definition: Worker.h:180
virtual bool implNeedToRunSelection() const =0
int timesVisited() const
Definition: Worker.h:228
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:673
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * descPtr() const
Definition: Worker.h:191
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:421
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:751
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:176
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:789
virtual bool wantsStreamLuminosityBlocks() const =0
bool doWork(typename T::MyPrincipal const &, EventSetupImpl const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:961
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:770
std::atomic< State > state_
Definition: Worker.h:605
int timesRun() const
Definition: Worker.h:227
double f[11][100]
Definition: Types.py:1
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:731
int numberOfPathsOn_
Definition: Worker.h:606
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:808
bool setPassed()
Definition: Worker.h:345
TransitionIDValue(T const &iP)
Definition: Worker.h:328
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:509
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:93
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:652
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:809
std::string value() const override
Definition: Worker.h:329
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetupImpl const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:693
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:692
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
virtual bool wantsGlobalLuminosityBlocks() const =0
int timesPass() const
Definition: Worker.h:234
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
Definition: hltDiff.cc:291
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:404
int timesFailed() const
Definition: Worker.h:230
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
virtual bool wantsStreamRuns() const =0
virtual bool implDoEnd(RunPrincipal const &rp, EventSetupImpl const &c, ModuleCallingContext const *mcc)=0
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:374
double a
Definition: hdecay.h:121
std::atomic< int > timesPassed_
Definition: Worker.h:602
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
auto wrap(F iFunc) -> decltype(iFunc())
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:750
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:281
std::atomic< int > timesRun_
Definition: Worker.h:600
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetupImpl const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:827
long double T
EventSetupImpl const & m_es
Definition: Worker.h:495
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:730
def move(src, dest)
Definition: eostools.py:511
ServiceToken m_serviceToken
Definition: Worker.h:499
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0