CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class for
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
31 
32 #include "boost/shared_ptr.hpp"
33 #include "boost/utility.hpp"
34 
37 
38 namespace edm {
39 
// Worker: abstract, non-copyable scheduling unit that wraps one module.
// It caches the outcome of the last call (state_) and keeps per-module
// execution counters; see the file header comment for the overall contract.
// NOTE(review): this listing omits several original lines (53-58, 65-66, 74,
// 114-123); the gaps are flagged where they occur.
40  class Worker : private boost::noncopyable {
41  public:
    // Cached outcome of the most recent call. doWork() only calls through to
    // the module when the state is Ready; reset() puts it back to Ready.
42  enum State { Ready, Pass, Fail, Exception };
43 
44  Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
45  virtual ~Worker();
46 
    // Runs (or replays the cached result of) one transition; T is a traits
    // type supplying MyPrincipal, isEvent_, begin_ and the module signals.
    // Defined further down in this header.
47  template <typename T>
48  bool doWork(typename T::MyPrincipal&, EventSetup const& c,
49  CurrentProcessingContext const* cpc,
50  CPUTimer *const timer);
51  void beginJob() ;
52  void endJob();
    // NOTE(review): original lines 53-58 are not present in this listing; per
    // the cross-reference index they hold the public respondTo*File(s) and
    // preForkReleaseResources wrappers.
57 
    // Forwards to the pure-virtual implPostForkReacquireResources hook.
59  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
60 
    // Re-arms the worker: the next doWork() call will invoke the module again.
61  void reset() { state_ = Ready; }
62 
    // Access to the stored ModuleDescription, by reference and by pointer.
63  ModuleDescription const& description() const {return md_;}
64  ModuleDescription const* descPtr() const {return &md_; }
67  void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
68 
    // Returns (cpuTime, realTime) read from stopwatch_.
    // NOTE(review): stopwatch_ is dereferenced without a null check;
    // presumably useStopwatch() must have been called first - confirm.
69  std::pair<double, double> timeCpuReal() const {
70  return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
71  }
72 
    // NOTE(review): the body (original line 74) is missing from this listing.
73  void clearCounters() {
75  }
76 
77  void useStopwatch();
78 
    // Read-only accessors for the execution counters and the cached state.
79  int timesRun() const { return timesRun_; }
80  int timesVisited() const { return timesVisited_; }
81  int timesPassed() const { return timesPassed_; }
82  int timesFailed() const { return timesFailed_; }
83  int timesExcept() const { return timesExcept_; }
84  State state() const { return state_; }
85 
86  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
87 
88  protected:
    // Hooks implemented by concrete workers. doWork() calls implDoBegin or
    // implDoEnd (chosen by T::begin_) with the principal type of the
    // transition: event, run or luminosity block.
89  virtual std::string workerType() const = 0;
90  virtual bool implDoBegin(EventPrincipal&, EventSetup const& c,
91  CurrentProcessingContext const* cpc) = 0;
92  virtual bool implDoEnd(EventPrincipal&, EventSetup const& c,
93  CurrentProcessingContext const* cpc) = 0;
94  virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
95  CurrentProcessingContext const* cpc) = 0;
96  virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
97  CurrentProcessingContext const* cpc) = 0;
98  virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
99  CurrentProcessingContext const* cpc) = 0;
100  virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
101  CurrentProcessingContext const* cpc) = 0;
102  virtual void implBeginJob() = 0;
103  virtual void implEndJob() = 0;
104 
105  private:
    // File open/close notification hooks; private pure virtuals, so they are
    // reachable only through this class's own member functions.
106  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
107  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
108  virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
109  virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
110 
    // Fork hooks; implPostForkReacquireResources is invoked by
    // postForkReacquireResources() above.
111  virtual void implPreForkReleaseResources() = 0;
112  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
113  unsigned int iNumberOfChildren) = 0;
115 
    // NOTE(review): original lines 114-123 are missing from this listing; per
    // the cross-reference index they declare stopwatch_, timesRun_,
    // timesVisited_, timesPassed_, timesFailed_, timesExcept_, state_ and md_.
