CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/FWCore/Framework/src/Worker.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Worker_h
00002 #define FWCore_Framework_Worker_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 Worker: this is a basic scheduling unit - an abstract base class for
00007 something that is really a producer or filter.
00008 
00009 A worker will not actually call through to the module unless it is
00010 in a Ready state.  After a module is actually run, the state will not
00011 be Ready.  The Ready state can only be reestablished by doing a reset().
00012 
00013 Pre/post module signals are posted only in the Ready state.
00014 
00015 Execution statistics are kept here.
00016 
00017 If a module has thrown an exception during execution, that exception
00018 will be rethrown if the worker is entered again and the state is not Ready.
00019 In other words, execution results (status) are cached and reused until
00020 the worker is reset().
00021 
00022 ----------------------------------------------------------------------*/
00023 
00024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00025 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00026 #include "FWCore/Framework/src/WorkerParams.h"
00027 #include "FWCore/Framework/interface/Actions.h"
00028 #include "FWCore/Framework/interface/CurrentProcessingContext.h"
00029 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00030 #include "FWCore/Utilities/interface/Exception.h"
00031 
00032 #include "boost/shared_ptr.hpp"
00033 #include "boost/utility.hpp"
00034 
00035 #include "FWCore/Framework/src/RunStopwatch.h"
00036 #include "FWCore/Framework/interface/Frameworkfwd.h"
00037 
00038 namespace edm {
00039 
00040   class Worker : private boost::noncopyable {
          // Base class for one schedulable module.  It holds the cached run
          // state (state_), the visit/run/pass/fail/exception counters and
          // the module description; derived classes supply the impl*
          // functions declared below.
00041   public:
          // Cached outcome of the last run.  doWork() only calls into the
          // module while the state is Ready.
00042     enum State { Ready, Pass, Fail, Exception };
00043 
00044     Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
00045     virtual ~Worker();
00046 
          // Runs the module (or replays its cached result) for the transition
          // described by the traits type T.  Defined after the class in this
          // header.
00047     template <typename T>
00048     bool doWork(typename T::MyPrincipal&, EventSetup const& c,
00049                 CurrentProcessingContext const* cpc,
00050                 CPUTimer *const timer);
00051     void beginJob() ;
00052     void endJob();
          // The respondTo* functions forward straight to the matching private
          // impl* virtual.
00053     void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
00054     void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
00055     void respondToOpenOutputFiles(FileBlock const& fb) {implRespondToOpenOutputFiles(fb);}
00056     void respondToCloseOutputFiles(FileBlock const& fb) {implRespondToCloseOutputFiles(fb);}
00057 
          // Fork hooks; both forward to the matching private impl* virtual.
00058     void preForkReleaseResources() {implPreForkReleaseResources();}
00059     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
00060 
          // Returns the worker to Ready so that the next doWork() call runs the
          // module again.  Only state_ is touched: the counters and
          // cached_exception_ are left as they are.
00061     void reset() { state_ = Ready; }
00062 
00063     ModuleDescription const& description() const {return md_;}
00064     ModuleDescription const* descPtr() const {return &md_; }
00067     void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
00068 
          // Returns (cpu time, real time) as reported by stopwatch_.
          // NOTE(review): stopwatch_ is dereferenced without a null check;
          // presumably this is only valid after useStopwatch() - confirm
          // against the callers.
00069     std::pair<double, double> timeCpuReal() const {
00070       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00071     }
00072 
          // Zeroes all five counters; state_ is not changed.
00073     void clearCounters() {
00074       timesRun_ = timesVisited_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
00075     }
00076     
00077     void useStopwatch();
00078 
          // Read-only access to the counters and the cached state.
00079     int timesRun() const { return timesRun_; }
00080     int timesVisited() const { return timesVisited_; }
00081     int timesPassed() const { return timesPassed_; }
00082     int timesFailed() const { return timesFailed_; }
00083     int timesExcept() const { return timesExcept_; }
00084     State state() const { return state_; }
00085 
00086     int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
00087 
00088   protected:
00089     virtual std::string workerType() const = 0;
          // Called from doWork(): implDoBegin when T::begin_ is true,
          // implDoEnd otherwise.  The overload is selected by the principal
          // type (event, run or luminosity block).
00090     virtual bool implDoBegin(EventPrincipal&, EventSetup const& c,
00091                             CurrentProcessingContext const* cpc) = 0;
00092     virtual bool implDoEnd(EventPrincipal&, EventSetup const& c,
00093                             CurrentProcessingContext const* cpc) = 0;
00094     virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
00095                             CurrentProcessingContext const* cpc) = 0;
00096     virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
00097                             CurrentProcessingContext const* cpc) = 0;
00098     virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00099                             CurrentProcessingContext const* cpc) = 0;
00100     virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00101                             CurrentProcessingContext const* cpc) = 0;
          // Presumably invoked by beginJob()/endJob(), whose definitions are
          // not in this header - confirm in the implementation file.
