CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkerManager.cc
Go to the documentation of this file.
2 
8 
9 static const std::string kFilterType("EDFilter");
10 static const std::string kProducerType("EDProducer");
11 
12 namespace edm {
13  // -----------------------------
14 
15  WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg, ExceptionToActionTable const& actions) :
16  workerReg_(areg),
17  actionTable_(&actions),
18  allWorkers_(),
19  unscheduled_(new UnscheduledCallProducer) {
20  } // WorkerManager::WorkerManager
21 
22  WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
23  std::shared_ptr<ActivityRegistry> areg,
25  workerReg_(areg,modReg),
26  actionTable_(&actions),
27  allWorkers_(),
28  unscheduled_(new UnscheduledCallProducer) {
29  } // WorkerManager::WorkerManager
30 
34  std::shared_ptr<ProcessConfiguration const> processConfiguration,
35  std::string const & label) {
36  WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
37  return workerReg_.getWorker(params, label);
38  }
39 
43  std::shared_ptr<ProcessConfiguration> processConfiguration,
45  std::set<std::string>& unscheduledLabels,
46  std::vector<std::string>& shouldBeUsedLabels) {
47  //Need to
48  // 1) create worker
49  // 2) if it is a WorkerT<EDProducer>, add it to our list
50  auto modType = pset.getParameter<std::string>("@module_edm_type");
51  if(modType == kProducerType || modType == kFilterType) {
52  Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
53  assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
54  unscheduledLabels.insert(label);
55  unscheduled_->addWorker(newWorker);
56  //add to list so it gets reset each new event
57  addToAllWorkers(newWorker);
58  } else {
59  shouldBeUsedLabels.push_back(label);
60  }
61  }
62 
63  void WorkerManager::setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const {
64  for(auto& prod : pregistry.productListUpdator()) {
65  if(prod.second.produced() &&
66  prod.second.branchType() == InEvent &&
67  unscheduledLabels.end() != unscheduledLabels.find(prod.second.moduleLabel())) {
68  prod.second.setOnDemand(true);
69  }
70  }
71  }
72 
74  for(auto& worker : allWorkers_) {
75  worker->endJob();
76  }
77  }
78 
80  for(auto& worker : allWorkers_) {
81  try {
83  worker->endJob();
84  });
85  }
86  catch (cms::Exception const& ex) {
87  collector.addException(ex);
88  }
89  }
90  }
91 
92 
93  void WorkerManager::beginJob(ProductRegistry const& iRegistry) {
94  auto const runLookup = iRegistry.productLookup(InRun);
95  auto const lumiLookup = iRegistry.productLookup(InLumi);
96  auto const eventLookup = iRegistry.productLookup(InEvent);
97  for(auto& worker : allWorkers_) {
98  worker->updateLookup(InRun,*runLookup);
99  worker->updateLookup(InLumi,*lumiLookup);
100  worker->updateLookup(InEvent,*eventLookup);
101  }
102 
103  for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
105  }
106 
107  void
109  for(auto& worker: allWorkers_) {
110  worker->beginStream(iID, streamContext);
111  }
112  }
113 
114  void
116  for(auto& worker: allWorkers_) {
117  worker->endStream(iID, streamContext);
118  }
119  }
120 
121  void
123  for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1));
124  }
125 
126  void
128  if(!search_all(allWorkers_, w)) {
129  allWorkers_.push_back(w);
130  }
131  }
132 
133  void
135  // NOTE: who owns the productdescrption? Just copied by value
136  unscheduled_->setEventSetup(es);
138  }
139 
140 }
T getParameter(std::string const &) const
void addException(cms::Exception const &exception)
void endStream(StreamID iID, StreamContext &streamContext)
static const std::string kFilterType("EDFilter")
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
AllWorkers allWorkers_
Definition: WorkerManager.h:84
void beginJob(ProductRegistry const &iRegistry)
processConfiguration
Definition: Schedule.cc:368
std::shared_ptr< ProductHolderIndexHelper > const & productLookup(BranchType branchType) const
actions
Definition: Schedule.cc:368
ExceptionToActionTable const * actionTable_
Definition: WorkerManager.h:82
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void setUnscheduledHandler(std::shared_ptr< UnscheduledHandler > iHandler)
void reset()
Definition: Worker.h:88
areg
Definition: Schedule.cc:368
virtual Types moduleType() const =0
void beginStream(StreamID iID, StreamContext &streamContext)
static const std::string kProducerType("EDProducer")
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
ProductList & productListUpdator()
void loadMissingDictionaries()
std::shared_ptr< UnscheduledCallProducer > unscheduled_
Definition: WorkerManager.h:86
void beginJob()
Definition: Worker.cc:104
WorkerRegistry workerReg_
Definition: WorkerManager.h:81
Worker * getWorker(WorkerParams const &p, std::string const &moduleLabel)
Retrieve the particular instance of the worker.
T w() const
auto wrap(F iFunc) -> decltype(iFunc())
WorkerManager(std::shared_ptr< ActivityRegistry > actReg, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:368
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
prealloc
Definition: Schedule.cc:368
void addToAllWorkers(Worker *w)