00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/Frameworkfwd.h"
00065 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00066 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00067 #include "FWCore/Framework/src/Path.h"
00068 #include "FWCore/Framework/src/RunStopwatch.h"
00069 #include "FWCore/Framework/src/Worker.h"
00070 #include "FWCore/Framework/src/WorkerRegistry.h"
00071 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00072 #include "FWCore/MessageLogger/interface/JobReport.h"
00073 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00074 #include "FWCore/ServiceRegistry/interface/Service.h"
00075 #include "FWCore/Utilities/interface/Algorithms.h"
00076 #include "FWCore/Utilities/interface/BranchType.h"
00077 #include "FWCore/Utilities/interface/ConvertException.h"
00078 #include "FWCore/Utilities/interface/Exception.h"
00079
00080 #include "boost/shared_ptr.hpp"
00081
00082 #include <map>
00083 #include <memory>
00084 #include <set>
00085 #include <string>
00086 #include <vector>
00087 #include <sstream>
00088
00089 namespace edm {
00090 namespace service {
00091 class TriggerNamesService;
00092 }
00093 class ActivityRegistry;
00094 class EventSetup;
00095 class ExceptionCollector;
00096 class OutputWorker;
00097 class RunStopwatch;
00098 class UnscheduledCallProducer;
00099 class WorkerInPath;
00100 class Schedule {
00101 public:
00102 typedef std::vector<std::string> vstring;
00103 typedef std::vector<Path> TrigPaths;
00104 typedef std::vector<Path> NonTrigPaths;
00105 typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00106 typedef boost::shared_ptr<Worker> WorkerPtr;
00107 typedef std::vector<Worker*> AllWorkers;
00108 typedef std::vector<OutputWorker*> AllOutputWorkers;
00109
00110 typedef std::vector<Worker*> Workers;
00111
00112 typedef std::vector<WorkerInPath> PathWorkers;
00113
00114 Schedule(ParameterSet& proc_pset,
00115 service::TriggerNamesService& tns,
00116 ProductRegistry& pregistry,
00117 ActionTable const& actions,
00118 boost::shared_ptr<ActivityRegistry> areg,
00119 boost::shared_ptr<ProcessConfiguration> processConfiguration);
00120
00121 enum State { Ready = 0, Running, Latched };
00122
00123 template <typename T>
00124 void processOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& eventSetup);
00125
00126 void beginJob();
00127 void endJob(ExceptionCollector & collector);
00128
00129
00130 void writeLumi(LuminosityBlockPrincipal const& lbp);
00131
00132
00133 void writeRun(RunPrincipal const& rp);
00134
00135
00136 void closeOutputFiles();
00137
00138
00139 void openNewOutputFilesIfNeeded();
00140
00141
00142 void openOutputFiles(FileBlock& fb);
00143
00144
00145 void respondToOpenInputFile(FileBlock const& fb);
00146
00147
00148 void respondToCloseInputFile(FileBlock const& fb);
00149
00150
00151 void respondToOpenOutputFiles(FileBlock const& fb);
00152
00153
00154 void respondToCloseOutputFiles(FileBlock const& fb);
00155
00156
00157 bool shouldWeCloseOutput() const;
00158
00159 void preForkReleaseResources();
00160 void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00161
00162 std::pair<double, double> timeCpuReal() const {
00163 return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00164 }
00165
00168
00172 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00173
00175 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00176
00178 void modulesInPath(std::string const& iPathLabel,
00179 std::vector<std::string>& oLabelsToFill) const;
00180
00184 int totalEvents() const {
00185 return total_events_;
00186 }
00187
00190 int totalEventsPassed() const {
00191 return total_passed_;
00192 }
00193
00196 int totalEventsFailed() const {
00197 return totalEvents() - totalEventsPassed();
00198 }
00199
00202 void enableEndPaths(bool active);
00203
00206 bool endPathsEnabled() const;
00207
00210 void getTriggerReport(TriggerReport& rep) const;
00211
00213 bool const terminate() const;
00214
00216 void clearCounters();
00217
00220 bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00221
00222 private:
00223
00224 AllWorkers::const_iterator workersBegin() const {
00225 return all_workers_.begin();
00226 }
00227
00228 AllWorkers::const_iterator workersEnd() const {
00229 return all_workers_.end();
00230 }
00231
00232 AllWorkers::iterator workersBegin() {
00233 return all_workers_.begin();
00234 }
00235
00236 AllWorkers::iterator workersEnd() {
00237 return all_workers_.end();
00238 }
00239
00240 void resetAll();
00241
00242 template <typename T>
00243 bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00244
00245 template <typename T>
00246 void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00247
00248 void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00249
00250 void reportSkipped(EventPrincipal const& ep) const;
00251 void reportSkipped(LuminosityBlockPrincipal const&) const {}
00252 void reportSkipped(RunPrincipal const&) const {}
00253
00254 void fillWorkers(ParameterSet& proc_pset,
00255 ProductRegistry& preg,
00256 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00257 std::string const& name, bool ignoreFilters, PathWorkers& out);
00258 void fillTrigPath(ParameterSet& proc_pset,
00259 ProductRegistry& preg,
00260 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00261 int bitpos, std::string const& name, TrigResPtr);
00262 void fillEndPath(ParameterSet& proc_pset,
00263 ProductRegistry& preg,
00264 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00265 int bitpos, std::string const& name);
00266
00267 void limitOutput(ParameterSet const& proc_pset);
00268
00269 void addToAllWorkers(Worker* w);
00270
00271 WorkerRegistry worker_reg_;
00272 ActionTable const* act_table_;
00273 boost::shared_ptr<ActivityRegistry> actReg_;
00274
00275 State state_;
00276 vstring trig_name_list_;
00277 vstring end_path_name_list_;
00278
00279 TrigResPtr results_;
00280 TrigResPtr endpath_results_;
00281
00282 WorkerPtr results_inserter_;
00283 AllWorkers all_workers_;
00284 AllOutputWorkers all_output_workers_;
00285 TrigPaths trig_paths_;
00286 TrigPaths end_paths_;
00287
00288 bool wantSummary_;
00289 int total_events_;
00290 int total_passed_;
00291 RunStopwatch::StopwatchPointer stopwatch_;
00292
00293 boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00294
00295 volatile bool endpathsAreActive_;
00296
00297 bool printedFirstException_;
00298 };
00299
00300
00301
00302
00303
00304
00305 template <typename T>
00306 class ProcessOneOccurrence {
00307 public:
00308 typedef void result_type;
00309 ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00310 ep(principal), es(setup) {};
00311
00312 void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00313
00314 private:
00315 typename T::MyPrincipal& ep;
00316 EventSetup const& es;
00317 };
00318
00319 class UnscheduledCallProducer : public UnscheduledHandler {
00320 public:
00321 UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00322 void addWorker(Worker* aWorker) {
00323 assert(0 != aWorker);
00324 labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00325 }
00326
00327 template <typename T>
00328 void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00329
00330 if(!T::isEvent_) {
00331 for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00332 it != itEnd;
00333 ++it) {
00334 CPUTimer timer;
00335 try {
00336 it->second->doWork<T>(p, es, 0, &timer);
00337 }
00338 catch (cms::Exception & ex) {
00339 std::ostringstream ost;
00340 if (T::isEvent_) {
00341 ost << "Calling event method";
00342 }
00343 else if (T::begin_ && T::branchType_ == InRun) {
00344 ost << "Calling beginRun";
00345 }
00346 else if (T::begin_ && T::branchType_ == InLumi) {
00347 ost << "Calling beginLuminosityBlock";
00348 }
00349 else if (!T::begin_ && T::branchType_ == InLumi) {
00350 ost << "Calling endLuminosityBlock";
00351 }
00352 else if (!T::begin_ && T::branchType_ == InRun) {
00353 ost << "Calling endRun";
00354 }
00355 else {
00356
00357 ost << "Calling unknown function";
00358 }
00359 ost << " for unscheduled module " << it->second->description().moduleName()
00360 << "/'" << it->second->description().moduleLabel() << "'";
00361 ex.addContext(ost.str());
00362 ost.str("");
00363 ost << "Processing " << p.id();
00364 ex.addContext(ost.str());
00365 throw;
00366 }
00367 }
00368 }
00369 }
00370
00371 private:
00372 virtual bool tryToFillImpl(std::string const& moduleLabel,
00373 EventPrincipal& event,
00374 EventSetup const& eventSetup,
00375 CurrentProcessingContext const* iContext) {
00376 std::map<std::string, Worker*>::const_iterator itFound =
00377 labelToWorkers_.find(moduleLabel);
00378 if(itFound != labelToWorkers_.end()) {
00379 CPUTimer timer;
00380 try {
00381 itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00382 }
00383 catch (cms::Exception & ex) {
00384 std::ostringstream ost;
00385 ost << "Calling produce method for unscheduled module "
00386 << itFound->second->description().moduleName() << "/'"
00387 << itFound->second->description().moduleLabel() << "'";
00388 ex.addContext(ost.str());
00389 throw;
00390 }
00391 return true;
00392 }
00393 return false;
00394 }
00395 std::map<std::string, Worker*> labelToWorkers_;
00396 };
00397
00398 void
00399 inline
00400 Schedule::reportSkipped(EventPrincipal const& ep) const {
00401 Service<JobReport> reportSvc;
00402 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00403 }
00404
00405 template <typename T>
00406 void
00407 Schedule::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es) {
00408 this->resetAll();
00409 state_ = Running;
00410
00411
00412 RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00413
00414 if (T::isEvent_) {
00415 ++total_events_;
00416 setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00417 }
00418 try {
00419 try {
00420 try {
00421
00422 unscheduled_->runNow<T>(ep, es);
00423 if (runTriggerPaths<T>(ep, es)) {
00424 if (T::isEvent_) ++total_passed_;
00425 }
00426 state_ = Latched;
00427 }
00428 catch(cms::Exception& e) {
00429 actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00430 assert (action != actions::IgnoreCompletely);
00431 assert (action != actions::FailPath);
00432 if (action == actions::SkipEvent) {
00433 edm::printCmsExceptionWarning("SkipEvent", e);
00434 } else {
00435 throw;
00436 }
00437 }
00438
00439 try {
00440 CPUTimer timer;
00441 if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00442 }
00443 catch (cms::Exception & ex) {
00444 if (T::isEvent_) {
00445 ex.addContext("Calling produce method for module TriggerResultInserter");
00446 }
00447 std::ostringstream ost;
00448 ost << "Processing " << ep.id();
00449 ex.addContext(ost.str());
00450 throw;
00451 }
00452
00453 if (endpathsAreActive_) runEndPaths<T>(ep, es);
00454 }
00455 catch (cms::Exception& e) { throw; }
00456 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00457 catch (std::exception& e) { convertException::stdToEDM(e); }
00458 catch(std::string& s) { convertException::stringToEDM(s); }
00459 catch(char const* c) { convertException::charPtrToEDM(c); }
00460 catch (...) { convertException::unknownToEDM(); }
00461 }
00462 catch(cms::Exception& ex) {
00463 if (!printedFirstException_) {
00464 Service<JobReport> jobReportSvc;
00465 if (ex.context().empty()) {
00466 ex.addContext("Calling function Schedule::processOneOccurrence");
00467 }
00468 if (jobReportSvc.isAvailable()) {
00469 JobReport *jobRep = jobReportSvc.operator->();
00470 edm::printCmsException(ex, jobRep, ex.returnCode());
00471 }
00472 else {
00473 edm::printCmsException(ex);
00474 }
00475 ex.setAlreadyPrinted(true);
00476 printedFirstException_ = true;
00477 }
00478 state_ = Ready;
00479 throw;
00480 }
00481
00482 state_ = Ready;
00483 }
00484
00485 template <typename T>
00486 bool
00487 Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00488 for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00489 return results_->accept();
00490 }
00491
00492 template <typename T>
00493 void
00494 Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00495
00496
00497 for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510 }
00511 }
00512
00513 #endif