43 #include "boost/range/adaptor/reversed.hpp" 53 std::shared_ptr<ProductRegistry const> parentProductRegistry,
54 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
66 parentPreg_(parentProductRegistry),
68 branchIDListHelper_(),
70 processConfiguration_(),
72 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
73 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
74 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
79 processParameterSet_(),
80 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
82 wantAllEvents_(
true) {
91 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
93 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
95 std::map<BranchID, bool> keepAssociation;
96 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
118 if (topLevelParameterSet.
exists(maxLumis)) {
125 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
180 auto ep = std::make_shared<EventPrincipal>(
preg_,
192 auto rpp = std::make_unique<RunPrincipal>(
198 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
215 for (
auto& subProcessPSet : subProcessVParameterSet) {
217 topLevelParameterSet,
242 std::vector<ModuleProcessName> consumedByChildren;
244 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
245 if (consumedByChildren.empty()) {
247 }
else if (not
c.empty()) {
248 std::vector<ModuleProcessName>
tmp;
249 tmp.reserve(consumedByChildren.size() +
c.size());
250 std::merge(consumedByChildren.begin(), consumedByChildren.end(),
c.begin(),
c.end(), std::back_inserter(
tmp));
258 not unusedModules.empty()) {
261 edm::LogInfo(
"DeleteModules").log([&unusedModules,
this](
auto&
l) {
262 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 264 "therefore they are deleted from SubProcess " 278 for (
auto const& dep :
281 consumedByChildren.end(),
283 consumedByChildren.emplace(it, dep.moduleLabel(), dep.processName());
286 return consumedByChildren;
312 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
315 c.call([&subProcess]() { subProcess.doEndJob(); });
324 std::map<BranchID, bool>& keepAssociation) {
333 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
334 std::vector<BranchDescription const*> associationDescriptions;
335 std::set<BranchID> keptProductsInEvent;
339 if (
desc.transient()) {
341 }
else if (!
desc.present() && !
desc.produced()) {
345 associationDescriptions.push_back(&
desc);
352 associationDescriptions, keptProductsInEvent, keepAssociation);
365 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
366 std::set<BranchID>& keptProductsInEvent) {
370 if (
desc.produced()) {
371 keptProductsInEvent.insert(
desc.originalBranchID());
373 keptProductsInEvent.insert(
desc.branchID());
384 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
389 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
392 branchID = iter->second;
403 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
419 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
430 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
433 bool deepCopyRetriever =
false;
434 ep.fillEventPrincipal(
452 subProcess.doEventAsync(nextTask,
ep, iEventSetupImpls);
454 }) |
chain::then([&
ep](std::exception_ptr
const* iPtr,
auto nextTask) {
455 ep.clearEventPrincipal();
457 nextTask.doneWaiting(*iPtr);
463 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
474 beginGlobalTransitionAsync<Traits>(
479 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
495 bool cleaningUpAfterException) {
509 chain::first([&](
const std::exception_ptr*,
auto nextTask) {
511 beginGlobalTransitionAsync<TraitsInput>(
std::move(nextTask),
516 cleaningUpAfterException);
518 chain::then([
this,
info = transitionInfo, cleaningUpAfterException](std::exception_ptr
const* iPtr,
519 auto nextTask)
mutable {
526 nextTask.doneWaiting(*iPtr);
529 endGlobalTransitionAsync<Traits>(
536 endGlobalTransitionAsync<Traits>(
545 auto aux = parentPrincipal.
aux();
551 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
552 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
557 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
558 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
565 bool cleaningUpAfterException) {
570 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
571 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
573 endGlobalTransitionAsync<Traits>(
579 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
586 s.writeProcessBlockAsync(nextTask, processBlockType);
597 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
603 s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
612 s.clearRunPrincipal(*rp);
614 rp->clearPrincipal();
621 s.clearProcessBlockPrincipal(processBlockType);
629 auto aux = parentPrincipal.
aux();
635 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
636 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
641 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
649 bool cleaningUpAfterException) {
654 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
657 endGlobalTransitionAsync<Traits>(
665 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
671 s.writeLumiAsync(nextTask, *
l);
680 s.clearLumiPrincipal(*lb);
682 lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
683 lb->clearPrincipal();
706 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
707 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
708 beginStreamTransitionAsync<Traits>(
715 bool cleaningUpAfterException) {
721 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
722 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
723 endStreamTransitionAsync<Traits>(
733 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
735 beginStreamTransitionAsync<Traits>(
742 bool cleaningUpAfterException) {
745 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
747 endStreamTransitionAsync<Traits>(
753 for (
auto const&
item : keptVector) {
756 if (parentProductResolver !=
nullptr) {
758 if (productResolver !=
nullptr) {
760 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
768 for (
auto const&
item : keptVector) {
772 if (parentProductResolver !=
nullptr) {
774 if (productResolver !=
nullptr) {
775 if (parentProductResolver->branchDescription().produced()) {
787 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
799 std::vector<std::string> subProcesses =
801 if (!subProcesses.empty()) {
unsigned int historyRunOffset_
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
unsigned int historyLumiOffset_
ParameterSetID selector_config_id_
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
bool wantEvent(EventForOutput const &e)
void respondToOpenInputFile(FileBlock const &fb)
SelectedProductsForBranchType const & keptProducts() const
std::vector< BranchIDList > BranchIDLists
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
void clearLumiPrincipal(LuminosityBlockPrincipal &)
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
void clearProcessBlockPrincipal(ProcessBlockType)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
ProductRegistry const & productRegistry() const
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
RunPrincipal const & runPrincipal() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
bool exists(std::string const &parameterName) const
checks if a parameter exists
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doEndStream(unsigned int)
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
constexpr auto then(O &&iO)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
std::vector< std::shared_ptr< RunPrincipal > > inUseRunPrincipals_
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
auto runLast(edm::WaitingTaskHolder iTask)
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
SelectedProductsForBranchType keptProducts_
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
int merge(int argc, char *argv[])
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
bool selected(BranchDescription const &desc) const
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
ProductSelector productSelector_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
unsigned int numberOfLuminosityBlocks() const
ProcessHistory const & processHistory() const
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
void connectToSubProcess(ActivityRegistry &iOther)
std::shared_ptr< ActivityRegistry > actReg_
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ProcessBlockPrincipal & principal()
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void clearRunPrincipal(RunPrincipal &)
void doBeginStream(unsigned int)
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
ProcessHistoryID const & processHistoryID() const
EventSelectionIDVector const & eventSelectionIDs() const
ProcessContext processContext_
RunAuxiliary const & aux() const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
DelayedReader * reader() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
ConstProductResolverPtr getProductResolver(BranchID const &oid) const