CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
4 
10 
11 namespace edm {
12  namespace {
13  class ModuleBeginJobSignalSentry {
14 public:
15  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
16  if(a_) a_->preModuleBeginJobSignal_(*md_);
17  }
18  ~ModuleBeginJobSignalSentry() {
19  if(a_) a_->postModuleBeginJobSignal_(*md_);
20  }
21 private:
22  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
23  ModuleDescription const* md_;
24  };
25 
26  class ModuleEndJobSignalSentry {
27 public:
28  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
29  if(a_) a_->preModuleEndJobSignal_(*md_);
30  }
31  ~ModuleEndJobSignalSentry() {
32  if(a_) a_->postModuleEndJobSignal_(*md_);
33  }
34 private:
35  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
36  ModuleDescription const* md_;
37  };
38 
39  class ModuleBeginStreamSignalSentry {
40  public:
41  ModuleBeginStreamSignalSentry(ActivityRegistry* a,
42  StreamContext const& sc,
43  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
44  if(a_) a_->preModuleBeginStreamSignal_(sc_, mcc_);
45  }
46  ~ModuleBeginStreamSignalSentry() {
47  if(a_) a_->postModuleBeginStreamSignal_(sc_, mcc_);
48  }
49  private:
50  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
51  StreamContext const& sc_;
52  ModuleCallingContext const& mcc_;
53  };
54 
55  class ModuleEndStreamSignalSentry {
56  public:
57  ModuleEndStreamSignalSentry(ActivityRegistry* a,
58  StreamContext const& sc,
59  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
60  if(a_) a_->preModuleEndStreamSignal_(sc_, mcc_);
61  }
62  ~ModuleEndStreamSignalSentry() {
63  if(a_) a_->postModuleEndStreamSignal_(sc_, mcc_);
64  }
65  private:
66  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
67  StreamContext const& sc_;
68  ModuleCallingContext const& mcc_;
69  };
70 
71  }
72 
// Worker constructor. NOTE(review): its opening line (Worker.cc:73) is missing
// from this listing; the cross-reference index gives the full signature as
// Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions).
// Every counter starts at zero, the state starts as Ready, and the activity
// registry, cached exception and early-delete helper start out empty/null.
74  ExceptionToActionTable const* iActions) :
75  timesRun_(0),
76  timesVisited_(0),
77  timesPassed_(0),
78  timesFailed_(0),
79  timesExcept_(0),
80  state_(Ready),
81  numberOfPathsOn_(0),
82  numberOfPathsLeftToRun_(0),
    // The calling context is built from the address of the description passed in.
83  moduleCallingContext_(&iMD),
84  actions_(iActions),
85  cached_exception_(),
    // Left empty here; setActivityRegistry() installs the registry later.
86  actReg_(),
87  earlyDeleteHelper_(nullptr),
88  workStarted_(false),
89  ranAcquireWithoutException_(false)
90  {
91  }
92 
// Closes Worker::~Worker() (declared virtual per the cross-reference index).
// NOTE(review): the line that opens the destructor (Worker.cc:93) is missing
// from this listing.
94  }
95 
96  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) {
97  actReg_ = areg;
98  }
99 
100 
// Worker::exceptionContext -- per the cross-reference index:
//   static void exceptionContext(cms::Exception& ex, ModuleCallingContext const* mcc)
//
// Appends human-readable context lines to `ex`, one per module found while
// climbing from `mcc` to the outermost calling context, followed by a
// description of the path, stream or global context that started the chain.
//
// ex  - exception that receives the context lines via addContext().
// mcc - innermost module calling context; dereferenced without a null check.
//
// NOTE(review): three lines are missing from this listing: the opening line
// (Worker.cc:101) and the `if` conditions at Worker.cc:109 and :126 that choose
// between the "Prefetching for module" and "Calling method for module" wording.
102  cms::Exception& ex,
103  ModuleCallingContext const* mcc) {
104 
105  ModuleCallingContext const* imcc = mcc;
    // Climb while the parent of the current context is another module or an
    // internal context; each step adds one line describing the current module.
106  while( (imcc->type() == ParentContext::Type::kModule) or
107  (imcc->type() == ParentContext::Type::kInternal) ) {
108  std::ostringstream iost;
110  iost << "Prefetching for module ";
111  } else {
112  iost << "Calling method for module ";
113  }
114  iost << imcc->moduleDescription()->moduleName() << "/'"
115  << imcc->moduleDescription()->moduleLabel() << "'";
116 
    // An internal parent is reached through its InternalContext; a module parent
    // is reached directly.
117  if(imcc->type() == ParentContext::Type::kInternal) {
118  iost << " (probably inside some kind of mixing module)";
119  imcc = imcc->internalContext()->moduleCallingContext();
120  } else {
121  imcc = imcc->moduleCallingContext();
122  }
123  ex.addContext(iost.str());
124  }
    // imcc is now the outermost module context (its parent is neither kModule
    // nor kInternal); describe that module as well.
