00001 #include "IOPool/Output/interface/PoolOutputModule.h"
00002
00003 #include "FWCore/MessageLogger/interface/JobReport.h"
00004 #include "IOPool/Output/src/RootOutputFile.h"
00005
00006 #include "FWCore/Framework/interface/EventPrincipal.h"
00007 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00008 #include "FWCore/Framework/interface/RunPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00012 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00013 #include "FWCore/ServiceRegistry/interface/Service.h"
00014 #include "DataFormats/Provenance/interface/BranchDescription.h"
00015 #include "FWCore/Utilities/interface/Algorithms.h"
00016 #include "FWCore/Utilities/interface/EDMException.h"
00017 #include "FWCore/Utilities/interface/DictionaryTools.h"
00018 #include "FWCore/Utilities/interface/TimeOfDay.h"
00019 #include "FWCore/Utilities/interface/WrappedClassName.h"
00020
00021 #include "TTree.h"
00022 #include "TBranchElement.h"
00023 #include "TObjArray.h"
00024 #include "RVersion.h"
00025
00026 #include <fstream>
00027 #include <iomanip>
00028 #include <sstream>
00029
00030 namespace edm {
00031 PoolOutputModule::PoolOutputModule(ParameterSet const& pset) :
00032 OutputModule(pset),
00033 rootServiceChecker_(),
00034 auxItems_(),
00035 selectedOutputItemList_(),
00036 fileName_(pset.getUntrackedParameter<std::string>("fileName")),
00037 logicalFileName_(pset.getUntrackedParameter<std::string>("logicalFileName")),
00038 catalog_(pset.getUntrackedParameter<std::string>("catalog")),
00039 maxFileSize_(pset.getUntrackedParameter<int>("maxSize")),
00040 compressionLevel_(pset.getUntrackedParameter<int>("compressionLevel")),
00041 #if ROOT_VERSION_CODE >= ROOT_VERSION(5,30,0)
00042 compressionAlgorithm_(pset.getUntrackedParameter<std::string>("compressionAlgorithm")),
00043 #else
00044 compressionAlgorithm_("ZLIB"),
00045 #endif
00046 basketSize_(pset.getUntrackedParameter<int>("basketSize")),
00047 eventAutoFlushSize_(pset.getUntrackedParameter<int>("eventAutoFlushCompressedSize")),
00048 splitLevel_(std::min<int>(pset.getUntrackedParameter<int>("splitLevel") + 1, 99)),
00049 basketOrder_(pset.getUntrackedParameter<std::string>("sortBaskets")),
00050 treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
00051 whyNotFastClonable_(pset.getUntrackedParameter<bool>("fastCloning") ? FileBlock::CanFastClone : FileBlock::DisabledInConfigFile),
00052 dropMetaData_(DropNone),
00053 moduleLabel_(pset.getParameter<std::string>("@module_label")),
00054 initializedFromInput_(false),
00055 outputFileCount_(0),
00056 inputFileCount_(0),
00057 childIndex_(0U),
00058 numberOfDigitsInIndex_(0U),
00059 overrideInputFileSplitLevels_(pset.getUntrackedParameter<bool>("overrideInputFileSplitLevels")),
00060 rootOutputFile_(),
00061 statusFileName_() {
00062
00063 if (pset.getUntrackedParameter<bool>("writeStatusFile")) {
00064 std::ostringstream statusfilename;
00065 statusfilename << moduleLabel_ << '_' << getpid();
00066 statusFileName_ = statusfilename.str();
00067 }
00068
00069 std::string dropMetaData(pset.getUntrackedParameter<std::string>("dropMetaData"));
00070 if(dropMetaData.empty()) dropMetaData_ = DropNone;
00071 else if(dropMetaData == std::string("NONE")) dropMetaData_ = DropNone;
00072 else if(dropMetaData == std::string("DROPPED")) dropMetaData_ = DropDroppedPrior;
00073 else if(dropMetaData == std::string("PRIOR")) dropMetaData_ = DropPrior;
00074 else if(dropMetaData == std::string("ALL")) dropMetaData_ = DropAll;
00075 else {
00076 throw edm::Exception(errors::Configuration, "Illegal dropMetaData parameter value: ")
00077 << dropMetaData << ".\n"
00078 << "Legal values are 'NONE', 'DROPPED', 'PRIOR', and 'ALL'.\n";
00079 }
00080
00081 if (!wantAllEvents()) {
00082 whyNotFastClonable_+= FileBlock::EventSelectionUsed;
00083 }
00084
00085
00086
00087
00088 pset.getUntrackedParameterSet("dataset");
00089 }
00090
00091 void PoolOutputModule::beginJob() {
00092 for(int i = InEvent; i < NumBranchTypes; ++i) {
00093 BranchType branchType = static_cast<BranchType>(i);
00094 Selections const& keptVector = keptProducts()[branchType];
00095 for(Selections::const_iterator it = keptVector.begin(), itEnd = keptVector.end(); it != itEnd; ++it) {
00096 BranchDescription const& prod = **it;
00097 checkDictionaries(prod.fullClassName(), true);
00098 checkDictionaries(wrappedClassName(prod.fullClassName()), true);
00099 }
00100 }
00101 }
00102
00103 std::string const& PoolOutputModule::currentFileName() const {
00104 return rootOutputFile_->fileName();
00105 }
00106
00107 PoolOutputModule::AuxItem::AuxItem() :
00108 basketSize_(BranchDescription::invalidBasketSize) {}
00109
00110 PoolOutputModule::OutputItem::OutputItem() :
00111 branchDescription_(0),
00112 product_(0),
00113 splitLevel_(BranchDescription::invalidSplitLevel),
00114 basketSize_(BranchDescription::invalidBasketSize) {}
00115
00116 PoolOutputModule::OutputItem::OutputItem(BranchDescription const* bd, int splitLevel, int basketSize) :
00117 branchDescription_(bd),
00118 product_(0),
00119 splitLevel_(splitLevel),
00120 basketSize_(basketSize) {}
00121
00122
00123 PoolOutputModule::OutputItem::Sorter::Sorter(TTree* tree) : treeMap_(new std::map<std::string, int>) {
00124
00125 if(tree != 0) {
00126 TObjArray* branches = tree->GetListOfBranches();
00127 for(int i = 0; i < branches->GetEntries(); ++i) {
00128 TBranchElement* br = (TBranchElement*)branches->At(i);
00129 treeMap_->insert(std::make_pair(std::string(br->GetName()), i));
00130 }
00131 }
00132 }
00133
00134 bool
00135 PoolOutputModule::OutputItem::Sorter::operator()(OutputItem const& lh, OutputItem const& rh) const {
00136
00137
00138 if(treeMap_->empty()) return lh < rh;
00139 std::string const& lstring = lh.branchDescription_->branchName();
00140 std::string const& rstring = rh.branchDescription_->branchName();
00141 std::map<std::string, int>::const_iterator lit = treeMap_->find(lstring);
00142 std::map<std::string, int>::const_iterator rit = treeMap_->find(rstring);
00143 bool lfound = (lit != treeMap_->end());
00144 bool rfound = (rit != treeMap_->end());
00145 if(lfound && rfound) {
00146 return lit->second < rit->second;
00147 } else if(lfound) {
00148 return true;
00149 } else if(rfound) {
00150 return false;
00151 }
00152 return lh < rh;
00153 }
00154
00155 void PoolOutputModule::fillSelectedItemList(BranchType branchType, TTree* theInputTree) {
00156
00157 Selections const& keptVector = keptProducts()[branchType];
00158 OutputItemList& outputItemList = selectedOutputItemList_[branchType];
00159 AuxItem& auxItem = auxItems_[branchType];
00160
00161
00162 if (theInputTree != 0 && !overrideInputFileSplitLevels_) {
00163 TBranch* auxBranch = theInputTree->GetBranch(BranchTypeToAuxiliaryBranchName(branchType).c_str());
00164 if (auxBranch) {
00165 auxItem.basketSize_ = auxBranch->GetBasketSize();
00166 } else {
00167 auxItem.basketSize_ = basketSize_;
00168 }
00169 } else {
00170 auxItem.basketSize_ = basketSize_;
00171 }
00172
00173
00174 for(Selections::const_iterator it = keptVector.begin(), itEnd = keptVector.end(); it != itEnd; ++it) {
00175 int splitLevel = BranchDescription::invalidSplitLevel;
00176 int basketSize = BranchDescription::invalidBasketSize;
00177
00178 BranchDescription const& prod = **it;
00179 TBranch* theBranch = ((!prod.produced() && theInputTree != 0 && !overrideInputFileSplitLevels_) ? theInputTree->GetBranch(prod.branchName().c_str()) : 0);
00180
00181 if(theBranch != 0) {
00182 splitLevel = theBranch->GetSplitLevel();
00183 basketSize = theBranch->GetBasketSize();
00184 } else {
00185 splitLevel = (prod.splitLevel() == BranchDescription::invalidSplitLevel ? splitLevel_ : prod.splitLevel());
00186 basketSize = (prod.basketSize() == BranchDescription::invalidBasketSize ? basketSize_ : prod.basketSize());
00187 }
00188 outputItemList.emplace_back(&prod, splitLevel, basketSize);
00189 }
00190
00191
00192
00193 sort_all(outputItemList, OutputItem::Sorter(theInputTree));
00194 }
00195
00196 void PoolOutputModule::beginInputFile(FileBlock const& fb) {
00197 if(isFileOpen()) {
00198 rootOutputFile_->beginInputFile(fb, remainingEvents());
00199 }
00200 }
00201
00202 void PoolOutputModule::openFile(FileBlock const& fb) {
00203 if(!isFileOpen()) {
00204 doOpenFile();
00205 beginInputFile(fb);
00206 }
00207 }
00208
00209 void PoolOutputModule::respondToOpenInputFile(FileBlock const& fb) {
00210 if(!initializedFromInput_) {
00211 for(int i = InEvent; i < NumBranchTypes; ++i) {
00212 BranchType branchType = static_cast<BranchType>(i);
00213 TTree* theInputTree = (branchType == InEvent ? fb.tree() :
00214 (branchType == InLumi ? fb.lumiTree() :
00215 fb.runTree()));
00216 fillSelectedItemList(branchType, theInputTree);
00217 }
00218 initializedFromInput_ = true;
00219 }
00220 ++inputFileCount_;
00221 beginInputFile(fb);
00222 }
00223
00224 void PoolOutputModule::respondToCloseInputFile(FileBlock const& fb) {
00225 if(rootOutputFile_) rootOutputFile_->respondToCloseInputFile(fb);
00226 }
00227
00228 void PoolOutputModule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
00229 childIndex_ = iChildIndex;
00230 while (iNumberOfChildren != 0) {
00231 ++numberOfDigitsInIndex_;
00232 iNumberOfChildren /= 10;
00233 }
00234 if (numberOfDigitsInIndex_ == 0) {
00235 numberOfDigitsInIndex_ = 3;
00236 }
00237 }
00238
00239 PoolOutputModule::~PoolOutputModule() {
00240 }
00241
00242 void PoolOutputModule::write(EventPrincipal const& e) {
00243 rootOutputFile_->writeOne(e);
00244 if (!statusFileName_.empty()) {
00245 std::ofstream statusFile(statusFileName_.c_str());
00246 statusFile << e.id() << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00247 statusFile.close();
00248 }
00249 }
00250
00251 void PoolOutputModule::writeLuminosityBlock(LuminosityBlockPrincipal const& lb) {
00252 rootOutputFile_->writeLuminosityBlock(lb);
00253 Service<JobReport> reportSvc;
00254 reportSvc->reportLumiSection(lb.id().run(), lb.id().luminosityBlock());
00255 }
00256
00257 void PoolOutputModule::writeRun(RunPrincipal const& r) {
00258 rootOutputFile_->writeRun(r);
00259 Service<JobReport> reportSvc;
00260 reportSvc->reportRunNumber(r.run());
00261 }
00262
00263
00264 void PoolOutputModule::startEndFile() { }
00265
00266
00267 void PoolOutputModule::writeFileFormatVersion() { rootOutputFile_->writeFileFormatVersion(); }
00268 void PoolOutputModule::writeFileIdentifier() { rootOutputFile_->writeFileIdentifier(); }
00269 void PoolOutputModule::writeIndexIntoFile() { rootOutputFile_->writeIndexIntoFile(); }
00270 void PoolOutputModule::writeProcessConfigurationRegistry() { rootOutputFile_->writeProcessConfigurationRegistry(); }
00271 void PoolOutputModule::writeProcessHistoryRegistry() { rootOutputFile_->writeProcessHistoryRegistry(); }
00272 void PoolOutputModule::writeParameterSetRegistry() { rootOutputFile_->writeParameterSetRegistry(); }
00273 void PoolOutputModule::writeProductDescriptionRegistry() { rootOutputFile_->writeProductDescriptionRegistry(); }
00274 void PoolOutputModule::writeParentageRegistry() { rootOutputFile_->writeParentageRegistry(); }
00275 void PoolOutputModule::writeBranchIDListRegistry() { rootOutputFile_->writeBranchIDListRegistry(); }
00276 void PoolOutputModule::writeProductDependencies() { rootOutputFile_->writeProductDependencies(); }
00277 void PoolOutputModule::finishEndFile() { rootOutputFile_->finishEndFile(); rootOutputFile_.reset(); }
00278 bool PoolOutputModule::isFileOpen() const { return rootOutputFile_.get() != 0; }
00279 bool PoolOutputModule::shouldWeCloseFile() const { return rootOutputFile_->shouldWeCloseFile(); }
00280
00281 void PoolOutputModule::doOpenFile() {
00282 if(inputFileCount_ == 0) {
00283 throw edm::Exception(errors::LogicError)
00284 << "Attempt to open output file before input file. "
00285 << "Please report this to the core framework developers.\n";
00286 }
00287 std::string suffix(".root");
00288 std::string::size_type offset = fileName().rfind(suffix);
00289 bool ext = (offset == fileName().size() - suffix.size());
00290 if(!ext) suffix.clear();
00291 std::string fileBase(ext ? fileName().substr(0, offset) : fileName());
00292 std::ostringstream ofilename;
00293 std::ostringstream lfilename;
00294 ofilename << fileBase;
00295 lfilename << logicalFileName();
00296 if(numberOfDigitsInIndex_) {
00297 ofilename << '_' << std::setw(numberOfDigitsInIndex_) << std::setfill('0') << childIndex_;
00298 if(!logicalFileName().empty()) {
00299 lfilename << '_' << std::setw(numberOfDigitsInIndex_) << std::setfill('0') << childIndex_;
00300 }
00301 }
00302 if(outputFileCount_) {
00303 ofilename << std::setw(3) << std::setfill('0') << outputFileCount_;
00304 if(!logicalFileName().empty()) {
00305 lfilename << std::setw(3) << std::setfill('0') << outputFileCount_;
00306 }
00307 }
00308 ofilename << suffix;
00309 rootOutputFile_.reset(new RootOutputFile(this, ofilename.str(), lfilename.str()));
00310 ++outputFileCount_;
00311 }
00312
00313 void
00314 PoolOutputModule::fillDescriptions(ConfigurationDescriptions & descriptions) {
00315 std::string defaultString;
00316 ParameterSetDescription desc;
00317 desc.setComment("Writes runs, lumis, and events into EDM/ROOT files.");
00318 desc.addUntracked<std::string>("fileName")
00319 ->setComment("Name of output file.");
00320 desc.addUntracked<std::string>("logicalFileName", defaultString)
00321 ->setComment("Passed to job report. Otherwise unused by module.");
00322 desc.addUntracked<std::string>("catalog", defaultString)
00323 ->setComment("Passed to job report. Otherwise unused by module.");
00324 desc.addUntracked<int>("maxSize", 0x7f000000)
00325 ->setComment("Maximum output file size, in kB.\n"
00326 "If over maximum, new output file will be started at next input file transition.");
00327 desc.addUntracked<int>("compressionLevel", 7)
00328 ->setComment("ROOT compression level of output file.");
00329 #if ROOT_VERSION_CODE >= ROOT_VERSION(5,30,0)
00330 desc.addUntracked<std::string>("compressionAlgorithm", "ZLIB")
00331 ->setComment("Algorithm used to compress data in the ROOT output file, allowed values are ZLIB and LZMA");
00332 #endif
00333 desc.addUntracked<int>("basketSize", 16384)
00334 ->setComment("Default ROOT basket size in output file.");
00335 desc.addUntracked<int>("eventAutoFlushCompressedSize",-1)->setComment("Set ROOT auto flush stored data size (in bytes) for event TTree. The value sets how large the compressed buffer is allowed to get. The uncompressed buffer can be quite a bit larger than this depending on the average compression ratio. The value of -1 just uses ROOT's default value. The value of 0 turns off this feature.");
00336 desc.addUntracked<int>("splitLevel", 99)
00337 ->setComment("Default ROOT branch split level in output file.");
00338 desc.addUntracked<std::string>("sortBaskets", std::string("sortbasketsbyoffset"))
00339 ->setComment("Legal values: 'sortbasketsbyoffset', 'sortbasketsbybranch', 'sortbasketsbyentry'.\n"
00340 "Used by ROOT when fast copying. Affects performance.");
00341 desc.addUntracked<int>("treeMaxVirtualSize", -1)
00342 ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
00343 desc.addUntracked<bool>("fastCloning", true)
00344 ->setComment("True: Allow fast copying, if possible.\n"
00345 "False: Disable fast copying.");
00346 desc.addUntracked<bool>("overrideInputFileSplitLevels", false)
00347 ->setComment("False: Use branch split levels and basket sizes from input file, if possible.\n"
00348 "True: Always use specified or default split levels and basket sizes.");
00349 desc.addUntracked<bool>("writeStatusFile", false)
00350 ->setComment("Write a status file. Intended for use by workflow management.");
00351 desc.addUntracked<std::string>("dropMetaData", defaultString)
00352 ->setComment("Determines handling of per product per event metadata. Options are:\n"
00353 "'NONE': Keep all of it.\n"
00354 "'DROPPED': Keep it for products produced in current process and all kept products. Drop it for dropped products produced in prior processes.\n"
00355 "'PRIOR': Keep it for products produced in current process. Drop it for products produced in prior processes.\n"
00356 "'ALL': Drop all of it.");
00357 ParameterSetDescription dataSet;
00358 dataSet.setAllowAnything();
00359 desc.addUntracked<ParameterSetDescription>("dataset", dataSet)
00360 ->setComment("PSet is only used by Data Operations and not by this module.");
00361
00362 OutputModule::fillDescription(desc);
00363
00364 descriptions.add("edmOutput", desc);
00365 }
00366 }