00102     virtual void implBeginJob() = 0;
00103     virtual void implEndJob() = 0;
00104 
00105   private:
          // Targets of the public respondTo* / fork forwarding functions.
00106     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
00107     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
00108     virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
00109     virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
00110 
00111     virtual void implPreForkReleaseResources() = 0;
00112     virtual void implPostForkReacquireResources(unsigned int iChildIndex,
00113                                                unsigned int iNumberOfChildren) = 0;
          // Handed to the stopwatch guard in doWork() for event transitions
          // only; also read by timeCpuReal().
00114     RunStopwatch::StopwatchPointer stopwatch_;
00115 
          // Counters; doWork() only advances them while processing events.
00116     int timesRun_;
00117     int timesVisited_;
00118     int timesPassed_;
00119     int timesFailed_;
00120     int timesExcept_;
00121     State state_;
00122 
00123     ModuleDescription md_;
00124     ActionTable const* actions_; // memory assumed to be managed elsewhere
00125     boost::shared_ptr<edm::Exception> cached_exception_; // if state is 'exception'
00126 
          // May be empty: doWork() passes actReg_.get() to the signal sentry,
          // which skips the signals when the pointer is null.
00127     boost::shared_ptr<ActivityRegistry> actReg_;
00128   };
00129 
00130   namespace {
00131     template <typename T>
          // Scope guard around one module call.  The constructor emits
          // T::preModuleSignal(a, &md) and the destructor emits
          // T::postModuleSignal(a, &md), so the post signal is also sent when
          // the guarded code leaves by exception.  Both signals are skipped
          // when the registry pointer is null.
          // NOTE(review): no copy control is declared, so the sentry is
          // copyable; destroying a copy would send the post signal a second
          // time.  The only use in this header is a plain local object.
00132     class ModuleSignalSentry {
00133     public:
            // a may be null; md must outlive the sentry because only its
            // address is stored.
00134       ModuleSignalSentry(ActivityRegistry *a, ModuleDescription& md) : a_(a), md_(&md) {
00135         if(a_) T::preModuleSignal(a_, md_);
00136       }
00137       ~ModuleSignalSentry() {
00138         if(a_) T::postModuleSignal(a_, md_);
00139       }
00140     private:
00141       ActivityRegistry* a_;    // not owned; null disables both signals
00142       ModuleDescription* md_;  // not owned
00143     };
00144 
00145     template <typename T>
          // Appends "<moduleName>/<moduleLabel> <ip.id()>\n" to iEx.
          //   iMD - description of the module being run
          //   ip  - any object with an id() that can be streamed into the
          //         exception (doWork() passes the principal)
          //   iEx - exception to annotate; modified in place
          // Returns iEx itself so that callers can keep streaming into it.
00146     cms::Exception& exceptionContext(ModuleDescription const& iMD,
00147                                      T const& ip,
00148                                      cms::Exception& iEx) {
00149       iEx << iMD.moduleName() << "/" << iMD.moduleLabel()
00150         << " " << ip.id() << "\n";
00151       return iEx;
00152     }
00153 
          // Overload without a principal: appends "<moduleName>/<moduleLabel>\n"
          // to iEx and returns iEx itself.
          // NOTE(review): this is a non-template, non-inline function inside an
          // unnamed namespace in a header, so every translation unit that
          // includes the header gets its own copy.  No call in this header uses
          // this overload (all calls pass the principal as well).
00154     cms::Exception& exceptionContext(ModuleDescription const& iMD,
00155                                      cms::Exception& iEx) {
00156       iEx << iMD.moduleName() << "/" << iMD.moduleLabel() << "\n";
00157       return iEx;
00158     }
00159   }
00160 
00161    template <typename T>
         // Runs the module for the transition described by the traits type T,
         // or replays the cached outcome if the worker is not Ready.
         //
         // T is expected to provide MyPrincipal, the compile-time flags
         // isEvent_ and begin_, and pre/postModuleSignal (used through
         // ModuleSignalSentry<T>).
         //
         //   ep     - principal for this transition; forwarded to
         //            implDoBegin/implDoEnd and used (via ep.id()) to annotate
         //            exceptions
         //   es     - forwarded unchanged to implDoBegin/implDoEnd
         //   cpc    - forwarded to implDoBegin/implDoEnd; may be null, in which
         //            case the end-path special case below is skipped
         //   iTimer - handed to the RunDualStopwatches guard
         //
         // Returns true/false for a module that passed/failed.  A state of Pass
         // or Fail cached by an earlier call is returned without running the
         // module again; a cached Exception state raises cached_exception_.
         //
         // Exceptions leaving the module are either absorbed (IgnoreCompletely,
         // FailModule) or recorded in cached_exception_ with state_ set to
         // Exception and then propagated.  Anything that is not a cms::Exception
         // is wrapped in an edm::Exception first.
