00001 #include "FWCore/Framework/interface/SubProcess.h"
00002
00003 #include "DataFormats/Provenance/interface/BranchID.h"
00004 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00005 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00006 #include "FWCore/Framework/interface/Group.h"
00007 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00008 #include "FWCore/Framework/interface/OccurrenceTraits.h"
00009 #include "FWCore/Framework/interface/RunPrincipal.h"
00010 #include "FWCore/Framework/interface/Schedule.h"
00011 #include "FWCore/Framework/src/EventSetupsController.h"
00012 #include "FWCore/Framework/src/SignallingProductRegistry.h"
00013 #include "FWCore/ParameterSet/interface/IllegalParameters.h"
00014 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00015 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00016 #include "FWCore/Utilities/interface/EDMException.h"
00017
00018 #include <cassert>
00019 #include <set>
00020 #include <string>
00021 #include <vector>
00022
00023 namespace edm {
00024
00025 SubProcess::SubProcess(ParameterSet& parameterSet,
00026 ParameterSet const& topLevelParameterSet,
00027 boost::shared_ptr<ProductRegistry const> parentProductRegistry,
00028 eventsetup::EventSetupsController& esController,
00029 ActivityRegistry& parentActReg,
00030 ServiceToken const& token,
00031 serviceregistry::ServiceLegacy iLegacy) :
00032 OutputModule(parameterSet),
00033 serviceToken_(),
00034 parentPreg_(parentProductRegistry),
00035 preg_(),
00036 act_table_(),
00037 processConfiguration_(),
00038 principalCache_(),
00039 esp_(),
00040 schedule_(),
00041 parentToChildPhID_(),
00042 esInfo_(0),
00043 subProcess_() {
00044
00045 std::string const maxEvents("maxEvents");
00046 std::string const maxLumis("maxLuminosityBlocks");
00047
00048 std::auto_ptr<ParameterSet> processParameterSet = parameterSet.popParameterSet(std::string("process"));
00049
00050
00051 if(processParameterSet->exists(maxEvents)) {
00052 processParameterSet->popParameterSet(maxEvents);
00053 }
00054 if(processParameterSet->exists(maxLumis)) {
00055 processParameterSet->popParameterSet(maxLumis);
00056 }
00057
00058
00059 if(topLevelParameterSet.exists(maxEvents)) {
00060 processParameterSet->addUntrackedParameter<ParameterSet>(maxEvents, topLevelParameterSet.getUntrackedParameterSet(maxEvents));
00061 }
00062 if(topLevelParameterSet.exists(maxLumis)) {
00063 processParameterSet->addUntrackedParameter<ParameterSet>(maxLumis, topLevelParameterSet.getUntrackedParameterSet(maxLumis));
00064 }
00065
00066
00067
00068 boost::shared_ptr<ParameterSet> subProcessParameterSet(popSubProcessParameterSet(*processParameterSet).release());
00069
00070 ScheduleItems items(*parentProductRegistry);
00071
00072 ParameterSet const& optionsPset(processParameterSet->getUntrackedParameterSet("options", ParameterSet()));
00073 IllegalParameters::setThrowAnException(optionsPset.getUntrackedParameter<bool>("throwIfIllegalParameter", true));
00074
00075
00076 ServiceToken iToken;
00077
00078
00079 std::auto_ptr<std::vector<ParameterSet> > serviceSets = processParameterSet->popVParameterSet(std::string("services"));
00080
00081 ServiceToken newToken = items.initServices(*serviceSets, *processParameterSet, token, iLegacy, false);
00082 parentActReg.connectToSubProcess(*items.actReg_);
00083 serviceToken_ = items.addCPRandTNS(*processParameterSet, newToken);
00084
00085
00086
00087 ServiceRegistry::Operate operate(serviceToken_);
00088
00089
00090 boost::shared_ptr<CommonParams> common(items.initMisc(*processParameterSet));
00091
00092
00093 esp_ = esController.makeProvider(*processParameterSet, *common);
00094
00095
00096 schedule_ = items.initSchedule(*processParameterSet);
00097
00098
00099 act_table_ = items.act_table_;
00100 preg_ = items.preg_;
00101 processConfiguration_ = items.processConfiguration_;
00102
00103 boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(preg_, *processConfiguration_));
00104 principalCache_.insert(ep);
00105
00106 if(subProcessParameterSet) {
00107 subProcess_.reset(new SubProcess(*subProcessParameterSet, topLevelParameterSet, preg_, esController, *items.actReg_, newToken, iLegacy));
00108 }
00109 }
00110
00111 SubProcess::~SubProcess() {}
00112
00113 void
00114 SubProcess::beginJob() {
00115
00116 {
00117 std::set<BranchID> keptBranches;
00118 Selections const& keptVectorR = keptProducts()[InRun];
00119 for(Selections::const_iterator it = keptVectorR.begin(), itEnd = keptVectorR.end(); it != itEnd; ++it) {
00120 keptBranches.insert((*it)->branchID());
00121 }
00122 Selections const& keptVectorL = keptProducts()[InLumi];
00123 for(Selections::const_iterator it = keptVectorL.begin(), itEnd = keptVectorL.end(); it != itEnd; ++it) {
00124 keptBranches.insert((*it)->branchID());
00125 }
00126 Selections const& keptVectorE = keptProducts()[InEvent];
00127 for(Selections::const_iterator it = keptVectorE.begin(), itEnd = keptVectorE.end(); it != itEnd; ++it) {
00128 keptBranches.insert((*it)->branchID());
00129 }
00130 for(ProductRegistry::ProductList::const_iterator it = preg_->productList().begin(), itEnd = preg_->productList().end(); it != itEnd; ++it) {
00131 if(keptBranches.find(it->second.branchID()) == keptBranches.end()) {
00132 it->second.setDropped();
00133 }
00134 }
00135 }
00136 ServiceRegistry::Operate operate(serviceToken_);
00137 schedule_->beginJob();
00138 if(subProcess_.get()) subProcess_->doBeginJob();
00139 }
00140
00141 void
00142 SubProcess::endJob() {
00143 ServiceRegistry::Operate operate(serviceToken_);
00144 schedule_->endJob();
00145 if(subProcess_.get()) subProcess_->doEndJob();
00146 }
00147
00148 SubProcess::ESInfo::ESInfo(IOVSyncValue const& ts, eventsetup::EventSetupProvider& esp) :
00149 ts_(ts),
00150 es_(esp.eventSetupForInstance(ts)) {
00151 }
00152
00153 void
00154 SubProcess::doEvent(EventPrincipal const& principal, IOVSyncValue const& ts) {
00155 ServiceRegistry::Operate operate(serviceToken_);
00156 esInfo_.reset(new ESInfo(ts, *esp_));
00157 CurrentProcessingContext cpc;
00158 doEvent(principal, esInfo_->es_, &cpc);
00159 esInfo_.reset();
00160 }
00161
00162 void
00163 SubProcess::write(EventPrincipal const& principal) {
00164 std::auto_ptr<EventAuxiliary> aux(new EventAuxiliary(principal.aux()));
00165 aux->setProcessHistoryID(principal.processHistoryID());
00166 EventPrincipal& ep = principalCache_.eventPrincipal();
00167 ep.fillEventPrincipal(aux,
00168 principalCache_.lumiPrincipalPtr(),
00169 boost::shared_ptr<EventSelectionIDVector>(new EventSelectionIDVector),
00170 boost::shared_ptr<BranchListIndexes>(new BranchListIndexes),
00171 principal.branchMapperPtr(),
00172 principal.store());
00173 propagateProducts(InEvent, principal, ep);
00174 typedef OccurrenceTraits<EventPrincipal, BranchActionBegin> Traits;
00175 schedule_->processOneOccurrence<Traits>(ep, esInfo_->es_);
00176 if(subProcess_.get()) subProcess_->doEvent(ep, esInfo_->ts_);
00177 ep.clearEventPrincipal();
00178 }
00179
00180 void
00181 SubProcess::doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts) {
00182 ServiceRegistry::Operate operate(serviceToken_);
00183 esInfo_.reset(new ESInfo(ts, *esp_));
00184 CurrentProcessingContext cpc;
00185 doBeginRun(principal, esInfo_->es_, &cpc);
00186 esInfo_.reset();
00187 }
00188
00189 void
00190 SubProcess::beginRun(RunPrincipal const& principal) {
00191 boost::shared_ptr<RunAuxiliary> aux(new RunAuxiliary(principal.aux()));
00192 aux->setProcessHistoryID(principal.processHistoryID());
00193 boost::shared_ptr<RunPrincipal> rpp(new RunPrincipal(aux, preg_, *processConfiguration_));
00194 rpp->fillRunPrincipal(principal.branchMapperPtr(), principal.store());
00195 principalCache_.insert(rpp);
00196 parentToChildPhID_.insert(std::make_pair(principal.aux().processHistoryID(), principal.processHistoryID()));
00197 parentToChildPhID_.insert(std::make_pair(principal.processHistoryID(), rpp->processHistoryID()));
00198 RunPrincipal& rp = *principalCache_.runPrincipalPtr();
00199 propagateProducts(InRun, principal, rp);
00200 typedef OccurrenceTraits<RunPrincipal, BranchActionBegin> Traits;
00201 schedule_->processOneOccurrence<Traits>(rp, esInfo_->es_);
00202 if(subProcess_.get()) subProcess_->doBeginRun(rp, esInfo_->ts_);
00203 }
00204
00205 void
00206 SubProcess::doEndRun(RunPrincipal const& principal, IOVSyncValue const& ts) {
00207 ServiceRegistry::Operate operate(serviceToken_);
00208 esInfo_.reset(new ESInfo(ts, *esp_));
00209 CurrentProcessingContext cpc;
00210 doEndRun(principal, esInfo_->es_, &cpc);
00211 esInfo_.reset();
00212 }
00213
00214 void
00215 SubProcess::endRun(RunPrincipal const& principal) {
00216 RunPrincipal& rp = *principalCache_.runPrincipalPtr();
00217 propagateProducts(InRun, principal, rp);
00218 typedef OccurrenceTraits<RunPrincipal, BranchActionEnd> Traits;
00219 schedule_->processOneOccurrence<Traits>(rp, esInfo_->es_);
00220 if(subProcess_.get()) subProcess_->doEndRun(rp, esInfo_->ts_);
00221 }
00222
00223 void
00224 SubProcess::writeRun(ProcessHistoryID const& parentPhID, int runNumber) {
00225 ServiceRegistry::Operate operate(serviceToken_);
00226 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
00227 assert(it != parentToChildPhID_.end());
00228 schedule_->writeRun(principalCache_.runPrincipal(it->second, runNumber));
00229 if(subProcess_.get()) subProcess_->writeRun(it->second, runNumber);
00230 }
00231
00232 void
00233 SubProcess::doBeginLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {
00234 ServiceRegistry::Operate operate(serviceToken_);
00235 esInfo_.reset(new ESInfo(ts, *esp_));
00236 CurrentProcessingContext cpc;
00237 doBeginLuminosityBlock(principal, esInfo_->es_, &cpc);
00238 esInfo_.reset();
00239 }
00240
00241 void
00242 SubProcess::beginLuminosityBlock(LuminosityBlockPrincipal const& principal) {
00243 boost::shared_ptr<LuminosityBlockAuxiliary> aux(new LuminosityBlockAuxiliary(principal.aux()));
00244 aux->setProcessHistoryID(principal.processHistoryID());
00245 boost::shared_ptr<LuminosityBlockPrincipal> lbpp(new LuminosityBlockPrincipal(aux, preg_, *processConfiguration_, principalCache_.runPrincipalPtr()));
00246 lbpp->fillLuminosityBlockPrincipal(principal.branchMapperPtr(), principal.store());
00247 principalCache_.insert(lbpp);
00248 parentToChildPhID_.insert(std::make_pair(principal.aux().processHistoryID(), principal.processHistoryID()));
00249 parentToChildPhID_.insert(std::make_pair(principal.processHistoryID(), lbpp->processHistoryID()));
00250 LuminosityBlockPrincipal& lbp = *principalCache_.lumiPrincipalPtr();
00251 propagateProducts(InLumi, principal, lbp);
00252 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionBegin> Traits;
00253 schedule_->processOneOccurrence<Traits>(lbp, esInfo_->es_);
00254 if(subProcess_.get()) subProcess_->doBeginLuminosityBlock(lbp, esInfo_->ts_);
00255 }
00256
00257 void
00258 SubProcess::doEndLuminosityBlock(LuminosityBlockPrincipal const& principal, IOVSyncValue const& ts) {
00259 ServiceRegistry::Operate operate(serviceToken_);
00260 esInfo_.reset(new ESInfo(ts, *esp_));
00261 CurrentProcessingContext cpc;
00262 doEndLuminosityBlock(principal, esInfo_->es_, &cpc);
00263 esInfo_.reset();
00264 }
00265
00266 void
00267 SubProcess::endLuminosityBlock(LuminosityBlockPrincipal const& principal) {
00268 LuminosityBlockPrincipal& lbp = *principalCache_.lumiPrincipalPtr();
00269 propagateProducts(InLumi, principal, lbp);
00270 typedef OccurrenceTraits<LuminosityBlockPrincipal, BranchActionEnd> Traits;
00271 schedule_->processOneOccurrence<Traits>(lbp, esInfo_->es_);
00272 if(subProcess_.get()) subProcess_->doEndLuminosityBlock(lbp, esInfo_->ts_);
00273 }
00274
00275 void
00276 SubProcess::writeLumi(ProcessHistoryID const& parentPhID, int runNumber, int lumiNumber) {
00277 ServiceRegistry::Operate operate(serviceToken_);
00278 std::map<ProcessHistoryID, ProcessHistoryID>::const_iterator it = parentToChildPhID_.find(parentPhID);
00279 assert(it != parentToChildPhID_.end());
00280 schedule_->writeLumi(principalCache_.lumiPrincipal(it->second, runNumber, lumiNumber));
00281 if(subProcess_.get()) subProcess_->writeLumi(it->second, runNumber, lumiNumber);
00282 }
00283
00284 void
00285 SubProcess::propagateProducts(BranchType type, Principal const& parentPrincipal, Principal& principal) const {
00286 Selections const& keptVector = keptProducts()[type];
00287 for(Selections::const_iterator it = keptVector.begin(), itEnd = keptVector.end(); it != itEnd; ++it) {
00288 boost::shared_ptr<Group const> parentGroup = parentPrincipal.getGroup((*it)->branchID(), false, false);
00289 if(parentGroup) {
00290
00291 GroupData parentData = parentGroup->groupData();
00292 boost::shared_ptr<Group const> group = principal.getGroup((*it)->branchID(), false, false);
00293 if(group) {
00294
00295 GroupData& thisData = const_cast<GroupData&>(group->groupData());
00296 thisData.swap(parentData);
00297
00298 group->productUnavailable();
00299 }
00300 }
00301 }
00302 }
00303
00304
00305 std::auto_ptr<ParameterSet>
00306 popSubProcessParameterSet(ParameterSet& parameterSet) {
00307 std::vector<std::string> subProcesses = parameterSet.getUntrackedParameter<std::vector<std::string> >("@all_subprocesses");
00308 if(!subProcesses.empty()) {
00309 assert(subProcesses.size() == 1U);
00310 assert(subProcesses[0] == "@sub_process");
00311 return parameterSet.popParameterSet(subProcesses[0]);
00312 }
00313 return std::auto_ptr<ParameterSet>(0);
00314 }
00315
00316 }
00317