CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Worker.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Worker_h
2 #define FWCore_Framework_Worker_h
3 
4 /*----------------------------------------------------------------------
5 
6 Worker: this is a basic scheduling unit - an abstract base class to
7 something that is really a producer or filter.
8 
9 A worker will not actually call through to the module unless it is
10 in a Ready state. After a module is actually run, the state will not
11 be Ready. The Ready state can only be reestablished by doing a reset().
12 
13 Pre/post module signals are posted only in the Ready state.
14 
15 Execution statistics are kept here.
16 
17 If a module has thrown an exception during execution, that exception
18 will be rethrown if the worker is entered again and the state is not Ready.
19 In other words, execution results (status) are cached and reused until
20 the worker is reset().
21 
22 ----------------------------------------------------------------------*/
23 
31 
32 #include "boost/shared_ptr.hpp"
33 #include "boost/utility.hpp"
34 
37 
38 namespace edm {
39 
40  class Worker : private boost::noncopyable {
41  public:
42  enum State { Ready, Pass, Fail, Exception };
43 
44  Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
45  virtual ~Worker();
46 
47  template <typename T>
48  bool doWork(typename T::MyPrincipal&, EventSetup const& c,
49  CurrentProcessingContext const* cpc,
50  CPUTimer *const timer);
51  void beginJob() ;
52  void endJob();
57 
59  void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
60 
61  void reset() { state_ = Ready; }
62 
63  ModuleDescription const& description() const {return md_;}
64  ModuleDescription const* descPtr() const {return &md_; }
67  void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
68 
69  std::pair<double, double> timeCpuReal() const {
70  return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
71  }
72 
73  void clearCounters() {
75  }
76 
77  void useStopwatch();
78 
79  int timesRun() const { return timesRun_; }
80  int timesVisited() const { return timesVisited_; }
81  int timesPassed() const { return timesPassed_; }
82  int timesFailed() const { return timesFailed_; }
83  int timesExcept() const { return timesExcept_; }
84  State state() const { return state_; }
85 
86  int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
87 
88  protected:
89  virtual std::string workerType() const = 0;
90  virtual bool implDoBegin(EventPrincipal&, EventSetup const& c,
91  CurrentProcessingContext const* cpc) = 0;
92  virtual bool implDoEnd(EventPrincipal&, EventSetup const& c,
93  CurrentProcessingContext const* cpc) = 0;
94  virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
95  CurrentProcessingContext const* cpc) = 0;
96  virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
97  CurrentProcessingContext const* cpc) = 0;
98  virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
99  CurrentProcessingContext const* cpc) = 0;
100  virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
101  CurrentProcessingContext const* cpc) = 0;
102  virtual void implBeginJob() = 0;
103  virtual void implEndJob() = 0;
104 
105  private:
106  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
107  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
108  virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
109  virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
110 
111  virtual void implPreForkReleaseResources() = 0;
112  virtual void implPostForkReacquireResources(unsigned int iChildIndex,
113  unsigned int iNumberOfChildren) = 0;
115 
122 
124  ActionTable const* actions_; // memory assumed to be managed elsewhere
125  boost::shared_ptr<edm::Exception> cached_exception_; // if state is 'exception'
126 
127  boost::shared_ptr<ActivityRegistry> actReg_;
128  };
129 
130  namespace {
131  template <typename T>
132  class ModuleSignalSentry {
133  public:
134  ModuleSignalSentry(ActivityRegistry *a, ModuleDescription& md) : a_(a), md_(&md) {
135  if(a_) T::preModuleSignal(a_, md_);
136  }
137  ~ModuleSignalSentry() {
138  if(a_) T::postModuleSignal(a_, md_);
139  }
140  private:
141  ActivityRegistry* a_;
142  ModuleDescription* md_;
143  };
144 
145  template <typename T>
146  cms::Exception& exceptionContext(ModuleDescription const& iMD,
147  T const& ip,
148  cms::Exception& iEx) {
149  iEx << iMD.moduleName() << "/" << iMD.moduleLabel()
150  << " " << ip.id() << "\n";
151  return iEx;
152  }
153  }
154 
155  template <typename T>
156  bool Worker::doWork(typename T::MyPrincipal& ep,
157  EventSetup const& es,
158  CurrentProcessingContext const* cpc,
159  CPUTimer* const iTimer) {
160 
161  // A RunStopwatch, but only if we are processing an event.
163  iTimer);
164 
165  if (T::isEvent_) {
166  ++timesVisited_;
167  }
168  bool rc = false;
169 
170  switch(state_) {
171  case Ready: break;
172  case Pass: return true;
173  case Fail: return false;
174  case Exception: {
175  // rethrow the cached exception again
176  // It seems impossible to
177  // get here a second time until a cms::Exception has been
178  // thrown prviously.
179  LogWarning("repeat") << "A module has been invoked a second "
180  << "time even though it caught an "
181  << "exception during the previous "
182  << "invocation.\n"
183  << "This may be an indication of a "
184  << "configuration problem.\n";
185 
186  cached_exception_->raise();
187  }
188  }
189 
190  if (T::isEvent_) ++timesRun_;
191 
192  try {
193 
194  ModuleSignalSentry<T> cpp(actReg_.get(), md_);
195  if (T::begin_) {
196  rc = implDoBegin(ep, es, cpc);
197  } else {
198  rc = implDoEnd(ep, es, cpc);
199  }
200 
201  if (rc) {
202  state_ = Pass;
203  if (T::isEvent_) ++timesPassed_;
204  } else {
205  state_ = Fail;
206  if (T::isEvent_) ++timesFailed_;
207  }
208  }
209 
210  catch(cms::Exception& e) {
211 
212  // NOTE: the warning printed as a result of ignoring or failing
213  // a module will only be printed during the full true processing
214  // pass of this module
215 
216  // Get the action corresponding to this exception. However, if processing
217  // something other than an event (e.g. run, lumi) always rethrow.
219 
220  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
221  // as FailModule, so any subsequent OutputModules are still run.
222  // For unscheduled modules only treat FailPath as a FailModule but still allow SkipEvent to throw
223  if (cpc && cpc->isEndPath()) {
224  if ((action == actions::SkipEvent && !cpc->isUnscheduled()) ||
225  action == actions::FailPath) action = actions::FailModule;
226  }
227  switch(action) {
229  rc = true;
230  ++timesPassed_;
231  state_ = Pass;
232  LogWarning("IgnoreCompletely")
233  << "Module ignored an exception\n"
234  << e.what() << "\n";
235  break;
236  }
237 
238  case actions::FailModule: {
239  rc = true;
240  LogWarning("FailModule")
241  << "Module failed due to an exception\n"
242  << e.what() << "\n";
243  ++timesFailed_;
244  state_ = Fail;
245  break;
246  }
247 
248  default: {
249 
250  // we should not need to include the event/run/module names
251  // the exception because the error logger will pick this
252  // up automatically. I'm leaving it in until this is
253  // verified
254 
255  // here we simply add a small amount of data to the
256  // exception to add some context, we could have rethrown
257  // it as something else and embedded with this exception
258  // as an argument to the constructor.
259 
260  if (T::isEvent_) ++timesExcept_;
261  state_ = Exception;
262  e << "cms::Exception going through module ";
263  exceptionContext(md_, ep, e);
264  edm::Exception *edmEx = dynamic_cast<edm::Exception *>(&e);
265  if (edmEx) {
266  cached_exception_.reset(new edm::Exception(*edmEx));
267  } else {
268  cached_exception_.reset(new edm::Exception(errors::OtherCMS, std::string(), e));
269  }
270  throw;
271  }
272  }
273  }
274 
275  catch(std::bad_alloc& bda) {
276  if (T::isEvent_) ++timesExcept_;
277  state_ = Exception;
280  << "A std::bad_alloc exception occurred during a call to the module ";
281  exceptionContext(md_, ep, *cached_exception_)
282  << "The job has probably exhausted the virtual memory available to the process.\n";
283  cached_exception_->raise();
284  }
285  catch(std::exception& e) {
286  if (T::isEvent_) ++timesExcept_;
287  state_ = Exception;
290  << "A std::exception occurred during a call to the module ";
291  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
292  << "Previous information:\n" << e.what();
293  cached_exception_->raise();
294  }
295  catch(std::string& s) {
296  if (T::isEvent_) ++timesExcept_;
297  state_ = Exception;
298  cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "std::string"));
300  << "A std::string thrown as an exception occurred during a call to the module ";
301  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
302  << "Previous information:\n string = " << s;
303  cached_exception_->raise();
304  }
305  catch(char const* c) {
306  if (T::isEvent_) ++timesExcept_;
307  state_ = Exception;
308  cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "const char *"));
310  << "A const char* thrown as an exception occurred during a call to the module ";
311  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
312  << "Previous information:\n const char* = " << c << "\n";
313  cached_exception_->raise();
314  }
315  catch(...) {
316  if (T::isEvent_) ++timesExcept_;
317  state_ = Exception;
318  cached_exception_.reset(new edm::Exception(errors::Unknown, "repeated"));
320  << "An unknown occurred during a previous call to the module ";
321  exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n";
322  cached_exception_->raise();
323  }
324 
325  return rc;
326  }
327 
328 }
329 #endif
virtual char const * what() const
Definition: Exception.cc:97
ModuleDescription const & description() const
Definition: Worker.h:63
bool isEndPath() const
Return true if the path is an end path, and false otherwise.
virtual void implPostForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)=0
virtual ~Worker()
Definition: Worker.cc:59
std::string rootCause() const
Definition: Exception.cc:78
void endJob()
Definition: Worker.cc:138
virtual void implRespondToOpenOutputFiles(FileBlock const &fb)=0
boost::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:127
int timesPassed() const
Definition: Worker.h:81
void setActivityRegistry(boost::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:62
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
State state() const
Definition: Worker.h:84
void respondToOpenOutputFiles(FileBlock const &fb)
Definition: Worker.h:55
void clearCounters()
Definition: Worker.h:73
virtual void implPreForkReleaseResources()=0
virtual std::string workerType() const =0
virtual bool implDoEnd(EventPrincipal &, EventSetup const &c, CurrentProcessingContext const *cpc)=0
std::pair< double, double > timeCpuReal() const
Definition: Worker.h:69
bool isUnscheduled() const
Returns true if the module is being called via unscheduled execution.
int timesExcept() const
Definition: Worker.h:83
int timesExcept_
Definition: Worker.h:120
virtual bool implDoBegin(EventPrincipal &, EventSetup const &c, CurrentProcessingContext const *cpc)=0
boost::shared_ptr< CPUTimer > StopwatchPointer
Definition: RunStopwatch.h:23
void reset()
Definition: Worker.h:61
int timesVisited() const
Definition: Worker.h:80
virtual void implRespondToCloseOutputFiles(FileBlock const &fb)=0
int timesPassed_
Definition: Worker.h:118
ModuleDescription const * descPtr() const
Definition: Worker.h:64
void useStopwatch()
Definition: Worker.cc:210
int timesRun() const
Definition: Worker.h:79
virtual void implEndJob()=0
void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren)
Definition: Worker.h:59
void respondToCloseOutputFiles(FileBlock const &fb)
Definition: Worker.h:56
virtual void implBeginJob()=0
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.h:53
int timesRun_
Definition: Worker.h:116
int timesPass() const
Definition: Worker.h:86
ActionTable const * actions_
Definition: Worker.h:124
int timesFailed() const
Definition: Worker.h:82
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.h:54
Worker(ModuleDescription const &iMD, WorkerParams const &iWP)
Definition: Worker.cc:43
void preForkReleaseResources()
Definition: Worker.h:58
ModuleDescription md_
Definition: Worker.h:123
double a
Definition: hdecay.h:121
void beginJob()
Definition: Worker.cc:66
boost::shared_ptr< edm::Exception > cached_exception_
Definition: Worker.h:125
RunStopwatch::StopwatchPointer stopwatch_
Definition: Worker.h:114
actions::ActionCodes find(const std::string &category) const
Definition: Actions.cc:95
int timesFailed_
Definition: Worker.h:119
string s
Definition: asciidump.py:422
bool doWork(typename T::MyPrincipal &, EventSetup const &c, CurrentProcessingContext const *cpc, CPUTimer *const timer)
Definition: Worker.h:156
long double T
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
State state_
Definition: Worker.h:121
int timesVisited_
Definition: Worker.h:117