00162    bool Worker::doWork(typename T::MyPrincipal& ep, 
00163                        EventSetup const& es,
00164                        CurrentProcessingContext const* cpc,
00165                        CPUTimer* const iTimer) {
00166 
00167     // A RunStopwatch, but only if we are processing an event.
00168     RunDualStopwatches stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer(),
00169                                  iTimer);
00170 
          // All counters are maintained for event transitions only.
00171     if (T::isEvent_) {
00172       ++timesVisited_;
00173     }
00174     bool rc = false;
00175 
          // Replay a cached result unless the worker is Ready.
00176     switch(state_) {
00177       case Ready: break;
00178       case Pass: return true;
00179       case Fail: return false;
00180       case Exception: {
00181           // rethrow the cached exception again
00182           // It seems impossible to
00183           // get here a second time until a cms::Exception has been
00184           // thrown previously.
00185           LogWarning("repeat") << "A module has been invoked a second "
00186                                << "time even though it caught an "
00187                                << "exception during the previous "
00188                                << "invocation.\n"
00189                                << "This may be an indication of a "
00190                                << "configuration problem.\n";
00191 
00192           cached_exception_->raise();
00193       }
00194     }
00195 
00196     if (T::isEvent_) ++timesRun_;
00197 
00198     try {
00199 
              // The sentry sends the pre-module signal here and the post-module
              // signal when this try block is left, normally or by exception.
00200         ModuleSignalSentry<T> cpp(actReg_.get(), md_);
00201         if (T::begin_) {
00202           rc = implDoBegin(ep, es, cpc);
00203         } else {
00204           rc = implDoEnd(ep, es, cpc);
00205         }
00206 
00207         if (rc) {
00208           state_ = Pass;
00209           if (T::isEvent_) ++timesPassed_;
00210         } else {
00211           state_ = Fail;
00212           if (T::isEvent_) ++timesFailed_;
00213         }
00214     }
00215 
00216     catch(cms::Exception& e) {
00217 
00218         // NOTE: the warning printed as a result of ignoring or failing
00219         // a module will only be printed during the full true processing
00220         // pass of this module
00221 
00222         // Get the action corresponding to this exception.  However, if processing
00223         // something other than an event (e.g. run, lumi) always rethrow.
00224         actions::ActionCodes action = (T::isEvent_ ? actions_->find(e.rootCause()) : actions::Rethrow);
00225 
00226         // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
00227         // as FailModule, so any subsequent OutputModules are still run.
00228         // For unscheduled modules only treat FailPath as a FailModule but still allow SkipEvent to throw
00229         if (cpc && cpc->isEndPath()) {
00230           if ((action == actions::SkipEvent && !cpc->isUnscheduled()) ||
00231                action == actions::FailPath) action = actions::FailModule;
00232         }
              // The two absorbing cases bump the counters unconditionally; they
              // are only reachable for events, because any other transition
              // keeps action == Rethrow and lands in the default case.
00233         switch(action) {
00234           case actions::IgnoreCompletely: {
00235               rc = true;
00236               ++timesPassed_;
00237               state_ = Pass;
00238               LogWarning("IgnoreCompletely")
00239                 << "Module ignored an exception\n"
00240                 << e.what() << "\n";
00241               break;
00242           }
00243 
00244           case actions::FailModule: {
                    // NOTE(review): this call returns true while the cached
                    // state becomes Fail, so a repeat call before reset()
                    // returns false.  Presumably intentional, to let the rest
                    // of the path run (see the end-path comment above) -
                    // confirm.
00245               rc = true;
00246               LogWarning("FailModule")
00247                 << "Module failed due to an exception\n"
00248                 << e.what() << "\n";
00249               ++timesFailed_;
00250               state_ = Fail;
00251               break;
00252           }
00253 
00254           default: {
00255 
00256               // we should not need to include the event/run/module names
00257               // in the exception because the error logger will pick this
00258               // up automatically.  I'm leaving it in until this is
00259               // verified
00260 
00261               // here we simply add a small amount of data to the
00262               // exception to add some context, we could have rethrown
00263               // it as something else and embedded with this exception
00264               // as an argument to the constructor.
00265 
00266               if (T::isEvent_) ++timesExcept_;
00267               state_ = Exception;
00268               e << "cms::Exception going through module ";
00269               exceptionContext(md_, ep, e);
                    // Cache a copy for later replay: copied directly when the
                    // exception already is an edm::Exception, wrapped otherwise.
00270               edm::Exception *edmEx = dynamic_cast<edm::Exception *>(&e);
00271               if (edmEx) {
00272                 cached_exception_.reset(new edm::Exception(*edmEx));
00273               } else {
00274                 cached_exception_.reset(new edm::Exception(errors::OtherCMS, std::string(), e));
00275               }
                    // Rethrow the original exception object, which now carries
                    // the context appended above.
00276               throw;
00277           }
00278         }
00279       }
00280 
          // The remaining handlers all follow one pattern: count the exception
          // (events only), mark the state, build an edm::Exception describing
          // what was caught, cache it and raise it.  std::bad_alloc is listed
          // before std::exception so that the more specific handler is chosen.
