CMS 3D CMS Logo

SubProcess.cc
Go to the documentation of this file.
2 
42 
43 #include "boost/range/adaptor/reversed.hpp"
44 
45 #include <cassert>
46 #include <exception>
47 #include <string>
48 
49 namespace edm {
50 
52  ParameterSet const& topLevelParameterSet,
53  std::shared_ptr<ProductRegistry const> parentProductRegistry,
54  std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
55  ProcessBlockHelperBase const& parentProcessBlockHelper,
56  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
57  SubProcessParentageHelper const& parentSubProcessParentageHelper,
59  ActivityRegistry& parentActReg,
60  ServiceToken const& token,
62  PreallocationConfiguration const& preallocConfig,
63  ProcessContext const* parentProcessContext,
64  ModuleTypeResolverMaker const* typeResolverMaker)
65  : EDConsumerBase(),
66  serviceToken_(),
67  parentPreg_(parentProductRegistry),
68  preg_(),
69  branchIDListHelper_(),
70  act_table_(),
71  processConfiguration_(),
72  historyLumiOffset_(preallocConfig.numberOfStreams()),
73  historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
74  processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
75  historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
76  principalCache_(),
77  esp_(),
78  schedule_(),
79  subProcesses_(),
80  processParameterSet_(),
81  productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
82  productSelector_(),
83  wantAllEvents_(true) {
84  //Setup the event selection
86 
87  ParameterSet selectevents = parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
88 
89  selectevents.registerIt(); // Just in case this PSet is not registered
91  selectevents, tns->getProcessName(), getAllTriggerNames(), selectors_, consumesCollector());
92  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
94  selectevents, "", outputModulePathPositions, parentProductRegistry->anyProductProduced());
95 
96  std::map<BranchID, bool> keepAssociation;
97  selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
98 
99  std::string const maxEvents("maxEvents");
100  std::string const maxLumis("maxLuminosityBlocks");
101 
102  // propagate_const<T> has no reset() function
104  std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
105 
106  // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
107  if (processParameterSet_->exists(maxEvents)) {
108  processParameterSet_->popParameterSet(maxEvents);
109  }
110  if (processParameterSet_->exists(maxLumis)) {
111  processParameterSet_->popParameterSet(maxLumis);
112  }
113 
114  // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
115  if (topLevelParameterSet.exists(maxEvents)) {
116  processParameterSet_->addUntrackedParameter<ParameterSet>(
117  maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
118  }
119  if (topLevelParameterSet.exists(maxLumis)) {
120  processParameterSet_->addUntrackedParameter<ParameterSet>(
121  maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
122  }
123 
124  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
125  auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
126  bool hasSubProcesses = !subProcessVParameterSet.empty();
127 
128  // Validates the parameters in the 'options', 'maxEvents', and 'maxLuminosityBlocks'
129  // top level parameter sets. Default values are also set in here if the
130  // parameters were not explicitly set.
132 
133  processBlockHelper_ = std::make_shared<SubProcessBlockHelper>();
134 
135  ScheduleItems items(*parentProductRegistry, *this, *processBlockHelper_, parentProcessBlockHelper);
136  actReg_ = items.actReg_;
137 
138  //initialize the services
139  ServiceToken iToken;
140 
141  // get any configured services.
142  auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
143 
144  ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
145  parentActReg.connectToSubProcess(*items.actReg_);
146  serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
147 
148  //make the services available
150 
151  // initialize miscellaneous items
152  items.initMisc(*processParameterSet_);
153 
154  // initialize the event setup provider
155  esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
156 
157  branchIDListHelper_ = items.branchIDListHelper();
158  updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
159 
160  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
161  thinnedAssociationsHelper_->updateFromParentProcess(
162  parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
163 
164  // initialize the Schedule
165  schedule_ = items.initSchedule(*processParameterSet_,
166  hasSubProcesses,
167  preallocConfig,
169  typeResolverMaker,
171 
172  // set the items
173  act_table_ = std::move(items.act_table_);
174  preg_ = items.preg();
175 
176  subProcessParentageHelper_ = items.subProcessParentageHelper();
177  subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
178 
179  processConfiguration_ = items.processConfiguration();
181  processContext_.setParentProcessContext(parentProcessContext);
182 
184  for (unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
185  auto ep = std::make_shared<EventPrincipal>(preg_,
190  index,
191  false /*not primary process*/,
194  }
195 
196  for (unsigned int index = 0; index < preallocConfig.numberOfRuns(); ++index) {
197  auto rpp = std::make_unique<RunPrincipal>(
200  }
201 
202  for (unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++index) {
203  auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
206  }
207 
208  {
209  auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
211 
212  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
214  }
215 
216  inUseRunPrincipals_.resize(preallocConfig.numberOfRuns());
217  inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
218 
219  subProcesses_.reserve(subProcessVParameterSet.size());
220  for (auto& subProcessPSet : subProcessVParameterSet) {
221  subProcesses_.emplace_back(subProcessPSet,
222  topLevelParameterSet,
223  preg_,
228  esController,
229  *items.actReg_,
230  newToken,
231  iLegacy,
232  preallocConfig,
234  typeResolverMaker);
235  }
236  }
237 
239 
240  std::vector<ModuleProcessName> SubProcess::keepOnlyConsumedUnscheduledModules(bool deleteModules) {
241  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
243 
244  // Note: all these may throw
246 
247  // Consumes information from the child SubProcesses
248  std::vector<ModuleProcessName> consumedByChildren;
249  for_all(subProcesses_, [&consumedByChildren, deleteModules](auto& subProcess) {
250  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
251  if (consumedByChildren.empty()) {
252  std::swap(consumedByChildren, c);
253  } else if (not c.empty()) {
254  std::vector<ModuleProcessName> tmp;
255  tmp.reserve(consumedByChildren.size() + c.size());
256  std::merge(consumedByChildren.begin(), consumedByChildren.end(), c.begin(), c.end(), std::back_inserter(tmp));
257  std::swap(consumedByChildren, tmp);
258  }
259  });
260 
261  // Non-consumed unscheduled modules in this SubProcess, taking into account the consumes from child SubProcesses
262  if (deleteModules) {
263  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedByChildren);
264  not unusedModules.empty()) {
266 
267  edm::LogInfo("DeleteModules").log([&unusedModules, this](auto& l) {
268  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
269  "and "
270  "therefore they are deleted from SubProcess "
271  << processConfiguration_->processName() << " before beginJob transition.";
272  for (auto const& description : unusedModules) {
273  l << "\n " << description->moduleLabel();
274  }
275  });
276  for (auto const& description : unusedModules) {
277  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
278  }
279  }
280  }
281 
282  // Products possibly consumed from the parent (Sub)Process
283  for (auto const& description : pathsAndConsumesOfModules_.allModules()) {
284  for (auto const& dep :
286  auto it = std::lower_bound(consumedByChildren.begin(),
287  consumedByChildren.end(),
288  ModuleProcessName{dep.moduleLabel(), dep.processName()});
289  consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
290  }
291  }
292  return consumedByChildren;
293  }
294 
295  void SubProcess::doBeginJob() { this->beginJob(); }
296 
298 
300  // If event selection is being used, the SubProcess class reads TriggerResults
301  // object(s) in the parent process from the event. This next call is needed for
302  // getByToken to work properly. Normally, this is done by the worker, but since
303  // a SubProcess is not a module, it has no worker.
304  updateLookup(InEvent, *parentPreg_->productLookup(InEvent), false);
305 
306  if (!droppedBranchIDToKeptBranchID().empty()) {
308  }
310  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
311  schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
312  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
313  }
314 
318  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
319  schedule_->endJob(c);
320  for (auto& subProcess : subProcesses_) {
321  c.call([&subProcess]() { subProcess.doEndJob(); });
322  }
323  if (c.hasThrown()) {
324  c.rethrow();
325  }
326  }
327 
329  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
330  std::map<BranchID, bool>& keepAssociation) {
332  return;
334 
335  // TODO: See if we can collapse keptProducts_ and productSelector_ into a
336  // single object. See the notes in the header for ProductSelector
337  // for more information.
338 
339  std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
340  std::vector<BranchDescription const*> associationDescriptions;
341  std::set<BranchID> keptProductsInEvent;
342 
343  for (auto const& it : preg.productList()) {
344  BranchDescription const& desc = it.second;
345  if (desc.transient()) {
346  // if the class of the branch is marked transient, output nothing
347  } else if (!desc.present() && !desc.produced()) {
348  // else if the branch containing the product has been previously dropped,
349  // output nothing
350  } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
351  associationDescriptions.push_back(&desc);
352  } else if (productSelector_.selected(desc)) {
353  keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
354  }
355  }
356 
357  parentThinnedAssociationsHelper.selectAssociationProducts(
358  associationDescriptions, keptProductsInEvent, keepAssociation);
359 
360  for (auto association : associationDescriptions) {
361  if (keepAssociation[association->branchID()]) {
362  keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
363  }
364  }
365 
366  // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
367  ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
368  }
369 
371  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
372  std::set<BranchID>& keptProductsInEvent) {
373  ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
374 
375  if (desc.branchType() == InEvent) {
376  if (desc.produced()) {
377  keptProductsInEvent.insert(desc.originalBranchID());
378  } else {
379  keptProductsInEvent.insert(desc.branchID());
380  }
381  }
382  EDGetToken token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
383  InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
384 
385  // Now put it in the list of selected branches.
386  keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
387  }
388 
390  std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
391  // Check for branches dropped while an EDAlias was kept.
392  // Replace BranchID of each dropped branch with that of the kept alias.
393  for (BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
394  for (BranchID::value_type& branchID : branchIDList) {
395  std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
396  droppedBranchIDToKeptBranchID.find(branchID);
397  if (iter != droppedBranchIDToKeptBranchID.end()) {
398  branchID = iter->second;
399  }
400  }
401  }
402  for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess) {
403  subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
404  });
405  }
406 
408  EventPrincipal const& ep,
409  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
411  /* BEGIN relevant bits from OutputModule::doEvent */
412  if (!wantAllEvents_) {
413  EventForOutput e(ep, ModuleDescription(), nullptr);
414  e.setConsumer(this);
415  if (!selectors_.wantEvent(e)) {
416  return;
417  }
418  }
419  processAsync(std::move(iHolder), ep, iEventSetupImpls);
420  /* END relevant bits from OutputModule::doEvent */
421  }
422 
424  EventPrincipal const& principal,
425  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
426  EventAuxiliary aux(principal.aux());
427  aux.setProcessHistoryID(principal.processHistoryID());
428 
429  EventSelectionIDVector esids{principal.eventSelectionIDs()};
430  if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
431  esids.push_back(selector_config_id_);
432  }
433 
435  auto& processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
436  processHistoryRegistry.registerProcessHistory(principal.processHistory());
437  BranchListIndexes bli(principal.branchListIndexes());
438  branchIDListHelper_->fixBranchListIndexes(bli);
439  bool deepCopyRetriever = false;
440  ep.fillEventPrincipal(
441  aux,
442  &principal.processHistory(),
443  std::move(esids),
444  std::move(bli),
445  principal.eventToProcessBlockIndexes(),
446  *(principal.productProvenanceRetrieverPtr()), //NOTE: this transfers the per product provenance
447  principal.reader(),
448  deepCopyRetriever);
449  ep.setLuminosityBlockPrincipal(inUseLumiPrincipals_[principal.luminosityBlockPrincipal().index()].get());
450  propagateProducts(InEvent, principal, ep);
451 
452  using namespace edm::waiting_task;
453  chain::first([&](auto nextTask) {
454  EventTransitionInfo info(ep, *((*iEventSetupImpls)[esp_->subProcessIndex()]));
455  schedule_->processOneEventAsync(std::move(nextTask), ep.streamID().value(), info, serviceToken_);
456  }) | chain::ifThen(not subProcesses_.empty(), [this, &ep, iEventSetupImpls](auto nextTask) {
457  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
458  subProcess.doEventAsync(nextTask, ep, iEventSetupImpls);
459  }
460  }) | chain::then([&ep](std::exception_ptr const* iPtr, auto nextTask) {
461  ep.clearEventPrincipal();
462  if (iPtr) {
463  nextTask.doneWaiting(*iPtr);
464  }
465  }) | chain::runLast(std::move(iHolder));
466  }
467 
468  template <>
469  void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
470  WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool cleaningUpAfterException) {
472 
474  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
475  processBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
476  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
477 
478  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
480  beginGlobalTransitionAsync<Traits>(
481  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
482  }
483 
484  template <>
485  void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
486  WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool) {
488 
490  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
491  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
492  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
493 
494  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
496  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
497  }
498 
500  ProcessBlockTransitionInfo const& iTransitionInfo,
501  bool cleaningUpAfterException) {
503  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
504  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
505 
506  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
507 
508  if (parentProducedProductIsKept(parentPrincipal, processBlockPrincipal)) {
509  ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
510  inputProcessBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
511  propagateProducts(InProcess, parentPrincipal, inputProcessBlockPrincipal);
512  ProcessBlockTransitionInfo inputTransitionInfo(inputProcessBlockPrincipal);
513 
514  using namespace edm::waiting_task;
515  chain::first([&](const std::exception_ptr*, auto nextTask) {
517  beginGlobalTransitionAsync<TraitsInput>(std::move(nextTask),
518  *schedule_,
519  inputTransitionInfo,
522  cleaningUpAfterException);
523  }) | chain::then([this](auto nextTask) { writeProcessBlockAsync(nextTask, ProcessBlockType::Input); }) |
524  chain::then([this, info = transitionInfo, cleaningUpAfterException](std::exception_ptr const* iPtr,
525  auto nextTask) mutable {
526  ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
527  inputProcessBlockPrincipal.clearPrincipal();
528  for (auto& s : subProcesses_) {
529  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
530  }
531  if (iPtr) {
532  nextTask.doneWaiting(*iPtr);
533  } else {
535  endGlobalTransitionAsync<Traits>(
536  std::move(nextTask), *schedule_, info, serviceToken_, subProcesses_, cleaningUpAfterException);
537  }
538  }) |
539  chain::runLast(std::move(iHolder));
540  } else {
542  endGlobalTransitionAsync<Traits>(
543  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
544  }
545  }
546 
547  void SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo) {
549 
550  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
551  auto aux = parentPrincipal.aux();
552  aux.setProcessHistoryID(parentPrincipal.processHistoryID());
554  rpp->setAux(aux);
555  auto& processHistoryRegistry = processHistoryRegistries_[historyRunOffset_ + parentPrincipal.index()];
556  inUseRunPrincipals_[parentPrincipal.index()] = rpp;
557  processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
558  rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.reader());
559 
560  RunPrincipal& rp = *rpp;
561  propagateProducts(InRun, parentPrincipal, rp);
562 
563  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
564  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
566  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
567  }
568 
570  RunTransitionInfo const& iTransitionInfo,
571  bool cleaningUpAfterException) {
572  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
573  RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
574  propagateProducts(InRun, parentPrincipal, rp);
575 
576  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
577  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
579  endGlobalTransitionAsync<Traits>(
580  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
581  }
582 
584  using namespace edm::waiting_task;
585  chain::first([&](std::exception_ptr const*, auto nextTask) {
587  schedule_->writeProcessBlockAsync(
588  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
589  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
591  for (auto& s : subProcesses_) {
592  s.writeProcessBlockAsync(nextTask, processBlockType);
593  }
595  }
596 
598  RunPrincipal const& principal,
599  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
600  using namespace edm::waiting_task;
601 
602  auto rp = inUseRunPrincipals_[principal.index()];
603  chain::first([&](std::exception_ptr const*, auto nextTask) {
605  schedule_->writeRunAsync(nextTask, *rp, &processContext_, actReg_.get(), mergeableRunProductMetadata);
606  }) | chain::ifThen(not subProcesses_.empty(), [this, rp, mergeableRunProductMetadata](auto nextTask) {
607  ServiceRegistry::Operate operateWriteRun(serviceToken_);
608  for (auto& s : subProcesses_) {
609  s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
610  }
612  }
613 
615  //release from list but stay around till end of routine
616  auto rp = std::move(inUseRunPrincipals_[parentPrincipal.index()]);
617  for (auto& s : subProcesses_) {
618  s.clearRunPrincipal(*rp);
619  }
620  rp->clearPrincipal();
621  }
622 
624  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(processBlockType);
625  processBlockPrincipal.clearPrincipal();
626  for (auto& s : subProcesses_) {
627  s.clearProcessBlockPrincipal(processBlockType);
628  }
629  }
630 
633 
634  LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
635  auto aux = parentPrincipal.aux();
636  aux.setProcessHistoryID(parentPrincipal.processHistoryID());
638  lbpp->setAux(aux);
639  auto& processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_ + lbpp->index()];
640  inUseLumiPrincipals_[parentPrincipal.index()] = lbpp;
641  processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
642  lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.processHistory(), parentPrincipal.reader());
643  lbpp->setRunPrincipal(inUseRunPrincipals_[parentPrincipal.runPrincipal().index()]);
644  LuminosityBlockPrincipal& lbp = *lbpp;
645  propagateProducts(InLumi, parentPrincipal, lbp);
646 
647  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
648  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
650  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
651  }
652 
654  LumiTransitionInfo const& iTransitionInfo,
655  bool cleaningUpAfterException) {
656  LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
657  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[parentPrincipal.index()];
658  propagateProducts(InLumi, parentPrincipal, lbp);
659 
660  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
661  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
663  endGlobalTransitionAsync<Traits>(
664  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
665  }
666 
668  using namespace edm::waiting_task;
669 
670  auto l = inUseLumiPrincipals_[principal.index()];
671  chain::first([&](std::exception_ptr const*, auto nextTask) {
673  schedule_->writeLumiAsync(nextTask, *l, &processContext_, actReg_.get());
674  }) | chain::ifThen(not subProcesses_.empty(), [this, l](auto nextTask) {
675  ServiceRegistry::Operate operateWriteLumi(serviceToken_);
676  for (auto& s : subProcesses_) {
677  s.writeLumiAsync(nextTask, *l);
678  }
680  }
681 
683  //release from list but stay around till end of routine
684  auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
685  for (auto& s : subProcesses_) {
686  s.clearLumiPrincipal(*lb);
687  }
688  lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
689  lb->clearPrincipal();
690  }
691 
692  void SubProcess::doBeginStream(unsigned int iID) {
694  schedule_->beginStream(iID);
695  for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doBeginStream(iID); });
696  }
697 
698  void SubProcess::doEndStream(unsigned int iID) {
700  schedule_->endStream(iID);
701  for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doEndStream(iID); });
702  }
703 
705  unsigned int id,
706  RunTransitionInfo const& iTransitionInfo) {
708 
709  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
710  RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
711 
712  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
713  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
714  beginStreamTransitionAsync<Traits>(
715  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
716  }
717 
719  unsigned int id,
720  RunTransitionInfo const& iTransitionInfo,
721  bool cleaningUpAfterException) {
723 
724  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
725  RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
726 
727  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
728  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
729  endStreamTransitionAsync<Traits>(
730  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
731  }
732 
734  unsigned int id,
735  LumiTransitionInfo const& iTransitionInfo) {
737 
738  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
739  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
740  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
741  beginStreamTransitionAsync<Traits>(
742  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
743  }
744 
746  unsigned int id,
747  LumiTransitionInfo const& iTransitionInfo,
748  bool cleaningUpAfterException) {
749  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
751  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
752  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
753  endStreamTransitionAsync<Traits>(
754  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
755  }
756 
757  void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
758  SelectedProducts const& keptVector = keptProducts()[type];
759  for (auto const& item : keptVector) {
760  BranchDescription const& desc = *item.first;
761  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
762  if (parentProductResolver != nullptr) {
763  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
764  if (productResolver != nullptr) {
765  //Propagate the per event(run)(lumi)(processBlock) data for this product to the subprocess.
766  productResolver->connectTo(*parentProductResolver, &parentPrincipal);
767  }
768  }
769  }
770  }
771 
772  bool SubProcess::parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const {
773  SelectedProducts const& keptVector = keptProducts()[InProcess];
774  for (auto const& item : keptVector) {
775  BranchDescription const& desc = *item.first;
776  assert(desc.branchType() == InProcess);
777  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
778  if (parentProductResolver != nullptr) {
779  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
780  if (productResolver != nullptr) {
781  if (parentProductResolver->branchDescription().produced()) {
782  return true;
783  }
784  }
785  }
786  }
787  return false;
788  }
789 
791  branchIDListHelper_->updateFromParent(branchIDLists);
793  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
794  }
795 
796  // Call respondToOpenInputFile() on all Modules
799  schedule_->respondToOpenInputFile(fb);
800  for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToOpenInputFile(fb); });
801  }
802 
803  // free function
805  std::vector<std::string> subProcesses =
806  parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
807  if (!subProcesses.empty()) {
808  return parameterSet.popVParameterSet("subProcesses");
809  }
810  return {};
811  }
812 } // namespace edm
unsigned int historyRunOffset_
Definition: SubProcess.h:284
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:745
unsigned int historyLumiOffset_
Definition: SubProcess.h:283
ParameterSetID selector_config_id_
Definition: SubProcess.h:306
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
Definition: Principal.h:147
void clearPrincipal()
Definition: Principal.cc:383
static const TGPicture * info(bool iBackgroundIsBlack)
void respondToOpenInputFile(FileBlock const &fb)
Definition: SubProcess.cc:797
SelectedProductsForBranchType const & keptProducts() const
Definition: SubProcess.h:82
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
void clearLumiPrincipal(LuminosityBlockPrincipal &)
Definition: SubProcess.cc:682
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:772
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
Definition: SubProcess.h:285
void clearProcessBlockPrincipal(ProcessBlockType)
Definition: SubProcess.cc:623
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: SubProcess.h:275
ProductRegistry const & productRegistry() const
Definition: Principal.h:140
std::vector< SubProcess > subProcesses_
Definition: SubProcess.h:293
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
Definition: SubProcess.h:274
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: SubProcess.h:258
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
Definition: SubProcess.cc:597
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
RunPrincipal const & runPrincipal() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: SubProcess.h:280
bool exists(std::string const &parameterName) const
checks if a parameter exists
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:718
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
Definition: SubProcess.cc:790
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:423
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doEndStream(unsigned int)
Definition: SubProcess.cc:698
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
assert(be >=bs)
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext, ModuleTypeResolverMaker const *typeResolverMaker)
Definition: SubProcess.cc:51
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
Definition: SubProcess.h:254
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: SubProcess.h:292
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:499
void swap(Association< C > &lhs, Association< C > &rhs)
Definition: Association.h:112
BranchType
Definition: BranchType.h:11
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
std::vector< std::shared_ptr< RunPrincipal > > inUseRunPrincipals_
Definition: SubProcess.h:289
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: SubProcess.h:262
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:407
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
Definition: SubProcess.h:270
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
Definition: SubProcess.h:290
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
Definition: SubProcess.cc:631
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
~SubProcess() override
Definition: SubProcess.cc:238
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
unsigned int value_type
Definition: BranchID.h:16
SelectedProductsForBranchType keptProducts_
Definition: SubProcess.h:300
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
Definition: SubProcess.cc:328
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
Definition: SubProcess.h:301
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: SubProcess.h:291
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
Definition: SubProcess.h:294
RunIndex index() const
Definition: RunPrincipal.h:56
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:757
bool selected(BranchDescription const &desc) const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Definition: SubProcess.cc:583
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: SubProcess.h:277
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:804
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
Definition: SubProcess.h:311
ProductSelector productSelector_
Definition: SubProcess.h:302
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
Definition: SubProcess.h:307
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
Definition: SubProcess.cc:733
ProcessHistory const & processHistory() const
Definition: Principal.h:134
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
Definition: SubProcess.cc:704
void connectToSubProcess(ActivityRegistry &iOther)
std::shared_ptr< ActivityRegistry > actReg_
Definition: SubProcess.h:269
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:569
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
Definition: SubProcess.cc:547
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: SubProcess.h:273
ProcessBlockPrincipal & principal()
std::vector< HistoryAppender > historyAppenders_
Definition: SubProcess.h:286
std::vector< BranchID::value_type > BranchIDList
Definition: BranchIDList.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
HLT enums.
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void clearRunPrincipal(RunPrincipal &)
Definition: SubProcess.cc:614
void doBeginStream(unsigned int)
Definition: SubProcess.cc:692
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
Definition: SubProcess.h:271
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
Definition: SubProcess.cc:389
ProcessHistoryID const & processHistoryID() const
Definition: Principal.h:136
EventSelectionIDVector const & eventSelectionIDs() const
bool initialized() const
ProcessContext processContext_
Definition: SubProcess.h:279
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:59
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
Definition: StreamID.h:43
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
Definition: SubProcess.cc:240
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
tmp
align.sh
Definition: createJobs.py:716
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
Definition: SubProcess.cc:370
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:653
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
Definition: SubProcess.cc:667
std::shared_ptr< ProductRegistry const > preg_
Definition: SubProcess.h:272
PrincipalCache principalCache_
Definition: SubProcess.h:287
def move(src, dest)
Definition: eostools.py:511
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
Definition: SubProcess.h:276
DelayedReader * reader() const
Definition: Principal.h:181
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: SubProcess.h:278
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
Definition: Principal.cc:555