CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/FWCore/Framework/src/Worker.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Worker_h
00002 #define FWCore_Framework_Worker_h
00003 
00004 /*----------------------------------------------------------------------
00005 
00006 Worker: this is a basic scheduling unit - an abstract base class to
00007 something that is really a producer or filter.
00008 
00009 A worker will not actually call through to the module unless it is
00010 in a Ready state.  After a module is actually run, the state will not
00011 be Ready.  The Ready state can only be reestablished by doing a reset().
00012 
00013 Pre/post module signals are posted only in the Ready state.
00014 
00015 Execution statistics are kept here.
00016 
00017 If a module has thrown an exception during execution, that exception
00018 will be rethrown if the worker is entered again and the state is not Ready.
00019 In other words, execution results (status) are cached and reused until
00020 the worker is reset().
00021 
00022 ----------------------------------------------------------------------*/
00023 
00024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00025 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00026 #include "FWCore/Framework/src/WorkerParams.h"
00027 #include "FWCore/Framework/interface/Actions.h"
00028 #include "FWCore/Framework/interface/CurrentProcessingContext.h"
00029 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00030 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00031 #include "FWCore/Utilities/interface/Exception.h"
00032 #include "FWCore/Utilities/interface/ConvertException.h"
00033 #include "FWCore/Utilities/interface/BranchType.h"
00034 
00035 #include "boost/shared_ptr.hpp"
00036 #include "boost/utility.hpp"
00037 
00038 #include "FWCore/Framework/src/RunStopwatch.h"
00039 #include "FWCore/Framework/interface/Frameworkfwd.h"
00040 
00041 #include <sstream>
00042 
00043 namespace edm {
00044   class EventPrincipal;
00045   class EarlyDeleteHelper;
00046 
00047   class Worker : private boost::noncopyable {
00048   public:
00049     enum State { Ready, Pass, Fail, Exception };
00050 
00051     Worker(ModuleDescription const& iMD, WorkerParams const& iWP);
00052     virtual ~Worker();
00053 
00054     template <typename T>
00055     bool doWork(typename T::MyPrincipal&, EventSetup const& c,
00056                 CurrentProcessingContext const* cpc,
00057                 CPUTimer *const timer);
00058     void beginJob() ;
00059     void endJob();
00060     void respondToOpenInputFile(FileBlock const& fb) {implRespondToOpenInputFile(fb);}
00061     void respondToCloseInputFile(FileBlock const& fb) {implRespondToCloseInputFile(fb);}
00062     void respondToOpenOutputFiles(FileBlock const& fb) {implRespondToOpenOutputFiles(fb);}
00063     void respondToCloseOutputFiles(FileBlock const& fb) {implRespondToCloseOutputFiles(fb);}
00064 
00065     void preForkReleaseResources() {implPreForkReleaseResources();}
00066     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {implPostForkReacquireResources(iChildIndex, iNumberOfChildren);}
00067 
00068     void reset() { state_ = Ready; }
00069 
00070     void pathFinished(EventPrincipal&);
00071     void postDoEvent(EventPrincipal&);
00072     
00073     ModuleDescription const& description() const {return md_;}
00074     ModuleDescription const* descPtr() const {return &md_; }
00077     void setActivityRegistry(boost::shared_ptr<ActivityRegistry> areg);
00078     
00079     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
00080 
00081     std::pair<double, double> timeCpuReal() const {
00082       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00083     }
00084 
00085     void clearCounters() {
00086       timesRun_ = timesVisited_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
00087     }
00088     
00089     void useStopwatch();
00090 
00091     int timesRun() const { return timesRun_; }
00092     int timesVisited() const { return timesVisited_; }
00093     int timesPassed() const { return timesPassed_; }
00094     int timesFailed() const { return timesFailed_; }
00095     int timesExcept() const { return timesExcept_; }
00096     State state() const { return state_; }
00097 
00098     int timesPass() const { return timesPassed(); } // for backward compatibility only - to be removed soon
00099 
00100   protected:
00101     virtual std::string workerType() const = 0;
00102     virtual bool implDoBegin(EventPrincipal&, EventSetup const& c,
00103                             CurrentProcessingContext const* cpc) = 0;
00104     virtual bool implDoEnd(EventPrincipal&, EventSetup const& c,
00105                             CurrentProcessingContext const* cpc) = 0;
00106     virtual bool implDoBegin(RunPrincipal& rp, EventSetup const& c,
00107                             CurrentProcessingContext const* cpc) = 0;
00108     virtual bool implDoEnd(RunPrincipal& rp, EventSetup const& c,
00109                             CurrentProcessingContext const* cpc) = 0;
00110     virtual bool implDoBegin(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00111                             CurrentProcessingContext const* cpc) = 0;
00112     virtual bool implDoEnd(LuminosityBlockPrincipal& lbp, EventSetup const& c,
00113                             CurrentProcessingContext const* cpc) = 0;
00114     virtual void implBeginJob() = 0;
00115     virtual void implEndJob() = 0;
00116 
00117   private:
00118     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
00119     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
00120     virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
00121     virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
00122 
00123     virtual void implPreForkReleaseResources() = 0;
00124     virtual void implPostForkReacquireResources(unsigned int iChildIndex,
00125                                                unsigned int iNumberOfChildren) = 0;
00126     RunStopwatch::StopwatchPointer stopwatch_;
00127 
00128     int timesRun_;
00129     int timesVisited_;
00130     int timesPassed_;
00131     int timesFailed_;
00132     int timesExcept_;
00133     State state_;
00134 
00135     ModuleDescription md_;
00136     ActionTable const* actions_; // memory assumed to be managed elsewhere
00137     boost::shared_ptr<cms::Exception> cached_exception_; // if state is 'exception'
00138 
00139     boost::shared_ptr<ActivityRegistry> actReg_;
00140     
00141     EarlyDeleteHelper* earlyDeleteHelper_;
00142   };
00143 
00144   namespace {
00145     template <typename T>
00146     class ModuleSignalSentry {
00147     public:
00148       ModuleSignalSentry(ActivityRegistry *a, ModuleDescription& md) : a_(a), md_(&md) {
00149         if(a_) T::preModuleSignal(a_, md_);
00150       }
00151       ~ModuleSignalSentry() {
00152         if(a_) T::postModuleSignal(a_, md_);
00153       }
00154     private:
00155       ActivityRegistry* a_;
00156       ModuleDescription* md_;
00157     };
00158 
00159     template <typename T>
00160     void exceptionContext(typename T::MyPrincipal const& principal,
00161                           cms::Exception& ex,
00162                           CurrentProcessingContext const* cpc) {
00163       std::ostringstream ost;
00164       if (T::isEvent_) {
00165         ost << "Calling event method";
00166       }
00167       else if (T::begin_ && T::branchType_ == InRun) {
00168         ost << "Calling beginRun";
00169       }
00170       else if (T::begin_ && T::branchType_ == InLumi) {
00171         ost << "Calling beginLuminosityBlock";
00172       }
00173       else if (!T::begin_ && T::branchType_ == InLumi) {
00174         ost << "Calling endLuminosityBlock";
00175       }
00176       else if (!T::begin_ && T::branchType_ == InRun) {
00177         ost << "Calling endRun";
00178       }
00179       else {
00180         // It should be impossible to get here ...
00181         ost << "Calling unknown function";
00182       }
00183       if (cpc && cpc->moduleDescription()) {
00184         ost << " for module " << cpc->moduleDescription()->moduleName() << "/'" << cpc->moduleDescription()->moduleLabel() << "'";
00185       }
00186       ex.addContext(ost.str());
00187       ost.str("");
00188       ost << "Running path '";
00189       if (cpc && cpc->pathName()) {
00190         ost << *cpc->pathName() << "'";
00191       }
00192       else {
00193         ost << "unknown'";
00194       }
00195       ex.addContext(ost.str());
00196       ost.str("");
00197       ost << "Processing ";
00198       ost << principal.id();
00199       ex.addContext(ost.str());
00200     }
00201   }
00202 
00203   template <typename T>
00204   bool Worker::doWork(typename T::MyPrincipal& ep, 
00205                        EventSetup const& es,
00206                        CurrentProcessingContext const* cpc,
00207                        CPUTimer* const iTimer) {
00208 
00209     // A RunStopwatch, but only if we are processing an event.
00210     RunDualStopwatches stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer(),
00211                                  iTimer);
00212 
00213     if (T::isEvent_) {
00214       ++timesVisited_;
00215     }
00216     bool rc = false;
00217 
00218     switch(state_) {
00219       case Ready: break;
00220       case Pass: return true;
00221       case Fail: return false;
00222       case Exception: {
00223           cached_exception_->raise();
00224       }
00225     }
00226 
00227     if (T::isEvent_) ++timesRun_;
00228 
00229     try {
00230       try {
00231 
00232         ModuleSignalSentry<T> cpp(actReg_.get(), md_);
00233         if (T::begin_) {
00234           rc = implDoBegin(ep, es, cpc);
00235         } else {
00236           rc = implDoEnd(ep, es, cpc);
00237         }
00238 
00239         if (rc) {
00240           state_ = Pass;
00241           if (T::isEvent_) ++timesPassed_;
00242         } else {
00243           state_ = Fail;
00244           if (T::isEvent_) ++timesFailed_;
00245         }
00246       }
00247       catch (cms::Exception& e) { throw; }
00248       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00249       catch (std::exception& e) { convertException::stdToEDM(e); }
00250       catch(std::string& s) { convertException::stringToEDM(s); }
00251       catch(char const* c) { convertException::charPtrToEDM(c); }
00252       catch (...) { convertException::unknownToEDM(); }
00253     }
00254     catch(cms::Exception& ex) {
00255 
00256       // NOTE: the warning printed as a result of ignoring or failing
00257       // a module will only be printed during the full true processing
00258       // pass of this module
00259 
00260       // Get the action corresponding to this exception.  However, if processing
00261       // something other than an event (e.g. run, lumi) always rethrow.
00262       actions::ActionCodes action = (T::isEvent_ ? actions_->find(ex.category()) : actions::Rethrow);
00263 
00264       // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
00265       // as IgnoreCompletely, so any subsequent OutputModules are still run.
00266       // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
00267       if (cpc && cpc->isEndPath()) {
00268         if ((action == actions::SkipEvent && !cpc->isUnscheduled()) ||
00269              action == actions::FailPath) action = actions::IgnoreCompletely;
00270       }
00271       switch(action) {
00272         case actions::IgnoreCompletely:
00273           rc = true;
00274           ++timesPassed_;
00275           state_ = Pass;
00276           exceptionContext<T>(ep, ex, cpc);
00277           edm::printCmsExceptionWarning("IgnoreCompletely", ex);
00278           break;
00279         default:
00280           if (T::isEvent_) ++timesExcept_;
00281           state_ = Exception;
00282           cached_exception_.reset(ex.clone());
00283           cached_exception_->raise();
00284       }
00285     }
00286     return rc;
00287   }
00288 }
00289 #endif