CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/FWCore/Framework/src/Worker.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Worker_h
00002 #define FWCore_Framework_Worker_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 Worker: this is a basic scheduling unit - an abstract base class for
00007 something that is really a producer or filter.
00008 
00009 A worker will not actually call through to the module unless it is
00010 in a Ready state.  After a module is actually run, the state will not
00011 be Ready.  The Ready state can only be reestablished by doing a reset().
00012 
00013 Pre/post module signals are posted only in the Ready state.
00014 
00015 Execution statistics are kept here.
00016 
00017 If a module has thrown an exception during execution, that exception
00018 will be rethrown if the worker is entered again and the state is not Ready.
00019 In other words, execution results (status) are cached and reused until
00020 the worker is reset().
00021 
00022 ----------------------------------------------------------------------*/
00023 
00024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00025 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00026 #include "FWCore/Framework/src/WorkerParams.h"
00027 #include "FWCore/Framework/interface/Actions.h"
00028 #include "FWCore/Framework/interface/CurrentProcessingContext.h"
00029 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00030 #include "FWCore/Utilities/interface/Exception.h"
00031 
00032 #include "boost/shared_ptr.hpp"
00033 #include "boost/utility.hpp"
00034 
00035 #include "FWCore/Framework/src/RunStopwatch.h"
00036 #include "FWCore/Framework/interface/Frameworkfwd.h"
00037 
00038 namespace edm {
00039 
00040   class Worker : private boost::noncopyable {
00041   public:
          // Cached outcome of the last doWork() call:
          //   Ready     - nothing cached; the next doWork() invokes the module.
          //   Pass/Fail - doWork() returns true/false without invoking the module.
          //   Exception - doWork() re-raises cached_exception_.
          // Only reset() (or construction, presumably) puts the worker back in Ready.
00042     enum State { Ready, Pass, Fail, Exception };
00043 
00044     Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
00045     virtual ~Worker();
00046 
          // Runs the module for one transition, or replays the cached result.
          // T is a traits type; doWork() reads T::MyPrincipal, T::isEvent_,
          // T::begin_ and (through ModuleSignalSentry) T::preModuleSignal /
          // T::postModuleSignal.  Defined at the bottom of this header.
00047     template <typename T>
00048     bool doWork(typename T::MyPrincipal&, EventSetup const& c,
00049                 CurrentProcessingContext const* cpc,
00050                 CPUTimer *const timer);
00051     void beginJob() ;
00052     void endJob();
          // Non-virtual entry points; each one simply forwards to the matching
          // private pure-virtual impl* hook supplied by the derived worker.
00053     void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
00054     void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
00055     void respondToOpenOutputFiles(FileBlock const& fb) {implRespondToOpenOutputFiles(fb);}
00056     void respondToCloseOutputFiles(FileBlock const& fb) {implRespondToCloseOutputFiles(fb);}
00057 
00058     void preForkReleaseResources() {implPreForkReleaseResources();}
00059     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
00060 
          // Re-arms the worker so the next doWork() really calls the module.
          // Only state_ is touched: the counters and cached_exception_ are kept.
00061     void reset() { state_ = Ready; }
00062 
          // Both accessors expose the ModuleDescription stored by value in md_;
          // the pointer/reference is only valid while this Worker is alive.
00063     ModuleDescription const& description() const {return md_;}
00064     ModuleDescription const* descPtr() const {return &md_; }
00067     void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
00068 
          // Returns (cpu time, real time) as reported by stopwatch_.
          // NOTE(review): stopwatch_ is dereferenced unconditionally.  doWork()
          // treats a default-constructed StopwatchPointer as "no stopwatch", so
          // stopwatch_ is presumably empty until useStopwatch() has been called -
          // confirm that callers only use this after useStopwatch().
00069     std::pair<double, double> timeCpuReal() const {
00070       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00071     }
00072 
          // Zeroes all five execution counters; state_ is left untouched.
00073     void clearCounters() {
00074       timesRun_ = timesVisited_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
00075     }
00076     
00077     void useStopwatch();
00078 
          // Execution statistics.  doWork() only updates them for event
          // transitions (T::isEvent_); see the member declarations below.
00079     int timesRun() const { return timesRun_; }
00080     int timesVisited() const { return timesVisited_; }
00081     int timesPassed() const { return timesPassed_; }
00082     int timesFailed() const { return timesFailed_; }
00083     int timesExcept() const { return timesExcept_; }
00084     State state() const { return state_; }
00085 
00086     int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
00087 
00088   protected:
          // Hooks implemented by concrete workers.  implDoBegin/implDoEnd are
          // overloaded per principal type (event, run, luminosity block);
          // doWork() picks Begin or End from T::begin_ and the overload from
          // T::MyPrincipal.  The bool result becomes the Pass/Fail outcome.
00089     virtual std::string workerType() const = 0;
00090     virtual bool implDoBegin(EventPrincipal&, EventSetup const& c,
00091                             CurrentProcessingContext const* cpc) = 0;
00092     virtual bool implDoEnd(EventPrincipal&, EventSetup const& c,
00093                             CurrentProcessingContext const* cpc) = 0;
00094     virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
00095                             CurrentProcessingContext const* cpc) = 0;
00096     virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
00097                             CurrentProcessingContext const* cpc) = 0;
00098     virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00099                             CurrentProcessingContext const* cpc) = 0;
00100     virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00101                             CurrentProcessingContext const* cpc) = 0;
00102     virtual void implBeginJob() = 0;
00103     virtual void implEndJob() = 0;
00104 
00105   private:
          // Targets of the public respondTo*/pre-/postFork forwarding wrappers.
