42 #include "boost/range/adaptor/reversed.hpp"
52 std::shared_ptr<ProductRegistry const> parentProductRegistry,
53 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
65 parentPreg_(parentProductRegistry),
67 branchIDListHelper_(),
69 processConfiguration_(),
71 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
72 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
73 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
79 processParameterSet_(),
80 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
82 wantAllEvents_(
true) {
91 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
93 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
95 std::map<BranchID, bool> keepAssociation;
96 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
118 if (topLevelParameterSet.
exists(maxLumis)) {
125 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
185 auto ep = std::make_shared<EventPrincipal>(
preg_,
196 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
212 for (
auto& subProcessPSet : subProcessVParameterSet) {
214 topLevelParameterSet,
239 std::vector<ModuleProcessName> consumedByChildren;
241 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
242 if (consumedByChildren.empty()) {
244 }
else if (not
c.empty()) {
245 std::vector<ModuleProcessName>
tmp;
246 tmp.reserve(consumedByChildren.size() +
c.size());
247 std::merge(consumedByChildren.begin(), consumedByChildren.end(),
c.begin(),
c.end(), std::back_inserter(
tmp));
255 not unusedModules.empty()) {
258 edm::LogInfo(
"DeleteModules").log([&unusedModules,
this](
auto&
l) {
259 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, "
261 "therefore they are deleted from SubProcess "
275 for (
auto const& dep :
278 consumedByChildren.end(),
280 consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
283 return consumedByChildren;
309 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
312 c.call([&subProcess]() { subProcess.doEndJob(); });
321 std::map<BranchID, bool>& keepAssociation) {
330 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
331 std::vector<BranchDescription const*> associationDescriptions;
332 std::set<BranchID> keptProductsInEvent;
336 if (
desc.transient()) {
338 }
else if (!
desc.present() && !
desc.produced()) {
342 associationDescriptions.push_back(&
desc);
349 associationDescriptions, keptProductsInEvent, keepAssociation);
362 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
363 std::set<BranchID>& keptProductsInEvent) {
367 if (
desc.produced()) {
368 keptProductsInEvent.insert(
desc.originalBranchID());
370 keptProductsInEvent.insert(
desc.branchID());
381 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
386 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
389 branchID = iter->second;
400 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
416 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
427 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
430 bool deepCopyRetriever =
false;
431 ep.fillEventPrincipal(
445 ep.clearEventPrincipal();
454 afterProcessTask =
std::move(finalizeEventTask);
458 make_waiting_task([
this, &
ep, finalizeEventTask, iEventSetupImpls](std::exception_ptr
const* iPtr)
mutable {
461 subProcess.doEventAsync(finalizeEventTask,
ep, iEventSetupImpls);
473 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
480 propagateProducts(
InProcess, parentPrincipal, processBlockPrincipal);
484 beginGlobalTransitionAsync<Traits>(
485 std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_, cleaningUpAfterException);
489 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
496 propagateProducts(
InProcess, parentPrincipal, processBlockPrincipal);
500 beginGlobalTransitionAsync<Traits>(
std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
505 bool cleaningUpAfterException) {
513 auto& taskGroup = *iHolder.
group();
514 auto runEndProcessBlock =
516 std::exception_ptr
const* iPtr)
mutable {
523 iWait.doneWaiting(*iPtr);
526 endGlobalTransitionAsync<Traits>(
532 auto runWriteProcessBlock =
535 iWait.doneWaiting(*iPtr);
548 beginGlobalTransitionAsync<TraitsInput>(
std::move(writeHolder),
553 cleaningUpAfterException);
557 endGlobalTransitionAsync<Traits>(
566 auto aux = std::make_shared<RunAuxiliary>(parentPrincipal.
aux());
568 auto rpp = std::make_shared<RunPrincipal>(
aux,
572 parentPrincipal.
index(),
575 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
576 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
594 bool cleaningUpAfterException) {
601 endGlobalTransitionAsync<Traits>(
610 task.doneWaiting(*iExcept);
614 s.writeProcessBlockAsync(
task, processBlockType);
629 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
631 auto const& childPhID = it->second;
634 [
this, childPhID,
runNumber,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
636 task.doneWaiting(*iExcept);
640 s.writeRunAsync(
task, childPhID,
runNumber, mergeableRunProductMetadata);
648 mergeableRunProductMetadata);
652 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
654 auto const& childPhID = it->second;
657 [&childPhID,
runNumber](
auto& subProcess) { subProcess.deleteRunFromCache(childPhID,
runNumber); });
664 s.clearProcessBlockPrincipal(processBlockType);
672 auto aux = parentPrincipal.
aux();
678 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
679 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
684 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
692 bool cleaningUpAfterException) {
697 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
700 endGlobalTransitionAsync<Traits>(
710 task.doneWaiting(*iExcept);
714 s.writeLumiAsync(
task, *
l);
725 s.deleteLumiFromCache(*lb);
727 lb->clearPrincipal();
748 beginStreamTransitionAsync<Traits>(
755 bool cleaningUpAfterException) {
760 endStreamTransitionAsync<Traits>(
770 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
772 beginStreamTransitionAsync<Traits>(
779 bool cleaningUpAfterException) {
782 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
784 endStreamTransitionAsync<Traits>(
790 for (
auto const&
item : keptVector) {
793 if (parentProductResolver !=
nullptr) {
795 if (productResolver !=
nullptr) {
797 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
805 for (
auto const&
item : keptVector) {
809 if (parentProductResolver !=
nullptr) {
811 if (productResolver !=
nullptr) {
812 if (parentProductResolver->branchDescription().produced()) {
824 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
836 std::vector<std::string> subProcesses =
838 if (!subProcesses.empty()) {