45 #include "boost/range/adaptor/reversed.hpp" 55 std::shared_ptr<ProductRegistry const> parentProductRegistry,
56 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
69 parentPreg_(parentProductRegistry),
71 branchIDListHelper_(),
73 processConfiguration_(),
75 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
76 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
77 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
82 processParameterSet_(),
83 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
85 wantAllEvents_(
true) {
94 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
96 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
98 std::map<BranchID, bool> keepAssociation;
99 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
121 if (topLevelParameterSet.
exists(maxLumis)) {
128 bool hasSubProcesses = !subProcessVParameterSet.empty();
187 auto ep = std::make_shared<EventPrincipal>(
preg_,
199 auto rpp = std::make_unique<RunPrincipal>(
205 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
222 for (
auto& subProcessPSet : subProcessVParameterSet) {
224 topLevelParameterSet,
250 std::vector<ModuleProcessName> consumedByChildren;
252 auto c = subProcess.keepOnlyConsumedUnscheduledModules(deleteModules);
253 if (consumedByChildren.empty()) {
255 }
else if (not
c.empty()) {
256 std::vector<ModuleProcessName>
tmp;
257 tmp.reserve(consumedByChildren.size() +
c.size());
258 std::merge(consumedByChildren.begin(), consumedByChildren.end(),
c.begin(),
c.end(), std::back_inserter(
tmp));
266 not unusedModules.empty()) {
269 edm::LogInfo(
"DeleteModules").log([&unusedModules,
this](
auto&
l) {
270 l <<
"Following modules are not in any Path or EndPath, nor is their output consumed by any other module, " 272 "therefore they are deleted from SubProcess " 286 for (
auto const& dep :
289 consumedByChildren.end(),
291 consumedByChildren.emplace(
it, dep.moduleLabel(), dep.processName());
294 return consumedByChildren;
313 std::exception_ptr firstException;
318 firstException = std::current_exception();
321 CMS_SA_ALLOW try { subProcess.doBeginJob(); }
catch (...) {
322 if (!firstException) {
323 firstException = std::current_exception();
327 if (firstException) {
328 std::rethrow_exception(firstException);
340 subProcess.doEndJob(collector);
346 std::map<BranchID, bool>& keepAssociation) {
355 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
356 std::vector<BranchDescription const*> associationDescriptions;
357 std::set<BranchID> keptProductsInEvent;
361 if (
desc.transient()) {
363 }
else if (!
desc.present() && !
desc.produced()) {
367 associationDescriptions.push_back(&
desc);
374 associationDescriptions, keptProductsInEvent, keepAssociation);
387 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
388 std::set<BranchID>& keptProductsInEvent) {
392 if (
desc.produced()) {
393 keptProductsInEvent.insert(
desc.originalBranchID());
395 keptProductsInEvent.insert(
desc.branchID());
406 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
411 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
414 branchID = iter->second;
425 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
441 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
452 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
455 bool deepCopyRetriever =
false;
456 ep.fillEventPrincipal(
474 subProcess.doEventAsync(nextTask,
ep, iEventSetupImpls);
476 }) |
chain::then([&
ep](std::exception_ptr
const* iPtr,
auto nextTask) {
477 ep.clearEventPrincipal();
479 nextTask.doneWaiting(*iPtr);
485 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>>(
496 beginGlobalTransitionAsync<Traits>(
501 void SubProcess::doBeginProcessBlockAsync<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>>(
517 bool cleaningUpAfterException) {
531 chain::first([&](
const std::exception_ptr*,
auto nextTask) {
533 beginGlobalTransitionAsync<TraitsInput>(
std::move(nextTask),
538 cleaningUpAfterException);
540 chain::then([
this,
info = transitionInfo, cleaningUpAfterException](std::exception_ptr
const* iPtr,
541 auto nextTask)
mutable {
548 nextTask.doneWaiting(*iPtr);
551 endGlobalTransitionAsync<Traits>(
558 endGlobalTransitionAsync<Traits>(
567 auto aux = parentPrincipal.
aux();
573 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
574 rpp->fillRunPrincipal(processHistoryRegistry, parentPrincipal.
reader());
579 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
580 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
587 bool cleaningUpAfterException) {
592 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
593 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
595 endGlobalTransitionAsync<Traits>(
601 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
608 s.writeProcessBlockAsync(nextTask, processBlockType);
619 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
625 s.writeRunAsync(nextTask, *rp, mergeableRunProductMetadata);
634 s.clearRunPrincipal(*rp);
636 rp->clearPrincipal();
643 s.clearProcessBlockPrincipal(processBlockType);
651 auto aux = parentPrincipal.
aux();
657 processHistoryRegistry.registerProcessHistory(parentPrincipal.
processHistory());
658 lbpp->fillLuminosityBlockPrincipal(&parentPrincipal.
processHistory(), parentPrincipal.
reader());
663 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
671 bool cleaningUpAfterException) {
676 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
679 endGlobalTransitionAsync<Traits>(
687 chain::first([&](std::exception_ptr
const*,
auto nextTask) {
693 s.writeLumiAsync(nextTask, *
l);
702 s.clearLumiPrincipal(*lb);
704 lb->setRunPrincipal(std::shared_ptr<RunPrincipal>());
705 lb->clearPrincipal();
710 std::exception_ptr exceptionPtr;
712 exceptionPtr = std::current_exception();
716 CMS_SA_ALLOW try { subProcess.doBeginStream(streamID); }
catch (...) {
718 exceptionPtr = std::current_exception();
723 std::rethrow_exception(exceptionPtr);
731 schedule_->endStream(streamID, collector, collectorMutex);
733 subProcess.doEndStream(streamID, collector, collectorMutex);
745 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
746 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
747 beginStreamTransitionAsync<Traits>(
754 bool cleaningUpAfterException) {
760 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
761 RunTransitionInfo transitionInfo(rp, *((*eventSetupImpls)[
esp_->subProcessIndex()]), eventSetupImpls);
762 endStreamTransitionAsync<Traits>(
772 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
774 beginStreamTransitionAsync<Traits>(
781 bool cleaningUpAfterException) {
784 std::vector<std::shared_ptr<const EventSetupImpl>>
const* eventSetupImpls = iTransitionInfo.
eventSetupImpls();
786 endStreamTransitionAsync<Traits>(
792 for (
auto const&
item : keptVector) {
795 if (parentProductResolver !=
nullptr) {
797 if (productResolver !=
nullptr) {
799 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
807 for (
auto const&
item : keptVector) {
811 if (parentProductResolver !=
nullptr) {
813 if (productResolver !=
nullptr) {
814 if (parentProductResolver->branchDescription().produced()) {
826 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
838 std::vector<std::string> subProcesses =
840 if (!subProcesses.empty()) {
unsigned int historyRunOffset_
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &, bool cleaningUpAfterException)
unsigned int historyLumiOffset_
ParameterSetID selector_config_id_
std::vector< BranchDescription const * > allBranchDescriptions() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
void addException(cms::Exception const &exception)
bool wantEvent(EventForOutput const &e)
void endJob(ExceptionCollector &)
void respondToOpenInputFile(FileBlock const &fb)
SelectedProductsForBranchType const & keptProducts() const
std::vector< BranchIDList > BranchIDLists
std::vector< ModuleDescription const * > const & allModules() const
EventAuxiliary const & aux() const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
std::vector< std::string > const & getAllTriggerNames()
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
ProductList const & productList() const
LuminosityBlockPrincipal & principal()
void clearLumiPrincipal(LuminosityBlockPrincipal &)
bool parentProducedProductIsKept(Principal const &parentPrincipal, Principal &principal) const
int merge(int argc, char *argv[])
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
void clearProcessBlockPrincipal(ProcessBlockType)
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
ProductRegistry const & productRegistry() const
std::vector< SubProcess > subProcesses_
edm::propagate_const< std::shared_ptr< SubProcessBlockHelper > > processBlockHelper_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
void writeRunAsync(WaitingTaskHolder, RunPrincipal const &, MergeableRunProductMetadata const *)
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
RunPrincipal const & runPrincipal() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
bool exists(std::string const &parameterName) const
checks if a parameter exists
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &, bool cleaningUpAfterException)
ProcessBlockPrincipal & inputProcessBlockPrincipal() const
std::vector< ModuleProcessName > const & modulesInPreviousProcessesWhoseProductsAreConsumedBy(unsigned int moduleID) const
void updateBranchIDListHelper(BranchIDLists const &)
void doBeginStream(unsigned int streamID)
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
std::string_view moduleLabel() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::shared_ptr< RunPrincipal > getAvailableRunPrincipalPtr()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const *> const &branchDescriptions)
void setParentProcessContext(ProcessContext const *parentProcessContext)
StreamID streamID() const
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ProcessBlockHelperBase const &parentProcessBlockHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext, ModuleTypeResolverMaker const *typeResolverMaker)
ParameterSet const & parameterSet(StableProvenance const &provenance, ProcessHistory const &history)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
constexpr auto then(O &&iO)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
void doEndProcessBlockAsync(WaitingTaskHolder iHolder, ProcessBlockTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void swap(Association< C > &lhs, Association< C > &rhs)
std::vector< EventSelectionID > EventSelectionIDVector
std::tuple< layerClusterToCaloParticle, caloParticleToLayerCluster > association
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EventToProcessBlockIndexes const & eventToProcessBlockIndexes() const
std::vector< std::shared_ptr< RunPrincipal > > inUseRunPrincipals_
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
void fillProcessBlockPrincipal(std::string const &processName, DelayedReader *reader=nullptr)
bool anyProductProduced() const
ProcessBlockPrincipal & processBlockPrincipal() const
T getUntrackedParameter(std::string const &, T const &) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
ParameterSet const & registerIt()
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo)
auto runLast(edm::WaitingTaskHolder iTask)
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc)
SelectedProductsForBranchType keptProducts_
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
void connectToSubProcess(ActivityRegistry &iOther)
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const *> const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr, unsigned int maxConcurrentIOVs=0, bool dumpOptions=false)
ProductSelectorRules productSelectorRules_
unsigned int numberOfStreams() const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
std::vector< ModuleDescription const * > nonConsumedUnscheduledModules(edm::PathsAndConsumesOfModulesBase const &iPnC, std::vector< ModuleProcessName > &consumedByChildren)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
constexpr auto ifThen(bool iValue, O &&iO)
Only runs this task if the condition (which is known at the call time) is true. If false...
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
bool selected(BranchDescription const &desc) const
void doEndJob(ExceptionCollector &)
unsigned int numberOfRuns() const
void insert(std::unique_ptr< ProcessBlockPrincipal >)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::vector< std::shared_ptr< const EventSetupImpl > > const * eventSetupImpls() const
void writeProcessBlockAsync(edm::WaitingTaskHolder task, ProcessBlockType)
Log< level::Info, false > LogInfo
std::unique_ptr< ExceptionToActionTable const > act_table_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
ProductSelector productSelector_
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
std::string const & processName() const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LumiTransitionInfo const &)
unsigned int numberOfLuminosityBlocks() const
ProcessHistory const & processHistory() const
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const &)
std::shared_ptr< ActivityRegistry > actReg_
void insertForInput(std::unique_ptr< ProcessBlockPrincipal >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void doEndRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void doBeginRunAsync(WaitingTaskHolder iHolder, RunTransitionInfo const &iTransitionInfo)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ProcessBlockPrincipal & principal()
void doEndStream(unsigned int streamID, ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void selectAssociationProducts(std::vector< BranchDescription const *> const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void clearRunPrincipal(RunPrincipal &)
void removeModules(std::vector< ModuleDescription const *> const &modules)
std::shared_ptr< ProductRegistry const > parentPreg_
LuminosityBlockIndex index() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
ProcessHistoryID const & processHistoryID() const
EventSelectionIDVector const & eventSelectionIDs() const
ProcessContext processContext_
RunAuxiliary const & aux() const
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
LuminosityBlockAuxiliary const & aux() const
unsigned int value() const
std::vector< ModuleProcessName > keepOnlyConsumedUnscheduledModules(bool deleteModules)
auto wrap(F iFunc) -> decltype(iFunc())
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
BranchListIndexes const & branchListIndexes() const
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const *> &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
RunPrincipal & principal()
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LumiTransitionInfo const &iTransitionInfo, bool cleaningUpAfterException)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
DelayedReader * reader() const
std::shared_ptr< ProcessConfiguration const > processConfiguration_
ConstProductResolverPtr getProductResolver(BranchID const &oid) const