CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_2_9_HLT1_bphpatch4/src/FWCore/Framework/interface/Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in objects held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   Processing of an event happens by pushing the event through the Paths.
00034   The scheduler performs the reset() on each of the workers independent
00035   of the Path objects.
00036 
00037   ------------------------
00038 
00039   About Paths:
00040   Paths fit into two categories:
00041   1) trigger paths that contribute directly to saved trigger bits
00042   2) end paths
00043   The Schedule holds these paths in two data structures:
00044   1) main path list
00045   2) end path list
00046 
00047   Trigger path processing always precedes endpath processing.
00048   The order of the paths from the input configuration is
00049   preserved in the main paths list.
00050 
00051   ------------------------
00052 
00053   The Schedule uses the TriggerNamesService to get the names of the
00054   trigger paths and end paths. When a TriggerResults object is created
00055   the results are stored in the same order as the trigger names from
00056   TriggerNamesService.
00057 
00058 */
00059 
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "FWCore/Framework/interface/Actions.h"
00062 #include "FWCore/Framework/interface/EventPrincipal.h"
00063 #include "FWCore/Framework/interface/Frameworkfwd.h"
00064 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00065 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00066 #include "FWCore/Framework/src/Path.h"
00067 #include "FWCore/Framework/src/RunStopwatch.h"
00068 #include "FWCore/Framework/src/Worker.h"
00069 #include "FWCore/Framework/src/WorkerRegistry.h"
00070 #include "FWCore/MessageLogger/interface/JobReport.h"
00071 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00072 #include "FWCore/ServiceRegistry/interface/Service.h"
00073 #include "FWCore/Utilities/interface/Algorithms.h"
00074 
00075 #include "boost/shared_ptr.hpp"
00076 
00077 #include <map>
00078 #include <memory>
00079 #include <set>
00080 #include <string>
00081 #include <vector>
00082 
00083 namespace edm {
00084   namespace service {
00085     class TriggerNamesService;
00086   }
00087   class ActivityRegistry;
00088   class EventSetup;
00089   class OutputWorker;
00090   class RunStopwatch;
00091   class UnscheduledCallProducer;
00092   class WorkerInPath;
00093   class Schedule {
          // Holds the trigger paths and end paths of a process and pushes each
          // occurrence through them (see processOneOccurrence, defined further
          // down in this header).  Also keeps the event counters returned by
          // totalEvents() / totalEventsPassed() / totalEventsFailed().
00094   public:
00095     typedef std::vector<std::string> vstring;
          // NOTE(review): TrigPaths and NonTrigPaths name the same type, and
          // end_paths_ below is declared as TrigPaths rather than NonTrigPaths.
00096     typedef std::vector<Path> TrigPaths;
00097     typedef std::vector<Path> NonTrigPaths;
00098     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00099     typedef boost::shared_ptr<Worker> WorkerPtr;
          // Raw (non-shared) worker pointers; ownership is not expressed by
          // these types -- presumably worker_reg_ owns them, TODO confirm.
00100     typedef std::vector<Worker*> AllWorkers;
00101     typedef std::vector<OutputWorker*> AllOutputWorkers;
00102 
00103     typedef std::vector<Worker*> Workers;
00104 
00105     typedef std::vector<WorkerInPath> PathWorkers;
00106 
          // Construct from the high-level process pset (see the file header
          // comment); tns is the TriggerNamesService that supplies the trigger
          // path and end path names.  The body is not part of this header.
00107     Schedule(ParameterSet& proc_pset,
00108              service::TriggerNamesService& tns,
00109              ProductRegistry& pregistry,
00110              ActionTable const& actions,
00111              boost::shared_ptr<ActivityRegistry> areg,
00112              boost::shared_ptr<ProcessConfiguration> processConfiguration);
00113 
          // Processing state as set by processOneOccurrence: Running on entry,
          // Latched once the trigger paths have completed, Ready on exit and
          // before an exception is propagated.
00114     enum State { Ready = 0, Running, Latched };
00115 
          // Run one occurrence described by the traits type T (T::MyPrincipal,
          // T::isEvent_) through the schedule.  Defined later in this header.
00116     template <typename T>
00117     void processOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& eventSetup);
00118 
00119     void beginJob();
00120     void endJob();
00121 
00122     // Write the luminosity block
00123     void writeLumi(LuminosityBlockPrincipal const& lbp);
00124 
00125     // Write the run
00126     void writeRun(RunPrincipal const& rp);
00127 
00128     // Call closeFile() on all OutputModules.
00129     void closeOutputFiles();
00130 
00131     // Call openNewFileIfNeeded() on all OutputModules
00132     void openNewOutputFilesIfNeeded();
00133 
00134     // Call openFiles() on all OutputModules
00135     void openOutputFiles(FileBlock& fb);
00136 
00137     // Call respondToOpenInputFile() on all Modules
00138     void respondToOpenInputFile(FileBlock const& fb);
00139 
00140     // Call respondToCloseInputFile() on all Modules
00141     void respondToCloseInputFile(FileBlock const& fb);
00142 
00143     // Call respondToOpenOutputFiles() on all Modules
00144     void respondToOpenOutputFiles(FileBlock const& fb);
00145 
00146     // Call respondToCloseOutputFiles() on all Modules
00147     void respondToCloseOutputFiles(FileBlock const& fb);
00148 
00149     // Call shouldWeCloseFile() on all OutputModules.
00150     bool shouldWeCloseOutput() const;
00151 
          // Implementations are not in this header; the names suggest hooks run
          // around a process fork -- TODO confirm against Schedule.cc.
00152     void preForkReleaseResources();
00153     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00154 
          // Returns (cpuTime, realTime) as read from stopwatch_.
          // NOTE(review): stopwatch_ is dereferenced without a null check, while
          // processOneOccurrence passes a default-constructed StopwatchPointer
          // to RunStopwatch for non-event occurrences, so an empty pointer is a
          // representable state.  Confirm stopwatch_ is always set before this
          // is called.
00155     std::pair<double, double> timeCpuReal() const {
00156       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00157     }
00158 
00161 
          // Implementation not visible here; presumably one description per
          // module known to the schedule -- TODO confirm.
00165     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00166 
          // Fills oLabelsToFill (output parameter); presumably with the labels
          // of the paths in this schedule -- TODO confirm.
00168     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00169 
          // Fills oLabelsToFill (output parameter); presumably with the labels
          // of the modules on the path named iPathLabel -- TODO confirm.
00171     void modulesInPath(std::string const& iPathLabel,
00172                        std::vector<std::string>& oLabelsToFill) const;
00173 
          // Number of event occurrences counted so far; total_events_ is
          // incremented by processOneOccurrence when T::isEvent_ is true.
00177     int totalEvents() const {
00178       return total_events_;
00179     }
00180 
          // Number of events for which runTriggerPaths() returned true;
          // total_passed_ is incremented by processOneOccurrence.
00183     int totalEventsPassed() const {
00184       return total_passed_;
00185     }
00186 
          // Counted events that were not counted as passed.
00189     int totalEventsFailed() const {
00190       return totalEvents() - totalEventsPassed();
00191     }
00192 
          // Presumably sets / reads endpathsAreActive_, the flag that gates
          // runEndPaths() in processOneOccurrence -- TODO confirm.
00195     void enableEndPaths(bool active);
00196 
00199     bool endPathsEnabled() const;
00200 
          // Fills rep (output parameter); implementation not in this header.
00203     void getTriggerReport(TriggerReport& rep) const;
00204 
          // NOTE(review): the top-level const on a by-value bool return has no
          // effect for callers; left unchanged because the out-of-line
          // definition is not visible here.
00206     bool const terminate() const;
00207 
00209     void clearCounters();
00210 
          // Implementation not in this header; presumably reconfigures the
          // module labelled iLabel with iPSet and reports success -- TODO confirm.
00213     bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00214 
00215   private:
00216 
          // Const and non-const iteration over all_workers_.
00217     AllWorkers::const_iterator workersBegin() const {
00218       return all_workers_.begin();
00219     }
00220 
00221     AllWorkers::const_iterator workersEnd() const {
00222       return all_workers_.end();
00223     }
00224 
00225     AllWorkers::iterator workersBegin() {
00226       return  all_workers_.begin();
00227     }
00228 
00229     AllWorkers::iterator workersEnd() {
00230       return all_workers_.end();
00231     }
00232 
          // Called first thing in processOneOccurrence, before any path runs.
00233     void resetAll();
00234 
          // Runs trig_paths_ and returns results_->accept() (defined below).
00235     template <typename T>
00236     bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00237 
          // Runs end_paths_ (defined below).
00238     template <typename T>
00239     void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00240 
          // Called by processOneOccurrence for event occurrences only, before
          // the unscheduled workers and the trigger paths are run.
00241     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00242 
          // The EventPrincipal overload (defined below) reports to JobReport;
          // the luminosity block and run overloads deliberately do nothing.
00243     void reportSkipped(EventPrincipal const& ep) const;
00244     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00245     void reportSkipped(RunPrincipal const&) const {}
00246 
          // Construction helpers; their bodies are not part of this header.
          // 'out' in fillWorkers is an output parameter.
00247     void fillWorkers(ParameterSet& proc_pset,
00248                      ProductRegistry& preg,
00249                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00250                      std::string const& name, bool ignoreFilters, PathWorkers& out);
00251     void fillTrigPath(ParameterSet& proc_pset,
00252                       ProductRegistry& preg,
00253                       boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00254                       int bitpos, std::string const& name, TrigResPtr);
00255     void fillEndPath(ParameterSet& proc_pset,
00256                      ProductRegistry& preg,
00257                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00258                      int bitpos, std::string const& name);
00259 
00260     void limitOutput(ParameterSet const& proc_pset);
00261 
00262     void addToAllWorkers(Worker* w);
00263 
00264     WorkerRegistry                                worker_reg_;
          // Consulted in processOneOccurrence to map an exception's root cause
          // to an action code.  Stored as a raw pointer: not owned here.
00265     ActionTable const*                            act_table_;
00266     boost::shared_ptr<ActivityRegistry>           actReg_;
00267 
00268     State                    state_;
00269     vstring                  trig_name_list_;
00270     vstring                  end_path_name_list_;
00271 
          // results_->accept() is the return value of runTriggerPaths().
00272     TrigResPtr               results_;
00273     TrigResPtr               endpath_results_;
00274 
          // May be empty: processOneOccurrence only runs it when get() is
          // non-null, after the trigger paths and before the end paths.
00275     WorkerPtr                results_inserter_;
00276     AllWorkers               all_workers_;
00277     AllOutputWorkers         all_output_workers_;
00278     TrigPaths                trig_paths_;
00279     TrigPaths                end_paths_;
00280 
00281     bool                           wantSummary_;
00282     int                            total_events_;
00283     int                            total_passed_;
          // Handed to a scoped RunStopwatch for event occurrences only.
00284     RunStopwatch::StopwatchPointer stopwatch_;
00285 
          // runNow<T>() is invoked on this for every occurrence; it is
          // dereferenced unconditionally in processOneOccurrence.
00286     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00287 
          // Gates the runEndPaths() call in processOneOccurrence.
          // NOTE(review): volatile does not provide atomicity or inter-thread
          // ordering; if this flag is toggled from another thread, confirm
          // that the access is otherwise synchronized.
00288     volatile bool           endpathsAreActive_;
00289   };
00290 
00291   // -----------------------------
00292   // ProcessOneOccurrence is a functor that has bound a specific
00293   // Principal and Event Setup, and can be called with a Path, to
00294   // execute Path::processOneOccurrence for that event
00295 
00296   template <typename T>
00297   class ProcessOneOccurrence {
          // Function object applied to each Path by runTriggerPaths() and
          // runEndPaths().  It stores references (not copies) to the principal
          // and the EventSetup, so both must outlive the functor.
00298   public:
          // Presumably provided for function-object adaptors that look for a
          // nested result_type -- TODO confirm it is still needed.
00299     typedef void result_type;
          // NOTE(review): the ';' after the constructor body is redundant.
00300     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00301       ep(principal), es(setup) {};
00302 
            // Forward to Path::processOneOccurrence<T> with the bound arguments.
00303       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00304 
00305   private:
          // Bound principal (mutable) and EventSetup (read-only).
00306     typename T::MyPrincipal&   ep;
00307     EventSetup const& es;
00308   };
00309 
00310   class UnscheduledCallProducer : public UnscheduledHandler {
          // UnscheduledHandler implementation that keeps a map from module
          // label to Worker and runs the matching worker when asked to.
          // Workers are held as raw pointers and are not deleted by this class.
00311   public:
00312     UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
          // Register aWorker under its module label.  aWorker must be non-null
          // (checked with assert only).  Registering a second worker with the
          // same label silently replaces the first one.
          // NOTE(review): assert is used here but <cassert> is not included
          // directly by this header; it is presumably pulled in transitively.
00313     void addWorker(Worker* aWorker) {
00314       assert(0 != aWorker);
00315       labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00316     }
00317 
          // For non-event occurrences (T::isEvent_ false), run doWork<T> on
          // every registered worker, in the map's label order, each with its
          // own CPUTimer and a null processing context.  For events this does
          // nothing: those workers are run on request via tryToFillImpl.
00318     template <typename T>
00319     void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00320       //do nothing for event since we will run when requested
00321       if(!T::isEvent_) {
00322         for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00323             it != itEnd;
00324             ++it) {
00325           CPUTimer timer;
00326           it->second->doWork<T>(p, es, 0, &timer);
00327         }
00328       }
00329     }
00330 
00331   private:
          // Look up the worker registered under moduleLabel.  If one exists, run
          // it for the event (BranchActionBegin traits) with the caller's
          // context and return true; otherwise return false.  Any result of
          // doWork itself is not consulted.
