CMS 3D CMS Logo

SubProcess.cc
Go to the documentation of this file.
2 
35 
36 #include "boost/range/adaptor/reversed.hpp"
37 
38 #include <cassert>
39 #include <string>
40 #include <vector>
41 
42 namespace edm {
43 
45  ParameterSet const& topLevelParameterSet,
46  std::shared_ptr<ProductRegistry const> parentProductRegistry,
47  std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
48  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
49  SubProcessParentageHelper const& parentSubProcessParentageHelper,
51  ActivityRegistry& parentActReg,
52  ServiceToken const& token,
54  PreallocationConfiguration const& preallocConfig,
55  ProcessContext const* parentProcessContext) :
57  serviceToken_(),
58  parentPreg_(parentProductRegistry),
59  preg_(),
60  branchIDListHelper_(),
61  act_table_(),
62  processConfiguration_(),
63  historyLumiOffset_(preallocConfig.numberOfStreams()),
64  historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
65  processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
66  historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
67  principalCache_(),
68  esp_(),
69  schedule_(),
70  parentToChildPhID_(),
71  subProcesses_(),
72  processParameterSet_(),
73  productSelectorRules_(parameterSet, "outputCommands", "OutputModule"),
74  productSelector_(),
75  wantAllEvents_(true) {
76 
77  //Setup the event selection
79 
80  ParameterSet selectevents =
81  parameterSet.getUntrackedParameterSet("SelectEvents", ParameterSet());
82 
83  selectevents.registerIt(); // Just in case this PSet is not registered
85  tns->getProcessName(),
87  selectors_,
89  std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
91  "",
92  outputModulePathPositions,
93  parentProductRegistry->anyProductProduced());
94 
95  std::map<BranchID, bool> keepAssociation;
96  selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
97 
98  std::string const maxEvents("maxEvents");
99  std::string const maxLumis("maxLuminosityBlocks");
100 
101  // propagate_const<T> has no reset() function
102  processParameterSet_ = std::unique_ptr<ParameterSet>(parameterSet.popParameterSet(std::string("process")).release());
103 
104  // if this process has a maxEvents or maxLuminosityBlocks parameter set, remove them.
105  if(processParameterSet_->exists(maxEvents)) {
106  processParameterSet_->popParameterSet(maxEvents);
107  }
108  if(processParameterSet_->exists(maxLumis)) {
109  processParameterSet_->popParameterSet(maxLumis);
110  }
111 
112  // if the top level process has a maxEvents or maxLuminosityBlocks parameter set, add them to this process.
113  if(topLevelParameterSet.exists(maxEvents)) {
114  processParameterSet_->addUntrackedParameter<ParameterSet>(maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
115  }
116  if(topLevelParameterSet.exists(maxLumis)) {
117  processParameterSet_->addUntrackedParameter<ParameterSet>(maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
118  }
119 
120  // If there are subprocesses, pop the subprocess parameter sets out of the process parameter set
121  auto subProcessVParameterSet = popSubProcessVParameterSet(*processParameterSet_);
122  bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
123 
124  ScheduleItems items(*parentProductRegistry, *this);
125  actReg_ = items.actReg_;
126 
127  ParameterSet const& optionsPset(processParameterSet_->getUntrackedParameterSet("options", ParameterSet()));
128  IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
129 
130  //initialize the services
131  ServiceToken iToken;
132 
133  // get any configured services.
134  auto serviceSets = processParameterSet_->popVParameterSet(std::string("services"));
135 
136  ServiceToken newToken = items.initServices(serviceSets, *processParameterSet_, token, iLegacy, false);
137  parentActReg.connectToSubProcess(*items.actReg_);
138  serviceToken_ = items.addCPRandTNS(*processParameterSet_, newToken);
139 
140 
141  //make the services available
143 
144  // intialize miscellaneous items
146 
147  // intialize the event setup provider
148  esp_ = esController.makeProvider(*processParameterSet_);
149 
151  updateBranchIDListHelper(parentBranchIDListHelper->branchIDLists());
152 
154  thinnedAssociationsHelper_->updateFromParentProcess(parentThinnedAssociationsHelper, keepAssociation, droppedBranchIDToKeptBranchID_);
155 
156  // intialize the Schedule
157  schedule_ = items.initSchedule(*processParameterSet_,hasSubProcesses,preallocConfig,&processContext_);
158 
159  // set the items
161  preg_ = items.preg();
162 
164  subProcessParentageHelper_->update(parentSubProcessParentageHelper, *parentProductRegistry);
165 
166  //CMS-THREADING this only works since Run/Lumis are synchronous so when principalCache asks for
167  // the reducedProcessHistoryID from a full ProcessHistoryID that registry will not be in use by
168  // another thread. We really need to change how this is done in the PrincipalCache.
170 
171 
174  processContext_.setParentProcessContext(parentProcessContext);
175 
177  for(unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) {
178  auto ep = std::make_shared<EventPrincipal>(preg_,
183  index,
184  false /*not primary process*/);
186  }
187 
188  subProcesses_.reserve(subProcessVParameterSet.size());
189  for(auto& subProcessPSet : subProcessVParameterSet) {
190  subProcesses_.emplace_back(subProcessPSet,
191  topLevelParameterSet,
192  preg_,
194  *thinnedAssociationsHelper_,
196  esController,
197  *items.actReg_,
198  newToken,
199  iLegacy,
200  preallocConfig,
201  &processContext_);
202  }
203  }
204 
206 
207  void
209  this->beginJob();
210  }
211 
212  void
214  endJob();
215  }
216 
217 
218  void
220  // If event selection is being used, the SubProcess class reads TriggerResults
221  // object(s) in the parent process from the event. This next call is needed for
222  // getByToken to work properly. Normally, this is done by the worker, but since
223  // a SubProcess is not a module, it has no worker.
224  updateLookup(InEvent, *parentPreg_->productLookup(InEvent),false);
225 
226  if(!droppedBranchIDToKeptBranchID().empty()) {
228  }
230  schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
232  //NOTE: this may throw
234  actReg_->preBeginJobSignal_(pathsAndConsumesOfModules_, processContext_);
235  schedule_->beginJob(*preg_);
236  for_all(subProcesses_, [](auto& subProcess){ subProcess.doBeginJob(); });
237  }
238 
239  void
242  ExceptionCollector c("Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
243  schedule_->endJob(c);
244  for(auto& subProcess : subProcesses_) {
245  c.call([&subProcess](){ subProcess.doEndJob();});
246  }
247  if(c.hasThrown()) {
248  c.rethrow();
249  }
250  }
251 
252  void
254  ThinnedAssociationsHelper const& parentThinnedAssociationsHelper,
255  std::map<BranchID, bool>& keepAssociation) {
256  if(productSelector_.initialized()) return;
258 
259  // TODO: See if we can collapse keptProducts_ and productSelector_ into a
260  // single object. See the notes in the header for ProductSelector
261  // for more information.
262 
263  std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
264  std::vector<BranchDescription const*> associationDescriptions;
265  std::set<BranchID> keptProductsInEvent;
266 
267  for(auto const& it : preg.productList()) {
268  BranchDescription const& desc = it.second;
269  if(desc.transient()) {
270  // if the class of the branch is marked transient, output nothing
271  } else if(!desc.present() && !desc.produced()) {
272  // else if the branch containing the product has been previously dropped,
273  // output nothing
274  } else if(desc.unwrappedType() == typeid(ThinnedAssociation)) {
275  associationDescriptions.push_back(&desc);
276  } else if(productSelector_.selected(desc)) {
277  keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
278  }
279  }
280 
281  parentThinnedAssociationsHelper.selectAssociationProducts(associationDescriptions,
282  keptProductsInEvent,
283  keepAssociation);
284 
285  for(auto association : associationDescriptions) {
286  if(keepAssociation[association->branchID()]) {
287  keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
288  }
289  }
290 
291  // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
292  ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
293  }
294 
296  std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
297  std::set<BranchID>& keptProductsInEvent) {
298 
300  trueBranchIDToKeptBranchDesc);
301 
302  if(desc.branchType() == InEvent) {
303  if(desc.produced()) {
304  keptProductsInEvent.insert(desc.originalBranchID());
305  } else {
306  keptProductsInEvent.insert(desc.branchID());
307  }
308  }
310  InputTag{desc.moduleLabel(),
311  desc.productInstanceName(),
312  desc.processName()});
313 
314  // Now put it in the list of selected branches.
315  keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
316  }
317 
318  void
319  SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID) {
320  // Check for branches dropped while an EDAlias was kept.
321  // Replace BranchID of each dropped branch with that of the kept alias.
322  for(BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
323  for(BranchID::value_type& branchID : branchIDList) {
324  std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter = droppedBranchIDToKeptBranchID.find(branchID);
325  if(iter != droppedBranchIDToKeptBranchID.end()) {
326  branchID = iter->second;
327  }
328  }
329  }
330  for_all(subProcesses_, [&droppedBranchIDToKeptBranchID](auto& subProcess){ subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID); });
331  }
332 
333  void
335  EventPrincipal const& ep) {
337  /* BEGIN relevant bits from OutputModule::doEvent */
338  if(!wantAllEvents_) {
339  EventForOutput e(ep, ModuleDescription(), nullptr);
340  e.setConsumer(this);
341  if(!selectors_.wantEvent(e)) {
342  return;
343  }
344  }
345  processAsync(std::move(iHolder),ep);
346  /* END relevant bits from OutputModule::doEvent */
347  }
348 
349  void
351  EventPrincipal const& principal) {
352  EventAuxiliary aux(principal.aux());
353  aux.setProcessHistoryID(principal.processHistoryID());
354 
355  EventSelectionIDVector esids{principal.eventSelectionIDs()};
356  if (principal.productRegistry().anyProductProduced() || !wantAllEvents_) {
357  esids.push_back(selector_config_id_);
358  }
359 
361  auto & processHistoryRegistry = processHistoryRegistries_[principal.streamID().value()];
362  processHistoryRegistry.registerProcessHistory(principal.processHistory());
363  BranchListIndexes bli(principal.branchListIndexes());
364  branchIDListHelper_->fixBranchListIndexes(bli);
365  bool deepCopyRetriever = false;
367  processHistoryRegistry,
368  std::move(esids),
369  std::move(bli),
370  *(principal.productProvenanceRetrieverPtr()),//NOTE: this transfers the per product provenance
371  principal.reader(),
372  deepCopyRetriever);
374  propagateProducts(InEvent, principal, ep);
375 
376  WaitingTaskHolder finalizeEventTask( make_waiting_task(tbb::task::allocate_root(),
377  [this,&ep,iHolder](std::exception_ptr const* iPtr) mutable
378  {
379  ep.clearEventPrincipal();
380  if(iPtr) {
381  iHolder.doneWaiting(*iPtr);
382  } else {
383  iHolder.doneWaiting(std::exception_ptr());
384  }
385  }
386  )
387  );
388  WaitingTaskHolder afterProcessTask;
389  if(subProcesses_.empty()) {
390  afterProcessTask = std::move(finalizeEventTask);
391  } else {
392  afterProcessTask = WaitingTaskHolder(
393  make_waiting_task(tbb::task::allocate_root(),
394  [this,&ep,finalizeEventTask] (std::exception_ptr const* iPtr) mutable{
395  if(not iPtr) {
396  for(auto& subProcess: boost::adaptors::reverse(subProcesses_)) {
397  subProcess.doEventAsync(finalizeEventTask,ep);
398  }
399  } else {
400  finalizeEventTask.doneWaiting(*iPtr);
401  }
402  })
403  );
404  }
405 
406  schedule_->processOneEventAsync(std::move(afterProcessTask),
407  ep.streamID().value(),ep, esp_->eventSetup());
408  }
409 
410  void
413  beginRun(principal,ts);
414  }
415 
416  void
418  auto aux = std::make_shared<RunAuxiliary>(principal.aux());
419  aux->setProcessHistoryID(principal.processHistoryID());
420  auto rpp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.index()]),principal.index(),false);
421  auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.index()];
422  processHistoryRegistry.registerProcessHistory(principal.processHistory());
423  rpp->fillRunPrincipal(processHistoryRegistry, principal.reader());
424  principalCache_.insert(rpp);
425 
426  ProcessHistoryID const& parentInputReducedPHID = principal.reducedProcessHistoryID();
427  ProcessHistoryID const& inputReducedPHID = rpp->reducedProcessHistoryID();
428 
429  parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
430 
432  propagateProducts(InRun, principal, rp);
434  schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts));
435  for_all(subProcesses_, [&rp, &ts](auto& subProcess){ subProcess.doBeginRun(rp, ts); });
436  }
437 
438  void
441 
442  auto aux = std::make_shared<RunAuxiliary>(principal.aux());
443  aux->setProcessHistoryID(principal.processHistoryID());
444  auto rpp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.index()]),principal.index(),false);
445  auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.index()];
446  processHistoryRegistry.registerProcessHistory(principal.processHistory());
447  rpp->fillRunPrincipal(processHistoryRegistry, principal.reader());
448  principalCache_.insert(rpp);
449 
450  ProcessHistoryID const& parentInputReducedPHID = principal.reducedProcessHistoryID();
451  ProcessHistoryID const& inputReducedPHID = rpp->reducedProcessHistoryID();
452 
453  parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
454 
456  propagateProducts(InRun, principal, rp);
458  beginGlobalTransitionAsync<Traits>(std::move(iHolder),
459  *schedule_,
460  rp,
461  ts,
462  esp_->eventSetupForInstance(ts),
463  subProcesses_);
464  }
465 
466 
467  void
468  SubProcess::doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
470  endRun(principal,ts,cleaningUpAfterException);
471  }
472 
473  void
474  SubProcess::endRun(RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
476  rp.setComplete();
477  propagateProducts(InRun, principal, rp);
479  rp.setAtEndTransition(true);
480  schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
481  for_all(subProcesses_, [&rp, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndRun(rp, ts, cleaningUpAfterException); });
482  }
483 
484  void
486  RunPrincipal const& principal,
487  IOVSyncValue const& ts,
488  bool cleaningUpAfterException) {
490  rp.setComplete();
491  propagateProducts(InRun, principal, rp);
493  rp.setAtEndTransition(true);
494  endGlobalTransitionAsync<Traits>(std::move(iHolder),
495  *schedule_,
496  rp,
497  ts,
498  esp_->eventSetupForInstance(ts),
500  cleaningUpAfterException);
501  }
502 
503  void
506  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
507  assert(it != parentToChildPhID_.end());
508  auto const& childPhID = it->second;
509  schedule_->writeRun(principalCache_.runPrincipal(childPhID, runNumber), &processContext_);
510  for_all(subProcesses_, [&childPhID, runNumber](auto& subProcess){ subProcess.writeRun(childPhID, runNumber); });
511  }
512 
513  void
515  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
516  assert(it != parentToChildPhID_.end());
517  auto const& childPhID = it->second;
518  principalCache_.deleteRun(childPhID, runNumber);
519  for_all(subProcesses_, [&childPhID, runNumber](auto& subProcess){ subProcess.deleteRunFromCache(childPhID, runNumber); });
520  }
521 
522  void
525  beginLuminosityBlock(principal,ts);
526  }
527 
528  void
530  auto aux = std::make_shared<LuminosityBlockAuxiliary>(principal.aux());
531  aux->setProcessHistoryID(principal.processHistoryID());
532  auto lbpp = std::make_shared<LuminosityBlockPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.index()]),principal.index(),false);
533  auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.index()];
534  processHistoryRegistry.registerProcessHistory(principal.processHistory());
535  lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.reader());
536  lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
537  principalCache_.insert(lbpp);
539  propagateProducts(InLumi, principal, lbp);
541  schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts));
542  for_all(subProcesses_, [&lbp, &ts](auto& subProcess){ subProcess.doBeginLuminosityBlock(lbp, ts); });
543  }
544 
545  void
548 
549  auto aux = std::make_shared<LuminosityBlockAuxiliary>(principal.aux());
550  aux->setProcessHistoryID(principal.processHistoryID());
551  auto lbpp = std::make_shared<LuminosityBlockPrincipal>(aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.index()]),principal.index(),false);
552  auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.index()];
553  processHistoryRegistry.registerProcessHistory(principal.processHistory());
554  lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.reader());
555  lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
556  principalCache_.insert(lbpp);
558  propagateProducts(InLumi, principal, lbp);
560  beginGlobalTransitionAsync<Traits>(std::move(iHolder),
561  *schedule_,
562  lbp,
563  ts,
564  esp_->eventSetupForInstance(ts),
565  subProcesses_);
566  }
567 
568  void
569  SubProcess::doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
571  endLuminosityBlock(principal,ts,cleaningUpAfterException);
572  }
573 
574  void
575  SubProcess::endLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
577  lbp.setComplete();
578  propagateProducts(InLumi, principal, lbp);
580  lbp.setAtEndTransition(true);
581  schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
582  for_all(subProcesses_, [&lbp, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doEndLuminosityBlock(lbp, ts, cleaningUpAfterException); });
583  }
584 
585  void
588  lbp.setComplete();
589  propagateProducts(InLumi, principal, lbp);
591  lbp.setAtEndTransition(true);
592  endGlobalTransitionAsync<Traits>(std::move(iHolder),
593  *schedule_,
594  lbp,
595  ts,
596  esp_->eventSetupForInstance(ts),
598  cleaningUpAfterException);
599  }
600 
601 
602  void
603  SubProcess::writeLumi(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber) {
605  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
606  assert(it != parentToChildPhID_.end());
607  auto const& childPhID = it->second;
608  schedule_->writeLumi(principalCache_.lumiPrincipal(childPhID, runNumber, lumiNumber), &processContext_);
609  for_all(subProcesses_, [&childPhID, runNumber, lumiNumber](auto& subProcess){ subProcess.writeLumi(childPhID, runNumber, lumiNumber); });
610  }
611 
612  void
613  SubProcess::deleteLumiFromCache(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber) {
614  std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
615  assert(it != parentToChildPhID_.end());
616  auto const& childPhID = it->second;
617  principalCache_.deleteLumi(childPhID, runNumber, lumiNumber);
618  for_all(subProcesses_, [&childPhID, runNumber, lumiNumber](auto& subProcess){ subProcess.deleteLumiFromCache(childPhID, runNumber, lumiNumber); });
619  }
620 
621  void
622  SubProcess::doBeginStream(unsigned int iID) {
624  schedule_->beginStream(iID);
625  for_all(subProcesses_, [iID](auto& subProcess){ subProcess.doBeginStream(iID); });
626  }
627 
628  void
629  SubProcess::doEndStream(unsigned int iID) {
631  schedule_->endStream(iID);
632  for_all(subProcesses_, [iID](auto& subProcess){ subProcess.doEndStream(iID); });
633  }
634 
635  void
636  SubProcess::doStreamBeginRun(unsigned int id, RunPrincipal const& principal, IOVSyncValue const& ts) {
638  {
641  schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts));
642  for_all(subProcesses_, [id, &rp, &ts](auto& subProcess){ subProcess.doStreamBeginRun(id,rp, ts); });
643  }
644  }
645  void
647  unsigned int id, RunPrincipal const& principal, IOVSyncValue const& ts) {
649 
651 
653 
654  beginStreamTransitionAsync<Traits>(std::move(iHolder),
655  *schedule_,
656  id,
657  rp,
658  ts,
659  esp_->eventSetupForInstance(ts),
660  subProcesses_);
661 
662  }
663 
664 
665  void
666  SubProcess::doStreamEndRun(unsigned int id, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
668  {
671  schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
672  for_all(subProcesses_, [id, &rp, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doStreamEndRun(id,rp, ts,cleaningUpAfterException); });
673  }
674  }
675 
676  void
678  unsigned int id, RunPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
682 
683  endStreamTransitionAsync<Traits>(std::move(iHolder),
684  *schedule_,
685  id,
686  rp,
687  ts,
688  esp_->eventSetupForInstance(ts),
690  cleaningUpAfterException);
691  }
692 
693  void
696  {
699  schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts));
700  for_all(subProcesses_, [id, &lbp, &ts](auto& subProcess){ subProcess.doStreamBeginLuminosityBlock(id,lbp, ts); });
701  }
702  }
703  void
705  unsigned int id, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {
707 
709 
711 
712  beginStreamTransitionAsync<Traits>(std::move(iHolder),
713  *schedule_,
714  id,
715  lbp,
716  ts,
717  esp_->eventSetupForInstance(ts),
718  subProcesses_);
719  }
720 
721 
722 
723  void
724  SubProcess::doStreamEndLuminosityBlock(unsigned int id, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
726  {
729  schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
730  for_all(subProcesses_, [id, &lbp, &ts, cleaningUpAfterException](auto& subProcess){ subProcess.doStreamEndLuminosityBlock(id,lbp, ts,cleaningUpAfterException); });
731  }
732  }
733 
734  void
736  unsigned int id, LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts, bool cleaningUpAfterException) {
738 
741  endStreamTransitionAsync<Traits>(std::move(iHolder),
742  *schedule_,
743  id,
744  lbp,
745  ts,
746  esp_->eventSetupForInstance(ts),
748  cleaningUpAfterException);
749  }
750 
751 
752  void
754  SelectedProducts const& keptVector = keptProducts()[type];
755  for(auto const& item : keptVector) {
756  BranchDescription const& desc = *item.first;
757  ProductResolverBase const* parentProductResolver = parentPrincipal.getProductResolver(desc.branchID());
758  if(parentProductResolver != nullptr) {
759  ProductResolverBase* productResolver = principal.getModifiableProductResolver(desc.branchID());
760  if(productResolver != nullptr) {
761  //Propagate the per event(run)(lumi) data for this product to the subprocess.
762  //First, the product itself.
763  productResolver->connectTo(*parentProductResolver, &parentPrincipal);
764  }
765  }
766  }
767  }
768 
770  branchIDListHelper_->updateFromParent(branchIDLists);
771  for_all(subProcesses_, [this](auto& subProcess){ subProcess.updateBranchIDListHelper(branchIDListHelper_->branchIDLists()); });
772  }
773 
774  // Call respondToOpenInputFile() on all Modules
775  void
778  schedule_->respondToOpenInputFile(fb);
779  for_all(subProcesses_, [&fb](auto& subProcess){ subProcess.respondToOpenInputFile(fb); });
780  }
781 
782  // free function
783  std::vector<ParameterSet>
785  std::vector<std::string> subProcesses = parameterSet.getUntrackedParameter<std::vector<std::string>>("@all_subprocesses");
786  if(!subProcesses.empty()) {
787  return parameterSet.popVParameterSet("subProcesses");
788  }
789  return {};
790  }
791 }
unsigned int historyRunOffset_
Definition: SubProcess.h:277
unsigned int historyLumiOffset_
Definition: SubProcess.h:276
ParameterSetID selector_config_id_
Definition: SubProcess.h:297
std::shared_ptr< ActivityRegistry > actReg_
Definition: ScheduleItems.h:68
void insert(std::shared_ptr< RunPrincipal > rp)
type
Definition: HCALResponse.h:21
ProductRegistry const & productRegistry() const
Definition: Principal.h:151
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
Definition: Principal.h:158
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal)
Definition: SubProcess.cc:334
T getUntrackedParameter(std::string const &, T const &) const
bool selected(BranchDescription const &desc) const
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &)
void setLuminosityBlockPrincipal(std::shared_ptr< LuminosityBlockPrincipal > const &lbp)
EventSelectionIDVector const & eventSelectionIDs() const
void respondToOpenInputFile(FileBlock const &fb)
Definition: SubProcess.cc:776
void doStreamEndLuminosityBlock(unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:724
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: RunPrincipal.h:65
void doBeginRun(RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:411
void deleteLumi(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi)
void writeLumi(ProcessHistoryID const &parentPhID, int runNumber, int lumiNumber)
Definition: SubProcess.cc:603
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:646
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const * > const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: ScheduleItems.h:73
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: SubProcess.h:258
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: ScheduleItems.h:62
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
Definition: SubProcess.h:278
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: SubProcess.h:268
std::vector< SubProcess > subProcesses_
Definition: SubProcess.h:284
Definition: Hash.h:43
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
void beginRun(RunPrincipal const &r, IOVSyncValue const &ts)
Definition: SubProcess.cc:417
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:677
PathsAndConsumesOfModules pathsAndConsumesOfModules_
Definition: SubProcess.h:273
virtual ~SubProcess()
Definition: SubProcess.cc:205
LuminosityBlockPrincipal & lumiPrincipal(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
bool exists(std::string const &parameterName) const
checks if a parameter exists
void updateBranchIDListHelper(BranchIDLists const &)
Definition: SubProcess.cc:769
LuminosityBlockAuxiliary const & aux() const
SelectedProductsForBranchType const & keptProducts() const
Definition: SubProcess.h:70
std::string const & processName() const
LuminosityBlockIndex index() const
void setAtEndTransition(bool iAtEnd)
Definition: Principal.cc:323
void doEndStream(unsigned int)
Definition: SubProcess.cc:629
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:735
void setParentProcessContext(ProcessContext const *parentProcessContext)
void doBeginLuminosityBlock(LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:523
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
Definition: SubProcess.h:254
BranchListIndexes const & branchListIndexes() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
Definition: SubProcess.h:282
ProcessHistory const & processHistory() const
Definition: Principal.h:141
BranchType
Definition: BranchType.h:11
std::vector< EventSelectionID > EventSelectionIDVector
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
ProductList const & productList() const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
Definition: SubProcess.h:264
void deleteRunFromCache(ProcessHistoryID const &parentPhID, int runNumber)
Definition: SubProcess.cc:514
def principal(options)
std::vector< BranchListIndex > BranchListIndexes
void doneWaiting(std::exception_ptr iExcept)
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e)
Definition: SubProcess.cc:350
void doEndRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:485
std::string const & moduleLabel() const
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:546
unsigned int value_type
Definition: BranchID.h:16
std::string const & productInstanceName() const
ProcessHistoryID const & processHistoryID() const
Definition: Principal.h:145
std::shared_ptr< CommonParams > initMisc(ParameterSet &parameterSet)
std::shared_ptr< SubProcessParentageHelper > & subProcessParentageHelper()
Definition: ScheduleItems.h:64
SelectedProductsForBranchType keptProducts_
Definition: SubProcess.h:291
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
Definition: SubProcess.cc:253
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:439
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void setThrowAnException(bool v)
ProductSelectorRules productSelectorRules_
Definition: SubProcess.h:292
StreamID streamID() const
TypeID unwrappedTypeID() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
Definition: SubProcess.h:281
std::vector< BranchDescription const * > allBranchDescriptions() const
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
Definition: SubProcess.h:285
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:586
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
BranchID const & branchID() const
TypeWithDict const & unwrappedType() const
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:57
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
Definition: SubProcess.cc:295
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void beginLuminosityBlock(LuminosityBlockPrincipal const &lb, IOVSyncValue const &ts)
Definition: SubProcess.cc:529
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:704
std::unique_ptr< ExceptionToActionTable const > act_table_
Definition: SubProcess.h:270
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
Definition: SubProcess.cc:784
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
Definition: Principal.h:184
void doEndRun(RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:468
std::shared_ptr< ProcessConfiguration const > processConfiguration() const
Definition: ScheduleItems.h:65
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
Definition: SubProcess.h:302
ProductSelector productSelector_
Definition: SubProcess.h:293
unsigned int value() const
Definition: StreamID.h:46
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
Definition: SubProcess.h:298
void doStreamBeginLuminosityBlock(unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:694
ServiceToken initServices(std::vector< ParameterSet > &servicePSets, ParameterSet &processPSet, ServiceToken const &iToken, serviceregistry::ServiceLegacy iLegacy, bool associate)
void doStreamEndRun(unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:666
void connectToSubProcess(ActivityRegistry &iOther)
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
Definition: SubProcess.h:283
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< ActivityRegistry > actReg_
Definition: SubProcess.h:263
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
Definition: SubProcess.cc:44
void deleteLumiFromCache(ProcessHistoryID const &parentPhID, int runNumber, int lumiNumber)
Definition: SubProcess.cc:613
ServiceToken addCPRandTNS(ParameterSet const &parameterSet, ServiceToken const &token)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: SubProcess.h:267
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void endRun(RunPrincipal const &r, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:474
RunIndex index() const
Definition: RunPrincipal.h:53
bool anyProductProduced() const
std::vector< HistoryAppender > historyAppenders_
Definition: SubProcess.h:279
std::vector< BranchID::value_type > BranchIDList
Definition: BranchIDList.h:18
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
void endLuminosityBlock(LuminosityBlockPrincipal const &lb, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:575
HLT enums.
bool initialized() const
std::vector< std::string > const & getAllTriggerNames()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const * > const &branchDescriptions)
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
Definition: SubProcess.cc:753
void doBeginStream(unsigned int)
Definition: SubProcess.cc:622
void setConsumer(EDConsumerBase const *iConsumer)
std::shared_ptr< ProductRegistry const > parentPreg_
Definition: SubProcess.h:265
std::shared_ptr< SignallingProductRegistry const > preg() const
Definition: ScheduleItems.h:58
std::shared_ptr< LuminosityBlockPrincipal > const & lumiPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run, LuminosityBlockNumber_t lumi) const
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
Definition: Principal.cc:481
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
Definition: SubProcess.cc:319
ProcessContext processContext_
Definition: SubProcess.h:272
void writeRun(ProcessHistoryID const &parentPhID, int runNumber)
Definition: SubProcess.cc:504
BranchID const & originalBranchID() const
void call(std::function< void(void)>)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Definition: ScheduleItems.h:60
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Definition: SubProcess.h:260
EventAuxiliary const & aux() const
std::unique_ptr< Schedule > initSchedule(ParameterSet &parameterSet, bool hasSubprocesses, PreallocationConfiguration const &iAllocConfig, ProcessContext const *)
ParameterSet const & registerIt()
std::shared_ptr< ProductRegistry const > preg_
Definition: SubProcess.h:266
PrincipalCache principalCache_
Definition: SubProcess.h:280
ParameterSet const & parameterSet(Provenance const &provenance)
Definition: Provenance.cc:11
def move(src, dest)
Definition: eostools.py:510
void doStreamBeginRun(unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
Definition: SubProcess.cc:636
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
Definition: SubProcess.h:269
void doEndLuminosityBlock(LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
Definition: SubProcess.cc:569
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
Definition: SubProcess.h:271
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)