CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/FWCore/Framework/src/Worker.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Worker_h
00002 #define FWCore_Framework_Worker_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 Worker: this is a basic scheduling unit - an abstract base class to
00007 something that is really a producer or filter.
00008 
00009 A worker will not actually call through to the module unless it is
00010 in a Ready state.  After a module is actually run, the state will not
00011 be Ready.  The Ready state can only be reestablished by doing a reset().
00012 
00013 Pre/post module signals are posted only in the Ready state.
00014 
00015 Execution statistics are kept here.
00016 
00017 If a module has thrown an exception during execution, that exception
00018 will be rethrown if the worker is entered again and the state is not Ready.
00019 In other words, execution results (status) are cached and reused until
00020 the worker is reset().
00021 
00022 ----------------------------------------------------------------------*/
00023 
00024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00025 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00026 #include "FWCore/Framework/src/WorkerParams.h"
00027 #include "FWCore/Framework/interface/Actions.h"
00028 #include "FWCore/Framework/interface/CurrentProcessingContext.h"
00029 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00031 #include "FWCore/Utilities/interface/Exception.h"
00032 #include "FWCore/Utilities/interface/ConvertException.h"
00033 #include "FWCore/Utilities/interface/BranchType.h"
00034 
00035 #include "boost/shared_ptr.hpp"
00036 #include "boost/utility.hpp"
00037 
00038 #include "FWCore/Framework/src/RunStopwatch.h"
00039 #include "FWCore/Framework/interface/Frameworkfwd.h"
00040 
00041 #include <sstream>
00042 
00043 namespace edm {
00044 
00045   class Worker : private boost::noncopyable {
00046   public:
00047     enum State { Ready, Pass, Fail, Exception };
00048 
00049     Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
00050     virtual ~Worker();
00051 
00052     template <typename T>
00053     bool doWork(typename T::MyPrincipal&, EventSetup const& c,
00054                 CurrentProcessingContext const* cpc,
00055                 CPUTimer *const timer);
00056     void beginJob() ;
00057     void endJob();
00058     void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
00059     void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
00060     void respondToOpenOutputFiles(FileBlock const& fb) {implRespondToOpenOutputFiles(fb);}
00061     void respondToCloseOutputFiles(FileBlock const& fb) {implRespondToCloseOutputFiles(fb);}
00062 
00063     void preForkReleaseResources() {implPreForkReleaseResources();}
00064     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
00065 
00066     void reset() { state_ = Ready; }
00067 
00068     ModuleDescription const& description() const {return md_;}
00069     ModuleDescription const* descPtr() const {return &md_; }
00072     void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
00073 
00074     std::pair<double, double> timeCpuReal() const {
00075       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00076     }
00077 
00078     void clearCounters() {
00079       timesRun_ = timesVisited_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
00080     }
00081     
00082     void useStopwatch();
00083 
00084     int timesRun() const { return timesRun_; }
00085     int timesVisited() const { return timesVisited_; }
00086     int timesPassed() const { return timesPassed_; }
00087     int timesFailed() const { return timesFailed_; }
00088     int timesExcept() const { return timesExcept_; }
00089     State state() const { return state_; }
00090 
00091     int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
00092 
00093   protected:
00094     virtual std::string workerType() const = 0;
00095     virtual bool implDoBegin(EventPrincipal&, EventSetup const& c,
00096                             CurrentProcessingContext const* cpc) = 0;
00097     virtual bool implDoEnd(EventPrincipal&, EventSetup const& c,
00098                             CurrentProcessingContext const* cpc) = 0;
00099     virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
00100                             CurrentProcessingContext const* cpc) = 0;
00101     virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
00102                             CurrentProcessingContext const* cpc) = 0;
00103     virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00104                             CurrentProcessingContext const* cpc) = 0;
00105     virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00106                             CurrentProcessingContext const* cpc) = 0;
00107     virtual void implBeginJob() = 0;
00108     virtual void implEndJob() = 0;
00109 
00110   private:
00111     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
00112     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
00113     virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
00114     virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
00115 
00116     virtual void implPreForkReleaseResources() = 0;
00117     virtual void implPostForkReacquireResources(unsigned int iChildIndex,
00118                                                unsigned int iNumberOfChildren) = 0;
00119     RunStopwatch::StopwatchPointer stopwatch_;
00120 
00121     int timesRun_;
00122     int timesVisited_;
00123     int timesPassed_;
00124     int timesFailed_;
00125     int timesExcept_;
00126     State state_;
00127 
00128     ModuleDescription md_;
00129     ActionTable const* actions_; // memory assumed to be managed elsewhere
00130     boost::shared_ptr<cms::Exception> cached_exception_; // if state is 'exception'
00131 
00132     boost::shared_ptr<ActivityRegistry> actReg_;
00133   };
00134 
00135   namespace {
00136     template <typename T>
00137     class ModuleSignalSentry {
00138     public:
00139       ModuleSignalSentry(ActivityRegistry *a, ModuleDescription& md) : a_(a), md_(&md) {
00140         if(a_) T::preModuleSignal(a_, md_);
00141       }
00142       ~ModuleSignalSentry() {
00143         if(a_) T::postModuleSignal(a_, md_);
00144       }
00145     private:
00146       ActivityRegistry* a_;
00147       ModuleDescription* md_;
00148     };
00149 
00150     template <typename T>
00151     void exceptionContext(typename T::MyPrincipal const& principal,
00152                           cms::Exception& ex,
00153                           CurrentProcessingContext const* cpc) {
00154       std::ostringstream ost;
00155       if (T::isEvent_) {
00156         ost << "Calling event method";
00157       }
00158       else if (T::begin_ && T::branchType_ == InRun) {
00159         ost << "Calling beginRun";
00160       }
00161       else if (T::begin_ && T::branchType_ == InLumi) {
00162         ost << "Calling beginLuminosityBlock";
00163       }
00164       else if (!T::begin_ && T::branchType_ == InLumi) {
00165         ost << "Calling endLuminosityBlock";
00166       }
00167       else if (!T::begin_ && T::branchType_ == InRun) {
00168         ost << "Calling endRun";
00169       }
00170       else {
00171         // It should be impossible to get here ...
00172         ost << "Calling unknown function";
00173       }
00174       if (cpc && cpc->moduleDescription()) {
00175         ost << " for module " << cpc->moduleDescription()->moduleName() << "/'" << cpc->moduleDescription()->moduleLabel() << "'";
00176       }
00177       ex.addContext(ost.str());
00178       ost.str("");
00179       ost << "Running path '";
00180       if (cpc && cpc->pathName()) {
00181         ost << *cpc->pathName() << "'";
00182       }
00183       else {
00184         ost << "unknown'";
00185       }
00186       ex.addContext(ost.str());
00187       ost.str("");
00188       ost << "Processing ";
00189       ost << principal.id();
00190       ex.addContext(ost.str());
00191     }
00192   }
00193 
00194   template <typename T>
00195   bool Worker::doWork(typename T::MyPrincipal& ep, 
00196                        EventSetup const& es,
00197                        CurrentProcessingContext const* cpc,
00198                        CPUTimer* const iTimer) {
00199 
00200     // A RunStopwatch, but only if we are processing an event.
00201     RunDualStopwatches stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer(),
00202                                  iTimer);
00203 
00204     if (T::isEvent_) {
00205       ++timesVisited_;
00206     }
00207     bool rc = false;
00208 
00209     switch(state_) {
00210       case Ready: break;
00211       case Pass: return true;
00212       case Fail: return false;
00213       case Exception: {
00214           cached_exception_->raise();
00215       }
00216     }
00217 
00218     if (T::isEvent_) ++timesRun_;
00219 
00220     try {
00221       try {
00222 
00223         ModuleSignalSentry<T> cpp(actReg_.get(), md_);
00224         if (T::begin_) {
00225           rc = implDoBegin(ep, es, cpc);
00226         } else {
00227           rc = implDoEnd(ep, es, cpc);
00228         }
00229 
00230         if (rc) {
00231           state_ = Pass;
00232           if (T::isEvent_) ++timesPassed_;
00233         } else {
00234           state_ = Fail;
00235           if (T::isEvent_) ++timesFailed_;
00236         }
00237       }
00238       catch (cms::Exception& e) { throw; }
00239       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00240       catch (std::exception& e) { convertException::stdToEDM(e); }
00241       catch(std::string& s) { convertException::stringToEDM(s); }
00242       catch(char const* c) { convertException::charPtrToEDM(c); }
00243       catch (...) { convertException::unknownToEDM(); }
00244     }
00245     catch(cms::Exception& ex) {
00246 
00247       // NOTE: the warning printed as a result of ignoring or failing
00248       // a module will only be printed during the full true processing
00249       // pass of this module
00250 
00251       // Get the action corresponding to this exception.  However, if processing
00252       // something other than an event (e.g. run, lumi) always rethrow.
00253       actions::ActionCodes action = (T::isEvent_ ? actions_->find(ex.category()) : actions::Rethrow);
00254 
00255       // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
00256       // as IgnoreCompletely, so any subsequent OutputModules are still run.
00257       // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
00258       if (cpc && cpc->isEndPath()) {
00259         if ((action == actions::SkipEvent && !cpc->isUnscheduled()) ||
00260              action == actions::FailPath) action = actions::IgnoreCompletely;
00261       }
00262       switch(action) {
00263         case actions::IgnoreCompletely:
00264           rc = true;
00265           ++timesPassed_;
00266           state_ = Pass;
00267           exceptionContext<T>(ep, ex, cpc);
00268           edm::printCmsExceptionWarning("IgnoreCompletely", ex);
00269           break;
00270         default:
00271           if (T::isEvent_) ++timesExcept_;
00272           state_ = Exception;
00273           cached_exception_.reset(ex.clone());
00274           cached_exception_->raise();
00275       }
00276     }
00277     return rc;
00278   }
00279 }
00280 #endif