00106     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
00107     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
00108     virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
00109     virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
00110 
00111     virtual void implPreForkReleaseResources() = 0;
00112     virtual void implPostForkReacquireResources(unsigned int iChildIndex,
00113                                                unsigned int iNumberOfChildren) = 0;
          // Timer handed to RunDualStopwatches by doWork() for event transitions.
00114     RunStopwatch::StopwatchPointer stopwatch_;
00115 
00116     int timesRun_;      // event calls that actually invoked the module
00117     int timesVisited_;  // every event call to doWork(), cached results included
00118     int timesPassed_;   // module returned true, or exception was ignored
00119     int timesFailed_;   // module returned false, or exception mapped to FailModule
00120     int timesExcept_;   // calls that ended by propagating an exception
00121     State state_;
00122 
00123     ModuleDescription md_;
00124     ActionTable const* actions_; // memory assumed to be managed elsewhere
00125     boost::shared_ptr<edm::Exception> cached_exception_; // if state is 'exception'
00126 
          // Registry used to emit pre/post module signals; doWork() passes the
          // raw pointer to ModuleSignalSentry, which skips signalling when null.
00127     boost::shared_ptr<ActivityRegistry> actReg_;
00128   };
00129 
00130   namespace {
          // NOTE(review): this unnamed namespace lives in a header, so every
          // translation unit including it gets its own copy of these helpers,
          // which are then used by the Worker::doWork member template.  Consider
          // a named detail namespace - confirm nothing relies on the current form.

          // Scope guard around one module invocation: the constructor emits
          // T::preModuleSignal and the destructor emits T::postModuleSignal,
          // so the post signal is also sent when the scope is left by an
          // exception.  A null registry disables both signals.  Neither
          // pointer is owned; both must outlive the sentry.
00131     template <typename T>
00132     class ModuleSignalSentry {
00133     public:
00134       ModuleSignalSentry(ActivityRegistry *a, ModuleDescription& md) : a_(a), md_(&md) {
00135         if(a_) T::preModuleSignal(a_, md_);
00136       }
00137       ~ModuleSignalSentry() {
00138         if(a_) T::postModuleSignal(a_, md_);
00139       }
00140     private:
00141       ActivityRegistry* a_;    // may be null: no signals are sent
00142       ModuleDescription* md_;  // address of the description passed in
00143     };
00144 
          // Appends "<moduleName>/<moduleLabel> <ip.id()>\n" to iEx and returns
          // the same exception object so further text can be streamed onto it.
          // T only needs an id() whose result can be streamed into the exception.
00145     template <typename T>
00146     cms::Exception& exceptionContext(ModuleDescription const& iMD,
00147                                      T const& ip,
00148                                      cms::Exception& iEx) {
00149       iEx << iMD.moduleName() << "/" << iMD.moduleLabel()
00150         << " " << ip.id() << "\n";
00151       return iEx;
00152     }
00153   }
00154 
00155    template <typename T>
00156    bool Worker::doWork(typename T::MyPrincipal& ep, 
00157                        EventSetup const& es,
00158                        CurrentProcessingContext const* cpc,
00159                        CPUTimer* const iTimer) {
          // Runs the wrapped module for the transition described by the traits
          // type T, or replays the outcome cached in state_.
          //
          //   ep      principal passed to implDoBegin/implDoEnd
          //   es      event setup passed to implDoBegin/implDoEnd
          //   cpc     processing context; may be null (checked before use)
          //   iTimer  additional timer handed to RunDualStopwatches
          //
          // Returns true for a cached Pass, false for a cached Fail; otherwise
          // the module's own result, or true when an exception was mapped to
          // IgnoreCompletely or FailModule.
          // Throws the original cms::Exception (with context appended) when the
          // action is neither of those two; every other exception type is
          // converted to an edm::Exception and raised through cached_exception_.
          // Counters are only updated for event transitions (T::isEvent_).
00160 
00161     // A RunStopwatch, but only if we are processing an event.
00162     RunDualStopwatches stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer(),
00163                                  iTimer);
00164 
00165     if (T::isEvent_) {
00166       ++timesVisited_;
00167     }
00168     bool rc = false;
00169 
          // Replay a cached outcome; only Ready falls through to run the module.
