37 std::shared_ptr<ProductRegistry const> parentProductRegistry,
38 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
46 parentPreg_(parentProductRegistry),
48 branchIDListHelper_(),
50 processConfiguration_(),
51 historyLumiOffset_(preallocConfig.numberOfStreams()),
52 historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
53 processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
54 historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
60 processParameterSet_(),
61 productSelectorRules_(parameterSet,
"outputCommands",
"OutputModule"),
63 wantAllEvents_(
true) {
73 tns->getProcessName(),
76 std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
79 outputModulePathPositions,
80 parentProductRegistry->anyProductProduced());
97 if(topLevelParameterSet.
exists(maxEvents)) {
100 if(topLevelParameterSet.
exists(maxLumis)) {
141 preg_.reset(items.preg_.release());
155 ep->preModuleDelayedGetSignal_.connect(std::cref(items.actReg_->preModuleEventDelayedGetSignal_));
156 ep->postModuleDelayedGetSignal_.connect(std::cref(items.actReg_->postModuleEventDelayedGetSignal_));
159 if(subProcessParameterSet) {
161 topLevelParameterSet,
199 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
201 if(
subProcess_.get()) c.
call([
this](){ this->subProcess_->doEndJob();});
209 if(productSelector_.initialized())
return;
216 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
225 }
else if(productSelector_.selected(desc)) {
231 std::map<BranchID, BranchDescription const*>::const_iterator
iter = trueBranchIDToKeptBranchDesc.find(trueBranchID);
232 if(iter != trueBranchIDToKeptBranchDesc.end()) {
234 <<
"Two (or more) equivalent branches have been selected for output.\n"
236 <<
"#2: " <<
BranchKey(*iter->second) <<
"\n"
237 <<
"Please drop at least one of them.\n";
239 trueBranchIDToKeptBranchDesc.insert(std::make_pair(trueBranchID, &desc));
242 keptProducts_[desc.
branchType()].push_back(&desc);
250 std::map<BranchID, BranchDescription const*>::const_iterator
iter = trueBranchIDToKeptBranchDesc.find(branchID);
251 if(iter != trueBranchIDToKeptBranchDesc.end()) {
253 BranchID const& keptBranchID = iter->second->branchID();
254 if(keptBranchID != branchID) {
256 droppedBranchIDToKeptBranchID_.insert(std::make_pair(branchID.
id(), keptBranchID.
id()));
263 SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
266 for(
BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
268 std::map<BranchID::value_type, BranchID::value_type>::const_iterator
iter = droppedBranchIDToKeptBranchID.find(branchID);
269 if(iter != droppedBranchIDToKeptBranchID.end()) {
270 branchID = iter->second;
274 if(subProcess_.get()) subProcess_->fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
284 if(!wantAllEvents_) {
287 if(!selectors_.wantEvent(ep,
nullptr)) {
302 esids.push_back(selector_config_id_);
306 auto & processHistoryRegistry = processHistoryRegistries_[principal.
streamID().
value()];
307 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
309 branchIDListHelper_->fixBranchListIndexes(bli);
311 processHistoryRegistry,
317 propagateProducts(
InEvent, principal, ep);
319 schedule_->processOneEvent<Traits>(ep.
streamID().
value(),ep, esp_->eventSetup());
320 if(subProcess_.get()) subProcess_->doEvent(ep);
327 beginRun(principal,ts);
332 auto aux = std::make_shared<RunAuxiliary>(principal.
aux());
334 auto rpp = std::make_shared<RunPrincipal>(
aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.
index()]),principal.
index());
335 auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.
index()];
336 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
337 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
338 principalCache_.insert(rpp);
343 parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
346 propagateProducts(
InRun, principal, rp);
348 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts));
349 if(subProcess_.get()) subProcess_->doBeginRun(rp, ts);
355 endRun(principal,ts,cleaningUpAfterException);
361 propagateProducts(
InRun, principal, rp);
363 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
364 if(subProcess_.get()) subProcess_->doEndRun(rp, ts, cleaningUpAfterException);
370 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
371 assert(it != parentToChildPhID_.end());
372 schedule_->writeRun(principalCache_.runPrincipal(it->second, runNumber), &processContext_);
373 if(subProcess_.get()) subProcess_->writeRun(it->second, runNumber);
378 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
379 assert(it != parentToChildPhID_.end());
380 principalCache_.deleteRun(it->second, runNumber);
381 if(subProcess_.get()) subProcess_->deleteRunFromCache(it->second, runNumber);
387 beginLuminosityBlock(principal,ts);
392 auto aux = std::make_shared<LuminosityBlockAuxiliary>(principal.
aux());
394 auto lbpp = std::make_shared<LuminosityBlockPrincipal>(
aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.
index()]),principal.
index());
395 auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.
index()];
396 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
397 lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.
reader());
398 lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
399 principalCache_.insert(lbpp);
401 propagateProducts(
InLumi, principal, lbp);
403 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts));
404 if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, ts);
410 endLuminosityBlock(principal,ts,cleaningUpAfterException);
416 propagateProducts(
InLumi, principal, lbp);
418 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
419 if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, ts, cleaningUpAfterException);
425 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
426 assert(it != parentToChildPhID_.end());
427 schedule_->writeLumi(principalCache_.lumiPrincipal(it->second, runNumber, lumiNumber), &processContext_);
428 if(subProcess_.get()) subProcess_->writeLumi(it->second, runNumber, lumiNumber);
433 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
434 assert(it != parentToChildPhID_.end());
435 principalCache_.deleteLumi(it->second, runNumber, lumiNumber);
436 if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber);
440 SubProcess::doBeginStream(
unsigned int iID) {
442 schedule_->beginStream(iID);
443 if(subProcess_.get()) subProcess_->doBeginStream(iID);
447 SubProcess::doEndStream(
unsigned int iID) {
449 schedule_->endStream(iID);
450 if(subProcess_.get()) subProcess_->doEndStream(iID);
459 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts));
460 if(subProcess_.get()) subProcess_->doStreamBeginRun(
id,rp, ts);
470 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
471 if(subProcess_.get()) subProcess_->doStreamEndRun(
id,rp, ts,cleaningUpAfterException);
481 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts));
482 if(subProcess_.get()) subProcess_->doStreamBeginLuminosityBlock(
id,lbp, ts);
492 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
493 if(subProcess_.get()) subProcess_->doStreamEndLuminosityBlock(
id,lbp, ts,cleaningUpAfterException);
501 for(
auto const& item : keptVector) {
503 if(parentProductHolder !=
nullptr) {
506 if(productHolder !=
nullptr) {
529 void SubProcess::updateBranchIDListHelper(
BranchIDLists const& branchIDLists) {
530 branchIDListHelper_->updateFromParent(branchIDLists);
531 if(subProcess_.get()) {
532 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
540 schedule_->respondToOpenInputFile(fb);
541 if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
545 std::auto_ptr<ParameterSet>
547 std::vector<std::string> subProcesses = parameterSet.
getUntrackedParameter<std::vector<std::string> >(
"@all_subprocesses");
548 if(!subProcesses.empty()) {
549 assert(subProcesses.size() == 1U);
550 assert(subProcesses[0] ==
"@sub_process");
553 return std::auto_ptr<ParameterSet>(
nullptr);
unsigned int historyRunOffset_
ParameterSetID selector_config_id_
void insert(std::shared_ptr< RunPrincipal > rp)
bool productProvenanceValid() const
ProductRegistry const & productRegistry() const
T getUntrackedParameter(std::string const &, T const &) const
void setLuminosityBlockPrincipal(std::shared_ptr< LuminosityBlockPrincipal > const &lbp)
EventSelectionIDVector const & eventSelectionIDs() const
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
ProcessHistoryID const & reducedProcessHistoryID() const
ConstProductHolderPtr getProductHolder(BranchID const &oid) const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet ¶meterSet)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
bool exists(std::string const ¶meterName) const
checks if a parameter exists
void updateBranchIDListHelper(BranchIDLists const &)
ProductProvenance * productProvenance() const
LuminosityBlockAuxiliary const & aux() const
LuminosityBlockIndex index() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void setParentProcessContext(ProcessContext const *parentProcessContext)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
BranchListIndexes const & branchListIndexes() const
void resetProductProvenance() const
ProcessHistory const & processHistory() const
std::vector< EventSelectionID > EventSelectionIDVector
void selectProducts(ProductRegistry const &preg)
ProductList const & productList() const
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
ProcessHistoryID const & processHistoryID() const
std::shared_ptr< ProductProvenanceRetriever > productProvenanceRetrieverPtr() const
ProcessHistory const & processHistory() const
static void setThrowAnException(bool v)
std::shared_ptr< ProductProvenanceRetriever > const & store() const
StreamID streamID() const
std::shared_ptr< WrapperBase > wrapper_
std::vector< BranchDescription const * > allBranchDescriptions() const
std::auto_ptr< ParameterSet > popParameterSet(std::string const &name)
BranchID const & branchID() const
RunAuxiliary const & aux() const
ProductData const & productData() const
SubProcess(ParameterSet ¶meterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=nullptr)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< ExceptionToActionTable const > act_table_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
unsigned int value() const
detail::TriggerResultsBasedEventSelector selectors_
boost::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &)
std::auto_ptr< Schedule > schedule_
void clearEventPrincipal()
void connectToSubProcess(ActivityRegistry &iOther)
bool anyProductProduced() const
void setProductID(ProductID const &pid)
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
void setStore(std::shared_ptr< ProductProvenanceRetriever > store) const
std::unique_ptr< ParameterSet > processParameterSet_
std::vector< std::string > const & getAllTriggerNames()
void setProcessHistory(ProcessHistory const &ph)
std::vector< BranchDescription const * > SelectedProducts
unsigned int numberOfStreams() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
void setProductProvenance(ProductProvenance const &prov) const
ProcessContext processContext_
BranchID const & originalBranchID() const
ProductID const & productID() const
void call(std::function< void(void)>)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
std::auto_ptr< SubProcess > subProcess_
EventAuxiliary const & aux() const
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector)
ParameterSet const & registerIt()
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
ParameterSet const & parameterSet(Provenance const &provenance)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool productUnavailable() const