CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_7_hltpatch2/src/FWCore/Framework/interface/Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in object held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   Processing of an event happens by pushing the event through the Paths.
00034   The scheduler performs the reset() on each of the workers independent
00035   of the Path objects.
00036 
00037   ------------------------
00038 
00039   About Paths:
00040   Paths fit into two categories:
00041   1) trigger paths that contribute directly to saved trigger bits
00042   2) end paths
00043   The Schedule holds these paths in two data structures:
00044   1) main path list
00045   2) end path list
00046 
00047   Trigger path processing always precedes endpath processing.
00048   The order of the paths from the input configuration is
00049   preserved in the main paths list.
00050 
00051   ------------------------
00052 
00053   The Schedule uses the TriggerNamesService to get the names of the
00054   trigger paths and end paths. When a TriggerResults object is created
00055   the results are stored in the same order as the trigger names from
00056   TriggerNamesService.
00057 
00058 */
00059 
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/Frameworkfwd.h"
00065 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00066 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00067 #include "FWCore/Framework/src/Path.h"
00068 #include "FWCore/Framework/src/RunStopwatch.h"
00069 #include "FWCore/Framework/src/Worker.h"
00070 #include "FWCore/Framework/src/WorkerRegistry.h"
00071 #include "FWCore/Framework/src/EarlyDeleteHelper.h"
00072 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00073 #include "FWCore/MessageLogger/interface/JobReport.h"
00074 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00075 #include "FWCore/ServiceRegistry/interface/Service.h"
00076 #include "FWCore/Utilities/interface/Algorithms.h"
00077 #include "FWCore/Utilities/interface/BranchType.h"
00078 #include "FWCore/Utilities/interface/ConvertException.h"
00079 #include "FWCore/Utilities/interface/Exception.h"
00080 
00081 #include "boost/shared_ptr.hpp"
00082 
00083 #include <map>
00084 #include <memory>
00085 #include <set>
00086 #include <string>
00087 #include <vector>
00088 #include <sstream>
00089 
00090 namespace edm {
00091   namespace service {
00092     class TriggerNamesService;
00093   }
00094   class ActivityRegistry;
00095   class EventSetup;
00096   class ExceptionCollector;
00097   class OutputWorker;
00098   class RunStopwatch;
00099   class UnscheduledCallProducer;
00100   class WorkerInPath;
00101   class Schedule {
00102   public:
00103     typedef std::vector<std::string> vstring;
00104     typedef std::vector<Path> TrigPaths;
00105     typedef std::vector<Path> NonTrigPaths;
00106     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00107     typedef boost::shared_ptr<Worker> WorkerPtr;
00108     typedef std::vector<Worker*> AllWorkers;
00109     typedef std::vector<OutputWorker*> AllOutputWorkers;
00110 
00111     typedef std::vector<Worker*> Workers;
00112 
00113     typedef std::vector<WorkerInPath> PathWorkers;
00114 
00115     Schedule(ParameterSet& proc_pset,
00116              service::TriggerNamesService& tns,
00117              ProductRegistry& pregistry,
00118              ActionTable const& actions,
00119              boost::shared_ptr<ActivityRegistry> areg,
00120              boost::shared_ptr<ProcessConfiguration> processConfiguration,
00121              const ParameterSet* subProcPSet);
00122 
00123     enum State { Ready = 0, Running, Latched };
00124 
00125     template <typename T>
00126     void processOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& eventSetup);
00127 
00128     void beginJob();
00129     void endJob(ExceptionCollector & collector);
00130 
00131     // Write the luminosity block
00132     void writeLumi(LuminosityBlockPrincipal const& lbp);
00133 
00134     // Write the run
00135     void writeRun(RunPrincipal const& rp);
00136 
00137     // Call closeFile() on all OutputModules.
00138     void closeOutputFiles();
00139 
00140     // Call openNewFileIfNeeded() on all OutputModules
00141     void openNewOutputFilesIfNeeded();
00142 
00143     // Call openFiles() on all OutputModules
00144     void openOutputFiles(FileBlock& fb);
00145 
00146     // Call respondToOpenInputFile() on all Modules
00147     void respondToOpenInputFile(FileBlock const& fb);
00148 
00149     // Call respondToCloseInputFile() on all Modules
00150     void respondToCloseInputFile(FileBlock const& fb);
00151 
00152     // Call respondToOpenOutputFiles() on all Modules
00153     void respondToOpenOutputFiles(FileBlock const& fb);
00154 
00155     // Call respondToCloseOutputFiles() on all Modules
00156     void respondToCloseOutputFiles(FileBlock const& fb);
00157 
00158     // Call shouldWeCloseFile() on all OutputModules.
00159     bool shouldWeCloseOutput() const;
00160 
00161     void preForkReleaseResources();
00162     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00163 
00164     std::pair<double, double> timeCpuReal() const {
00165       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00166     }
00167 
00170 
00174     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00175 
00177     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00178 
00180     void modulesInPath(std::string const& iPathLabel,
00181                        std::vector<std::string>& oLabelsToFill) const;
00182 
00186     int totalEvents() const {
00187       return total_events_;
00188     }
00189 
00192     int totalEventsPassed() const {
00193       return total_passed_;
00194     }
00195 
00198     int totalEventsFailed() const {
00199       return totalEvents() - totalEventsPassed();
00200     }
00201 
00204     void enableEndPaths(bool active);
00205 
00208     bool endPathsEnabled() const;
00209 
00212     void getTriggerReport(TriggerReport& rep) const;
00213 
00215     bool terminate() const;
00216 
00218     void clearCounters();
00219 
00222     bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00223 
00224   private:
00225 
00226     AllWorkers::const_iterator workersBegin() const {
00227       return all_workers_.begin();
00228     }
00229 
00230     AllWorkers::const_iterator workersEnd() const {
00231       return all_workers_.end();
00232     }
00233 
00234     AllWorkers::iterator workersBegin() {
00235       return  all_workers_.begin();
00236     }
00237 
00238     AllWorkers::iterator workersEnd() {
00239       return all_workers_.end();
00240     }
00241 
00242     void resetAll();
00243 
00244     template <typename T>
00245     bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00246 
00247     template <typename T>
00248     void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00249 
00250     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00251 
00252     void reportSkipped(EventPrincipal const& ep) const;
00253     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00254     void reportSkipped(RunPrincipal const&) const {}
00255 
00256     void reduceParameterSet(ParameterSet& proc_pset,
00257                             vstring& modulesInConfig,
00258                             std::set<std::string> const& modulesInConfigSet,
00259                             vstring& labelsOnTriggerPaths,
00260                             vstring& shouldBeUsedLabels,
00261                             std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions);
00262 
00263     void fillWorkers(ParameterSet& proc_pset,
00264                      ProductRegistry& preg,
00265                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00266                      std::string const& name, bool ignoreFilters, PathWorkers& out,
00267                      vstring* labelsOnPaths);
00268     void fillTrigPath(ParameterSet& proc_pset,
00269                       ProductRegistry& preg,
00270                       boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00271                       int bitpos, std::string const& name, TrigResPtr,
00272                       vstring* labelsOnTriggerPaths);
00273     void fillEndPath(ParameterSet& proc_pset,
00274                      ProductRegistry& preg,
00275                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00276                      int bitpos, std::string const& name);
00277 
00278     void limitOutput(ParameterSet const& proc_pset);
00279 
00280     void addToAllWorkers(Worker* w);
00281     
00282     void resetEarlyDelete();
00283     void initializeEarlyDelete(edm::ParameterSet const& opts,
00284                                edm::ProductRegistry const& preg, 
00285                                edm::ParameterSet const* subProcPSet);
00286 
00287     WorkerRegistry                                worker_reg_;
00288     ActionTable const*                            act_table_;
00289     boost::shared_ptr<ActivityRegistry>           actReg_;
00290 
00291     State                    state_;
00292     vstring                  trig_name_list_;
00293     vstring                  end_path_name_list_;
00294 
00295     TrigResPtr               results_;
00296     TrigResPtr               endpath_results_;
00297 
00298     WorkerPtr                results_inserter_;
00299     AllWorkers               all_workers_;
00300     AllOutputWorkers         all_output_workers_;
00301     TrigPaths                trig_paths_;
00302     TrigPaths                end_paths_;
00303 
00304     //For each branch that has been marked for early deletion
00305     // keep track of how many modules are left that read this data but have
00306     // not yet been run in this event
00307     std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
00308     //NOTE the following is effectively internal data for each EarlyDeleteHelper
00309     // but putting it into one vector makes for better allocation as well as
00310     // faster iteration when used to reset the earlyDeleteBranchToCount_
00311     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
00312     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so 
00313     // tell which EarlyDeleteHelper is associated with which BranchIDs.
00314     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
00315     //There is one EarlyDeleteHelper per Module which are reading data that
00316     // has been marked for early deletion
00317     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
00318 
00319     bool                           wantSummary_;
00320     int                            total_events_;
00321     int                            total_passed_;
00322     RunStopwatch::StopwatchPointer stopwatch_;
00323 
00324     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00325 
00326     volatile bool           endpathsAreActive_;
00327 
00328     bool                    printedFirstException_;
00329   };
00330 
00331   // -----------------------------
00332   // ProcessOneOccurrence is a functor that has bound a specific
00333   // Principal and Event Setup, and can be called with a Path, to
00334   // execute Path::processOneOccurrence for that event
00335 
00336   template <typename T>
00337   class ProcessOneOccurrence {
00338   public:
00339     typedef void result_type;
00340     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00341       ep(principal), es(setup) {};
00342 
00343       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00344 
00345   private:
00346     typename T::MyPrincipal&   ep;
00347     EventSetup const& es;
00348   };
00349 
00350   class UnscheduledCallProducer : public UnscheduledHandler {
00351   public:
00352     UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00353     void addWorker(Worker* aWorker) {
00354       assert(0 != aWorker);
00355       labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00356     }
00357 
00358     template <typename T>
00359     void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00360       //do nothing for event since we will run when requested
00361       if(!T::isEvent_) {
00362         for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00363             it != itEnd;
00364             ++it) {
00365           CPUTimer timer;
00366           try {
00367             it->second->doWork<T>(p, es, 0, &timer);
00368           }
00369           catch (cms::Exception & ex) {
00370             std::ostringstream ost;
00371             if (T::isEvent_) {
00372               ost << "Calling event method";
00373             }
00374             else if (T::begin_ && T::branchType_ == InRun) {
00375               ost << "Calling beginRun";
00376             }
00377             else if (T::begin_ && T::branchType_ == InLumi) {
00378               ost << "Calling beginLuminosityBlock";
00379             }
00380             else if (!T::begin_ && T::branchType_ == InLumi) {
00381               ost << "Calling endLuminosityBlock";
00382             }
00383             else if (!T::begin_ && T::branchType_ == InRun) {
00384               ost << "Calling endRun";
00385             }
00386             else {
00387               // It should be impossible to get here ...
00388               ost << "Calling unknown function";
00389             }
00390             ost << " for unscheduled module " << it->second->description().moduleName()
00391                 << "/'" << it->second->description().moduleLabel() << "'";
00392             ex.addContext(ost.str());
00393             ost.str("");
00394             ost << "Processing " << p.id();
00395             ex.addContext(ost.str());
00396             throw;
00397           }
00398         }
00399       }
00400     }
00401 
00402   private:
00403     virtual bool tryToFillImpl(std::string const& moduleLabel,
00404                                EventPrincipal& event,
00405                                EventSetup const& eventSetup,
00406                                CurrentProcessingContext const* iContext) {
00407       std::map<std::string, Worker*>::const_iterator itFound =
00408         labelToWorkers_.find(moduleLabel);
00409       if(itFound != labelToWorkers_.end()) {
00410         CPUTimer timer;
00411         try {
00412           itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00413         }
00414         catch (cms::Exception & ex) {
00415           std::ostringstream ost;
00416           ost << "Calling produce method for unscheduled module " 
00417               <<  itFound->second->description().moduleName() << "/'"
00418               << itFound->second->description().moduleLabel() << "'";
00419           ex.addContext(ost.str());
00420           throw;
00421         }
00422         return true;
00423       }
00424       return false;
00425     }
00426     std::map<std::string, Worker*> labelToWorkers_;
00427   };
00428 
00429   void
00430   inline
00431   Schedule::reportSkipped(EventPrincipal const& ep) const {
00432     Service<JobReport> reportSvc;
00433     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00434   }
00435 
00436   template <typename T>
00437   void
00438   Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
00439     this->resetAll();
00440     state_ = Running;
00441 
00442     // A RunStopwatch, but only if we are processing an event.
00443     RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00444 
00445     if (T::isEvent_) {
00446       ++total_events_;
00447       setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00448     }
00449     try {
00450       try {
00451         try {
00452           //make sure the unscheduled items see this transition [Event will be a no-op]
00453           unscheduled_->runNow<T>(ep, es);
00454           if (runTriggerPaths<T>(ep, es)) {
00455             if (T::isEvent_) ++total_passed_;
00456           }
00457           state_ = Latched;
00458         }
00459         catch(cms::Exception& e) {
00460           actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00461           assert (action != actions::IgnoreCompletely);
00462           assert (action != actions::FailPath);
00463           if (action == actions::SkipEvent) {
00464             edm::printCmsExceptionWarning("SkipEvent", e);
00465           } else {
00466             throw;
00467           }
00468         }
00469 
00470         try {
00471           CPUTimer timer;
00472           if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00473         }
00474         catch (cms::Exception & ex) {
00475           if (T::isEvent_) {
00476             ex.addContext("Calling produce method for module TriggerResultInserter");
00477           }
00478           std::ostringstream ost;
00479           ost << "Processing " << ep.id();
00480           ex.addContext(ost.str());
00481           throw;
00482         }
00483 
00484         if (endpathsAreActive_) runEndPaths<T>(ep, es);
00485         if(T::isEvent_) resetEarlyDelete();
00486       }
00487       catch (cms::Exception& e) { throw; }
00488       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00489       catch (std::exception& e) { convertException::stdToEDM(e); }
00490       catch(std::string& s) { convertException::stringToEDM(s); }
00491       catch(char const* c) { convertException::charPtrToEDM(c); }
00492       catch (...) { convertException::unknownToEDM(); }
00493     }
00494     catch(cms::Exception& ex) {
00495       if (!printedFirstException_) {
00496         Service<JobReport> jobReportSvc;
00497         if (ex.context().empty()) {
00498           ex.addContext("Calling function Schedule::processOneOccurrence");
00499         }
00500         if (jobReportSvc.isAvailable()) {
00501           JobReport *jobRep = jobReportSvc.operator->();
00502           edm::printCmsException(ex, jobRep, ex.returnCode());
00503         }
00504         else {
00505           edm::printCmsException(ex);
00506         }
00507         ex.setAlreadyPrinted(true);
00508         printedFirstException_ = true;
00509       }
00510       state_ = Ready;
00511       throw;
00512     }
00513     // next thing probably is not needed, the product insertion code clears it
00514     state_ = Ready;
00515   }
00516 
00517   template <typename T>
00518   bool
00519   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00520     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00521     return results_->accept();
00522   }
00523 
00524   template <typename T>
00525   void
00526   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00527     // Note there is no state-checking safety controlling the
00528     // activation/deactivation of endpaths.
00529     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00530 
00531     // We could get rid of the functor ProcessOneOccurrence if we used
00532     // boost::lambda, but the use of lambda with member functions
00533     // which take multiple arguments, by both non-const and const
00534     // reference, seems much more obscure...
00535     //
00536     // using namespace boost::lambda;
00537     // for_all(end_paths_,
00538     //          bind(&Path::processOneOccurrence,
00539     //               boost::lambda::_1, // qualification to avoid ambiguity
00540     //               var(ep),           //  pass by reference (not copy)
00541     //               constant_ref(es))); // pass by const-reference (not copy)
00542   }
00543 }
00544 
00545 #endif