CMS 3D CMS Logo

SubProcess.cc
Go to the documentation of this file.
2 
42 
43 #include "boost/range/adaptor/reversed.hpp"
44 
45 #include <cassert>
46 #include <exception>
47 #include <string>
48 
49 namespace edm {
50 
52  ParameterSet const& topLevelParameterSet,
53  std::shared_ptr<ProductRegistry const> parentProductRegistry,
54  std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
55  ProcessBlockHelperBase const& parentProcessBlockHelper,
56  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
57  SubProcessParentageHelper const& parentSubProcessParentageHelper,
59  ActivityRegistry& parentActReg,
60  ServiceToken const& token,
62  PreallocationConfiguration const& preallocConfig,
63  ProcessContext const* parentProcessContext)
64  : EDConsumerBase(),
65  serviceToken_(),
66  parentPreg_(parentProductRegistry),
67  preg_(),
68  branchIDListHelper_(),
69  act_table_(),
70  processConfiguration_(),
71  historyLumiOffset_(preallocConfig.numberOfStreams()),
72  historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
73  processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
74  historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
75  principalCache_(),
76  esp_(),
77  schedule_(),
78  subProcesses_(),
79  processParameterSet_(),
80  productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
81  productSelector_(),
82  wantAllEvents_(true) {
83  //Setup the event selection
85 
86  ParameterSet selectevents = parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
87 
88  selectevents.registerIt(); // Just in case this PSet is not registered
90  selectevents, tns->getProcessName(), getAllTriggerNames(), selectors_, consumesCollector());
91  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
93  selectevents, "", outputModulePathPositions, parentProductRegistry->anyProductProduced());
94 
95  std::map<BranchID, bool> keepAssociation;
96  selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
97 
98  std::string const maxEvents("maxEvents");
99  std::string const maxLumis("maxLuminosityBlocks");
100 
101  // propagate_const<T> has no reset() function
103  std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
104 
105  // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
106  if (processParameterSet_->exists(maxEvents)) {
107  processParameterSet_->popParameterSet(maxEvents);
108  }
109  if (processParameterSet_->exists(maxLumis)) {
110  processParameterSet_->popParameterSet(maxLumis);
111  }
112 
113  // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
114  if (topLevelParameterSet.exists(maxEvents)) {
115  processParameterSet_->addUntrackedParameter<ParameterSet>(
116  maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
117  }
118  if (topLevelParameterSet.exists(maxLumis)) {
119  processParameterSet_->addUntrackedParameter<ParameterSet>(
120  maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
121  }
122 
123  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
124  auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
125  bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
126 
127  // Validates the parameters in the 'options', 'maxEvents', and 'maxLuminosityBlocks'
128  // top level parameter sets. Default values are also set in here if the
129  // parameters were not explicitly set.
131 
132  processBlockHelper_ = std::make_shared<SubProcessBlockHelper>();
133 
134  ScheduleItems items(*parentProductRegistry, *this, *processBlockHelper_, parentProcessBlockHelper);
135  actReg_ = items.actReg_;
136 
137  //initialize the services
138  ServiceToken iToken;
139 
140  // get any configured services.
141  auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
142 
143  ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
144  parentActReg.connectToSubProcess(*items.actReg_);
145  serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
146 
147  //make the services available
149 
150  // initialize miscellaneous items
151  items.initMisc(*processParameterSet_);
152 
153  // initialize the event setup provider
154  esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
155 
156  branchIDListHelper_ = items.branchIDListHelper();
157  updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
158 
159  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
160  thinnedAssociationsHelper_->updateFromParentProcess(
161  parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
162 
163  // initialize the Schedule
164  schedule_ = items.initSchedule(
165  *processParameterSet_, hasSubProcesses, preallocConfig, &processContext_, *processBlockHelper_);
166 
167  // set the items
168  act_table_ = std::move(items.act_table_);
169  preg_ = items.preg();
170 
171  subProcessParentageHelper_ = items.subProcessParentageHelper();
172  subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
173 
174  processConfiguration_ = items.processConfiguration();
176  processContext_.setParentProcessContext(parentProcessContext);
177 
179  for (unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
180  auto ep = std::make_shared<EventPrincipal>(preg_,
185  index,
186  false /*not primary process*/,
189  }
190 
191  for (unsigned int index = 0; index < preallocConfig.numberOfRuns(); ++index) {
192  auto rpp = std::make_unique<RunPrincipal>(
195  }
196 
197  for (unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++index) {
198  auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
201  }
202 
203  {
204  auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
206 
207  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
209  }
210 
211  inUseRunPrincipals_.resize(preallocConfig.numberOfRuns());
212  inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
213 
214  subProcesses_.reserve(subProcessVParameterSet.size());
215  for (auto& subProcessPSet : subProcessVParameterSet) {
216  subProcesses_.emplace_back(subProcessPSet,
217  topLevelParameterSet,
218  preg_,
223  esController,
224  *items.actReg_,
225  newToken,
226  iLegacy,
227  preallocConfig,
228  &processContext_);
229  }
230  }
231 
233 
// Collects the (module label, process name) pairs consumed by this SubProcess and
// by all of its child SubProcesses, optionally deleting modules that nothing consumes.
//
// deleteModules: forwarded to every child; when true, the modules reported by
//                nonConsumedUnscheduledModules() are logged and removed from schedule_.
// Returns the accumulated vector of ModuleProcessName entries.
//
// NOTE(review): this listing skips original lines 236, 239, 259 and 279, so some
// statements (including the range expression of the inner for loop near the end)
// are not visible here.
234  std::vector<ModuleProcessName> SubProcess::keepOnlyConsumedUnscheduledModules(bool deleteModules) {
235  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
237 
238  // Note: all these may throw
240 
241  // Consumes information from the child SubProcesses
242  std::vector<ModuleProcessName> consumedByChildren;
// Recurse into each child and fold its result into consumedByChildren.
243  for_all(subProcesses_, [&consumedByChildren, deleteModules](auto& subProcess) {
244  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
245  if (consumedByChildren.empty()) {
// Nothing accumulated yet: take the child's vector wholesale instead of merging.
246  std::swap(consumedByChildren, c);
247  } else if (not c.empty()) {
// std::merge only yields an ordered result when both inputs are already ordered;
// presumably every child returns an ordered vector -- TODO confirm.
248  std::vector<ModuleProcessName> tmp;
249  tmp.reserve(consumedByChildren.size() + c.size());
250  std::merge(consumedByChildren.begin(), consumedByChildren.end(), c.begin(), c.end(), std::back_inserter(tmp));
251  std::swap(consumedByChildren, tmp);
252  }
253  });
254 
255  // Non-consumed unscheduled modules in this SubProcess, take into account of the consumes from child SubProcesses
256  if (deleteModules) {
257  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedByChildren);
258  not unusedModules.empty()) {
260 
// Log the label of every module about to be deleted, then delete them from the schedule.
261  edm::LogInfo("DeleteModules").log([&unusedModules, this](auto& l) {
262  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
263  "and "
264  "therefore they are deleted from SubProcess "
265  << processConfiguration_->processName() << " before beginJob transition.";
266  for (auto const& description : unusedModules) {
267  l << "\n " << description->moduleLabel();
268  }
269  });
270  for (auto const& description : unusedModules) {
271  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
272  }
273  }
274  }
275 
276  // Products possibly consumed from the parent (Sub)Process
277  for (auto const& description : pathsAndConsumesOfModules_.allModules()) {
278  for (auto const& dep :
// Insert each dependency at its lower_bound position so an ordered vector stays ordered.
// No check for an already-present equal entry is made in the visible code.
280  auto it = std::lower_bound(consumedByChildren.begin(),
281  consumedByChildren.end(),
282  ModuleProcessName{dep.moduleLabel(), dep.processName()});
283  consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
284  }
285  }
286  return consumedByChildren;
287  }
288 
// Begin-job entry point: delegates directly to beginJob() on this object.
289  void SubProcess::doBeginJob() { this->beginJob(); }
290 
292 
294  // If event selection is being used, the SubProcess class reads TriggerResults
295  // object(s) in the parent process from the event. This next call is needed for
296  // getByToken to work properly. Normally, this is done by the worker, but since
297  // a SubProcess is not a module, it has no worker.
298  updateLookup(InEvent, *parentPreg_->productLookup(InEvent), false);
299 
300  if (!droppedBranchIDToKeptBranchID().empty()) {
302  }
304  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
305  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
306  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
307  }
308 
312  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
313  schedule_->endJob(c);
314  for (auto& subProcess : subProcesses_) {
315  c.call([&subProcess]() { subProcess.doEndJob(); });
316  }
317  if (c.hasThrown()) {
318  c.rethrow();
319  }
320  }
321 
323  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
324  std::map<BranchID, bool>& keepAssociation) {
326  return;
328 
329  // TODO: See if we can collapse keptProducts_ and productSelector_ into a
330  // single object. See the notes in the header for ProductSelector
331  // for more information.
332 
333  std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
334  std::vector<BranchDescription const*> associationDescriptions;
335  std::set<BranchID> keptProductsInEvent;
336 
337  for (auto const& it : preg.productList()) {
338  BranchDescription const& desc = it.second;
339  if (desc.transient()) {
340  // if the class of the branch is marked transient, output nothing
341  } else if (!desc.present() && !desc.produced()) {
342  // else if the branch containing the product has been previously dropped,
343  // output nothing
344  } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
345  associationDescriptions.push_back(&desc);
346  } else if (productSelector_.selected(desc)) {
347  keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
348  }
349  }
350 
351  parentThinnedAssociationsHelper.selectAssociationProducts(
352  associationDescriptions, keptProductsInEvent, keepAssociation);
353 
354  for (auto association : associationDescriptions) {
355  if (keepAssociation[association->branchID()]) {
356  keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
357  }
358  }
359 
360  // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
361  ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
362  }
363 
365  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
366  std::set<BranchID>& keptProductsInEvent) {
367  ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
368 
369  if (desc.branchType() == InEvent) {
370  if (desc.produced()) {
371  keptProductsInEvent.insert(desc.originalBranchID());
372  } else {
373  keptProductsInEvent.insert(desc.branchID());
374  }
375  }
376  EDGetToken token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
377  InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
378 
379  // Now put it in the list of selected branches.
380  keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
381  }
382 
384  std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
385  // Check for branches dropped while an EDAlias was kept.
386  // Replace BranchID of each dropped branch with that of the kept alias.
387  for (BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
388  for (BranchID::value_type& branchID : branchIDList) {
389  std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
390  droppedBranchIDToKeptBranchID.find(branchID);
391  if (iter != droppedBranchIDToKeptBranchID.end()) {
392  branchID = iter->second;
393  }
394  }
395  }
396  for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess) {
397  subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
398  });
399  }
400 
402  EventPrincipal const& ep,
403  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
405  /* BEGIN relevant bits from OutputModule::doEvent */
406  if (!wantAllEvents_) {
407  EventForOutput e(ep, ModuleDescription(), nullptr);
408  e.setConsumer(this);
409  if (!selectors_.wantEvent(e)) {
410  return;
411  }
412  }
413  processAsync(std::move(iHolder), ep, iEventSetupImpls);
414  /* END relevant bits from OutputModule::doEvent */
415  }
416 
418  EventPrincipal const& principal,
419  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
420  EventAuxiliary aux(principal.aux());
421  aux.setProcessHistoryID(principal.processHistoryID());
422 
423  EventSelectionIDVector esids{principal.eventSelectionIDs()};
424  if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
425  esids.push_back(selector_config_id_);
426  }
427 
429  auto& processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
430  processHistoryRegistry.registerProcessHistory(principal.processHistory());
431  BranchListIndexes bli(principal.branchListIndexes());
432  branchIDListHelper_->fixBranchListIndexes(bli);
433  bool deepCopyRetriever = false;
434  ep.fillEventPrincipal(
435  aux,
436  &principal.processHistory(),
437  std::move(esids),
438  std::move(bli),
439  principal.eventToProcessBlockIndexes(),
440  *(principal.productProvenanceRetrieverPtr()), //NOTE: this transfers the per product provenance
441  principal.reader(),
442  deepCopyRetriever);
443  ep.setLuminosityBlockPrincipal(inUseLumiPrincipals_[principal.luminosityBlockPrincipal().index()].get());
444  propagateProducts(InEvent, principal, ep);
445 
446  using namespace edm::waiting_task;
447  chain::first([&](auto nextTask) {
448  EventTransitionInfo info(ep, *((*iEventSetupImpls)[esp_->subProcessIndex()]));
449  schedule_->processOneEventAsync(std::move(nextTask), ep.streamID().value(), info, serviceToken_);
450  }) | chain::ifThen(not subProcesses_.empty(), [this, &ep, iEventSetupImpls](auto nextTask) {
451  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
452  subProcess.doEventAsync(nextTask, ep, iEventSetupImpls);
453  }
454  }) | chain::then([&ep](std::exception_ptr const* iPtr, auto nextTask) {
455  ep.clearEventPrincipal();
456  if (iPtr) {
457  nextTask.doneWaiting(*iPtr);
458  }
459  }) | chain::runLast(std::move(iHolder));
460  }
461 
// Specialization of doBeginProcessBlockAsync for BranchActionProcessBlockInput.
// Fills processBlockPrincipal from the parent principal's process name and reader,
// connects the kept InProcess products to the parent, and then starts the begin
// global transition for this schedule and the child SubProcesses.
//
// iHolder:                  task holder moved into beginGlobalTransitionAsync.
// iTransitionInfo:          supplies the parent ProcessBlockPrincipal.
// cleaningUpAfterException: forwarded unchanged to beginGlobalTransitionAsync.
//
// NOTE(review): this listing skips original lines 465, 467 and 473; the declarations of
// `processBlockPrincipal` and `Traits` are presumably on those lines -- confirm against the real file.
462  template <>
463  void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
464  WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool cleaningUpAfterException) {
466 
468  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
469  processBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
470  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
471 
// The transition info handed on wraps this SubProcess's principal, not the parent's.
472  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
474  beginGlobalTransitionAsync<Traits>(
475  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
476  }
477 
// Specialization of doBeginProcessBlockAsync for BranchActionGlobalBegin.
// Fills processBlockPrincipal using this SubProcess's own process name (no reader is
// passed), connects the kept InProcess products to the parent, and then starts the
// begin global transition for this schedule and the child SubProcesses.
//
// iHolder:         task holder moved into beginGlobalTransitionAsync.
// iTransitionInfo: supplies the parent ProcessBlockPrincipal.
// The unnamed bool parameter is ignored in this specialization.
//
// NOTE(review): this listing skips original lines 481, 483 and 489; the declarations of
// `processBlockPrincipal` and `Traits` are presumably on those lines -- confirm against the real file.
478  template <>
479  void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
480  WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool) {
482 
484  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
485  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
486  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
487 
488  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
490  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
491  }
492 
494  ProcessBlockTransitionInfo const& iTransitionInfo,
495  bool cleaningUpAfterException) {
497  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
498  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
499 
500  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
501 
502  if (parentProducedProductIsKept(parentPrincipal, processBlockPrincipal)) {
503  ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
504  inputProcessBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
505  propagateProducts(InProcess, parentPrincipal, inputProcessBlockPrincipal);
506  ProcessBlockTransitionInfo inputTransitionInfo(inputProcessBlockPrincipal);
507 
508  using namespace edm::waiting_task;
509  chain::first([&](const std::exception_ptr*, auto nextTask) {
511  beginGlobalTransitionAsync<TraitsInput>(std::move(nextTask),
512  *schedule_,
513  inputTransitionInfo,
516  cleaningUpAfterException);
517  }) | chain::then([this](auto nextTask) { writeProcessBlockAsync(nextTask, ProcessBlockType::Input); }) |
518  chain::then([this, info = transitionInfo, cleaningUpAfterException](std::exception_ptr const* iPtr,
519  auto nextTask) mutable {
520  ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
521  inputProcessBlockPrincipal.clearPrincipal();
522  for (auto& s : subProcesses_) {
523  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
524  }
525  if (iPtr) {
526  nextTask.doneWaiting(*iPtr);
527  } else {
529  endGlobalTransitionAsync<Traits>(
530  std::move(nextTask), *schedule_, info, serviceToken_, subProcesses_, cleaningUpAfterException);
531  }
532  }) |
533  chain::runLast(std::move(iHolder));
534  } else {
536  endGlobalTransitionAsync<Traits>(
537  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
538  }
539  }
540 
// Starts the begin-run global transition for this SubProcess.
// Prepares a RunPrincipal (`rpp`) that mirrors the parent's run, records it in
// inUseRunPrincipals_ at the parent principal's index, connects the kept InRun
// products, and hands off to beginGlobalTransitionAsync.
//
// iHolder:         task holder moved into beginGlobalTransitionAsync.
// iTransitionInfo: supplies the parent RunPrincipal and the event setup implementations.
//
// NOTE(review): this listing skips original lines 542, 547 and 559; `rpp` and `Traits`
// are presumably declared on those lines -- confirm against the real file.
541  void SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo) {
543 
544  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
// Copy the parent's auxiliary data and stamp it with the parent's process history ID.
545  auto aux = parentPrincipal.aux();
546  aux.setProcessHistoryID(parentPrincipal.processHistoryID());
548  rpp->setAux(aux);
// Run registries are indexed at historyRunOffset_ plus the parent principal's index.
549  auto& processHistoryRegistry = processHistoryRegistries_[historyRunOffset_ + parentPrincipal.index()];
550  inUseRunPrincipals_[parentPrincipal.index()] = rpp;
551  processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
552  rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.reader());
553 
554  RunPrincipal& rp = *rpp;
555  propagateProducts(InRun, parentPrincipal, rp);
556 
// Select the event setup implementation at this SubProcess's index (esp_->subProcessIndex()).
557  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
558  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
560  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
561  }
562 
564  RunTransitionInfo const& iTransitionInfo,
565  bool cleaningUpAfterException) {
566  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
567  RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
568  propagateProducts(InRun, parentPrincipal, rp);
569 
570  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
571  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
573  endGlobalTransitionAsync<Traits>(
574  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
575  }
576 
578  using namespace edm::waiting_task;
579  chain::first([&](std::exception_ptr const*, auto nextTask) {
581  schedule_->writeProcessBlockAsync(
582  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
583  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
585  for (auto& s : subProcesses_) {
586  s.writeProcessBlockAsync(nextTask, processBlockType);
587  }
589  }
590 
592  RunPrincipal const& principal,
593  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
594  using namespace edm::waiting_task;
595 
596  auto rp = inUseRunPrincipals_[principal.index()];
597  chain::first([&](std::exception_ptr const*, auto nextTask) {
599  schedule_->writeRunAsync(nextTask, *rp, &processContext_, actReg_.get(), mergeableRunProductMetadata);
600  }) | chain::ifThen(not subProcesses_.empty(), [this, rp, mergeableRunProductMetadata](auto nextTask) {
601  ServiceRegistry::Operate operateWriteRun(serviceToken_);
602  for (auto& s : subProcesses_) {
603  s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
604  }
606  }
607 
609  //release from list but stay around till end of routine
610  auto rp = std::move(inUseRunPrincipals_[parentPrincipal.index()]);
611  for (auto& s : subProcesses_) {
612  s.clearRunPrincipal(*rp);
613  }
614  rp->clearPrincipal();
615  }
616 
618  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(processBlockType);
619  processBlockPrincipal.clearPrincipal();
620  for (auto& s : subProcesses_) {
621  s.clearProcessBlockPrincipal(processBlockType);
622  }
623  }
624 
627 
628  LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
629  auto aux = parentPrincipal.aux();
630  aux.setProcessHistoryID(parentPrincipal.processHistoryID());
632  lbpp->setAux(aux);
633  auto& processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_ + lbpp->index()];
634  inUseLumiPrincipals_[parentPrincipal.index()] = lbpp;
635  processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
636  lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.processHistory(), parentPrincipal.reader());
637  lbpp->setRunPrincipal(inUseRunPrincipals_[parentPrincipal.runPrincipal().index()]);
638  LuminosityBlockPrincipal& lbp = *lbpp;
639  propagateProducts(InLumi, parentPrincipal, lbp);
640 
641  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
642  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
644  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
645  }
646 
648  LumiTransitionInfo const& iTransitionInfo,
649  bool cleaningUpAfterException) {
650  LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
651  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[parentPrincipal.index()];
652  propagateProducts(InLumi, parentPrincipal, lbp);
653 
654  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
655  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
657  endGlobalTransitionAsync<Traits>(
658  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
659  }
660 
662  using namespace edm::waiting_task;
663 
664  auto l = inUseLumiPrincipals_[principal.index()];
665  chain::first([&](std::exception_ptr const*, auto nextTask) {
667  schedule_->writeLumiAsync(nextTask, *l, &processContext_, actReg_.get());
668  }) | chain::ifThen(not subProcesses_.empty(), [this, l](auto nextTask) {
669  ServiceRegistry::Operate operateWriteLumi(serviceToken_);
670  for (auto& s : subProcesses_) {
671  s.writeLumiAsync(nextTask, *l);
672  }
674  }
675 
677  //release from list but stay around till end of routine
678  auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
679  for (auto& s : subProcesses_) {
680  s.clearLumiPrincipal(*lb);
681  }
682  lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
683  lb->clearPrincipal();
684  }
685 
// Runs beginStream for stream iID on this SubProcess's schedule, then forwards the
// same call to every child SubProcess.
// NOTE(review): this listing skips original line 687, so one statement at the top of
// the body is not visible here.
686  void SubProcess::doBeginStream(unsigned int iID) {
688  schedule_->beginStream(iID);
689  for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doBeginStream(iID); });
690  }
691 
// Runs endStream for stream iID on this SubProcess's schedule, then forwards the
// same call to every child SubProcess.
// NOTE(review): this listing skips original line 693, so one statement at the top of
// the body is not visible here.
692  void SubProcess::doEndStream(unsigned int iID) {
694  schedule_->endStream(iID);
695  for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doEndStream(iID); });
696  }
697 
699  unsigned int id,
700  RunTransitionInfo const& iTransitionInfo) {
702 
703  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
704  RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
705 
706  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
707  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
708  beginStreamTransitionAsync<Traits>(
709  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
710  }
711 
713  unsigned int id,
714  RunTransitionInfo const& iTransitionInfo,
715  bool cleaningUpAfterException) {
717 
718  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
719  RunPrincipal& rp = *inUseRunPrincipals_[parentPrincipal.index()];
720 
721  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
722  RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
723  endStreamTransitionAsync<Traits>(
724  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
725  }
726 
728  unsigned int id,
729  LumiTransitionInfo const& iTransitionInfo) {
731 
732  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
733  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
734  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
735  beginStreamTransitionAsync<Traits>(
736  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
737  }
738 
740  unsigned int id,
741  LumiTransitionInfo const& iTransitionInfo,
742  bool cleaningUpAfterException) {
743  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
745  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
746  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
747  endStreamTransitionAsync<Traits>(
748  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
749  }
750 
// Connects the product resolvers of `principal` to those of `parentPrincipal` for
// every kept product of the given branch type.
//
// type:            selects which entry of keptProducts() is walked.
// parentPrincipal: principal whose resolvers are looked up by branch ID.
// principal:       principal whose modifiable resolvers are connected to the parent's.
//
// A kept product is silently skipped when either principal has no resolver for its branch ID.
751  void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
752  SelectedProducts const& keptVector = keptProducts()[type];
753  for (auto const& item : keptVector) {
// item.first is the pointer to the kept product's BranchDescription.
754  BranchDescription const& desc = *item.first;
755  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
756  if (parentProductResolver != nullptr) {
757  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
758  if (productResolver != nullptr) {
759  //Propagate the per event(run)(lumi)(processBlock) data for this product to the subprocess.
760  productResolver->connectTo(*parentProductResolver, &parentPrincipal);
761  }
762  }
763  }
764  }
765 
// Reports whether at least one kept InProcess product has a resolver in both
// principals and is flagged as produced() by the parent resolver's branch description.
//
// parentPrincipal: principal whose resolvers and branch descriptions are inspected.
// principal:       only used to check that a modifiable resolver exists for the same
//                  branch ID; nothing is modified through it in this function.
// Returns true on the first matching product, false if none matches.
766  bool SubProcess::parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const {
767  SelectedProducts const& keptVector = keptProducts()[InProcess];
768  for (auto const& item : keptVector) {
769  BranchDescription const& desc = *item.first;
// Every entry under the InProcess key is expected to be an InProcess branch.
770  assert(desc.branchType() == InProcess);
771  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
772  if (parentProductResolver != nullptr) {
773  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
774  if (productResolver != nullptr) {
775  if (parentProductResolver->branchDescription().produced()) {
776  return true;
777  }
778  }
779  }
780  }
781  return false;
782  }
783 
785  branchIDListHelper_->updateFromParent(branchIDLists);
787  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
788  }
789 
790  // Call respondToOpenInputFile() on all Modules
793  schedule_->respondToOpenInputFile(fb);
794  for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToOpenInputFile(fb); });
795  }
796 
797  // free function
799  std::vector<std::string> subProcesses =
800  parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
801  if (!subProcesses.empty()) {
802  return parameterSet.popVParameterSet("subProcesses");
803  }
804  return {};
805  }
806 } // namespace edm
unsigned int historyRunOffset_
Definition: SubProcess.h:282
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:739
unsigned int historyLumiOffset_
Definition: SubProcess.h:281
ParameterSetID selector_config_id_
Definition: SubProcess.h:304
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
Definition: Principal.h:153
void clearPrincipal()
Definition: Principal.cc:390
static const TGPicture * info(bool iBackgroundIsBlack)
void respondToOpenInputFile(FileBlock const &fb)
Definition: SubProcess.cc:791
SelectedProductsForBranchType const & keptProducts() const
Definition: SubProcess.h:80
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
void clearLumiPrincipal(LuminosityBlockPrincipal &)
Definition: SubProcess.cc:676
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:766
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
Definition: SubProcess.h:283
void clearProcessBlockPrincipal(ProcessBlockType)
Definition: SubProcess.cc:617
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: SubProcess.h:273
ProductRegistry const & productRegistry() const
Definition: Principal.h:146
std::vector< SubProcess > subProcesses_
Definition: SubProcess.h:291
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
Definition: SubProcess.h:272
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: SubProcess.h:256
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
Definition: SubProcess.cc:591
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
RunPrincipal const & runPrincipal() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: SubProcess.h:278
bool exists(std::string const &parameterName) const
checks if a parameter exists
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:712
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
Definition: SubProcess.cc:784
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:417
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doEndStream(unsigned int)
Definition: SubProcess.cc:692
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
Definition: SubProcess.h:252
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: SubProcess.h:290
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:493
BranchType
Definition: BranchType.h:11
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
std::vector< std::shared_ptr< RunPrincipal > > inUseRunPrincipals_
Definition: SubProcess.h:287
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: SubProcess.h:260
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:401
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
Definition: SubProcess.h:268
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
Definition: SubProcess.h:288
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
Definition: SubProcess.cc:625
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
~SubProcess() override
Definition: SubProcess.cc:232
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
unsigned int value_type
Definition: BranchID.h:16
SelectedProductsForBranchType keptProducts_
Definition: SubProcess.h:298
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
Definition: SubProcess.cc:322
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
int merge(int argc, char *argv[])
Definition: DMRmerge.cc:37
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
Definition: SubProcess.h:299
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: SubProcess.h:289
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
Definition: SubProcess.h:292
RunIndex index() const
Definition: RunPrincipal.h:56
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
Definition: chain_first.h:288
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:751
bool selected(BranchDescription const &desc) const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Definition: SubProcess.cc:577
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: SubProcess.h:275
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
Definition: SubProcess.cc:51
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:798
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
Definition: SubProcess.h:309
ProductSelector productSelector_
Definition: SubProcess.h:300
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
Definition: SubProcess.h:305
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
Definition: SubProcess.cc:727
ProcessHistory const & processHistory() const
Definition: Principal.h:140
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
Definition: SubProcess.cc:698
void connectToSubProcess(ActivityRegistry &iOther)
std::shared_ptr< ActivityRegistry > actReg_
Definition: SubProcess.h:267
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:563
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
Definition: SubProcess.cc:541
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: SubProcess.h:271
ProcessBlockPrincipal & principal()
std::vector< HistoryAppender > historyAppenders_
Definition: SubProcess.h:284
std::vector< BranchID::value_type > BranchIDList
Definition: BranchIDList.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
HLT enums.
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void clearRunPrincipal(RunPrincipal &)
Definition: SubProcess.cc:608
void doBeginStream(unsigned int)
Definition: SubProcess.cc:686
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
Definition: SubProcess.h:269
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
Definition: SubProcess.cc:383
ProcessHistoryID const & processHistoryID() const
Definition: Principal.h:142
EventSelectionIDVector const & eventSelectionIDs() const
bool initialized() const
ProcessContext processContext_
Definition: SubProcess.h:277
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:59
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
Definition: StreamID.h:43
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
Definition: SubProcess.cc:234
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
tmp
align.sh
Definition: createJobs.py:716
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
Definition: SubProcess.cc:364
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:647
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
Definition: SubProcess.cc:661
std::shared_ptr< ProductRegistry const > preg_
Definition: SubProcess.h:270
PrincipalCache principalCache_
Definition: SubProcess.h:285
def move(src, dest)
Definition: eostools.py:511
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
Definition: SubProcess.h:274
DelayedReader * reader() const
Definition: Principal.h:187
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: SubProcess.h:276
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
Definition: Principal.cc:562