37 boost::shared_ptr<ProductRegistry const> parentProductRegistry,
38 boost::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
46 parentPreg_(parentProductRegistry),
48 branchIDListHelper_(),
50 processConfiguration_(),
51 historyLumiOffset_(preallocConfig.numberOfStreams()),
52 historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
53 processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
54 historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
60 processParameterSet_(),
61 productSelectorRules_(parameterSet,
"outputCommands",
"OutputModule"),
63 wantAllEvents_(
true) {
73 tns->getProcessName(),
76 std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
79 outputModulePathPositions,
80 parentProductRegistry->anyProductProduced());
98 if(topLevelParameterSet.
exists(maxEvents)) {
101 if(topLevelParameterSet.
exists(maxLumis)) {
109 ScheduleItems items(*parentProductRegistry, *parentBranchIDListHelper, *
this);
139 preg_.reset(items.preg_.release());
156 ep->preModuleDelayedGetSignal_.connect(std::cref(items.actReg_->preModuleEventDelayedGetSignal_));
157 ep->postModuleDelayedGetSignal_.connect(std::cref(items.actReg_->postModuleEventDelayedGetSignal_));
160 if(subProcessParameterSet) {
162 topLevelParameterSet,
200 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
202 if(
subProcess_.get()) c.
call([
this](){ this->subProcess_->doEndJob();});
210 if(productSelector_.initialized())
return;
217 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
226 }
else if(productSelector_.selected(desc)) {
232 std::map<BranchID, BranchDescription const*>::const_iterator iter = trueBranchIDToKeptBranchDesc.find(trueBranchID);
233 if(iter != trueBranchIDToKeptBranchDesc.end()) {
235 <<
"Two (or more) equivalent branches have been selected for output.\n"
237 <<
"#2: " <<
BranchKey(*iter->second) <<
"\n"
238 <<
"Please drop at least one of them.\n";
240 trueBranchIDToKeptBranchDesc.insert(std::make_pair(trueBranchID, &desc));
243 keptProducts_[desc.
branchType()].push_back(&desc);
251 std::map<BranchID, BranchDescription const*>::const_iterator iter = trueBranchIDToKeptBranchDesc.find(branchID);
252 if(iter != trueBranchIDToKeptBranchDesc.end()) {
254 BranchID const& keptBranchID = iter->second->branchID();
255 if(keptBranchID != branchID) {
257 droppedBranchIDToKeptBranchID_.insert(std::make_pair(branchID.
id(), keptBranchID.
id()));
264 SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
267 for(
BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
269 std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter = droppedBranchIDToKeptBranchID.find(branchID);
270 if(iter != droppedBranchIDToKeptBranchID.end()) {
271 branchID = iter->second;
275 if(subProcess_.get()) subProcess_->fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
285 if(!wantAllEvents_) {
288 if(!selectors_.wantEvent(ep,
nullptr)) {
303 esids.push_back(selector_config_id_);
307 auto & processHistoryRegistry = processHistoryRegistries_[principal.
streamID().
value()];
308 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
311 processHistoryRegistry,
317 propagateProducts(
InEvent, principal, ep);
319 schedule_->processOneEvent<Traits>(ep.
streamID().
value(),ep, esp_->eventSetup());
320 if(subProcess_.get()) subProcess_->doEvent(ep);
327 beginRun(principal,ts);
334 boost::shared_ptr<RunPrincipal> rpp(
new RunPrincipal(aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.
index()]),principal.
index()));
335 auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.
index()];
336 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
337 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
338 principalCache_.insert(rpp);
343 parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
346 propagateProducts(
InRun, principal, rp);
348 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts));
349 if(subProcess_.get()) subProcess_->doBeginRun(rp, ts);
355 endRun(principal,ts,cleaningUpAfterException);
361 propagateProducts(
InRun, principal, rp);
363 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
364 if(subProcess_.get()) subProcess_->doEndRun(rp, ts, cleaningUpAfterException);
370 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
371 assert(it != parentToChildPhID_.end());
372 schedule_->writeRun(principalCache_.runPrincipal(it->second, runNumber), &processContext_);
373 if(subProcess_.get()) subProcess_->writeRun(it->second, runNumber);
378 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
379 assert(it != parentToChildPhID_.end());
380 principalCache_.deleteRun(it->second, runNumber);
381 if(subProcess_.get()) subProcess_->deleteRunFromCache(it->second, runNumber);
387 beginLuminosityBlock(principal,ts);
394 boost::shared_ptr<LuminosityBlockPrincipal> lbpp(
new LuminosityBlockPrincipal(aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.
index()]),principal.
index()));
395 auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.
index()];
396 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
397 lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.
reader());
398 lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
399 principalCache_.insert(lbpp);
401 propagateProducts(
InLumi, principal, lbp);
403 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts));
404 if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, ts);
410 endLuminosityBlock(principal,ts,cleaningUpAfterException);
416 propagateProducts(
InLumi, principal, lbp);
418 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
419 if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, ts, cleaningUpAfterException);
425 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
426 assert(it != parentToChildPhID_.end());
427 schedule_->writeLumi(principalCache_.lumiPrincipal(it->second, runNumber, lumiNumber), &processContext_);
428 if(subProcess_.get()) subProcess_->writeLumi(it->second, runNumber, lumiNumber);
433 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
434 assert(it != parentToChildPhID_.end());
435 principalCache_.deleteLumi(it->second, runNumber, lumiNumber);
436 if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber);
440 SubProcess::doBeginStream(
unsigned int iID) {
442 schedule_->beginStream(iID);
443 if(subProcess_.get()) subProcess_->doBeginStream(iID);
447 SubProcess::doEndStream(
unsigned int iID) {
449 schedule_->endStream(iID);
450 if(subProcess_.get()) subProcess_->doEndStream(iID);
459 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts));
460 if(subProcess_.get()) subProcess_->doStreamBeginRun(
id,rp, ts);
470 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
471 if(subProcess_.get()) subProcess_->doStreamEndRun(
id,rp, ts,cleaningUpAfterException);
481 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts));
482 if(subProcess_.get()) subProcess_->doStreamBeginLuminosityBlock(
id,lbp, ts);
492 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
493 if(subProcess_.get()) subProcess_->doStreamEndLuminosityBlock(
id,lbp, ts,cleaningUpAfterException);
501 for(
auto const& item : keptVector) {
503 if(parentProductHolder !=
nullptr) {
506 if(productHolder !=
nullptr) {
534 schedule_->respondToOpenInputFile(fb);
535 if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
539 std::auto_ptr<ParameterSet>
541 std::vector<std::string> subProcesses = parameterSet.
getUntrackedParameter<std::vector<std::string> >(
"@all_subprocesses");
542 if(!subProcesses.empty()) {
543 assert(subProcesses.size() == 1U);
544 assert(subProcesses[0] ==
"@sub_process");
547 return std::auto_ptr<ParameterSet>(
nullptr);
unsigned int historyRunOffset_
ParameterSetID selector_config_id_
bool productProvenanceValid() const
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
ProductRegistry const & productRegistry() const
T getUntrackedParameter(std::string const &, T const &) const
EventSelectionIDVector const & eventSelectionIDs() const
BranchType const & branchType() const
ProcessHistoryID const & reducedProcessHistoryID() const
ConstProductHolderPtr getProductHolder(BranchID const &oid) const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet ¶meterSet)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
BranchIDLists const & branchIDLists() const
bool exists(std::string const ¶meterName) const
checks if a parameter exists
void call(boost::function< void(void)>)
ProductProvenance * productProvenance() const
LuminosityBlockAuxiliary const & aux() const
LuminosityBlockIndex index() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void insert(boost::shared_ptr< RunPrincipal > rp)
void setParentProcessContext(ProcessContext const *parentProcessContext)
void setStore(boost::shared_ptr< ProductProvenanceRetriever > store) const
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
BranchListIndexes const & branchListIndexes() const
void resetProductProvenance() const
ProcessHistory const & processHistory() const
std::vector< EventSelectionID > EventSelectionIDVector
void selectProducts(ProductRegistry const &preg)
ProductList const & productList() const
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
ProcessHistoryID const & processHistoryID() const
ProcessHistory const & processHistory() const
static void setThrowAnException(bool v)
StreamID streamID() const
boost::shared_ptr< void const > wrapper_
std::vector< BranchDescription const * > allBranchDescriptions() const
std::auto_ptr< ParameterSet > popParameterSet(std::string const &name)
SubProcess(ParameterSet ¶meterSet, ParameterSet const &topLevelParameterSet, boost::shared_ptr< ProductRegistry const > parentProductRegistry, boost::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
BranchID const & branchID() const
RunAuxiliary const & aux() const
ProductData const & productData() const
boost::shared_ptr< ProductProvenanceRetriever > const & store() const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=nullptr)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< ExceptionToActionTable const > act_table_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
unsigned int value() const
detail::TriggerResultsBasedEventSelector selectors_
boost::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &)
boost::shared_ptr< ProductProvenanceRetriever > productProvenanceRetrieverPtr() const
std::auto_ptr< Schedule > schedule_
void clearEventPrincipal()
void connectToSubProcess(ActivityRegistry &iOther)
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool anyProductProduced() const
void setProductID(ProductID const &pid)
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
std::unique_ptr< ParameterSet > processParameterSet_
std::vector< std::string > const & getAllTriggerNames()
void setProcessHistory(ProcessHistory const &ph)
std::vector< BranchDescription const * > SelectedProducts
unsigned int numberOfStreams() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
void setProductProvenance(ProductProvenance const &prov) const
ProcessContext processContext_
BranchID const & originalBranchID() const
ProductID const & productID() const
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
boost::shared_ptr< ProductRegistry const > preg_
EventAuxiliary const & aux() const
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector)
ParameterSet const & registerIt()
PrincipalCache principalCache_
ParameterSet const & parameterSet(Provenance const &provenance)
bool productUnavailable() const