122 
124  ActionTable const* actions_; // memory assumed to be managed elsewhere
125  boost::shared_ptr<edm::Exception> cached_exception_; // if state is 'exception'
126 
    // Passed (as a raw pointer) to ModuleSignalSentry in doWork(); a null
    // registry there means no pre/post module signals are emitted.
127  boost::shared_ptr<ActivityRegistry> actReg_;
128  };
129 
130  namespace {
131  template <typename T>
132  class ModuleSignalSentry {
133  public:
134  ModuleSignalSentry(ActivityRegistry *a, ModuleDescription& md) : a_(a), md_(&md) {
135  if(a_) T::preModuleSignal(a_, md_);
136  }
137  ~ModuleSignalSentry() {
138  if(a_) T::postModuleSignal(a_, md_);
139  }
140  private:
141  ActivityRegistry* a_;
142  ModuleDescription* md_;
143  };
144 
145  template <typename T>
146  cms::Exception& exceptionContext(ModuleDescription const& iMD,
147  T const& ip,
148  cms::Exception& iEx) {
149  iEx << iMD.moduleName() << "/" << iMD.moduleLabel()
150  << " " << ip.id() << "\n";
151  return iEx;
152  }
153 
154  cms::Exception& exceptionContext(ModuleDescription const& iMD,
155  cms::Exception& iEx) {
156  iEx << iMD.moduleName() << "/" << iMD.moduleLabel() << "\n";
157  return iEx;
158  }
159  }
160 
// Runs one transition of the wrapped module, or replays the cached outcome.
//
// T is a traits type. This function uses T::MyPrincipal (principal type),
// T::isEvent_ (gates the statistics counters), T::begin_ (selects
// implDoBegin vs. implDoEnd) and, through ModuleSignalSentry<T>,
// T::preModuleSignal / T::postModuleSignal.
//
// Parameters:
//   ep     - principal passed to implDoBegin/implDoEnd and used to add
//            context to exceptions.
//   es     - EventSetup forwarded to the implementation.
//   cpc    - forwarded to the implementation; null-checked before
//            isEndPath()/isUnscheduled() are consulted.
//   iTimer - handed to the stopwatch object created at the top of the body.
//
// Returns: true if state_ is already Pass, false if it is already Fail;
// otherwise the implementation's return value, or true when a cms::Exception
// is handled by one of the two non-rethrowing actions below.
//
// Throws: re-raises cached_exception_ when state_ is already Exception;
// rethrows a cms::Exception whose action falls into the default case; any
// other exception type is converted to an edm::Exception, cached and raised.
//
// NOTE(review): this listing omits original lines 168, 224, 234, 284-285,
// 294-295, 305, 315 and 325; the gaps are flagged where they occur.
161  template <typename T>
162  bool Worker::doWork(typename T::MyPrincipal& ep,
163  EventSetup const& es,
164  CurrentProcessingContext const* cpc,
165  CPUTimer* const iTimer) {
166 
167  // A RunStopwatch, but only if we are processing an event.
    // NOTE(review): original line 168 (start of the stopwatch declaration) is
    // missing from this listing; only its trailing argument survives below.
169  iTimer);
170 
    // Counted before the state check, so cached replays are counted as visits too.
171  if (T::isEvent_) {
172  ++timesVisited_;
173  }
174  bool rc = false;
175 
    // Anything other than Ready short-circuits with the cached result.
