CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
16 
17 namespace edm {
18  namespace {
19  class ModuleBeginJobSignalSentry {
20  public:
21  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
22  if (a_)
23  a_->preModuleBeginJobSignal_(*md_);
24  }
25  ~ModuleBeginJobSignalSentry() {
26  if (a_)
27  a_->postModuleBeginJobSignal_(*md_);
28  }
29 
30  private:
31  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
32  ModuleDescription const* md_;
33  };
34 
35  class ModuleEndJobSignalSentry {
36  public:
37  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
38  if (a_)
39  a_->preModuleEndJobSignal_(*md_);
40  }
41  ~ModuleEndJobSignalSentry() {
42  if (a_)
43  a_->postModuleEndJobSignal_(*md_);
44  }
45 
46  private:
47  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
48  ModuleDescription const* md_;
49  };
50 
51  class ModuleBeginStreamSignalSentry {
52  public:
53  ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
54  : a_(a), sc_(sc), mcc_(mcc) {
55  if (a_)
56  a_->preModuleBeginStreamSignal_(sc_, mcc_);
57  }
58  ~ModuleBeginStreamSignalSentry() {
59  if (a_)
60  a_->postModuleBeginStreamSignal_(sc_, mcc_);
61  }
62 
63  private:
64  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
65  StreamContext const& sc_;
66  ModuleCallingContext const& mcc_;
67  };
68 
69  class ModuleEndStreamSignalSentry {
70  public:
71  ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
72  : a_(a), sc_(sc), mcc_(mcc) {
73  if (a_)
74  a_->preModuleEndStreamSignal_(sc_, mcc_);
75  }
76  ~ModuleEndStreamSignalSentry() {
77  if (a_)
78  a_->postModuleEndStreamSignal_(sc_, mcc_);
79  }
80 
81  private:
82  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
83  StreamContext const& sc_;
84  ModuleCallingContext const& mcc_;
85  };
86 
87  } // namespace
88 
90  : timesRun_(0),
91  timesVisited_(0),
92  timesPassed_(0),
93  timesFailed_(0),
94  timesExcept_(0),
95  state_(Ready),
96  numberOfPathsOn_(0),
97  numberOfPathsLeftToRun_(0),
98  moduleCallingContext_(&iMD),
99  actions_(iActions),
100  cached_exception_(),
101  actReg_(),
102  earlyDeleteHelper_(nullptr),
103  workStarted_(false),
104  ranAcquireWithoutException_(false) {}
105 
107 
108  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
109 
110  bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
111  // NOTE: the warning printed as a result of ignoring or failing
112  // a module will only be printed during the full true processing
113  // pass of this module
114 
115  // Get the action corresponding to this exception. However, if processing
116  // something other than an event (e.g. run, lumi) always rethrow.
117  if (not isEvent) {
118  return true;
119  }
120  try {
121  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
122  } catch (cms::Exception& ex) {
124 
126  return true;
127  }
128 
129  ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
130 
131  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
132  // as IgnoreCompletely, so any subsequent OutputModules are still run.
133  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
134  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
135  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
136  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
140  }
141  }
143  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
144  return false;
145  }
146  }
147  return true;
148  }
149 
150  void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
151  WaitingTask* successTask,
152  ServiceToken const& token,
153  StreamID id,
154  EventPrincipal const* iPrincipal) {
155  successTask->increment_ref_count();
156 
157  ServiceWeakToken weakToken = token;
158  auto choiceTask =
159  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
160  ServiceRegistry::Operate guard(weakToken.lock());
161  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
162  CMS_SA_ALLOW try {
163  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
164  timesRun_.fetch_add(1, std::memory_order_relaxed);
165  setPassed<true>();
166  waitingTasks_.doneWaiting(nullptr);
167  //TBB requires that destroyed tasks have count 0
168  if (0 == successTask->decrement_ref_count()) {
169  TaskSentry s(successTask);
170  }
171  return;
172  }
173  } catch (...) {
174  }
175  if (0 == successTask->decrement_ref_count()) {
176  group.run([successTask]() {
177  TaskSentry s(successTask);
178  successTask->execute();
179  });
180  }
181  });
182 
183  WaitingTaskHolder choiceHolder{group, choiceTask};
184 
185  std::vector<ProductResolverIndexAndSkipBit> items;
187 
188  for (auto const& item : items) {
189  ProductResolverIndex productResolverIndex = item.productResolverIndex();
190  bool skipCurrentProcess = item.skipCurrentProcess();
191  if (productResolverIndex != ProductResolverIndexAmbiguous) {
192  iPrincipal->prefetchAsync(
193  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
194  }
195  }
196  choiceHolder.doneWaiting(std::exception_ptr{});
197  }
198 
200  EventSetupImpl const& iImpl,
201  Transition iTrans,
202  ServiceToken const& iToken) {
204  return;
205  }
206  auto const& recs = esRecordsToGetFrom(iTrans);
207  auto const& items = esItemsToGetFrom(iTrans);
208 
209  assert(items.size() == recs.size());
210  if (items.empty()) {
211  return;
212  }
213 
214  for (size_t i = 0; i != items.size(); ++i) {
215  if (recs[i] != ESRecordIndex{}) {
216  auto rec = iImpl.findImpl(recs[i]);
217  if (rec) {
218  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
219  }
220  }
221  }
222  }
223 
224  void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
225  // Prefetch products the module declares it consumes
226  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
227 
228  for (auto const& item : items) {
229  ProductResolverIndex productResolverIndex = item.productResolverIndex();
230  bool skipCurrentProcess = item.skipCurrentProcess();
231  if (productResolverIndex != ProductResolverIndexAmbiguous) {
232  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
233  }
234  }
235  }
236 
238 
245  }
246 
248  try {
249  convertException::wrap([&]() {
250  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
251  implBeginJob();
252  });
253  } catch (cms::Exception& ex) {
254  state_ = Exception;
255  std::ostringstream ost;
256  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
257  << "'";
258  ex.addContext(ost.str());
259  throw;
260  }
261  }
262 
263  void Worker::endJob() {
264  try {
265  convertException::wrap([&]() {
267  assert(desc != nullptr);
268  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
269  implEndJob();
270  });
271  } catch (cms::Exception& ex) {
272  state_ = Exception;
273  std::ostringstream ost;
274  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
275  ex.addContext(ost.str());
276  throw;
277  }
278  }
279 
280  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
281  try {
282  convertException::wrap([&]() {
284  streamContext.setEventID(EventID(0, 0, 0));
285  streamContext.setRunIndex(RunIndex::invalidRunIndex());
287  streamContext.setTimestamp(Timestamp());
288  ParentContext parentContext(&streamContext);
289  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
291  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
292  implBeginStream(id);
293  });
294  } catch (cms::Exception& ex) {
295  state_ = Exception;
296  std::ostringstream ost;
297  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
298  << "'";
299  ex.addContext(ost.str());
300  throw;
301  }
302  }
303 
304  void Worker::endStream(StreamID id, StreamContext& streamContext) {
305  try {
306  convertException::wrap([&]() {
308  streamContext.setEventID(EventID(0, 0, 0));
309  streamContext.setRunIndex(RunIndex::invalidRunIndex());
311  streamContext.setTimestamp(Timestamp());
312  ParentContext parentContext(&streamContext);
313  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
315  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
316  implEndStream(id);
317  });
318  } catch (cms::Exception& ex) {
319  state_ = Exception;
320  std::ostringstream ost;
321  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
322  << "'";
323  ex.addContext(ost.str());
324  throw;
325  }
326  }
327 
329  try {
331  } catch (cms::Exception& ex) {
332  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
333  throw ex;
334  }
335  }
336 
338  if (earlyDeleteHelper_) {
339  earlyDeleteHelper_->pathFinished(iEvent);
340  }
341  if (0 == --numberOfPathsLeftToRun_) {
343  }
344  }
345 
347  if (earlyDeleteHelper_) {
348  earlyDeleteHelper_->moduleRan(iEvent);
349  }
350  }
351 
353  ParentContext const& parentContext,
354  WaitingTaskWithArenaHolder& holder) {
355  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
356  try {
357  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
358  } catch (cms::Exception& ex) {
360  if (shouldRethrowException(std::current_exception(), parentContext, true)) {
361  timesRun_.fetch_add(1, std::memory_order_relaxed);
362  throw;
363  }
364  }
365  }
366 
367  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
368  EventTransitionInfo const& eventTransitionInfo,
369  ParentContext const& parentContext,
372  std::exception_ptr exceptionPtr;
373  if (iEPtr) {
374  assert(*iEPtr);
375  if (shouldRethrowException(*iEPtr, parentContext, true)) {
376  exceptionPtr = *iEPtr;
377  }
379  } else {
380  // Caught exception is propagated via WaitingTaskWithArenaHolder
381  CMS_SA_ALLOW try {
382  runAcquire(eventTransitionInfo, parentContext, holder);
384  } catch (...) {
385  exceptionPtr = std::current_exception();
386  }
387  }
388  // It is important this is after runAcquire completely finishes
389  holder.doneWaiting(exceptionPtr);
390  }
391 
392  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
393  ParentContext const& parentContext) {
395  try {
396  convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
397  } catch (cms::Exception& ex) {
398  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
400  return std::current_exception();
401  }
402  }
403  return *iEPtr;
404  }
405 
407  oneapi::tbb::task_group* group,
408  WaitingTask* runModuleTask,
409  ParentContext const& parentContext)
410  : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
411 
413  auto excptr = exceptionPtr();
414  WaitingTaskHolder holder(*m_group, m_runModuleTask);
415  if (excptr) {
416  holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
417  }
418  }
419 } // namespace edm
ModuleCallingContext const * previousModuleOnThread() const
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:239
static const TGPicture * info(bool iBackgroundIsBlack)
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
#define CMS_SA_ALLOW
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:574
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:237
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
prefetch the data to setup for subsequent calls to getImplementation
virtual ~Worker()
Definition: Worker.cc:106
void endJob()
Definition: Worker.cc:263
virtual void execute()=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:581
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:367
std::string const & moduleName() const
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:639
std::string const & category() const
Definition: Exception.cc:143
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:280
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:578
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:587
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:108
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:576
ModuleCallingContext const * getTopModuleCallingContext() const
void setTransition(Transition v)
Definition: StreamContext.h:65
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:150
std::exception_ptr cached_exception_
Definition: Worker.h:579
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
PlaceInPathContext const * placeInPathContext() const
int iEvent
Definition: GenABIO.cc:224
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:585
void doneWaiting(std::exception_ptr iExcept)
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:110
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:328
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:89
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::atomic< State > state_
Definition: Worker.h:572
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:199
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
ServiceToken lock() const
Definition: ServiceToken.h:101
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:406
ModuleDescription const * description() const
Definition: Worker.h:188
exception_actions::ActionCodes find(const std::string &category) const
virtual void implBeginJob()=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:224
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:337
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
Definition: StreamContext.h:66
HLT enums.
double a
Definition: hdecay.h:119
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:352
void beginJob()
Definition: Worker.cc:247
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:304
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:346
PathContext const * pathContext() const
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:583
std::atomic< int > timesRun_
Definition: Worker.h:567
bool isEndPath() const
Definition: PathContext.h:35
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
std::string const & moduleLabel() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
void increment_ref_count()
Definition: TaskBase.h:41
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:392
BranchType const & branchType() const
Definition: Principal.h:181
ParentContext const & parent() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)