37 #include "boost/range/adaptor/reversed.hpp" 47 std::shared_ptr<ProductRegistry const> parentProductRegistry,
48 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
59 parentPreg_(parentProductRegistry),
61 branchIDListHelper_(),
63 processConfiguration_(),
64 historyLumiOffset_(preallocConfig.numberOfStreams()),
65 historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
66 processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
67 historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
73 processParameterSet_(),
74 productSelectorRules_(parameterSet,
"outputCommands",
"OutputModule"),
76 wantAllEvents_(
true) {
86 tns->getProcessName(),
90 std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
93 outputModulePathPositions,
94 parentProductRegistry->anyProductProduced());
96 std::map<BranchID, bool> keepAssociation;
97 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
114 if(topLevelParameterSet.
exists(maxEvents)) {
117 if(topLevelParameterSet.
exists(maxLumis)) {
123 bool hasSubProcesses = subProcessVParameterSet.size() != 0ull;
164 preg_ = items.preg();
181 auto ep = std::make_shared<EventPrincipal>(
preg_,
198 for(
auto& subProcessPSet : subProcessVParameterSet) {
200 topLevelParameterSet,
203 *thinnedAssociationsHelper_,
251 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
254 c.
call([&subProcess](){ subProcess.doEndJob();});
264 std::map<BranchID, bool>& keepAssociation) {
272 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
273 std::vector<BranchDescription const*> associationDescriptions;
274 std::set<BranchID> keptProductsInEvent;
284 associationDescriptions.push_back(&desc);
286 keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
305 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
306 std::set<BranchID>& keptProductsInEvent) {
309 trueBranchIDToKeptBranchDesc);
315 keptProductsInEvent.insert(desc.
branchID());
333 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter = droppedBranchIDToKeptBranchID.find(branchID);
334 if(iter != droppedBranchIDToKeptBranchID.end()) {
335 branchID = iter->second;
339 for_all(
subProcesses_, [&droppedBranchIDToKeptBranchID](
auto& subProcess){ subProcess.fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID); });
371 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
374 bool deepCopyRetriever =
false;
376 processHistoryRegistry,
386 [&ep,iHolder](std::exception_ptr
const* iPtr)
mutable 399 afterProcessTask =
std::move(finalizeEventTask);
403 [
this,&ep,finalizeEventTask] (std::exception_ptr
const* iPtr)
mutable{
406 subProcess.doEventAsync(finalizeEventTask,ep);
423 auto aux = std::make_shared<RunAuxiliary>(principal.
aux());
427 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
428 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
439 beginGlobalTransitionAsync<Traits>(
std::move(iHolder),
453 bool cleaningUpAfterException) {
457 endGlobalTransitionAsync<Traits>(
std::move(iHolder),
464 cleaningUpAfterException);
470 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
472 auto const& childPhID = it->second;
474 auto subTasks =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,childPhID,runNumber, task](std::exception_ptr
const* iExcept)
mutable {
480 s.writeRunAsync(task, childPhID, runNumber);
489 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it =
parentToChildPhID_.find(parentPhID);
491 auto const& childPhID = it->second;
493 for_all(
subProcesses_, [&childPhID, runNumber](
auto& subProcess){ subProcess.deleteRunFromCache(childPhID, runNumber); });
500 auto aux = principal.
aux();
506 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
507 lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.
reader());
512 beginGlobalTransitionAsync<Traits>(
std::move(iHolder),
527 endGlobalTransitionAsync<Traits>(
std::move(iHolder),
534 cleaningUpAfterException);
543 auto subTasks =
edm::make_waiting_task(tbb::task::allocate_root(), [
this,
l, task](std::exception_ptr
const* iExcept)
mutable {
549 s.writeLumiAsync(task, *
l);
561 s.deleteLumiFromCache(*lb);
563 lb->clearPrincipal();
587 beginStreamTransitionAsync<Traits>(
std::move(iHolder),
605 endStreamTransitionAsync<Traits>(
std::move(iHolder),
613 cleaningUpAfterException);
623 beginStreamTransitionAsync<Traits>(
std::move(iHolder),
640 endStreamTransitionAsync<Traits>(
std::move(iHolder),
648 cleaningUpAfterException);
655 for(
auto const& item : keptVector) {
658 if(parentProductResolver !=
nullptr) {
660 if(productResolver !=
nullptr) {
663 productResolver->
connectTo(*parentProductResolver, &parentPrincipal);
683 std::vector<ParameterSet>
685 std::vector<std::string> subProcesses = parameterSet.
getUntrackedParameter<std::vector<std::string>>(
"@all_subprocesses");
686 if(!subProcesses.empty()) {
unsigned int historyRunOffset_
unsigned int historyLumiOffset_
ParameterSetID selector_config_id_
void insert(std::shared_ptr< RunPrincipal > rp)
void setLuminosityBlockPrincipal(LuminosityBlockPrincipal *lbp)
ProductRegistry const & productRegistry() const
ProductResolverBase * getModifiableProductResolver(BranchID const &oid)
void doEventAsync(WaitingTaskHolder iHolder, EventPrincipal const &principal)
T getUntrackedParameter(std::string const &, T const &) const
bool selected(BranchDescription const &desc) const
bool wantEvent(EventForOutput const &e)
EventSelectionIDVector const & eventSelectionIDs() const
void respondToOpenInputFile(FileBlock const &fb)
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
ProcessHistoryID const & reducedProcessHistoryID() const
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts)
std::unique_ptr< ParameterSet > popParameterSet(std::string const &name)
static void fillDroppedToKept(ProductRegistry const &preg, std::map< BranchID, BranchDescription const * > const &trueBranchIDToKeptBranchDesc, std::map< BranchID::value_type, BranchID::value_type > &droppedBranchIDToKeptBranchID_)
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
std::vector< SubProcess > subProcesses_
void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const &, bool iPrefetchMayGet)
void doStreamEndRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
PathsAndConsumesOfModules pathsAndConsumesOfModules_
bool exists(std::string const &parameterName) const
checks if a parameter exists
void updateBranchIDListHelper(BranchIDLists const &)
LuminosityBlockAuxiliary const & aux() const
SelectedProductsForBranchType const & keptProducts() const
std::string const & processName() const
LuminosityBlockIndex index() const
void doEndStream(unsigned int)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void doStreamEndLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
std::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &, ActivityRegistry *)
void setParentProcessContext(ProcessContext const *parentProcessContext)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
BranchListIndexes const & branchListIndexes() const
edm::propagate_const< std::unique_ptr< Schedule > > schedule_
ProcessHistory const & processHistory() const
std::vector< EventSelectionID > EventSelectionIDVector
void validateTopLevelParameterSets(ParameterSet *processParameterSet)
EDGetTokenT< ProductType > consumes(edm::InputTag const &tag)
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
ProductList const & productList() const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
virtual void connectTo(ProductResolverBase const &, Principal const *)=0
ServiceToken serviceToken_
void deleteRunFromCache(ProcessHistoryID const &parentPhID, int runNumber)
void writeRunAsync(WaitingTaskHolder, ProcessHistoryID const &parentPhID, int runNumber)
std::vector< BranchListIndex > BranchListIndexes
void doneWaiting(std::exception_ptr iExcept)
std::vector< std::shared_ptr< LuminosityBlockPrincipal > > inUseLumiPrincipals_
void processAsync(WaitingTaskHolder iHolder, EventPrincipal const &e)
void doEndRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
std::string const & moduleLabel() const
void doBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
std::string const & productInstanceName() const
ProcessHistoryID const & processHistoryID() const
SelectedProductsForBranchType keptProducts_
LuminosityBlockPrincipal const & luminosityBlockPrincipal() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
void doBeginRunAsync(WaitingTaskHolder iHolder, RunPrincipal const &principal, IOVSyncValue const &ts)
ConsumesCollector consumesCollector()
Use a ConsumesCollector to gather consumes information from helper functions.
void deleteLumiFromCache(LuminosityBlockPrincipal &)
ProductSelectorRules productSelectorRules_
StreamID streamID() const
TypeID unwrappedTypeID() const
edm::propagate_const< std::shared_ptr< eventsetup::EventSetupProvider > > esp_
std::vector< BranchDescription const * > allBranchDescriptions() const
edm::propagate_const< std::unique_ptr< ParameterSet > > processParameterSet_
void doEndLuminosityBlockAsync(WaitingTaskHolder iHolder, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts, bool cleaningUpAfterException)
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector, ConsumesCollector &&iC)
BranchID const & branchID() const
TypeWithDict const & unwrappedType() const
RunAuxiliary const & aux() const
void keepThisBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc, std::set< BranchID > &keptProductsInEvent)
unsigned int numberOfLuminosityBlocks() const
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
EventPrincipal & eventPrincipal(unsigned int iStreamIndex) const
void doStreamBeginLuminosityBlockAsync(WaitingTaskHolder iHolder, unsigned int iID, LuminosityBlockPrincipal const &principal, IOVSyncValue const &ts)
std::unique_ptr< ExceptionToActionTable const > act_table_
std::vector< ParameterSet > popSubProcessVParameterSet(ParameterSet &parameterSet)
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
ProductSelector productSelector_
unsigned int value() const
void checkForModuleDependencyCorrectness(edm::PathsAndConsumesOfModulesBase const &iPnC, bool iPrintDependencies)
detail::TriggerResultsBasedEventSelector selectors_
void clearEventPrincipal()
void connectToSubProcess(ActivityRegistry &iOther)
std::map< ProcessHistoryID, ProcessHistoryID > parentToChildPhID_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
void deleteRun(ProcessHistoryID const &phid, RunNumber_t run)
std::shared_ptr< ActivityRegistry > actReg_
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, SubProcessParentageHelper const &parentSubProcessParentageHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
std::shared_ptr< RunPrincipal > const & runPrincipalPtr(ProcessHistoryID const &phid, RunNumber_t run) const
bool anyProductProduced() const
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
std::vector< ParameterSet > popVParameterSet(std::string const &name)
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
std::vector< std::string > const & getAllTriggerNames()
void initialize(ProductSelectorRules const &rules, std::vector< BranchDescription const * > const &branchDescriptions)
void propagateProducts(BranchType type, Principal const &parentPrincipal, Principal &principal) const
void doBeginStream(unsigned int)
unsigned int numberOfStreams() const
void setConsumer(EDConsumerBase const *iConsumer)
std::shared_ptr< ProductRegistry const > parentPreg_
ConstProductResolverPtr getProductResolver(BranchID const &oid) const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
ProcessContext processContext_
std::shared_ptr< LuminosityBlockPrincipal > getAvailableLumiPrincipalPtr()
BranchID const & originalBranchID() const
void call(std::function< void(void)>)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
EventAuxiliary const & aux() const
ParameterSet const & registerIt()
void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal &)
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
ParameterSet const & parameterSet(Provenance const &provenance)
edm::propagate_const< std::shared_ptr< SubProcessParentageHelper > > subProcessParentageHelper_
static void checkForDuplicateKeptBranch(BranchDescription const &desc, std::map< BranchID, BranchDescription const * > &trueBranchIDToKeptBranchDesc)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
RunPrincipal & runPrincipal(ProcessHistoryID const &phid, RunNumber_t run) const
def operate(timelog, memlog, json_f, num)