43 #include "boost/range/adaptor/reversed.hpp" 53 std::shared_ptr<ProductRegistry const> parentProductRegistry,
54 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
67 parentPreg_(parentProductRegistry),
69 branchIDListHelper_(),
71 processConfiguration_(),
73 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
74 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
75 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
80 processParameterSet_(),
81 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
83 wantAllEvents_(
true) {
92 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
94 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
96 std::map<BranchID, bool> keepAssociation;
97 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
119 if (topLevelParameterSet.
exists(maxLumis)) {
126 bool hasSubProcesses = !subProcessVParameterSet.empty();
185 auto ep = std::make_shared<EventPrincipal>(
preg_,
197 auto rpp = std::make_unique<RunPrincipal>(
203 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
220 for (
auto& subProcessPSet : subProcessVParameterSet) {
222 topLevelParameterSet,
248 std::vector<ModuleProcessName> consumedByChildren;
250 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
251 if (consumedByChildren.empty()) {
253 }
else if (not
c.empty()) {
254 std::vector<ModuleProcessName>
tmp;
255 tmp.reserve(consumedByChildren.size() +
c.size());
256 std::merge(consumedByChildren.begin(), consumedByChildren.end(),
c.begin(),
c.end(), std::back_inserter(
tmp));
264 not unusedModules.empty()) {
267 edm::LogInfo(
"DeleteModules").log([&unusedModules,
this](
auto&
l) {
268 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 270 "therefore they are deleted from SubProcess " 284 for (
auto const& dep :
287 consumedByChildren.end(),
289 consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
292 return consumedByChildren;
318 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
321 c.call([&subProcess]() { subProcess.doEndJob(); });
330 std::map<BranchID, bool>& keepAssociation) {
339 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
340 std::vector<BranchDescription const*> associationDescriptions;
341 std::set<BranchID> keptProductsInEvent;
345 if (
desc.transient()) {
347 }
else if (!
desc.present() && !
desc.produced()) {
351 associationDescriptions.push_back(&
desc);
358 associationDescriptions, keptProductsInEvent, keepAssociation);
371 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
372 std::set<BranchID>& keptProductsInEvent) {
376 if (
desc.produced()) {
377 keptProductsInEvent.insert(
desc.originalBranchID());
379 keptProductsInEvent.insert(
desc.branchID());
390 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
395 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
398 branchID = iter->second;
409 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
425 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
436 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
439 bool deepCopyRetriever =
false;
440 ep.fillEventPrincipal(
458 subProcess.doEventAsync(nextTask,
ep, iEventSetupImpls);
460 }) |
chain::then([&
ep](std::exception_ptr
const* iPtr,
auto nextTask) {
461 ep.clearEventPrincipal();
463 nextTask.doneWaiting(*iPtr);
469 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
480 beginGlobalTransitionAsync<Traits>(
485 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
501 bool cleaningUpAfterException) {
515 chain::first([&](
const std::exception_ptr*,
auto nextTask) {
517 beginGlobalTransitionAsync<TraitsInput>(
std::move(nextTask),
522 cleaningUpAfterException);
524 chain::then([
this,
info = transitionInfo, cleaningUpAfterException](std::exception_ptr
const* iPtr,
525 auto nextTask)
mutable {
532 nextTask.doneWaiting(*iPtr);
535 endGlobalTransitionAsync<Traits>(
542 endGlobalTransitionAsync<Traits>(
551 auto aux = parentPrincipal.
aux();
557 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
558 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
563 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
564 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
571 bool cleaningUpAfterException) {
576 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
577 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
579 endGlobalTransitionAsync<Traits>(
585 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
592 s.writeProcessBlockAsync(nextTask, processBlockType);
603 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
609 s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
618 s.clearRunPrincipal(*rp);
620 rp->clearPrincipal();
627 s.clearProcessBlockPrincipal(processBlockType);
635 auto aux = parentPrincipal.
aux();
641 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
642 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
647 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
655 bool cleaningUpAfterException) {
660 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
663 endGlobalTransitionAsync<Traits>(
671 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
677 s.writeLumiAsync(nextTask, *
l);
686 s.clearLumiPrincipal(*lb);
688 lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
689 lb->clearPrincipal();
712 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
713 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
714 beginStreamTransitionAsync<Traits>(
721 bool cleaningUpAfterException) {
727 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
728 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
729 endStreamTransitionAsync<Traits>(
739 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
741 beginStreamTransitionAsync<Traits>(
748 bool cleaningUpAfterException) {
751 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
753 endStreamTransitionAsync<Traits>(
759 for (
auto const&
item : keptVector) {
762 if (parentProductResolver !=
nullptr) {
764 if (productResolver !=
nullptr) {
766 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
774 for (
auto const&
item : keptVector) {
778 if (parentProductResolver !=
nullptr) {
780 if (productResolver !=
nullptr) {
781 if (parentProductResolver->branchDescription().produced()) {
793 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
805 std::vector<std::string> subProcesses =
807 if (!subProcesses.empty()) {
unsigned int historyRunOffset_
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
unsigned int historyLumiOffset_
ParameterSetID selector_config_id_
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
bool wantEvent(EventForOutput const &e)
void respondToOpenInputFile(FileBlock const &fb)
SelectedProductsForBranchType const & keptProducts() const
std::vector< BranchIDList > BranchIDLists
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
void clearLumiPrincipal(LuminosityBlockPrincipal &)
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
void clearProcessBlockPrincipal(ProcessBlockType)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
ProductRegistry const & productRegistry() const
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
RunPrincipal const & runPrincipal() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
bool exists(std::string const ¶meterName) const
checks if a parameter exists
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doEndStream(unsigned int)
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
SubProcess(ParameterSet ¶meterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext, ModuleTypeResolverMaker const *typeResolverMaker)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
constexpr auto then(O &&iO)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
std::vector< std::shared_ptr< RunPrincipal > > inUseRunPrincipals_
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
auto runLast(edm::WaitingTaskHolder iTask)
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
SelectedProductsForBranchType keptProducts_
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
int merge(int argc, char *argv[])
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
bool selected(BranchDescription const &desc) const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
ProductSelector productSelector_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
unsigned int numberOfLuminosityBlocks() const
ProcessHistory const & processHistory() const
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
void connectToSubProcess(ActivityRegistry &iOther)
std::shared_ptr< ActivityRegistry > actReg_
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ProcessBlockPrincipal & principal()
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void clearRunPrincipal(RunPrincipal &)
void doBeginStream(unsigned int)
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
ProcessHistoryID const & processHistoryID() const
EventSelectionIDVector const & eventSelectionIDs() const
ProcessContext processContext_
RunAuxiliary const & aux() const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
DelayedReader * reader() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
ConstProductResolverPtr getProductResolver(BranchID const &oid) const