CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
4 
9 
10 namespace edm {
11  namespace {
12  class ModuleBeginJobSignalSentry {
13 public:
14  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
15  if(a_) a_->preModuleBeginJobSignal_(*md_);
16  }
17  ~ModuleBeginJobSignalSentry() {
18  if(a_) a_->postModuleBeginJobSignal_(*md_);
19  }
20 private:
21  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
22  ModuleDescription const* md_;
23  };
24 
25  class ModuleEndJobSignalSentry {
26 public:
27  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
28  if(a_) a_->preModuleEndJobSignal_(*md_);
29  }
30  ~ModuleEndJobSignalSentry() {
31  if(a_) a_->postModuleEndJobSignal_(*md_);
32  }
33 private:
34  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
35  ModuleDescription const* md_;
36  };
37 
38  class ModuleBeginStreamSignalSentry {
39  public:
40  ModuleBeginStreamSignalSentry(ActivityRegistry* a,
41  StreamContext const& sc,
42  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
43  if(a_) a_->preModuleBeginStreamSignal_(sc_, mcc_);
44  }
45  ~ModuleBeginStreamSignalSentry() {
46  if(a_) a_->postModuleBeginStreamSignal_(sc_, mcc_);
47  }
48  private:
49  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
50  StreamContext const& sc_;
51  ModuleCallingContext const& mcc_;
52  };
53 
54  class ModuleEndStreamSignalSentry {
55  public:
56  ModuleEndStreamSignalSentry(ActivityRegistry* a,
57  StreamContext const& sc,
58  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
59  if(a_) a_->preModuleEndStreamSignal_(sc_, mcc_);
60  }
61  ~ModuleEndStreamSignalSentry() {
62  if(a_) a_->postModuleEndStreamSignal_(sc_, mcc_);
63  }
64  private:
65  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
66  StreamContext const& sc_;
67  ModuleCallingContext const& mcc_;
68  };
69 
70  }
71 
73  ExceptionToActionTable const* iActions) :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
80  numberOfPathsOn_(0),
81  numberOfPathsLeftToRun_(0),
82  moduleCallingContext_(&iMD),
83  actions_(iActions),
84  cached_exception_(),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
90 
92  }
93 
94  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) {
95  actReg_ = areg;
96  }
97 
98 
100  cms::Exception& ex,
101  ModuleCallingContext const* mcc) {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
156 
// Decide whether an exception raised while running this module must be
// propagated (returns true) or may be swallowed after printing a warning
// (returns false).
//
// iPtr          - the captured exception; it is rethrown inside
//                 convertException::wrap and handled here as a cms::Exception.
// parentContext - used to build a temporary ModuleCallingContext for locating
//                 the top-level calling context.
// isEvent       - when false the function returns true immediately.
// iID           - not referenced in the lines visible in this listing.
//
// NOTE(review): listing lines 176, 193 and 196 are absent from this extract.
// As a result `action` has no visible declaration, the SkipEvent/FailPath
// branch has an empty body, and the braces below do not balance. The
// cross-reference list of this page mentions ExceptionToActionTable::find and
// cms::Exception::category, so line 176 presumably looks the action up from
// actions_ by exception category - confirm against the original file.
157  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
158  ParentContext const& parentContext,
159  bool isEvent,
160  TransitionIDValueBase const& iID) const {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
// The temporary context exists only so getTopModuleCallingContext() can be
// called on it; it is created in State::kInvalid with no previous-on-thread
// context.
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
// NOTE(review): the condition guarding the warning and `return false` below
// (listing line 196) is missing from this extract.
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
// Reached when none of the branches above returned.
201  return true;
202  }
203 
204 
// Request asynchronous prefetching of every product this module declares it
// consumes for the principal's branch type, holding iTask back until all
// requests have been issued.
//
// iTask         - task handed to each Principal::prefetchAsync request; it is
//                 spawned here if its reference count reaches zero before
//                 this function returns.
// parentContext - not referenced in the lines visible in this listing.
// iPrincipal    - supplies the branch type and receives the prefetch requests.
//
// NOTE(review): listing lines 209 and 226 are absent from this extract. The
// cross-reference list of this page mentions ModuleCallingContext::setContext,
// which presumably is called at line 209 with parentContext, and the second
// InEvent branch presumably emits the matching post-prefetching signal -
// confirm against the original file.
205  void Worker::prefetchAsync(WaitingTask* iTask, ParentContext const& parentContext, Principal const& iPrincipal) {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
208 
210 
// NOTE(review): actReg_ is dereferenced here without a null check, whereas the
// job/stream sentries in this file tolerate a null registry.
211  if(iPrincipal.branchType()==InEvent) {
212  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
213  }
214 
215  //Need to be sure the ref count isn't set to 0 immediately
216  iTask->increment_ref_count();
217  for(auto const& item : items) {
218  ProductResolverIndex productResolverIndex = item.productResolverIndex();
219  bool skipCurrentProcess = item.skipCurrentProcess();
// Entries whose index equals ProductResolverIndexAmbiguous are skipped.
220  if(productResolverIndex != ProductResolverIndexAmbiguous) {
221  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
222  }
223  }
224 
// The body of this branch (listing line 226) is missing from this extract.
225  if(iPrincipal.branchType()==InEvent) {
227  }
228 
// Drop the extra reference taken before the loop.
229  if(0 == iTask->decrement_ref_count()) {
230  //if everything finishes before we leave this routine, we need to launch the task
231  tbb::task::spawn(*iTask);
232  }
233  }
234 
236  earlyDeleteHelper_=iHelper;
237  }
238 
243  }
244 
246  try {
247  convertException::wrap([&]() {
248  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
249  implBeginJob();
250  });
251  }
252  catch(cms::Exception& ex) {
253  state_ = Exception;
254  std::ostringstream ost;
255  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
256  ex.addContext(ost.str());
257  throw;
258  }
259  }
260 
261  void Worker::endJob() {
262  try {
263  convertException::wrap([&]() {
264  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
265  implEndJob();
266  });
267  }
268  catch(cms::Exception& ex) {
269  state_ = Exception;
270  std::ostringstream ost;
271  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
272  ex.addContext(ost.str());
273  throw;
274  }
275  }
276 
// Run the module's beginStream hook for stream `id`.
//
// id            - forwarded to implBeginStream.
// streamContext - overwritten with placeholder values (EventID(0, 0, 0),
//                 RunIndex::invalidRunIndex(), a default Timestamp) before
//                 the hook runs, then used as the parent context and passed
//                 to the begin-stream signals.
//
// If a cms::Exception reaches this function, state_ is set to Exception, a
// context line naming the module is added, and the exception is rethrown.
//
// NOTE(review): listing lines 280, 283 and 287 are absent from this extract.
// The cross-reference list of this page mentions StreamContext::setTransition
// and StreamContext::setLuminosityBlockIndex, which presumably are called on
// the missing lines - confirm against the original file.
277  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
278  try {
279  convertException::wrap([&]() {
281  streamContext.setEventID(EventID(0, 0, 0));
282  streamContext.setRunIndex(RunIndex::invalidRunIndex());
284  streamContext.setTimestamp(Timestamp());
285  ParentContext parentContext(&streamContext);
// moduleContextSentry is declared before beginSentry, so beginSentry is
// destroyed first: its destructor runs while moduleContextSentry is still
// alive.
286  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
288  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
289  implBeginStream(id);
290  });
291  }
292  catch(cms::Exception& ex) {
293  state_ = Exception;
294  std::ostringstream ost;
295  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
296  ex.addContext(ost.str());
297  throw;
298  }
299  }
300 
// Run the module's endStream hook for stream `id`.
//
// id            - forwarded to implEndStream.
// streamContext - overwritten with placeholder values (EventID(0, 0, 0),
//                 RunIndex::invalidRunIndex(), a default Timestamp) before
//                 the hook runs, then used as the parent context and passed
//                 to the end-stream signals.
//
// If a cms::Exception reaches this function, state_ is set to Exception, a
// context line naming the module is added, and the exception is rethrown.
//
// NOTE(review): listing lines 304, 307 and 311 are absent from this extract.
// The cross-reference list of this page mentions StreamContext::setTransition
// and StreamContext::setLuminosityBlockIndex, which presumably are called on
// the missing lines - confirm against the original file.
301  void Worker::endStream(StreamID id, StreamContext& streamContext) {
302  try {
303  convertException::wrap([&]() {
305  streamContext.setEventID(EventID(0, 0, 0));
306  streamContext.setRunIndex(RunIndex::invalidRunIndex());
308  streamContext.setTimestamp(Timestamp());
309  ParentContext parentContext(&streamContext);
// moduleContextSentry is declared before endSentry, so endSentry is destroyed
// first: its destructor runs while moduleContextSentry is still alive.
310  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
312  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
313  implEndStream(id);
314  });
315  }
316  catch(cms::Exception& ex) {
317  state_ = Exception;
318  std::ostringstream ost;
319  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
320  ex.addContext(ost.str());
321  throw;
322  }
323  }
324 
326  if( 0 == --numberOfPathsLeftToRun_) {
328  }
329  }
330 
332  if(earlyDeleteHelper_) {
333  earlyDeleteHelper_->moduleRan(iEvent);
334  }
335  }
336 }
std::string const & pathName() const
Definition: PathContext.h:37
GlobalContext const * globalContext() const
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:239
void skipOnPath()
Definition: Worker.cc:325
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
ModuleDescription const & description() const
Definition: Worker.h:132
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:402
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:235
ModuleCallingContext const * getTopModuleCallingContext() const
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
virtual ~Worker()
Definition: Worker.cc:91
void endJob()
Definition: Worker.cc:261
InternalContext const * internalContext() const
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:409
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:277
void exceptionContext(std::ostream &, GlobalContext const &)
#define nullptr
ExceptionToActionTable const * actions_
Definition: Worker.h:406
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:94
ParentContext const & parent() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:404
void setTransition(Transition v)
Definition: StreamContext.h:65
std::exception_ptr cached_exception_
Definition: Worker.h:407
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:42
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:413
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
BranchType const & branchType() const
Definition: Principal.h:178
PathContext const * pathContext() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ModuleCallingContext const *mcc) const
Definition: Principal.cc:523
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * moduleDescription() const
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:72
std::atomic< State > state_
Definition: Worker.h:400
virtual void implEndJob()=0
StreamContext const * streamContext() const
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual void implBeginJob()=0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void implEndStream(StreamID)=0
StreamContext const * streamContext() const
Definition: PathContext.h:38
void setEventID(EventID const &v)
Definition: StreamContext.h:66
ModuleCallingContext const * previousModuleOnThread() const
HLT enums.
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:245
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:301
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:331
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:411
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)