CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
17 #include "tbb/global_control.h"
18 
19 namespace edm {
20  namespace {
21  class ModuleBeginJobSignalSentry {
22  public:
23  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
24  if (a_)
25  a_->preModuleBeginJobSignal_(*md_);
26  }
27  ~ModuleBeginJobSignalSentry() {
28  if (a_)
29  a_->postModuleBeginJobSignal_(*md_);
30  }
31 
32  private:
33  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
34  ModuleDescription const* md_;
35  };
36 
37  class ModuleEndJobSignalSentry {
38  public:
39  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
40  if (a_)
41  a_->preModuleEndJobSignal_(*md_);
42  }
43  ~ModuleEndJobSignalSentry() {
44  if (a_)
45  a_->postModuleEndJobSignal_(*md_);
46  }
47 
48  private:
49  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
50  ModuleDescription const* md_;
51  };
52 
53  class ModuleBeginStreamSignalSentry {
54  public:
55  ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
56  : a_(a), sc_(sc), mcc_(mcc) {
57  if (a_)
58  a_->preModuleBeginStreamSignal_(sc_, mcc_);
59  }
60  ~ModuleBeginStreamSignalSentry() {
61  if (a_)
62  a_->postModuleBeginStreamSignal_(sc_, mcc_);
63  }
64 
65  private:
66  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
67  StreamContext const& sc_;
68  ModuleCallingContext const& mcc_;
69  };
70 
71  class ModuleEndStreamSignalSentry {
72  public:
73  ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
74  : a_(a), sc_(sc), mcc_(mcc) {
75  if (a_)
76  a_->preModuleEndStreamSignal_(sc_, mcc_);
77  }
78  ~ModuleEndStreamSignalSentry() {
79  if (a_)
80  a_->postModuleEndStreamSignal_(sc_, mcc_);
81  }
82 
83  private:
84  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
85  StreamContext const& sc_;
86  ModuleCallingContext const& mcc_;
87  };
88 
89  } // namespace
90 
// Member-initializer list of the Worker constructor.
// NOTE(review): the signature line (listing line 91) is absent from this extract; the page's
// cross-reference index lists it as
//   Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
// -- confirm against the original file.
// All run counters start at zero, the state starts as Ready, the module calling context is
// built from the address of iMD, iActions is stored as given, and the activity registry and
// early-delete helper start out empty/null.
92  : timesRun_(0),
93  timesVisited_(0),
94  timesPassed_(0),
95  timesFailed_(0),
96  timesExcept_(0),
97  state_(Ready),
98  numberOfPathsOn_(0),
99  numberOfPathsLeftToRun_(0),
100  moduleCallingContext_(&iMD),
101  actions_(iActions),
102  cached_exception_(),
103  actReg_(),
104  earlyDeleteHelper_(nullptr),
105  workStarted_(false),
106  ranAcquireWithoutException_(false) {}
107 
109 
110  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
111 
112  bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
113  // NOTE: the warning printed as a result of ignoring or failing
114  // a module will only be printed during the full true processing
115  // pass of this module
116 
117  // Get the action corresponding to this exception. However, if processing
118  // something other than an event (e.g. run, lumi) always rethrow.
119  if (not isEvent) {
120  return true;
121  }
122  try {
123  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
124  } catch (cms::Exception& ex) {
126 
127  if (action == exception_actions::Rethrow) {
128  return true;
129  }
130 
131  ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
132 
133  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
134  // as IgnoreCompletely, so any subsequent OutputModules are still run.
135  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
136  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
137  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
138  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
139  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
140  action == exception_actions::FailPath) {
142  }
143  }
144  if (action == exception_actions::IgnoreCompletely) {
145  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
146  return false;
147  }
148  }
149  return true;
150  }
151 
// Parameter list and body of Worker::prePrefetchSelectionAsync.
// NOTE(review): the opening signature line (listing line 152) is absent from this extract; the
// page's index shows the first parameter is a tbb::task_group& (used below as `group`).
//
// Prefetches the products needed for the module's selection, then runs
// implDoPrePrefetchSelection(). If the selection returns false, the worker counts a run, calls
// setPassed<true>(), releases its waiting tasks and drops successTask without executing it.
// Otherwise successTask is executed in `group` once its reference count reaches zero.
153  WaitingTask* successTask,
154  ServiceToken const& token,
155  StreamID id,
156  EventPrincipal const* iPrincipal) {
// Extra reference on successTask; the matching decrement happens inside choiceTask below.
157  successTask->increment_ref_count();
158 
159  ServiceWeakToken weakToken = token;
// NOTE(review): `group` is captured by reference -- presumably it outlives the task; confirm.
160  auto choiceTask =
161  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
162  ServiceRegistry::Operate guard(weakToken.lock());
163  // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
164  CMS_SA_ALLOW try {
165  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
166  timesRun_.fetch_add(1, std::memory_order_relaxed);
167  setPassed<true>();
168  waitingTasks_.doneWaiting(nullptr);
169  //TBB requires that destroyed tasks have count 0
170  if (0 == successTask->decrement_ref_count()) {
171  TaskSentry s(successTask);
172  }
173  return;
174  }
175  } catch (...) {
// Exception deliberately swallowed; control falls through to the code below that runs successTask.
176  }
177  if (0 == successTask->decrement_ref_count()) {
178  group.run([successTask]() {
179  TaskSentry s(successTask);
180  successTask->execute();
181  });
182  }
183  });
184 
185  WaitingTaskHolder choiceHolder{group, choiceTask};
186 
187  std::vector<ProductResolverIndexAndSkipBit> items;
188  itemsToGetForSelection(items);
189 
// Request an asynchronous prefetch of every non-ambiguous product needed for the selection.
190  for (auto const& item : items) {
191  ProductResolverIndex productResolverIndex = item.productResolverIndex();
192  bool skipCurrentProcess = item.skipCurrentProcess();
193  if (productResolverIndex != ProductResolverIndexAmbiguous) {
194  iPrincipal->prefetchAsync(
195  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
196  }
197  }
198  choiceHolder.doneWaiting(std::exception_ptr{});
199  }
200 
// Parameter list and body of Worker::esPrefetchAsync.
// NOTE(review): the signature line (listing line 201) and the condition guarding the first
// early return (listing line 205) are absent from this extract; the page's index shows the first
// parameter is a WaitingTaskHolder (used below as `iTask`).
//
// Asks the EventSetup records for asynchronous prefetch of every item this module declares for
// transition iTrans, running those requests inside iImpl.taskArena(). iTask is signalled when
// the prefetching is done.
202  EventSetupImpl const& iImpl,
203  Transition iTrans,
204  ServiceToken const& iToken) {
206  return;
207  }
208  auto const& recs = esRecordsToGetFrom(iTrans);
209  auto const& items = esItemsToGetFrom(iTrans);
210 
// recs and items are parallel arrays: recs[i] is the record for items[i].
211  assert(items.size() == recs.size());
212  if (items.empty()) {
213  return;
214  }
215 
216  //The case of 1 thread is special. The master thread is doing a wait_for_all on the
217  // default tbb arena. It will not process any tasks on the es arena. We need to add a
218  // task that will synchronously do a wait_for_all in the es arena to be sure prefetching
219  // will work.
220 
221  if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) {
222  auto taskGroup = iTask.group();
223  ServiceWeakToken weakToken = iToken;
224  taskGroup->run([this, task = std::move(iTask), iTrans, &iImpl, weakToken]() {
225  std::exception_ptr exceptPtr{};
226  iImpl.taskArena()->execute([this, iTrans, &iImpl, weakToken, &exceptPtr]() {
227  exceptPtr = syncWait([&](WaitingTaskHolder&& iHolder) {
228  auto const& recs = esRecordsToGetFrom(iTrans);
229  auto const& items = esItemsToGetFrom(iTrans);
230  auto hWaitTask = std::move(iHolder);
231  auto token = weakToken.lock();
232  for (size_t i = 0; i != items.size(); ++i) {
// A default-constructed ESRecordIndex marks an entry with no record to fetch from.
233  if (recs[i] != ESRecordIndex{}) {
234  auto rec = iImpl.findImpl(recs[i]);
235  if (rec) {
236  rec->prefetchAsync(hWaitTask, items[i], &iImpl, token, ESParentContext(&moduleCallingContext_));
237  }
238  }
239  }
240  }); //syncWait
241  }); //esTaskArena().execute
242  //note use of a copy gets around declaring the lambda as mutable
243  auto tempTask = task;
244  tempTask.doneWaiting(exceptPtr);
245  }); //group.run
246  } else {
247  auto group = iTask.group();
248  //We need iTask to run in the default arena since it is not an ES task
249  auto task = make_waiting_task(
250  [holder = WaitingTaskWithArenaHolder(std::move(iTask))](std::exception_ptr const* iExcept) mutable {
// Forward the prefetch outcome (exception or success) to the original task.
251  if (iExcept) {
252  holder.doneWaiting(*iExcept);
253  } else {
254  holder.doneWaiting(std::exception_ptr{});
255  }
256  });
257 
258  WaitingTaskHolder tempH(*group, task);
259  iImpl.taskArena()->execute([&]() {
260  for (size_t i = 0; i != items.size(); ++i) {
261  if (recs[i] != ESRecordIndex{}) {
262  auto rec = iImpl.findImpl(recs[i]);
263  if (rec) {
264  rec->prefetchAsync(tempH, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
265  }
266  }
267  }
268  });
269  }
270  }
271 
272  void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
273  // Prefetch products the module declares it consumes
274  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
275 
276  for (auto const& item : items) {
277  ProductResolverIndex productResolverIndex = item.productResolverIndex();
278  bool skipCurrentProcess = item.skipCurrentProcess();
279  if (productResolverIndex != ProductResolverIndexAmbiguous) {
280  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
281  }
282  }
283  }
284 
286 
293  }
294 
// Body of Worker::beginJob.
// NOTE(review): the signature line (listing line 295) is absent from this extract; the page's
// index lists it as `void beginJob()`.
// Calls implBeginJob() between the pre/post ModuleBeginJob signals. A resulting cms::Exception
// puts the worker in the Exception state, gets the module name/label added as context, and is
// rethrown.
296  try {
297  convertException::wrap([&]() {
// The sentry emits the pre signal here and the post signal when the lambda exits.
298  ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
299  implBeginJob();
300  });
301  } catch (cms::Exception& ex) {
302  state_ = Exception;
303  std::ostringstream ost;
304  ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
305  << "'";
306  ex.addContext(ost.str());
307  throw;
308  }
309  }
310 
311  void Worker::endJob() {
312  try {
313  convertException::wrap([&]() {
315  assert(desc != nullptr);
316  ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
317  implEndJob();
318  });
319  } catch (cms::Exception& ex) {
320  state_ = Exception;
321  std::ostringstream ost;
322  ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
323  ex.addContext(ost.str());
324  throw;
325  }
326  }
327 
// Runs the module's beginStream transition for stream `id`.
// Resets the event id, run index and timestamp of streamContext to placeholder values, installs
// the module calling context, then calls implBeginStream(id) between the pre/post
// ModuleBeginStream signals. A resulting cms::Exception puts the worker in the Exception state,
// gets the module name/label added as context, and is rethrown.
// NOTE(review): listing lines 331, 334 and 338 are absent from this extract, so further setup
// statements inside the lambda are not visible here -- confirm against the original.
328  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
329  try {
330  convertException::wrap([&]() {
332  streamContext.setEventID(EventID(0, 0, 0));
333  streamContext.setRunIndex(RunIndex::invalidRunIndex());
335  streamContext.setTimestamp(Timestamp());
336  ParentContext parentContext(&streamContext);
337  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// The sentry emits the pre signal here and the post signal when the lambda exits.
339  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
340  implBeginStream(id);
341  });
342  } catch (cms::Exception& ex) {
343  state_ = Exception;
344  std::ostringstream ost;
345  ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
346  << "'";
347  ex.addContext(ost.str());
348  throw;
349  }
350  }
351 
// Runs the module's endStream transition for stream `id`.
// Resets the event id, run index and timestamp of streamContext to placeholder values, installs
// the module calling context, then calls implEndStream(id) between the pre/post
// ModuleEndStream signals. A resulting cms::Exception puts the worker in the Exception state,
// gets the module name/label added as context, and is rethrown.
// NOTE(review): listing lines 355, 358 and 362 are absent from this extract, so further setup
// statements inside the lambda are not visible here -- confirm against the original.
352  void Worker::endStream(StreamID id, StreamContext& streamContext) {
353  try {
354  convertException::wrap([&]() {
356  streamContext.setEventID(EventID(0, 0, 0));
357  streamContext.setRunIndex(RunIndex::invalidRunIndex());
359  streamContext.setTimestamp(Timestamp());
360  ParentContext parentContext(&streamContext);
361  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
// The sentry emits the pre signal here and the post signal when the lambda exits.
363  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
364  implEndStream(id);
365  });
366  } catch (cms::Exception& ex) {
367  state_ = Exception;
368  std::ostringstream ost;
369  ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
370  << "'";
371  ex.addContext(ost.str());
372  throw;
373  }
374  }
375 
377  try {
378  implRegisterThinnedAssociations(registry, helper);
379  } catch (cms::Exception& ex) {
380  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
381  throw ex;
382  }
383  }
384 
// Body of Worker::skipOnPath.
// NOTE(review): the signature line (listing line 385) is absent from this extract; the page's
// index lists it as `void skipOnPath(EventPrincipal const& iEvent)`.
// Tells the early-delete helper, if one is attached, that a path finished for iEvent, then
// decrements the count of paths still left to run.
386  if (earlyDeleteHelper_) {
387  earlyDeleteHelper_->pathFinished(iEvent);
388  }
389  if (0 == --numberOfPathsLeftToRun_) {
// NOTE(review): listing line 390 (the action taken when the last path is done) is absent
// from this extract -- confirm against the original.
391  }
392  }
393 
// Body of Worker::postDoEvent.
// NOTE(review): the signature line (listing line 394) is absent from this extract; the page's
// index lists it as `void postDoEvent(EventPrincipal const&)`, used below as `iEvent`.
// Tells the early-delete helper, if one is attached, that the module ran for iEvent.
395  if (earlyDeleteHelper_) {
396  earlyDeleteHelper_->moduleRan(iEvent);
397  }
398  }
399 
// Parameter list and body of Worker::runAcquire.
// NOTE(review): the signature line (listing line 400) is absent from this extract; the page's
// index shows the first parameter is an EventTransitionInfo const& (used below as `info`).
// Calls implDoAcquire() with the module calling context installed. If a cms::Exception results
// and shouldRethrowException() says it must propagate, the run counter is incremented and the
// exception is rethrown; otherwise it is dropped here.
401  ParentContext const& parentContext,
402  WaitingTaskWithArenaHolder& holder) {
403  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
404  try {
405  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
406  } catch (cms::Exception& ex) {
// NOTE(review): listing line 407 is absent from this extract -- confirm against the original.
408  if (shouldRethrowException(std::current_exception(), parentContext, true)) {
409  timesRun_.fetch_add(1, std::memory_order_relaxed);
410  throw;
411  }
412  }
413  }
414 
// Runs the acquire step once asynchronous prefetching has completed.
// If prefetching produced an exception (iEPtr non-null), runAcquire() is skipped and the
// exception is passed on only when shouldRethrowException() says so. Otherwise runAcquire() is
// called and any exception it throws is captured. Either way `holder` is signalled last.
// NOTE(review): listing lines 418-419 (the rest of the parameter list, presumably the
// WaitingTaskWithArenaHolder `holder` shown in the page's index, and the opening brace) and
// lines 426 and 431 are absent from this extract -- confirm against the original.
415  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
416  EventTransitionInfo const& eventTransitionInfo,
417  ParentContext const& parentContext,
420  std::exception_ptr exceptionPtr;
421  if (iEPtr) {
// A non-null pointer must refer to an actual (non-empty) exception.
422  assert(*iEPtr);
423  if (shouldRethrowException(*iEPtr, parentContext, true)) {
424  exceptionPtr = *iEPtr;
425  }
427  } else {
428  // Caught exception is propagated via WaitingTaskWithArenaHolder
429  CMS_SA_ALLOW try {
430  runAcquire(eventTransitionInfo, parentContext, holder);
432  } catch (...) {
433  exceptionPtr = std::current_exception();
434  }
435  }
436  // It is important this is after runAcquire completely finishes
437  holder.doneWaiting(exceptionPtr);
438  }
439 
// Converts an exception reported by external work into a cms::Exception and returns it as an
// exception_ptr; the module calling context is installed while the catch block runs.
// NOTE(review): listing lines 442 and 447 are absent from this extract. The extra closing brace
// before the final return suggests the try block sits inside a guard opened on line 442, and
// line 447 presumably modifies `ex` before it is re-captured -- confirm against the original.
440  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
441  ParentContext const& parentContext) {
443  try {
444  convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
445  } catch (cms::Exception& ex) {
446  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
448  return std::current_exception();
449  }
450  }
// Reached when the catch block above did not return; hands back the caller's pointer value.
451  return *iEPtr;
452  }
453 
// Remaining parameters and initializer list of the HandleExternalWorkExceptionTask constructor.
// NOTE(review): the first signature line (listing line 454) is absent from this extract; the
// page's index shows the first parameter is `Worker* worker`.
// Stores the worker, the task that runs the module, the task group and the parent context.
455  tbb::task_group* group,
456  WaitingTask* runModuleTask,
457  ParentContext const& parentContext)
458  : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
459 
// Body of a HandleExternalWorkExceptionTask member function.
// NOTE(review): the signature line (listing line 460) is absent from this extract; presumably
// this is the task's execute() override -- confirm against the original.
// Wraps m_runModuleTask in a holder bound to m_group. If this task carries an exception, the
// worker converts it via handleExternalWorkException() and the result is given to the holder.
461  auto excptr = exceptionPtr();
462  WaitingTaskHolder holder(*m_group, m_runModuleTask);
463  if (excptr) {
464  holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
465  }
466  }
467 } // namespace edm
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
ServiceToken lock() const
Definition: ServiceToken.h:101
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:287
static const TGPicture * info(bool iBackgroundIsBlack)
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
#define CMS_SA_ALLOW
ModuleDescription const * description() const
Definition: Worker.h:188
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:574
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
unsigned int ProductResolverIndex
std::exception_ptr syncWait(F &&iFunc)
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:285
ModuleCallingContext const * getTopModuleCallingContext() const
virtual ~Worker()
Definition: Worker.cc:108
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
prefetch the data to setup for subsequent calls to getImplementation
void endJob()
Definition: Worker.cc:311
virtual void execute()=0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:581
std::string const & moduleName() const
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder)
Definition: Worker.cc:415
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:328
assert(be >=bs)
ExceptionToActionTable const * actions_
Definition: Worker.h:578
virtual std::vector< ESProxyIndex > const & esItemsToGetFrom(Transition) const =0
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:587
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:110
ParentContext const & parent() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:576
void setTransition(Transition v)
Definition: StreamContext.h:65
void prefetchAsync(WaitingTaskHolder waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:639
std::exception_ptr cached_exception_
Definition: Worker.h:579
virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const &, ModuleCallingContext const *)=0
virtual std::vector< ESRecordIndex > const & esRecordsToGetFrom(Transition) const =0
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:35
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
void doneWaiting(std::exception_ptr iExcept)
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:585
void doneWaiting(std::exception_ptr iExcept)
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
BranchType const & branchType() const
Definition: Principal.h:181
PathContext const * pathContext() const
def move
Definition: eostools.py:511
tbb::task_arena * taskArena() const
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:376
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:91
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::atomic< State > state_
Definition: Worker.h:572
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &)
Definition: Worker.cc:201
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent) const
Definition: Worker.cc:112
areg
Definition: Schedule.cc:687
tuple group
Definition: watchdog.py:82
virtual void implBeginJob()=0
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:385
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
string action
Definition: mps_fire.py:183
tbb::task_group * group() const noexcept
virtual void implEndStream(StreamID)=0
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const
Definition: Worker.cc:272
void setEventID(EventID const &v)
Definition: StreamContext.h:66
ModuleCallingContext const * previousModuleOnThread() const
double a
Definition: hdecay.h:119
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:400
void beginJob()
Definition: Worker.cc:295
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:352
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:394
auto wrap(F iFunc) -> decltype(iFunc())
unsigned int decrement_ref_count()
Definition: TaskBase.h:42
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:583
#define UNLIKELY(x)
Definition: Likely.h:21
std::atomic< int > timesRun_
Definition: Worker.h:567
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
void prePrefetchSelectionAsync(tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:152
HandleExternalWorkExceptionTask(Worker *worker, tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:454
void increment_ref_count()
Definition: TaskBase.h:41
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:440
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)