00170     switch(state_) {
00171       case Ready: break;
00172       case Pass: return true;
00173       case Fail: return false;
00174       case Exception: {
00175           // Rethrow the cached exception again.
00176           // It seems impossible to get here a second time
00177           // unless a cms::Exception has been
00178           // thrown previously.
00179           LogWarning("repeat") << "A module has been invoked a second "
00180                                << "time even though it caught an "
00181                                << "exception during the previous "
00182                                << "invocation.\n"
00183                                << "This may be an indication of a "
00184                                << "configuration problem.\n";
00185 
                  // raise() presumably always throws; there is no break or
                  // return after it, so control relies on that.
00186           cached_exception_->raise();
00187       }
00188     }
00189 
00190     if (T::isEvent_) ++timesRun_;
00191 
00192     try {
00193 
                // The sentry sends the pre-module signal here and the
                // post-module signal when the try block is left, i.e. before
                // any of the catch handlers below starts executing.
00194         ModuleSignalSentry<T> cpp(actReg_.get(), md_);
00195         if (T::begin_) {
00196           rc = implDoBegin(ep, es, cpc);
00197         } else {
00198           rc = implDoEnd(ep, es, cpc);
00199         }
00200 
                // Cache the result so repeated calls do not rerun the module.
00201         if (rc) {
00202           state_ = Pass;
00203           if (T::isEvent_) ++timesPassed_;
00204         } else {
00205           state_ = Fail;
00206           if (T::isEvent_) ++timesFailed_;
00207         }
00208     }
00209 
00210     catch(cms::Exception& e) {
00211 
00212         // NOTE: the warning printed as a result of ignoring or failing
00213         // a module will only be printed during the full true processing
00214         // pass of this module
00215 
00216         // Get the action corresponding to this exception.  However, if processing
00217         // something other than an event (e.g. run, lumi) always rethrow.
00218         actions::ActionCodes action = (T::isEvent_ ? actions_->find(e.rootCause()) : actions::Rethrow);
00219 
00220         // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
00221         // as FailModule, so any subsequent OutputModules are still run.
00222         // For unscheduled modules only treat FailPath as a FailModule but still allow SkipEvent to throw
00223         if (cpc && cpc->isEndPath()) {
00224           if ((action == actions::SkipEvent && !cpc->isUnscheduled()) ||
00225                action == actions::FailPath) action = actions::FailModule;
00226         }
                // Non-event transitions always arrive here with Rethrow, so the
                // two counter increments below only happen for events.
00227         switch(action) {
00228           case actions::IgnoreCompletely: {
00229               rc = true;
00230               ++timesPassed_;
00231               state_ = Pass;
00232               LogWarning("IgnoreCompletely")
00233                 << "Module ignored an exception\n"
00234                 << e.what() << "\n";
00235               break;
00236           }
00237 
                  // NOTE(review): this call returns true, but the cached Fail
                  // state makes later calls (before reset()) return false.
                  // Returning true looks deliberate, to keep the path going as
                  // described above - confirm.
00238           case actions::FailModule: {
00239               rc = true;
00240               LogWarning("FailModule")
00241                 << "Module failed due to an exception\n"
00242                 << e.what() << "\n";
00243               ++timesFailed_;
00244               state_ = Fail;
00245               break;
00246           }
00247 
00248           default: {
00249 
00250               // we should not need to include the event/run/module names
00251               // the exception because the error logger will pick this
00252               // up automatically.  I'm leaving it in until this is
00253               // verified
00254 
00255               // here we simply add a small amount of data to the
00256               // exception to add some context, we could have rethrown
00257               // it as something else and embedded with this exception
00258               // as an argument to the constructor.
00259 
                      // NOTE(review): here and in the handlers below state_
                      // becomes Exception before cached_exception_ is
                      // allocated; if that allocation throws, the cache may be
                      // stale or empty on the next call - confirm acceptable.
00260               if (T::isEvent_) ++timesExcept_;
00261               state_ = Exception;
00262               e << "cms::Exception going through module ";
00263               exceptionContext(md_, ep, e);
                      // Keep a copy for replay: as-is when it already is an
                      // edm::Exception, otherwise wrapped as errors::OtherCMS.
00264               edm::Exception *edmEx = dynamic_cast<edm::Exception *>(&e);
00265               if (edmEx) {
00266                 cached_exception_.reset(new edm::Exception(*edmEx));
00267               } else {
00268                 cached_exception_.reset(new edm::Exception(errors::OtherCMS, std::string(), e));
00269               }
                      // Propagate the original exception object, not the copy.
00270               throw;
00271           }
00272         }
00273       }
00274 
          // The remaining handlers all follow one pattern: count the failure,
          // mark the state, build an edm::Exception describing what was caught
          // (with module context), cache it and raise it.
