CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
16 
17 namespace edm {
18  namespace {
19  class ModuleBeginJobSignalSentry {
20  public:
21  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
22  if (a_)
23  a_->preModuleBeginJobSignal_(*md_);
24  }
25  ~ModuleBeginJobSignalSentry() {
26  if (a_)
27  a_->postModuleBeginJobSignal_(*md_);
28  }
29 
30  private:
31  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
32  ModuleDescription const* md_;
33  };
34 
35  class ModuleEndJobSignalSentry {
36  public:
37  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
38  if (a_)
39  a_->preModuleEndJobSignal_(*md_);
40  }
41  ~ModuleEndJobSignalSentry() {
42  if (a_)
43  a_->postModuleEndJobSignal_(*md_);
44  }
45 
46  private:
47  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
48  ModuleDescription const* md_;
49  };
50 
51  class ModuleBeginStreamSignalSentry {
52  public:
53  ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
54  : a_(a), sc_(sc), mcc_(mcc) {
55  if (a_)
56  a_->preModuleBeginStreamSignal_(sc_, mcc_);
57  }
58  ~ModuleBeginStreamSignalSentry() {
59  if (a_)
60  a_->postModuleBeginStreamSignal_(sc_, mcc_);
61  }
62 
63  private:
64  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
65  StreamContext const& sc_;
66  ModuleCallingContext const& mcc_;
67  };
68 
69  class ModuleEndStreamSignalSentry {
70  public:
71  ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
72  : a_(a), sc_(sc), mcc_(mcc) {
73  if (a_)
74  a_->preModuleEndStreamSignal_(sc_, mcc_);
75  }
76  ~ModuleEndStreamSignalSentry() {
77  if (a_)
78  a_->postModuleEndStreamSignal_(sc_, mcc_);
79  }
80 
81  private:
82  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
83  StreamContext const& sc_;
84  ModuleCallingContext const& mcc_;
85  };
86 
87  } // namespace
88 
90  : timesRun_(0),
91  timesVisited_(0),
92  timesPassed_(0),
93  timesFailed_(0),
94  timesExcept_(0),
95  state_(Ready),
96  numberOfPathsOn_(0),
97  numberOfPathsLeftToRun_(0),
98  moduleCallingContext_(&iMD),
99  actions_(iActions),
100  cached_exception_(),
101  actReg_(),
102  earlyDeleteHelper_(nullptr),
103  workStarted_(false),
104  ranAcquireWithoutException_(false) {}
105 
107 
108  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
109 
110  bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
111  // NOTE: the warning printed as a result of ignoring or failing
112  // a module will only be printed during the full true processing
113  // pass of this module
114 
115  // Get the action corresponding to this exception. However, if processing
116  // something other than an event (e.g. run, lumi) always rethrow.
117  if (not isEvent) {
118  return true;
119  }
120  try {
121  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
122  } catch (cms::Exception& ex) {
124 
126  return true;
127  }
128 
129  ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
130 
131  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
132  // as IgnoreCompletely, so any subsequent OutputModules are still run.
133  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
134  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
135  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
136  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
140  }
141  }
143  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
144  return false;
145  }
146  }
147  return true;
148  }
149 
150  void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
151  WaitingTask* successTask,
152  ServiceToken const& token,
153  StreamID id,
154  EventPrincipal const* iPrincipal) {
155  successTask->increment_ref_count();
156 
157  ServiceWeakToken weakToken = token;
158  auto choiceTask =
159  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
160  ServiceRegistry::Operate guard(weakToken.lock());
161  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
162  CMS_SA_ALLOW try {
163  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
164  timesRun_.fetch_add(1, std::memory_order_relaxed);
165  setPassed<true>();
166  waitingTasks_.doneWaiting(nullptr);
167  //TBB requires that destroyed tasks have count 0
168  if (0 == successTask->decrement_ref_count()) {
169  TaskSentry s(successTask);
170  }
171  return;
172  }
173  } catch (...) {
174  }
175  if (0 == successTask->decrement_ref_count()) {
176  group.run([successTask]() {
177  TaskSentry s(successTask);
178  successTask->execute();
179  });
180  }
181  });
182 
183  WaitingTaskHolder choiceHolder{group, choiceTask};
184 
185  std::vector<ProductResolverIndexAndSkipBit> items;
187 
188  for (auto const& item : items) {
189  ProductResolverIndex productResolverIndex = item.productResolverIndex();
190  bool skipCurrentProcess = item.skipCurrentProcess();
191  if (productResolverIndex != ProductResolverIndexAmbiguous) {
192  iPrincipal->prefetchAsync(
193  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
194  }
195  }
196  choiceHolder.doneWaiting(std::exception_ptr{});
197  }
198 
200  EventSetupImpl const& iImpl,
201  Transition iTrans,
202  ServiceToken const& iToken) {
204  return;
205  }
206  auto const& recs = esRecordsToGetFrom(iTrans);
207  auto const& items = esItemsToGetFrom(iTrans);
208 
209  assert(items.size() == recs.size());
210  if (items.empty()) {
211  return;
212  }
213 
214  for (size_t i = 0; i != items.size(); ++i) {
215  if (recs[i] != ESRecordIndex{}) {
216  auto rec = iImpl.findImpl(recs[i]);
217  if (rec) {
218  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
219  }
220  }
221  }
222  }
223 
224  void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
225  // Prefetch products the module declares it consumes
226  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
227 
228  for (auto const& item : items) {
229  ProductResolverIndex productResolverIndex = item.productResolverIndex();
230  bool skipCurrentProcess = item.skipCurrentProcess();
231  if (productResolverIndex != ProductResolverIndexAmbiguous) {
232  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
233  }
234  }
235  }
236 
238 
239  size_t Worker::transformIndex(edm::BranchDescription const&) const { return -1; }
241  size_t iTransformIndex,
242  EventPrincipal const& iPrincipal,
243  ServiceToken const& iToken,
244  StreamID,
245  ModuleCallingContext const& mcc,
246  StreamContext const*) {
247  ServiceWeakToken weakToken = iToken;
248 
249  //Need to make the services available early so other services can see them
250  auto task = make_waiting_task([this, iTask, weakToken, &iPrincipal, iTransformIndex, parent = mcc.parent()](
251  std::exception_ptr const* iExcept) mutable {
252  if (iExcept) {
253  iTask.doneWaiting(*iExcept);
254  return;
255  }
256  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, parent, weakToken);
257  });
258 
259  //NOTE: need different ModuleCallingContext. The ProductResolver will copy the context in order to get
260  // a longer lifetime than this function call.
261  iPrincipal.prefetchAsync(
262  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
263  }
264 
271  }
272 
274  try {
275  convertException::wrap([&]() {
276  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
277  implBeginJob();
278  });
279  } catch (cms::Exception& ex) {
280  state_ = Exception;
281  std::ostringstream ost;
282  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
283  << "'";
284  ex.addContext(ost.str());
285  throw;
286  }
287  }
288 
289  void Worker::endJob() {
290  try {
291  convertException::wrap([&]() {
293  assert(desc != nullptr);
294  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
295  implEndJob();
296  });
297  } catch (cms::Exception& ex) {
298  state_ = Exception;
299  std::ostringstream ost;
300  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
301  ex.addContext(ost.str());
302  throw;
303  }
304  }
305 
306  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
307  try {
308  convertException::wrap([&]() {
310  streamContext.setEventID(EventID(0, 0, 0));
311  streamContext.setRunIndex(RunIndex::invalidRunIndex());
313  streamContext.setTimestamp(Timestamp());
314  ParentContext parentContext(&streamContext);
315  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
317  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
318  implBeginStream(id);
319  });
320  } catch (cms::Exception& ex) {
321  state_ = Exception;
322  std::ostringstream ost;
323  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
324  << "'";
325  ex.addContext(ost.str());
326  throw;
327  }
328  }
329 
330  void Worker::endStream(StreamID id, StreamContext& streamContext) {
331  try {
332  convertException::wrap([&]() {
334  streamContext.setEventID(EventID(0, 0, 0));
335  streamContext.setRunIndex(RunIndex::invalidRunIndex());
337  streamContext.setTimestamp(Timestamp());
338  ParentContext parentContext(&streamContext);
339  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
341  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
342  implEndStream(id);
343  });
344  } catch (cms::Exception& ex) {
345  state_ = Exception;
346  std::ostringstream ost;
347  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
348  << "'";
349  ex.addContext(ost.str());
350  throw;
351  }
352  }
353 
355  try {
357  } catch (cms::Exception& ex) {
358  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
359  throw ex;
360  }
361  }
362 
364  if (earlyDeleteHelper_) {
365  earlyDeleteHelper_->pathFinished(iEvent);
366  }
367  if (0 == --numberOfPathsLeftToRun_) {
369  }
370  }
371 
373  if (earlyDeleteHelper_) {
374  earlyDeleteHelper_->moduleRan(iEvent);
375  }
376  }
377 
379  ParentContext const& parentContext,
380  WaitingTaskWithArenaHolder& holder) {
381  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
382  try {
383  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
384  } catch (cms::Exception& ex) {
386  if (shouldRethrowException(std::current_exception(), parentContext, true)) {
387  timesRun_.fetch_add(1, std::memory_order_relaxed);
388  throw;
389  }
390  }
391  }
392 
393  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
394  EventTransitionInfo const& eventTransitionInfo,
395  ParentContext const& parentContext,
398  std::exception_ptr exceptionPtr;
399  if (iEPtr) {
400  if (shouldRethrowException(iEPtr, parentContext, true)) {
401  exceptionPtr = iEPtr;
402  }
404  } else {
405  // Caught exception is propagated via WaitingTaskWithArenaHolder
406  CMS_SA_ALLOW try {
407  runAcquire(eventTransitionInfo, parentContext, holder);
409  } catch (...) {
410  exceptionPtr = std::current_exception();
411  }
412  }
413  // It is important this is after runAcquire completely finishes
414  holder.doneWaiting(exceptionPtr);
415  }
416 
417  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext) {
419  try {
420  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
421  } catch (cms::Exception& ex) {
422  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
424  return std::current_exception();
425  }
426  }
427  return iEPtr;
428  }
429 
431  oneapi::tbb::task_group* group,
432  WaitingTask* runModuleTask,
433  ParentContext const& parentContext)
434  : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
435 
437  auto excptr = exceptionPtr();
438  WaitingTaskHolder holder(*m_group, m_runModuleTask);
439  if (excptr) {
440  holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
441  }
442  }
443 } // namespace edm
ModuleCallingContext const * previousModuleOnThread() const
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:265
static const TGPicture * info(bool iBackgroundIsBlack)
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
#define CMS_SA_ALLOW
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:237
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
prefetch the data to setup for subsequent calls to getImplementation
virtual ~Worker()
Definition: Worker.cc:106
void endJob()
Definition: Worker.cc:289
virtual void execute()=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
std::string const & moduleName() const
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:647
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:393
std::string const & category() const
Definition: Exception.cc:143
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:417
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:306
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:611
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:620
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:108
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
ModuleCallingContext const * getTopModuleCallingContext() const
void setTransition(Transition v)
Definition: StreamContext.h:66
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
std::exception_ptr cached_exception_
Definition: Worker.h:612
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
oneapi::tbb::task_group * group() const noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *)
Definition: Worker.cc:240
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
PlaceInPathContext const * placeInPathContext() const
int iEvent
Definition: GenABIO.cc:224
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
void doneWaiting(std::exception_ptr iExcept)
virtual void implDoTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ParentContext const &, ServiceWeakToken const &)=0
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:354
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:89
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::atomic< State > state_
Definition: Worker.h:605
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
Definition: ServiceToken.h:101
virtual size_t transformIndex(edm::BranchDescription const &) const =0
Definition: Worker.cc:239
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:430
ModuleDescription const * description() const
Definition: Worker.h:198
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginJob()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:363
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
Definition: StreamContext.h:67
HLT enums.
double a
Definition: hdecay.h:119
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:378
void beginJob()
Definition: Worker.cc:273
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:330
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:372
PathContext const * pathContext() const
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
std::atomic< int > timesRun_
Definition: Worker.h:600
bool isEndPath() const
Definition: PathContext.h:35
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
std::string const & moduleLabel() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68
void increment_ref_count()
Definition: TaskBase.h:41
BranchType const & branchType() const
Definition: Principal.h:181
ParentContext const & parent() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const =0