125  std::ostringstream ost;
127  ost << "Prefetching for module ";
128  } else {
129  ost << "Calling method for module ";
130  }
131  ost << imcc->moduleDescription()->moduleName() << "/'"
132  << imcc->moduleDescription()->moduleLabel() << "'";
133  ex.addContext(ost.str());
134 
    // Finally describe where the outermost module was called from. `ost` is
    // reused, so it is cleared with str("") before each new line.
135  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
136  ost.str("");
137  ost << "Running path '";
138  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
139  ex.addContext(ost.str());
140  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
    // The path's stream context may be null; only describe it when present.
141  if(streamContext) {
142  ost.str("");
143  edm::exceptionContext(ost,*streamContext);
144  ex.addContext(ost.str());
145  }
146  } else {
147  if (imcc->type() == ParentContext::Type::kStream) {
148  ost.str("");
149  edm::exceptionContext(ost, *(imcc->streamContext()) );
150  ex.addContext(ost.str());
151  } else if(imcc->type() == ParentContext::Type::kGlobal) {
152  ost.str("");
153  edm::exceptionContext(ost, *(imcc->globalContext()) );
154  ex.addContext(ost.str());
155  }
156  }
157  }
158 
// Decides whether an exception raised while running this module has to be
// propagated to the caller (returns true) or can be absorbed (returns false).
//
// iPtr          - the captured exception; it is rethrown internally so that it
//                 can be caught and inspected as a cms::Exception.
// parentContext - context the module was called from; used to find out whether
//                 the call chain starts on an end path.
// isEvent       - false for any non-event transition, which always rethrows.
// iID           - not referenced in the lines visible in this listing.
//
// NOTE(review): Worker.cc:178, :195 and :198 are missing from this listing.
// Presumably they look up `action` for the exception, downgrade it on end paths,
// and test for IgnoreCompletely before the warning below -- confirm against the
// real source.
159  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
160  ParentContext const& parentContext,
161  bool isEvent,
162  TransitionIDValueBase const& iID) const {
163 
164  // NOTE: the warning printed as a result of ignoring or failing
165  // a module will only be printed during the full true processing
166  // pass of this module
167 
168  // Get the action corresponding to this exception. However, if processing
169  // something other than an event (e.g. run, lumi) always rethrow.
170  if(not isEvent) {
171  return true;
172  }
    // Rethrow inside convertException::wrap so the handler below receives a
    // cms::Exception (presumably wrap converts other exception types -- confirm).
173  try {
174  convertException::wrap([&]() {
175  std::rethrow_exception(iPtr);
176  });
177  } catch(cms::Exception &ex) {
179 
180  if(action == exception_actions::Rethrow) {
181  return true;
182  }
183 
    // Temporary context used only to walk up to the top-level calling context.
184  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
185 
186  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
187  // as IgnoreCompletely, so any subsequent OutputModules are still run.
188  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
189  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
190  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
191  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
192 
    // "Scheduled" is detected by this module itself being placed in a path.
193  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
194  action == exception_actions::FailPath) {
196  }
197  }
199  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
200  return false;
201  }
202  }
    // Reached when the exception was not absorbed above.
