41 #include "boost/range/adaptor/reversed.hpp"
50 std::shared_ptr<ProductRegistry const> parentProductRegistry,
51 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
62 parentPreg_(parentProductRegistry),
64 branchIDListHelper_(),
66 processConfiguration_(),
68 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
69 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
70 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
76 processParameterSet_(),
77 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
79 wantAllEvents_(
true) {
88 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
90 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
92 std::map<BranchID, bool> keepAssociation;
93 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
115 if (topLevelParameterSet.
exists(maxLumis)) {
122 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
179 auto ep = std::make_shared<EventPrincipal>(
preg_,
189 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
205 for (
auto& subProcessPSet : subProcessVParameterSet) {
207 topLevelParameterSet,
250 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
253 c.call([&subProcess]() { subProcess.doEndJob(); });
262 std::map<BranchID, bool>& keepAssociation) {
271 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
272 std::vector<BranchDescription const*> associationDescriptions;
273 std::set<BranchID> keptProductsInEvent;
277 if (
desc.transient()) {
279 }
else if (!
desc.present() && !
desc.produced()) {
283 associationDescriptions.push_back(&
desc);
290 associationDescriptions, keptProductsInEvent, keepAssociation);
303 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
304 std::set<BranchID>& keptProductsInEvent) {
308 if (
desc.produced()) {
309 keptProductsInEvent.insert(
desc.originalBranchID());
311 keptProductsInEvent.insert(
desc.branchID());
322 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
327 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
330 branchID = iter->second;
341 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
357 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
368 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
371 bool deepCopyRetriever =
false;
372 ep.fillEventPrincipal(
384 make_waiting_task(tbb::task::allocate_root(), [&
ep, iHolder](std::exception_ptr
const* iPtr)
mutable {
385 ep.clearEventPrincipal();
394 afterProcessTask =
std::move(finalizeEventTask);
398 [
this, &
ep, finalizeEventTask, iEventSetupImpls](std::exception_ptr
const* iPtr)
mutable {
401 subProcess.doEventAsync(finalizeEventTask,
ep, iEventSetupImpls);
413 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
420 propagateProducts(
InProcess, parentPrincipal, processBlockPrincipal);
424 beginGlobalTransitionAsync<Traits>(
std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
428 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
435 propagateProducts(
InProcess, parentPrincipal, processBlockPrincipal);
439 beginGlobalTransitionAsync<Traits>(
std::move(iHolder), *schedule_, transitionInfo, serviceToken_, subProcesses_);
444 bool cleaningUpAfterException) {
451 endGlobalTransitionAsync<Traits>(
459 auto aux = std::make_shared<RunAuxiliary>(parentPrincipal.
aux());
461 auto rpp = std::make_shared<RunPrincipal>(
aux,
465 parentPrincipal.
index(),
468 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
469 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
487 bool cleaningUpAfterException) {
494 endGlobalTransitionAsync<Traits>(
502 [
this,
task, processBlockType](std::exception_ptr
const* iExcept)
mutable {
504 task.doneWaiting(*iExcept);
508 s.writeProcessBlockAsync(
task, processBlockType);
523 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
525 auto const& childPhID = it->second;
528 tbb::task::allocate_root(),
529 [
this, childPhID,
runNumber,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
531 task.doneWaiting(*iExcept);
535 s.writeRunAsync(
task, childPhID,
runNumber, mergeableRunProductMetadata);
543 mergeableRunProductMetadata);
547 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
549 auto const& childPhID = it->second;
552 [&childPhID,
runNumber](
auto& subProcess) { subProcess.deleteRunFromCache(childPhID,
runNumber); });
559 s.clearProcessBlockPrincipal(processBlockType);
567 auto aux = parentPrincipal.
aux();
573 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
574 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
579 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
587 bool cleaningUpAfterException) {
592 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
595 endGlobalTransitionAsync<Traits>(
606 task.doneWaiting(*iExcept);
610 s.writeLumiAsync(
task, *
l);
621 s.deleteLumiFromCache(*lb);
623 lb->clearPrincipal();
644 beginStreamTransitionAsync<Traits>(
651 bool cleaningUpAfterException) {
656 endStreamTransitionAsync<Traits>(
666 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
668 beginStreamTransitionAsync<Traits>(
675 bool cleaningUpAfterException) {
678 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
680 endStreamTransitionAsync<Traits>(
686 for (
auto const&
item : keptVector) {
689 if (parentProductResolver !=
nullptr) {
691 if (productResolver !=
nullptr) {
693 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
702 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
714 std::vector<std::string> subProcesses =
716 if (!subProcesses.empty()) {