CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkerManager.cc
Go to the documentation of this file.
2 
9 
// Module-type strings that the "@module_edm_type" parameter is compared
// against further down in this file.
static std::string const kFilterType{"EDFilter"};
static std::string const kProducerType{"EDProducer"};
12 
13 namespace edm {
14  // -----------------------------
15 
16  WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg, ExceptionToActionTable const& actions) :
17  workerReg_(areg),
18  actionTable_(&actions),
19  allWorkers_(),
20  unscheduled_(new UnscheduledCallProducer) {
21  } // WorkerManager::WorkerManager
22 
// Constructor overload that additionally receives a module registry; the
// worker registry is built from both the activity registry and the module
// registry. The action table is stored by address.
// NOTE(review): listing line 25 is absent from this rendering. Given the use
// of `actions` below it presumably declared the `actions` parameter and opened
// the initializer list -- confirm against the real source file.
23  WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
24  std::shared_ptr<ActivityRegistry> areg,
26  workerReg_(areg,modReg),
27  actionTable_(&actions),
28  allWorkers_(),
29  unscheduled_(new UnscheduledCallProducer) {
30  } // WorkerManager::WorkerManager
31 
// Tail of a getWorker definition: bundles the arguments together with the
// stored action table into a WorkerParams and returns whatever the worker
// registry's getWorker yields for `label`.
// NOTE(review): listing lines 32-34 (return type, name and first parameters)
// are absent from this rendering. The cross-reference index at the end of the
// page lists a `Worker * getWorker(ParameterSet &pset, ProductRegistry &preg,
// PreallocationConfiguration const *prealloc, ...)` whose parameter names
// match the ones used here -- confirm against the real source file.
35  std::shared_ptr<ProcessConfiguration const> processConfiguration,
36  std::string const & label) {
37  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
38  return workerReg_.getWorker(params, label);
39  }
40 
// Tail of a definition that sorts a module by its "@module_edm_type":
//  - EDProducer / EDFilter: a worker is obtained via getWorker, its label is
//    inserted into unscheduledLabels, and the worker is added both to
//    unscheduled_ and to the all-workers list;
//  - anything else: only the label is appended to shouldBeUsedLabels.
// NOTE(review): listing lines 41-43 and 45 (start of the signature) are absent
// from this rendering. The cross-reference index at the end of the page lists
// `void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg,
// PreallocationConfiguration const *prealloc, ..., std::string label, ...)`,
// which matches the names used here -- confirm against the real source file.
44  std::shared_ptr<ProcessConfiguration> processConfiguration,
46  std::set<std::string>& unscheduledLabels,
47  std::vector<std::string>& shouldBeUsedLabels) {
48  //Need to
49  // 1) create worker
50  // 2) if it is a WorkerT<EDProducer>, add it to our list
51  auto modType = pset.getParameter<std::string>("@module_edm_type");
52  if(modType == kProducerType || modType == kFilterType) {
53  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
// Sanity check: the worker's own reported type must agree with the
// configured type string tested above.
54  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
55  unscheduledLabels.insert(label);
56  unscheduled_->addWorker(newWorker);
57  //add to list so it gets reset each new event
58  addToAllWorkers(newWorker);
59  } else {
// Not a producer or filter: no worker is created here, the label is only
// recorded for the caller.
60  shouldBeUsedLabels.push_back(label);
61  }
62  }
63 
64  void WorkerManager::setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const {
65  for(auto& prod : pregistry.productListUpdator()) {
66  if(prod.second.produced() &&
67  prod.second.branchType() == InEvent &&
68  unscheduledLabels.end() != unscheduledLabels.find(prod.second.moduleLabel())) {
69  prod.second.setOnDemand(true);
70  }
71  }
72  }
73 
// Body of a definition whose signature (listing line 74) is absent from this
// rendering: calls endJob() on every entry of allWorkers_, in order. There is
// no exception handling in this body, so anything thrown by a worker
// leaves the loop.
75  for(auto& worker : allWorkers_) {
76  worker->endJob();
77  }
78  }
79 
// Body of a definition whose signature (listing line 80) is absent from this
// rendering: calls endJob() on every entry of allWorkers_. The try/catch sits
// inside the loop, so a cms::Exception from one worker is added to
// `collector` and the loop goes on to the next worker.
// NOTE(review): listing line 83 is also absent. The `});` on line 85 suggests
// it opened a lambda passed to a wrapper call (the page's cross-reference
// index lists `auto wrap(F iFunc)`) -- confirm against the real source file.
81  for(auto& worker : allWorkers_) {
82  try {
84  worker->endJob();
85  });
86  }
87  catch (cms::Exception const& ex) {
88  collector.addException(ex);
89  }
90  }
91  }
92 
93 
94  void WorkerManager::beginJob(ProductRegistry const& iRegistry) {
95  auto const runLookup = iRegistry.productLookup(InRun);
96  auto const lumiLookup = iRegistry.productLookup(InLumi);
97  auto const eventLookup = iRegistry.productLookup(InEvent);
98  for(auto& worker : allWorkers_) {
99  worker->updateLookup(InRun,*runLookup);
100  worker->updateLookup(InLumi,*lumiLookup);
101  worker->updateLookup(InEvent,*eventLookup);
102  }
103 
104  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
105  }
106 
// Forwards iID and streamContext to beginStream() of every entry of
// allWorkers_, in order.
// NOTE(review): listing line 108 (name and parameter list) is absent from this
// rendering; the page's cross-reference index lists
// `void beginStream(StreamID iID, StreamContext &streamContext)` -- confirm.
107  void
109  for(auto& worker: allWorkers_) {
110  worker->beginStream(iID, streamContext);
111  }
112  }
113 
// Forwards iID and streamContext to endStream() of every entry of
// allWorkers_, in order.
// NOTE(review): listing line 115 (name and parameter list) is absent from this
// rendering; the page's cross-reference index lists
// `void endStream(StreamID iID, StreamContext &streamContext)` -- confirm.
114  void
116  for(auto& worker: allWorkers_) {
117  worker->endStream(iID, streamContext);
118  }
119  }
120 
// Invokes Worker::reset on every entry of allWorkers_ (the page's
// cross-reference index describes for_all as a wrapper for std::for_each).
// NOTE(review): listing line 122 (name and parameter list) is absent from this
// rendering, so the function's name cannot be read from here -- confirm
// against the real source file.
121  void
123  for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1));
124  }
125 
// Appends `w` to allWorkers_ only when search_all does not already find it
// there, so the same worker is not listed twice.
// NOTE(review): listing line 127 (name and parameter list) is absent from this
// rendering; the page's cross-reference index lists
// `void addToAllWorkers(Worker *w)` -- confirm.
126  void
128  if(!search_all(allWorkers_, w)) {
129  allWorkers_.push_back(w);
130  }
131  }
132 
// Passes `es` to unscheduled_ via setEventSetup.
// NOTE(review): listing lines 134 (name and parameter list) and 137 are absent
// from this rendering. The page's cross-reference index lists
// `void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)`;
// the missing line 137 presumably is where `principal` is used -- confirm
// against the real source file.
133  void
135  // NOTE: who owns the product description? Just copied by value
136  unscheduled_->setEventSetup(es);
138  }
139 
140 }
std::shared_ptr< UnscheduledCallProducer const > unscheduled() const
Definition: WorkerManager.h:82
T getParameter(std::string const &) const
void addException(cms::Exception const &exception)
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
static const std::string kFilterType("EDFilter")
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
Definition: WorkerManager.h:88
assert(m_qm.get())
void beginJob(ProductRegistry const &iRegistry)
edm::propagate_const< std::shared_ptr< UnscheduledCallProducer > > unscheduled_
Definition: WorkerManager.h:90
processConfiguration
Definition: Schedule.cc:374
std::shared_ptr< ProductHolderIndexHelper const > productLookup(BranchType branchType) const
actions
Definition: Schedule.cc:374
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:86
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setUnscheduledHandler(std::shared_ptr< UnscheduledHandler > iHandler)
void reset()
Definition: Worker.h:95
areg
Definition: Schedule.cc:374
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
static const std::string kProducerType("EDProducer")
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
ProductList & productListUpdator()
void beginJob()
Definition: Worker.cc:104
WorkerRegistry workerReg_
Definition: WorkerManager.h:85
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:374
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
prealloc
Definition: Schedule.cc:374
void addToAllWorkers(Worker *w)