00001 #ifndef FWCore_Framework_Schedule_h
00002 #define FWCore_Framework_Schedule_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
00061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00062 #include "FWCore/Framework/interface/Actions.h"
00063 #include "FWCore/Framework/interface/EventPrincipal.h"
00064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
00065 #include "FWCore/Framework/interface/Frameworkfwd.h"
00066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00067 #include "FWCore/Framework/interface/UnscheduledHandler.h"
00068 #include "FWCore/Framework/src/Path.h"
00069 #include "FWCore/Framework/src/RunStopwatch.h"
00070 #include "FWCore/Framework/src/Worker.h"
00071 #include "FWCore/Framework/src/WorkerRegistry.h"
00072 #include "FWCore/Framework/src/EarlyDeleteHelper.h"
00073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
00074 #include "FWCore/MessageLogger/interface/JobReport.h"
00075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00076 #include "FWCore/ServiceRegistry/interface/Service.h"
00077 #include "FWCore/Utilities/interface/Algorithms.h"
00078 #include "FWCore/Utilities/interface/BranchType.h"
00079 #include "FWCore/Utilities/interface/ConvertException.h"
00080 #include "FWCore/Utilities/interface/Exception.h"
00081
00082 #include "boost/shared_ptr.hpp"
00083
00084 #include <map>
00085 #include <memory>
00086 #include <set>
00087 #include <string>
00088 #include <vector>
00089 #include <sstream>
00090
00091 namespace edm {
00092 namespace service {
00093 class TriggerNamesService;
00094 }
00095 class ActivityRegistry;
00096 class EventSetup;
00097 class ExceptionCollector;
00098 class OutputWorker;
00099 class RunStopwatch;
00100 class UnscheduledCallProducer;
00101 class WorkerInPath;
00102 class Schedule {
00103 public:
00104 typedef std::vector<std::string> vstring;
00105 typedef std::vector<Path> TrigPaths;
00106 typedef std::vector<Path> NonTrigPaths;
00107 typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
00108 typedef boost::shared_ptr<Worker> WorkerPtr;
00109 typedef std::vector<Worker*> AllWorkers;
00110 typedef std::vector<OutputWorker*> AllOutputWorkers;
00111
00112 typedef std::vector<Worker*> Workers;
00113
00114 typedef std::vector<WorkerInPath> PathWorkers;
00115
00116 Schedule(ParameterSet& proc_pset,
00117 service::TriggerNamesService& tns,
00118 ProductRegistry& pregistry,
00119 ActionTable const& actions,
00120 boost::shared_ptr<ActivityRegistry> areg,
00121 boost::shared_ptr<ProcessConfiguration> processConfiguration,
00122 const ParameterSet* subProcPSet);
00123
00124 enum State { Ready = 0, Running, Latched };
00125
00126 template <typename T>
00127 void processOneOccurrence(typename T::MyPrincipal& principal,
00128 EventSetup const& eventSetup,
00129 bool cleaningUpAfterException = false);
00130
00131 void beginJob();
00132 void endJob(ExceptionCollector & collector);
00133
00134
00135 void writeLumi(LuminosityBlockPrincipal const& lbp);
00136
00137
00138 void writeRun(RunPrincipal const& rp);
00139
00140
00141 void closeOutputFiles();
00142
00143
00144 void openNewOutputFilesIfNeeded();
00145
00146
00147 void openOutputFiles(FileBlock& fb);
00148
00149
00150 void respondToOpenInputFile(FileBlock const& fb);
00151
00152
00153 void respondToCloseInputFile(FileBlock const& fb);
00154
00155
00156 void respondToOpenOutputFiles(FileBlock const& fb);
00157
00158
00159 void respondToCloseOutputFiles(FileBlock const& fb);
00160
00161
00162 bool shouldWeCloseOutput() const;
00163
00164 void preForkReleaseResources();
00165 void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00166
00167 std::pair<double, double> timeCpuReal() const {
00168 return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
00169 }
00170
00173
00177 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
00178
00180 void availablePaths(std::vector<std::string>& oLabelsToFill) const;
00181
00183 void modulesInPath(std::string const& iPathLabel,
00184 std::vector<std::string>& oLabelsToFill) const;
00185
00189 int totalEvents() const {
00190 return total_events_;
00191 }
00192
00195 int totalEventsPassed() const {
00196 return total_passed_;
00197 }
00198
00201 int totalEventsFailed() const {
00202 return totalEvents() - totalEventsPassed();
00203 }
00204
00207 void enableEndPaths(bool active);
00208
00211 bool endPathsEnabled() const;
00212
00215 void getTriggerReport(TriggerReport& rep) const;
00216
00218 bool terminate() const;
00219
00221 void clearCounters();
00222
00225 bool changeModule(std::string const& iLabel, ParameterSet const& iPSet);
00226
00227 private:
00228
00229 AllWorkers::const_iterator workersBegin() const {
00230 return all_workers_.begin();
00231 }
00232
00233 AllWorkers::const_iterator workersEnd() const {
00234 return all_workers_.end();
00235 }
00236
00237 AllWorkers::iterator workersBegin() {
00238 return all_workers_.begin();
00239 }
00240
00241 AllWorkers::iterator workersEnd() {
00242 return all_workers_.end();
00243 }
00244
00245 void resetAll();
00246
00247 template <typename T>
00248 bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&);
00249
00250 template <typename T>
00251 void runEndPaths(typename T::MyPrincipal&, EventSetup const&);
00252
00253 void setupOnDemandSystem(EventPrincipal& principal, EventSetup const& es);
00254
00255 void reportSkipped(EventPrincipal const& ep) const;
00256 void reportSkipped(LuminosityBlockPrincipal const&) const {}
00257 void reportSkipped(RunPrincipal const&) const {}
00258
00259 void reduceParameterSet(ParameterSet& proc_pset,
00260 vstring& modulesInConfig,
00261 std::set<std::string> const& modulesInConfigSet,
00262 vstring& labelsOnTriggerPaths,
00263 vstring& shouldBeUsedLabels,
00264 std::map<std::string, std::vector<std::pair<std::string, int> > >& outputModulePathPositions);
00265
00266 void fillWorkers(ParameterSet& proc_pset,
00267 ProductRegistry& preg,
00268 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00269 std::string const& name, bool ignoreFilters, PathWorkers& out,
00270 vstring* labelsOnPaths);
00271 void fillTrigPath(ParameterSet& proc_pset,
00272 ProductRegistry& preg,
00273 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00274 int bitpos, std::string const& name, TrigResPtr,
00275 vstring* labelsOnTriggerPaths);
00276 void fillEndPath(ParameterSet& proc_pset,
00277 ProductRegistry& preg,
00278 boost::shared_ptr<ProcessConfiguration const> processConfiguration,
00279 int bitpos, std::string const& name);
00280
00281 void limitOutput(ParameterSet const& proc_pset);
00282
00283 void addToAllWorkers(Worker* w);
00284
00285 void resetEarlyDelete();
00286 void initializeEarlyDelete(edm::ParameterSet const& opts,
00287 edm::ProductRegistry const& preg,
00288 edm::ParameterSet const* subProcPSet);
00289
00290 WorkerRegistry worker_reg_;
00291 ActionTable const* act_table_;
00292 boost::shared_ptr<ActivityRegistry> actReg_;
00293
00294 State state_;
00295 vstring trig_name_list_;
00296 vstring end_path_name_list_;
00297
00298 TrigResPtr results_;
00299 TrigResPtr endpath_results_;
00300
00301 WorkerPtr results_inserter_;
00302 AllWorkers all_workers_;
00303 AllOutputWorkers all_output_workers_;
00304 TrigPaths trig_paths_;
00305 TrigPaths end_paths_;
00306
00307
00308
00309
00310 std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
00311
00312
00313
00314
00315
00316
00317 std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
00318
00319
00320 std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
00321
00322 bool wantSummary_;
00323 int total_events_;
00324 int total_passed_;
00325 RunStopwatch::StopwatchPointer stopwatch_;
00326
00327 boost::shared_ptr<UnscheduledCallProducer> unscheduled_;
00328
00329 volatile bool endpathsAreActive_;
00330 };
00331
00332
00333
00334
00335
00336
00337 template <typename T>
00338 class ProcessOneOccurrence {
00339 public:
00340 typedef void result_type;
00341 ProcessOneOccurrence(typename T::MyPrincipal& principal, EventSetup const& setup) :
00342 ep(principal), es(setup) {};
00343
00344 void operator()(Path& p) {p.processOneOccurrence<T>(ep, es);}
00345
00346 private:
00347 typename T::MyPrincipal& ep;
00348 EventSetup const& es;
00349 };
00350
00351 class UnscheduledCallProducer : public UnscheduledHandler {
00352 public:
00353 UnscheduledCallProducer() : UnscheduledHandler(), labelToWorkers_() {}
00354 void addWorker(Worker* aWorker) {
00355 assert(0 != aWorker);
00356 labelToWorkers_[aWorker->description().moduleLabel()] = aWorker;
00357 }
00358
00359 template <typename T>
00360 void runNow(typename T::MyPrincipal& p, EventSetup const& es) {
00361
00362 if(!T::isEvent_) {
00363 for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
00364 it != itEnd;
00365 ++it) {
00366 CPUTimer timer;
00367 try {
00368 it->second->doWork<T>(p, es, 0, &timer);
00369 }
00370 catch (cms::Exception & ex) {
00371 std::ostringstream ost;
00372 if (T::isEvent_) {
00373 ost << "Calling event method";
00374 }
00375 else if (T::begin_ && T::branchType_ == InRun) {
00376 ost << "Calling beginRun";
00377 }
00378 else if (T::begin_ && T::branchType_ == InLumi) {
00379 ost << "Calling beginLuminosityBlock";
00380 }
00381 else if (!T::begin_ && T::branchType_ == InLumi) {
00382 ost << "Calling endLuminosityBlock";
00383 }
00384 else if (!T::begin_ && T::branchType_ == InRun) {
00385 ost << "Calling endRun";
00386 }
00387 else {
00388
00389 ost << "Calling unknown function";
00390 }
00391 ost << " for unscheduled module " << it->second->description().moduleName()
00392 << "/'" << it->second->description().moduleLabel() << "'";
00393 ex.addContext(ost.str());
00394 ost.str("");
00395 ost << "Processing " << p.id();
00396 ex.addContext(ost.str());
00397 throw;
00398 }
00399 }
00400 }
00401 }
00402
00403 private:
00404 virtual bool tryToFillImpl(std::string const& moduleLabel,
00405 EventPrincipal& event,
00406 EventSetup const& eventSetup,
00407 CurrentProcessingContext const* iContext) {
00408 std::map<std::string, Worker*>::const_iterator itFound =
00409 labelToWorkers_.find(moduleLabel);
00410 if(itFound != labelToWorkers_.end()) {
00411 CPUTimer timer;
00412 try {
00413 itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionBegin> >(event, eventSetup, iContext, &timer);
00414 }
00415 catch (cms::Exception & ex) {
00416 std::ostringstream ost;
00417 ost << "Calling produce method for unscheduled module "
00418 << itFound->second->description().moduleName() << "/'"
00419 << itFound->second->description().moduleLabel() << "'";
00420 ex.addContext(ost.str());
00421 throw;
00422 }
00423 return true;
00424 }
00425 return false;
00426 }
00427 std::map<std::string, Worker*> labelToWorkers_;
00428 };
00429
00430 void
00431 inline
00432 Schedule::reportSkipped(EventPrincipal const& ep) const {
00433 Service<JobReport> reportSvc;
00434 reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
00435 }
00436
00437 template <typename T>
00438 void
00439 Schedule::processOneOccurrence(typename T::MyPrincipal& ep,
00440 EventSetup const& es,
00441 bool cleaningUpAfterException) {
00442 this->resetAll();
00443 state_ = Running;
00444
00445
00446 RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());
00447
00448 if (T::isEvent_) {
00449 ++total_events_;
00450 setupOnDemandSystem(dynamic_cast<EventPrincipal&>(ep), es);
00451 }
00452 try {
00453 try {
00454 try {
00455
00456 unscheduled_->runNow<T>(ep, es);
00457 if (runTriggerPaths<T>(ep, es)) {
00458 if (T::isEvent_) ++total_passed_;
00459 }
00460 state_ = Latched;
00461 }
00462 catch(cms::Exception& e) {
00463 actions::ActionCodes action = (T::isEvent_ ? act_table_->find(e.category()) : actions::Rethrow);
00464 assert (action != actions::IgnoreCompletely);
00465 assert (action != actions::FailPath);
00466 if (action == actions::SkipEvent) {
00467 edm::printCmsExceptionWarning("SkipEvent", e);
00468 } else {
00469 throw;
00470 }
00471 }
00472
00473 try {
00474 CPUTimer timer;
00475 if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, 0, &timer);
00476 }
00477 catch (cms::Exception & ex) {
00478 if (T::isEvent_) {
00479 ex.addContext("Calling produce method for module TriggerResultInserter");
00480 }
00481 std::ostringstream ost;
00482 ost << "Processing " << ep.id();
00483 ex.addContext(ost.str());
00484 throw;
00485 }
00486
00487 if (endpathsAreActive_) runEndPaths<T>(ep, es);
00488 if(T::isEvent_) resetEarlyDelete();
00489 }
00490 catch (cms::Exception& e) { throw; }
00491 catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
00492 catch (std::exception& e) { convertException::stdToEDM(e); }
00493 catch(std::string& s) { convertException::stringToEDM(s); }
00494 catch(char const* c) { convertException::charPtrToEDM(c); }
00495 catch (...) { convertException::unknownToEDM(); }
00496 }
00497 catch(cms::Exception& ex) {
00498 if (ex.context().empty()) {
00499 addContextAndPrintException("Calling function Schedule::processOneOccurrence", ex, cleaningUpAfterException);
00500 } else {
00501 addContextAndPrintException("", ex, cleaningUpAfterException);
00502 }
00503 state_ = Ready;
00504 throw;
00505 }
00506
00507 state_ = Ready;
00508 }
00509
00510 template <typename T>
00511 bool
00512 Schedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00513 for_all(trig_paths_, ProcessOneOccurrence<T>(ep, es));
00514 return results_->accept();
00515 }
00516
00517 template <typename T>
00518 void
00519 Schedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es) {
00520
00521
00522 for_all(end_paths_, ProcessOneOccurrence<T>(ep, es));
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535 }
00536 }
00537
00538 #endif