39 std::shared_ptr<ProductRegistry const> parentProductRegistry,
40 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
49 parentPreg_(parentProductRegistry),
51 branchIDListHelper_(),
53 processConfiguration_(),
54 historyLumiOffset_(preallocConfig.numberOfStreams()),
55 historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
56 processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
57 historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
63 processParameterSet_(),
64 productSelectorRules_(parameterSet,
"outputCommands",
"OutputModule"),
66 wantAllEvents_(
true) {
76 tns->getProcessName(),
79 std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
82 outputModulePathPositions,
83 parentProductRegistry->anyProductProduced());
85 std::map<BranchID, bool> keepAssociation;
86 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
102 if(topLevelParameterSet.
exists(maxEvents)) {
105 if(topLevelParameterSet.
exists(maxLumis)) {
164 ep->preModuleDelayedGetSignal_.connect(std::cref(items.actReg_->preModuleEventDelayedGetSignal_));
165 ep->postModuleDelayedGetSignal_.connect(std::cref(items.actReg_->postModuleEventDelayedGetSignal_));
168 if(subProcessParameterSet) {
170 topLevelParameterSet,
173 *thinnedAssociationsHelper_,
211 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
213 if(
subProcess_.get()) c.
call([
this](){ this->subProcess_->doEndJob();});
222 std::map<BranchID, bool>& keepAssociation) {
223 if(productSelector_.initialized())
return;
230 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
231 std::vector<BranchDescription const*> associationDescriptions;
232 std::set<BranchID> keptProductsInEvent;
242 associationDescriptions.push_back(&desc);
243 }
else if(productSelector_.selected(desc)) {
244 keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
252 for(
auto association : associationDescriptions) {
253 if(keepAssociation[association->branchID()]) {
254 keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
259 ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
263 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
264 std::set<BranchID>& keptProductsInEvent) {
266 ProductSelector::checkForDuplicateKeptBranch(desc,
267 trueBranchIDToKeptBranchDesc);
273 keptProductsInEvent.insert(desc.
branchID());
277 keptProducts_[desc.
branchType()].push_back(&desc);
281 SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
284 for(
BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
286 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter = droppedBranchIDToKeptBranchID.find(branchID);
287 if(iter != droppedBranchIDToKeptBranchID.end()) {
288 branchID = iter->second;
292 if(subProcess_.get()) subProcess_->fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
299 if(!wantAllEvents_) {
302 if(!selectors_.wantEvent(ep,
nullptr)) {
317 esids.push_back(selector_config_id_);
321 auto & processHistoryRegistry = processHistoryRegistries_[principal.
streamID().
value()];
322 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
324 branchIDListHelper_->fixBranchListIndexes(bli);
326 processHistoryRegistry,
332 propagateProducts(
InEvent, principal, ep);
334 schedule_->processOneEvent<Traits>(ep.
streamID().
value(),ep, esp_->eventSetup());
335 if(subProcess_.get()) subProcess_->doEvent(ep);
342 beginRun(principal,ts);
347 auto aux = std::make_shared<RunAuxiliary>(principal.
aux());
349 auto rpp = std::make_shared<RunPrincipal>(
aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.
index()]),principal.
index());
350 auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.
index()];
351 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
352 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
353 principalCache_.insert(rpp);
358 parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
361 propagateProducts(
InRun, principal, rp);
363 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts));
364 if(subProcess_.get()) subProcess_->doBeginRun(rp, ts);
370 endRun(principal,ts,cleaningUpAfterException);
376 propagateProducts(
InRun, principal, rp);
378 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
379 if(subProcess_.get()) subProcess_->doEndRun(rp, ts, cleaningUpAfterException);
385 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
386 assert(it != parentToChildPhID_.end());
387 schedule_->writeRun(principalCache_.runPrincipal(it->second, runNumber), &processContext_);
388 if(subProcess_.get()) subProcess_->writeRun(it->second, runNumber);
393 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
394 assert(it != parentToChildPhID_.end());
395 principalCache_.deleteRun(it->second, runNumber);
396 if(subProcess_.get()) subProcess_->deleteRunFromCache(it->second, runNumber);
402 beginLuminosityBlock(principal,ts);
407 auto aux = std::make_shared<LuminosityBlockAuxiliary>(principal.
aux());
409 auto lbpp = std::make_shared<LuminosityBlockPrincipal>(
aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.
index()]),principal.
index());
410 auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.
index()];
411 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
412 lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.
reader());
413 lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
414 principalCache_.insert(lbpp);
416 propagateProducts(
InLumi, principal, lbp);
418 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts));
419 if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, ts);
425 endLuminosityBlock(principal,ts,cleaningUpAfterException);
431 propagateProducts(
InLumi, principal, lbp);
433 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
434 if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, ts, cleaningUpAfterException);
440 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
441 assert(it != parentToChildPhID_.end());
442 schedule_->writeLumi(principalCache_.lumiPrincipal(it->second, runNumber, lumiNumber), &processContext_);
443 if(subProcess_.get()) subProcess_->writeLumi(it->second, runNumber, lumiNumber);
448 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
449 assert(it != parentToChildPhID_.end());
450 principalCache_.deleteLumi(it->second, runNumber, lumiNumber);
451 if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber);
455 SubProcess::doBeginStream(
unsigned int iID) {
457 schedule_->beginStream(iID);
458 if(subProcess_.get()) subProcess_->doBeginStream(iID);
462 SubProcess::doEndStream(
unsigned int iID) {
464 schedule_->endStream(iID);
465 if(subProcess_.get()) subProcess_->doEndStream(iID);
474 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts));
475 if(subProcess_.get()) subProcess_->doStreamBeginRun(
id,rp, ts);
485 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
486 if(subProcess_.get()) subProcess_->doStreamEndRun(
id,rp, ts,cleaningUpAfterException);
496 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts));
497 if(subProcess_.get()) subProcess_->doStreamBeginLuminosityBlock(
id,lbp, ts);
507 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
508 if(subProcess_.get()) subProcess_->doStreamEndLuminosityBlock(
id,lbp, ts,cleaningUpAfterException);
516 for(
auto const& item : keptVector) {
518 if(parentProductHolder !=
nullptr) {
521 if(productHolder !=
nullptr) {
544 void SubProcess::updateBranchIDListHelper(
BranchIDLists const& branchIDLists) {
545 branchIDListHelper_->updateFromParent(branchIDLists);
546 if(subProcess_.get()) {
547 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
555 schedule_->respondToOpenInputFile(fb);
556 if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
560 std::auto_ptr<ParameterSet>
562 std::vector<std::string> subProcesses = parameterSet.
getUntrackedParameter<std::vector<std::string> >(
"@all_subprocesses");
563 if(!subProcesses.empty()) {
564 assert(subProcesses.size() == 1U);
565 assert(subProcesses[0] ==
"@sub_process");
568 return std::auto_ptr<ParameterSet>(
nullptr);
unsigned int historyRunOffset_
ParameterSetID selector_config_id_
void insert(std::shared_ptr< RunPrincipal > rp)
bool productProvenanceValid() const
ProductRegistry const & productRegistry() const
T getUntrackedParameter(std::string const &, T const &) const
void setLuminosityBlockPrincipal(std::shared_ptr< LuminosityBlockPrincipal > const &lbp)
EventSelectionIDVector const & eventSelectionIDs() const
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
ProcessHistoryID const & reducedProcessHistoryID() const
ConstProductHolderPtr getProductHolder(BranchID const &oid) const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
PathsAndConsumesOfModules pathsAndConsumesOfModules_
bool exists(std::string const &parameterName) const
checks if a parameter exists
void updateBranchIDListHelper(BranchIDLists const &)
ProductProvenance * productProvenance() const
LuminosityBlockAuxiliary const & aux() const
LuminosityBlockIndex index() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void setParentProcessContext(ProcessContext const *parentProcessContext)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
BranchListIndexes const & branchListIndexes() const
void resetProductProvenance() const
ProcessHistory const & processHistory() const
std::vector< EventSelectionID > EventSelectionIDVector
ProductList const & productList() const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
void initialize(Schedule const *, std::shared_ptr< ProductRegistry const >)
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
ProcessHistoryID const & processHistoryID() const
std::shared_ptr< ProductProvenanceRetriever > productProvenanceRetrieverPtr() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
ProcessHistory const & processHistory() const
static void setThrowAnException(bool v)
std::shared_ptr< ProductProvenanceRetriever > const & store() const
StreamID streamID() const
std::shared_ptr< WrapperBase > wrapper_
std::vector< BranchDescription const * > allBranchDescriptions() const
std::auto_ptr< ParameterSet > popParameterSet(std::string const &name)
BranchID const & branchID() const
TypeWithDict const & unwrappedType() const
RunAuxiliary const & aux() const
ProductData const & productData() const
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< ExceptionToActionTable const > act_table_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
unsigned int value() const
detail::TriggerResultsBasedEventSelector selectors_
boost::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &)
std::auto_ptr< Schedule > schedule_
void clearEventPrincipal()
void connectToSubProcess(ActivityRegistry &iOther)
std::shared_ptr< ActivityRegistry > actReg_
bool anyProductProduced() const
void setProductID(ProductID const &pid)
std::vector< HistoryAppender > historyAppenders_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
std::vector< BranchID::value_type > BranchIDList
void setStore(std::shared_ptr< ProductProvenanceRetriever > store) const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
std::unique_ptr< ParameterSet > processParameterSet_
std::vector< std::string > const & getAllTriggerNames()
void setProcessHistory(ProcessHistory const &ph)
std::vector< BranchDescription const * > SelectedProducts
unsigned int numberOfStreams() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
void setProductProvenance(ProductProvenance const &prov) const
ProcessContext processContext_
BranchID const & originalBranchID() const
ProductID const & productID() const
void call(std::function< void(void)>)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
std::auto_ptr< SubProcess > subProcess_
EventAuxiliary const & aux() const
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector)
ParameterSet const & registerIt()
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
ParameterSet const & parameterSet(Provenance const &provenance)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool productUnavailable() const