CMS 3D CMS Logo

Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in objects held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   The Schedule throws an exception if an output module is present
00034   in a path.  It belongs in an endpath.
00035 
00036   The Schedule throws an exception if a filter is present
00037   in an endpath.  It belongs in a path.
00038 
00039   The Schedule issues a warning if a producer is present
00040   in an endpath.  It belongs in a path. 
00041 
00042   Processing of an event happens by pushing the event through the Paths.
00043   The scheduler performs the reset() on each of the workers independent
00044   of the Path objects. 
00045 
00046   ------------------------
00047 
00048   About Paths:
00049   Paths fit into two categories:
00050   1) trigger paths that contribute directly to saved trigger bits
00051   2) end paths
00052   The Schedule holds these paths in two data structures:
00053   1) main path list
00054   2) end path list
00055 
00056   Trigger path processing always precedes endpath processing.
00057   The order of the paths from the input configuration is
00058   preserved in the main paths list.
00059 
00060   ------------------------
00061 
00062   The Schedule uses the TriggerNamesService to get the names of the
00063   trigger paths and end paths. When a TriggerResults object is created
00064   the results are stored in the same order as the trigger names from
00065   TriggerNamesService.
00066 
00067 */
00068 
00069 #include "FWCore/Utilities/interface/Algorithms.h"
00070 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00071 #include "DataFormats/Provenance/interface/ProvenanceFwd.h"
00072 #include "DataFormats/Provenance/interface/Provenance.h"
00073 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00074 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00075 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00076 #include "FWCore/ServiceRegistry/interface/Service.h"
00077 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00078 #include "FWCore/MessageLogger/interface/JobReport.h"
00079 #include "FWCore/Framework/interface/Frameworkfwd.h"
00080 #include "FWCore/Framework/interface/Actions.h"
00081 #include "FWCore/Framework/src/Path.h"
00082 #include "FWCore/Framework/src/RunStopwatch.h"
00083 #include "FWCore/Framework/interface/EventPrincipal.h"
00084 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00085 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00086 #include "FWCore/Framework/src/Worker.h"
00087 
00088 #include "boost/shared_ptr.hpp"
00089 
00090 #include <map>
00091 #include <memory>
00092 #include <string>
00093 #include <vector>
00094 #include <set>
00095 
00096 namespace edm {
00097   namespace service {
00098     class TriggerNamesService;
00099   }
00100   class ActivityRegistry;
00101   class EventSetup;
00102   class OutputWorker;
00103   class UnscheduledCallProducer;
00104   class RunStopwatch;
00105   class WorkerInPath;
00106   class WorkerRegistry;
00107   class Schedule {
00108   public:
00109     typedef std::vector<std::string> vstring;
00110     typedef std::vector<Path> TrigPaths;
00111     typedef std::vector<Path> NonTrigPaths;
00112     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00113     typedef boost::shared_ptr<Worker> WorkerPtr;
00114     typedef std::vector<Worker*> AllWorkers;
00115     typedef std::vector<OutputWorker*> AllOutputWorkers;
00116 
00117     typedef std::vector<Worker*> Workers;
00118 
00119     typedef std::vector<WorkerInPath> PathWorkers;
00120 
00121     Schedule(ParameterSet const& processDesc,
00122              edm::service::TriggerNamesService& tns,
00123              WorkerRegistry& wregistry,
00124              ProductRegistry& pregistry,
00125              ActionTable& actions,
00126              boost::shared_ptr<ActivityRegistry> areg);
00127 
00128     enum State { Ready=0, Running, Latched };
00129 
00130     template <typename T>
00131     void processOneOccurrence(typename T::MyPrincipal& principal, 
00132                      EventSetup const& eventSetup);
00133 
00134     void beginJob(EventSetup const&);
00135     void endJob();
00136 
00137     // Write the luminosity block
00138     void writeLumi(LuminosityBlockPrincipal const& lbp);
00139 
00140     // Write the run
00141     void writeRun(RunPrincipal const& rp);
00142 
00143     // Call closeFile() on all OutputModules.
00144     void closeOutputFiles();
00145 
00146     // Call openNewFileIfNeeded() on all OutputModules
00147     void openNewOutputFilesIfNeeded();
00148 
00149     // Call openFiles() on all OutputModules
00150     void openOutputFiles(FileBlock & fb);
00151 
00152     // Call respondToOpenInputFile() on all Modules
00153     void respondToOpenInputFile(FileBlock const& fb);
00154 
00155     // Call respondToCloseInputFile() on all Modules
00156     void respondToCloseInputFile(FileBlock const& fb);
00157 
00158     // Call respondToOpenOutputFiles() on all Modules
00159     void respondToOpenOutputFiles(FileBlock const& fb);
00160 
00161     // Call respondToCloseOutputFiles() on all Modules
00162     void respondToCloseOutputFiles(FileBlock const& fb);
00163 
00164     // Call shouldWeCloseFile() on all OutputModules.
00165     bool shouldWeCloseOutput() const;
00166 
00167     std::pair<double,double> timeCpuReal() const {
00168       return std::pair<double,double>(stopwatch_->cpuTime(),stopwatch_->realTime());
00169     }
00170 
00173 
00177     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00178 
00182     int totalEvents() const {
00183       return total_events_;
00184     }
00185 
00188     int totalEventsPassed() const {
00189       return total_passed_;
00190     }
00191 
00194     int totalEventsFailed() const {
00195       return totalEvents() - totalEventsPassed();
00196     }
00197 
00200     void enableEndPaths(bool active);
00201 
00204     bool endPathsEnabled() const;
00205 
00208     void getTriggerReport(TriggerReport& rep) const;      
00209 
00211     bool const terminate() const;
00212 
00214     void clearCounters();
00215 
00216   private:
00217     AllWorkers::const_iterator workersBegin() const 
00218     { return all_workers_.begin(); }
00219 
00220     AllWorkers::const_iterator workersEnd() const 
00221     { return all_workers_.end(); }
00222 
00223     AllWorkers::iterator workersBegin() 
00224     { return  all_workers_.begin(); }
00225 
00226     AllWorkers::iterator workersEnd() 
00227     { return all_workers_.end(); }
00228 
00229     void resetAll();
00230 
00231     template <typename T>
00232     bool runTriggerPaths(typename T::MyPrincipal &, EventSetup const&);
00233 
00234     template <typename T>
00235     void runEndPaths(typename T::MyPrincipal &, EventSetup const&);
00236 
00237     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00238 
00239     void reportSkipped(EventPrincipal const& ep) const;
00240     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00241     void reportSkipped(RunPrincipal const&) const {}
00242 
00243     void fillWorkers(std::string const& name, PathWorkers& out);
00244     void fillTrigPath(int bitpos, std::string const& name, TrigResPtr);
00245     void fillEndPath(int bitpos, std::string const& name);
00246 
00247     void limitOutput();
00248 
00249     void addToAllWorkers(Worker* w);
00250 
00251     ParameterSet        pset_;
00252     WorkerRegistry*     worker_reg_;
00253     ProductRegistry*    prod_reg_;
00254     ActionTable*        act_table_;
00255     std::string         processName_;
00256     boost::shared_ptr<ActivityRegistry> actReg_;
00257 
00258     State state_;
00259     vstring trig_name_list_;
00260     vstring               end_path_name_list_;
00261 
00262     TrigResPtr   results_;
00263     TrigResPtr   endpath_results_;
00264 
00265     WorkerPtr                results_inserter_;
00266     AllWorkers               all_workers_;
00267     AllOutputWorkers         all_output_workers_;
00268     TrigPaths                trig_paths_;
00269     TrigPaths                end_paths_;
00270 
00271     bool                             wantSummary_;
00272     int                              total_events_;
00273     int                              total_passed_;
00274     RunStopwatch::StopwatchPointer   stopwatch_;
00275 
00276     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00277     std::vector<boost::shared_ptr<ConstBranchDescription const> >  demandBranches_;
00278 
00279     volatile bool       endpathsAreActive_;
00280   };
00281 
00282   namespace {
00283     template <typename T>
00284     class ScheduleSignalSentry {
00285     public:
00286       ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* ep, EventSetup const* es) :
00287            a_(a),ep_(ep),es_(es) {
00288         if (a_) T::preScheduleSignal(a_, ep_);
00289       }
00290       ~ScheduleSignalSentry() {
00291         if (a_) if (ep_) T::postScheduleSignal(a_, ep_, es_);
00292       }
00293 
00294     private:
00295       // We own none of these resources.
00296       ActivityRegistry* a_;
00297       typename T::MyPrincipal*  ep_;
00298       EventSetup const* es_;
00299     };
00300   }
00301 
00302   // -----------------------------
00303   // ProcessOneOccurrence is a functor that has bound a specific
00304   // Principal and Event Setup, and can be called with a Path, to
00305   // execute Path::processOneOccurrence for that event
00306     
00307   template <typename T>
00308   class ProcessOneOccurrence {
00309   public:
00310     typedef void result_type;
00311     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00312       ep(principal), es(setup) {};
00313 
00314       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00315 
00316   private:      
00317     typename T::MyPrincipal&   ep;
00318     EventSetup const& es;
00319   };
00320 
00321   class UnscheduledCallProducer : public UnscheduledHandler {
00322   public:
00323     UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00324     void addWorker(Worker* aWorker) {
00325       assert(0 != aWorker);
00326       labelToWorkers_[aWorker->description().moduleLabel_]=aWorker;
00327     }
00328   private:
00329     virtual bool tryToFillImpl(std::string const& moduleLabel,
00330                                EventPrincipal& event,
00331                                const EventSetup& eventSetup) {
00332       std::map<std::string, Worker*>::const_iterator itFound =
00333         labelToWorkers_.find(moduleLabel);
00334       if(itFound != labelToWorkers_.end()) {
00335           // Unscheduled reconstruction has no accepted definition
00336           // (yet) of the "current path". We indicate this by passing
00337           // a null pointer as the CurrentProcessingContext.
00338           itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, 0);
00339           return true;
00340       }
00341       return false;
00342     }
00343     std::map<std::string, Worker*> labelToWorkers_;
00344   };
00345 
00346   void
00347   inline
00348   Schedule::reportSkipped(EventPrincipal const& ep) const {
00349     Service<JobReport> reportSvc;
00350     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00351   }
00352   
00353   template <typename T>
00354   void
00355   Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
00356     this->resetAll();
00357     state_ = Running;
00358 
00359     // A RunStopwatch, but only if we are processing an event.
00360     std::auto_ptr<RunStopwatch> stopwatch(T::isEvent_ ? new RunStopwatch(stopwatch_) : 0);
00361 
00362     if (T::isEvent_) {
00363       ++total_events_;
00364       setupOnDemandSystem(dynamic_cast<EventPrincipal &>(ep), es);
00365     }
00366     try {
00367       //If the ScheduleSignalSentry object is used, it must live for the entire time the event is
00368       // being processed
00369       std::auto_ptr<ScheduleSignalSentry<T> > sentry;
00370       try {
00371         sentry = std::auto_ptr<ScheduleSignalSentry<T> >(new ScheduleSignalSentry<T>(actReg_.get(), &ep, &es));
00372         if (runTriggerPaths<T>(ep, es)) {
00373           if (T::isEvent_) ++total_passed_;
00374         }
00375         state_ = Latched;
00376         
00377         if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0);
00378       }
00379       catch(cms::Exception& e) {
00380         actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.rootCause()) : actions::Rethrow);
00381         assert (action != actions::IgnoreCompletely);
00382         assert (action != actions::FailPath);
00383         assert (action != actions::FailModule);
00384         if (action == actions::SkipEvent) {
00385             LogWarning(e.category())
00386               << "an exception occurred and all paths for the event are being skipped: \n"
00387               << e.what();
00388         } else {
00389           throw;
00390         }
00391       }
00392 
00393       if (endpathsAreActive_) runEndPaths<T>(ep, es);
00394     }
00395     catch(cms::Exception& ex) {
00396       actions::ActionCodes action = (T::isEvent_ ? act_table_->find(ex.rootCause()) : actions::Rethrow);
00397       assert (action != actions::SkipEvent);
00398       assert (action != actions::FailPath);
00399       assert (action != actions::FailModule);
00400       switch(action) {
00401       case actions::IgnoreCompletely: {
00402         LogWarning(ex.category())
00403           << "exception being ignored for current event:\n"
00404           << ex.what();
00405         break;
00406       }
00407       default: {
00408         state_ = Ready;
00409         throw edm::Exception(errors::EventProcessorFailure,
00410                              "EventProcessingStopped",ex)
00411           << "an exception occurred during current event processing\n";
00412       }
00413       }
00414     }
00415     catch(...) {
00416       LogError("PassingThrough")
00417         << "an exception occurred during current event processing\n";
00418       state_ = Ready;
00419       throw;
00420     }
00421 
00422     // next thing probably is not needed, the product insertion code clears it
00423     state_ = Ready;
00424 
00425   }
00426 
00427   template <typename T>
00428   bool
00429   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00430     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00431     return results_->accept();
00432   }
00433 
00434   template <typename T>
00435   void
00436   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00437     // Note there is no state-checking safety controlling the
00438     // activation/deactivation of endpaths.
00439     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00440 
00441     // We could get rid of the functor ProcessOneOccurrence if we used
00442     // boost::lambda, but the use of lambda with member functions
00443     // which take multiple arguments, by both non-const and const
00444     // reference, seems much more obscure...
00445     //
00446     // using namespace boost::lambda;
00447     // for_all(end_paths_,
00448     //          bind(&Path::processOneOccurrence, 
00449     //               boost::lambda::_1, // qualification to avoid ambiguity
00450     //               var(ep),           //  pass by reference (not copy)
00451     //               constant_ref(es))); // pass by const-reference (not copy)
00452   }
00453   
00454 }
00455 
00456 #endif

Generated on Tue Jun 9 17:35:49 2009 for CMSSW by  doxygen 1.5.4