37 boost::shared_ptr<ProductRegistry const> parentProductRegistry,
38 boost::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
46 parentPreg_(parentProductRegistry),
48 branchIDListHelper_(),
50 processConfiguration_(),
51 historyLumiOffset_(preallocConfig.numberOfStreams()),
52 historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
53 processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
54 historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
60 processParameterSet_(),
61 productSelectorRules_(parameterSet,
"outputCommands",
"OutputModule"),
63 wantAllEvents_(
true) {
73 tns->getProcessName(),
76 std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
79 outputModulePathPositions,
80 parentProductRegistry->anyProductProduced());
97 if(topLevelParameterSet.
exists(maxEvents)) {
100 if(topLevelParameterSet.
exists(maxLumis)) {
141 preg_.reset(items.preg_.release());
159 ep->preModuleDelayedGetSignal_.connect(std::cref(items.actReg_->preModuleEventDelayedGetSignal_));
160 ep->postModuleDelayedGetSignal_.connect(std::cref(items.actReg_->postModuleEventDelayedGetSignal_));
163 if(subProcessParameterSet) {
165 topLevelParameterSet,
203 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
205 if(
subProcess_.get()) c.
call([
this](){ this->subProcess_->doEndJob();});
213 if(productSelector_.initialized())
return;
220 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
229 }
else if(productSelector_.selected(desc)) {
235 std::map<BranchID, BranchDescription const*>::const_iterator
iter = trueBranchIDToKeptBranchDesc.find(trueBranchID);
236 if(iter != trueBranchIDToKeptBranchDesc.end()) {
238 <<
"Two (or more) equivalent branches have been selected for output.\n"
240 <<
"#2: " <<
BranchKey(*iter->second) <<
"\n"
241 <<
"Please drop at least one of them.\n";
243 trueBranchIDToKeptBranchDesc.insert(std::make_pair(trueBranchID, &desc));
246 keptProducts_[desc.
branchType()].push_back(&desc);
254 std::map<BranchID, BranchDescription const*>::const_iterator
iter = trueBranchIDToKeptBranchDesc.find(branchID);
255 if(iter != trueBranchIDToKeptBranchDesc.end()) {
257 BranchID const& keptBranchID = iter->second->branchID();
258 if(keptBranchID != branchID) {
260 droppedBranchIDToKeptBranchID_.insert(std::make_pair(branchID.
id(), keptBranchID.
id()));
267 SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
270 for(
BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
272 std::map<BranchID::value_type, BranchID::value_type>::const_iterator
iter = droppedBranchIDToKeptBranchID.find(branchID);
273 if(iter != droppedBranchIDToKeptBranchID.end()) {
274 branchID = iter->second;
278 if(subProcess_.get()) subProcess_->fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
288 if(!wantAllEvents_) {
291 if(!selectors_.wantEvent(ep,
nullptr)) {
306 esids.push_back(selector_config_id_);
310 auto & processHistoryRegistry = processHistoryRegistries_[principal.
streamID().
value()];
311 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
313 branchIDListHelper_->fixBranchListIndexes(bli);
315 processHistoryRegistry,
321 propagateProducts(
InEvent, principal, ep);
323 schedule_->processOneEvent<Traits>(ep.
streamID().
value(),ep, esp_->eventSetup());
324 if(subProcess_.get()) subProcess_->doEvent(ep);
331 beginRun(principal,ts);
338 boost::shared_ptr<RunPrincipal> rpp(
new RunPrincipal(aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.
index()]),principal.
index()));
339 auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.
index()];
340 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
341 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
342 principalCache_.insert(rpp);
347 parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
350 propagateProducts(
InRun, principal, rp);
352 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts));
353 if(subProcess_.get()) subProcess_->doBeginRun(rp, ts);
359 endRun(principal,ts,cleaningUpAfterException);
365 propagateProducts(
InRun, principal, rp);
367 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
368 if(subProcess_.get()) subProcess_->doEndRun(rp, ts, cleaningUpAfterException);
374 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
375 assert(it != parentToChildPhID_.end());
376 schedule_->writeRun(principalCache_.runPrincipal(it->second, runNumber), &processContext_);
377 if(subProcess_.get()) subProcess_->writeRun(it->second, runNumber);
382 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
383 assert(it != parentToChildPhID_.end());
384 principalCache_.deleteRun(it->second, runNumber);
385 if(subProcess_.get()) subProcess_->deleteRunFromCache(it->second, runNumber);
391 beginLuminosityBlock(principal,ts);
398 boost::shared_ptr<LuminosityBlockPrincipal> lbpp(
new LuminosityBlockPrincipal(aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.
index()]),principal.
index()));
399 auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.
index()];
400 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
401 lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.
reader());
402 lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
403 principalCache_.insert(lbpp);
405 propagateProducts(
InLumi, principal, lbp);
407 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts));
408 if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, ts);
414 endLuminosityBlock(principal,ts,cleaningUpAfterException);
420 propagateProducts(
InLumi, principal, lbp);
422 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
423 if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, ts, cleaningUpAfterException);
429 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
430 assert(it != parentToChildPhID_.end());
431 schedule_->writeLumi(principalCache_.lumiPrincipal(it->second, runNumber, lumiNumber), &processContext_);
432 if(subProcess_.get()) subProcess_->writeLumi(it->second, runNumber, lumiNumber);
437 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
438 assert(it != parentToChildPhID_.end());
439 principalCache_.deleteLumi(it->second, runNumber, lumiNumber);
440 if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber);
444 SubProcess::doBeginStream(
unsigned int iID) {
446 schedule_->beginStream(iID);
447 if(subProcess_.get()) subProcess_->doBeginStream(iID);
451 SubProcess::doEndStream(
unsigned int iID) {
453 schedule_->endStream(iID);
454 if(subProcess_.get()) subProcess_->doEndStream(iID);
463 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts));
464 if(subProcess_.get()) subProcess_->doStreamBeginRun(
id,rp, ts);
474 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
475 if(subProcess_.get()) subProcess_->doStreamEndRun(
id,rp, ts,cleaningUpAfterException);
485 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts));
486 if(subProcess_.get()) subProcess_->doStreamBeginLuminosityBlock(
id,lbp, ts);
496 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
497 if(subProcess_.get()) subProcess_->doStreamEndLuminosityBlock(
id,lbp, ts,cleaningUpAfterException);
505 for(
auto const& item : keptVector) {
507 if(parentProductHolder !=
nullptr) {
510 if(productHolder !=
nullptr) {
533 void SubProcess::updateBranchIDListHelper(
BranchIDLists const& branchIDLists) {
534 branchIDListHelper_->updateFromParent(branchIDLists);
535 if(subProcess_.get()) {
536 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
544 schedule_->respondToOpenInputFile(fb);
545 if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
549 std::auto_ptr<ParameterSet>
551 std::vector<std::string> subProcesses = parameterSet.
getUntrackedParameter<std::vector<std::string> >(
"@all_subprocesses");
552 if(!subProcesses.empty()) {
553 assert(subProcesses.size() == 1U);
554 assert(subProcesses[0] ==
"@sub_process");
557 return std::auto_ptr<ParameterSet>(
nullptr);
unsigned int historyRunOffset_
ParameterSetID selector_config_id_
bool productProvenanceValid() const
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
ProductRegistry const & productRegistry() const
T getUntrackedParameter(std::string const &, T const &) const
EventSelectionIDVector const & eventSelectionIDs() const
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
ProcessHistoryID const & reducedProcessHistoryID() const
ConstProductHolderPtr getProductHolder(BranchID const &oid) const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet &parameterSet)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
bool exists(std::string const &parameterName) const
checks if a parameter exists
void call(boost::function< void(void)>)
void updateBranchIDListHelper(BranchIDLists const &)
ProductProvenance * productProvenance() const
LuminosityBlockAuxiliary const & aux() const
LuminosityBlockIndex index() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void insert(boost::shared_ptr< RunPrincipal > rp)
void setParentProcessContext(ProcessContext const *parentProcessContext)
void setStore(boost::shared_ptr< ProductProvenanceRetriever > store) const
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
BranchListIndexes const & branchListIndexes() const
void resetProductProvenance() const
ProcessHistory const & processHistory() const
std::vector< EventSelectionID > EventSelectionIDVector
void selectProducts(ProductRegistry const &preg)
ProductList const & productList() const
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
ProcessHistoryID const & processHistoryID() const
ProcessHistory const & processHistory() const
static void setThrowAnException(bool v)
StreamID streamID() const
boost::shared_ptr< void const > wrapper_
std::vector< BranchDescription const * > allBranchDescriptions() const
std::auto_ptr< ParameterSet > popParameterSet(std::string const &name)
SubProcess(ParameterSet &parameterSet, ParameterSet const &topLevelParameterSet, boost::shared_ptr< ProductRegistry const > parentProductRegistry, boost::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
BranchID const & branchID() const
RunAuxiliary const & aux() const
ProductData const & productData() const
boost::shared_ptr< ProductProvenanceRetriever > const & store() const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=nullptr)
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< ExceptionToActionTable const > act_table_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
unsigned int value() const
detail::TriggerResultsBasedEventSelector selectors_
boost::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &)
boost::shared_ptr< ProductProvenanceRetriever > productProvenanceRetrieverPtr() const
std::auto_ptr< Schedule > schedule_
void clearEventPrincipal()
void connectToSubProcess(ActivityRegistry &iOther)
boost::shared_ptr< BranchIDListHelper > branchIDListHelper_
bool anyProductProduced() const
void setProductID(ProductID const &pid)
std::vector< HistoryAppender > historyAppenders_
std::vector< BranchID::value_type > BranchIDList
std::unique_ptr< ParameterSet > processParameterSet_
std::vector< std::string > const & getAllTriggerNames()
void setProcessHistory(ProcessHistory const &ph)
std::vector< BranchDescription const * > SelectedProducts
unsigned int numberOfStreams() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
void setProductProvenance(ProductProvenance const &prov) const
ProcessContext processContext_
BranchID const & originalBranchID() const
ProductID const & productID() const
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
std::auto_ptr< SubProcess > subProcess_
boost::shared_ptr< ProcessConfiguration const > processConfiguration_
boost::shared_ptr< ProductRegistry const > preg_
EventAuxiliary const & aux() const
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector)
ParameterSet const & registerIt()
PrincipalCache principalCache_
ParameterSet const & parameterSet(Provenance const &provenance)
bool productUnavailable() const