203  return true;
204  }
205 
206 
// Requests asynchronous prefetching of every product this module declares it
// consumes from iPrincipal, handing iTask to each request.
//
// iTask         - task to run once prefetching is done; its TBB reference count
//                 is manipulated here, and it is spawned from this function if
//                 the count is already zero when all requests have been issued.
// parentContext - not referenced in the lines visible in this listing.
// iPrincipal    - principal whose branch type selects the items to fetch.
//
// NOTE(review): Worker.cc:211 and :228 are missing from this listing. Line 228
// is the only statement in the second InEvent block, presumably the post-signal
// matching preModuleEventPrefetchingSignal_ -- confirm. actReg_ is dereferenced
// without a null check for event principals.
207  void Worker::prefetchAsync(WaitingTask* iTask, ParentContext const& parentContext, Principal const& iPrincipal) {
208  // Prefetch products the module declares it consumes (not including the products it may consume)
209  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
210 
212 
213  if(iPrincipal.branchType()==InEvent) {
214  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
215  }
216 
217  //Need to be sure the ref count isn't set to 0 immediately
218  iTask->increment_ref_count();
219  for(auto const& item : items) {
220  ProductResolverIndex productResolverIndex = item.productResolverIndex();
221  bool skipCurrentProcess = item.skipCurrentProcess();
    // Ambiguous indices are skipped; no request is issued for them.
222  if(productResolverIndex != ProductResolverIndexAmbiguous) {
223  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
224  }
225  }
226 
227  if(iPrincipal.branchType()==InEvent) {
229  }
230 
    // Drop the guard reference taken above.
231  if(0 == iTask->decrement_ref_count()) {
232  //if everything finishes before we leave this routine, we need to launch the task
233  tbb::task::spawn(*iTask);
234  }
235  }
236 
// Worker::prePrefetchSelectionAsync -- per the cross-reference index:
//   void prePrefetchSelectionAsync(WaitingTask* task, StreamID stream, EventPrincipal const*)
// NOTE(review): the opening line (Worker.cc:237) is missing from this listing;
// the body calls the first parameter `successTask`.
//
// Prefetches the products listed by itemsToGetForSelection(), then runs
// implDoPrePrefetchSelection() in an intermediate task (`choiceTask`):
//   * selection returns false -> the worker is marked as run and passed, its
//     waiting tasks are released, and successTask is destroyed without running;
//   * selection returns true, or throws -> successTask is spawned once its
//     reference count reaches zero.
238  StreamID id,
239  EventPrincipal const* iPrincipal) {
240  std::vector<ProductResolverIndexAndSkipBit> items;
241  itemsToGetForSelection(items);
242 
    // Hold a reference on successTask until choiceTask has decided its fate.
243  successTask->increment_ref_count();
    // Captured here and re-established inside the task through
    // ServiceRegistry::Operate (presumably so services are usable on whichever
    // thread runs the task -- confirm).
244  auto token = ServiceRegistry::instance().presentToken();
245 
    // The exception pointer passed to the task body is ignored.
246  auto choiceTask = edm::make_waiting_task(tbb::task::allocate_root(),
247  [id,successTask,iPrincipal,this,token](std::exception_ptr const*) {
248  ServiceRegistry::Operate guard(token);
249  try {
250  if( not implDoPrePrefetchSelection(id,*iPrincipal,&moduleCallingContext_) ) {
251  timesRun_.fetch_add(1,std::memory_order_relaxed);
252  setPassed<true>();
253  waitingTasks_.doneWaiting(nullptr);
254  //TBB requires that destroyed tasks have count 0
255  if ( 0 == successTask->decrement_ref_count() ) {
256  tbb::task::destroy(*successTask);
257  }
258  return;
259  }
    // Exceptions thrown by the selection are swallowed here; control falls
    // through and successTask is released exactly as for a selected event.
260  } catch(...) {}
261  if(0 == successTask->decrement_ref_count()) {
262  tbb::task::spawn(*successTask);
263  }
264  });
265 
    // Guard reference so choiceTask is not spawned while requests are still
    // being issued below.
266  choiceTask->increment_ref_count();
267  for(auto const& item : items) {
268  ProductResolverIndex productResolverIndex = item.productResolverIndex();
269  bool skipCurrentProcess = item.skipCurrentProcess();
    // Ambiguous indices are skipped; no request is issued for them.
270  if(productResolverIndex != ProductResolverIndexAmbiguous) {
271  iPrincipal->prefetchAsync(choiceTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
272  }
273  }
274 
    // Drop the guard reference; spawn choiceTask ourselves if the count is
    // already zero.
275  if(0 == choiceTask->decrement_ref_count()) {
276  tbb::task::spawn(*choiceTask);
277  }
278  }
279 
280 
// Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) -- signature taken
// from the cross-reference index. NOTE(review): the opening line (Worker.cc:281)
// is missing from this listing.
// Stores the helper whose moduleRan() is called from postDoEvent(); passing
// nullptr disables that call.
282  earlyDeleteHelper_=iHelper;
283  }
284 
// Closes Worker::resetModuleDescription(ModuleDescription const*), which the
// cross-reference index places at Worker.cc:285. NOTE(review): its signature and
// body (Worker.cc:285-288) are missing from this listing.
289  }
290 
// Worker::beginJob() -- signature taken from the cross-reference index.
// NOTE(review): the opening line (Worker.cc:291) is missing from this listing.
//
// Runs implBeginJob() bracketed by the pre/post ModuleBeginJob signals. If a
// cms::Exception reaches the handler, the worker state becomes Exception, a
// context line naming the module is appended, and the exception is rethrown.
292  try {
293  convertException::wrap([&]() {
    // Declared before the call so the post signal fires on every exit path,
    // including when implBeginJob() throws.
294  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
295  implBeginJob();
296  });
297  }
298  catch(cms::Exception& ex) {
299  state_ = Exception;
300  std::ostringstream ost;
301  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
302  ex.addContext(ost.str());
303  throw;
304  }
305  }
306 
307  void Worker::endJob() {
308  try {
309  convertException::wrap([&]() {
310  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
311  implEndJob();
312  });
313  }
314  catch(cms::Exception& ex) {
315  state_ = Exception;
316  std::ostringstream ost;
317  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
318  ex.addContext(ost.str());
319  throw;
320  }
321  }
322 
// Runs the module's beginStream hook for stream `id`.
//
// id            - stream being started; forwarded to implBeginStream().
// streamContext - modified in place: the event ID is reset to (0, 0, 0), the
//                 run index to the invalid index and the timestamp to a
//                 default-constructed Timestamp before the hook is called.
//
// If a cms::Exception reaches the handler, the worker state becomes Exception, a
// context line naming the module is appended, and the exception is rethrown.
//
// NOTE(review): Worker.cc:326, :329 and :333 are missing from this listing
// (presumably further streamContext / moduleCallingContext_ setup -- confirm).
323  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
324  try {
325  convertException::wrap([&]() {
327  streamContext.setEventID(EventID(0, 0, 0));
328  streamContext.setRunIndex(RunIndex::invalidRunIndex());
330  streamContext.setTimestamp(Timestamp());
    // The stream context becomes the parent of this module's calling context
    // for the duration of the call.
331  ParentContext parentContext(&streamContext);
332  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
    // Declared before the call so the post signal fires even if the hook throws.
334  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
335  implBeginStream(id);
336  });
337  }
338  catch(cms::Exception& ex) {
339  state_ = Exception;
340  std::ostringstream ost;
341  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
342  ex.addContext(ost.str());
343  throw;
344  }
345  }
346 
// Runs the module's endStream hook for stream `id`.
//
// id            - stream being ended; forwarded to implEndStream().
// streamContext - modified in place: the event ID is reset to (0, 0, 0), the
//                 run index to the invalid index and the timestamp to a
//                 default-constructed Timestamp before the hook is called.
//
// If a cms::Exception reaches the handler, the worker state becomes Exception, a
// context line naming the module is appended, and the exception is rethrown.
//
// NOTE(review): Worker.cc:350, :353 and :357 are missing from this listing
// (presumably further streamContext / moduleCallingContext_ setup -- confirm).
347  void Worker::endStream(StreamID id, StreamContext& streamContext) {
348  try {
349  convertException::wrap([&]() {
351  streamContext.setEventID(EventID(0, 0, 0));
352  streamContext.setRunIndex(RunIndex::invalidRunIndex());
354  streamContext.setTimestamp(Timestamp());
    // The stream context becomes the parent of this module's calling context
    // for the duration of the call.
355  ParentContext parentContext(&streamContext);
356  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
    // Declared before the call so the post signal fires even if the hook throws.
358  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
359  implEndStream(id);
360  });
361  }
362  catch(cms::Exception& ex) {
363  state_ = Exception;
364  std::ostringstream ost;
365  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
366  ex.addContext(ost.str());
367  throw;
368  }
369  }
370 
// Worker::skipOnPath() -- signature taken from the cross-reference index.
// Decrements numberOfPathsLeftToRun_ (a std::atomic<int> per the index) and acts
// only when the counter reaches zero.
// NOTE(review): the opening line (Worker.cc:371) and the statement executed at
// zero (Worker.cc:373) are missing from this listing.
372  if( 0 == --numberOfPathsLeftToRun_) {
374  }
375  }
376 
// Worker::postDoEvent(EventPrincipal const&) -- signature taken from the
// cross-reference index; the body names the parameter `iEvent`.
// NOTE(review): the opening line (Worker.cc:377) is missing from this listing.
// Tells the early-delete helper, when one has been set, that this module has
// run on the given event. Does nothing when no helper is installed.
378  if(earlyDeleteHelper_) {
379  earlyDeleteHelper_->moduleRan(iEvent);
380  }
381  }
382 
// Worker::runAcquire -- per the cross-reference index:
//   void runAcquire(EventPrincipal const& ep, EventSetup const& es,
//                   ParentContext const& parentContext,
//                   WaitingTaskWithArenaHolder& holder)
//
// Calls implDoAcquire() with this module's calling context installed. A
// cms::Exception is rethrown only when shouldRethrowException() says so, after
// counting the module as run; otherwise it is absorbed here.
//
// NOTE(review): Worker.cc:383 (opening line), :390 (start of the wrapped call
// whose lambda closes on line 393) and :395-396 (where `idValue` is defined) are
// missing from this listing.
384  EventSetup const& es,
385  ParentContext const& parentContext,
386  WaitingTaskWithArenaHolder& holder) {
387 
388  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
389  try {
391  {
392  this->implDoAcquire(ep, es, &moduleCallingContext_, holder);
393  });
394  } catch(cms::Exception& ex) {
    // `true` marks this as an event transition for shouldRethrowException().
397  if(shouldRethrowException(std::current_exception(), parentContext, true, idValue)) {
398  timesRun_.fetch_add(1,std::memory_order_relaxed);
399  throw;
400  }
401  }
402  }
403 
// Continuation that runs once asynchronous prefetching has finished.
//
// iEPtr         - null when prefetching succeeded; otherwise points at the
//                 (non-null, asserted) exception raised while prefetching.
// ep, es        - forwarded to runAcquire().
// parentContext - forwarded to runAcquire() / shouldRethrowException().
// holder        - per the cross-reference index the last parameter is
//                 `WaitingTaskWithArenaHolder holder`; it is always notified
//                 exactly once, at the end, with the exception to propagate
//                 (or an empty pointer).
//
// NOTE(review): Worker.cc:408-409 (end of the parameter list), :413 (where
// `idValue` is defined), :417 and :421 are missing from this listing.
404  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
405  EventPrincipal const& ep,
406  EventSetup const& es,
407  ParentContext const& parentContext,
410  std::exception_ptr exceptionPtr;
411  if(iEPtr) {
412  assert(*iEPtr);
    // A prefetch failure is forwarded only if policy says it must be rethrown;
    // runAcquire() is not called in this branch.
414  if(shouldRethrowException(*iEPtr, parentContext, true, idValue)) {
415  exceptionPtr = *iEPtr;
416  }
418  } else {
419  try {
420  runAcquire(ep, es, parentContext, holder);
422  } catch(...) {
423  exceptionPtr = std::current_exception();
424  }
425  }
426  // It is important this is after runAcquire completely finishes
427  holder.doneWaiting(exceptionPtr);
428  }
429 
// Post-processes an exception reported for this module's external work.
//
// iEPtr         - exception to process; dereferenced without a null check.
// parentContext - installed as the module's calling context while the
//                 cms::Exception handler runs.
// Returns the exception as caught in the cms::Exception handler on that path,
// or *iEPtr unchanged otherwise.
//
// NOTE(review): Worker.cc:432 and :439 are missing from this listing.
// Presumably 432 opens a condition that guards the try block (the extra closing
// brace on line 442 matches it) and 439 adds context to `ex` -- confirm.
430  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
431  ParentContext const& parentContext) {
433  try {
434  convertException::wrap([iEPtr]() {
435  std::rethrow_exception(*iEPtr);
436  });
437  } catch(cms::Exception &ex) {
438  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
440  return std::current_exception();
441  }
442  }
443  return *iEPtr;
444  }
445 
// HandleExternalWorkExceptionTask constructor -- per the cross-reference index:
//   HandleExternalWorkExceptionTask(Worker* worker, WaitingTask* runModuleTask,
//                                   ParentContext const& parentContext)
// NOTE(review): the opening line (Worker.cc:447) is missing from this listing.
// Only stores its three arguments; parentContext initialises m_parentContext
// (whether that member is a copy or a reference is not visible here).
448  WaitingTask* runModuleTask,
449  ParentContext const& parentContext) :
450  m_worker(worker),
451  m_runModuleTask(runModuleTask),
452  m_parentContext(parentContext) {
453  }
454 
// Task body of HandleExternalWorkExceptionTask. NOTE(review): the line carrying
// the function name (Worker.cc:456) is missing from this listing; presumably this
// is the task's execute() override -- confirm.
//
// If the task it waited on reported an exception, something is done with it
// using m_parentContext. NOTE(review): that statement (Worker.cc:462-463) is
// missing too; only its tail `m_parentContext));` survives. In every case
// m_runModuleTask is returned with its reference count reset to zero, so it is
// the next task to run.
455  tbb::task*
457 
458  auto excptr = exceptionPtr();
459  if (excptr) {
460  // increment the ref count so the holder will not spawn it
461  m_runModuleTask->set_ref_count(1);
464  m_parentContext));
465  }
    // Back to zero: the task is handed to the scheduler through the return value
    // rather than being spawned.
