00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 #include "FWCore/Utilities/interface/Algorithms.h"
00070 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00071 #include "DataFormats/Provenance/interface/ProvenanceFwd.h"
00072 #include "DataFormats/Provenance/interface/Provenance.h"
00073 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00074 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00075 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
00076 #include "FWCore/ServiceRegistry/interface/Service.h"
00077 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00078 #include "FWCore/MessageLogger/interface/JobReport.h"
00079 #include "FWCore/Framework/interface/Frameworkfwd.h"
00080 #include "FWCore/Framework/interface/Actions.h"
00081 #include "FWCore/Framework/src/Path.h"
00082 #include "FWCore/Framework/src/RunStopwatch.h"
00083 #include "FWCore/Framework/interface/EventPrincipal.h"
00084 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00085 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00086 #include "FWCore/Framework/src/Worker.h"
00087
00088 #include "boost/shared_ptr.hpp"
00089
00090 #include <map>
00091 #include <memory>
00092 #include <string>
00093 #include <vector>
00094 #include <set>
00095
00096 namespace edm {
00097 namespace service {
00098 class TriggerNamesService;
00099 }
00100 class ActivityRegistry;
00101 class EventSetup;
00102 class OutputWorker;
00103 class UnscheduledCallProducer;
00104 class RunStopwatch;
00105 class WorkerInPath;
00106 class WorkerRegistry;
00107 class Schedule {
00108 public:
00109 typedef std::vector<std::string> vstring;
00110 typedef std::vector<Path> TrigPaths;
00111 typedef std::vector<Path> NonTrigPaths;
00112 typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00113 typedef boost::shared_ptr<Worker> WorkerPtr;
00114 typedef std::vector<Worker*> AllWorkers;
00115 typedef std::vector<OutputWorker*> AllOutputWorkers;
00116
00117 typedef std::vector<Worker*> Workers;
00118
00119 typedef std::vector<WorkerInPath> PathWorkers;
00120
00121 Schedule(ParameterSet const& processDesc,
00122 edm::service::TriggerNamesService& tns,
00123 WorkerRegistry& wregistry,
00124 ProductRegistry& pregistry,
00125 ActionTable& actions,
00126 boost::shared_ptr<ActivityRegistry> areg);
00127
00128 enum State { Ready=0, Running, Latched };
00129
00130 template <typename T>
00131 void processOneOccurrence(typename T::MyPrincipal& principal,
00132 EventSetup const& eventSetup);
00133
00134 void beginJob(EventSetup const&);
00135 void endJob();
00136
00137
00138 void writeLumi(LuminosityBlockPrincipal const& lbp);
00139
00140
00141 void writeRun(RunPrincipal const& rp);
00142
00143
00144 void closeOutputFiles();
00145
00146
00147 void openNewOutputFilesIfNeeded();
00148
00149
00150 void openOutputFiles(FileBlock & fb);
00151
00152
00153 void respondToOpenInputFile(FileBlock const& fb);
00154
00155
00156 void respondToCloseInputFile(FileBlock const& fb);
00157
00158
00159 void respondToOpenOutputFiles(FileBlock const& fb);
00160
00161
00162 void respondToCloseOutputFiles(FileBlock const& fb);
00163
00164
00165 bool shouldWeCloseOutput() const;
00166
00167 std::pair<double,double> timeCpuReal() const {
00168 return std::pair<double,double>(stopwatch_->cpuTime(),stopwatch_->realTime());
00169 }
00170
00173
00177 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00178
00182 int totalEvents() const {
00183 return total_events_;
00184 }
00185
00188 int totalEventsPassed() const {
00189 return total_passed_;
00190 }
00191
00194 int totalEventsFailed() const {
00195 return totalEvents() - totalEventsPassed();
00196 }
00197
00200 void enableEndPaths(bool active);
00201
00204 bool endPathsEnabled() const;
00205
00208 void getTriggerReport(TriggerReport& rep) const;
00209
00211 bool const terminate() const;
00212
00214 void clearCounters();
00215
00216 private:
00217 AllWorkers::const_iterator workersBegin() const
00218 { return all_workers_.begin(); }
00219
00220 AllWorkers::const_iterator workersEnd() const
00221 { return all_workers_.end(); }
00222
00223 AllWorkers::iterator workersBegin()
00224 { return all_workers_.begin(); }
00225
00226 AllWorkers::iterator workersEnd()
00227 { return all_workers_.end(); }
00228
00229 void resetAll();
00230
00231 template <typename T>
00232 bool runTriggerPaths(typename T::MyPrincipal &, EventSetup const&);
00233
00234 template <typename T>
00235 void runEndPaths(typename T::MyPrincipal &, EventSetup const&);
00236
00237 void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00238
00239 void reportSkipped(EventPrincipal const& ep) const;
00240 void reportSkipped(LuminosityBlockPrincipal const&) const {}
00241 void reportSkipped(RunPrincipal const&) const {}
00242
00243 void fillWorkers(std::string const& name, PathWorkers& out);
00244 void fillTrigPath(int bitpos, std::string const& name, TrigResPtr);
00245 void fillEndPath(int bitpos, std::string const& name);
00246
00247 void limitOutput();
00248
00249 void addToAllWorkers(Worker* w);
00250
00251 ParameterSet pset_;
00252 WorkerRegistry* worker_reg_;
00253 ProductRegistry* prod_reg_;
00254 ActionTable* act_table_;
00255 std::string processName_;
00256 boost::shared_ptr<ActivityRegistry> actReg_;
00257
00258 State state_;
00259 vstring trig_name_list_;
00260 vstring end_path_name_list_;
00261
00262 TrigResPtr results_;
00263 TrigResPtr endpath_results_;
00264
00265 WorkerPtr results_inserter_;
00266 AllWorkers all_workers_;
00267 AllOutputWorkers all_output_workers_;
00268 TrigPaths trig_paths_;
00269 TrigPaths end_paths_;
00270
00271 bool wantSummary_;
00272 int total_events_;
00273 int total_passed_;
00274 RunStopwatch::StopwatchPointer stopwatch_;
00275
00276 boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00277 std::vector<boost::shared_ptr<ConstBranchDescription const> > demandBranches_;
00278
00279 volatile bool endpathsAreActive_;
00280 };
00281
00282 namespace {
00283 template <typename T>
00284 class ScheduleSignalSentry {
00285 public:
00286 ScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* ep, EventSetup const* es) :
00287 a_(a),ep_(ep),es_(es) {
00288 if (a_) T::preScheduleSignal(a_, ep_);
00289 }
00290 ~ScheduleSignalSentry() {
00291 if (a_) if (ep_) T::postScheduleSignal(a_, ep_, es_);
00292 }
00293
00294 private:
00295
00296 ActivityRegistry* a_;
00297 typename T::MyPrincipal* ep_;
00298 EventSetup const* es_;
00299 };
00300 }
00301
00302
00303
00304
00305
00306
00307 template <typename T>
00308 class ProcessOneOccurrence {
00309 public:
00310 typedef void result_type;
00311 ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00312 ep(principal), es(setup) {};
00313
00314 void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00315
00316 private:
00317 typename T::MyPrincipal& ep;
00318 EventSetup const& es;
00319 };
00320
00321 class UnscheduledCallProducer : public UnscheduledHandler {
00322 public:
00323 UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00324 void addWorker(Worker* aWorker) {
00325 assert(0 != aWorker);
00326 labelToWorkers_[aWorker->description().moduleLabel_]=aWorker;
00327 }
00328 private:
00329 virtual bool tryToFillImpl(std::string const& moduleLabel,
00330 EventPrincipal& event,
00331 const EventSetup& eventSetup) {
00332 std::map<std::string, Worker*>::const_iterator itFound =
00333 labelToWorkers_.find(moduleLabel);
00334 if(itFound != labelToWorkers_.end()) {
00335
00336
00337
00338 itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, 0);
00339 return true;
00340 }
00341 return false;
00342 }
00343 std::map<std::string, Worker*> labelToWorkers_;
00344 };
00345
00346 void
00347 inline
00348 Schedule::reportSkipped(EventPrincipal const& ep) const {
00349 Service<JobReport> reportSvc;
00350 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00351 }
00352
00353 template <typename T>
00354 void
00355 Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
00356 this->resetAll();
00357 state_ = Running;
00358
00359
00360 std::auto_ptr<RunStopwatch> stopwatch(T::isEvent_ ? new RunStopwatch(stopwatch_) : 0);
00361
00362 if (T::isEvent_) {
00363 ++total_events_;
00364 setupOnDemandSystem(dynamic_cast<EventPrincipal &>(ep), es);
00365 }
00366 try {
00367
00368
00369 std::auto_ptr<ScheduleSignalSentry<T> > sentry;
00370 try {
00371 sentry = std::auto_ptr<ScheduleSignalSentry<T> >(new ScheduleSignalSentry<T>(actReg_.get(), &ep, &es));
00372 if (runTriggerPaths<T>(ep, es)) {
00373 if (T::isEvent_) ++total_passed_;
00374 }
00375 state_ = Latched;
00376
00377 if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0);
00378 }
00379 catch(cms::Exception& e) {
00380 actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.rootCause()) : actions::Rethrow);
00381 assert (action != actions::IgnoreCompletely);
00382 assert (action != actions::FailPath);
00383 assert (action != actions::FailModule);
00384 if (action == actions::SkipEvent) {
00385 LogWarning(e.category())
00386 << "an exception occurred and all paths for the event are being skipped: \n"
00387 << e.what();
00388 } else {
00389 throw;
00390 }
00391 }
00392
00393 if (endpathsAreActive_) runEndPaths<T>(ep, es);
00394 }
00395 catch(cms::Exception& ex) {
00396 actions::ActionCodes action = (T::isEvent_ ? act_table_->find(ex.rootCause()) : actions::Rethrow);
00397 assert (action != actions::SkipEvent);
00398 assert (action != actions::FailPath);
00399 assert (action != actions::FailModule);
00400 switch(action) {
00401 case actions::IgnoreCompletely: {
00402 LogWarning(ex.category())
00403 << "exception being ignored for current event:\n"
00404 << ex.what();
00405 break;
00406 }
00407 default: {
00408 state_ = Ready;
00409 throw edm::Exception(errors::EventProcessorFailure,
00410 "EventProcessingStopped",ex)
00411 << "an exception occurred during current event processing\n";
00412 }
00413 }
00414 }
00415 catch(...) {
00416 LogError("PassingThrough")
00417 << "an exception occurred during current event processing\n";
00418 state_ = Ready;
00419 throw;
00420 }
00421
00422
00423 state_ = Ready;
00424
00425 }
00426
00427 template <typename T>
00428 bool
00429 Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00430 for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00431 return results_->accept();
00432 }
00433
00434 template <typename T>
00435 void
00436 Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00437
00438
00439 for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452 }
00453
00454 }
00455
00456 #endif