CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/FWCore/Framework/interface/Schedule.h

Go to the documentation of this file.
00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003 
00004 /*
00005   Author: Jim Kowalkowski  28-01-06
00006 
00007   A class for creating a schedule based on paths in the configuration file.
00008   The schedule is maintained as a sequence of paths.
00009   After construction, events can be fed to the object and passed through
00010   all the modules in the schedule.  All accounting about processing
00011   of events by modules and paths is contained here or in object held
00012   by containment.
00013 
00014   The trigger results producer and product are generated and managed here.
00015   This class also manages endpaths and calls to endjob and beginjob.
00016   Endpaths are just treated as a simple list of modules that need to
00017   do processing of the event and do not participate in trigger path
00018   activities.
00019 
00020   This class requires the high-level process pset.  It uses @process_name.
00021   If the high-level pset contains an "options" pset, then the
00022   following optional parameter can be present:
00023   bool wantSummary = true/false   # default false
00024 
00025   wantSummary indicates whether or not the pass/fail/error stats
00026   for modules and paths should be printed at the end-of-job.
00027 
00028   A TriggerResults object will always be inserted into the event
00029   for any schedule.  The producer of the TriggerResults EDProduct
00030   is always the first module in the endpath.  The TriggerResultInserter
00031   is given a fixed label of "TriggerResults".
00032 
00033   Processing of an event happens by pushing the event through the Paths.
00034   The scheduler performs the reset() on each of the workers independent
00035   of the Path objects.
00036 
00037   ------------------------
00038 
00039   About Paths:
00040   Paths fit into two categories:
00041   1) trigger paths that contribute directly to saved trigger bits
00042   2) end paths
00043   The Schedule holds these paths in two data structures:
00044   1) main path list
00045   2) end path list
00046 
00047   Trigger path processing always precedes endpath processing.
00048   The order of the paths from the input configuration is
00049   preserved in the main paths list.
00050 
00051   ------------------------
00052 
00053   The Schedule uses the TriggerNamesService to get the names of the
00054   trigger paths and end paths. When a TriggerResults object is created
00055   the results are stored in the same order as the trigger names from
00056   TriggerNamesService.
00057 
00058 */
00059 
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/Frameworkfwd.h"
00065 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00066 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00067 #include "FWCore/Framework/src/Path.h"
00068 #include "FWCore/Framework/src/RunStopwatch.h"
00069 #include "FWCore/Framework/src/Worker.h"
00070 #include "FWCore/Framework/src/WorkerRegistry.h"
00071 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00072 #include "FWCore/MessageLogger/interface/JobReport.h"
00073 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00074 #include "FWCore/ServiceRegistry/interface/Service.h"
00075 #include "FWCore/Utilities/interface/Algorithms.h"
00076 #include "FWCore/Utilities/interface/BranchType.h"
00077 #include "FWCore/Utilities/interface/ConvertException.h"
00078 #include "FWCore/Utilities/interface/Exception.h"
00079 
00080 #include "boost/shared_ptr.hpp"
00081 
00082 #include <map>
00083 #include <memory>
00084 #include <set>
00085 #include <string>
00086 #include <vector>
00087 #include <sstream>
00088 
00089 namespace edm {
00090   namespace service {
00091     class TriggerNamesService;
00092   }
00093   class ActivityRegistry;
00094   class EventSetup;
00095   class ExceptionCollector;
00096   class OutputWorker;
00097   class RunStopwatch;
00098   class UnscheduledCallProducer;
00099   class WorkerInPath;
00100   class Schedule {
00101   public:
00102     typedef std::vector<std::string> vstring;
00103     typedef std::vector<Path> TrigPaths;
00104     typedef std::vector<Path> NonTrigPaths;
00105     typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00106     typedef boost::shared_ptr<Worker> WorkerPtr;
00107     typedef std::vector<Worker*> AllWorkers;
00108     typedef std::vector<OutputWorker*> AllOutputWorkers;
00109 
00110     typedef std::vector<Worker*> Workers;
00111 
00112     typedef std::vector<WorkerInPath> PathWorkers;
00113 
00114     Schedule(ParameterSet& proc_pset,
00115              service::TriggerNamesService& tns,
00116              ProductRegistry& pregistry,
00117              ActionTable const& actions,
00118              boost::shared_ptr<ActivityRegistry> areg,
00119              boost::shared_ptr<ProcessConfiguration> processConfiguration);
00120 
00121     enum State { Ready = 0, Running, Latched };
00122 
00123     template <typename T>
00124     void processOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& eventSetup);
00125 
00126     void beginJob();
00127     void endJob(ExceptionCollector & collector);
00128 
00129     // Write the luminosity block
00130     void writeLumi(LuminosityBlockPrincipal const& lbp);
00131 
00132     // Write the run
00133     void writeRun(RunPrincipal const& rp);
00134 
00135     // Call closeFile() on all OutputModules.
00136     void closeOutputFiles();
00137 
00138     // Call openNewFileIfNeeded() on all OutputModules
00139     void openNewOutputFilesIfNeeded();
00140 
00141     // Call openFiles() on all OutputModules
00142     void openOutputFiles(FileBlock& fb);
00143 
00144     // Call respondToOpenInputFile() on all Modules
00145     void respondToOpenInputFile(FileBlock const& fb);
00146 
00147     // Call respondToCloseInputFile() on all Modules
00148     void respondToCloseInputFile(FileBlock const& fb);
00149 
00150     // Call respondToOpenOutputFiles() on all Modules
00151     void respondToOpenOutputFiles(FileBlock const& fb);
00152 
00153     // Call respondToCloseOutputFiles() on all Modules
00154     void respondToCloseOutputFiles(FileBlock const& fb);
00155 
00156     // Call shouldWeCloseFile() on all OutputModules.
00157     bool shouldWeCloseOutput() const;
00158 
00159     void preForkReleaseResources();
00160     void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00161 
00162     std::pair<double, double> timeCpuReal() const {
00163       return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00164     }
00165 
00168 
00172     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00173 
00175     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00176 
00178     void modulesInPath(std::string const& iPathLabel,
00179                        std::vector<std::string>& oLabelsToFill) const;
00180 
00184     int totalEvents() const {
00185       return total_events_;
00186     }
00187 
00190     int totalEventsPassed() const {
00191       return total_passed_;
00192     }
00193 
00196     int totalEventsFailed() const {
00197       return totalEvents() - totalEventsPassed();
00198     }
00199 
00202     void enableEndPaths(bool active);
00203 
00206     bool endPathsEnabled() const;
00207 
00210     void getTriggerReport(TriggerReport& rep) const;
00211 
00213     bool const terminate() const;
00214 
00216     void clearCounters();
00217 
00220     bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00221 
00222   private:
00223 
00224     AllWorkers::const_iterator workersBegin() const {
00225       return all_workers_.begin();
00226     }
00227 
00228     AllWorkers::const_iterator workersEnd() const {
00229       return all_workers_.end();
00230     }
00231 
00232     AllWorkers::iterator workersBegin() {
00233       return  all_workers_.begin();
00234     }
00235 
00236     AllWorkers::iterator workersEnd() {
00237       return all_workers_.end();
00238     }
00239 
00240     void resetAll();
00241 
00242     template <typename T>
00243     bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00244 
00245     template <typename T>
00246     void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00247 
00248     void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00249 
00250     void reportSkipped(EventPrincipal const& ep) const;
00251     void reportSkipped(LuminosityBlockPrincipal const&) const {}
00252     void reportSkipped(RunPrincipal const&) const {}
00253 
00254     void fillWorkers(ParameterSet& proc_pset,
00255                      ProductRegistry& preg,
00256                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00257                      std::string const& name, bool ignoreFilters, PathWorkers& out);
00258     void fillTrigPath(ParameterSet& proc_pset,
00259                       ProductRegistry& preg,
00260                       boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00261                       int bitpos, std::string const& name, TrigResPtr);
00262     void fillEndPath(ParameterSet& proc_pset,
00263                      ProductRegistry& preg,
00264                      boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00265                      int bitpos, std::string const& name);
00266 
00267     void limitOutput(ParameterSet const& proc_pset);
00268 
00269     void addToAllWorkers(Worker* w);
00270 
00271     WorkerRegistry                                worker_reg_;
00272     ActionTable const*                            act_table_;
00273     boost::shared_ptr<ActivityRegistry>           actReg_;
00274 
00275     State                    state_;
00276     vstring                  trig_name_list_;
00277     vstring                  end_path_name_list_;
00278 
00279     TrigResPtr               results_;
00280     TrigResPtr               endpath_results_;
00281 
00282     WorkerPtr                results_inserter_;
00283     AllWorkers               all_workers_;
00284     AllOutputWorkers         all_output_workers_;
00285     TrigPaths                trig_paths_;
00286     TrigPaths                end_paths_;
00287 
00288     bool                           wantSummary_;
00289     int                            total_events_;
00290     int                            total_passed_;
00291     RunStopwatch::StopwatchPointer stopwatch_;
00292 
00293     boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00294 
00295     volatile bool           endpathsAreActive_;
00296 
00297     bool                    printedFirstException_;
00298   };
00299 
00300   // -----------------------------
00301   // ProcessOneOccurrence is a functor that has bound a specific
00302   // Principal and Event Setup, and can be called with a Path, to
00303   // execute Path::processOneOccurrence for that event
00304 
00305   template <typename T>
00306   class ProcessOneOccurrence {
00307   public:
00308     typedef void result_type;
00309     ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00310       ep(principal), es(setup) {};
00311 
00312       void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00313 
00314   private:
00315     typename T::MyPrincipal&   ep;
00316     EventSetup const& es;
00317   };
00318 
00319   class UnscheduledCallProducer : public UnscheduledHandler {
00320   public:
00321     UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00322     void addWorker(Worker* aWorker) {
00323       assert(0 != aWorker);
00324       labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00325     }
00326 
00327     template <typename T>
00328     void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00329       //do nothing for event since we will run when requested
00330       if(!T::isEvent_) {
00331         for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00332             it != itEnd;
00333             ++it) {
00334           CPUTimer timer;
00335           try {
00336             it->second->doWork<T>(p, es, 0, &timer);
00337           }
00338           catch (cms::Exception & ex) {
00339             std::ostringstream ost;
00340             if (T::isEvent_) {
00341               ost << "Calling event method";
00342             }
00343             else if (T::begin_ && T::branchType_ == InRun) {
00344               ost << "Calling beginRun";
00345             }
00346             else if (T::begin_ && T::branchType_ == InLumi) {
00347               ost << "Calling beginLuminosityBlock";
00348             }
00349             else if (!T::begin_ && T::branchType_ == InLumi) {
00350               ost << "Calling endLuminosityBlock";
00351             }
00352             else if (!T::begin_ && T::branchType_ == InRun) {
00353               ost << "Calling endRun";
00354             }
00355             else {
00356               // It should be impossible to get here ...
00357               ost << "Calling unknown function";
00358             }
00359             ost << " for unscheduled module " << it->second->description().moduleName()
00360                 << "/'" << it->second->description().moduleLabel() << "'";
00361             ex.addContext(ost.str());
00362             ost.str("");
00363             ost << "Processing " << p.id();
00364             ex.addContext(ost.str());
00365             throw;
00366           }
00367         }
00368       }
00369     }
00370 
00371   private:
00372     virtual bool tryToFillImpl(std::string const& moduleLabel,
00373                                EventPrincipal& event,
00374                                EventSetup const& eventSetup,
00375                                CurrentProcessingContext const* iContext) {
00376       std::map<std::string, Worker*>::const_iterator itFound =
00377         labelToWorkers_.find(moduleLabel);
00378       if(itFound != labelToWorkers_.end()) {
00379         CPUTimer timer;
00380         try {
00381           itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00382         }
00383         catch (cms::Exception & ex) {
00384           std::ostringstream ost;
00385           ost << "Calling produce method for unscheduled module " 
00386               <<  itFound->second->description().moduleName() << "/'"
00387               << itFound->second->description().moduleLabel() << "'";
00388           ex.addContext(ost.str());
00389           throw;
00390         }
00391         return true;
00392       }
00393       return false;
00394     }
00395     std::map<std::string, Worker*> labelToWorkers_;
00396   };
00397 
00398   void
00399   inline
00400   Schedule::reportSkipped(EventPrincipal const& ep) const {
00401     Service<JobReport> reportSvc;
00402     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00403   }
00404 
00405   template <typename T>
00406   void
00407   Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
00408     this->resetAll();
00409     state_ = Running;
00410 
00411     // A RunStopwatch, but only if we are processing an event.
00412     RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00413 
00414     if (T::isEvent_) {
00415       ++total_events_;
00416       setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00417     }
00418     try {
00419       try {
00420         try {
00421           //make sure the unscheduled items see this transition [Event will be a no-op]
00422           unscheduled_->runNow<T>(ep, es);
00423           if (runTriggerPaths<T>(ep, es)) {
00424             if (T::isEvent_) ++total_passed_;
00425           }
00426           state_ = Latched;
00427         }
00428         catch(cms::Exception& e) {
00429           actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00430           assert (action != actions::IgnoreCompletely);
00431           assert (action != actions::FailPath);
00432           if (action == actions::SkipEvent) {
00433             edm::printCmsExceptionWarning("SkipEvent", e);
00434           } else {
00435             throw;
00436           }
00437         }
00438 
00439         try {
00440           CPUTimer timer;
00441           if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00442         }
00443         catch (cms::Exception & ex) {
00444           if (T::isEvent_) {
00445             ex.addContext("Calling produce method for module TriggerResultInserter");
00446           }
00447           std::ostringstream ost;
00448           ost << "Processing " << ep.id();
00449           ex.addContext(ost.str());
00450           throw;
00451         }
00452 
00453         if (endpathsAreActive_) runEndPaths<T>(ep, es);
00454       }
00455       catch (cms::Exception& e) { throw; }
00456       catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00457       catch (std::exception& e) { convertException::stdToEDM(e); }
00458       catch(std::string& s) { convertException::stringToEDM(s); }
00459       catch(char const* c) { convertException::charPtrToEDM(c); }
00460       catch (...) { convertException::unknownToEDM(); }
00461     }
00462     catch(cms::Exception& ex) {
00463       if (!printedFirstException_) {
00464         Service<JobReport> jobReportSvc;
00465         if (ex.context().empty()) {
00466           ex.addContext("Calling function Schedule::processOneOccurrence");
00467         }
00468         if (jobReportSvc.isAvailable()) {
00469           JobReport *jobRep = jobReportSvc.operator->();
00470           edm::printCmsException(ex, jobRep, ex.returnCode());
00471         }
00472         else {
00473           edm::printCmsException(ex);
00474         }
00475         ex.setAlreadyPrinted(true);
00476         printedFirstException_ = true;
00477       }
00478       state_ = Ready;
00479       throw;
00480     }
00481     // next thing probably is not needed, the product insertion code clears it
00482     state_ = Ready;
00483   }
00484 
00485   template <typename T>
00486   bool
00487   Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00488     for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00489     return results_->accept();
00490   }
00491 
00492   template <typename T>
00493   void
00494   Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00495     // Note there is no state-checking safety controlling the
00496     // activation/deactivation of endpaths.
00497     for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00498 
00499     // We could get rid of the functor ProcessOneOccurrence if we used
00500     // boost::lambda, but the use of lambda with member functions
00501     // which take multiple arguments, by both non-const and const
00502     // reference, seems much more obscure...
00503     //
00504     // using namespace boost::lambda;
00505     // for_all(end_paths_,
00506     //          bind(&Path::processOneOccurrence,
00507     //               boost::lambda::_1, // qualification to avoid ambiguity
00508     //               var(ep),           //  pass by reference (not copy)
00509     //               constant_ref(es))); // pass by const-reference (not copy)
00510   }
00511 }
00512 
00513 #endif