CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
4 
9 
10 namespace edm {
11  namespace {
12  class ModuleBeginJobSignalSentry {
13 public:
14  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
15  if(a_) a_->preModuleBeginJobSignal_(*md_);
16  }
17  ~ModuleBeginJobSignalSentry() {
18  if(a_) a_->postModuleBeginJobSignal_(*md_);
19  }
20 private:
21  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
22  ModuleDescription const* md_;
23  };
24 
25  class ModuleEndJobSignalSentry {
26 public:
27  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
28  if(a_) a_->preModuleEndJobSignal_(*md_);
29  }
30  ~ModuleEndJobSignalSentry() {
31  if(a_) a_->postModuleEndJobSignal_(*md_);
32  }
33 private:
34  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
35  ModuleDescription const* md_;
36  };
37 
38  class ModuleBeginStreamSignalSentry {
39  public:
40  ModuleBeginStreamSignalSentry(ActivityRegistry* a,
41  StreamContext const& sc,
42  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
43  if(a_) a_->preModuleBeginStreamSignal_(sc_, mcc_);
44  }
45  ~ModuleBeginStreamSignalSentry() {
46  if(a_) a_->postModuleBeginStreamSignal_(sc_, mcc_);
47  }
48  private:
49  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
50  StreamContext const& sc_;
51  ModuleCallingContext const& mcc_;
52  };
53 
54  class ModuleEndStreamSignalSentry {
55  public:
56  ModuleEndStreamSignalSentry(ActivityRegistry* a,
57  StreamContext const& sc,
58  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
59  if(a_) a_->preModuleEndStreamSignal_(sc_, mcc_);
60  }
61  ~ModuleEndStreamSignalSentry() {
62  if(a_) a_->postModuleEndStreamSignal_(sc_, mcc_);
63  }
64  private:
65  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
66  StreamContext const& sc_;
67  ModuleCallingContext const& mcc_;
68  };
69 
70  }
71 
73  ExceptionToActionTable const* iActions) :
74  timesRun_(0),
75  timesVisited_(0),
76  timesPassed_(0),
77  timesFailed_(0),
78  timesExcept_(0),
79  state_(Ready),
80  numberOfPathsOn_(0),
81  numberOfPathsLeftToRun_(0),
82  moduleCallingContext_(&iMD),
83  actions_(iActions),
84  cached_exception_(),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
90 
92  }
93 
94  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) {
95  actReg_ = areg;
96  }
97 
98 
100  cms::Exception& ex,
101  ModuleCallingContext const* mcc) {
102 
103  ModuleCallingContext const* imcc = mcc;
104  while( (imcc->type() == ParentContext::Type::kModule) or
105  (imcc->type() == ParentContext::Type::kInternal) ) {
106  std::ostringstream iost;
108  iost << "Prefetching for module ";
109  } else {
110  iost << "Calling method for module ";
111  }
112  iost << imcc->moduleDescription()->moduleName() << "/'"
113  << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if(imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'"
130  << imcc->moduleDescription()->moduleLabel() << "'";
131  ex.addContext(ost.str());
132 
133  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
134  ost.str("");
135  ost << "Running path '";
136  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
137  ex.addContext(ost.str());
138  auto streamContext =imcc->placeInPathContext()->pathContext()->streamContext();
139  if(streamContext) {
140  ost.str("");
141  edm::exceptionContext(ost,*streamContext);
142  ex.addContext(ost.str());
143  }
144  } else {
145  if (imcc->type() == ParentContext::Type::kStream) {
146  ost.str("");
147  edm::exceptionContext(ost, *(imcc->streamContext()) );
148  ex.addContext(ost.str());
149  } else if(imcc->type() == ParentContext::Type::kGlobal) {
150  ost.str("");
151  edm::exceptionContext(ost, *(imcc->globalContext()) );
152  ex.addContext(ost.str());
153  }
154  }
155  }
156 
157  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
158  ParentContext const& parentContext,
159  bool isEvent,
160  TransitionIDValueBase const& iID) const {
161 
162  // NOTE: the warning printed as a result of ignoring or failing
163  // a module will only be printed during the full true processing
164  // pass of this module
165 
166  // Get the action corresponding to this exception. However, if processing
167  // something other than an event (e.g. run, lumi) always rethrow.
168  if(not isEvent) {
169  return true;
170  }
171  try {
172  convertException::wrap([&]() {
173  std::rethrow_exception(iPtr);
174  });
175  } catch(cms::Exception &ex) {
177 
178  if(action == exception_actions::Rethrow) {
179  return true;
180  }
181 
182  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
183 
184  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
185  // as IgnoreCompletely, so any subsequent OutputModules are still run.
186  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
187  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
188  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
189  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
190 
191  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
192  action == exception_actions::FailPath) {
194  }
195  }
197  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
198  return false;
199  }
200  }
201  return true;
202  }
203 
204 
205  void Worker::prefetchAsync(WaitingTask* iTask, ParentContext const& parentContext, Principal const& iPrincipal) {
206  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
207  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFromEvent();
208 
210 
211  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
212 
213  //Need to be sure the ref count isn't set to 0 immediately
214  iTask->increment_ref_count();
215  for(auto const& item : items) {
216  ProductResolverIndex productResolverIndex = item.productResolverIndex();
217  bool skipCurrentProcess = item.skipCurrentProcess();
218  if(productResolverIndex != ProductResolverIndexAmbiguous) {
219  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
220  }
221  }
222 
224 
225  if(0 == iTask->decrement_ref_count()) {
226  //if everything finishes before we leave this routine, we need to launch the task
227  tbb::task::spawn(*iTask);
228  }
229  }
230 
232  earlyDeleteHelper_=iHelper;
233  }
234 
239  }
240 
242  try {
243  convertException::wrap([&]() {
244  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
245  implBeginJob();
246  });
247  }
248  catch(cms::Exception& ex) {
249  state_ = Exception;
250  std::ostringstream ost;
251  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
252  ex.addContext(ost.str());
253  throw;
254  }
255  }
256 
257  void Worker::endJob() {
258  try {
259  convertException::wrap([&]() {
260  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
261  implEndJob();
262  });
263  }
264  catch(cms::Exception& ex) {
265  state_ = Exception;
266  std::ostringstream ost;
267  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
268  ex.addContext(ost.str());
269  throw;
270  }
271  }
272 
273  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
274  try {
275  convertException::wrap([&]() {
277  streamContext.setEventID(EventID(0, 0, 0));
278  streamContext.setRunIndex(RunIndex::invalidRunIndex());
280  streamContext.setTimestamp(Timestamp());
281  ParentContext parentContext(&streamContext);
282  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
284  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
285  implBeginStream(id);
286  });
287  }
288  catch(cms::Exception& ex) {
289  state_ = Exception;
290  std::ostringstream ost;
291  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
292  ex.addContext(ost.str());
293  throw;
294  }
295  }
296 
297  void Worker::endStream(StreamID id, StreamContext& streamContext) {
298  try {
299  convertException::wrap([&]() {
301  streamContext.setEventID(EventID(0, 0, 0));
302  streamContext.setRunIndex(RunIndex::invalidRunIndex());
304  streamContext.setTimestamp(Timestamp());
305  ParentContext parentContext(&streamContext);
306  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
308  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
309  implEndStream(id);
310  });
311  }
312  catch(cms::Exception& ex) {
313  state_ = Exception;
314  std::ostringstream ost;
315  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
316  ex.addContext(ost.str());
317  throw;
318  }
319  }
320 
322  if( 0 == --numberOfPathsLeftToRun_) {
324  }
325  }
326 
328  if(earlyDeleteHelper_) {
329  earlyDeleteHelper_->moduleRan(iEvent);
330  }
331  }
332 }
std::string const & pathName() const
Definition: PathContext.h:37
GlobalContext const * globalContext() const
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:235
void skipOnPath()
Definition: Worker.cc:321
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
ModuleDescription const & description() const
Definition: Worker.h:132
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:399
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:231
ModuleCallingContext const * getTopModuleCallingContext() const
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
virtual ~Worker()
Definition: Worker.cc:91
void endJob()
Definition: Worker.cc:257
InternalContext const * internalContext() const
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFromEvent() const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:406
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:273
void exceptionContext(std::ostream &, GlobalContext const &)
#define nullptr
ExceptionToActionTable const * actions_
Definition: Worker.h:403
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:94
ParentContext const & parent() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:401
void setTransition(Transition v)
Definition: StreamContext.h:65
std::exception_ptr cached_exception_
Definition: Worker.h:404
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:42
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:157
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:410
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
PathContext const * pathContext() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ModuleCallingContext const *mcc) const
Definition: Principal.cc:508
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * moduleDescription() const
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:72
std::atomic< State > state_
Definition: Worker.h:397
virtual void implEndJob()=0
StreamContext const * streamContext() const
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual void implBeginJob()=0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:205
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
StreamContext const * streamContext() const
Definition: PathContext.h:38
void setEventID(EventID const &v)
Definition: StreamContext.h:66
ModuleCallingContext const * previousModuleOnThread() const
HLT enums.
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:241
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:297
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:327
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:408
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)