38 std::shared_ptr<ProductRegistry const> parentProductRegistry,
39 std::shared_ptr<BranchIDListHelper const> parentBranchIDListHelper,
48 parentPreg_(parentProductRegistry),
50 branchIDListHelper_(),
52 processConfiguration_(),
53 historyLumiOffset_(preallocConfig.numberOfStreams()),
54 historyRunOffset_(historyLumiOffset_+preallocConfig.numberOfLuminosityBlocks()),
55 processHistoryRegistries_(historyRunOffset_+ preallocConfig.numberOfRuns()),
56 historyAppenders_(historyRunOffset_+preallocConfig.numberOfRuns()),
62 processParameterSet_(),
63 productSelectorRules_(parameterSet,
"outputCommands",
"OutputModule"),
65 wantAllEvents_(
true) {
75 tns->getProcessName(),
78 std::map<std::string, std::vector<std::pair<std::string, int> > > outputModulePathPositions;
81 outputModulePathPositions,
82 parentProductRegistry->anyProductProduced());
84 std::map<BranchID, bool> keepAssociation;
85 selectProducts(*parentProductRegistry, parentThinnedAssociationsHelper, keepAssociation);
101 if(topLevelParameterSet.
exists(maxEvents)) {
104 if(topLevelParameterSet.
exists(maxLumis)) {
148 preg_.reset(items.preg_.release());
162 ep->preModuleDelayedGetSignal_.connect(std::cref(items.actReg_->preModuleEventDelayedGetSignal_));
163 ep->postModuleDelayedGetSignal_.connect(std::cref(items.actReg_->postModuleEventDelayedGetSignal_));
166 if(subProcessParameterSet) {
168 topLevelParameterSet,
171 *thinnedAssociationsHelper_,
207 ExceptionCollector c(
"Multiple exceptions were thrown while executing endJob. An exception message follows for each.");
209 if(
subProcess_.get()) c.
call([
this](){ this->subProcess_->doEndJob();});
218 std::map<BranchID, bool>& keepAssociation) {
219 if(productSelector_.initialized())
return;
226 std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
227 std::vector<BranchDescription const*> associationDescriptions;
228 std::set<BranchID> keptProductsInEvent;
238 associationDescriptions.push_back(&desc);
239 }
else if(productSelector_.selected(desc)) {
240 keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
248 for(
auto association : associationDescriptions) {
249 if(keepAssociation[association->branchID()]) {
250 keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
255 ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);
259 std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
260 std::set<BranchID>& keptProductsInEvent) {
262 ProductSelector::checkForDuplicateKeptBranch(desc,
263 trueBranchIDToKeptBranchDesc);
269 keptProductsInEvent.insert(desc.
branchID());
273 keptProducts_[desc.
branchType()].push_back(&desc);
277 SubProcess::fixBranchIDListsForEDAliases(std::map<BranchID::value_type, BranchID::value_type>
const& droppedBranchIDToKeptBranchID) {
280 for(
BranchIDList& branchIDList : branchIDListHelper_->mutableBranchIDLists()) {
282 std::map<BranchID::value_type, BranchID::value_type>::const_iterator
iter = droppedBranchIDToKeptBranchID.find(branchID);
283 if(iter != droppedBranchIDToKeptBranchID.end()) {
284 branchID = iter->second;
288 if(subProcess_.get()) subProcess_->fixBranchIDListsForEDAliases(droppedBranchIDToKeptBranchID);
298 if(!wantAllEvents_) {
301 if(!selectors_.wantEvent(ep,
nullptr)) {
316 esids.push_back(selector_config_id_);
320 auto & processHistoryRegistry = processHistoryRegistries_[principal.
streamID().
value()];
321 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
323 branchIDListHelper_->fixBranchListIndexes(bli);
325 processHistoryRegistry,
331 propagateProducts(
InEvent, principal, ep);
333 schedule_->processOneEvent<Traits>(ep.
streamID().
value(),ep, esp_->eventSetup());
334 if(subProcess_.get()) subProcess_->doEvent(ep);
341 beginRun(principal,ts);
346 auto aux = std::make_shared<RunAuxiliary>(principal.
aux());
348 auto rpp = std::make_shared<RunPrincipal>(
aux, preg_, *processConfiguration_, &(historyAppenders_[historyRunOffset_+principal.
index()]),principal.
index());
349 auto & processHistoryRegistry = processHistoryRegistries_[historyRunOffset_+principal.
index()];
350 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
351 rpp->fillRunPrincipal(processHistoryRegistry, principal.
reader());
352 principalCache_.insert(rpp);
357 parentToChildPhID_.insert(std::make_pair(parentInputReducedPHID,inputReducedPHID));
360 propagateProducts(
InRun, principal, rp);
362 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts));
363 if(subProcess_.get()) subProcess_->doBeginRun(rp, ts);
369 endRun(principal,ts,cleaningUpAfterException);
375 propagateProducts(
InRun, principal, rp);
377 schedule_->processOneGlobal<Traits>(rp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
378 if(subProcess_.get()) subProcess_->doEndRun(rp, ts, cleaningUpAfterException);
384 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
385 assert(it != parentToChildPhID_.end());
386 schedule_->writeRun(principalCache_.runPrincipal(it->second, runNumber), &processContext_);
387 if(subProcess_.get()) subProcess_->writeRun(it->second, runNumber);
392 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
393 assert(it != parentToChildPhID_.end());
394 principalCache_.deleteRun(it->second, runNumber);
395 if(subProcess_.get()) subProcess_->deleteRunFromCache(it->second, runNumber);
401 beginLuminosityBlock(principal,ts);
406 auto aux = std::make_shared<LuminosityBlockAuxiliary>(principal.
aux());
408 auto lbpp = std::make_shared<LuminosityBlockPrincipal>(
aux, preg_, *processConfiguration_, &(historyAppenders_[historyLumiOffset_+principal.
index()]),principal.
index());
409 auto & processHistoryRegistry = processHistoryRegistries_[historyLumiOffset_+principal.
index()];
410 processHistoryRegistry.registerProcessHistory(principal.
processHistory());
411 lbpp->fillLuminosityBlockPrincipal(processHistoryRegistry, principal.
reader());
412 lbpp->setRunPrincipal(principalCache_.runPrincipalPtr());
413 principalCache_.insert(lbpp);
415 propagateProducts(
InLumi, principal, lbp);
417 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts));
418 if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, ts);
424 endLuminosityBlock(principal,ts,cleaningUpAfterException);
430 propagateProducts(
InLumi, principal, lbp);
432 schedule_->processOneGlobal<Traits>(lbp, esp_->eventSetupForInstance(ts), cleaningUpAfterException);
433 if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, ts, cleaningUpAfterException);
439 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
440 assert(it != parentToChildPhID_.end());
441 schedule_->writeLumi(principalCache_.lumiPrincipal(it->second, runNumber, lumiNumber), &processContext_);
442 if(subProcess_.get()) subProcess_->writeLumi(it->second, runNumber, lumiNumber);
447 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
448 assert(it != parentToChildPhID_.end());
449 principalCache_.deleteLumi(it->second, runNumber, lumiNumber);
450 if(subProcess_.get()) subProcess_->deleteLumiFromCache(it->second, runNumber, lumiNumber);
454 SubProcess::doBeginStream(
unsigned int iID) {
456 schedule_->beginStream(iID);
457 if(subProcess_.get()) subProcess_->doBeginStream(iID);
461 SubProcess::doEndStream(
unsigned int iID) {
463 schedule_->endStream(iID);
464 if(subProcess_.get()) subProcess_->doEndStream(iID);
473 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts));
474 if(subProcess_.get()) subProcess_->doStreamBeginRun(
id,rp, ts);
484 schedule_->processOneStream<Traits>(id,rp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
485 if(subProcess_.get()) subProcess_->doStreamEndRun(
id,rp, ts,cleaningUpAfterException);
495 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts));
496 if(subProcess_.get()) subProcess_->doStreamBeginLuminosityBlock(
id,lbp, ts);
506 schedule_->processOneStream<Traits>(id,lbp, esp_->eventSetupForInstance(ts),cleaningUpAfterException);
507 if(subProcess_.get()) subProcess_->doStreamEndLuminosityBlock(
id,lbp, ts,cleaningUpAfterException);
515 for(
auto const& item : keptVector) {
517 if(parentProductHolder !=
nullptr) {
520 if(productHolder !=
nullptr) {
543 void SubProcess::updateBranchIDListHelper(
BranchIDLists const& branchIDLists) {
544 branchIDListHelper_->updateFromParent(branchIDLists);
545 if(subProcess_.get()) {
546 subProcess_->updateBranchIDListHelper(branchIDListHelper_->branchIDLists());
554 schedule_->respondToOpenInputFile(fb);
555 if(subProcess_.get()) subProcess_->respondToOpenInputFile(fb);
559 std::auto_ptr<ParameterSet>
561 std::vector<std::string> subProcesses = parameterSet.
getUntrackedParameter<std::vector<std::string> >(
"@all_subprocesses");
562 if(!subProcesses.empty()) {
563 assert(subProcesses.size() == 1U);
564 assert(subProcesses[0] ==
"@sub_process");
567 return std::auto_ptr<ParameterSet>(
nullptr);
unsigned int historyRunOffset_
ParameterSetID selector_config_id_
void insert(std::shared_ptr< RunPrincipal > rp)
bool productProvenanceValid() const
ProductRegistry const & productRegistry() const
T getUntrackedParameter(std::string const &, T const &) const
void setLuminosityBlockPrincipal(std::shared_ptr< LuminosityBlockPrincipal > const &lbp)
EventSelectionIDVector const & eventSelectionIDs() const
BranchType const & branchType() const
std::vector< BranchIDList > BranchIDLists
ProcessHistoryID const & reducedProcessHistoryID() const
ConstProductHolderPtr getProductHolder(BranchID const &oid) const
void setNumberOfConcurrentPrincipals(PreallocationConfiguration const &)
std::vector< ProcessHistoryRegistry > processHistoryRegistries_
std::auto_ptr< ParameterSet > popSubProcessParameterSet(ParameterSet ¶meterSet)
boost::shared_ptr< eventsetup::EventSetupProvider > esp_
bool exists(std::string const ¶meterName) const
checks if a parameter exists
void updateBranchIDListHelper(BranchIDLists const &)
ProductProvenance * productProvenance() const
LuminosityBlockAuxiliary const & aux() const
LuminosityBlockIndex index() const
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
void setParentProcessContext(ProcessContext const *parentProcessContext)
std::map< BranchID::value_type, BranchID::value_type > const & droppedBranchIDToKeptBranchID()
BranchListIndexes const & branchListIndexes() const
void resetProductProvenance() const
ProcessHistory const & processHistory() const
std::vector< EventSelectionID > EventSelectionIDVector
ProductList const & productList() const
void selectAssociationProducts(std::vector< BranchDescription const * > const &associationDescriptions, std::set< BranchID > const &keptProductsInEvent, std::map< BranchID, bool > &keepAssociation) const
ServiceToken serviceToken_
std::vector< BranchListIndex > BranchListIndexes
ProcessHistoryID const & processHistoryID() const
std::shared_ptr< ProductProvenanceRetriever > productProvenanceRetrieverPtr() const
void selectProducts(ProductRegistry const &preg, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, std::map< BranchID, bool > &keepAssociation)
ProcessHistory const & processHistory() const
static void setThrowAnException(bool v)
std::shared_ptr< ProductProvenanceRetriever > const & store() const
StreamID streamID() const
std::shared_ptr< WrapperBase > wrapper_
std::vector< BranchDescription const * > allBranchDescriptions() const
std::auto_ptr< ParameterSet > popParameterSet(std::string const &name)
BranchID const & branchID() const
TypeWithDict const & unwrappedType() const
RunAuxiliary const & aux() const
ProductData const & productData() const
std::type_info const & typeInfo() const
std::shared_ptr< BranchIDListHelper > branchIDListHelper_
void setProcessConfiguration(ProcessConfiguration const *processConfiguration)
std::unique_ptr< ExceptionToActionTable const > act_table_
void setProcessHistoryRegistry(ProcessHistoryRegistry const &phr)
DelayedReader * reader() const
std::map< BranchID::value_type, BranchID::value_type > droppedBranchIDToKeptBranchID_
unsigned int value() const
detail::TriggerResultsBasedEventSelector selectors_
boost::shared_ptr< EventSetupProvider > makeProvider(ParameterSet &)
std::auto_ptr< Schedule > schedule_
void clearEventPrincipal()
void connectToSubProcess(ActivityRegistry &iOther)
bool anyProductProduced() const
void setProductID(ProductID const &pid)
std::vector< HistoryAppender > historyAppenders_
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper_
std::vector< BranchID::value_type > BranchIDList
void setStore(std::shared_ptr< ProductProvenanceRetriever > store) const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
std::unique_ptr< ParameterSet > processParameterSet_
std::vector< std::string > const & getAllTriggerNames()
void setProcessHistory(ProcessHistory const &ph)
std::vector< BranchDescription const * > SelectedProducts
unsigned int numberOfStreams() const
void fixBranchIDListsForEDAliases(std::map< BranchID::value_type, BranchID::value_type > const &droppedBranchIDToKeptBranchID)
void setProductProvenance(ProductProvenance const &prov) const
ProcessContext processContext_
BranchID const & originalBranchID() const
ProductID const & productID() const
void call(std::function< void(void)>)
ParameterSetID registerProperSelectionInfo(edm::ParameterSet const &iInitial, std::string const &iLabel, std::map< std::string, std::vector< std::pair< std::string, int > > > const &outputModulePathPositions, bool anyProductProduced)
SubProcess(ParameterSet ¶meterSet, ParameterSet const &topLevelParameterSet, std::shared_ptr< ProductRegistry const > parentProductRegistry, std::shared_ptr< BranchIDListHelper const > parentBranchIDListHelper, ThinnedAssociationsHelper const &parentThinnedAssociationsHelper, eventsetup::EventSetupsController &esController, ActivityRegistry &parentActReg, ServiceToken const &token, serviceregistry::ServiceLegacy iLegacy, PreallocationConfiguration const &preallocConfig, ProcessContext const *parentProcessContext)
std::auto_ptr< SubProcess > subProcess_
EventAuxiliary const & aux() const
bool configureEventSelector(edm::ParameterSet const &iPSet, std::string const &iProcessName, std::vector< std::string > const &iAllTriggerNames, edm::detail::TriggerResultsBasedEventSelector &oSelector)
ParameterSet const & registerIt()
std::shared_ptr< ProductRegistry const > preg_
PrincipalCache principalCache_
ParameterSet const & parameterSet(Provenance const &provenance)
std::shared_ptr< ProcessConfiguration const > processConfiguration_
bool productUnavailable() const