176  switch(state_) {
177  case Ready: break;
178  case Pass: return true;
179  case Fail: return false;
180  case Exception: {
181  // rethrow the cached exception again
182  // It seems impossible to
183  // get here a second time until a cms::Exception has been
184  // thrown previously.
185  LogWarning("repeat") << "A module has been invoked a second "
186  << "time even though it caught an "
187  << "exception during the previous "
188  << "invocation.\n"
189  << "This may be an indication of a "
190  << "configuration problem.\n";
191 
192  cached_exception_->raise();
193  }
194  }
195 
196  if (T::isEvent_) ++timesRun_;
197 
198  try {
199 
    // Emits the pre-module signal now and the post-module signal when this
    // scope is left, including when it is left by an exception.
200  ModuleSignalSentry<T> cpp(actReg_.get(), md_);
201  if (T::begin_) {
202  rc = implDoBegin(ep, es, cpc);
203  } else {
204  rc = implDoEnd(ep, es, cpc);
205  }
206 
    // Cache the outcome so later calls return it without re-running the module.
207  if (rc) {
208  state_ = Pass;
209  if (T::isEvent_) ++timesPassed_;
210  } else {
211  state_ = Fail;
212  if (T::isEvent_) ++timesFailed_;
213  }
214  }
215 
216  catch(cms::Exception& e) {
217 
218  // NOTE: the warning printed as a result of ignoring or failing
219  // a module will only be printed during the full true processing
220  // pass of this module
221 
222  // Get the action corresponding to this exception. However, if processing
223  // something other than an event (e.g. run, lumi) always rethrow.
    // NOTE(review): original line 224, which presumably declares `action`
    // (looked up via actions_), is missing from this listing.
225 
226  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
227  // as FailModule, so any subsequent OutputModules are still run.
228  // For unscheduled modules only treat FailPath as a FailModule but still allow SkipEvent to throw
229  if (cpc && cpc->isEndPath()) {
230  if ((action == actions::SkipEvent && !cpc->isUnscheduled()) ||
231  action == actions::FailPath) action = actions::FailModule;
232  }
233  switch(action) {
    // NOTE(review): original line 234 (the case label opening this branch,
    // presumably actions::IgnoreCompletely given the log category below) is
    // missing from this listing.
235  rc = true;
236  ++timesPassed_;
237  state_ = Pass;
238  LogWarning("IgnoreCompletely")
239  << "Module ignored an exception\n"
240  << e.what() << "\n";
241  break;
242  }
243 
244  case actions::FailModule: {
    // NOTE(review): rc is true here although state_ becomes Fail, so this call
    // returns true while a repeated call would return false - confirm intended.
245  rc = true;
246  LogWarning("FailModule")
247  << "Module failed due to an exception\n"
248  << e.what() << "\n";
249  ++timesFailed_;
250  state_ = Fail;
251  break;
252  }
253 
254  default: {
255 
256  // we should not need to include the event/run/module names in
257  // the exception because the error logger will pick this
258  // up automatically. I'm leaving it in until this is
259  // verified
260 
261  // here we simply add a small amount of data to the
262  // exception to add some context, we could have rethrown
263  // it as something else and embedded with this exception
264  // as an argument to the constructor.
265 
266  if (T::isEvent_) ++timesExcept_;
267  state_ = Exception;
268  e << "cms::Exception going through module ";
269  exceptionContext(md_, ep, e);
    // Cache a copy for replay: copied directly if it is already an
    // edm::Exception, otherwise wrapped as errors::OtherCMS.
270  edm::Exception *edmEx = dynamic_cast<edm::Exception *>(&e);
271  if (edmEx) {
272  cached_exception_.reset(new edm::Exception(*edmEx));
273  } else {
274  cached_exception_.reset(new edm::Exception(errors::OtherCMS, std::string(), e));
275  }
    // Rethrow the original exception object (with the context appended above).
276  throw;
277  }
278  }
279  }
280 
    // The remaining handlers all follow one pattern: count the exception, mark
    // the state, build cached_exception_ with context, then raise it.
281  catch(std::bad_alloc& bda) {
282  if (T::isEvent_) ++timesExcept_;
283  state_ = Exception;
    // NOTE(review): original lines 284-285 (presumably the
    // cached_exception_.reset(...) and the start of the stream expression)
    // are missing from this listing.
286  << "A std::bad_alloc exception occurred during a call to the module ";
287  exceptionContext(md_, ep, *cached_exception_)
288  << "The job has probably exhausted the virtual memory available to the process.\n";
289  cached_exception_->raise();
290  }
291  catch(std::exception& e) {
292  if (T::isEvent_) ++timesExcept_;
293  state_ = Exception;
    // NOTE(review): original lines 294-295 are missing from this listing
    // (same pattern as in the std::bad_alloc handler above).
296  << "A std::exception occurred during a call to the module ";
297  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
298  << "Previous information:\n" << e.what();
299  cached_exception_->raise();
300  }
301  catch(std::string& s) {
302  if (T::isEvent_) ++timesExcept_;
303  state_ = Exception;
304  cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "std::string"));
    // NOTE(review): original line 305 (start of the stream expression) is
    // missing from this listing.
