CMS 3D CMS Logo

SubProcess.cc
Go to the documentation of this file.
2 
42 
43 #include "boost/range/adaptor/reversed.hpp"
44 
45 #include <cassert>
46 #include <exception>
47 #include <string>
48 
49 namespace edm {
50 
52  ParameterSet const& topLevelParameterSet,
53  std::shared_ptr<ProductRegistry const> parentProductRegistry,
54  std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
55  ProcessBlockHelperBase const& parentProcessBlockHelper,
56  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
57  SubProcessParentageHelper const& parentSubProcessParentageHelper,
59  ActivityRegistry& parentActReg,
60  ServiceToken const& token,
62  PreallocationConfiguration const& preallocConfig,
63  ProcessContext const* parentProcessContext)
64  : EDConsumerBase(),
65  serviceToken_(),
66  parentPreg_(parentProductRegistry),
67  preg_(),
68  branchIDListHelper_(),
69  act_table_(),
70  processConfiguration_(),
71  historyLumiOffset_(preallocConfig.numberOfStreams()),
72  historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
73  processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
74  historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
75  principalCache_(),
76  esp_(),
77  schedule_(),
78  parentToChildPhID_(),
79  subProcesses_(),
80  processParameterSet_(),
81  productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
82  productSelector_(),
83  wantAllEvents_(true) {
84  //Setup the event selection
86 
87  ParameterSet selectevents = parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
88 
89  selectevents.registerIt(); // Just in case this PSet is not registered
91  selectevents, tns->getProcessName(), getAllTriggerNames(), selectors_, consumesCollector());
92  std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
94  selectevents, "", outputModulePathPositions, parentProductRegistry->anyProductProduced());
95 
96  std::map<BranchID, bool> keepAssociation;
97  selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
98 
99  std::string const maxEvents("maxEvents");
100  std::string const maxLumis("maxLuminosityBlocks");
101 
102  // propagate_const<T> has no reset() function
104  std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
105 
106  // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
107  if (processParameterSet_->exists(maxEvents)) {
108  processParameterSet_->popParameterSet(maxEvents);
109  }
110  if (processParameterSet_->exists(maxLumis)) {
111  processParameterSet_->popParameterSet(maxLumis);
112  }
113 
114  // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
115  if (topLevelParameterSet.exists(maxEvents)) {
116  processParameterSet_->addUntrackedParameter<ParameterSet>(
117  maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
118  }
119  if (topLevelParameterSet.exists(maxLumis)) {
120  processParameterSet_->addUntrackedParameter<ParameterSet>(
121  maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
122  }
123 
124  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
125  auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
126  bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
127 
128  // Validates the parameters in the 'options', 'maxEvents', and 'maxLuminosityBlocks'
129  // top level parameter sets. Default values are also set in here if the
130  // parameters were not explicitly set.
132 
133  processBlockHelper_ = std::make_shared<SubProcessBlockHelper>();
134 
135  ScheduleItems items(*parentProductRegistry, *this, *processBlockHelper_, parentProcessBlockHelper);
136  actReg_ = items.actReg_;
137 
138  //initialize the services
139  ServiceToken iToken;
140 
141  // get any configured services.
142  auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
143 
144  ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
145  parentActReg.connectToSubProcess(*items.actReg_);
146  serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
147 
148  //make the services available
150 
151  // initialize miscellaneous items
152  items.initMisc(*processParameterSet_);
153 
154  // initialize the event setup provider
155  esp_ = esController.makeProvider(*processParameterSet_, actReg_.get());
156 
157  branchIDListHelper_ = items.branchIDListHelper();
158  updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
159 
160  thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
161  thinnedAssociationsHelper_->updateFromParentProcess(
162  parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
163 
164  // initialize the Schedule
165  schedule_ = items.initSchedule(
166  *processParameterSet_, hasSubProcesses, preallocConfig, &processContext_, *processBlockHelper_);
167 
168  // set the items
169  act_table_ = std::move(items.act_table_);
170  preg_ = items.preg();
171 
172  subProcessParentageHelper_ = items.subProcessParentageHelper();
173  subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
174 
175  //CMS-THREADING this only works since Run/Lumis are synchronous so when principalCache asks for
176  // the reducedProcessHistoryID from a full ProcessHistoryID that registry will not be in use by
177  // another thread. We really need to change how this is done in the PrincipalCache.
179 
180  processConfiguration_ = items.processConfiguration();
182  processContext_.setParentProcessContext(parentProcessContext);
183 
185  for (unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
186  auto ep = std::make_shared<EventPrincipal>(preg_,
191  index,
192  false /*not primary process*/,
195  }
196  for (unsigned int index = 0; index < preallocConfig.numberOfLuminosityBlocks(); ++index) {
197  auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
200  }
201 
202  {
203  auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
205 
206  auto pbForInput = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_, false);
208  }
209 
210  inUseLumiPrincipals_.resize(preallocConfig.numberOfLuminosityBlocks());
211 
212  subProcesses_.reserve(subProcessVParameterSet.size());
213  for (auto& subProcessPSet : subProcessVParameterSet) {
214  subProcesses_.emplace_back(subProcessPSet,
215  topLevelParameterSet,
216  preg_,
221  esController,
222  *items.actReg_,
223  newToken,
224  iLegacy,
225  preallocConfig,
226  &processContext_);
227  }
228  }
229 
231 
232  std::vector<ModuleProcessName> SubProcess::keepOnlyConsumedUnscheduledModules(bool deleteModules) {
233  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
235 
236  // Note: all these may throw
238 
239  // Consumes information from the child SubProcesses
240  std::vector<ModuleProcessName> consumedByChildren;
241  for_all(subProcesses_, [&consumedByChildren, deleteModules](auto& subProcess) {
242  auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
243  if (consumedByChildren.empty()) {
244  std::swap(consumedByChildren, c);
245  } else if (not c.empty()) {
246  std::vector<ModuleProcessName> tmp;
247  tmp.reserve(consumedByChildren.size() + c.size());
248  std::merge(consumedByChildren.begin(), consumedByChildren.end(), c.begin(), c.end(), std::back_inserter(tmp));
249  std::swap(consumedByChildren, tmp);
250  }
251  });
252 
253  // Non-consumed unscheduled modules in this SubProcess, take into account of the consumes from child SubProcesses
254  if (deleteModules) {
255  if (auto const unusedModules = nonConsumedUnscheduledModules(pathsAndConsumesOfModules_, consumedByChildren);
256  not unusedModules.empty()) {
258 
259  edm::LogInfo("DeleteModules").log([&unusedModules, this](auto& l) {
260  l << "Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
261  "and "
262  "therefore they are deleted from SubProcess "
263  << processConfiguration_->processName() << " before beginJob transition.";
264  for (auto const& description : unusedModules) {
265  l << "\n " << description->moduleLabel();
266  }
267  });
268  for (auto const& description : unusedModules) {
269  schedule_->deleteModule(description->moduleLabel(), actReg_.get());
270  }
271  }
272  }
273 
274  // Products possibly consumed from the parent (Sub)Process
275  for (auto const& description : pathsAndConsumesOfModules_.allModules()) {
276  for (auto const& dep :
278  auto it = std::lower_bound(consumedByChildren.begin(),
279  consumedByChildren.end(),
280  ModuleProcessName{dep.moduleLabel(), dep.processName()});
281  consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
282  }
283  }
284  return consumedByChildren;
285  }
286 
287  void SubProcess::doBeginJob() { this->beginJob(); }
288 
290 
292  // If event selection is being used, the SubProcess class reads TriggerResults
293  // object(s) in the parent process from the event. This next call is needed for
294  // getByToken to work properly. Normally, this is done by the worker, but since
295  // a SubProcess is not a module, it has no worker.
296  updateLookup(InEvent, *parentPreg_->productLookup(InEvent), false);
297 
298  if (!droppedBranchIDToKeptBranchID().empty()) {
300  }
302  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
303  schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
304  for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); });
305  }
306 
310  "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
311  schedule_->endJob(c);
312  for (auto& subProcess : subProcesses_) {
313  c.call([&subProcess]() { subProcess.doEndJob(); });
314  }
315  if (c.hasThrown()) {
316  c.rethrow();
317  }
318  }
319 
321  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
322  std::map<BranchID, bool>& keepAssociation) {
324  return;
326 
327  // TODO: See if we can collapse keptProducts_ and productSelector_ into a
328  // single object. See the notes in the header for ProductSelector
329  // for more information.
330 
331  std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
332  std::vector<BranchDescription const*> associationDescriptions;
333  std::set<BranchID> keptProductsInEvent;
334 
335  for (auto const& it : preg.productList()) {
336  BranchDescription const& desc = it.second;
337  if (desc.transient()) {
338  // if the class of the branch is marked transient, output nothing
339  } else if (!desc.present() && !desc.produced()) {
340  // else if the branch containing the product has been previously dropped,
341  // output nothing
342  } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
343  associationDescriptions.push_back(&desc);
344  } else if (productSelector_.selected(desc)) {
345  keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
346  }
347  }
348 
349  parentThinnedAssociationsHelper.selectAssociationProducts(
350  associationDescriptions, keptProductsInEvent, keepAssociation);
351 
352  for (auto association : associationDescriptions) {
353  if (keepAssociation[association->branchID()]) {
354  keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
355  }
356  }
357 
358  // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
359  ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
360  }
361 
363  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
364  std::set<BranchID>& keptProductsInEvent) {
365  ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);
366 
367  if (desc.branchType() == InEvent) {
368  if (desc.produced()) {
369  keptProductsInEvent.insert(desc.originalBranchID());
370  } else {
371  keptProductsInEvent.insert(desc.branchID());
372  }
373  }
374  EDGetToken token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
375  InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
376 
377  // Now put it in the list of selected branches.
378  keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
379  }
380 
382  std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
383  // Check for branches dropped while an EDAlias was kept.
384  // Replace BranchID of each dropped branch with that of the kept alias.
385  for (BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
386  for (BranchID::value_type& branchID : branchIDList) {
387  std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
388  droppedBranchIDToKeptBranchID.find(branchID);
389  if (iter != droppedBranchIDToKeptBranchID.end()) {
390  branchID = iter->second;
391  }
392  }
393  }
394  for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess) {
395  subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
396  });
397  }
398 
400  EventPrincipal const& ep,
401  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
403  /* BEGIN relevant bits from OutputModule::doEvent */
404  if (!wantAllEvents_) {
405  EventForOutput e(ep, ModuleDescription(), nullptr);
406  e.setConsumer(this);
407  if (!selectors_.wantEvent(e)) {
408  return;
409  }
410  }
411  processAsync(std::move(iHolder), ep, iEventSetupImpls);
412  /* END relevant bits from OutputModule::doEvent */
413  }
414 
416  EventPrincipal const& principal,
417  std::vector<std::shared_ptr<const EventSetupImpl>> const* iEventSetupImpls) {
418  EventAuxiliary aux(principal.aux());
419  aux.setProcessHistoryID(principal.processHistoryID());
420 
421  EventSelectionIDVector esids{principal.eventSelectionIDs()};
422  if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
423  esids.push_back(selector_config_id_);
424  }
425 
427  auto& processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
428  processHistoryRegistry.registerProcessHistory(principal.processHistory());
429  BranchListIndexes bli(principal.branchListIndexes());
430  branchIDListHelper_->fixBranchListIndexes(bli);
431  bool deepCopyRetriever = false;
432  ep.fillEventPrincipal(
433  aux,
434  &principal.processHistory(),
435  std::move(esids),
436  std::move(bli),
437  principal.eventToProcessBlockIndexes(),
438  *(principal.productProvenanceRetrieverPtr()), //NOTE: this transfers the per product provenance
439  principal.reader(),
440  deepCopyRetriever);
441  ep.setLuminosityBlockPrincipal(inUseLumiPrincipals_[principal.luminosityBlockPrincipal().index()].get());
442  propagateProducts(InEvent, principal, ep);
443 
444  using namespace edm::waiting_task;
445  chain::first([&](auto nextTask) {
446  EventTransitionInfo info(ep, *((*iEventSetupImpls)[esp_->subProcessIndex()]));
447  schedule_->processOneEventAsync(std::move(nextTask), ep.streamID().value(), info, serviceToken_);
448  }) | chain::ifThen(not subProcesses_.empty(), [this, &ep, iEventSetupImpls](auto nextTask) {
449  for (auto& subProcess : boost::adaptors::reverse(subProcesses_)) {
450  subProcess.doEventAsync(nextTask, ep, iEventSetupImpls);
451  }
452  }) | chain::then([&ep](std::exception_ptr const* iPtr, auto nextTask) {
453  ep.clearEventPrincipal();
454  if (iPtr) {
455  nextTask.doneWaiting(*iPtr);
456  }
457  }) | chain::runLast(std::move(iHolder));
458  }
459 
460  template <>
461  void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
462  WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool cleaningUpAfterException) {
464 
466  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
467  processBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
468  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
469 
470  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
472  beginGlobalTransitionAsync<Traits>(
473  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
474  }
475 
476  template <>
477  void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
478  WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const& iTransitionInfo, bool) {
480 
482  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
483  processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
484  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
485 
486  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
488  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
489  }
490 
492  ProcessBlockTransitionInfo const& iTransitionInfo,
493  bool cleaningUpAfterException) {
495  ProcessBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
496  propagateProducts(InProcess, parentPrincipal, processBlockPrincipal);
497 
498  ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
499 
500  if (parentProducedProductIsKept(parentPrincipal, processBlockPrincipal)) {
501  ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
502  inputProcessBlockPrincipal.fillProcessBlockPrincipal(parentPrincipal.processName(), parentPrincipal.reader());
503  propagateProducts(InProcess, parentPrincipal, inputProcessBlockPrincipal);
504  ProcessBlockTransitionInfo inputTransitionInfo(inputProcessBlockPrincipal);
505 
506  using namespace edm::waiting_task;
507  chain::first([&](const std::exception_ptr*, auto nextTask) {
509  beginGlobalTransitionAsync<TraitsInput>(std::move(nextTask),
510  *schedule_,
511  inputTransitionInfo,
514  cleaningUpAfterException);
515  }) | chain::then([this](auto nextTask) { writeProcessBlockAsync(nextTask, ProcessBlockType::Input); }) |
516  chain::then([this, info = transitionInfo, cleaningUpAfterException](std::exception_ptr const* iPtr,
517  auto nextTask) mutable {
518  ProcessBlockPrincipal& inputProcessBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
519  inputProcessBlockPrincipal.clearPrincipal();
520  for (auto& s : subProcesses_) {
521  s.clearProcessBlockPrincipal(ProcessBlockType::Input);
522  }
523  if (iPtr) {
524  nextTask.doneWaiting(*iPtr);
525  } else {
527  endGlobalTransitionAsync<Traits>(
528  std::move(nextTask), *schedule_, info, serviceToken_, subProcesses_, cleaningUpAfterException);
529  }
530  }) |
531  chain::runLast(std::move(iHolder));
532  } else {
534  endGlobalTransitionAsync<Traits>(
535  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
536  }
537  }
538 
539  void SubProcess::doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const& iTransitionInfo) {
541 
542  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
543  auto aux = std::make_shared<RunAuxiliary>(parentPrincipal.aux());
544  aux->setProcessHistoryID(parentPrincipal.processHistoryID());
545  auto rpp = std::make_shared<RunPrincipal>(aux,
546  preg_,
548  &(historyAppenders_[historyRunOffset_ + parentPrincipal.index()]),
549  parentPrincipal.index(),
550  false);
551  auto& processHistoryRegistry = processHistoryRegistries_[historyRunOffset_ + parentPrincipal.index()];
552  processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
553  rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.reader());
554  principalCache_.insert(rpp);
555 
556  ProcessHistoryID const& parentInputReducedPHID = parentPrincipal.reducedProcessHistoryID();
557  ProcessHistoryID const& inputReducedPHID = rpp->reducedProcessHistoryID();
558 
559  parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID, inputReducedPHID));
560 
562  propagateProducts(InRun, parentPrincipal, rp);
563 
564  RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
566  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
567  }
568 
570  RunTransitionInfo const& iTransitionInfo,
571  bool cleaningUpAfterException) {
572  RunPrincipal const& parentPrincipal = iTransitionInfo.principal();
574  propagateProducts(InRun, parentPrincipal, rp);
575 
576  RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
578  endGlobalTransitionAsync<Traits>(
579  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
580  }
581 
583  using namespace edm::waiting_task;
584  chain::first([&](std::exception_ptr const*, auto nextTask) {
586  schedule_->writeProcessBlockAsync(
587  nextTask, principalCache_.processBlockPrincipal(processBlockType), &processContext_, actReg_.get());
588  }) | chain::ifThen(not subProcesses_.empty(), [this, processBlockType](auto nextTask) {
590  for (auto& s : subProcesses_) {
591  s.writeProcessBlockAsync(nextTask, processBlockType);
592  }
594  }
595 
597  ProcessHistoryID const& parentPhID,
598  int runNumber,
599  MergeableRunProductMetadata const* mergeableRunProductMetadata) {
601  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
602  assert(it != parentToChildPhID_.end());
603  auto const& childPhID = it->second;
604 
605  using namespace edm::waiting_task;
606  chain::first([&](std::exception_ptr const*, auto nextTask) {
608  schedule_->writeRunAsync(nextTask,
611  actReg_.get(),
612  mergeableRunProductMetadata);
613  }) |
614  chain::ifThen(not subProcesses_.empty(),
615  [this, childPhID, runNumber, mergeableRunProductMetadata](auto nextTask) {
616  ServiceRegistry::Operate operateWriteRun(serviceToken_);
617  for (auto& s : subProcesses_) {
618  s.writeRunAsync(nextTask, childPhID, runNumber, mergeableRunProductMetadata);
619  }
620  }) |
622  }
623 
625  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
626  assert(it != parentToChildPhID_.end());
627  auto const& childPhID = it->second;
628  principalCache_.deleteRun(childPhID, runNumber);
630  [&childPhID, runNumber](auto& subProcess) { subProcess.deleteRunFromCache(childPhID, runNumber); });
631  }
632 
634  ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(processBlockType);
635  processBlockPrincipal.clearPrincipal();
636  for (auto& s : subProcesses_) {
637  s.clearProcessBlockPrincipal(processBlockType);
638  }
639  }
640 
643 
644  LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
645  auto aux = parentPrincipal.aux();
646  aux.setProcessHistoryID(parentPrincipal.processHistoryID());
648  lbpp->setAux(aux);
649  auto& processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_ + lbpp->index()];
650  inUseLumiPrincipals_[parentPrincipal.index()] = lbpp;
651  processHistoryRegistry.registerProcessHistory(parentPrincipal.processHistory());
652  lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.processHistory(), parentPrincipal.reader());
653  lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
654  LuminosityBlockPrincipal& lbp = *lbpp;
655  propagateProducts(InLumi, parentPrincipal, lbp);
656 
657  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
658  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
660  beginGlobalTransitionAsync<Traits>(std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
661  }
662 
664  LumiTransitionInfo const& iTransitionInfo,
665  bool cleaningUpAfterException) {
666  LuminosityBlockPrincipal const& parentPrincipal = iTransitionInfo.principal();
667  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[parentPrincipal.index()];
668  propagateProducts(InLumi, parentPrincipal, lbp);
669 
670  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
671  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
673  endGlobalTransitionAsync<Traits>(
674  std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
675  }
676 
678  using namespace edm::waiting_task;
679 
680  auto l = inUseLumiPrincipals_[principal.index()];
681  chain::first([&](std::exception_ptr const*, auto nextTask) {
683  schedule_->writeLumiAsync(nextTask, *l, &processContext_, actReg_.get());
684  }) | chain::ifThen(not subProcesses_.empty(), [this, l](auto nextTask) {
685  ServiceRegistry::Operate operateWriteLumi(serviceToken_);
686  for (auto& s : subProcesses_) {
687  s.writeLumiAsync(nextTask, *l);
688  }
690  }
691 
693  //release from list but stay around till end of routine
694  auto lb = std::move(inUseLumiPrincipals_[principal.index()]);
695  for (auto& s : subProcesses_) {
696  s.deleteLumiFromCache(*lb);
697  }
698  lb->clearPrincipal();
699  }
700 
701  void SubProcess::doBeginStream(unsigned int iID) {
703  schedule_->beginStream(iID);
704  for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doBeginStream(iID); });
705  }
706 
707  void SubProcess::doEndStream(unsigned int iID) {
709  schedule_->endStream(iID);
710  for_all(subProcesses_, [iID](auto& subProcess) { subProcess.doEndStream(iID); });
711  }
712 
715 
717 
718  RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
719  beginStreamTransitionAsync<Traits>(
720  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
721  }
722 
724  unsigned int id,
725  RunTransitionInfo const&,
726  bool cleaningUpAfterException) {
729 
730  RunTransitionInfo transitionInfo(rp, esp_->eventSetupImpl());
731  endStreamTransitionAsync<Traits>(
732  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
733  }
734 
736  unsigned int id,
737  LumiTransitionInfo const& iTransitionInfo) {
739 
740  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
741  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
742  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
743  beginStreamTransitionAsync<Traits>(
744  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_);
745  }
746 
748  unsigned int id,
749  LumiTransitionInfo const& iTransitionInfo,
750  bool cleaningUpAfterException) {
751  LuminosityBlockPrincipal& lbp = *inUseLumiPrincipals_[iTransitionInfo.principal().index()];
753  std::vector<std::shared_ptr<const EventSetupImpl>> const* eventSetupImpls = iTransitionInfo.eventSetupImpls();
754  LumiTransitionInfo transitionInfo(lbp, *((*eventSetupImpls)[esp_->subProcessIndex()]), eventSetupImpls);
755  endStreamTransitionAsync<Traits>(
756  std::move(iHolder), *schedule_, id, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
757  }
758 
759  void SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
760  SelectedProducts const& keptVector = keptProducts()[type];
761  for (auto const& item : keptVector) {
762  BranchDescription const& desc = *item.first;
763  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
764  if (parentProductResolver != nullptr) {
765  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
766  if (productResolver != nullptr) {
767  //Propagate the per event(run)(lumi)(processBlock) data for this product to the subprocess.
768  productResolver->connectTo(*parentProductResolver, &parentPrincipal);
769  }
770  }
771  }
772  }
773 
774  bool SubProcess::parentProducedProductIsKept(Principal const& parentPrincipal, Principal& principal) const {
775  SelectedProducts const& keptVector = keptProducts()[InProcess];
776  for (auto const& item : keptVector) {
777  BranchDescription const& desc = *item.first;
778  assert(desc.branchType() == InProcess);
779  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
780  if (parentProductResolver != nullptr) {
781  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
782  if (productResolver != nullptr) {
783  if (parentProductResolver->branchDescription().produced()) {
784  return true;
785  }
786  }
787  }
788  }
789  return false;
790  }
791 
793  branchIDListHelper_->updateFromParent(branchIDLists);
795  [this](auto& subProcess) { subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
796  }
797 
798  // Call respondToOpenInputFile() on all Modules
801  schedule_->respondToOpenInputFile(fb);
802  for_all(subProcesses_, [&fb](auto& subProcess) { subProcess.respondToOpenInputFile(fb); });
803  }
804 
805  // free function
807  std::vector<std::string> subProcesses =
808  parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
809  if (!subProcesses.empty()) {
810  return parameterSet.popVParameterSet("subProcesses");
811  }
812  return {};
813  }
814 } // namespace edm
unsigned int historyRunOffset_
Definition: SubProcess.h:285
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:747
unsigned int historyLumiOffset_
Definition: SubProcess.h:284
ParameterSetID selector_config_id_
Definition: SubProcess.h:307
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
Definition: Principal.h:153
void clearPrincipal()
Definition: Principal.cc:382
static const TGPicture * info(bool iBackgroundIsBlack)
void respondToOpenInputFile(FileBlock const &fb)
Definition: SubProcess.cc:799
SelectedProductsForBranchType const & keptProducts() const
Definition: SubProcess.h:80
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:774
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
Definition: SubProcess.h:286
void clearProcessBlockPrincipal(ProcessBlockType)
Definition: SubProcess.cc:633
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: SubProcess.h:276
ProductRegistry const & productRegistry() const
Definition: Principal.h:146
std::vector< SubProcess > subProcesses_
Definition: SubProcess.h:294
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
Definition: SubProcess.h:275
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: SubProcess.h:259
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: SubProcess.h:281
bool exists(std::string const &parameterName) const
Checks if a parameter exists.
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
Definition: SubProcess.cc:723
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
Definition: SubProcess.cc:792
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:415
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doEndStream(unsigned int)
Definition: SubProcess.cc:707
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
assert(be >=bs)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
Definition: Provenance.cc:11
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
Definition: SubProcess.h:255
constexpr auto then(O &&iO)
Definition: chain_first.h:277
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: SubProcess.h:292
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:491
BranchType
Definition: BranchType.h:11
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: SubProcess.h:263
Func for_all(ForwardSequence &s, Func f)
Wrapper for std::for_each.
Definition: Algorithms.h:14
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
Definition: SubProcess.cc:399
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
Definition: SubProcess.h:271
void deleteRunFromCache(ProcessHistoryID const &parentPhID, int runNumber)
Definition: SubProcess.cc:624
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
Definition: SubProcess.h:290
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
Definition: SubProcess.cc:641
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: RunPrincipal.h:63
~SubProcess() override
Definition: SubProcess.cc:230
auto runLast(edm::WaitingTaskHolder iTask)
Definition: chain_first.h:297
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
unsigned int value_type
Definition: BranchID.h:16
SelectedProductsForBranchType keptProducts_
Definition: SubProcess.h:301
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
Definition: SubProcess.cc:320
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
void deleteLumiFromCache(LuminosityBlockPrincipal &)
Definition: SubProcess.cc:692
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
Definition: SubProcess.h:302
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: SubProcess.h:291
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
Definition: SubProcess.h:295
RunIndex index() const
Definition: RunPrincipal.h:57
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at call time) is true. If false...
Definition: chain_first.h:288
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:759
bool selected(BranchDescription const &desc) const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Definition: SubProcess.cc:582
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: SubProcess.h:278
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
Definition: SubProcess.cc:51
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:806
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
Definition: SubProcess.h:312
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &parentPhID, int runNumber, MergeableRunProductMetadata const *)
Definition: SubProcess.cc:596
ProductSelector productSelector_
Definition: SubProcess.h:303
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
Definition: SubProcess.h:308
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
Definition: SubProcess.cc:735
ProcessHistory const & processHistory() const
Definition: Principal.h:140
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
Definition: SubProcess.cc:713
void connectToSubProcess(ActivityRegistry &iOther)
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
Definition: SubProcess.h:293
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< ActivityRegistry > actReg_
Definition: SubProcess.h:270
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:569
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
Definition: SubProcess.cc:539
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: SubProcess.h:274
ProcessBlockPrincipal & principal()
std::vector< HistoryAppender > historyAppenders_
Definition: SubProcess.h:287
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< BranchID::value_type > BranchIDList
Definition: BranchIDList.h:18
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
HLT enums.
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void doBeginStream(unsigned int)
Definition: SubProcess.cc:701
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
Definition: SubProcess.h:272
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
Definition: SubProcess.cc:381
ProcessHistoryID const & processHistoryID() const
Definition: Principal.h:142
EventSelectionIDVector const & eventSelectionIDs() const
bool initialized() const
ProcessContext processContext_
Definition: SubProcess.h:280
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:59
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
Definition: StreamID.h:43
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
Definition: SubProcess.cc:232
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
tmp
align.sh
Definition: createJobs.py:716
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
Definition: SubProcess.cc:362
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
Definition: SubProcess.cc:663
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
Definition: SubProcess.cc:677
std::shared_ptr< ProductRegistry const > preg_
Definition: SubProcess.h:273
PrincipalCache principalCache_
Definition: SubProcess.h:288
def move(src, dest)
Definition: eostools.py:511
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
Definition: SubProcess.h:277
DelayedReader * reader() const
Definition: Principal.h:187
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: SubProcess.h:279
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
Definition: Principal.cc:554
def merge(dictlist, TELL=False)
Definition: MatrixUtil.py:205