CMS 3D CMS Logo

WorkerManager.cc
Go to the documentation of this file.
3 
10 
11 static const std::string kFilterType("EDFilter");
12 static const std::string kProducerType("EDProducer");
13 
14 namespace edm {
15  // -----------------------------
16 
// Two-argument constructor: builds a WorkerManager from an activity registry
// and an exception-to-action table. This overload passes only `areg` to the
// worker registry.
//   areg    - shared activity registry; it is handed to workerReg_ and also
//             dereferenced to construct unscheduled_, so it must be non-null.
//   actions - only its address is stored (no copy is made); the pointer is
//             kept in actionTable_, so `actions` presumably has to outlive
//             this object -- NOTE(review): confirm against callers.
17  WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg, ExceptionToActionTable const& actions) :
18  workerReg_(areg),
19  actionTable_(&actions), // stores the address of the caller's table
20  allWorkers_(), // starts with no workers registered
21  unscheduled_(*areg), // dereferences areg
22  lastSetupEventPrincipal_(nullptr) // no principal recorded yet
23  {
24 
25  } // WorkerManager::WorkerManager
26 
27  WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
28  std::shared_ptr<ActivityRegistry> areg,
30  workerReg_(areg,modReg),
31  actionTable_(&actions),
32  allWorkers_(),
33  unscheduled_(*areg),
35  {
36  } // WorkerManager::WorkerManager
37 
39  ProductRegistry& preg,
40  PreallocationConfiguration const* prealloc,
41  std::shared_ptr<ProcessConfiguration const> processConfiguration,
42  std::string const & label) {
43  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
44  return workerReg_.getWorker(params, label);
45  }
46 
48  ProductRegistry& preg,
49  PreallocationConfiguration const* prealloc,
50  std::shared_ptr<ProcessConfiguration> processConfiguration,
52  std::set<std::string>& unscheduledLabels,
53  std::vector<std::string>& shouldBeUsedLabels) {
54  //Need to
55  // 1) create worker
56  // 2) if it is a WorkerT<EDProducer>, add it to our list
57  auto modType = pset.getParameter<std::string>("@module_edm_type");
58  if(modType == kProducerType || modType == kFilterType) {
59  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
60  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
61  unscheduledLabels.insert(label);
62  unscheduled_.addWorker(newWorker);
63  //add to list so it gets reset each new event
64  addToAllWorkers(newWorker);
65  } else {
66  shouldBeUsedLabels.push_back(label);
67  }
68  }
69 
// Flags products in the registry as on-demand.
// Walks every entry of pregistry.productListUpdator() and calls
// setOnDemand(true) on those that satisfy all three conditions:
//   1) produced() is true,
//   2) branchType() is InEvent,
//   3) moduleLabel() is present in `unscheduledLabels`.
// Entries that fail any condition are left untouched. The method is const:
// it modifies only the registry passed in, not this object's members.
70  void WorkerManager::setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const {
71  for(auto& prod : pregistry.productListUpdator()) {
72  if(prod.second.produced() &&
73  prod.second.branchType() == InEvent &&
74  unscheduledLabels.end() != unscheduledLabels.find(prod.second.moduleLabel())) { // label is in the set
75  prod.second.setOnDemand(true);
76  }
77  }
78  }
79 
81  for(auto& worker : allWorkers_) {
82  worker->endJob();
83  }
84  }
85 
87  for(auto& worker : allWorkers_) {
88  try {
90  worker->endJob();
91  });
92  }
93  catch (cms::Exception const& ex) {
94  collector.addException(ex);
95  }
96  }
97  }
98 
99 
// Prepares every registered worker for the job and then calls its beginJob().
// Steps:
//   1) Fetch the product lookup objects for the Run, Lumi and Event branch
//      types from `iRegistry`.
//   2) If there is at least one worker, take the process name from the first
//      worker's description and ask each lookup for the module-to-indices
//      mapping of that process.
//   3) For each worker, update its lookups and resolve its put indices for
//      all three branch types.
//   4) Invoke Worker::beginJob on each worker.
// When allWorkers_ is empty only step 1 runs.
100  void WorkerManager::beginJob(ProductRegistry const& iRegistry) {
101  auto const runLookup = iRegistry.productLookup(InRun);
102  auto const lumiLookup = iRegistry.productLookup(InLumi);
103  auto const eventLookup = iRegistry.productLookup(InEvent);
104  if(!allWorkers_.empty()) {
    // The process name is read from the first worker only.
    // NOTE(review): this assumes all workers belong to the same process -- confirm.
105  auto const& processName = allWorkers_[0]->description().processName();
106  auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
107  auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
108  auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
109  for(auto& worker : allWorkers_) {
110  worker->updateLookup(InRun,*runLookup);
111  worker->updateLookup(InLumi,*lumiLookup);
112  worker->updateLookup(InEvent,*eventLookup);
113  worker->resolvePutIndicies(InRun,runModuleToIndicies);
114  worker->resolvePutIndicies(InLumi,lumiModuleToIndicies);
115  worker->resolvePutIndicies(InEvent,eventModuleToIndicies);
116  }
117 
    // beginJob is called only after every worker has had its lookups and put indices set up above.
118  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
119  }
120  }
121 
122  void
124  for(auto& worker: allWorkers_) {
125  worker->beginStream(iID, streamContext);
126  }
127  }
128 
129  void
131  for(auto& worker: allWorkers_) {
132  worker->endStream(iID, streamContext);
133  }
134  }
135 
136  void
138  for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1));
139  }
140 
141  void
143  if(!search_all(allWorkers_, w)) {
144  allWorkers_.push_back(w);
145  }
146  }
147 
148  void
150  this->resetAll();
152  if(&ep != lastSetupEventPrincipal_) {
154  ep.setupUnscheduled(config);
156  }
157  }
158 
159 }
T getParameter(std::string const &) const
void addException(cms::Exception const &exception)
void setupUnscheduled(UnscheduledConfigurator const &)
Definition: Principal.cc:351
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void endStream(StreamID iID, StreamContext &streamContext)
const double w
Definition: UKUtility.cc:23
static const std::string kFilterType("EDFilter")
UnscheduledCallProducer unscheduled_
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
void beginJob(ProductRegistry const &iRegistry)
#define nullptr
ExceptionToActionTable const * actionTable_
void const * lastSetupEventPrincipal_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
config
Definition: looper.py:287
void reset()
Definition: Worker.h:178
void beginStream(StreamID iID, StreamContext &streamContext)
static const std::string kProducerType("EDProducer")
UnscheduledAuxiliary const & auxiliary() const
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
ProductList & productListUpdator()
std::shared_ptr< ProductResolverIndexHelper const > productLookup(BranchType branchType) const
HLT enums.
void setEventSetup(EventSetup const &iSetup)
void beginJob()
Definition: Worker.cc:289
WorkerRegistry workerReg_
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
virtual Types moduleType() const =0
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void addToAllWorkers(Worker *w)