CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
4 
10 
11 namespace edm {
12  namespace {
13  class ModuleBeginJobSignalSentry {
14 public:
15  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
16  if(a_) a_->preModuleBeginJobSignal_(*md_);
17  }
18  ~ModuleBeginJobSignalSentry() {
19  if(a_) a_->postModuleBeginJobSignal_(*md_);
20  }
21 private:
22  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
23  ModuleDescription const* md_;
24  };
25 
26  class ModuleEndJobSignalSentry {
27 public:
28  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
29  if(a_) a_->preModuleEndJobSignal_(*md_);
30  }
31  ~ModuleEndJobSignalSentry() {
32  if(a_) a_->postModuleEndJobSignal_(*md_);
33  }
34 private:
35  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
36  ModuleDescription const* md_;
37  };
38 
39  class ModuleBeginStreamSignalSentry {
40  public:
41  ModuleBeginStreamSignalSentry(ActivityRegistry* a,
42  StreamContext const& sc,
43  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
44  if(a_) a_->preModuleBeginStreamSignal_(sc_, mcc_);
45  }
46  ~ModuleBeginStreamSignalSentry() {
47  if(a_) a_->postModuleBeginStreamSignal_(sc_, mcc_);
48  }
49  private:
50  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
51  StreamContext const& sc_;
52  ModuleCallingContext const& mcc_;
53  };
54 
55  class ModuleEndStreamSignalSentry {
56  public:
57  ModuleEndStreamSignalSentry(ActivityRegistry* a,
58  StreamContext const& sc,
59  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
60  if(a_) a_->preModuleEndStreamSignal_(sc_, mcc_);
61  }
62  ~ModuleEndStreamSignalSentry() {
63  if(a_) a_->postModuleEndStreamSignal_(sc_, mcc_);
64  }
65  private:
66  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
67  StreamContext const& sc_;
68  ModuleCallingContext const& mcc_;
69  };
70 
71  }
72 
74  ExceptionToActionTable const* iActions) :
75  timesRun_(0),
76  timesVisited_(0),
77  timesPassed_(0),
78  timesFailed_(0),
79  timesExcept_(0),
80  state_(Ready),
81  numberOfPathsOn_(0),
82  numberOfPathsLeftToRun_(0),
83  moduleCallingContext_(&iMD),
84  actions_(iActions),
85  cached_exception_(),
86  actReg_(),
87  earlyDeleteHelper_(nullptr),
88  workStarted_(false),
89  ranAcquireWithoutException_(false)
90  {
91  }
92 
94  }
95 
96  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) {
97  actReg_ = areg;
98  }
99 
100 
102  cms::Exception& ex,
103  ModuleCallingContext const* mcc) {
104 
105  ModuleCallingContext const* imcc = mcc;
106  while( (imcc->type() == ParentContext::Type::kModule) or
107  (imcc->type() == ParentContext::Type::kInternal) ) {
108  std::ostringstream iost;
110  iost << "Prefetching for module ";
111  } else {
112  iost << "Calling method for module ";
113  }
114  iost << imcc->moduleDescription()->moduleName() << "/'"
115  << imcc->moduleDescription()->moduleLabel() << "'";
116 
117  if(imcc->type() == ParentContext::Type::kInternal) {
118  iost << " (probably inside some kind of mixing module)";
119  imcc = imcc->internalContext()->moduleCallingContext();
120  } else {
121  imcc = imcc->moduleCallingContext();
122  }
123  ex.addContext(iost.str());
124  }
125  std::ostringstream ost;
127  ost << "Prefetching for module ";
128  } else {
129  ost << "Calling method for module ";
130  }
131  ost << imcc->moduleDescription()->moduleName() << "/'"
132  << imcc->moduleDescription()->moduleLabel() << "'";
133  ex.addContext(ost.str());
134 
135  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
136  ost.str("");
137  ost << "Running path '";
138  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
139  ex.addContext(ost.str());
140  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
141  if(streamContext) {
142  ost.str("");
143  edm::exceptionContext(ost,*streamContext);
144  ex.addContext(ost.str());
145  }
146  } else {
147  if (imcc->type() == ParentContext::Type::kStream) {
148  ost.str("");
149  edm::exceptionContext(ost, *(imcc->streamContext()) );
150  ex.addContext(ost.str());
151  } else if(imcc->type() == ParentContext::Type::kGlobal) {
152  ost.str("");
153  edm::exceptionContext(ost, *(imcc->globalContext()) );
154  ex.addContext(ost.str());
155  }
156  }
157  }
158 
159  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
160  ParentContext const& parentContext,
161  bool isEvent,
162  TransitionIDValueBase const& iID) const {
163 
164  // NOTE: the warning printed as a result of ignoring or failing
165  // a module will only be printed during the full true processing
166  // pass of this module
167 
168  // Get the action corresponding to this exception. However, if processing
169  // something other than an event (e.g. run, lumi) always rethrow.
170  if(not isEvent) {
171  return true;
172  }
173  try {
174  convertException::wrap([&]() {
175  std::rethrow_exception(iPtr);
176  });
177  } catch(cms::Exception &ex) {
179 
180  if(action == exception_actions::Rethrow) {
181  return true;
182  }
183 
184  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
185 
186  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
187  // as IgnoreCompletely, so any subsequent OutputModules are still run.
188  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
189  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
190  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
191  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
192 
193  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
194  action == exception_actions::FailPath) {
196  }
197  }
199  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
200  return false;
201  }
202  }
203  return true;
204  }
205 
206 
207  void Worker::prefetchAsync(WaitingTask* iTask, ServiceToken const& token, ParentContext const& parentContext, Principal const& iPrincipal) {
208  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
209  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
210 
212 
213  if(iPrincipal.branchType()==InEvent) {
214  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
215  }
216 
217  //Need to be sure the ref count isn't set to 0 immediately
218  iTask->increment_ref_count();
219  for(auto const& item : items) {
220  ProductResolverIndex productResolverIndex = item.productResolverIndex();
221  bool skipCurrentProcess = item.skipCurrentProcess();
222  if(productResolverIndex != ProductResolverIndexAmbiguous) {
223  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
224  }
225  }
226 
227  if(iPrincipal.branchType()==InEvent) {
229  }
230 
231  if(0 == iTask->decrement_ref_count()) {
232  //if everything finishes before we leave this routine, we need to launch the task
233  tbb::task::spawn(*iTask);
234  }
235  }
236 
238  ServiceToken const& token,
239  StreamID id,
240  EventPrincipal const* iPrincipal) {
241  successTask->increment_ref_count();
242 
243  auto choiceTask = edm::make_waiting_task(tbb::task::allocate_root(),
244  [id,successTask,iPrincipal,this,token](std::exception_ptr const*) {
245  ServiceRegistry::Operate guard(token);
246  try {
247  if( not implDoPrePrefetchSelection(id,*iPrincipal,&moduleCallingContext_) ) {
248  timesRun_.fetch_add(1,std::memory_order_relaxed);
249  setPassed<true>();
250  waitingTasks_.doneWaiting(nullptr);
251  //TBB requires that destroyed tasks have count 0
252  if ( 0 == successTask->decrement_ref_count() ) {
253  tbb::task::destroy(*successTask);
254  }
255  return;
256  }
257  } catch(...) {}
258  if(0 == successTask->decrement_ref_count()) {
259  tbb::task::spawn(*successTask);
260  }
261  });
262 
263  WaitingTaskHolder choiceHolder{choiceTask};
264 
265  std::vector<ProductResolverIndexAndSkipBit> items;
266  itemsToGetForSelection(items);
267 
268  for(auto const& item : items) {
269  ProductResolverIndex productResolverIndex = item.productResolverIndex();
270  bool skipCurrentProcess = item.skipCurrentProcess();
271  if(productResolverIndex != ProductResolverIndexAmbiguous) {
272  iPrincipal->prefetchAsync(choiceTask,productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
273  }
274  }
275  choiceHolder.doneWaiting(std::exception_ptr{});
276  }
277 
278 
280  earlyDeleteHelper_=iHelper;
281  }
282 
287  }
288 
290  try {
291  convertException::wrap([&]() {
292  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
293  implBeginJob();
294  });
295  }
296  catch(cms::Exception& ex) {
297  state_ = Exception;
298  std::ostringstream ost;
299  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
300  ex.addContext(ost.str());
301  throw;
302  }
303  }
304 
305  void Worker::endJob() {
306  try {
307  convertException::wrap([&]() {
308  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
309  implEndJob();
310  });
311  }
312  catch(cms::Exception& ex) {
313  state_ = Exception;
314  std::ostringstream ost;
315  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
316  ex.addContext(ost.str());
317  throw;
318  }
319  }
320 
321  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
322  try {
323  convertException::wrap([&]() {
325  streamContext.setEventID(EventID(0, 0, 0));
326  streamContext.setRunIndex(RunIndex::invalidRunIndex());
328  streamContext.setTimestamp(Timestamp());
329  ParentContext parentContext(&streamContext);
330  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
332  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
333  implBeginStream(id);
334  });
335  }
336  catch(cms::Exception& ex) {
337  state_ = Exception;
338  std::ostringstream ost;
339  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
340  ex.addContext(ost.str());
341  throw;
342  }
343  }
344 
345  void Worker::endStream(StreamID id, StreamContext& streamContext) {
346  try {
347  convertException::wrap([&]() {
349  streamContext.setEventID(EventID(0, 0, 0));
350  streamContext.setRunIndex(RunIndex::invalidRunIndex());
352  streamContext.setTimestamp(Timestamp());
353  ParentContext parentContext(&streamContext);
354  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
356  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
357  implEndStream(id);
358  });
359  }
360  catch(cms::Exception& ex) {
361  state_ = Exception;
362  std::ostringstream ost;
363  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
364  ex.addContext(ost.str());
365  throw;
366  }
367  }
368 
370  if( 0 == --numberOfPathsLeftToRun_) {
372  }
373  }
374 
376  if(earlyDeleteHelper_) {
377  earlyDeleteHelper_->moduleRan(iEvent);
378  }
379  }
380 
382  EventSetup const& es,
383  ParentContext const& parentContext,
384  WaitingTaskWithArenaHolder& holder) {
385 
386  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
387  try {
389  {
390  this->implDoAcquire(ep, es, &moduleCallingContext_, holder);
391  });
392  } catch(cms::Exception& ex) {
395  if(shouldRethrowException(std::current_exception(), parentContext, true, idValue)) {
396  timesRun_.fetch_add(1,std::memory_order_relaxed);
397  throw;
398  }
399  }
400  }
401 
402  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
403  EventPrincipal const& ep,
404  EventSetup const& es,
405  ParentContext const& parentContext,
408  std::exception_ptr exceptionPtr;
409  if(iEPtr) {
410  assert(*iEPtr);
412  if(shouldRethrowException(*iEPtr, parentContext, true, idValue)) {
413  exceptionPtr = *iEPtr;
414  }
416  } else {
417  try {
418  runAcquire(ep, es, parentContext, holder);
420  } catch(...) {
421  exceptionPtr = std::current_exception();
422  }
423  }
424  // It is important this is after runAcquire completely finishes
425  holder.doneWaiting(exceptionPtr);
426  }
427 
428  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
429  ParentContext const& parentContext) {
431  try {
432  convertException::wrap([iEPtr]() {
433  std::rethrow_exception(*iEPtr);
434  });
435  } catch(cms::Exception &ex) {
436  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
438  return std::current_exception();
439  }
440  }
441  return *iEPtr;
442  }
443 
446  WaitingTask* runModuleTask,
447  ParentContext const& parentContext) :
448  m_worker(worker),
449  m_runModuleTask(runModuleTask),
450  m_parentContext(parentContext) {
451  }
452 
453  tbb::task*
455 
456  auto excptr = exceptionPtr();
457  if (excptr) {
458  // increment the ref count so the holder will not spawn it
459  m_runModuleTask->set_ref_count(1);
462  m_parentContext));
463  }
464  m_runModuleTask->set_ref_count(0);
465  // Depend on TBB Scheduler Bypass to run the next task
466  return m_runModuleTask;
467  }
468 }
std::string const & pathName() const
Definition: PathContext.h:37
GlobalContext const * globalContext() const
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:283
void skipOnPath()
Definition: Worker.cc:369
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:73
ModuleDescription const & description() const
Definition: Worker.h:188
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:613
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:279
ModuleCallingContext const * getTopModuleCallingContext() const
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
virtual ~Worker()
Definition: Worker.cc:93
void endJob()
Definition: Worker.cc:305
def destroy(e)
Definition: pyrootRender.py:13
InternalContext const * internalContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:620
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:321
void exceptionContext(std::ostream &, GlobalContext const &)
#define nullptr
ExceptionToActionTable const * actions_
Definition: Worker.h:617
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:626
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:96
ParentContext const & parent() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:580
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:615
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.cc:402
void setTransition(Transition v)
Definition: StreamContext.h:69
std::exception_ptr cached_exception_
Definition: Worker.h:618
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:42
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
void doneWaiting(std::exception_ptr iExcept)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:624
void doneWaiting(std::exception_ptr iExcept)
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:445
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:72
BranchType const & branchType() const
Definition: Principal.h:176
PathContext const * pathContext() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * moduleDescription() const
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:73
std::atomic< State > state_
Definition: Worker.h:611
void runAcquire(EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
Definition: Worker.cc:381
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:207
virtual void implEndJob()=0
StreamContext const * streamContext() const
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:237
virtual void implBeginJob()=0
virtual void implDoAcquire(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void implEndStream(StreamID)=0
StreamContext const * streamContext() const
Definition: PathContext.h:38
void setEventID(EventID const &v)
Definition: StreamContext.h:70
ModuleCallingContext const * previousModuleOnThread() const
HLT enums.
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:289
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:345
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:375
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:622
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::atomic< int > timesRun_
Definition: Worker.h:606
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:54
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:71
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:428
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)