CMS 3D CMS Logo

Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
53 
55 
56 #include <atomic>
57 #include <map>
58 #include <memory>
59 #include <sstream>
60 #include <string>
61 #include <vector>
62 #include <exception>
63 #include <unordered_map>
64 
65 namespace edm {
66  class EventPrincipal;
67  class EarlyDeleteHelper;
68  class ProductResolverIndexHelper;
69  class ProductResolverIndexAndSkipBit;
70  class StreamID;
71  class StreamContext;
72  class ProductRegistry;
73  class ThinnedAssociationsHelper;
74 
75  namespace workerhelper {
76  template< typename O> class CallImpl;
77  }
78 
79  class Worker {
80  public:
81  enum State { Ready, Pass, Fail, Exception };
82  enum Types { kAnalyzer, kFilter, kProducer, kOutputModule};
84  SerialTaskQueueChain* serial_ = nullptr;
85  LimitedTaskQueue* limited_ = nullptr;
86 
87  TaskQueueAdaptor() = default;
88  TaskQueueAdaptor(SerialTaskQueueChain* iChain): serial_(iChain) {}
89  TaskQueueAdaptor(LimitedTaskQueue* iLimited): limited_(iLimited) {}
90 
91  operator bool() { return serial_ != nullptr or limited_ != nullptr; }
92 
93  template <class F>
94  void push(F&& iF) {
95  if(serial_) {
96  serial_->push(iF);
97  } else {
98  limited_->push(iF);
99  }
100  }
101  template <class F>
102  void pushAndWait(F&& iF) {
103  if(serial_) {
104  serial_->pushAndWait(iF);
105  } else {
106  limited_->pushAndWait(iF);
107  }
108  }
109 
110  };
111 
112  Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
113  virtual ~Worker();
114 
115  Worker(Worker const&) = delete; // Disallow copying and moving
116  Worker& operator=(Worker const&) = delete; // Disallow copying and moving
117 
118  virtual bool wantsGlobalRuns() const = 0;
119  virtual bool wantsGlobalLuminosityBlocks() const = 0;
120  virtual bool wantsStreamRuns() const = 0;
121  virtual bool wantsStreamLuminosityBlocks() const = 0;
122 
123  virtual SerialTaskQueue* globalRunsQueue() = 0;
124  virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
125 
126  template <typename T>
127  bool doWork(typename T::MyPrincipal const&, EventSetup const& c,
128  StreamID stream,
129  ParentContext const& parentContext,
130  typename T::Context const* context);
131 
132  void prePrefetchSelectionAsync(WaitingTask* task,
133  ServiceToken const&,
134  StreamID stream,
135  EventPrincipal const*);
136 
138  ServiceToken const&,
139  StreamID stream,
140  void const*) {assert(false);}
141 
142  template <typename T>
143  void doWorkAsync(WaitingTask* task,
144  typename T::MyPrincipal const&, EventSetup const& c,
145  ServiceToken const& token, StreamID stream,
146  ParentContext const& parentContext,
147  typename T::Context const* context);
148 
149  template <typename T>
150  void doWorkNoPrefetchingAsync(WaitingTask* task,
151  typename T::MyPrincipal const&,
152  EventSetup const& c,
153  ServiceToken const& token,
154  StreamID stream,
155  ParentContext const& parentContext,
156  typename T::Context const* context);
157 
158  template <typename T>
159  std::exception_ptr runModuleDirectly(typename T::MyPrincipal const& ep,
160  EventSetup const& es,
161  StreamID streamID,
162  ParentContext const& parentContext,
163  typename T::Context const* context);
164 
166  waitingTasks_.add(task);
167  }
168  void skipOnPath();
169  void beginJob() ;
170  void endJob();
171  void beginStream(StreamID id, StreamContext& streamContext);
172  void endStream(StreamID id, StreamContext& streamContext);
173  void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
174  void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
175 
176  void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) { implRegisterThinnedAssociations(registry, helper); }
177 
178  void reset() {
179  cached_exception_ = std::exception_ptr();
180  state_ = Ready;
181  waitingTasks_.reset();
182  workStarted_ = false;
183  numberOfPathsLeftToRun_ = numberOfPathsOn_;
184  }
185 
186  void postDoEvent(EventPrincipal const&);
187 
188  ModuleDescription const& description() const {return *(moduleCallingContext_.moduleDescription());}
189  ModuleDescription const* descPtr() const {return moduleCallingContext_.moduleDescription(); }
192  void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
193 
194  void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
195 
196  //Used to make EDGetToken work
197  virtual void updateLookup(BranchType iBranchType,
198  ProductResolverIndexHelper const&) = 0;
199  virtual void resolvePutIndicies(BranchType iBranchType,
200  std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const& iIndicies) = 0;
201 
202  virtual void modulesWhoseProductsAreConsumed(std::vector<ModuleDescription const*>& modules,
203  ProductRegistry const& preg,
204  std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
205 
206  virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
207 
208  virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
209 
210  virtual Types moduleType() const =0;
211 
212  void clearCounters() {
213  timesRun_.store(0,std::memory_order_release);
214  timesVisited_.store(0,std::memory_order_release);
215  timesPassed_.store(0,std::memory_order_release);
216  timesFailed_.store(0,std::memory_order_release);
217  timesExcept_.store(0,std::memory_order_release);
218  }
219 
220  void addedToPath() {
221  ++numberOfPathsOn_;
222  }
223  //NOTE: calling state() is done to force synchronization across threads
224  int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
225  int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
226  int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
227  int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
228  int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
229  State state() const { return state_; }
230 
231  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
232 
233  virtual bool hasAccumulator() const = 0;
234 
235  protected:
236  template<typename O> friend class workerhelper::CallImpl;
237  virtual std::string workerType() const = 0;
238  virtual bool implDo(EventPrincipal const&, EventSetup const& c,
239  ModuleCallingContext const* mcc) = 0;
240 
241  virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
242  virtual bool implNeedToRunSelection() const = 0;
243 
244  virtual void implDoAcquire(EventPrincipal const&, EventSetup const& c,
245  ModuleCallingContext const* mcc,
246  WaitingTaskWithArenaHolder& holder) = 0;
247 
248  virtual bool implDoPrePrefetchSelection(StreamID id,
249  EventPrincipal const& ep,
250  ModuleCallingContext const* mcc) = 0;
251  virtual bool implDoBegin(RunPrincipal const& rp, EventSetup const& c,
252  ModuleCallingContext const* mcc) = 0;
253  virtual bool implDoStreamBegin(StreamID id, RunPrincipal const& rp, EventSetup const& c,
254  ModuleCallingContext const* mcc) = 0;
255  virtual bool implDoStreamEnd(StreamID id, RunPrincipal const& rp, EventSetup const& c,
256  ModuleCallingContext const* mcc) = 0;
257  virtual bool implDoEnd(RunPrincipal const& rp, EventSetup const& c,
258  ModuleCallingContext const* mcc) = 0;
259  virtual bool implDoBegin(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
260  ModuleCallingContext const* mcc) = 0;
261  virtual bool implDoStreamBegin(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
262  ModuleCallingContext const* mcc) = 0;
263  virtual bool implDoStreamEnd(StreamID id, LuminosityBlockPrincipal const& lbp, EventSetup const& c,
264  ModuleCallingContext const* mcc) = 0;
265  virtual bool implDoEnd(LuminosityBlockPrincipal const& lbp, EventSetup const& c,
266  ModuleCallingContext const* mcc) = 0;
267  virtual void implBeginJob() = 0;
268  virtual void implEndJob() = 0;
269  virtual void implBeginStream(StreamID) = 0;
270  virtual void implEndStream(StreamID) = 0;
271 
272  void resetModuleDescription(ModuleDescription const*);
273 
274  ActivityRegistry* activityRegistry() { return actReg_.get(); }
275 
276  private:
277 
278  template <typename T>
279  bool runModule(typename T::MyPrincipal const&, EventSetup const& c,
280  StreamID stream,
281  ParentContext const& parentContext,
282  typename T::Context const* context);
283 
284  virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
285  virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
286 
287  virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
288 
289 
290  virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
291 
292  virtual void preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& moduleCallingContext, Principal const& iPrincipal) const = 0;
293 
294  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
295  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
296 
297  virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
298 
299  virtual TaskQueueAdaptor serializeRunModule() = 0;
300 
301  static void exceptionContext(cms::Exception& ex,
302  ModuleCallingContext const* mcc);
303 
304  /*This base class is used to hide the differences between the ID used
305  for Event, LuminosityBlock and Run. Using the base class allows us
306  to only convert the ID to string form if it is actually needed in
307  the call to shouldRethrowException.
308  */
310  public:
311  virtual std::string value() const = 0;
313  };
314 
315  template< typename T>
317  public:
318  TransitionIDValue(T const& iP): p_(iP) {}
319  std::string value() const override {
320  std::ostringstream iost;
321  iost<<p_.id();
322  return iost.str();
323  }
324  private:
325  T const& p_;
326 
327  };
328 
329  bool shouldRethrowException(std::exception_ptr iPtr,
330  ParentContext const& parentContext,
331  bool isEvent,
332  TransitionIDValueBase const& iID) const;
333 
334  template<bool IS_EVENT>
335  bool setPassed() {
336  if(IS_EVENT) {
337  timesPassed_.fetch_add(1,std::memory_order_relaxed);
338  }
339  state_ = Pass;
340  return true;
341  }
342 
343  template<bool IS_EVENT>
344  bool setFailed() {
345  if(IS_EVENT) {
346  timesFailed_.fetch_add(1,std::memory_order_relaxed);
347  }
348  state_ = Fail;
349  return false;
350  }
351 
352  template<bool IS_EVENT>
353  std::exception_ptr setException(std::exception_ptr iException) {
354  if (IS_EVENT) {
355  timesExcept_.fetch_add(1,std::memory_order_relaxed);
356  }
357  cached_exception_ = iException; // propagate_const<T> has no reset() function
358  state_ = Exception;
359  return cached_exception_;
360  }
361 
362  void prefetchAsync(WaitingTask*,
363  ServiceToken const&,
364  ParentContext const& parentContext,
365  Principal const& );
366 
368  actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
369  }
370 
371  virtual bool hasAcquire() const = 0;
372 
373  template<typename T>
374  std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const * iEPtr,
375  typename T::MyPrincipal const& ep,
376  EventSetup const& es,
377  StreamID streamID,
378  ParentContext const& parentContext,
379  typename T::Context const* context);
380 
381  void runAcquire(EventPrincipal const& ep,
382  EventSetup const& es,
383  ParentContext const& parentContext,
385 
386  void runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
387  EventPrincipal const& ep,
388  EventSetup const& es,
389  ParentContext const& parentContext,
391 
392  std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr,
393  ParentContext const& parentContext);
394 
395  template< typename T>
396  class RunModuleTask : public WaitingTask {
397  public:
399  typename T::MyPrincipal const& ep,
400  EventSetup const& es,
401  ServiceToken const& token,
402  StreamID streamID,
403  ParentContext const& parentContext,
404  typename T::Context const* context):
405  m_worker(worker),
406  m_principal(ep),
407  m_es(es),
408  m_streamID(streamID),
409  m_parentContext(parentContext),
410  m_context(context),
411  m_serviceToken(token) {}
412 
415  EnableQueueGuard(SerialTaskQueue* iQueue): queue_{iQueue} {}
416  EnableQueueGuard(EnableQueueGuard const&) = delete;
417  EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
418  EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
419  EnableQueueGuard(EnableQueueGuard&& iGuard): queue_{iGuard.queue_} {iGuard.queue_ = nullptr;}
420  ~EnableQueueGuard() { if(queue_) {queue_->resume();} }
421  };
422 
423  tbb::task* execute() override {
424  //Need to make the services available early so other services can see them
425  ServiceRegistry::Operate guard(m_serviceToken);
426 
427  //incase the emit causes an exception, we need a memory location
428  // to hold the exception_ptr
429  std::exception_ptr temp_excptr;
430  auto excptr = exceptionPtr();
431  if(T::isEvent_ && !m_worker->hasAcquire()) {
432  try {
433  //pre was called in prefetchAsync
434  m_worker->emitPostModuleEventPrefetchingSignal();
435  }catch(...) {
436  temp_excptr = std::current_exception();
437  if(not excptr) {
438  excptr = &temp_excptr;
439  }
440  }
441  }
442 
443  if( not excptr) {
444  if(auto queue = m_worker->serializeRunModule()) {
445  auto const & principal = m_principal;
446  auto& es = m_es;
447  auto f = [worker = m_worker, &principal,
448  &es, streamID = m_streamID,
449  parentContext = m_parentContext,
450  sContext = m_context, serviceToken = m_serviceToken]()
451  {
452  //Need to make the services available
453  ServiceRegistry::Operate guard(serviceToken);
454 
455  //If needed, we pause the queue in begin transition and resume it
456  // at the end transition. This guarantees that the module
457  // only processes one transition at a time
459  std::exception_ptr* ptr = nullptr;
460  worker->template runModuleAfterAsyncPrefetch<T>(ptr,
461  principal,
462  es,
463  streamID,
464  parentContext,
465  sContext);
466  };
467  //keep another global transition from running if necessary
468  auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
469  if(gQueue) {
470  gQueue->push( [queue,gQueue, f]() mutable {
471  gQueue->pause();
472  queue.push(std::move(f));} );
473  } else {
474  queue.push( std::move(f) );
475  }
476  return nullptr;
477  }
478  }
479 
480  m_worker->runModuleAfterAsyncPrefetch<T>(excptr,
481  m_principal,
482  m_es,
483  m_streamID,
484  m_parentContext,
485  m_context);
486  return nullptr;
487  }
488 
489  private:
491  typename T::MyPrincipal const& m_principal;
492  EventSetup const& m_es;
495  typename T::Context const* m_context;
497  };
498 
499  // AcquireTask is only used for the Event case, but we define
500  // it as a template so all cases will compile.
501  // DUMMY exists to work around the C++ Standard prohibition on
502  // fully specializing templates nested in other classes.
503  template <typename T, typename DUMMY = void>
504  class AcquireTask : public WaitingTask {
505  public:
507  typename T::MyPrincipal const& ep,
508  EventSetup const& es,
509  ServiceToken const& token,
510  ParentContext const& parentContext,
511  WaitingTaskWithArenaHolder holder) {}
512  tbb::task* execute() override { return nullptr; }
513  };
514 
515  template <typename DUMMY>
517  public:
519  EventPrincipal const& ep,
520  EventSetup const& es,
521  ServiceToken const& token,
522  ParentContext const& parentContext,
524  m_worker(worker),
525  m_principal(ep),
526  m_es(es),
527  m_parentContext(parentContext),
528  m_holder(std::move(holder)),
529  m_serviceToken(token) {}
530 
531  tbb::task* execute() override {
532  //Need to make the services available early so other services can see them
533  ServiceRegistry::Operate guard(m_serviceToken);
534 
535  //incase the emit causes an exception, we need a memory location
536  // to hold the exception_ptr
537  std::exception_ptr temp_excptr;
538  auto excptr = exceptionPtr();
539  try {
540  //pre was called in prefetchAsync
541  m_worker->emitPostModuleEventPrefetchingSignal();
542  } catch(...) {
543  temp_excptr = std::current_exception();
544  if(not excptr) {
545  excptr = &temp_excptr;
546  }
547  }
548 
549  if( not excptr) {
550  if(auto queue = m_worker->serializeRunModule()) {
551  auto const & principal = m_principal;
552  auto& es = m_es;
553  queue.push( [worker = m_worker, &principal,
554  &es, parentContext = m_parentContext,
555  serviceToken = m_serviceToken, holder = m_holder]()
556  {
557  //Need to make the services available
558  ServiceRegistry::Operate guard(serviceToken);
559 
560  std::exception_ptr* ptr = nullptr;
561  worker->runAcquireAfterAsyncPrefetch(ptr,
562  principal,
563  es,
564  parentContext,
565  holder);
566  });
567  return nullptr;
568  }
569  }
570 
571  m_worker->runAcquireAfterAsyncPrefetch(excptr,
572  m_principal,
573  m_es,
574  m_parentContext,
575  std::move(m_holder));
576  return nullptr;
577  }
578 
579  private:
582  EventSetup const& m_es;
586  };
587 
588  // This class does nothing unless there is an exception originating
589  // in an "External Worker". In that case, it handles converting the
590  // exception to a CMS exception and adding context to the exception.
592  public:
593 
595  WaitingTask* runModuleTask,
596  ParentContext const& parentContext);
597 
598  tbb::task* execute() override;
599 
600  private:
604  };
605 
606  std::atomic<int> timesRun_;
607  std::atomic<int> timesVisited_;
608  std::atomic<int> timesPassed_;
609  std::atomic<int> timesFailed_;
610  std::atomic<int> timesExcept_;
611  std::atomic<State> state_;
613  std::atomic<int> numberOfPathsLeftToRun_;
614 
616 
617  ExceptionToActionTable const* actions_; // memory assumed to be managed elsewhere
618  CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_; // if state is 'exception'
619 
620  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
621 
622  edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
623 
624  edm::WaitingTaskList waitingTasks_;
625  std::atomic<bool> workStarted_;
626  bool ranAcquireWithoutException_;
627  };
628 
629  namespace {
630  template <typename T>
631  class ModuleSignalSentry {
632  public:
633  ModuleSignalSentry(ActivityRegistry *a,
634  typename T::Context const* context,
635  ModuleCallingContext const* moduleCallingContext) :
636  a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
637 
638  if(a_) T::preModuleSignal(a_, context, moduleCallingContext_);
639  }
640 
641  ~ModuleSignalSentry() {
642  if(a_) T::postModuleSignal(a_, context_, moduleCallingContext_);
643  }
644 
645  private:
646  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
647  typename T::Context const* context_;
648  ModuleCallingContext const* moduleCallingContext_;
649  };
650 
651  }
652 
653  namespace workerhelper {
654  template<>
656  public:
658  static bool call(Worker* iWorker, StreamID,
659  EventPrincipal const& ep, EventSetup const& es,
660  ActivityRegistry* /* actReg */,
661  ModuleCallingContext const* mcc,
662  Arg::Context const* /* context*/) {
663  //Signal sentry is handled by the module
664  return iWorker->implDo(ep,es, mcc);
665  }
666  static bool wantsTransition(Worker const* iWorker) {
667  return true;
668  }
669  static bool needToRunSelection( Worker const* iWorker) {
670  return iWorker->implNeedToRunSelection();
671  }
672 
673  static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr;}
674  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
675  };
676 
677  template<>
679  public:
681  static bool call(Worker* iWorker,StreamID,
682  RunPrincipal const& ep, EventSetup const& es,
683  ActivityRegistry* actReg,
684  ModuleCallingContext const* mcc,
685  Arg::Context const* context) {
686  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
687  return iWorker->implDoBegin(ep,es, mcc);
688  }
689  static bool wantsTransition(Worker const* iWorker) {
690  return iWorker->wantsGlobalRuns();
691  }
692  static bool needToRunSelection( Worker const* iWorker) {
693  return false;
694  }
695  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue();}
696  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
697 
698  };
699  template<>
701  public:
703  static bool call(Worker* iWorker,StreamID id,
704  RunPrincipal const & ep, EventSetup const& es,
705  ActivityRegistry* actReg,
706  ModuleCallingContext const* mcc,
707  Arg::Context const* context) {
708  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
709  return iWorker->implDoStreamBegin(id,ep,es, mcc);
710  }
711  static bool wantsTransition(Worker const* iWorker) {
712  return iWorker->wantsStreamRuns();
713  }
714  static bool needToRunSelection( Worker const* iWorker) {
715  return false;
716  }
717  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr;}
718  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
719 
720  };
721  template<>
723  public:
725  static bool call(Worker* iWorker,StreamID,
726  RunPrincipal const& ep, EventSetup const& es,
727  ActivityRegistry* actReg,
728  ModuleCallingContext const* mcc,
729  Arg::Context const* context) {
730  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
731  return iWorker->implDoEnd(ep,es, mcc);
732  }
733  static bool wantsTransition(Worker const* iWorker) {
734  return iWorker->wantsGlobalRuns();
735  }
736  static bool needToRunSelection( Worker const* iWorker) {
737  return false;
738  }
739  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr;}
740  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue();}
741  };
742  template<>
744  public:
746  static bool call(Worker* iWorker,StreamID id,
747  RunPrincipal const& ep, EventSetup const& es,
748  ActivityRegistry* actReg,
749  ModuleCallingContext const* mcc,
750  Arg::Context const* context) {
751  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
752  return iWorker->implDoStreamEnd(id,ep,es, mcc);
753  }
754  static bool wantsTransition(Worker const* iWorker) {
755  return iWorker->wantsStreamRuns();
756  }
757  static bool needToRunSelection( Worker const* iWorker) {
758  return false;
759  }
760  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr;}
761  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
762  };
763 
764  template<>
766  public:
768  static bool call(Worker* iWorker,StreamID,
769  LuminosityBlockPrincipal const& ep, EventSetup const& es,
770  ActivityRegistry* actReg,
771  ModuleCallingContext const* mcc,
772  Arg::Context const* context) {
773  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
774  return iWorker->implDoBegin(ep,es, mcc);
775  }
776  static bool wantsTransition(Worker const* iWorker) {
777  return iWorker->wantsGlobalLuminosityBlocks();
778  }
779  static bool needToRunSelection( Worker const* iWorker) {
780  return false;
781  }
782  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue();}
783  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
784  };
785  template<>
787  public:
789  static bool call(Worker* iWorker,StreamID id,
790  LuminosityBlockPrincipal const& ep, EventSetup const& es,
791  ActivityRegistry* actReg,
792  ModuleCallingContext const* mcc,
793  Arg::Context const* context) {
794  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
795  return iWorker->implDoStreamBegin(id,ep,es, mcc);
796  }
797  static bool wantsTransition(Worker const* iWorker) {
798  return iWorker->wantsStreamLuminosityBlocks();
799  }
800  static bool needToRunSelection( Worker const* iWorker) {
801  return false;
802  }
803  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr;}
804  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
805 };
806 
807  template<>
809  public:
811  static bool call(Worker* iWorker,StreamID,
812  LuminosityBlockPrincipal const& ep, EventSetup const& es,
813  ActivityRegistry* actReg,
814  ModuleCallingContext const* mcc,
815  Arg::Context const* context) {
816  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
817  return iWorker->implDoEnd(ep,es, mcc);
818  }
819  static bool wantsTransition(Worker const* iWorker) {
820  return iWorker->wantsGlobalLuminosityBlocks();
821  }
822  static bool needToRunSelection( Worker const* iWorker) {
823  return false;
824  }
825  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr;}
826  static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue();}
827  };
828  template<>
830  public:
832  static bool call(Worker* iWorker,StreamID id,
833  LuminosityBlockPrincipal const& ep, EventSetup const& es,
834  ActivityRegistry* actReg,
835  ModuleCallingContext const* mcc,
836  Arg::Context const* context) {
837  ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
838  return iWorker->implDoStreamEnd(id,ep,es, mcc);
839  }
840  static bool wantsTransition(Worker const* iWorker) {
841  return iWorker->wantsStreamLuminosityBlocks();
842  }
843  static bool needToRunSelection( Worker const* iWorker) {
844  return false;
845  }
846  static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr;}
847  static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr;}
848  };
849  }
850 
851  template <typename T>
853  typename T::MyPrincipal const& ep,
854  EventSetup const& es,
855  ServiceToken const& token,
856  StreamID streamID,
857  ParentContext const& parentContext,
858  typename T::Context const* context) {
860  return;
861  }
862 
863  waitingTasks_.add(task);
864  if(T::isEvent_) {
865  timesVisited_.fetch_add(1,std::memory_order_relaxed);
866  }
867 
868  bool expected = false;
869  if(workStarted_.compare_exchange_strong(expected,true)) {
870  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
871 
872  //if have TriggerResults based selection we want to reject the event before doing prefetching
874  //We need to run the selection in a different task so that
875  // we can prefetch the data needed for the selection
876  auto runTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
877  this, ep,es,token,streamID,parentContext,context);
878 
879  //make sure the task is either run or destroyed
880  struct DestroyTask {
881  DestroyTask(edm::WaitingTask* iTask):
882  m_task(iTask) {}
883 
884  ~DestroyTask() {
885  auto p = m_task.load();
886  if(p) {
888  }
889  }
890 
892  auto t = m_task.load();
893  m_task.store(nullptr);
894  return t;
895  }
896 
897  std::atomic<edm::WaitingTask*> m_task;
898  };
899 
900  auto ownRunTask = std::make_shared<DestroyTask>(runTask);
901  auto selectionTask = make_waiting_task(tbb::task::allocate_root(), [ownRunTask,parentContext,&ep,token, this] (std::exception_ptr const* ) mutable {
902 
903  ServiceRegistry::Operate guard(token);
904  prefetchAsync(ownRunTask->release(), token, parentContext, ep);
905  });
906  prePrefetchSelectionAsync(selectionTask,token,streamID, &ep);
907  } else {
908  WaitingTask* moduleTask = new (tbb::task::allocate_root()) RunModuleTask<T>(
909  this, ep, es, token, streamID, parentContext, context);
910  if (T::isEvent_ && hasAcquire()) {
911  WaitingTaskWithArenaHolder runTaskHolder(
912  new (tbb::task::allocate_root())
914  moduleTask,
915  parentContext));
916  moduleTask = new (tbb::task::allocate_root()) AcquireTask<T>(
917  this, ep, es, token, parentContext, std::move(runTaskHolder));
918  }
919  prefetchAsync(moduleTask, token, parentContext, ep);
920  }
921  }
922  }
923 
924  template<typename T>
925  std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
926  typename T::MyPrincipal const& ep,
927  EventSetup const& es,
928  StreamID streamID,
929  ParentContext const& parentContext,
930  typename T::Context const* context) {
931  std::exception_ptr exceptionPtr;
932  if(iEPtr) {
933  assert(*iEPtr);
935  if(shouldRethrowException(*iEPtr, parentContext, T::isEvent_, idValue)) {
936  exceptionPtr = *iEPtr;
937  setException<T::isEvent_>(exceptionPtr);
938  } else {
939  setPassed<T::isEvent_>();
940  }
941  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr);
942  } else {
943  try {
944  runModule<T>(ep,es,streamID,parentContext,context);
945  } catch(...) {
946  exceptionPtr = std::current_exception();
947  }
948  }
949  waitingTasks_.doneWaiting(exceptionPtr);
950  return exceptionPtr;
951  }
952 
953  template <typename T>
955  typename T::MyPrincipal const& principal,
956  EventSetup const& es,
957  ServiceToken const& serviceToken,
958  StreamID streamID,
959  ParentContext const& parentContext,
960  typename T::Context const* context) {
962  return;
963  }
964  waitingTasks_.add(task);
965  bool expected = false;
966  if(workStarted_.compare_exchange_strong(expected,true)) {
967 
968  auto toDo =[this, &principal, &es, streamID,parentContext,context, serviceToken]()
969  {
970  std::exception_ptr exceptionPtr;
971  try {
972  //Need to make the services available
973  ServiceRegistry::Operate guard(serviceToken);
974 
975  this->runModule<T>(principal,
976  es,
977  streamID,
978  parentContext,
979  context);
980  } catch( ... ) {
981  exceptionPtr = std::current_exception();
982  }
983  this->waitingTasks_.doneWaiting(exceptionPtr);
984  };
985  if(auto queue = this->serializeRunModule()) {
986  queue.push( toDo);
987  } else {
988  auto task = make_functor_task( tbb::task::allocate_root(), toDo);
989  tbb::task::spawn(*task);
990  }
991  }
992  }
993 
994  template <typename T>
995  bool Worker::doWork(typename T::MyPrincipal const& ep,
996  EventSetup const& es,
997  StreamID streamID,
998  ParentContext const& parentContext,
999  typename T::Context const* context) {
1000 
1001  if (T::isEvent_) {
1002  timesVisited_.fetch_add(1,std::memory_order_relaxed);
1003  }
1004  bool rc = false;
1005 
1006  switch(state_) {
1007  case Ready: break;
1008  case Pass: return true;
1009  case Fail: return false;
1010  case Exception: {
1011  std::rethrow_exception(cached_exception_);
1012  }
1013  }
1014 
1015  bool expected = false;
1016  if(not workStarted_.compare_exchange_strong(expected, true) ) {
1017  //another thread beat us here
1018  auto waitTask = edm::make_empty_waiting_task();
1019  waitTask->increment_ref_count();
1020 
1021  waitingTasks_.add(waitTask.get());
1022 
1023  waitTask->wait_for_all();
1024 
1025  switch(state_) {
1026  case Ready: assert(false);
1027  case Pass: return true;
1028  case Fail: return false;
1029  case Exception: {
1030  std::rethrow_exception(cached_exception_);
1031  }
1032  }
1033  }
1034 
1035  //Need the context to be set until after any exception is resolved
1036  moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching,parentContext,nullptr);
1037 
1038  auto resetContext = [](ModuleCallingContext* iContext) {iContext->setContext(ModuleCallingContext::State::kInvalid,ParentContext(),nullptr); };
1039  std::unique_ptr<ModuleCallingContext, decltype(resetContext)> prefetchSentry(&moduleCallingContext_,resetContext);
1040 
1041  if (T::isEvent_) {
1042  //if have TriggerResults based selection we want to reject the event before doing prefetching
1044  auto waitTask = edm::make_empty_waiting_task();
1045  waitTask->set_ref_count(2);
1046  prePrefetchSelectionAsync(waitTask.get(),
1048  streamID, &ep);
1049  waitTask->decrement_ref_count();
1050  waitTask->wait_for_all();
1051 
1052  if(state() != Ready) {
1053  //The selection must have rejected this event
1054  return true;
1055  }
1056  }
1057  auto waitTask = edm::make_empty_waiting_task();
1058  {
1059  //Make sure signal is sent once the prefetching is done
1060  // [the 'pre' signal was sent in prefetchAsync]
1061  //The purpose of this block is to send the signal after wait_for_all
1062  auto sentryFunc = [this](void*) {
1063  emitPostModuleEventPrefetchingSignal();
1064  };
1065  std::unique_ptr<ActivityRegistry, decltype(sentryFunc)> signalSentry(actReg_.get(),sentryFunc);
1066 
1067  //set count to 2 since wait_for_all requires value to not go to 0
1068  waitTask->set_ref_count(2);
1069 
1070  prefetchAsync(waitTask.get(),ServiceRegistry::instance().presentToken(), parentContext, ep);
1071  waitTask->decrement_ref_count();
1072  waitTask->wait_for_all();
1073  }
1074  if(waitTask->exceptionPtr() != nullptr) {
1076  if(shouldRethrowException(*waitTask->exceptionPtr(), parentContext, T::isEvent_, idValue)) {
1077  setException<T::isEvent_>(*waitTask->exceptionPtr());
1078  waitingTasks_.doneWaiting(cached_exception_);
1079  std::rethrow_exception(cached_exception_);
1080  } else {
1081  setPassed<T::isEvent_>();
1082  waitingTasks_.doneWaiting(nullptr);
1083  return true;
1084  }
1085  }
1086  }
1087 
1088  //successful prefetch so no reset necessary
1089  prefetchSentry.release();
1090  if(auto queue = serializeRunModule()) {
1091  auto serviceToken = ServiceRegistry::instance().presentToken();
1092  queue.pushAndWait([&]() {
1093  //Need to make the services available
1094  ServiceRegistry::Operate guard(serviceToken);
1095  try {
1096  rc = runModule<T>(ep,es,streamID,parentContext,context);
1097  } catch(...) {
1098  }
1099  });
1100  } else {
1101  try {
1102  rc = runModule<T>(ep,es,streamID,parentContext,context);
1103  } catch(...) {
1104  }
1105  }
1106  if(state_ == Exception) {
1107  waitingTasks_.doneWaiting(cached_exception_);
1108  std::rethrow_exception(cached_exception_);
1109  }
1110 
1111  waitingTasks_.doneWaiting(nullptr);
1112  return rc;
1113  }
1114 
1115 
1116  template <typename T>
1117  bool Worker::runModule(typename T::MyPrincipal const& ep,
1118  EventSetup const& es,
1119  StreamID streamID,
1120  ParentContext const& parentContext,
1121  typename T::Context const* context) {
1122  //unscheduled producers should advance this
1123  //if (T::isEvent_) {
1124  // ++timesVisited_;
1125  //}
1126  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1127  if (T::isEvent_) {
1128  timesRun_.fetch_add(1,std::memory_order_relaxed);
1129  }
1130 
1131  bool rc = true;
1132  try {
1134  {
1135  rc = workerhelper::CallImpl<T>::call(this,streamID,ep,es, actReg_.get(), &moduleCallingContext_, context);
1136 
1137  if (rc) {
1138  setPassed<T::isEvent_>();
1139  } else {
1140  setFailed<T::isEvent_>();
1141  }
1142  });
1143  } catch(cms::Exception& ex) {
1144  exceptionContext(ex, &moduleCallingContext_);
1146  if(shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, idValue)) {
1147  assert(not cached_exception_);
1148  setException<T::isEvent_>(std::current_exception());
1149  std::rethrow_exception(cached_exception_);
1150  } else {
1151  rc = setPassed<T::isEvent_>();
1152  }
1153  }
1154 
1155  return rc;
1156  }
1157 
1158  template <typename T>
1159  std::exception_ptr Worker::runModuleDirectly(typename T::MyPrincipal const& ep,
1160  EventSetup const& es,
1161  StreamID streamID,
1162  ParentContext const& parentContext,
1163  typename T::Context const* context) {
1164 
1165  timesVisited_.fetch_add(1,std::memory_order_relaxed);
1166  std::exception_ptr const* prefetchingException = nullptr; // null because there was no prefetching to do
1167  return runModuleAfterAsyncPrefetch<T>(prefetchingException, ep, es, streamID, parentContext, context);
1168  }
1169 }
1170 #endif
void push(T &&iAction)
asynchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:811
tbb::task * execute() override
Definition: Worker.h:423
std::atomic< int > timesVisited_
Definition: Worker.h:607
ModuleDescription const & description() const
Definition: Worker.h:188
virtual bool implDoStreamBegin(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
T::Context const * m_context
Definition: Worker.h:495
bool runModule(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1117
void callWhenDoneAsync(WaitingTask *task)
Definition: Worker.h:165
T::MyPrincipal const & m_principal
Definition: Worker.h:491
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:613
std::exception_ptr setException(std::exception_ptr iException)
Definition: Worker.h:353
virtual bool wantsGlobalRuns() const =0
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:746
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:788
Definition: hltDiff.cc:290
ParentContext const m_parentContext
Definition: Worker.h:494
def destroy(e)
Definition: pyrootRender.py:14
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
std::atomic< int > timesExcept_
Definition: Worker.h:610
OccurrenceTraits< RunPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:724
std::atomic< int > timesFailed_
Definition: Worker.h:609
int timesPassed() const
Definition: Worker.h:226
void addedToPath()
Definition: Worker.h:220
bool doWork(typename T::MyPrincipal const &, EventSetup const &c, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:995
State state() const
Definition: Worker.h:229
void clearCounters()
Definition: Worker.h:212
static bool call(Worker *iWorker, StreamID, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:768
TaskQueueAdaptor(SerialTaskQueueChain *iChain)
Definition: Worker.h:88
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:681
static bool call(Worker *iWorker, StreamID, EventPrincipal const &ep, EventSetup const &es, ActivityRegistry *, ModuleCallingContext const *mcc, Arg::Context const *)
Definition: Worker.h:658
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, void const *)
Definition: Worker.h:137
void push(T &&iAction)
asynchronously pushes functor iAction into queue
ActivityRegistry * activityRegistry()
Definition: Worker.h:274
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:832
void exceptionContext(std::ostream &, GlobalContext const &)
ExceptionToActionTable const * actions_
Definition: Worker.h:617
ServiceToken presentToken() const
void doWorkNoPrefetchingAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:954
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1159
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
BranchType
Definition: BranchType.h:11
void beginJob()
Definition: Breakpoints.cc:15
tbb::task * execute() override
Definition: Worker.h:512
AcquireTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:506
bool setFailed()
Definition: Worker.h:344
int timesExcept() const
Definition: Worker.h:228
void pushAndWait(F &&iF)
Definition: Worker.h:102
bool resume()
Resumes processing if the queue was paused.
def principal(options)
EnableQueueGuard(EnableQueueGuard &&iGuard)
Definition: Worker.h:419
virtual bool implDoEnd(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
void reset()
Definition: Worker.h:178
virtual bool implNeedToRunSelection() const =0
int timesVisited() const
Definition: Worker.h:225
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
OccurrenceTraits< RunPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:680
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * descPtr() const
Definition: Worker.h:189
virtual bool implDoBegin(RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
EnableQueueGuard(SerialTaskQueue *iQueue)
Definition: Worker.h:415
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.h:176
static ServiceRegistry & instance()
#define CMS_THREAD_GUARD(_var_)
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalEnd > Arg
Definition: Worker.h:810
virtual bool wantsStreamLuminosityBlocks() const =0
std::atomic< State > state_
Definition: Worker.h:611
int timesRun() const
Definition: Worker.h:224
double f[11][100]
static bool call(Worker *iWorker, StreamID id, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:703
static bool call(Worker *iWorker, StreamID, RunPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:725
Definition: Types.py:1
int numberOfPathsOn_
Definition: Worker.h:612
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:831
bool setPassed()
Definition: Worker.h:335
TransitionIDValue(T const &iP)
Definition: Worker.h:318
TaskQueueAdaptor(LimitedTaskQueue *iLimited)
Definition: Worker.h:89
OccurrenceTraits< EventPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:657
std::string value() const override
Definition: Worker.h:319
virtual SerialTaskQueue * globalRunsQueue()=0
OccurrenceTraits< RunPrincipal, BranchActionStreamBegin > Arg
Definition: Worker.h:702
std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const *iEPtr, typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:925
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:173
virtual bool wantsGlobalLuminosityBlocks() const =0
int timesPass() const
Definition: Worker.h:231
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
AcquireTask(Worker *worker, EventPrincipal const &ep, EventSetup const &es, ServiceToken const &token, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.h:518
void doWorkAsync(WaitingTask *task, typename T::MyPrincipal const &, EventSetup const &c, ServiceToken const &token, StreamID stream, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:852
Definition: hltDiff.cc:291
static bool call(Worker *iWorker, StreamID id, LuminosityBlockPrincipal const &ep, EventSetup const &es, ActivityRegistry *actReg, ModuleCallingContext const *mcc, Arg::Context const *context)
Definition: Worker.h:789
int timesFailed() const
Definition: Worker.h:227
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:174
virtual bool wantsStreamRuns() const =0
HLT enums.
void emitPostModuleEventPrefetchingSignal()
Definition: Worker.h:367
virtual bool implDo(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc)=0
double a
Definition: hdecay.h:121
T get() const
Definition: EventSetup.h:68
std::atomic< int > timesPassed_
Definition: Worker.h:608
void pushAndWait(T &&iAction)
synchronously pushes functor iAction into queue
virtual bool implDoStreamEnd(StreamID id, RunPrincipal const &rp, EventSetup const &c, ModuleCallingContext const *mcc)=0
auto wrap(F iFunc) -> decltype(iFunc())
static Interceptor::Registry registry("Interceptor")
OccurrenceTraits< LuminosityBlockPrincipal, BranchActionGlobalBegin > Arg
Definition: Worker.h:767
static uInt32 F(BLOWFISH_CTX *ctx, uInt32 x)
Definition: blowfish.cc:281
std::atomic< int > timesRun_
Definition: Worker.h:606
RunModuleTask(Worker *worker, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:398
long double T
OccurrenceTraits< RunPrincipal, BranchActionStreamEnd > Arg
Definition: Worker.h:745
def move(src, dest)
Definition: eostools.py:511
ServiceToken m_serviceToken
Definition: Worker.h:496
virtual SerialTaskQueue * globalLuminosityBlocksQueue()=0
EventSetup const & m_es
Definition: Worker.h:492