41 #include "boost/range/adaptor/reversed.hpp"
50 std::shared_ptr<ProductRegistry const> parentProductRegistry,
51 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
62 parentPreg_(parentProductRegistry),
64 branchIDListHelper_(),
66 processConfiguration_(),
68 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
69 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
70 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
76 processParameterSet_(),
77 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
79 wantAllEvents_(
true) {
88 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
90 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
92 std::map<BranchID, bool> keepAssociation;
93 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
115 if (topLevelParameterSet.
exists(maxLumis)) {
122 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
179 auto ep = std::make_shared<EventPrincipal>(
preg_,
189 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
205 for (
auto& subProcessPSet : subProcessVParameterSet) {
207 topLevelParameterSet,
231 std::vector<ModuleProcessName> consumedByChildren;
233 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
234 if (consumedByChildren.empty()) {
236 }
else if (not
c.empty()) {
237 std::vector<ModuleProcessName>
tmp;
238 tmp.reserve(consumedByChildren.size() +
c.size());
239 std::merge(consumedByChildren.begin(), consumedByChildren.end(),
c.begin(),
c.end(), std::back_inserter(
tmp));
247 not unusedModules.empty()) {
250 edm::LogInfo(
"DeleteModules").log([&unusedModules,
this](
auto&
l) {
251 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
253 "therefore they are deleted from SubProcess "
267 for (
auto const& dep :
270 consumedByChildren.end(),
272 consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
275 return consumedByChildren;
301 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
304 c.call([&subProcess]() { subProcess.doEndJob(); });
313 std::map<BranchID, bool>& keepAssociation) {
322 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
323 std::vector<BranchDescription const*> associationDescriptions;
324 std::set<BranchID> keptProductsInEvent;
328 if (
desc.transient()) {
330 }
else if (!
desc.present() && !
desc.produced()) {
334 associationDescriptions.push_back(&
desc);
341 associationDescriptions, keptProductsInEvent, keepAssociation);
354 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
355 std::set<BranchID>& keptProductsInEvent) {
359 if (
desc.produced()) {
360 keptProductsInEvent.insert(
desc.originalBranchID());
362 keptProductsInEvent.insert(
desc.branchID());
373 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
378 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
381 branchID = iter->second;
392 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
408 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
419 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
422 bool deepCopyRetriever =
false;
423 ep.fillEventPrincipal(
436 ep.clearEventPrincipal();
445 afterProcessTask =
std::move(finalizeEventTask);
449 make_waiting_task([
this, &
ep, finalizeEventTask, iEventSetupImpls](std::exception_ptr
const* iPtr)
mutable {
452 subProcess.doEventAsync(finalizeEventTask,
ep, iEventSetupImpls);
464 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
471 propagateProducts(
InProcess, parentPrincipal, processBlockPrincipal);
475 beginGlobalTransitionAsync<Traits>(
std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
479 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
486 propagateProducts(
InProcess, parentPrincipal, processBlockPrincipal);
490 beginGlobalTransitionAsync<Traits>(
std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
495 bool cleaningUpAfterException) {
502 endGlobalTransitionAsync<Traits>(
510 auto aux = std::make_shared<RunAuxiliary>(parentPrincipal.
aux());
512 auto rpp = std::make_shared<RunPrincipal>(
aux,
516 parentPrincipal.
index(),
519 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
520 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
538 bool cleaningUpAfterException) {
545 endGlobalTransitionAsync<Traits>(
554 task.doneWaiting(*iExcept);
558 s.writeProcessBlockAsync(
task, processBlockType);
573 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
575 auto const& childPhID = it->second;
578 [
this, childPhID,
runNumber,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
580 task.doneWaiting(*iExcept);
584 s.writeRunAsync(
task, childPhID,
runNumber, mergeableRunProductMetadata);
592 mergeableRunProductMetadata);
596 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
598 auto const& childPhID = it->second;
601 [&childPhID,
runNumber](
auto& subProcess) { subProcess.deleteRunFromCache(childPhID,
runNumber); });
608 s.clearProcessBlockPrincipal(processBlockType);
616 auto aux = parentPrincipal.
aux();
622 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
623 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
628 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
636 bool cleaningUpAfterException) {
641 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
644 endGlobalTransitionAsync<Traits>(
654 task.doneWaiting(*iExcept);
658 s.writeLumiAsync(
task, *
l);
669 s.deleteLumiFromCache(*lb);
671 lb->clearPrincipal();
692 beginStreamTransitionAsync<Traits>(
699 bool cleaningUpAfterException) {
704 endStreamTransitionAsync<Traits>(
714 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
716 beginStreamTransitionAsync<Traits>(
723 bool cleaningUpAfterException) {
726 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
728 endStreamTransitionAsync<Traits>(
734 for (
auto const&
item : keptVector) {
737 if (parentProductResolver !=
nullptr) {
739 if (productResolver !=
nullptr) {
741 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
750 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
762 std::vector<std::string> subProcesses =
764 if (!subProcesses.empty()) {