CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC2/src/FWCore/Framework/interface/Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in object held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   Processing of an event happens by pushing the event through the Paths.
00034   The scheduler performs the reset() on each of the workers independent
00035   of the Path objects.
00036 
00037   ------------------------
00038 
00039   About Paths:
00040   Paths fit into two categories:
00041   1) trigger paths that contribute directly to saved trigger bits
00042   2) end paths
00043   The Schedule holds these paths in two data structures:
00044   1) main path list
00045   2) end path list
00046 
00047   Trigger path processing always precedes endpath processing.
00048   The order of the paths from the input configuration is
00049   preserved in the main paths list.
00050 
00051   ------------------------
00052 
00053   The Schedule uses the TriggerNamesService to get the names of the
00054   trigger paths and end paths. When a TriggerResults object is created
00055   the results are stored in the same order as the trigger names from
00056   TriggerNamesService.
00057 
00058 */
00059 
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
00065 #include "FWCore/Framework/interface/Frameworkfwd.h"
00066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00067 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00068 #include "FWCore/Framework/src/Path.h"
00069 #include "FWCore/Framework/src/RunStopwatch.h"
00070 #include "FWCore/Framework/src/Worker.h"
00071 #include "FWCore/Framework/src/WorkerRegistry.h"
00072 #include "FWCore/Framework/src/EarlyDeleteHelper.h"
00073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00074 #include "FWCore/MessageLogger/interface/JobReport.h"
00075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00076 #include "FWCore/ServiceRegistry/interface/Service.h"
00077 #include "FWCore/Utilities/interface/Algorithms.h"
00078 #include "FWCore/Utilities/interface/BranchType.h"
00079 #include "FWCore/Utilities/interface/ConvertException.h"
00080 #include "FWCore/Utilities/interface/Exception.h"
00081 
00082 #include "boost/shared_ptr.hpp"
00083 
00084 #include <map>
00085 #include <memory>
00086 #include <set>
00087 #include <string>
00088 #include <vector>
00089 #include <sstream>
00090 
00091 namespace edm {
00092   namespace service {
00093     class TriggerNamesService;
00094   }
00095   class ActivityRegistry;
00096   class BranchIDListHelper;
00097   class EventSetup;
00098   class ExceptionCollector;
00099   class OutputWorker;
00100   class RunStopwatch;
00101   class UnscheduledCallProducer;
00102   class WorkerInPath;
00103   class Schedule {
00104   public:
00105     typedef std::vector<std::string> vstring;
00106     typedef std::vector<Path> TrigPaths;
00107     typedef std::vector<Path> NonTrigPaths;
00108     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00109     typedef boost::shared_ptr<Worker> WorkerPtr;
00110     typedef std::vector<Worker*> AllWorkers;
00111     typedef std::vector<OutputWorker*> AllOutputWorkers;
00112 
00113     typedef std::vector<Worker*> Workers;
00114 
00115     typedef std::vector<WorkerInPath> PathWorkers;
00116 
00117     Schedule(ParameterSet& proc_pset,
00118              service::TriggerNamesService& tns,
00119              ProductRegistry& pregistry,
00120              BranchIDListHelper& branchIDListHelper,
00121              ActionTable const& actions,
00122              boost::shared_ptr<ActivityRegistry> areg,
00123              boost::shared_ptr<ProcessConfiguration> processConfiguration,
00124              const ParameterSet* subProcPSet);
00125 
00126     enum State { Ready = 0, Running, Latched };
00127 
00128     template <typename T>
00129     void processOneOccurrence(typename T::MyPrincipal& principal,
00130                               EventSetup const& eventSetup,
00131                               bool cleaningUpAfterException = false);
00132 
00133     void beginJob();
00134     void endJob(ExceptionCollector & collector);
00135 
00136     // Write the luminosity block
00137     void writeLumi(LuminosityBlockPrincipal const& lbp);
00138 
00139     // Write the run
00140     void writeRun(RunPrincipal const& rp);
00141 
00142     // Call closeFile() on all OutputModules.
00143     void closeOutputFiles();
00144 
00145     // Call openNewFileIfNeeded() on all OutputModules
00146     void openNewOutputFilesIfNeeded();
00147 
00148     // Call openFiles() on all OutputModules
00149     void openOutputFiles(FileBlock& fb);
00150 
00151     // Call respondToOpenInputFile() on all Modules
00152     void respondToOpenInputFile(FileBlock const& fb);
00153 
00154     // Call respondToCloseInputFile() on all Modules
00155     void respondToCloseInputFile(FileBlock const& fb);
00156 
00157     // Call respondToOpenOutputFiles() on all Modules
00158     void respondToOpenOutputFiles(FileBlock const& fb);
00159 
00160     // Call respondToCloseOutputFiles() on all Modules
00161     void respondToCloseOutputFiles(FileBlock const& fb);
00162 
00163     // Call shouldWeCloseFile() on all OutputModules.
00164     bool shouldWeCloseOutput() const;
00165 
00166     void preForkReleaseResources();
00167     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00168 
00169     std::pair<double, double> timeCpuReal() const {
00170       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00171     }
00172 
00175 
00179     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00180 
00182     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00183 
00185     void modulesInPath(std::string const& iPathLabel,
00186                        std::vector<std::string>& oLabelsToFill) const;
00187 
00191     int totalEvents() const {
00192       return total_events_;
00193     }
00194 
00197     int totalEventsPassed() const {
00198       return total_passed_;
00199     }
00200 
00203     int totalEventsFailed() const {
00204       return totalEvents() - totalEventsPassed();
00205     }
00206 
00209     void enableEndPaths(bool active);
00210 
00213     bool endPathsEnabled() const;
00214 
00217     void getTriggerReport(TriggerReport& rep) const;
00218 
00220     bool terminate() const;
00221 
00223     void clearCounters();
00224 
00227     bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00228 
00229   private:
00230 
00231     AllWorkers::const_iterator workersBegin() const {
00232       return all_workers_.begin();
00233     }
00234 
00235     AllWorkers::const_iterator workersEnd() const {
00236       return all_workers_.end();
00237     }
00238 
00239     AllWorkers::iterator workersBegin() {
00240       return  all_workers_.begin();
00241     }
00242 
00243     AllWorkers::iterator workersEnd() {
00244       return all_workers_.end();
00245     }
00246 
00247     void resetAll();
00248 
00249     template <typename T>
00250     bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00251 
00252     template <typename T>
00253     void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00254 
00255     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00256 
00257     void reportSkipped(EventPrincipal const& ep) const;
00258     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00259     void reportSkipped(RunPrincipal const&) const {}
00260 
00261     void reduceParameterSet(ParameterSet& proc_pset,
00262                             vstring& modulesInConfig,
00263                             std::set<std::string> const& modulesInConfigSet,
00264                             vstring& labelsOnTriggerPaths,
00265                             vstring& shouldBeUsedLabels,
00266                             std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions);
00267 
00268     void fillWorkers(ParameterSet& proc_pset,
00269                      ProductRegistry& preg,
00270                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00271                      std::string const& name, bool ignoreFilters, PathWorkers& out,
00272                      vstring* labelsOnPaths);
00273     void fillTrigPath(ParameterSet& proc_pset,
00274                       ProductRegistry& preg,
00275                       boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00276                       int bitpos, std::string const& name, TrigResPtr,
00277                       vstring* labelsOnTriggerPaths);
00278     void fillEndPath(ParameterSet& proc_pset,
00279                      ProductRegistry& preg,
00280                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00281                      int bitpos, std::string const& name);
00282 
00283     void limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists);
00284 
00285     void addToAllWorkers(Worker* w);
00286     
00287     void resetEarlyDelete();
00288     void initializeEarlyDelete(edm::ParameterSet const& opts,
00289                                edm::ProductRegistry const& preg, 
00290                                edm::ParameterSet const* subProcPSet);
00291 
00292     WorkerRegistry                                worker_reg_;
00293     ActionTable const*                            act_table_;
00294     boost::shared_ptr<ActivityRegistry>           actReg_;
00295 
00296     State                    state_;
00297     vstring                  trig_name_list_;
00298     vstring                  end_path_name_list_;
00299 
00300     TrigResPtr               results_;
00301     TrigResPtr               endpath_results_;
00302 
00303     WorkerPtr                results_inserter_;
00304     AllWorkers               all_workers_;
00305     AllOutputWorkers         all_output_workers_;
00306     TrigPaths                trig_paths_;
00307     TrigPaths                end_paths_;
00308 
00309     //For each branch that has been marked for early deletion
00310     // keep track of how many modules are left that read this data but have
00311     // not yet been run in this event
00312     std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
00313     //NOTE the following is effectively internal data for each EarlyDeleteHelper
00314     // but putting it into one vector makes for better allocation as well as
00315     // faster iteration when used to reset the earlyDeleteBranchToCount_
00316     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
00317     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so 
00318     // tell which EarlyDeleteHelper is associated with which BranchIDs.
00319     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
00320     //There is one EarlyDeleteHelper per Module which are reading data that
00321     // has been marked for early deletion
00322     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
00323 
00324     bool                           wantSummary_;
00325     int                            total_events_;
00326     int                            total_passed_;
00327     RunStopwatch::StopwatchPointer stopwatch_;
00328 
00329     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00330 
00331     volatile bool           endpathsAreActive_;
00332   };
00333 
00334   // -----------------------------
00335   // ProcessOneOccurrence is a functor that has bound a specific
00336   // Principal and Event Setup, and can be called with a Path, to
00337   // execute Path::processOneOccurrence for that event
00338 
00339   template <typename T>
00340   class ProcessOneOccurrence {
00341   public:
00342     typedef void result_type;
00343     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00344       ep(principal), es(setup) {};
00345 
00346       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00347 
00348   private:
00349     typename T::MyPrincipal&   ep;
00350     EventSetup const& es;
00351   };
00352 
00353   class UnscheduledCallProducer : public UnscheduledHandler {
00354   public:
00355     UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00356     void addWorker(Worker* aWorker) {
00357       assert(0 != aWorker);
00358       labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00359     }
00360 
00361     template <typename T>
00362     void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00363       //do nothing for event since we will run when requested
00364       if(!T::isEvent_) {
00365         for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00366             it != itEnd;
00367             ++it) {
00368           CPUTimer timer;
00369           try {
00370             it->second->doWork<T>(p, es, 0, &timer);
00371           }
00372           catch (cms::Exception & ex) {
00373             std::ostringstream ost;
00374             if (T::isEvent_) {
00375               ost << "Calling event method";
00376             }
00377             else if (T::begin_ && T::branchType_ == InRun) {
00378               ost << "Calling beginRun";
00379             }
00380             else if (T::begin_ && T::branchType_ == InLumi) {
00381               ost << "Calling beginLuminosityBlock";
00382             }
00383             else if (!T::begin_ && T::branchType_ == InLumi) {
00384               ost << "Calling endLuminosityBlock";
00385             }
00386             else if (!T::begin_ && T::branchType_ == InRun) {
00387               ost << "Calling endRun";
00388             }
00389             else {
00390               // It should be impossible to get here ...
00391               ost << "Calling unknown function";
00392             }
00393             ost << " for unscheduled module " << it->second->description().moduleName()
00394                 << "/'" << it->second->description().moduleLabel() << "'";
00395             ex.addContext(ost.str());
00396             ost.str("");
00397             ost << "Processing " << p.id();
00398             ex.addContext(ost.str());
00399             throw;
00400           }
00401         }
00402       }
00403     }
00404 
00405   private:
00406     virtual bool tryToFillImpl(std::string const& moduleLabel,
00407                                EventPrincipal& event,
00408                                EventSetup const& eventSetup,
00409                                CurrentProcessingContext const* iContext) {
00410       std::map<std::string, Worker*>::const_iterator itFound =
00411         labelToWorkers_.find(moduleLabel);
00412       if(itFound != labelToWorkers_.end()) {
00413         CPUTimer timer;
00414         try {
00415           itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00416         }
00417         catch (cms::Exception & ex) {
00418           std::ostringstream ost;
00419           ost << "Calling produce method for unscheduled module " 
00420               <<  itFound->second->description().moduleName() << "/'"
00421               << itFound->second->description().moduleLabel() << "'";
00422           ex.addContext(ost.str());
00423           throw;
00424         }
00425         return true;
00426       }
00427       return false;
00428     }
00429     std::map<std::string, Worker*> labelToWorkers_;
00430   };
00431 
00432   void
00433   inline
00434   Schedule::reportSkipped(EventPrincipal const& ep) const {
00435     Service<JobReport> reportSvc;
00436     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00437   }
00438 
00439   template <typename T>
00440   void
00441   Schedule::processOneOccurrence(typename T::MyPrincipal& ep,
00442                                  EventSetup const& es,
00443                                  bool cleaningUpAfterException) {
00444     this->resetAll();
00445     state_ = Running;
00446 
00447     // A RunStopwatch, but only if we are processing an event.
00448     RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00449 
00450     if (T::isEvent_) {
00451       ++total_events_;
00452       setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00453     }
00454     try {
00455       try {
00456         try {
00457           //make sure the unscheduled items see this transition [Event will be a no-op]
00458           unscheduled_->runNow<T>(ep, es);
00459           if (runTriggerPaths<T>(ep, es)) {
00460             if (T::isEvent_) ++total_passed_;
00461           }
00462           state_ = Latched;
00463         }
00464         catch(cms::Exception& e) {
00465           actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00466           assert (action != actions::IgnoreCompletely);
00467           assert (action != actions::FailPath);
00468           if (action == actions::SkipEvent) {
00469             edm::printCmsExceptionWarning("SkipEvent", e);
00470           } else {
00471             throw;
00472           }
00473         }
00474 
00475         try {
00476           CPUTimer timer;
00477           if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00478         }
00479         catch (cms::Exception & ex) {
00480           if (T::isEvent_) {
00481             ex.addContext("Calling produce method for module TriggerResultInserter");
00482           }
00483           std::ostringstream ost;
00484           ost << "Processing " << ep.id();
00485           ex.addContext(ost.str());
00486           throw;
00487         }
00488 
00489         if (endpathsAreActive_) runEndPaths<T>(ep, es);
00490         if(T::isEvent_) resetEarlyDelete();
00491       }
00492       catch (cms::Exception& e) { throw; }
00493       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00494       catch (std::exception& e) { convertException::stdToEDM(e); }
00495       catch(std::string& s) { convertException::stringToEDM(s); }
00496       catch(char const* c) { convertException::charPtrToEDM(c); }
00497       catch (...) { convertException::unknownToEDM(); }
00498     }
00499     catch(cms::Exception& ex) {
00500       if (ex.context().empty()) {
00501         addContextAndPrintException("Calling function Schedule::processOneOccurrence", ex, cleaningUpAfterException);
00502       } else {
00503         addContextAndPrintException("", ex, cleaningUpAfterException);
00504       }
00505       state_ = Ready;
00506       throw;
00507     }
00508     // next thing probably is not needed, the product insertion code clears it
00509     state_ = Ready;
00510   }
00511 
00512   template <typename T>
00513   bool
00514   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00515     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00516     return results_->accept();
00517   }
00518 
00519   template <typename T>
00520   void
00521   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00522     // Note there is no state-checking safety controlling the
00523     // activation/deactivation of endpaths.
00524     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00525 
00526     // We could get rid of the functor ProcessOneOccurrence if we used
00527     // boost::lambda, but the use of lambda with member functions
00528     // which take multiple arguments, by both non-const and const
00529     // reference, seems much more obscure...
00530     //
00531     // using namespace boost::lambda;
00532     // for_all(end_paths_,
00533     //          bind(&Path::processOneOccurrence,
00534     //               boost::lambda::_1, // qualification to avoid ambiguity
00535     //               var(ep),           //  pass by reference (not copy)
00536     //               constant_ref(es))); // pass by const-reference (not copy)
00537   }
00538 }
00539 
00540 #endif