CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/FWCore/Framework/interface/Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in objects held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   Processing of an event happens by pushing the event through the Paths.
00034   The scheduler performs the reset() on each of the workers independent
00035   of the Path objects.
00036 
00037   ------------------------
00038 
00039   About Paths:
00040   Paths fit into two categories:
00041   1) trigger paths that contribute directly to saved trigger bits
00042   2) end paths
00043   The Schedule holds these paths in two data structures:
00044   1) main path list
00045   2) end path list
00046 
00047   Trigger path processing always precedes endpath processing.
00048   The order of the paths from the input configuration is
00049   preserved in the main paths list.
00050 
00051   ------------------------
00052 
00053   The Schedule uses the TriggerNamesService to get the names of the
00054   trigger paths and end paths. When a TriggerResults object is created
00055   the results are stored in the same order as the trigger names from
00056   TriggerNamesService.
00057 
00058 */
00059 
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
00065 #include "FWCore/Framework/interface/Frameworkfwd.h"
00066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
00068 #include "FWCore/Framework/src/Path.h"
00069 #include "FWCore/Framework/src/RunStopwatch.h"
00070 #include "FWCore/Framework/src/Worker.h"
00071 #include "FWCore/Framework/src/WorkerRegistry.h"
00072 #include "FWCore/Framework/src/EarlyDeleteHelper.h"
00073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00074 #include "FWCore/MessageLogger/interface/JobReport.h"
00075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00076 #include "FWCore/ServiceRegistry/interface/Service.h"
00077 #include "FWCore/Utilities/interface/Algorithms.h"
00078 #include "FWCore/Utilities/interface/BranchType.h"
00079 #include "FWCore/Utilities/interface/ConvertException.h"
00080 #include "FWCore/Utilities/interface/Exception.h"
00081 
00082 #include "boost/shared_ptr.hpp"
00083 
00084 #include <map>
00085 #include <memory>
00086 #include <set>
00087 #include <string>
00088 #include <vector>
00089 #include <sstream>
00090 
00091 namespace edm {
00092   namespace service {
00093     class TriggerNamesService;
00094   }
00095   class ActivityRegistry;
00096   class BranchIDListHelper;
00097   class EventSetup;
00098   class ExceptionCollector;
00099   class OutputWorker;
00100   class RunStopwatch;
00101   class UnscheduledCallProducer;
00102   class WorkerInPath;
00103   class Schedule {
          // Holds the trigger paths, the end paths and the workers registered for them,
          // and pushes one occurrence at a time through them via processOneOccurrence
          // (defined further down in this header).
00104   public:
00105     typedef std::vector<std::string> vstring;
          // TrigPaths and NonTrigPaths are the same underlying type. Note that the
          // end_paths_ member below is declared as TrigPaths, not NonTrigPaths.
00106     typedef std::vector<Path> TrigPaths;
00107     typedef std::vector<Path> NonTrigPaths;
00108     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00109     typedef boost::shared_ptr<Worker> WorkerPtr;
00110     typedef std::vector<Worker*> AllWorkers;
00111     typedef std::vector<OutputWorker*> AllOutputWorkers;
00112 
00113     typedef std::vector<Worker*> Workers;
00114 
00115     typedef std::vector<WorkerInPath> PathWorkers;
00116 
00117     Schedule(ParameterSet& proc_pset,
00118              service::TriggerNamesService& tns,
00119              ProductRegistry& pregistry,
00120              BranchIDListHelper& branchIDListHelper,
00121              ActionTable const& actions,
00122              boost::shared_ptr<ActivityRegistry> areg,
00123              boost::shared_ptr<ProcessConfiguration> processConfiguration,
00124              const ParameterSet* subProcPSet);
00125 
          // Values taken by state_ inside processOneOccurrence:
          //   Ready   - set when processOneOccurrence finishes or rethrows.
          //   Running - set after the workers are reset and empty paths are marked.
          //   Latched - set once the trigger paths have run without throwing.
00126     enum State { Ready = 0, Running, Latched };
00127 
          // Runs one occurrence described by the traits type T: trigger paths, then the
          // results inserter, then (if enabled) the end paths. The traits T must supply
          // MyPrincipal and isEvent_, as used by the definition later in this header.
00128     template <typename T>
00129     void processOneOccurrence(typename T::MyPrincipal& principal,
00130                               EventSetup const& eventSetup,
00131                               bool cleaningUpAfterException = false);
00132 
00133     void beginJob(ProductRegistry const&);
00134     void endJob(ExceptionCollector & collector);
00135 
00136     // Write the luminosity block
00137     void writeLumi(LuminosityBlockPrincipal const& lbp);
00138 
00139     // Write the run
00140     void writeRun(RunPrincipal const& rp);
00141 
00142     // Call closeFile() on all OutputModules.
00143     void closeOutputFiles();
00144 
00145     // Call openNewFileIfNeeded() on all OutputModules
00146     void openNewOutputFilesIfNeeded();
00147 
00148     // Call openFiles() on all OutputModules
00149     void openOutputFiles(FileBlock& fb);
00150 
00151     // Call respondToOpenInputFile() on all Modules
00152     void respondToOpenInputFile(FileBlock const& fb);
00153 
00154     // Call respondToCloseInputFile() on all Modules
00155     void respondToCloseInputFile(FileBlock const& fb);
00156 
00157     // Call respondToOpenOutputFiles() on all Modules
00158     void respondToOpenOutputFiles(FileBlock const& fb);
00159 
00160     // Call respondToCloseOutputFiles() on all Modules
00161     void respondToCloseOutputFiles(FileBlock const& fb);
00162 
00163     // Call shouldWeCloseFile() on all OutputModules.
00164     bool shouldWeCloseOutput() const;
00165 
00166     void preForkReleaseResources();
00167     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00168 
          // Returns the pair (cpuTime, realTime) read from stopwatch_.
          // NOTE(review): stopwatch_ is dereferenced without a null check here;
          // confirm that the constructor always sets it.
00169     std::pair<double, double> timeCpuReal() const {
00170       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00171     }
00172 
00175 
00179     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00180 
00182     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00183 
00185     void modulesInPath(std::string const& iPathLabel,
00186                        std::vector<std::string>& oLabelsToFill) const;
00187 
          // Value of total_events_, which processOneOccurrence increments once per call
          // when T::isEvent_ is true.
00191     int totalEvents() const {
00192       return total_events_;
00193     }
00194 
          // Value of total_passed_, which processOneOccurrence increments for an event
          // when runTriggerPaths returns true.
00197     int totalEventsPassed() const {
00198       return total_passed_;
00199     }
00200 
          // Everything counted in totalEvents() that was not counted as passed. Because
          // total_events_ is incremented before the paths run, an event whose trigger
          // paths threw is included in this number.
00203     int totalEventsFailed() const {
00204       return totalEvents() - totalEventsPassed();
00205     }
00206 
00209     void enableEndPaths(bool active);
00210 
00213     bool endPathsEnabled() const;
00214 
00217     void getTriggerReport(TriggerReport& rep) const;
00218 
00220     bool terminate() const;
00221 
00223     void clearCounters();
00224 
00227     bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00228 
00229   private:
00230 
          // Const and non-const iteration over all_workers_.
00231     AllWorkers::const_iterator workersBegin() const {
00232       return all_workers_.begin();
00233     }
00234 
00235     AllWorkers::const_iterator workersEnd() const {
00236       return all_workers_.end();
00237     }
00238 
00239     AllWorkers::iterator workersBegin() {
00240       return  all_workers_.begin();
00241     }
00242 
00243     AllWorkers::iterator workersEnd() {
00244       return all_workers_.end();
00245     }
00246 
          // Called at the very start of processOneOccurrence.
00247     void resetAll();
00248 
          // Runs every Path in trig_paths_ and returns results_->accept().
00249     template <typename T>
00250     bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00251 
          // Runs every Path in end_paths_.
00252     template <typename T>
00253     void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00254 
          // Called from processOneOccurrence only when T::isEvent_ is true.
00255     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00256 
          // The EventPrincipal overload is defined later in this header and forwards to
          // the JobReport service; the lumi and run overloads are deliberate no-ops.
00257     void reportSkipped(EventPrincipal const& ep) const;
00258     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00259     void reportSkipped(RunPrincipal const&) const {}
00260 
00261     void reduceParameterSet(ParameterSet& proc_pset,
00262                             vstring& modulesInConfig,
00263                             std::set<std::string> const& modulesInConfigSet,
00264                             vstring& labelsOnTriggerPaths,
00265                             vstring& shouldBeUsedLabels,
00266                             std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions);
00267 
00268     void fillWorkers(ParameterSet& proc_pset,
00269                      ProductRegistry& preg,
00270                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00271                      std::string const& name, bool ignoreFilters, PathWorkers& out,
00272                      vstring* labelsOnPaths);
00273     void fillTrigPath(ParameterSet& proc_pset,
00274                       ProductRegistry& preg,
00275                       boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00276                       int bitpos, std::string const& name, TrigResPtr,
00277                       vstring* labelsOnTriggerPaths);
00278     void fillEndPath(ParameterSet& proc_pset,
00279                      ProductRegistry& preg,
00280                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00281                      int bitpos, std::string const& name);
00282 
00283     void limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists);
00284 
00285     void addToAllWorkers(Worker* w);
00286     
          // resetEarlyDelete is called at the end of processOneOccurrence for events only.
00287     void resetEarlyDelete();
00288     void initializeEarlyDelete(edm::ParameterSet const& opts,
00289                                edm::ProductRegistry const& preg, 
00290                                edm::ParameterSet const* subProcPSet);
00291 
00292     WorkerRegistry                                worker_reg_;
          // Consulted in processOneOccurrence to map an exception category to an action.
00293     ActionTable const*                            act_table_;
00294     boost::shared_ptr<ActivityRegistry>           actReg_;
00295 
00296     State                    state_;
00297     vstring                  trig_name_list_;
00298     vstring                  end_path_name_list_;
00299 
          // results_ is the status object queried by runTriggerPaths via accept().
00300     TrigResPtr               results_;
00301     TrigResPtr               endpath_results_;
00302 
          // May be null: processOneOccurrence checks get() before calling doWork on it.
00303     WorkerPtr                results_inserter_;
00304     AllWorkers               all_workers_;
00305     AllOutputWorkers         all_output_workers_;
00306     TrigPaths                trig_paths_;
00307     TrigPaths                end_paths_;
          // Indices into results_ that processOneOccurrence marks as hlt::Pass up front.
00308     std::vector<int>         empty_trig_paths_;
00309     vstring                  empty_trig_path_names_;
00310 
00311     // For each branch that has been marked for early deletion,
00312     // keep track of how many modules are left that read this data but have
00313     // not yet been run in this event.
00314     std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
00315     // NOTE: the following is effectively internal data for each EarlyDeleteHelper,
00316     // but putting it into one vector makes for better allocation as well as
00317     // faster iteration when used to reset earlyDeleteBranchToCount_.
00318     // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
00319     // of this vector are indexes into earlyDeleteBranchToCount_ and so 
00320     // tell which EarlyDeleteHelper is associated with which BranchIDs.
00321     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
00322     // There is one EarlyDeleteHelper per module that reads data which
00323     // has been marked for early deletion.
00324     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
00325 
00326     bool                           wantSummary_;
00327     int                            total_events_;
00328     int                            total_passed_;
00329     RunStopwatch::StopwatchPointer stopwatch_;
00330 
          // Used by processOneOccurrence to run unscheduled work via runNow<T>.
00331     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00332 
          // Read in processOneOccurrence to decide whether runEndPaths is called.
          // NOTE(review): volatile gives no inter-thread synchronization in C++; if this
          // flag is toggled from another thread, std::atomic would be needed - confirm.
00333     volatile bool           endpathsAreActive_;
00334   };
00335 
00336   // -----------------------------
00337   // ProcessOneOccurrence is a functor that has bound a specific
00338   // Principal and Event Setup, and can be called with a Path, to
00339   // execute Path::processOneOccurrence for that event
00340 
00341   template <typename T>
00342   class ProcessOneOccurrence {
          // Unary functor over Path: each call forwards to
          // Path::processOneOccurrence<T> with the principal and setup bound at
          // construction. Both are stored as references, not copies, so they must
          // outlive this functor.
00343   public:
00344     typedef void result_type;
          // The semicolon after the empty constructor body is an empty declaration
          // and has no effect.
00345     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00346       ep(principal), es(setup) {};
00347 
00348       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00349 
00350   private:
          // ep: the bound principal; es: the bound event setup.
00351     typename T::MyPrincipal&   ep;
00352     EventSetup const& es;
00353   };
00354 
00355   void
00356   inline
00357   Schedule::reportSkipped(EventPrincipal const& ep) const {
          // Looks up the JobReport service and reports the run number and event
          // number taken from the id of the given principal.
          // NOTE(review): no caller of this overload is visible in this header.
00358     Service<JobReport> reportSvc;
00359     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00360   }
00361 
00362   template <typename T>
00363   void
00364   Schedule::processOneOccurrence(typename T::MyPrincipal& ep,
00365                                  EventSetup const& es,
00366                                  bool cleaningUpAfterException) {
          // Drives one occurrence of type T through the schedule.
          //   ep - principal for this occurrence.
          //   es - event setup handed to every path and worker.
          //   cleaningUpAfterException - only forwarded to addContextAndPrintException
          //     in the outermost handler.
          // Order of work: reset workers, mark empty trigger paths as passed, run
          // unscheduled work and the trigger paths, run the results inserter, then the
          // end paths if they are active. Any cms::Exception that escapes is given
          // context, printed and rethrown with state_ put back to Ready.
00367     this->resetAll();
          // Paths listed in empty_trig_paths_ are recorded as passed before anything runs.
00368     for (int empty_trig_path : empty_trig_paths_) {
00369       results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
00370     }
00371     state_ = Running;
00372 
00373     // A RunStopwatch, but only if we are processing an event.
00374     RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00375 
          // This section is outside the try blocks below, so anything thrown here
          // (including std::bad_cast from the reference dynamic_cast) gets no added
          // context and does not reset state_.
00376     if (T::isEvent_) {
00377       ++total_events_;
00378       setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00379     }
          // Three nested try levels:
          //   innermost - trigger paths, with SkipEvent handling;
          //   middle    - converts non-cms exceptions through convertException helpers;
          //   outermost - adds context, prints, resets state_ and rethrows.
00380     try {
00381       try {
00382         try {
00383           //make sure the unscheduled items see this transition [Event will be a no-op]
00384           unscheduled_->runNow<T>(ep, es);
00385           if (runTriggerPaths<T>(ep, es)) {
00386             if (T::isEvent_) ++total_passed_;
00387           }
                // Only reached when the trigger paths did not throw; on the SkipEvent
                // route below state_ stays Running until it is reset to Ready.
00388           state_ = Latched;
00389         }
00390         catch(cms::Exception& e) {
                // The action table is consulted for events only; any other occurrence
                // type always rethrows.
00391           actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00392           assert (action != actions::IgnoreCompletely);
00393           assert (action != actions::FailPath);
00394           if (action == actions::SkipEvent) {
                  // Swallow the exception after printing a warning; execution continues
                  // with the results inserter and the end paths below.
00395             edm::printCmsExceptionWarning("SkipEvent", e);
00396           } else {
00397             throw;
00398           }
00399         }
00400 
00401         try {
                // The results inserter is optional; it is run only when the pointer is set.
00402           CPUTimer timer;
00403           if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, nullptr, &timer);
00404         }
00405         catch (cms::Exception & ex) {
                // The module context line is added for events only; the id of the
                // occurrence being processed is added in every case.
00406           if (T::isEvent_) {
00407             ex.addContext("Calling produce method for module TriggerResultInserter");
00408           }
00409           std::ostringstream ost;
00410           ost << "Processing " << ep.id();
00411           ex.addContext(ost.str());
00412           throw;
00413         }
00414 
00415         if (endpathsAreActive_) runEndPaths<T>(ep, es);
00416         if(T::isEvent_) resetEarlyDelete();
00417       }
            // cms::Exception passes through unchanged; every other kind is handed to a
            // convertException helper. Presumably those helpers throw a cms::Exception
            // so that the outer handler sees it - confirm in ConvertException.h.
00418       catch (cms::Exception& e) { throw; }
00419       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00420       catch (std::exception& e) { convertException::stdToEDM(e); }
00421       catch(std::string& s) { convertException::stringToEDM(s); }
00422       catch(char const* c) { convertException::charPtrToEDM(c); }
00423       catch (...) { convertException::unknownToEDM(); }
00424     }
00425     catch(cms::Exception& ex) {
            // Name this function in the context only when no inner layer has already
            // added context; otherwise pass an empty context string.
00426       if (ex.context().empty()) {
00427         addContextAndPrintException("Calling function Schedule::processOneOccurrence", ex, cleaningUpAfterException);
00428       } else {
00429         addContextAndPrintException("", ex, cleaningUpAfterException);
00430       }
00431       state_ = Ready;
00432       throw;
00433     }
00434     // next thing probably is not needed, the product insertion code clears it
00435     state_ = Ready;
00436   }
00437 
00438   template <typename T>
00439   bool
00440   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
          // Applies a ProcessOneOccurrence<T> functor bound to (ep, es) to every Path in
          // trig_paths_, then returns whatever results_->accept() reports.
00441     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00442     return results_->accept();
00443   }
00444 
00445   template <typename T>
00446   void
00447   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
          // Applies a ProcessOneOccurrence<T> functor bound to (ep, es) to every Path in
          // end_paths_. The caller, processOneOccurrence, checks endpathsAreActive_
          // before calling this; nothing is checked here.
00448     // Note there is no state-checking safety controlling the
00449     // activation/deactivation of endpaths.
00450     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00451 
00452     // We could get rid of the functor ProcessOneOccurrence if we used
00453     // boost::lambda, but the use of lambda with member functions
00454     // which take multiple arguments, by both non-const and const
00455     // reference, seems much more obscure...
00456     //
00457     // using namespace boost::lambda;
00458     // for_all(end_paths_,
00459     //          bind(&Path::processOneOccurrence,
00460     //               boost::lambda::_1, // qualification to avoid ambiguity
00461     //               var(ep),           //  pass by reference (not copy)
00462     //               constant_ref(es))); // pass by const-reference (not copy)
00463   }
00464 }
00465 
00466 #endif