CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_10_patch1/src/FWCore/Framework/interface/Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in objects held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   Processing of an event happens by pushing the event through the Paths.
00034   The scheduler performs the reset() on each of the workers independent
00035   of the Path objects.
00036 
00037   ------------------------
00038 
00039   About Paths:
00040   Paths fit into two categories:
00041   1) trigger paths that contribute directly to saved trigger bits
00042   2) end paths
00043   The Schedule holds these paths in two data structures:
00044   1) main path list
00045   2) end path list
00046 
00047   Trigger path processing always precedes endpath processing.
00048   The order of the paths from the input configuration is
00049   preserved in the main paths list.
00050 
00051   ------------------------
00052 
00053   The Schedule uses the TriggerNamesService to get the names of the
00054   trigger paths and end paths. When a TriggerResults object is created
00055   the results are stored in the same order as the trigger names from
00056   TriggerNamesService.
00057 
00058 */
00059 
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
00065 #include "FWCore/Framework/interface/Frameworkfwd.h"
00066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00067 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00068 #include "FWCore/Framework/src/Path.h"
00069 #include "FWCore/Framework/src/RunStopwatch.h"
00070 #include "FWCore/Framework/src/Worker.h"
00071 #include "FWCore/Framework/src/WorkerRegistry.h"
00072 #include "FWCore/Framework/src/EarlyDeleteHelper.h"
00073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00074 #include "FWCore/MessageLogger/interface/JobReport.h"
00075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00076 #include "FWCore/ServiceRegistry/interface/Service.h"
00077 #include "FWCore/Utilities/interface/Algorithms.h"
00078 #include "FWCore/Utilities/interface/BranchType.h"
00079 #include "FWCore/Utilities/interface/ConvertException.h"
00080 #include "FWCore/Utilities/interface/Exception.h"
00081 
00082 #include "boost/shared_ptr.hpp"
00083 
00084 #include <map>
00085 #include <memory>
00086 #include <set>
00087 #include <string>
00088 #include <vector>
00089 #include <sstream>
00090 
00091 namespace edm {
00092   namespace service {
00093     class TriggerNamesService;
00094   }
00095   class ActivityRegistry;
00096   class EventSetup;
00097   class ExceptionCollector;
00098   class OutputWorker;
00099   class RunStopwatch;
00100   class UnscheduledCallProducer;
00101   class WorkerInPath;
00102   class Schedule {
00103   public:
00104     typedef std::vector<std::string> vstring;
00105     typedef std::vector<Path> TrigPaths;
00106     typedef std::vector<Path> NonTrigPaths;
00107     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00108     typedef boost::shared_ptr<Worker> WorkerPtr;
00109     typedef std::vector<Worker*> AllWorkers;
00110     typedef std::vector<OutputWorker*> AllOutputWorkers;
00111 
00112     typedef std::vector<Worker*> Workers;
00113 
00114     typedef std::vector<WorkerInPath> PathWorkers;
00115 
00116     Schedule(ParameterSet& proc_pset,
00117              service::TriggerNamesService& tns,
00118              ProductRegistry& pregistry,
00119              ActionTable const& actions,
00120              boost::shared_ptr<ActivityRegistry> areg,
00121              boost::shared_ptr<ProcessConfiguration> processConfiguration,
00122              const ParameterSet* subProcPSet);
00123 
00124     enum State { Ready = 0, Running, Latched };
00125 
00126     template <typename T>
00127     void processOneOccurrence(typename T::MyPrincipal& principal,
00128                               EventSetup const& eventSetup,
00129                               bool cleaningUpAfterException = false);
00130 
00131     void beginJob();
00132     void endJob(ExceptionCollector & collector);
00133 
00134     // Write the luminosity block
00135     void writeLumi(LuminosityBlockPrincipal const& lbp);
00136 
00137     // Write the run
00138     void writeRun(RunPrincipal const& rp);
00139 
00140     // Call closeFile() on all OutputModules.
00141     void closeOutputFiles();
00142 
00143     // Call openNewFileIfNeeded() on all OutputModules
00144     void openNewOutputFilesIfNeeded();
00145 
00146     // Call openFiles() on all OutputModules
00147     void openOutputFiles(FileBlock& fb);
00148 
00149     // Call respondToOpenInputFile() on all Modules
00150     void respondToOpenInputFile(FileBlock const& fb);
00151 
00152     // Call respondToCloseInputFile() on all Modules
00153     void respondToCloseInputFile(FileBlock const& fb);
00154 
00155     // Call respondToOpenOutputFiles() on all Modules
00156     void respondToOpenOutputFiles(FileBlock const& fb);
00157 
00158     // Call respondToCloseOutputFiles() on all Modules
00159     void respondToCloseOutputFiles(FileBlock const& fb);
00160 
00161     // Call shouldWeCloseFile() on all OutputModules.
00162     bool shouldWeCloseOutput() const;
00163 
00164     void preForkReleaseResources();
00165     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00166 
00167     std::pair<double, double> timeCpuReal() const {
00168       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00169     }
00170 
00173 
00177     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00178 
00180     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00181 
00183     void modulesInPath(std::string const& iPathLabel,
00184                        std::vector<std::string>& oLabelsToFill) const;
00185 
00189     int totalEvents() const {
00190       return total_events_;
00191     }
00192 
00195     int totalEventsPassed() const {
00196       return total_passed_;
00197     }
00198 
00201     int totalEventsFailed() const {
00202       return totalEvents() - totalEventsPassed();
00203     }
00204 
00207     void enableEndPaths(bool active);
00208 
00211     bool endPathsEnabled() const;
00212 
00215     void getTriggerReport(TriggerReport& rep) const;
00216 
00218     bool terminate() const;
00219 
00221     void clearCounters();
00222 
00225     bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00226 
00227   private:
00228 
00229     AllWorkers::const_iterator workersBegin() const {
00230       return all_workers_.begin();
00231     }
00232 
00233     AllWorkers::const_iterator workersEnd() const {
00234       return all_workers_.end();
00235     }
00236 
00237     AllWorkers::iterator workersBegin() {
00238       return  all_workers_.begin();
00239     }
00240 
00241     AllWorkers::iterator workersEnd() {
00242       return all_workers_.end();
00243     }
00244 
00245     void resetAll();
00246 
00247     template <typename T>
00248     bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00249 
00250     template <typename T>
00251     void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00252 
00253     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00254 
00255     void reportSkipped(EventPrincipal const& ep) const;
00256     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00257     void reportSkipped(RunPrincipal const&) const {}
00258 
00259     void reduceParameterSet(ParameterSet& proc_pset,
00260                             vstring& modulesInConfig,
00261                             std::set<std::string> const& modulesInConfigSet,
00262                             vstring& labelsOnTriggerPaths,
00263                             vstring& shouldBeUsedLabels,
00264                             std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions);
00265 
00266     void fillWorkers(ParameterSet& proc_pset,
00267                      ProductRegistry& preg,
00268                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00269                      std::string const& name, bool ignoreFilters, PathWorkers& out,
00270                      vstring* labelsOnPaths);
00271     void fillTrigPath(ParameterSet& proc_pset,
00272                       ProductRegistry& preg,
00273                       boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00274                       int bitpos, std::string const& name, TrigResPtr,
00275                       vstring* labelsOnTriggerPaths);
00276     void fillEndPath(ParameterSet& proc_pset,
00277                      ProductRegistry& preg,
00278                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00279                      int bitpos, std::string const& name);
00280 
00281     void limitOutput(ParameterSet const& proc_pset);
00282 
00283     void addToAllWorkers(Worker* w);
00284     
00285     void resetEarlyDelete();
00286     void initializeEarlyDelete(edm::ParameterSet const& opts,
00287                                edm::ProductRegistry const& preg, 
00288                                edm::ParameterSet const* subProcPSet);
00289 
00290     WorkerRegistry                                worker_reg_;
00291     ActionTable const*                            act_table_;
00292     boost::shared_ptr<ActivityRegistry>           actReg_;
00293 
00294     State                    state_;
00295     vstring                  trig_name_list_;
00296     vstring                  end_path_name_list_;
00297 
00298     TrigResPtr               results_;
00299     TrigResPtr               endpath_results_;
00300 
00301     WorkerPtr                results_inserter_;
00302     AllWorkers               all_workers_;
00303     AllOutputWorkers         all_output_workers_;
00304     TrigPaths                trig_paths_;
00305     TrigPaths                end_paths_;
00306 
00307     //For each branch that has been marked for early deletion
00308     // keep track of how many modules are left that read this data but have
00309     // not yet been run in this event
00310     std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
00311     //NOTE the following is effectively internal data for each EarlyDeleteHelper
00312     // but putting it into one vector makes for better allocation as well as
00313     // faster iteration when used to reset the earlyDeleteBranchToCount_
00314     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
00315     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so 
00316     // tell which EarlyDeleteHelper is associated with which BranchIDs.
00317     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
00318     //There is one EarlyDeleteHelper per Module which are reading data that
00319     // has been marked for early deletion
00320     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
00321 
00322     bool                           wantSummary_;
00323     int                            total_events_;
00324     int                            total_passed_;
00325     RunStopwatch::StopwatchPointer stopwatch_;
00326 
00327     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00328 
00329     volatile bool           endpathsAreActive_;
00330   };
00331 
00332   // -----------------------------
00333   // ProcessOneOccurrence is a functor that has bound a specific
00334   // Principal and Event Setup, and can be called with a Path, to
00335   // execute Path::processOneOccurrence for that event
00336 
00337   template <typename T>
00338   class ProcessOneOccurrence {
00339   public:
00340     typedef void result_type;
00341     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00342       ep(principal), es(setup) {};
00343 
00344       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00345 
00346   private:
00347     typename T::MyPrincipal&   ep;
00348     EventSetup const& es;
00349   };
00350 
00351   class UnscheduledCallProducer : public UnscheduledHandler {
00352   public:
00353     UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00354     void addWorker(Worker* aWorker) {
00355       assert(0 != aWorker);
00356       labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00357     }
00358 
00359     template <typename T>
00360     void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00361       //do nothing for event since we will run when requested
00362       if(!T::isEvent_) {
00363         for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00364             it != itEnd;
00365             ++it) {
00366           CPUTimer timer;
00367           try {
00368             it->second->doWork<T>(p, es, 0, &timer);
00369           }
00370           catch (cms::Exception & ex) {
00371             std::ostringstream ost;
00372             if (T::isEvent_) {
00373               ost << "Calling event method";
00374             }
00375             else if (T::begin_ && T::branchType_ == InRun) {
00376               ost << "Calling beginRun";
00377             }
00378             else if (T::begin_ && T::branchType_ == InLumi) {
00379               ost << "Calling beginLuminosityBlock";
00380             }
00381             else if (!T::begin_ && T::branchType_ == InLumi) {
00382               ost << "Calling endLuminosityBlock";
00383             }
00384             else if (!T::begin_ && T::branchType_ == InRun) {
00385               ost << "Calling endRun";
00386             }
00387             else {
00388               // It should be impossible to get here ...
00389               ost << "Calling unknown function";
00390             }
00391             ost << " for unscheduled module " << it->second->description().moduleName()
00392                 << "/'" << it->second->description().moduleLabel() << "'";
00393             ex.addContext(ost.str());
00394             ost.str("");
00395             ost << "Processing " << p.id();
00396             ex.addContext(ost.str());
00397             throw;
00398           }
00399         }
00400       }
00401     }
00402 
00403   private:
00404     virtual bool tryToFillImpl(std::string const& moduleLabel,
00405                                EventPrincipal& event,
00406                                EventSetup const& eventSetup,
00407                                CurrentProcessingContext const* iContext) {
00408       std::map<std::string, Worker*>::const_iterator itFound =
00409         labelToWorkers_.find(moduleLabel);
00410       if(itFound != labelToWorkers_.end()) {
00411         CPUTimer timer;
00412         try {
00413           itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00414         }
00415         catch (cms::Exception & ex) {
00416           std::ostringstream ost;
00417           ost << "Calling produce method for unscheduled module " 
00418               <<  itFound->second->description().moduleName() << "/'"
00419               << itFound->second->description().moduleLabel() << "'";
00420           ex.addContext(ost.str());
00421           throw;
00422         }
00423         return true;
00424       }
00425       return false;
00426     }
00427     std::map<std::string, Worker*> labelToWorkers_;
00428   };
00429 
00430   void
00431   inline
00432   Schedule::reportSkipped(EventPrincipal const& ep) const {
00433     Service<JobReport> reportSvc;
00434     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00435   }
00436 
00437   template <typename T>
00438   void
00439   Schedule::processOneOccurrence(typename T::MyPrincipal& ep,
00440                                  EventSetup const& es,
00441                                  bool cleaningUpAfterException) {
00442     this->resetAll();
00443     state_ = Running;
00444 
00445     // A RunStopwatch, but only if we are processing an event.
00446     RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00447 
00448     if (T::isEvent_) {
00449       ++total_events_;
00450       setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00451     }
00452     try {
00453       try {
00454         try {
00455           //make sure the unscheduled items see this transition [Event will be a no-op]
00456           unscheduled_->runNow<T>(ep, es);
00457           if (runTriggerPaths<T>(ep, es)) {
00458             if (T::isEvent_) ++total_passed_;
00459           }
00460           state_ = Latched;
00461         }
00462         catch(cms::Exception& e) {
00463           actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00464           assert (action != actions::IgnoreCompletely);
00465           assert (action != actions::FailPath);
00466           if (action == actions::SkipEvent) {
00467             edm::printCmsExceptionWarning("SkipEvent", e);
00468           } else {
00469             throw;
00470           }
00471         }
00472 
00473         try {
00474           CPUTimer timer;
00475           if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00476         }
00477         catch (cms::Exception & ex) {
00478           if (T::isEvent_) {
00479             ex.addContext("Calling produce method for module TriggerResultInserter");
00480           }
00481           std::ostringstream ost;
00482           ost << "Processing " << ep.id();
00483           ex.addContext(ost.str());
00484           throw;
00485         }
00486 
00487         if (endpathsAreActive_) runEndPaths<T>(ep, es);
00488         if(T::isEvent_) resetEarlyDelete();
00489       }
00490       catch (cms::Exception& e) { throw; }
00491       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00492       catch (std::exception& e) { convertException::stdToEDM(e); }
00493       catch(std::string& s) { convertException::stringToEDM(s); }
00494       catch(char const* c) { convertException::charPtrToEDM(c); }
00495       catch (...) { convertException::unknownToEDM(); }
00496     }
00497     catch(cms::Exception& ex) {
00498       if (ex.context().empty()) {
00499         addContextAndPrintException("Calling function Schedule::processOneOccurrence", ex, cleaningUpAfterException);
00500       } else {
00501         addContextAndPrintException("", ex, cleaningUpAfterException);
00502       }
00503       state_ = Ready;
00504       throw;
00505     }
00506     // next thing probably is not needed, the product insertion code clears it
00507     state_ = Ready;
00508   }
00509 
00510   template <typename T>
00511   bool
00512   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00513     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00514     return results_->accept();
00515   }
00516 
00517   template <typename T>
00518   void
00519   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00520     // Note there is no state-checking safety controlling the
00521     // activation/deactivation of endpaths.
00522     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00523 
00524     // We could get rid of the functor ProcessOneOccurrence if we used
00525     // boost::lambda, but the use of lambda with member functions
00526     // which take multiple arguments, by both non-const and const
00527     // reference, seems much more obscure...
00528     //
00529     // using namespace boost::lambda;
00530     // for_all(end_paths_,
00531     //          bind(&Path::processOneOccurrence,
00532     //               boost::lambda::_1, // qualification to avoid ambiguity
00533     //               var(ep),           //  pass by reference (not copy)
00534     //               constant_ref(es))); // pass by const-reference (not copy)
00535   }
00536 }
00537 
00538 #endif