43 #include "boost/range/adaptor/reversed.hpp" 53 std::shared_ptr<ProductRegistry const> parentProductRegistry,
54 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
66 parentPreg_(parentProductRegistry),
68 branchIDListHelper_(),
70 processConfiguration_(),
72 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
73 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
74 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
80 processParameterSet_(),
81 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
83 wantAllEvents_(
true) {
92 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
94 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
96 std::map<BranchID, bool> keepAssociation;
97 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
119 if (topLevelParameterSet.
exists(maxLumis)) {
126 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
186 auto ep = std::make_shared<EventPrincipal>(
preg_,
197 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
213 for (
auto& subProcessPSet : subProcessVParameterSet) {
215 topLevelParameterSet,
240 std::vector<ModuleProcessName> consumedByChildren;
242 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
243 if (consumedByChildren.empty()) {
245 }
else if (not
c.empty()) {
246 std::vector<ModuleProcessName>
tmp;
247 tmp.reserve(consumedByChildren.size() +
c.size());
248 std::merge(consumedByChildren.begin(), consumedByChildren.end(),
c.begin(),
c.end(), std::back_inserter(
tmp));
256 not unusedModules.empty()) {
259 edm::LogInfo(
"DeleteModules").log([&unusedModules,
this](
auto&
l) {
260 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 262 "therefore they are deleted from SubProcess " 276 for (
auto const& dep :
279 consumedByChildren.end(),
281 consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
284 return consumedByChildren;
310 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
313 c.call([&subProcess]() { subProcess.doEndJob(); });
322 std::map<BranchID, bool>& keepAssociation) {
331 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
332 std::vector<BranchDescription const*> associationDescriptions;
333 std::set<BranchID> keptProductsInEvent;
337 if (
desc.transient()) {
339 }
else if (!
desc.present() && !
desc.produced()) {
343 associationDescriptions.push_back(&
desc);
350 associationDescriptions, keptProductsInEvent, keepAssociation);
363 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
364 std::set<BranchID>& keptProductsInEvent) {
368 if (
desc.produced()) {
369 keptProductsInEvent.insert(
desc.originalBranchID());
371 keptProductsInEvent.insert(
desc.branchID());
382 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
387 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
390 branchID = iter->second;
401 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
417 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
428 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
431 bool deepCopyRetriever =
false;
432 ep.fillEventPrincipal(
450 subProcess.doEventAsync(nextTask,
ep, iEventSetupImpls);
452 }) |
chain::then([&
ep](std::exception_ptr
const* iPtr,
auto nextTask) {
453 ep.clearEventPrincipal();
455 nextTask.doneWaiting(*iPtr);
461 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
472 beginGlobalTransitionAsync<Traits>(
477 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
493 bool cleaningUpAfterException) {
507 chain::first([&](
const std::exception_ptr*,
auto nextTask) {
509 beginGlobalTransitionAsync<TraitsInput>(
std::move(nextTask),
514 cleaningUpAfterException);
516 chain::then([
this,
info = transitionInfo, cleaningUpAfterException](std::exception_ptr
const* iPtr,
517 auto nextTask)
mutable {
524 nextTask.doneWaiting(*iPtr);
527 endGlobalTransitionAsync<Traits>(
534 endGlobalTransitionAsync<Traits>(
543 auto aux = std::make_shared<RunAuxiliary>(parentPrincipal.
aux());
545 auto rpp = std::make_shared<RunPrincipal>(
aux,
549 parentPrincipal.
index(),
552 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
553 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
571 bool cleaningUpAfterException) {
578 endGlobalTransitionAsync<Traits>(
584 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
591 s.writeProcessBlockAsync(nextTask, processBlockType);
601 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
603 auto const& childPhID = it->second;
606 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
612 mergeableRunProductMetadata);
615 [
this, childPhID,
runNumber, mergeableRunProductMetadata](
auto nextTask) {
618 s.writeRunAsync(nextTask, childPhID,
runNumber, mergeableRunProductMetadata);
625 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
627 auto const& childPhID = it->second;
630 [&childPhID,
runNumber](
auto& subProcess) { subProcess.deleteRunFromCache(childPhID,
runNumber); });
637 s.clearProcessBlockPrincipal(processBlockType);
645 auto aux = parentPrincipal.
aux();
651 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
652 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
657 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
665 bool cleaningUpAfterException) {
670 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
673 endGlobalTransitionAsync<Traits>(
681 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
687 s.writeLumiAsync(nextTask, *
l);
696 s.deleteLumiFromCache(*lb);
698 lb->clearPrincipal();
719 beginStreamTransitionAsync<Traits>(
726 bool cleaningUpAfterException) {
731 endStreamTransitionAsync<Traits>(
741 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
743 beginStreamTransitionAsync<Traits>(
750 bool cleaningUpAfterException) {
753 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
755 endStreamTransitionAsync<Traits>(
761 for (
auto const&
item : keptVector) {
764 if (parentProductResolver !=
nullptr) {
766 if (productResolver !=
nullptr) {
768 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
776 for (
auto const&
item : keptVector) {
780 if (parentProductResolver !=
nullptr) {
782 if (productResolver !=
nullptr) {
783 if (parentProductResolver->branchDescription().produced()) {
795 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
807 std::vector<std::string> subProcesses =
809 if (!subProcesses.empty()) {
unsigned int historyRunOffset_
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
unsigned int historyLumiOffset_
ParameterSetID selector_config_id_
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
bool wantEvent(EventForOutput const &e)
void respondToOpenInputFile(FileBlock const &fb)
SelectedProductsForBranchType const & keptProducts() const
std::vector< BranchIDList > BranchIDLists
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
void clearProcessBlockPrincipal(ProcessBlockType)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
ProductRegistry const & productRegistry() const
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
PathsAndConsumesOfModules pathsAndConsumesOfModules_
bool exists(std::string const &parameterName) const
checks if a parameter exists
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doEndStream(unsigned int)
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
constexpr auto then(O &&iO)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
void deleteRunFromCache(ProcessHistoryID const &parentPhID, int runNumber)
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
ProcessHistoryID const & reducedProcessHistoryID() const
auto runLast(edm::WaitingTaskHolder iTask)
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
SelectedProductsForBranchType keptProducts_
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
void deleteLumiFromCache(LuminosityBlockPrincipal &)
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
bool selected(BranchDescription const &desc) const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &parentPhID, int runNumber, MergeableRunProductMetadata const *)
ProductSelector productSelector_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
unsigned int numberOfLuminosityBlocks() const
ProcessHistory const & processHistory() const
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
void connectToSubProcess(ActivityRegistry &iOther)
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< ActivityRegistry > actReg_
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ProcessBlockPrincipal & principal()
std::vector< HistoryAppender > historyAppenders_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
std::vector< BranchID::value_type > BranchIDList
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void doBeginStream(unsigned int)
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
ProcessHistoryID const & processHistoryID() const
EventSelectionIDVector const & eventSelectionIDs() const
ProcessContext processContext_
RunAuxiliary const & aux() const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
DelayedReader * reader() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
def merge(dictlist, TELL=False)