466  m_runModuleTask->set_ref_count(0);
467  // Depend on TBB Scheduler Bypass to run the next task
468  return m_runModuleTask;
469  }
470 }
void prePrefetchSelectionAsync(WaitingTask *task, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:237
std::string const & pathName() const
Definition: PathContext.h:37
GlobalContext const * globalContext() const
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:285
void skipOnPath()
Definition: Worker.cc:371
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
ModuleDescription const & description() const
Definition: Worker.h:182
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:578
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:281
ModuleCallingContext const * getTopModuleCallingContext() const
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:101
virtual ~Worker()
Definition: Worker.cc:93
void endJob()
Definition: Worker.cc:307
def destroy(e)
Definition: pyrootRender.py:13
InternalContext const * internalContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:585
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:323
void exceptionContext(std::ostream &, GlobalContext const &)
#define nullptr
ExceptionToActionTable const * actions_
Definition: Worker.h:582
ServiceToken presentToken() const
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:591
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:96
ParentContext const & parent() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:580
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.cc:404
void setTransition(Transition v)
Definition: StreamContext.h:65
std::exception_ptr cached_exception_
Definition: Worker.h:583
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:42
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
void doneWaiting(std::exception_ptr iExcept)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:159
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:589
void doneWaiting(std::exception_ptr iExcept)
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:447
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
BranchType const & branchType() const
Definition: Principal.h:178
PathContext const * pathContext() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ModuleCallingContext const *mcc) const
Definition: Principal.cc:565
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * moduleDescription() const
static ServiceRegistry & instance()
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:73
std::atomic< State > state_
Definition: Worker.h:576
void runAcquire(EventPrincipal const &ep, EventSetup const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
Definition: Worker.cc:383
virtual void implEndJob()=0
StreamContext const * streamContext() const
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
virtual void implBeginJob()=0
virtual void implDoAcquire(EventPrincipal const &, EventSetup const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:207
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void implEndStream(StreamID)=0
StreamContext const * streamContext() const
Definition: PathContext.h:38
void setEventID(EventID const &v)
Definition: StreamContext.h:66
ModuleCallingContext const * previousModuleOnThread() const
HLT enums.
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:291
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:347
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:377
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:587
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::atomic< int > timesRun_
Definition: Worker.h:571
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:54
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:430
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)