CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkerManager.cc
Go to the documentation of this file.
2 
8 
// Module-type names. addToUnscheduledWorkers compares a module's
// "@module_edm_type" parameter against these two strings to decide whether the
// module is registered as an unscheduled worker.
9 static const std::string kFilterType("EDFilter");
10 static const std::string kProducerType("EDProducer");
11 
12 namespace edm {
13  // -----------------------------
14 
// Constructs a manager from an activity registry and an exception-to-action table.
//   areg    - activity registry handed to workerReg_.
//   actions - table whose address is stored in actionTable_.
// allWorkers_ starts out value-initialized, and unscheduled_ is initialized
// with a newly allocated UnscheduledCallProducer.
15  WorkerManager::WorkerManager(boost::shared_ptr<ActivityRegistry> areg, ExceptionToActionTable const& actions) :
16  workerReg_(areg),
// Only the address is kept, not a copy; presumably the caller's table must
// stay alive for as long as this manager uses it - confirm against callers.
17  actionTable_(&actions),
18  allWorkers_(),
19  unscheduled_(new UnscheduledCallProducer) {
20  } // WorkerManager::WorkerManager
21 
22  WorkerManager::WorkerManager(boost::shared_ptr<ModuleRegistry> modReg,
23  boost::shared_ptr<ActivityRegistry> areg,
25  workerReg_(areg,modReg),
26  actionTable_(&actions),
27  allWorkers_(),
28  unscheduled_(new UnscheduledCallProducer) {
29  } // WorkerManager::WorkerManager
30 
34  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
35  std::string const & label) {
36  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
37  return workerReg_.getWorker(params, label);
38  }
39 
43  boost::shared_ptr<ProcessConfiguration> processConfiguration,
45  bool useStopwatch,
46  std::set<std::string>& unscheduledLabels,
47  std::vector<std::string>& shouldBeUsedLabels) {
48  //Need to
49  // 1) create worker
50  // 2) if it is a WorkerT<EDProducer>, add it to our list
51  auto modType = pset.getParameter<std::string>("@module_edm_type");
52  if(modType == kProducerType || modType == kFilterType) {
53  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
54  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
55  unscheduledLabels.insert(label);
56  unscheduled_->addWorker(newWorker);
57  //add to list so it gets reset each new event
58  addToAllWorkers(newWorker, useStopwatch);
59  } else {
60  shouldBeUsedLabels.push_back(label);
61  }
62  }
63 
// Flags registry entries as on-demand.
//   pregistry         - registry whose product list is walked and modified in place.
//   unscheduledLabels - module labels to match against.
// An entry gets setOnDemand(true) only when all three hold: produced() is true,
// its branch type is InEvent, and its module label is in unscheduledLabels.
// Entries that fail any test are left untouched.
64  void WorkerManager::setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const {
65  for(auto& prod : pregistry.productListUpdator()) {
66  if(prod.second.produced() &&
67  prod.second.branchType() == InEvent &&
68  unscheduledLabels.end() != unscheduledLabels.find(prod.second.moduleLabel())) {
69  prod.second.setOnDemand(true);
70  }
71  }
72  }
73 
75  for(auto& worker : allWorkers_) {
76  worker->endJob();
77  }
78  }
79 
81  for(auto& worker : allWorkers_) {
82  try {
84  worker->endJob();
85  });
86  }
87  catch (cms::Exception const& ex) {
88  collector.addException(ex);
89  }
90  }
91  }
92 
93 
// Begin-of-job hook for every worker in allWorkers_.
//   iRegistry - product registry supplying one product lookup per branch type.
// The three lookups (InRun, InLumi, InEvent) are fetched once and passed to
// every worker via updateLookup; Worker::beginJob is then invoked on each worker.
// NOTE(review): listing line 105 is absent from this copy of the file, so a
// statement may be missing between the for_all call and the closing brace.
94  void WorkerManager::beginJob(ProductRegistry const& iRegistry) {
95  auto const runLookup = iRegistry.productLookup(InRun);
96  auto const lumiLookup = iRegistry.productLookup(InLumi);
97  auto const eventLookup = iRegistry.productLookup(InEvent);
98  for(auto& worker : allWorkers_) {
99  worker->updateLookup(InRun,*runLookup);
100  worker->updateLookup(InLumi,*lumiLookup);
101  worker->updateLookup(InEvent,*eventLookup);
102  }
103 
// Runs after all lookups are updated, so each worker's beginJob sees them.
104  for_all(allWorkers_, boost::bind(&Worker::beginJob, _1));
106  }
107 
108  void
110  for(auto& worker: allWorkers_) {
111  worker->beginStream(iID, streamContext);
112  }
113  }
114 
115  void
117  for(auto& worker: allWorkers_) {
118  worker->endStream(iID, streamContext);
119  }
120  }
121 
122  void
124  for_all(allWorkers_, boost::bind(&Worker::reset, _1));
125  }
126 
// Appends w to allWorkers_ unless search_all reports it is already present.
//   w            - worker to register; dereferenced without a null check when
//                  useStopwatch is true.
//   useStopwatch - when true, w->useStopwatch() is called before the worker is
//                  appended (only on first registration).
// A worker that is already in the list is left untouched, including its
// stopwatch setting.
127  void
128  WorkerManager::addToAllWorkers(Worker* w, bool useStopwatch) {
129  if(!search_all(allWorkers_, w)) {
130  if(useStopwatch) {
131  w->useStopwatch();
132  }
133  allWorkers_.push_back(w);
134  }
135  }
136 
137  void
139  // NOTE: who owns the product description? Just copied by value
140  unscheduled_->setEventSetup(es);
142  }
143 
144 }
T getParameter(std::string const &) const
void addException(cms::Exception const &exception)
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
static const std::string kFilterType("EDFilter")
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
AllWorkers allWorkers_
Definition: WorkerManager.h:85
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, bool useStopwatch, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
void beginJob(ProductRegistry const &iRegistry)
processConfiguration
Definition: Schedule.cc:369
actions
Definition: Schedule.cc:369
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:83
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
boost::shared_ptr< ProductHolderIndexHelper > const & productLookup(BranchType branchType) const
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void reset()
Definition: Worker.h:92
void useStopwatch()
Definition: Worker.cc:185
boost::shared_ptr< UnscheduledCallProducer > unscheduled_
Definition: WorkerManager.h:87
areg
Definition: Schedule.cc:369
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
WorkerManager(boost::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
static const std::string kProducerType("EDProducer")
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
ProductList & productListUpdator()
void loadMissingDictionaries()
void beginJob()
Definition: Worker.cc:105
WorkerRegistry workerReg_
Definition: WorkerManager.h:82
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
void setUnscheduledHandler(boost::shared_ptr< UnscheduledHandler > iHandler)
auto wrap(F iFunc) -> decltype(iFunc())
void addToAllWorkers(Worker *w, bool useStopwatch)
preg
Definition: Schedule.cc:369
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
prealloc
Definition: Schedule.cc:369