00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "FWCore/Framework/interface/Actions.h"
00062 #include "FWCore/Framework/interface/EventPrincipal.h"
00063 #include "FWCore/Framework/interface/Frameworkfwd.h"
00064 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00065 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00066 #include "FWCore/Framework/src/Path.h"
00067 #include "FWCore/Framework/src/RunStopwatch.h"
00068 #include "FWCore/Framework/src/Worker.h"
00069 #include "FWCore/Framework/src/WorkerRegistry.h"
00070 #include "FWCore/MessageLogger/interface/JobReport.h"
00071 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00072 #include "FWCore/ServiceRegistry/interface/Service.h"
00073 #include "FWCore/Utilities/interface/Algorithms.h"
00074
00075 #include "boost/shared_ptr.hpp"
00076
00077 #include <map>
00078 #include <memory>
00079 #include <set>
00080 #include <string>
00081 #include <vector>
00082
00083 namespace edm {
00084 namespace service {
00085 class TriggerNamesService;
00086 }
00087 class ActivityRegistry;
00088 class EventSetup;
00089 class OutputWorker;
00090 class RunStopwatch;
00091 class UnscheduledCallProducer;
00092 class WorkerInPath;
00093 class Schedule {
00094 public:
00095 typedef std::vector<std::string> vstring;
00096 typedef std::vector<Path> TrigPaths;
00097 typedef std::vector<Path> NonTrigPaths;
00098 typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00099 typedef boost::shared_ptr<Worker> WorkerPtr;
00100 typedef std::vector<Worker*> AllWorkers;
00101 typedef std::vector<OutputWorker*> AllOutputWorkers;
00102
00103 typedef std::vector<Worker*> Workers;
00104
00105 typedef std::vector<WorkerInPath> PathWorkers;
00106
00107 Schedule(ParameterSet& proc_pset,
00108 service::TriggerNamesService& tns,
00109 ProductRegistry& pregistry,
00110 ActionTable const& actions,
00111 boost::shared_ptr<ActivityRegistry> areg,
00112 boost::shared_ptr<ProcessConfiguration> processConfiguration);
00113
00114 enum State { Ready = 0, Running, Latched };
00115
00116 template <typename T>
00117 void processOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& eventSetup);
00118
00119 void beginJob();
00120 void endJob();
00121
00122
00123 void writeLumi(LuminosityBlockPrincipal const& lbp);
00124
00125
00126 void writeRun(RunPrincipal const& rp);
00127
00128
00129 void closeOutputFiles();
00130
00131
00132 void openNewOutputFilesIfNeeded();
00133
00134
00135 void openOutputFiles(FileBlock& fb);
00136
00137
00138 void respondToOpenInputFile(FileBlock const& fb);
00139
00140
00141 void respondToCloseInputFile(FileBlock const& fb);
00142
00143
00144 void respondToOpenOutputFiles(FileBlock const& fb);
00145
00146
00147 void respondToCloseOutputFiles(FileBlock const& fb);
00148
00149
00150 bool shouldWeCloseOutput() const;
00151
00152 void preForkReleaseResources();
00153 void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00154
00155 std::pair<double, double> timeCpuReal() const {
00156 return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00157 }
00158
00161
00165 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00166
00168 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00169
00171 void modulesInPath(std::string const& iPathLabel,
00172 std::vector<std::string>& oLabelsToFill) const;
00173
00177 int totalEvents() const {
00178 return total_events_;
00179 }
00180
00183 int totalEventsPassed() const {
00184 return total_passed_;
00185 }
00186
00189 int totalEventsFailed() const {
00190 return totalEvents() - totalEventsPassed();
00191 }
00192
00195 void enableEndPaths(bool active);
00196
00199 bool endPathsEnabled() const;
00200
00203 void getTriggerReport(TriggerReport& rep) const;
00204
00206 bool const terminate() const;
00207
00209 void clearCounters();
00210
00213 bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00214
00215 private:
00216
00217 AllWorkers::const_iterator workersBegin() const {
00218 return all_workers_.begin();
00219 }
00220
00221 AllWorkers::const_iterator workersEnd() const {
00222 return all_workers_.end();
00223 }
00224
00225 AllWorkers::iterator workersBegin() {
00226 return all_workers_.begin();
00227 }
00228
00229 AllWorkers::iterator workersEnd() {
00230 return all_workers_.end();
00231 }
00232
00233 void resetAll();
00234
00235 template <typename T>
00236 bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00237
00238 template <typename T>
00239 void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00240
00241 void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00242
00243 void reportSkipped(EventPrincipal const& ep) const;
00244 void reportSkipped(LuminosityBlockPrincipal const&) const {}
00245 void reportSkipped(RunPrincipal const&) const {}
00246
00247 void fillWorkers(ParameterSet& proc_pset,
00248 ProductRegistry& preg,
00249 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00250 std::string const& name, bool ignoreFilters, PathWorkers& out);
00251 void fillTrigPath(ParameterSet& proc_pset,
00252 ProductRegistry& preg,
00253 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00254 int bitpos, std::string const& name, TrigResPtr);
00255 void fillEndPath(ParameterSet& proc_pset,
00256 ProductRegistry& preg,
00257 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00258 int bitpos, std::string const& name);
00259
00260 void limitOutput(ParameterSet const& proc_pset);
00261
00262 void addToAllWorkers(Worker* w);
00263
00264 WorkerRegistry worker_reg_;
00265 ActionTable const* act_table_;
00266 boost::shared_ptr<ActivityRegistry> actReg_;
00267
00268 State state_;
00269 vstring trig_name_list_;
00270 vstring end_path_name_list_;
00271
00272 TrigResPtr results_;
00273 TrigResPtr endpath_results_;
00274
00275 WorkerPtr results_inserter_;
00276 AllWorkers all_workers_;
00277 AllOutputWorkers all_output_workers_;
00278 TrigPaths trig_paths_;
00279 TrigPaths end_paths_;
00280
00281 bool wantSummary_;
00282 int total_events_;
00283 int total_passed_;
00284 RunStopwatch::StopwatchPointer stopwatch_;
00285
00286 boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00287
00288 volatile bool endpathsAreActive_;
00289 };
00290
00291
00292
00293
00294
00295
00296 template <typename T>
00297 class ProcessOneOccurrence {
00298 public:
00299 typedef void result_type;
00300 ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00301 ep(principal), es(setup) {};
00302
00303 void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00304
00305 private:
00306 typename T::MyPrincipal& ep;
00307 EventSetup const& es;
00308 };
00309
00310 class UnscheduledCallProducer : public UnscheduledHandler {
00311 public:
00312 UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00313 void addWorker(Worker* aWorker) {
00314 assert(0 != aWorker);
00315 labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00316 }
00317
00318 template <typename T>
00319 void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00320
00321 if(!T::isEvent_) {
00322 for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00323 it != itEnd;
00324 ++it) {
00325 CPUTimer timer;
00326 it->second->doWork<T>(p, es, 0, &timer);
00327 }
00328 }
00329 }
00330
00331 private:
00332 virtual bool tryToFillImpl(std::string const& moduleLabel,
00333 EventPrincipal& event,
00334 EventSetup const& eventSetup,
00335 CurrentProcessingContext const* iContext) {
00336 std::map<std::string, Worker*>::const_iterator itFound =
00337 labelToWorkers_.find(moduleLabel);
00338 if(itFound != labelToWorkers_.end()) {
00339 CPUTimer timer;
00340 itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00341 return true;
00342 }
00343 return false;
00344 }
00345 std::map<std::string, Worker*> labelToWorkers_;
00346 };
00347
00348 void
00349 inline
00350 Schedule::reportSkipped(EventPrincipal const& ep) const {
00351 Service<JobReport> reportSvc;
00352 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00353 }
00354
00355 template <typename T>
00356 void
00357 Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
00358 this->resetAll();
00359 state_ = Running;
00360
00361
00362 RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00363
00364 if (T::isEvent_) {
00365 ++total_events_;
00366 setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00367 }
00368 try {
00369 try {
00370
00371 unscheduled_->runNow<T>(ep, es);
00372 if (runTriggerPaths<T>(ep, es)) {
00373 if (T::isEvent_) ++total_passed_;
00374 }
00375 state_ = Latched;
00376 }
00377 catch(cms::Exception& e) {
00378 actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.rootCause()) : actions::Rethrow);
00379 assert (action != actions::IgnoreCompletely);
00380 assert (action != actions::FailPath);
00381 assert (action != actions::FailModule);
00382 if (action == actions::SkipEvent) {
00383 LogWarning(e.category())
00384 << "an exception occurred and all paths for the event are being skipped: \n"
00385 << e.what();
00386 } else {
00387 throw;
00388 }
00389 }
00390
00391 try {
00392 CPUTimer timer;
00393 if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00394 }
00395 catch (cms::Exception& e) {
00396 e << "EventProcessingStopped\n";
00397 e << "Attempt to insert TriggerResults into event failed.\n";
00398 throw;
00399 }
00400
00401 if (endpathsAreActive_) runEndPaths<T>(ep, es);
00402 }
00403 catch(cms::Exception& e) {
00404 actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.rootCause()) : actions::Rethrow);
00405 assert (action != actions::SkipEvent);
00406 assert (action != actions::FailPath);
00407 assert (action != actions::FailModule);
00408 switch(action) {
00409 case actions::IgnoreCompletely: {
00410 LogWarning(e.category())
00411 << "exception being ignored for current event:\n"
00412 << e.what();
00413 break;
00414 }
00415 default: {
00416 state_ = Ready;
00417 e << "EventProcessingStopped\n";
00418 e << "an exception occurred during current event processing\n";
00419 throw;
00420 }
00421 }
00422 }
00423 catch(...) {
00424 LogError("PassingThrough")
00425 << "an exception occurred during current event processing\n";
00426 state_ = Ready;
00427 throw;
00428 }
00429
00430
00431 state_ = Ready;
00432
00433 }
00434
00435 template <typename T>
00436 bool
00437 Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00438 for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00439 return results_->accept();
00440 }
00441
00442 template <typename T>
00443 void
00444 Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00445
00446
00447 for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 }
00461 }
00462
00463 #endif