306  << "A std::string thrown as an exception occurred during a call to the module ";
307  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
308  << "Previous information:\n string = " << s;
309  cached_exception_->raise();
310  }
311  catch(char const* c) {
312  if (T::isEvent_) ++timesExcept_;
313  state_ = Exception;
314  cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "const char *"));
    // NOTE(review): original line 315 (start of the stream expression) is
    // missing from this listing.
316  << "A const char* thrown as an exception occurred during a call to the module ";
317  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
318  << "Previous information:\n const char* = " << c << "\n";
319  cached_exception_->raise();
320  }
321  catch(...) {
322  if (T::isEvent_) ++timesExcept_;
323  state_ = Exception;
324  cached_exception_.reset(new edm::Exception(errors::Unknown, "repeated"));
    // NOTE(review): original line 325 (start of the stream expression) is
    // missing from this listing.
326  << "An unknown occurred during a previous call to the module ";
327  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n";
328  cached_exception_->raise();
329  }
330 
    // Reached only on the non-throwing paths: normal completion, or a
    // cms::Exception handled by a branch that ends in `break`.
331  return rc;
332  }
333 
334 }
335 #endif
virtual char const * what() const
Definition: Exception.cc:97
ModuleDescription const & description() const
Definition: Worker.h:63
bool isEndPath() const
Return true if the path is an end path, and false otherwise.
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual ~Worker()
Definition: Worker.cc:53
std::string rootCause() const
Definition: Exception.cc:78
void endJob()
Definition: Worker.cc:132
virtual void implRespondToOpenOutputFiles(FileBlock const &fb)=0
boost::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:127
int timesPassed() const
Definition: Worker.h:81
void setActivityRegistry(boost::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:56
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
State state() const
Definition: Worker.h:84
void respondToOpenOutputFiles(FileBlock const &fb)
Definition: Worker.h:55
void clearCounters()
Definition: Worker.h:73
virtual void implPreForkReleaseResources()=0
virtual std::string workerType() const =0
virtual bool implDoEnd(EventPrincipal &, EventSetup const &c, CurrentProcessingContext const *cpc)=0
std::pair< double, double > timeCpuReal() const
Definition: Worker.h:69
bool isUnscheduled() const
Returns true if the module is being called via unscheduled execution.
int timesExcept() const
Definition: Worker.h:83
int timesExcept_
Definition: Worker.h:120
virtual bool implDoBegin(EventPrincipal &, EventSetup const &c, CurrentProcessingContext const *cpc)=0
boost::shared_ptr< CPUTimer > StopwatchPointer
Definition: RunStopwatch.h:23
void reset()
Definition: Worker.h:61
int timesVisited() const
Definition: Worker.h:80
virtual void implRespondToCloseOutputFiles(FileBlock const &fb)=0
int timesPassed_
Definition: Worker.h:118
ModuleDescription const * descPtr() const
Definition: Worker.h:64
void useStopwatch()
Definition: Worker.cc:204
int timesRun() const
Definition: Worker.h:79
virtual void implEndJob()=0
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:59
void respondToCloseOutputFiles(FileBlock const &fb)
Definition: Worker.h:56
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:53
int timesRun_
Definition: Worker.h:116
int timesPass() const
Definition: Worker.h:86
ActionTable const * actions_
Definition: Worker.h:124
int timesFailed() const
Definition: Worker.h:82
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:54
Worker(ModuleDescription const &iMD, WorkerParams const &iWP)
Definition: Worker.cc:37
void preForkReleaseResources()
Definition: Worker.h:58
ModuleDescription md_
Definition: Worker.h:123
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:60
boost::shared_ptr< edm::Exception > cached_exception_
Definition: Worker.h:125
RunStopwatch::StopwatchPointer stopwatch_
Definition: Worker.h:114
actions::ActionCodes find(const std::string &category) const
Definition: Actions.cc:95
int timesFailed_
Definition: Worker.h:119
string s
Definition: asciidump.py:422
bool doWork(typename T::MyPrincipal &, EventSetup const &c, CurrentProcessingContext const *cpc, CPUTimer *const timer)
Definition: Worker.h:162
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
State state_
Definition: Worker.h:121
int timesVisited_
Definition: Worker.h:117