CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
17 
18 namespace edm {
19  namespace {
20  class ModuleBeginJobSignalSentry {
21  public:
22  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
23  if (a_)
24  a_->preModuleBeginJobSignal_(*md_);
25  }
26  ~ModuleBeginJobSignalSentry() {
27  if (a_)
28  a_->postModuleBeginJobSignal_(*md_);
29  }
30 
31  private:
32  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
33  ModuleDescription const* md_;
34  };
35 
36  class ModuleEndJobSignalSentry {
37  public:
38  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
39  if (a_)
40  a_->preModuleEndJobSignal_(*md_);
41  }
42  ~ModuleEndJobSignalSentry() {
43  if (a_)
44  a_->postModuleEndJobSignal_(*md_);
45  }
46 
47  private:
48  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
49  ModuleDescription const* md_;
50  };
51 
52  class ModuleBeginStreamSignalSentry {
53  public:
54  ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
55  : a_(a), sc_(sc), mcc_(mcc) {
56  if (a_)
57  a_->preModuleBeginStreamSignal_(sc_, mcc_);
58  }
59  ~ModuleBeginStreamSignalSentry() {
60  if (a_)
61  a_->postModuleBeginStreamSignal_(sc_, mcc_);
62  }
63 
64  private:
65  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
66  StreamContext const& sc_;
67  ModuleCallingContext const& mcc_;
68  };
69 
70  class ModuleEndStreamSignalSentry {
71  public:
72  ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
73  : a_(a), sc_(sc), mcc_(mcc) {
74  if (a_)
75  a_->preModuleEndStreamSignal_(sc_, mcc_);
76  }
77  ~ModuleEndStreamSignalSentry() {
78  if (a_)
79  a_->postModuleEndStreamSignal_(sc_, mcc_);
80  }
81 
82  private:
83  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
84  StreamContext const& sc_;
85  ModuleCallingContext const& mcc_;
86  };
87 
88  } // namespace
89 
91  : timesRun_(0),
92  timesVisited_(0),
93  timesPassed_(0),
94  timesFailed_(0),
95  timesExcept_(0),
96  state_(Ready),
97  numberOfPathsOn_(0),
98  numberOfPathsLeftToRun_(0),
99  moduleCallingContext_(&iMD),
100  actions_(iActions),
101  cached_exception_(),
102  actReg_(),
103  earlyDeleteHelper_(nullptr),
104  workStarted_(false),
105  ranAcquireWithoutException_(false) {
107  }
108 
110 
111  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
112 
115  if (pset and pset->exists("@shouldTryToContinue")) {
116  shouldTryToContinue_ = true;
117  }
118  }
119 
// Decides whether an exception raised while running this module should be rethrown.
//   iPtr                 exception to classify; it is rethrown inside convertException::wrap
//                        and examined in the cms::Exception handler below.
//   parentContext        not referenced by any line visible in this listing.
//   isEvent              when false the function returns true immediately.
//   shouldTryToContinue  on one handled branch a "TryToContinue" warning is printed when this
//                        is true, and `not shouldTryToContinue` is returned.
// Returns true when the caller should rethrow, false when the exception is to be ignored
// (the visible false-returning branch prints an "IgnoreCompletely" warning first).
// NOTE(review): original lines 136, 138, 141 and 147 are missing from this listing; they
// presumably look up and switch on the action configured for the exception category --
// confirm against the full source before relying on the branch structure shown here.
120  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
121  ParentContext const& parentContext,
122  bool isEvent,
123  bool shouldTryToContinue) const {
124  // NOTE: the warning printed as a result of ignoring or failing
125  // a module will only be printed during the full true processing
126  // pass of this module
127 
128  // Get the action corresponding to this exception. However, if processing
129  // something other than an event (e.g. run, lumi) always rethrow.
130  if (not isEvent) {
131  return true;
132  }
133  try {
// Rethrow through convertException::wrap so the handler below sees a cms::Exception.
134  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
135  } catch (cms::Exception& ex) {
137 
139  return true;
140  }
142  if (shouldTryToContinue) {
143  edm::printCmsExceptionWarning("TryToContinue", ex);
144  }
145  return not shouldTryToContinue;
146  }
148  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
149  return false;
150  }
151  }
// Fallback: anything not handled above is rethrown.
152  return true;
153  }
154 
// Prefetches the products listed in `items` and then runs this module's pre-prefetch selection.
//   group        task group in which successTask is executed.
//   successTask  task to run when the selection accepts the event; its reference count is
//                incremented on entry and decremented inside choiceTask.
//   token        service token; a weak copy is captured and locked inside choiceTask.
//   id           stream id forwarded to implDoPrePrefetchSelection().
//   iPrincipal   event principal used for prefetching and for the selection.
// choiceTask calls implDoPrePrefetchSelection(); when that returns false the worker counts a
// run, calls setPassed<true>(), releases waitingTasks_ and, if the reference count reaches zero,
// hands successTask to a TaskSentry without calling execute(). Otherwise (including when the
// selection throws, which is swallowed) successTask is executed in `group` once its count hits zero.
// NOTE(review): original line 191 is missing from this listing; presumably it fills `items`
// (e.g. via itemsToGetForSelection) -- confirm against the full source.
155  void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
156  WaitingTask* successTask,
157  ServiceToken const& token,
158  StreamID id,
159  EventPrincipal const* iPrincipal) {
160  successTask->increment_ref_count();
161 
162  ServiceWeakToken weakToken = token;
163  auto choiceTask =
164  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
165  ServiceRegistry::Operate guard(weakToken.lock());
166  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
167  CMS_SA_ALLOW try {
168  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
169  timesRun_.fetch_add(1, std::memory_order_relaxed);
170  setPassed<true>();
171  waitingTasks_.doneWaiting(nullptr);
172  //TBB requires that destroyed tasks have count 0
173  if (0 == successTask->decrement_ref_count()) {
174  TaskSentry s(successTask);
175  }
176  return;
177  }
178  } catch (...) {
179  }
// Selection accepted (or threw): run successTask once this was the last outstanding reference.
180  if (0 == successTask->decrement_ref_count()) {
181  group.run([successTask]() {
182  TaskSentry s(successTask);
183  successTask->execute();
184  });
185  }
186  });
187 
188  WaitingTaskHolder choiceHolder{group, choiceTask};
189 
190  std::vector<ProductResolverIndexAndSkipBit> items;
192 
// Request every non-ambiguous item, with choiceTask registered as the waiter.
193  for (auto const& item : items) {
194  ProductResolverIndex productResolverIndex = item.productResolverIndex();
195  bool skipCurrentProcess = item.skipCurrentProcess();
196  if (productResolverIndex != ProductResolverIndexAmbiguous) {
197  iPrincipal->prefetchAsync(
198  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
199  }
200  }
// Signal the holder with an empty exception_ptr now that all prefetch requests are issued.
201  choiceHolder.doneWaiting(std::exception_ptr{});
202  }
203 
205  EventSetupImpl const& iImpl,
206  Transition iTrans,
207  ServiceToken const& iToken) {
209  return;
210  }
211  auto const& recs = esRecordsToGetFrom(iTrans);
212  auto const& items = esItemsToGetFrom(iTrans);
213 
214  assert(items.size() == recs.size());
215  if (items.empty()) {
216  return;
217  }
218 
219  for (size_t i = 0; i != items.size(); ++i) {
220  if (recs[i] != ESRecordIndex{}) {
221  auto rec = iImpl.findImpl(recs[i]);
222  if (rec) {
223  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
224  }
225  }
226  }
227  }
228 
229  void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
230  // Prefetch products the module declares it consumes
231  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
232 
233  for (auto const& item : items) {
234  ProductResolverIndex productResolverIndex = item.productResolverIndex();
235  bool skipCurrentProcess = item.skipCurrentProcess();
236  if (productResolverIndex != ProductResolverIndexAmbiguous) {
237  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
238  }
239  }
240  }
241 
243 
244  size_t Worker::transformIndex(edm::BranchDescription const&) const { return -1; }
246  size_t iTransformIndex,
247  EventPrincipal const& iPrincipal,
248  ServiceToken const& iToken,
249  StreamID,
250  ModuleCallingContext const& mcc,
251  StreamContext const*) {
252  ServiceWeakToken weakToken = iToken;
253 
254  //Need to make the services available early so other services can see them
255  auto task = make_waiting_task([this, iTask, weakToken, &iPrincipal, iTransformIndex, parent = mcc.parent()](
256  std::exception_ptr const* iExcept) mutable {
257  if (iExcept) {
258  iTask.doneWaiting(*iExcept);
259  return;
260  }
261  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, parent, weakToken);
262  });
263 
264  //NOTE: need different ModuleCallingContext. The ProductResolver will copy the context in order to get
265  // a longer lifetime than this function call.
266  iPrincipal.prefetchAsync(
267  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
268  }
269 
276  assert(iDesc);
278  }
279 
281  try {
282  convertException::wrap([&]() {
283  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
284  implBeginJob();
285  });
286  } catch (cms::Exception& ex) {
287  state_ = Exception;
288  std::ostringstream ost;
289  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
290  << "'";
291  ex.addContext(ost.str());
292  throw;
293  }
294  }
295 
// Runs the module's endJob step: implEndJob() is called while a ModuleEndJobSignalSentry
// (built from actReg_ and the module description) is alive.
// The work runs inside convertException::wrap; if a cms::Exception results, state_ is set to
// Exception, a context string naming the module is added, and the exception is rethrown.
// NOTE(review): original line 299 is missing from this listing; presumably it declares `desc`
// (likely from description()) -- confirm against the full source.
296  void Worker::endJob() {
297  try {
298  convertException::wrap([&]() {
300  assert(desc != nullptr);
301  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
302  implEndJob();
303  });
304  } catch (cms::Exception& ex) {
305  state_ = Exception;
306  std::ostringstream ost;
307  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
308  ex.addContext(ost.str());
// Bare throw re-raises the same exception object, keeping the context added above.
309  throw;
310  }
311  }
312 
// Runs the module's beginStream step for one stream.
//   id             stream id forwarded to implBeginStream().
//   streamContext  modified here: event id set to EventID(0, 0, 0), run index set to
//                  RunIndex::invalidRunIndex(), timestamp set to a default Timestamp.
// implBeginStream(id) is called while a ModuleContextSentry and a ModuleBeginStreamSignalSentry
// are alive. The work runs inside convertException::wrap; if a cms::Exception results, state_ is
// set to Exception, a context string naming the module is added, and the exception is rethrown.
// NOTE(review): original lines 316, 319 and 323 are missing from this listing; they presumably
// set further StreamContext / calling-context state -- confirm against the full source.
313  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
314  try {
315  convertException::wrap([&]() {
317  streamContext.setEventID(EventID(0, 0, 0));
318  streamContext.setRunIndex(RunIndex::invalidRunIndex());
320  streamContext.setTimestamp(Timestamp());
321  ParentContext parentContext(&streamContext);
322  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
324  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
325  implBeginStream(id);
326  });
327  } catch (cms::Exception& ex) {
328  state_ = Exception;
329  std::ostringstream ost;
330  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
331  << "'";
332  ex.addContext(ost.str());
// Bare throw re-raises the same exception object, keeping the context added above.
333  throw;
334  }
335  }
336 
// Runs the module's endStream step for one stream.
//   id             stream id forwarded to implEndStream().
//   streamContext  modified here: event id set to EventID(0, 0, 0), run index set to
//                  RunIndex::invalidRunIndex(), timestamp set to a default Timestamp.
// implEndStream(id) is called while a ModuleContextSentry and a ModuleEndStreamSignalSentry
// are alive. The work runs inside convertException::wrap; if a cms::Exception results, state_ is
// set to Exception, a context string naming the module is added, and the exception is rethrown.
// NOTE(review): original lines 340, 343 and 347 are missing from this listing; they presumably
// set further StreamContext / calling-context state -- confirm against the full source.
337  void Worker::endStream(StreamID id, StreamContext& streamContext) {
338  try {
339  convertException::wrap([&]() {
341  streamContext.setEventID(EventID(0, 0, 0));
342  streamContext.setRunIndex(RunIndex::invalidRunIndex());
344  streamContext.setTimestamp(Timestamp());
345  ParentContext parentContext(&streamContext);
346  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
348  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
349  implEndStream(id);
350  });
351  } catch (cms::Exception& ex) {
352  state_ = Exception;
353  std::ostringstream ost;
354  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
355  << "'";
356  ex.addContext(ost.str());
// Bare throw re-raises the same exception object, keeping the context added above.
357  throw;
358  }
359  }
360 
362  try {
364  } catch (cms::Exception& ex) {
365  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
366  throw ex;
367  }
368  }
369 
371  if (earlyDeleteHelper_) {
372  earlyDeleteHelper_->pathFinished(iEvent);
373  }
374  if (0 == --numberOfPathsLeftToRun_) {
376  }
377  }
378 
380  if (earlyDeleteHelper_) {
381  earlyDeleteHelper_->moduleRan(iEvent);
382  }
383  }
384 
386  ParentContext const& parentContext,
387  WaitingTaskWithArenaHolder& holder) {
388  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
389  try {
390  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
391  } catch (cms::Exception& ex) {
393  if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
394  timesRun_.fetch_add(1, std::memory_order_relaxed);
395  throw;
396  }
397  }
398  }
399 
400  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
401  EventTransitionInfo const& eventTransitionInfo,
402  ParentContext const& parentContext,
405  std::exception_ptr exceptionPtr;
406  if (iEPtr) {
407  if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
408  exceptionPtr = iEPtr;
409  }
411  } else {
412  // Caught exception is propagated via WaitingTaskWithArenaHolder
413  CMS_SA_ALLOW try {
414  runAcquire(eventTransitionInfo, parentContext, holder);
416  } catch (...) {
417  exceptionPtr = std::current_exception();
418  }
419  }
420  // It is important this is after runAcquire completely finishes
421  holder.doneWaiting(exceptionPtr);
422  }
423 
// Post-processes an exception delivered for this module's external work.
//   iEPtr          exception to examine; it is rethrown inside convertException::wrap.
//   parentContext  used to set up a ModuleContextSentry inside the cms::Exception handler.
// Returns std::current_exception() from inside the cms::Exception handler on the visible
// branch, otherwise the original iEPtr unchanged.
// NOTE(review): original lines 425 and 430 are missing from this listing; they presumably guard
// the try block and add context to `ex` -- confirm against the full source.
424  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext) {
426  try {
427  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
428  } catch (cms::Exception& ex) {
429  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
431  return std::current_exception();
432  }
433  }
434  return iEPtr;
435  }
436 
438  oneapi::tbb::task_group* group,
439  WaitingTask* runModuleTask,
440  ParentContext const& parentContext)
441  : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
442 
444  auto excptr = exceptionPtr();
445  WaitingTaskHolder holder(*m_group, m_runModuleTask);
446  if (excptr) {
447  holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
448  }
449  }
450 } // namespace edm
ModuleCallingContext const * previousModuleOnThread() const
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:270
static const TGPicture * info(bool iBackgroundIsBlack)
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
#define CMS_SA_ALLOW
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:611
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:242
void prefetchAsync(WaitingTaskHolder iTask, ESResolverIndex iResolverIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
prefetch the data to setup for subsequent calls to getImplementation
virtual ~Worker()
Definition: Worker.cc:109
void endJob()
Definition: Worker.cc:296
virtual void execute()=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:618
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::string const & moduleName() const
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:640
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:400
std::string const & category() const
Definition: Exception.cc:147
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:424
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:313
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:615
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:624
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:111
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:613
void setTransition(Transition v)
Definition: StreamContext.h:66
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:155
std::exception_ptr cached_exception_
Definition: Worker.h:616
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
oneapi::tbb::task_group * group() const noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:245
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ParameterSetID const & parameterSetID() const
int iEvent
Definition: GenABIO.cc:224
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:622
void doneWaiting(std::exception_ptr iExcept)
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:361
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:90
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:609
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:204
virtual std::vector< ESResolverIndex > const & esItemsToGetFrom(Transition) const =0
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:244
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:437
ModuleDescription const * description() const
Definition: Worker.h:198
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginJob()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:229
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:370
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const
Definition: Worker.cc:120
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
Definition: StreamContext.h:67
HLT enums.
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:113
double a
Definition: hdecay.h:121
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:385
void beginJob()
Definition: Worker.cc:280
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:337
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:379
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:620
std::atomic< int > timesRun_
Definition: Worker.h:604
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
std::string const & moduleLabel() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68
static Registry * instance()
Definition: Registry.cc:12
void increment_ref_count()
Definition: TaskBase.h:41
BranchType const & branchType() const
Definition: Principal.h:175
bool shouldTryToContinue_
Definition: Worker.h:626
ParentContext const & parent() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0