00332     virtual bool tryToFillImpl(std::string const& moduleLabel,
00333                                EventPrincipal& event,
00334                                EventSetup const& eventSetup,
00335                                CurrentProcessingContext const* iContext) {
00336       std::map<std::string, Worker*>::const_iterator itFound =
00337         labelToWorkers_.find(moduleLabel);
00338       if(itFound != labelToWorkers_.end()) {
00339         CPUTimer timer;
00340         itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00341         return true;
00342       }
00343       return false;
00344     }
          // Module label -> worker (non-owning).
00345     std::map<std::string, Worker*> labelToWorkers_;
00346   };
00347 
00348   void
00349   inline
00350   Schedule::reportSkipped(EventPrincipal const& ep) const {
          // Report the run and event numbers of ep to the JobReport service as
          // a skipped event.  The service handle is dereferenced unchecked.
00351     Service<JobReport> reportSvc;
00352     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00353   }
00354 
00355   template <typename T>
00356   void
00357   Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
          // Run one occurrence through the schedule, in this order:
          //   1. unscheduled workers (no-op for events) and the trigger paths,
          //   2. the TriggerResults inserter, if one is set,
          //   3. the end paths, if enabled.
          // Exceptions of type cms::Exception are mapped to an action code via
          // act_table_ for events; for every other occurrence type the action
          // is always Rethrow.  Anything that is rethrown leaves state_ == Ready.
