CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
17 
18 namespace edm {
19  namespace {
20  class ModuleBeginJobSignalSentry {
21  public:
22  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
23  if (a_)
24  a_->preModuleBeginJobSignal_(*md_);
25  }
26  ~ModuleBeginJobSignalSentry() {
27  if (a_)
28  a_->postModuleBeginJobSignal_(*md_);
29  }
30 
31  private:
32  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
33  ModuleDescription const* md_;
34  };
35 
36  class ModuleEndJobSignalSentry {
37  public:
38  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
39  if (a_)
40  a_->preModuleEndJobSignal_(*md_);
41  }
42  ~ModuleEndJobSignalSentry() {
43  if (a_)
44  a_->postModuleEndJobSignal_(*md_);
45  }
46 
47  private:
48  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
49  ModuleDescription const* md_;
50  };
51 
52  class ModuleBeginStreamSignalSentry {
53  public:
54  ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
55  : a_(a), sc_(sc), mcc_(mcc) {
56  if (a_)
57  a_->preModuleBeginStreamSignal_(sc_, mcc_);
58  }
59  ~ModuleBeginStreamSignalSentry() {
60  if (a_)
61  a_->postModuleBeginStreamSignal_(sc_, mcc_);
62  }
63 
64  private:
65  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
66  StreamContext const& sc_;
67  ModuleCallingContext const& mcc_;
68  };
69 
70  class ModuleEndStreamSignalSentry {
71  public:
72  ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
73  : a_(a), sc_(sc), mcc_(mcc) {
74  if (a_)
75  a_->preModuleEndStreamSignal_(sc_, mcc_);
76  }
77  ~ModuleEndStreamSignalSentry() {
78  if (a_)
79  a_->postModuleEndStreamSignal_(sc_, mcc_);
80  }
81 
82  private:
83  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
84  StreamContext const& sc_;
85  ModuleCallingContext const& mcc_;
86  };
87 
88  } // namespace
89 
// Member-initializer list of the Worker constructor: the five times* counters and both
// path counters start at zero, state_ starts as Ready, the calling context is bound to the
// module description iMD, and the activity registry / early-delete helper start out unset.
// NOTE(review): the listing's embedded numbering skips 90 and 106, so the constructor
// signature and one body line are not visible in this copy - confirm against the real file.
91  : timesRun_(0),
92  timesVisited_(0),
93  timesPassed_(0),
94  timesFailed_(0),
95  timesExcept_(0),
96  state_(Ready),
97  numberOfPathsOn_(0),
98  numberOfPathsLeftToRun_(0),
99  moduleCallingContext_(&iMD),
100  actions_(iActions),
101  cached_exception_(),
102  actReg_(),
103  earlyDeleteHelper_(nullptr),
104  workStarted_(false),
105  ranAcquireWithoutException_(false) {
107  }
108 
110 
111  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
112 
// Tail of Worker::checkForShouldTryToContinue: turns on shouldTryToContinue_ when pset is
// non-null and contains the "@shouldTryToContinue" entry; otherwise the flag is left untouched.
// NOTE(review): the listing dropped the signature and the line that obtains pset
// (embedded numbers 113-114) - confirm how pset is looked up in the real file.
115  if (pset and pset->exists("@shouldTryToContinue")) {
116  shouldTryToContinue_ = true;
117  }
118  }
119 
// Decides whether an exception raised while running this module must be propagated.
//
// Returns true when the caller should rethrow. Non-event transitions always rethrow.
// For events, iPtr is rethrown inside convertException::wrap and caught as cms::Exception;
// the handler then either returns true, returns `not shouldTryToContinue` (printing a
// "TryToContinue" warning when shouldTryToContinue is set), or prints an "IgnoreCompletely"
// warning and returns false.
// NOTE(review): the embedded listing numbers skip 136, 138, 141 and 147, so the statements
// that choose between those outcomes are missing from this copy (presumably a lookup in
// actions_ keyed on ex.category() - confirm against the real file).
120  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
121  ParentContext const& parentContext,
122  bool isEvent,
123  bool shouldTryToContinue) const {
124  // NOTE: the warning printed as a result of ignoring or failing
125  // a module will only be printed during the full true processing
126  // pass of this module
127 
128  // Get the action corresponding to this exception. However, if processing
129  // something other than an event (e.g. run, lumi) always rethrow.
130  if (not isEvent) {
131  return true;
132  }
133  try {
134  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
135  } catch (cms::Exception& ex) {
137 
139  return true;
140  }
142  if (shouldTryToContinue) {
143  edm::printCmsExceptionWarning("TryToContinue", ex);
144  }
145  return not shouldTryToContinue;
146  }
148  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
149  return false;
150  }
151  }
// Reached when the handler above did not return.
152  return true;
153  }
154 
// Prefetches the products needed for this module's selection step, then runs the selection
// in a waiting task ("choiceTask") and only lets successTask proceed if the module is selected.
//
// successTask's reference count is raised up front and dropped again inside choiceTask, which
// is what keeps it from running before the selection decision is made.
155  void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
156  WaitingTask* successTask,
157  ServiceToken const& token,
158  StreamID id,
159  EventPrincipal const* iPrincipal) {
160  successTask->increment_ref_count();
161 
// Hold the services weakly inside the task; they are locked again when the task runs.
162  ServiceWeakToken weakToken = token;
163  auto choiceTask =
164  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
165  ServiceRegistry::Operate guard(weakToken.lock());
166  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
167  CMS_SA_ALLOW try {
168  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
// Not selected: count the run, mark the worker via setPassed<true>(), release anything
// waiting on this worker, and drop successTask without executing it.
169  timesRun_.fetch_add(1, std::memory_order_relaxed);
170  setPassed<true>();
171  waitingTasks_.doneWaiting(nullptr);
172  //TBB requires that destroyed tasks have count 0
173  if (0 == successTask->decrement_ref_count()) {
174  TaskSentry s(successTask);
175  }
176  return;
177  }
178  } catch (...) {
// Exceptions from the selection are deliberately swallowed; control falls through and
// successTask is run as if the module had been selected.
179  }
180  if (0 == successTask->decrement_ref_count()) {
181  group.run([successTask]() {
182  TaskSentry s(successTask);
183  successTask->execute();
184  });
185  }
186  });
187 
188  WaitingTaskHolder choiceHolder{group, choiceTask};
189 
// NOTE(review): embedded listing line 191 is missing, so in this copy `items` is never
// filled (presumably by itemsToGetForSelection(items) - confirm against the real file).
190  std::vector<ProductResolverIndexAndSkipBit> items;
192 
193  for (auto const& item : items) {
194  ProductResolverIndex productResolverIndex = item.productResolverIndex();
195  bool skipCurrentProcess = item.skipCurrentProcess();
196  if (productResolverIndex != ProductResolverIndexAmbiguous) {
197  iPrincipal->prefetchAsync(
198  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
199  }
200  }
// Signal that this function is done registering prefetches, with no exception.
201  choiceHolder.doneWaiting(std::exception_ptr{});
202  }
203 
// Remainder of Worker::esPrefetchAsync: for the given transition, asks each EventSetup record
// this module reads from to prefetch the corresponding item, with iTask as the waiting task.
// NOTE(review): the listing dropped the first signature line (204) and the condition guarding
// the early return below (208) - confirm both against the real file.
205  EventSetupImpl const& iImpl,
206  Transition iTrans,
207  ServiceToken const& iToken) {
209  return;
210  }
211  auto const& recs = esRecordsToGetFrom(iTrans);
212  auto const& items = esItemsToGetFrom(iTrans);
213 
// The two lists are parallel: recs[i] is the record that items[i] is fetched from.
214  assert(items.size() == recs.size());
215  if (items.empty()) {
216  return;
217  }
218 
219  for (size_t i = 0; i != items.size(); ++i) {
// A default-constructed ESRecordIndex is skipped, as is a record findImpl cannot locate.
220  if (recs[i] != ESRecordIndex{}) {
221  auto rec = iImpl.findImpl(recs[i]);
222  if (rec) {
223  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
224  }
225  }
226  }
227  }
228 
229  void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
230  // Prefetch products the module declares it consumes
231  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
232 
233  for (auto const& item : items) {
234  ProductResolverIndex productResolverIndex = item.productResolverIndex();
235  bool skipCurrentProcess = item.skipCurrentProcess();
236  if (productResolverIndex != ProductResolverIndexAmbiguous) {
237  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
238  }
239  }
240  }
241 
243 
244  size_t Worker::transformIndex(edm::BranchDescription const&) const { return -1; }
// Remainder of Worker::doTransformAsync: prefetches the single product needed by transform
// number iTransformIndex and, once that completes, calls implDoTransformAsync.
// The pre/post "transform prefetching" signals bracket the prefetch.
// NOTE(review): the first signature line (embedded number 245, which declares iTask) is
// missing from this copy.
// NOTE(review): actReg_ is dereferenced without a null check here, unlike the sentry
// classes in this file - confirm it is always set before this is called.
246  size_t iTransformIndex,
247  EventPrincipal const& iPrincipal,
248  ServiceToken const& iToken,
249  StreamID,
250  ModuleCallingContext const& mcc,
251  StreamContext const*) {
252  ServiceWeakToken weakToken = iToken;
253 
254  //Need to make the services available early so other services can see them
// mcc is captured by value, so the task works on its own copy of the calling context.
255  auto task = make_waiting_task(
256  [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
257  //post prefetch signal
258  actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
// A prefetch failure is forwarded to iTask and the transform itself is not run.
259  if (iExcept) {
260  iTask.doneWaiting(*iExcept);
261  return;
262  }
263  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
264  });
265 
266  //pre prefetch signal
267  actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
268  iPrincipal.prefetchAsync(
269  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
270  }
271 
// NOTE(review): only three lines of Worker::resetModuleDescription survive in this listing
// (embedded numbers 272-273, 275-278 and 280 are missing); what is visible is one argument
// of a dropped expression and an assertion that iDesc is non-null. Consult the real file
// before editing.
274  0,
279  assert(iDesc);
281  }
282 
// Body of Worker::beginJob (the signature line, embedded number 283, is missing from this
// listing). Runs implBeginJob() bracketed by the begin-job pre/post signals. If that raises
// a cms::Exception, the worker state becomes Exception, a context line naming the module is
// appended, and the same exception is rethrown.
284  try {
285  convertException::wrap([&]() {
286  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
287  implBeginJob();
288  });
289  } catch (cms::Exception& ex) {
290  state_ = Exception;
291  std::ostringstream ost;
292  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
293  << "'";
294  ex.addContext(ost.str());
295  throw;
296  }
297  }
298 
// Runs implEndJob() bracketed by the end-job pre/post signals. If that raises a
// cms::Exception, the worker state becomes Exception, a context line naming the module is
// appended, and the same exception is rethrown.
299  void Worker::endJob() {
300  try {
301  convertException::wrap([&]() {
// NOTE(review): embedded listing line 302, which declares `desc`, is missing from this copy
// (presumably it is initialised from description() - confirm against the real file).
303  assert(desc != nullptr);
304  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
305  implEndJob();
306  });
307  } catch (cms::Exception& ex) {
308  state_ = Exception;
309  std::ostringstream ost;
310  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
311  ex.addContext(ost.str());
312  throw;
313  }
314  }
315 
// Calls implBeginStream(id) with the stream context reset to "no event / invalid run /
// default timestamp" and with the begin-stream pre/post signals emitted around the call.
// If that raises a cms::Exception, the worker state becomes Exception, a context line
// naming the module is appended, and the same exception is rethrown.
// NOTE(review): embedded listing lines 319, 322 and 326 are missing from this copy; the
// cross-reference index suggests setTransition / setLuminosityBlockIndex calls - confirm.
316  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
317  try {
318  convertException::wrap([&]() {
320  streamContext.setEventID(EventID(0, 0, 0));
321  streamContext.setRunIndex(RunIndex::invalidRunIndex());
323  streamContext.setTimestamp(Timestamp());
// Make the stream context the parent of this module's calling context for the duration
// of the call.
324  ParentContext parentContext(&streamContext);
325  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
327  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
328  implBeginStream(id);
329  });
330  } catch (cms::Exception& ex) {
331  state_ = Exception;
332  std::ostringstream ost;
333  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
334  << "'";
335  ex.addContext(ost.str());
336  throw;
337  }
338  }
339 
// Calls implEndStream(id) with the stream context reset to "no event / invalid run /
// default timestamp" and with the end-stream pre/post signals emitted around the call.
// If that raises a cms::Exception, the worker state becomes Exception, a context line
// naming the module is appended, and the same exception is rethrown.
// NOTE(review): embedded listing lines 343, 346 and 350 are missing from this copy; the
// cross-reference index suggests setTransition / setLuminosityBlockIndex calls - confirm.
340  void Worker::endStream(StreamID id, StreamContext& streamContext) {
341  try {
342  convertException::wrap([&]() {
344  streamContext.setEventID(EventID(0, 0, 0));
345  streamContext.setRunIndex(RunIndex::invalidRunIndex());
347  streamContext.setTimestamp(Timestamp());
// Make the stream context the parent of this module's calling context for the duration
// of the call.
348  ParentContext parentContext(&streamContext);
349  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
351  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
352  implEndStream(id);
353  });
354  } catch (cms::Exception& ex) {
355  state_ = Exception;
356  std::ostringstream ost;
357  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
358  << "'";
359  ex.addContext(ost.str());
360  throw;
361  }
362  }
363 
// Body of Worker::registerThinnedAssociations: any cms::Exception raised inside the try
// block gets a context line naming the module label before being thrown onward.
// NOTE(review): the signature (embedded number 364) and the statement inside the try block
// (366) are missing from this copy.
365  try {
367  } catch (cms::Exception& ex) {
368  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
// NOTE(review): `throw ex;` throws a copy whose type is exactly cms::Exception, so a derived
// exception type is sliced; the other handlers in this file use a bare `throw;` to rethrow
// the original object. Consider changing this to `throw;`.
369  throw ex;
370  }
371  }
372 
// Body of Worker::skipOnPath (signature, embedded number 373, is missing from this listing).
// Tells the early-delete helper, if there is one, that a path finished for iEvent, then
// decrements the count of paths still to run.
374  if (earlyDeleteHelper_) {
375  earlyDeleteHelper_->pathFinished(iEvent);
376  }
// NOTE(review): the statement executed when the last path has been accounted for
// (embedded number 378) is missing from this copy.
377  if (0 == --numberOfPathsLeftToRun_) {
379  }
380  }
381 
// Body of Worker::postDoEvent (signature, embedded number 382, is missing from this listing).
// Tells the early-delete helper, if one is set, that this module has run for iEvent.
383  if (earlyDeleteHelper_) {
384  earlyDeleteHelper_->moduleRan(iEvent);
385  }
386  }
387 
// Remainder of Worker::runAcquire: calls implDoAcquire with this module's calling context
// parented to parentContext. A cms::Exception is rethrown (after counting the run) only if
// shouldRethrowException says so; otherwise it is swallowed here.
// NOTE(review): the first signature line (embedded number 388, which declares `info`) and
// the first line of the catch handler (395) are missing from this copy.
389  ParentContext const& parentContext,
390  WaitingTaskWithArenaHolder& holder) {
391  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
392  try {
393  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
394  } catch (cms::Exception& ex) {
396  if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
397  timesRun_.fetch_add(1, std::memory_order_relaxed);
398  throw;
399  }
400  }
401  }
402 
// Runs the acquire step after prefetching has completed and reports the outcome to holder.
//
// holder.doneWaiting() receives: iEPtr when prefetching failed and shouldRethrowException
// says it must propagate; the exception thrown by runAcquire, if any; otherwise an empty
// exception_ptr.
// NOTE(review): embedded listing lines 406-407 (the rest of the signature, which declares
// `holder`), 413 and 418 are missing from this copy.
403  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
404  EventTransitionInfo const& eventTransitionInfo,
405  ParentContext const& parentContext,
408  std::exception_ptr exceptionPtr;
409  if (iEPtr) {
410  if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
411  exceptionPtr = iEPtr;
412  }
414  } else {
415  // Caught exception is propagated via WaitingTaskWithArenaHolder
416  CMS_SA_ALLOW try {
417  runAcquire(eventTransitionInfo, parentContext, holder);
419  } catch (...) {
420  exceptionPtr = std::current_exception();
421  }
422  }
423  // It is important this is after runAcquire completely finishes
424  holder.doneWaiting(exceptionPtr);
425  }
426 
// Post-processes an exception produced by external work. In the visible code, iEPtr is
// rethrown through convertException::wrap; if it arrives as a cms::Exception the converted
// exception is returned, otherwise the original iEPtr is returned unchanged.
// NOTE(review): embedded listing lines 428 and 433 are missing from this copy - 428
// presumably opens the condition closed by the extra brace below, and 433 presumably adds
// module context to `ex`; confirm against the real file.
427  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext) {
429  try {
430  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
431  } catch (cms::Exception& ex) {
432  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
434  return std::current_exception();
435  }
436  }
437  return iEPtr;
438  }
439 
// Remainder of the HandleExternalWorkExceptionTask constructor: stores the worker, the task
// that runs the module, the task group and the parent context; nothing else is done here.
// NOTE(review): the first signature line (embedded number 440, declaring `worker`) is
// missing from this copy. m_parentContext is initialised from a reference parameter -
// check its declared type to see whether the argument must outlive the task.
441  oneapi::tbb::task_group* group,
442  WaitingTask* runModuleTask,
443  ParentContext const& parentContext)
444  : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
445 
// Body of a task method whose signature (embedded number 446) is missing from this listing -
// presumably HandleExternalWorkExceptionTask::execute; confirm against the real file.
// Wraps the run-module task in a holder and, if this task carries an exception, passes it
// through Worker::handleExternalWorkException before handing it to the holder.
447  auto excptr = exceptionPtr();
448  WaitingTaskHolder holder(*m_group, m_runModuleTask);
449  if (excptr) {
450  holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
451  }
452  }
453 } // namespace edm
ModuleCallingContext const * previousModuleOnThread() const
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:272
static const TGPicture * info(bool iBackgroundIsBlack)
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
#define CMS_SA_ALLOW
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:242
void prefetchAsync(WaitingTaskHolder iTask, ESResolverIndex iResolverIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
prefetch the data to setup for subsequent calls to getImplementation
virtual ~Worker()
Definition: Worker.cc:109
void endJob()
Definition: Worker.cc:299
virtual void execute()=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::string const & moduleName() const
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:640
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:403
std::string const & category() const
Definition: Exception.cc:147
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:427
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:316
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:615
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:624
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:111
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void setTransition(Transition v)
Definition: StreamContext.h:66
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:155
std::exception_ptr cached_exception_
Definition: Worker.h:616
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
oneapi::tbb::task_group * group() const noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:245
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ParameterSetID const & parameterSetID() const
int iEvent
Definition: GenABIO.cc:224
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void doneWaiting(std::exception_ptr iExcept)
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:364
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:90
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:609
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:204
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:244
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:440
ModuleDescription const * description() const
Definition: Worker.h:198
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginJob()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:229
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:373
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
Definition: StreamContext.h:67
HLT enums.
StreamContext const * getStreamContext() const
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
double a
Definition: hdecay.h:121
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:388
void beginJob()
Definition: Worker.cc:283
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:340
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:382
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620
std::atomic< int > timesRun_
Definition: Worker.h:604
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
std::string const & moduleLabel() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68
static Registry * instance()
Definition: Registry.cc:12
void increment_ref_count()
Definition: TaskBase.h:41
BranchType const & branchType() const
Definition: Principal.h:175
bool shouldTryToContinue_
Definition: Worker.h:626
ParentContext const & parent() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0