00275     catch(std::bad_alloc& bda) {
00276         if (T::isEvent_) ++timesExcept_;
00277         state_ = Exception;
00278         cached_exception_.reset(new edm::Exception(errors::BadAlloc));
00279         *cached_exception_
00280           << "A std::bad_alloc exception occurred during a call to the module ";
00281         exceptionContext(md_, ep, *cached_exception_)
00282           << "The job has probably exhausted the virtual memory available to the process.\n";
00283         cached_exception_->raise();
00284     }
00285     catch(std::exception& e) {
00286         if (T::isEvent_) ++timesExcept_;
00287         state_ = Exception;
00288         cached_exception_.reset(new edm::Exception(errors::StdException));
00289         *cached_exception_
00290           << "A std::exception occurred during a call to the module ";
00291         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
00292           << "Previous information:\n" << e.what();
00293         cached_exception_->raise();
00294     }
00295     catch(std::string& s) {
00296         if (T::isEvent_) ++timesExcept_;
00297         state_ = Exception;
00298         cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "std::string"));
00299         *cached_exception_
00300           << "A std::string thrown as an exception occurred during a call to the module ";
00301         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
00302           << "Previous information:\n string = " << s;
00303         cached_exception_->raise();
00304     }
00305     catch(char const* c) {
00306         if (T::isEvent_) ++timesExcept_;
00307         state_ = Exception;
00308         cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "const char *"));
00309         *cached_exception_
00310           << "A const char* thrown as an exception occurred during a call to the module ";
00311         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
00312           << "Previous information:\n const char* = " << c << "\n";
00313         cached_exception_->raise();
00314     }
          // NOTE(review): the "repeated" tag and the "previous call" wording
          // below read like leftovers; this handler runs on the first
          // occurrence.  They are runtime text, so they are left unchanged.
00315     catch(...) {
00316         if (T::isEvent_) ++timesExcept_;
00317         state_ = Exception;
00318         cached_exception_.reset(new edm::Exception(errors::Unknown, "repeated"));
00319         *cached_exception_
00320           << "An unknown occurred during a previous call to the module ";
00321         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n";
00322         cached_exception_->raise();
00323     }
00324 
00325     return rc;
00326   }
00327 
00328 }
00329 #endif