00281     catch(std::bad_alloc& bda) {
00282         if (T::isEvent_) ++timesExcept_;
00283         state_ = Exception;
00284         cached_exception_.reset(new edm::Exception(errors::BadAlloc));
00285         *cached_exception_
00286           << "A std::bad_alloc exception occurred during a call to the module ";
00287         exceptionContext(md_, ep, *cached_exception_)
00288           << "The job has probably exhausted the virtual memory available to the process.\n";
00289         cached_exception_->raise();
00290     }
00291     catch(std::exception& e) {
00292         if (T::isEvent_) ++timesExcept_;
00293         state_ = Exception;
00294         cached_exception_.reset(new edm::Exception(errors::StdException));
00295         *cached_exception_
00296           << "A std::exception occurred during a call to the module ";
00297         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
00298           << "Previous information:\n" << e.what();
00299         cached_exception_->raise();
00300     }
00301     catch(std::string& s) {
00302         if (T::isEvent_) ++timesExcept_;
00303         state_ = Exception;
00304         cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "std::string"));
00305         *cached_exception_
00306           << "A std::string thrown as an exception occurred during a call to the module ";
00307         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
00308           << "Previous information:\n string = " << s;
00309         cached_exception_->raise();
00310     }
00311     catch(char const* c) {
00312         if (T::isEvent_) ++timesExcept_;
00313         state_ = Exception;
00314         cached_exception_.reset(new edm::Exception(errors::BadExceptionType, "const char *"));
00315         *cached_exception_
00316           << "A const char* thrown as an exception occurred during a call to the module ";
00317         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n"
00318           << "Previous information:\n const char* = " << c << "\n";
00319         cached_exception_->raise();
00320     }
00321     catch(...) {
00322         if (T::isEvent_) ++timesExcept_;
00323         state_ = Exception;
00324         cached_exception_.reset(new edm::Exception(errors::Unknown, "repeated"));
00325         *cached_exception_
00326           << "An unknown occurred during a previous call to the module ";
00327         exceptionContext(md_, ep, *cached_exception_) << "and cannot be repropagated.\n";
00328         cached_exception_->raise();
00329     }
00330 
          // Reached after a normal run and after the two absorbed-exception
          // cases (IgnoreCompletely, FailModule).
00331     return rc;
00332   }
00333 
00334 }
00335 #endif