00358     this->resetAll();
00359     state_ = Running;
00360 
00361     // A RunStopwatch, but only if we are processing an event.
00362     RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00363 
00364     if (T::isEvent_) {
00365       ++total_events_;
            // The cast is only executed for events; it must still compile for
            // the other principal types this template is instantiated with.
00366       setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00367     }
00368     try {
00369       try {
00370         //make sure the unscheduled items see this transition [Event will be a no-op]
00371         unscheduled_->runNow<T>(ep, es);
00372         if (runTriggerPaths<T>(ep, es)) {
00373           if (T::isEvent_) ++total_passed_;
00374         }
00375         state_ = Latched;
00376       }
00377       catch(cms::Exception& e) {
              // Trigger-path stage: only SkipEvent is handled here.  The three
              // asserted-against codes are not expected at this level.
              // NOTE(review): these checks disappear when NDEBUG is defined.
00378         actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.rootCause()) : actions::Rethrow);
00379         assert (action != actions::IgnoreCompletely);
00380         assert (action != actions::FailPath);
00381         assert (action != actions::FailModule);
00382         if (action == actions::SkipEvent) {
                  // Log and fall through: the inserter and the end paths below
                  // still run, and state_ stays Running (not Latched).
00383             LogWarning(e.category())
00384               << "an exception occurred and all paths for the event are being skipped: \n"
00385               << e.what();
00386         } else {
00387            throw;
00388         }
00389       }
00390 
00391       try {
00392         CPUTimer timer;
00393         if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00394       }
00395       catch (cms::Exception& e) {
              // Add context and rethrow; the outer handler below sees it next.
              // NOTE(review): if the outer handler takes its default branch it
              // appends "EventProcessingStopped\n" a second time.
00396         e << "EventProcessingStopped\n";
00397         e << "Attempt to insert TriggerResults into event failed.\n";
00398         throw;
00399       }
00400 
00401       if (endpathsAreActive_) runEndPaths<T>(ep, es);
00402     }
00403     catch(cms::Exception& e) {
            // Outer stage (rethrows from above, inserter, end paths): only
            // IgnoreCompletely is swallowed; everything else is rethrown with
            // extra context after resetting state_.
00404       actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.rootCause()) : actions::Rethrow);
00405       assert (action != actions::SkipEvent);
00406       assert (action != actions::FailPath);
00407       assert (action != actions::FailModule);
00408       switch(action) {
00409       case actions::IgnoreCompletely: {
00410           LogWarning(e.category())
00411             << "exception being ignored for current event:\n"
00412             << e.what();
00413           break;
00414       }
00415       default: {
00416         state_ = Ready;
00417         e << "EventProcessingStopped\n";
00418         e << "an exception occurred during current event processing\n";
00419         throw;
00420       }
00421       }
00422     }
00423     catch(...) {
            // Anything that is not a cms::Exception: log, reset state, rethrow
            // unchanged.
00424       LogError("PassingThrough")
00425         << "an exception occurred during current event processing\n";
00426       state_ = Ready;
00427       throw;
00428     }
00429 
00430     // next thing probably is not needed, the product insertion code clears it
00431     state_ = Ready;
00432 
00433   }
00434 
00435   template <typename T>
00436   bool
00437   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
          // Apply a ProcessOneOccurrence<T> functor bound to (ep, es) to
          // trig_paths_ via for_all, then return results_->accept().
          // results_ is dereferenced without a null check.
00438     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00439     return results_->accept();
00440   }
00441 
00442   template <typename T>
00443   void
00444   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
          // Apply a ProcessOneOccurrence<T> functor bound to (ep, es) to
          // end_paths_ via for_all.  The caller (processOneOccurrence) checks
          // endpathsAreActive_ before calling this; nothing is checked here.
00445     // Note there is no state-checking safety controlling the
00446     // activation/deactivation of endpaths.
00447     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00448 
00449     // We could get rid of the functor ProcessOneOccurrence if we used
00450     // boost::lambda, but the use of lambda with member functions
00451     // which take multiple arguments, by both non-const and const
00452     // reference, seems much more obscure...
00453     //
00454     // using namespace boost::lambda;
00455     // for_all(end_paths_,
00456     //          bind(&Path::processOneOccurrence,
00457     //               boost::lambda::_1, // qualification to avoid ambiguity
00458     //               var(ep),           //  pass by reference (not copy)
00459     //               constant_ref(es))); // pass by const-reference (not copy)
00460   }
00461 }
00462 
00463 #endif