CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
4 
9 
10 namespace edm {
11  namespace {
12  class ModuleBeginJobSignalSentry {
13 public:
14  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
15  if(a_) a_->preModuleBeginJobSignal_(*md_);
16  }
17  ~ModuleBeginJobSignalSentry() {
18  if(a_) a_->postModuleBeginJobSignal_(*md_);
19  }
20 private:
21  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
22  ModuleDescription const* md_;
23  };
24 
25  class ModuleEndJobSignalSentry {
26 public:
27  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md):a_(a), md_(&md) {
28  if(a_) a_->preModuleEndJobSignal_(*md_);
29  }
30  ~ModuleEndJobSignalSentry() {
31  if(a_) a_->postModuleEndJobSignal_(*md_);
32  }
33 private:
34  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
35  ModuleDescription const* md_;
36  };
37 
38  class ModuleBeginStreamSignalSentry {
39  public:
40  ModuleBeginStreamSignalSentry(ActivityRegistry* a,
41  StreamContext const& sc,
42  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
43  if(a_) a_->preModuleBeginStreamSignal_(sc_, mcc_);
44  }
45  ~ModuleBeginStreamSignalSentry() {
46  if(a_) a_->postModuleBeginStreamSignal_(sc_, mcc_);
47  }
48  private:
49  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
50  StreamContext const& sc_;
51  ModuleCallingContext const& mcc_;
52  };
53 
54  class ModuleEndStreamSignalSentry {
55  public:
56  ModuleEndStreamSignalSentry(ActivityRegistry* a,
57  StreamContext const& sc,
58  ModuleCallingContext const& mcc) : a_(a), sc_(sc), mcc_(mcc) {
59  if(a_) a_->preModuleEndStreamSignal_(sc_, mcc_);
60  }
61  ~ModuleEndStreamSignalSentry() {
62  if(a_) a_->postModuleEndStreamSignal_(sc_, mcc_);
63  }
64  private:
65  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
66  StreamContext const& sc_;
67  ModuleCallingContext const& mcc_;
68  };
69 
70  }
71 
73  ExceptionToActionTable const* iActions) :
74  timesRun_(),
75  timesVisited_(),
76  timesPassed_(),
77  timesFailed_(),
78  timesExcept_(),
79  state_(Ready),
80  numberOfPathsOn_(0),
81  numberOfPathsLeftToRun_(0),
82  moduleCallingContext_(&iMD),
83  actions_(iActions),
84  cached_exception_(),
85  actReg_(),
86  earlyDeleteHelper_(nullptr),
87  workStarted_(false)
88  {
89  }
90 
92  }
93 
94  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) {
95  actReg_ = areg;
96  }
97 
98 
100  bool iIsEvent,
101  cms::Exception& ex,
102  ModuleCallingContext const* mcc) {
103 
104  ModuleCallingContext const* imcc = mcc;
105  while(imcc->type() == ParentContext::Type::kModule) {
106  std::ostringstream iost;
107  iost << "Calling method for module "
108  << imcc->moduleDescription()->moduleName() << "/'"
109  << imcc->moduleDescription()->moduleLabel() << "'";
110  ex.addContext(iost.str());
111  imcc = imcc->moduleCallingContext();
112  }
113  if(imcc->type() == ParentContext::Type::kInternal) {
114  std::ostringstream iost;
115  iost << "Calling method for module "
116  << imcc->moduleDescription()->moduleName() << "/'"
117  << imcc->moduleDescription()->moduleLabel() << "' (probably inside some kind of mixing module)";
118  ex.addContext(iost.str());
119  imcc = imcc->internalContext()->moduleCallingContext();
120  }
121  while(imcc->type() == ParentContext::Type::kModule) {
122  std::ostringstream iost;
123  iost << "Calling method for module "
124  << imcc->moduleDescription()->moduleName() << "/'"
125  << imcc->moduleDescription()->moduleLabel() << "'";
126  ex.addContext(iost.str());
127  imcc = imcc->moduleCallingContext();
128  }
129  std::ostringstream ost;
130  if (iIsEvent) {
131  ost << "Calling event method";
132  }
133  else {
134  // It should be impossible to get here, because
135  // this function only gets called when the IgnoreCompletely
136  // exception behavior is active, which can only be true
137  // for events.
138  ost << "Calling unknown function";
139  }
140  ost << " for module " << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
141  ex.addContext(ost.str());
142 
143  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
144  ost.str("");
145  ost << "Running path '";
146  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
147  ex.addContext(ost.str());
148  }
149  ost.str("");
150  ost << "Processing ";
151  ost << iID;
152  ex.addContext(ost.str());
153  }
154 
156  ParentContext const& parentContext,
157  bool isEvent,
158  TransitionIDValueBase const& iID) const {
159 
160  // NOTE: the warning printed as a result of ignoring or failing
161  // a module will only be printed during the full true processing
162  // pass of this module
163 
164  // Get the action corresponding to this exception. However, if processing
165  // something other than an event (e.g. run, lumi) always rethrow.
167 
168  if(action == exception_actions::Rethrow) {
169  return true;
170  }
171 
172  ModuleCallingContext tempContext(&description(),ModuleCallingContext::State::kInvalid, parentContext, nullptr);
173 
174  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
175  // as IgnoreCompletely, so any subsequent OutputModules are still run.
176  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
177  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
178  if(top_mcc->type() == ParentContext::Type::kPlaceInPath &&
179  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
180 
181  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
183  }
184  switch(action) {
186  {
187  exceptionContext(iID.value(), isEvent, ex, &tempContext);
188  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
189  return false;
190  break;
191  }
192  default:
193  return true;
194  }
195  }
196 
197 
198  void Worker::prefetchAsync(WaitingTask* iTask, ParentContext const& parentContext, Principal const& iPrincipal) {
199  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
200  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFromEvent();
201 
203 
204  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),moduleCallingContext_);
205 
206  //Need to be sure the ref count isn't set to 0 immediately
207  iTask->increment_ref_count();
208  for(auto const& item : items) {
209  ProductResolverIndex productResolverIndex = item.productResolverIndex();
210  bool skipCurrentProcess = item.skipCurrentProcess();
211  if(productResolverIndex != ProductResolverIndexAmbiguous) {
212  iPrincipal.prefetchAsync(iTask,productResolverIndex, skipCurrentProcess, &moduleCallingContext_);
213  }
214  }
215 
217 
218  if(0 == iTask->decrement_ref_count()) {
219  //if everything finishes before we leave this routine, we need to launch the task
220  tbb::task::spawn(*iTask);
221  }
222  }
223 
225  earlyDeleteHelper_=iHelper;
226  }
227 
232  }
233 
235  try {
236  convertException::wrap([&]() {
237  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
238  implBeginJob();
239  });
240  }
241  catch(cms::Exception& ex) {
242  state_ = Exception;
243  std::ostringstream ost;
244  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
245  ex.addContext(ost.str());
246  throw;
247  }
248  }
249 
250  void Worker::endJob() {
251  try {
252  convertException::wrap([&]() {
253  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
254  implEndJob();
255  });
256  }
257  catch(cms::Exception& ex) {
258  state_ = Exception;
259  std::ostringstream ost;
260  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
261  ex.addContext(ost.str());
262  throw;
263  }
264  }
265 
266  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
267  try {
268  convertException::wrap([&]() {
270  streamContext.setEventID(EventID(0, 0, 0));
271  streamContext.setRunIndex(RunIndex::invalidRunIndex());
273  streamContext.setTimestamp(Timestamp());
274  ParentContext parentContext(&streamContext);
275  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
277  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
278  implBeginStream(id);
279  });
280  }
281  catch(cms::Exception& ex) {
282  state_ = Exception;
283  std::ostringstream ost;
284  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
285  ex.addContext(ost.str());
286  throw;
287  }
288  }
289 
290  void Worker::endStream(StreamID id, StreamContext& streamContext) {
291  try {
292  convertException::wrap([&]() {
294  streamContext.setEventID(EventID(0, 0, 0));
295  streamContext.setRunIndex(RunIndex::invalidRunIndex());
297  streamContext.setTimestamp(Timestamp());
298  ParentContext parentContext(&streamContext);
299  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
301  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
302  implEndStream(id);
303  });
304  }
305  catch(cms::Exception& ex) {
306  state_ = Exception;
307  std::ostringstream ost;
308  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
309  ex.addContext(ost.str());
310  throw;
311  }
312  }
313 
315  if( 0 == --numberOfPathsLeftToRun_) {
317  }
318  }
319 
321  if(earlyDeleteHelper_) {
322  earlyDeleteHelper_->moduleRan(iEvent);
323  }
324  }
325 }
std::string const & pathName() const
Definition: PathContext.h:37
bool shouldRethrowException(cms::Exception &ex, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:155
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:228
void skipOnPath()
Definition: Worker.cc:314
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
ModuleDescription const & description() const
Definition: Worker.h:123
virtual std::string value() const =0
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:391
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:224
ModuleCallingContext const * getTopModuleCallingContext() const
virtual ~Worker()
Definition: Worker.cc:91
void endJob()
Definition: Worker.cc:250
InternalContext const * internalContext() const
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFromEvent() const =0
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:398
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:266
#define nullptr
ExceptionToActionTable const * actions_
Definition: Worker.h:395
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:94
ParentContext const & parent() const
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:393
void setTransition(Transition v)
Definition: StreamContext.h:65
std::exception_ptr cached_exception_
Definition: Worker.h:396
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:42
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:402
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
PathContext const * pathContext() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ModuleCallingContext const *mcc) const
Definition: Principal.cc:508
ModuleDescription const * moduleDescription() const
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:72
std::atomic< State > state_
Definition: Worker.h:389
virtual void implEndJob()=0
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual void implBeginJob()=0
void prefetchAsync(WaitingTask *, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:198
void addContext(std::string const &context)
Definition: Exception.cc:227
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
void setEventID(EventID const &v)
Definition: StreamContext.h:66
ModuleCallingContext const * previousModuleOnThread() const
HLT enums.
static void exceptionContext(const std::string &iID, bool iIsEvent, cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:99
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:234
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:290
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:320
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:400
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)