CMS 3D CMS Logo

SubProcess.cc
Go to the documentation of this file.
2 
36 
37 #include "boost/range/adaptor/reversed.hpp"
38 
39 #include <cassert>
40 #include <string>
41 #include <vector>
42 
43 namespace edm {
44 
46  ParameterSet const& topLevelParameterSet,
47  std::shared_ptr<ProductRegistry const> parentProductRegistry,
48  std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
49  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
50  SubProcessParentageHelper const& parentSubProcessParentageHelper,
52  ActivityRegistry& parentActReg,
53  ServiceToken const& token,
55  PreallocationConfiguration const& preallocConfig,
56  ProcessContext const* parentProcessContext) :
58  serviceToken_(),
59  parentPreg_(parentProductRegistry),
60  preg_(),
61  branchIDListHelper_(),
62  act_table_(),
63  processConfiguration_(),
64  historyLumiOffset_(preallocConfig.numberOfStreams()),
65  historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
66  processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
67  historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
68  principalCache_(),
69  esp_(),
70  schedule_(),
71  parentToChildPhID_(),
72  subProcesses_(),
73  processParameterSet_(),
74  productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
75  productSelector_(),
76  wantAllEvents_(true) {
77 
78  //Setup the event selection
80 
81  ParameterSet selectevents =
82  parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
83 
84  selectevents.registerIt(); // Just in case this PSet is not registered
86  tns->getProcessName(),
88  selectors_,
90  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
92  "",
93  outputModulePathPositions,
94  parentProductRegistry->anyProductProduced());
95 
96  std::map<BranchID, bool> keepAssociation;
97  selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
98 
99  std::string const maxEvents("maxEvents");
100  std::string const maxLumis("maxLuminosityBlocks");
101 
102  // propagate_const<T> has no reset() function
103  processParameterSet_ = std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
104 
105  // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
106  if(processParameterSet_->exists(maxEvents)) {
107  processParameterSet_->popParameterSet(maxEvents);
108  }
109  if(processParameterSet_->exists(maxLumis)) {
110  processParameterSet_->popParameterSet(maxLumis);
111  }
112 
113  // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
114  if(topLevelParameterSet.exists(maxEvents)) {
115  processParameterSet_->addUntrackedParameter<ParameterSet>(maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
116  }
117  if(topLevelParameterSet.exists(maxLumis)) {
118  processParameterSet_->addUntrackedParameter<ParameterSet>(maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
119  }
120 
121  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
122  auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
123  bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
124 
125  // Validates the parameters in the 'options', 'maxEvents', and 'maxLuminosityBlocks'
126  // top level parameter sets. Default values are also set in here if the
127  // parameters were not explicitly set.
129 
130  ScheduleItems items(*parentProductRegistry, *this);
131  actReg_ = items.actReg_;
132 
133  //initialize the services
134  ServiceToken iToken;
135 
136  // get any configured services.
137  auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
138 
139  ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
140  parentActReg.connectToSubProcess(*items.actReg_);
141  serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
142 
143 
144  //make the services available
146 
147  // intialize miscellaneous items
148  items.initMisc(*processParameterSet_);
149 
150  // intialize the event setup provider
151  esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
152 
153  branchIDListHelper_ = items.branchIDListHelper();
154  updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
155 
156  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
157  thinnedAssociationsHelper_->updateFromParentProcess(parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
158 
159  // intialize the Schedule
160  schedule_ = items.initSchedule(*processParameterSet_,hasSubProcesses,preallocConfig,&processContext_);
161 
162  // set the items
163  act_table_ = std::move(items.act_table_);
164  preg_ = items.preg();
165 
166  subProcessParentageHelper_ = items.subProcessParentageHelper();
167  subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
168 
169  //CMS-THREADING this only works since Run/Lumis are synchronous so when principalCache asks for
170  // the reducedProcessHistoryID from a full ProcessHistoryID that registry will not be in use by
171  // another thread. We really need to change how this is done in the PrincipalCache.
173 
174 
175  processConfiguration_ = items.processConfiguration();
177  processContext_.setParentProcessContext(parentProcessContext);
178 
180  for(unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
181  auto ep = std::make_shared<EventPrincipal>(preg_,
186  index,
187  false /*not primary process*/);
189  }
190  for(unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++ index) {
191  auto lbpp = std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+index]),index,false);
193  }
194 
195  inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
196 
197  subProcesses_.reserve(subProcessVParameterSet.size());
198  for(auto& subProcessPSet : subProcessVParameterSet) {
199  subProcesses_.emplace_back(subProcessPSet,
200  topLevelParameterSet,
201  preg_,
203  *thinnedAssociationsHelper_,
205  esController,
206  *items.actReg_,
207  newToken,
208  iLegacy,
209  preallocConfig,
210  &processContext_);
211  }
212  }
213 
215 
216  void
218  this->beginJob();
219  }
220 
221  void
223  endJob();
224  }
225 
226 
227  void
229  // If event selection is being used, the SubProcess class reads TriggerResults
230  // object(s) in the parent process from the event. This next call is needed for
231  // getByToken to work properly. Normally, this is done by the worker, but since
232  // a SubProcess is not a module, it has no worker.
233  updateLookup(InEvent, *parentPreg_->productLookup(InEvent),false);
234 
235  if(!droppedBranchIDToKeptBranchID().empty()) {
237  }
239  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
241  //NOTE: this may throw
243  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
244  schedule_->beginJob(*preg_);
245  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
246  }
247 
248  void
251  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
252  schedule_->endJob(c);
253  for(auto& subProcess : subProcesses_) {
254  c.call([&subProcess](){ subProcess.doEndJob();});
255  }
256  if(c.hasThrown()) {
257  c.rethrow();
258  }
259  }
260 
261  void
263  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
264  std::map<BranchID, bool>& keepAssociation) {
265  if(productSelector_.initialized()) return;
267 
268  // TODO: See if we can collapse keptProducts_ and productSelector_ into a
269  // single object. See the notes in the header for ProductSelector
270  // for more information.
271 
272  std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
273  std::vector<BranchDescription const*> associationDescriptions;
274  std::set<BranchID> keptProductsInEvent;
275 
276  for(auto const& it : preg.productList()) {
277  BranchDescription const& desc = it.second;
278  if(desc.transient()) {
279  // if the class of the branch is marked transient, output nothing
280  } else if(!desc.present() && !desc.produced()) {
281  // else if the branch containing the product has been previously dropped,
282  // output nothing
283  } else if(desc.unwrappedType() == typeid(ThinnedAssociation)) {
284  associationDescriptions.push_back(&desc);
285  } else if(productSelector_.selected(desc)) {
286  keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
287  }
288  }
289 
290  parentThinnedAssociationsHelper.selectAssociationProducts(associationDescriptions,
291  keptProductsInEvent,
292  keepAssociation);
293 
294  for(auto association : associationDescriptions) {
295  if(keepAssociation[association->branchID()]) {
296  keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
297  }
298  }
299 
300  // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
301  ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
302  }
303 
305  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
306  std::set<BranchID>& keptProductsInEvent) {
307 
309  trueBranchIDToKeptBranchDesc);
310 
311  if(desc.branchType() == InEvent) {
312  if(desc.produced()) {
313  keptProductsInEvent.insert(desc.originalBranchID());
314  } else {
315  keptProductsInEvent.insert(desc.branchID());
316  }
317  }
319  InputTag{desc.moduleLabel(),
320  desc.productInstanceName(),
321  desc.processName()});
322 
323  // Now put it in the list of selected branches.
324  keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
325  }
326 
327  void
328  SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
329  // Check for branches dropped while an EDAlias was kept.
330  // Replace BranchID of each dropped branch with that of the kept alias.
331  for(BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
332  for(BranchID::value_type& branchID : branchIDList) {
333  std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter = droppedBranchIDToKeptBranchID.find(branchID);
334  if(iter != droppedBranchIDToKeptBranchID.end()) {
335  branchID = iter->second;
336  }
337  }
338  }
339  for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess){ subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID); });
340  }
341 
342  void
344  EventPrincipal const& ep) {
346  /* BEGIN relevant bits from OutputModule::doEvent */
347  if(!wantAllEvents_) {
348  EventForOutput e(ep, ModuleDescription(), nullptr);
349  e.setConsumer(this);
350  if(!selectors_.wantEvent(e)) {
351  return;
352  }
353  }
354  processAsync(std::move(iHolder),ep);
355  /* END relevant bits from OutputModule::doEvent */
356  }
357 
358  void
360  EventPrincipal const& principal) {
361  EventAuxiliary aux(principal.aux());
362  aux.setProcessHistoryID(principal.processHistoryID());
363 
364  EventSelectionIDVector esids{principal.eventSelectionIDs()};
365  if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
366  esids.push_back(selector_config_id_);
367  }
368 
370  auto & processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
371  processHistoryRegistry.registerProcessHistory(principal.processHistory());
372  BranchListIndexes bli(principal.branchListIndexes());
373  branchIDListHelper_->fixBranchListIndexes(bli);
374  bool deepCopyRetriever = false;
376  processHistoryRegistry,
377  std::move(esids),
378  std::move(bli),
379  *(principal.productProvenanceRetrieverPtr()),//NOTE: this transfers the per product provenance
380  principal.reader(),
381  deepCopyRetriever);
383  propagateProducts(InEvent, principal, ep);
384 
385  WaitingTaskHolder finalizeEventTask( make_waiting_task(tbb::task::allocate_root(),
386  [&ep,iHolder](std::exception_ptr const* iPtr) mutable
387  {
388  ep.clearEventPrincipal();
389  if(iPtr) {
390  iHolder.doneWaiting(*iPtr);
391  } else {
392  iHolder.doneWaiting(std::exception_ptr());
393  }
394  }
395  )
396  );
397  WaitingTaskHolder afterProcessTask;
398  if(subProcesses_.empty()) {
399  afterProcessTask = std::move(finalizeEventTask);
400  } else {
401  afterProcessTask = WaitingTaskHolder(
402  make_waiting_task(tbb::task::allocate_root(),
403  [this,&ep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable{
404  if(not iPtr) {
405  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
406  subProcess.doEventAsync(finalizeEventTask,ep);
407  }
408  } else {
409  finalizeEventTask.doneWaiting(*iPtr);
410  }
411  })
412  );
413  }
414 
415  schedule_->processOneEventAsync(std::move(afterProcessTask),
416  ep.streamID().value(),ep, esp_->eventSetup(),serviceToken_);
417  }
418 
419  void
422 
423  auto aux = std::make_shared<RunAuxiliary>(principal.aux());
424  aux->setProcessHistoryID(principal.processHistoryID());
425  auto rpp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.index()]),principal.index(),false);
426  auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.index()];
427  processHistoryRegistry.registerProcessHistory(principal.processHistory());
428  rpp->fillRunPrincipal(processHistoryRegistry, principal.reader());
429  principalCache_.insert(rpp);
430 
431  ProcessHistoryID const& parentInputReducedPHID = principal.reducedProcessHistoryID();
432  ProcessHistoryID const& inputReducedPHID = rpp->reducedProcessHistoryID();
433 
434  parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
435 
437  propagateProducts(InRun, principal, rp);
439  beginGlobalTransitionAsync<Traits>(std::move(iHolder),
440  *schedule_,
441  rp,
442  ts,
443  esp_->eventSetup(),
445  subProcesses_);
446  }
447 
448 
449  void
451  RunPrincipal const& principal,
452  IOVSyncValue const& ts,
453  bool cleaningUpAfterException) {
455  propagateProducts(InRun, principal, rp);
457  endGlobalTransitionAsync<Traits>(std::move(iHolder),
458  *schedule_,
459  rp,
460  ts,
461  esp_->eventSetup(),
464  cleaningUpAfterException);
465  }
466 
467  void
469  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
471  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
472  assert(it != parentToChildPhID_.end());
473  auto const& childPhID = it->second;
474 
475  auto subTasks = edm::make_waiting_task(tbb::task::allocate_root(), [this,childPhID,runNumber, task, mergeableRunProductMetadata](std::exception_ptr const* iExcept) mutable {
476  if( iExcept) {
477  task.doneWaiting(*iExcept);
478  } else {
480  for(auto& s: subProcesses_) {
481  s.writeRunAsync(task, childPhID, runNumber, mergeableRunProductMetadata);
482  }
483  }
484  });
485  schedule_->writeRunAsync(WaitingTaskHolder(subTasks),principalCache_.runPrincipal(childPhID, runNumber),
486  &processContext_, actReg_.get(), mergeableRunProductMetadata);
487  }
488 
489  void
491  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
492  assert(it != parentToChildPhID_.end());
493  auto const& childPhID = it->second;
494  principalCache_.deleteRun(childPhID, runNumber);
495  for_all(subProcesses_, [&childPhID, runNumber](auto& subProcess){ subProcess.deleteRunFromCache(childPhID, runNumber); });
496  }
497 
498  void
501 
502  auto aux = principal.aux();
503  aux.setProcessHistoryID(principal.processHistoryID());
505  lbpp->setAux(aux);
506  auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+lbpp->index()];
507  inUseLumiPrincipals_[principal.index()] = lbpp;
508  processHistoryRegistry.registerProcessHistory(principal.processHistory());
509  lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.reader());
510  lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
511  LuminosityBlockPrincipal& lbp = *lbpp;
512  propagateProducts(InLumi, principal, lbp);
514  beginGlobalTransitionAsync<Traits>(std::move(iHolder),
515  *schedule_,
516  lbp,
517  ts,
518  esp_->eventSetup(),
520  subProcesses_);
521  }
522 
523  void
525 
527  propagateProducts(InLumi, principal, lbp);
529  endGlobalTransitionAsync<Traits>(std::move(iHolder),
530  *schedule_,
531  lbp,
532  ts,
533  esp_->eventSetup(),
536  cleaningUpAfterException);
537  }
538 
539 
540  void
543 
544  auto l =inUseLumiPrincipals_[principal.index()];
545  auto subTasks = edm::make_waiting_task(tbb::task::allocate_root(), [this,l, task](std::exception_ptr const* iExcept) mutable {
546  if( iExcept) {
547  task.doneWaiting(*iExcept);
548  } else {
550  for(auto& s: subProcesses_) {
551  s.writeLumiAsync(task, *l);
552  }
553  }
554  });
555  schedule_->writeLumiAsync(WaitingTaskHolder(subTasks),*l, &processContext_, actReg_.get());
556  }
557 
558  void
560  //release from list but stay around till end of routine
561  auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
562  for(auto& s: subProcesses_) {
563  s.deleteLumiFromCache(*lb);
564  }
565  lb->clearPrincipal();
566  }
567 
568  void
569  SubProcess::doBeginStream(unsigned int iID) {
571  schedule_->beginStream(iID);
572  for_all(subProcesses_, [iID](auto& subProcess){ subProcess.doBeginStream(iID); });
573  }
574 
575  void
576  SubProcess::doEndStream(unsigned int iID) {
578  schedule_->endStream(iID);
579  for_all(subProcesses_, [iID](auto& subProcess){ subProcess.doEndStream(iID); });
580  }
581 
582  void
584  unsigned int id, RunPrincipal const& principal, IOVSyncValue const& ts) {
586 
588 
589  beginStreamTransitionAsync<Traits>(std::move(iHolder),
590  *schedule_,
591  id,
592  rp,
593  ts,
594  esp_->eventSetup(),
596  subProcesses_);
597 
598  }
599 
600 
601  void
603  unsigned int id, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
606 
607  endStreamTransitionAsync<Traits>(std::move(iHolder),
608  *schedule_,
609  id,
610  rp,
611  ts,
612  esp_->eventSetup(),
615  cleaningUpAfterException);
616  }
617 
618  void
620  unsigned int id, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {
622 
624 
625  beginStreamTransitionAsync<Traits>(std::move(iHolder),
626  *schedule_,
627  id,
628  lbp,
629  ts,
630  esp_->eventSetup(),
632  subProcesses_);
633  }
634 
635 
636 
637  void
639  unsigned int id, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
642  endStreamTransitionAsync<Traits>(std::move(iHolder),
643  *schedule_,
644  id,
645  lbp,
646  ts,
647  esp_->eventSetup(),
650  cleaningUpAfterException);
651  }
652 
653 
654  void
656  SelectedProducts const& keptVector = keptProducts()[type];
657  for(auto const& item : keptVector) {
658  BranchDescription const& desc = *item.first;
659  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
660  if(parentProductResolver != nullptr) {
661  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
662  if(productResolver != nullptr) {
663  //Propagate the per event(run)(lumi) data for this product to the subprocess.
664  //First, the product itself.
665  productResolver->connectTo(*parentProductResolver, &parentPrincipal);
666  }
667  }
668  }
669  }
670 
672  branchIDListHelper_->updateFromParent(branchIDLists);
673  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
674  }
675 
676  // Call respondToOpenInputFile() on all Modules
677  void
680  schedule_->respondToOpenInputFile(fb);
681  for_all(subProcesses_, [&fb](auto& subProcess){ subProcess.respondToOpenInputFile(fb); });
682  }
683 
684  // free function
685  std::vector<ParameterSet>
687  std::vector<std::string> subProcesses = parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
688  if(!subProcesses.empty()) {
689  return parameterSet.popVParameterSet("subProcesses");
690  }
691  return {};
692  }
693 }
unsigned int historyRunOffset_
Definition: SubProcess.h:270
unsigned int historyLumiOffset_
Definition: SubProcess.h:269
ParameterSetID selector_config_id_
Definition: SubProcess.h:292
void insert(std::shared_ptr< RunPrincipal > rp)
type
Definition: HCALResponse.h:21
void setLuminosityBlockPrincipal(LuminosityBlockPrincipal *lbp)
ProductRegistry const & productRegistry() const
Definition: Principal.h:150
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
Definition: Principal.h:157
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal)
Definition: SubProcess.cc:343
T getUntrackedParameter(std::string const &, T const &) const
bool selected(BranchDescription const &desc) const
EventSelectionIDVector const & eventSelectionIDs() const
void respondToOpenInputFile(FileBlock const &fb)
Definition: SubProcess.cc:678
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: RunPrincipal.h:69
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:583
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const * > const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: SubProcess.h:251
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
Definition: SubProcess.h:271
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: SubProcess.h:261
std::vector< SubProcess > subProcesses_
Definition: SubProcess.h:279
Definition: Hash.h:43
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:602
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: SubProcess.h:266
bool exists(std::string const &parameterName) const
checks if a parameter exists
void updateBranchIDListHelper(BranchIDLists const &)
Definition: SubProcess.cc:671
LuminosityBlockAuxiliary const & aux() const
SelectedProductsForBranchType const & keptProducts() const
Definition: SubProcess.h:71
std::string const & processName() const
LuminosityBlockIndex index() const
void doEndStream(unsigned int)
Definition: SubProcess.cc:576
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:638
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *)
void setParentProcessContext(ProcessContext const *parentProcessContext)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
Definition: SubProcess.h:247
BranchListIndexes const & branchListIndexes() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: SubProcess.h:277
ProcessHistory const & processHistory() const
Definition: Principal.h:140
BranchType
Definition: BranchType.h:11
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
ProductList const & productList() const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
Definition: SubProcess.h:257
void deleteRunFromCache(ProcessHistoryID const &parentPhID, int runNumber)
Definition: SubProcess.cc:490
def principal(options)
std::vector< BranchListIndex > BranchListIndexes
void doneWaiting(std::exception_ptr iExcept)
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
Definition: SubProcess.h:275
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e)
Definition: SubProcess.cc:359
void doEndRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:450
~SubProcess() override
Definition: SubProcess.cc:214
std::string const & moduleLabel() const
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:499
unsigned int value_type
Definition: BranchID.h:16
std::string const & productInstanceName() const
ProcessHistoryID const & processHistoryID() const
Definition: Principal.h:144
SelectedProductsForBranchType keptProducts_
Definition: SubProcess.h:286
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
Definition: SubProcess.cc:262
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:420
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
void deleteLumiFromCache(LuminosityBlockPrincipal &)
Definition: SubProcess.cc:559
ProductSelectorRules productSelectorRules_
Definition: SubProcess.h:287
StreamID streamID() const
TypeID unwrappedTypeID() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: SubProcess.h:276
std::vector< BranchDescription const * > allBranchDescriptions() const
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
Definition: SubProcess.h:280
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:524
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
BranchID const & branchID() const
TypeWithDict const & unwrappedType() const
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:61
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
Definition: SubProcess.cc:304
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:619
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: SubProcess.h:263
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:686
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
Definition: Principal.h:183
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
Definition: SubProcess.h:297
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &parentPhID, int runNumber, MergeableRunProductMetadata const *)
Definition: SubProcess.cc:468
ProductSelector productSelector_
Definition: SubProcess.h:288
unsigned int value() const
Definition: StreamID.h:46
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
Definition: SubProcess.h:293
void connectToSubProcess(ActivityRegistry &iOther)
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
Definition: SubProcess.h:278
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< ActivityRegistry > actReg_
Definition: SubProcess.h:256
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
Definition: SubProcess.cc:45
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: SubProcess.h:260
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
RunIndex index() const
Definition: RunPrincipal.h:57
bool anyProductProduced() const
std::vector< HistoryAppender > historyAppenders_
Definition: SubProcess.h:272
std::vector< BranchID::value_type > BranchIDList
Definition: BranchIDList.h:18
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
HLT enums.
bool initialized() const
std::vector< std::string > const & getAllTriggerNames()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const * > const &branchDescriptions)
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:655
void doBeginStream(unsigned int)
Definition: SubProcess.cc:569
void setConsumer(EDConsumerBase const *iConsumer)
std::shared_ptr< ProductRegistry const > parentPreg_
Definition: SubProcess.h:258
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
Definition: Principal.cc:511
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
Definition: SubProcess.cc:328
ProcessContext processContext_
Definition: SubProcess.h:265
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
BranchID const & originalBranchID() const
void call(std::function< void(void)>)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: SubProcess.h:253
EventAuxiliary const & aux() const
ParameterSet const & registerIt()
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
Definition: SubProcess.cc:541
std::shared_ptr< ProductRegistry const > preg_
Definition: SubProcess.h:259
PrincipalCache principalCache_
Definition: SubProcess.h:273
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:511
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
Definition: SubProcess.h:262
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: SubProcess.h:264
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)