|
|
Go to the documentation of this file.
38 #include "boost/range/adaptor/reversed.hpp"
48 std::shared_ptr<ProductRegistry const> parentProductRegistry,
49 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
60 parentPreg_(parentProductRegistry),
62 branchIDListHelper_(),
64 processConfiguration_(),
66 historyRunOffset_(historyLumiOffset_ + preallocConfig.numberOfLuminosityBlocks()),
67 processHistoryRegistries_(historyRunOffset_ + preallocConfig.numberOfRuns()),
68 historyAppenders_(historyRunOffset_ + preallocConfig.numberOfRuns()),
74 processParameterSet_(),
75 productSelectorRules_(
parameterSet,
"outputCommands",
"OutputModule"),
77 wantAllEvents_(
true) {
86 std::map<std::string, std::vector<std::pair<std::string, int>>> outputModulePathPositions;
88 selectevents,
"", outputModulePathPositions, parentProductRegistry->anyProductProduced());
90 std::map<BranchID, bool> keepAssociation;
91 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
113 if (topLevelParameterSet.
exists(maxLumis)) {
120 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
177 auto ep = std::make_shared<EventPrincipal>(
preg_,
187 auto lbpp = std::make_unique<LuminosityBlockPrincipal>(
195 for (
auto& subProcessPSet : subProcessVParameterSet) {
197 topLevelParameterSet,
240 "Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
243 c.call([&subProcess]() { subProcess.doEndJob(); });
252 std::map<BranchID, bool>& keepAssociation) {
261 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
262 std::vector<BranchDescription const*> associationDescriptions;
263 std::set<BranchID> keptProductsInEvent;
273 associationDescriptions.push_back(&desc);
275 keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
280 associationDescriptions, keptProductsInEvent, keepAssociation);
293 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
294 std::set<BranchID>& keptProductsInEvent) {
301 keptProductsInEvent.insert(desc.
branchID());
312 std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
317 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
320 branchID = iter->second;
331 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
347 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
358 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
361 bool deepCopyRetriever =
false;
362 ep.fillEventPrincipal(
374 make_waiting_task(tbb::task::allocate_root(), [&
ep, iHolder](std::exception_ptr
const* iPtr)
mutable {
375 ep.clearEventPrincipal();
384 afterProcessTask =
std::move(finalizeEventTask);
388 [
this, &
ep, finalizeEventTask, iEventSetupImpls](std::exception_ptr
const* iPtr)
mutable {
391 subProcess.doEventAsync(finalizeEventTask,
ep, iEventSetupImpls);
400 ep.streamID().value(),
402 *((*iEventSetupImpls)[
esp_->subProcessIndex()]),
409 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
412 auto aux = std::make_shared<RunAuxiliary>(principal.
aux());
414 auto rpp = std::make_shared<RunPrincipal>(
aux,
421 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
422 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
433 beginGlobalTransitionAsync<Traits>(
440 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls,
441 bool cleaningUpAfterException) {
445 endGlobalTransitionAsync<Traits>(
std::move(iHolder),
449 esp_->eventSetupImpl(),
453 cleaningUpAfterException);
461 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
463 auto const& childPhID = it->second;
466 tbb::task::allocate_root(),
467 [
this, childPhID,
runNumber,
task, mergeableRunProductMetadata](std::exception_ptr
const* iExcept)
mutable {
469 task.doneWaiting(*iExcept);
473 s.writeRunAsync(
task, childPhID,
runNumber, mergeableRunProductMetadata);
481 mergeableRunProductMetadata);
485 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
487 auto const& childPhID = it->second;
490 [&childPhID,
runNumber](
auto& subProcess) { subProcess.deleteRunFromCache(childPhID,
runNumber); });
497 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
500 auto aux = principal.
aux();
506 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
512 beginGlobalTransitionAsync<Traits>(
std::move(iHolder),
516 *((*iEventSetupImpls)[
esp_->subProcessIndex()]),
525 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls,
526 bool cleaningUpAfterException) {
530 endGlobalTransitionAsync<Traits>(
std::move(iHolder),
534 *((*iEventSetupImpls)[
esp_->subProcessIndex()]),
538 cleaningUpAfterException);
548 task.doneWaiting(*iExcept);
552 s.writeLumiAsync(
task, *
l);
563 s.deleteLumiFromCache(*lb);
565 lb->clearPrincipal();
584 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
589 beginStreamTransitionAsync<Traits>(
std::move(iHolder),
594 esp_->eventSetupImpl(),
604 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls,
605 bool cleaningUpAfterException) {
609 endStreamTransitionAsync<Traits>(
std::move(iHolder),
614 esp_->eventSetupImpl(),
618 cleaningUpAfterException);
626 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls) {
631 beginStreamTransitionAsync<Traits>(
std::move(iHolder),
636 *((*iEventSetupImpls)[
esp_->subProcessIndex()]),
647 std::vector<std::shared_ptr<const EventSetupImpl>>
const* iEventSetupImpls,
648 bool cleaningUpAfterException) {
651 endStreamTransitionAsync<Traits>(
std::move(iHolder),
656 *((*iEventSetupImpls)[
esp_->subProcessIndex()]),
660 cleaningUpAfterException);
665 for (
auto const&
item : keptVector) {
668 if (parentProductResolver !=
nullptr) {
670 if (productResolver !=
nullptr) {
673 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
682 [
this](
auto& subProcess) { subProcess.updateBranchIDListHelper(
branchIDListHelper_->branchIDLists()); });
694 std::vector<std::string> subProcesses =
696 if (!subProcesses.empty()) {
ParameterSet const & registerIt()
void respondToOpenInputFile(FileBlock const &fb)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
EventAuxiliary const & aux() const
std::string const & productInstanceName() const
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *, ParameterSet const *eventSetupPset=nullptr)
unsigned int historyRunOffset_
unsigned int historyLumiOffset_
ParameterSetID selector_config_id_
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet ¶meterSet)
std::vector< std::string > const & getAllTriggerNames()
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
SelectedProductsForBranchType const & keptProducts() const
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
TypeWithDict const & unwrappedType() const
PathsAndConsumesOfModules pathsAndConsumesOfModules_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const * > const &branchDescriptions)
void updateBranchIDListHelper(BranchIDLists const &)
std::vector< BranchListIndex > BranchListIndexes
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::vector< SubProcess > subProcesses_
void doEndStream(unsigned int)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
bool selected(BranchDescription const &desc) const
T getUntrackedParameter(std::string const &, T const &) const
unsigned int value() const
ProcessHistory const & processHistory() const
void doneWaiting(std::exception_ptr iExcept)
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *, bool cleaningUpAfterException)
RunAuxiliary const & aux() const
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
LuminosityBlockAuxiliary const & aux() const
BranchListIndexes const & branchListIndexes() const
unsigned int numberOfLuminosityBlocks() const
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
void deleteRunFromCache(ProcessHistoryID const &parentPhID, int runNumber)
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
SubProcess(ParameterSet ¶meterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
std::string const & processName() const
ProcessHistoryID const & processHistoryID() const
std::vector< BranchID::value_type > BranchIDList
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc)
DelayedReader * reader() const
BranchID const & originalBranchID() const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
TypeID unwrappedTypeID() const
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
ServiceToken serviceToken_
void connectToSubProcess(ActivityRegistry &iOther)
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *, bool cleaningUpAfterException)
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const * > const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
std::vector< BranchIDList > BranchIDLists
ProductRegistry const & productRegistry() const
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
SelectedProductsForBranchType keptProducts_
bool exists(std::string const ¶meterName) const
checks if a parameter exists
void deleteLumiFromCache(LuminosityBlockPrincipal &)
ProductSelectorRules productSelectorRules_
void setParentProcessContext(ProcessContext const *parentProcessContext)
bool wantEvent(EventForOutput const &e)
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
std::unique_ptr< ExceptionToActionTable const > act_table_
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &parentPhID, int runNumber, MergeableRunProductMetadata const *)
EventSelectionIDVector const & eventSelectionIDs() const
detail::TriggerResultsBasedEventSelector selectors_
ProcessHistoryID const & reducedProcessHistoryID() const
std::vector< ParameterSet > popVParameterSet(std::string const &name)
std::vector< BranchDescription const * > allBranchDescriptions() const
BranchType const & branchType() const
StreamID streamID() const
std::shared_ptr< ActivityRegistry > actReg_
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
ProductSelector productSelector_
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
ProductList const & productList() const
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
void doBeginStream(unsigned int)
void doEndRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *, bool cleaningUpAfterException)
std::shared_ptr< ProductRegistry const > parentPreg_
std::string const & moduleLabel() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
ParameterSet const & parameterSet(Provenance const &provenance, ProcessHistory const &history)
std::vector< HistoryAppender > historyAppenders_
unsigned int numberOfStreams() const
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *, bool cleaningUpAfterException)
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
LuminosityBlockIndex index() const
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
ProcessContext processContext_
bool anyProductProduced() const
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
std::vector< EventSelectionID > EventSelectionIDVector
BranchID const & branchID() const
void insert(std::shared_ptr< RunPrincipal > rp)
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, std::vector< std::shared_ptr< const EventSetupImpl >> const *)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_