CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/IOPool/Output/src/PoolOutputModule.cc

Go to the documentation of this file.
00001 #include "IOPool/Output/interface/PoolOutputModule.h"
00002 
00003 #include "FWCore/MessageLogger/interface/JobReport.h"
00004 #include "IOPool/Output/src/RootOutputFile.h"
00005 
00006 #include "FWCore/Framework/interface/EventPrincipal.h"
00007 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00008 #include "FWCore/Framework/interface/RunPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00012 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00013 #include "FWCore/ServiceRegistry/interface/Service.h"
00014 #include "DataFormats/Provenance/interface/BranchDescription.h"
00015 #include "FWCore/Utilities/interface/Algorithms.h"
00016 #include "FWCore/Utilities/interface/EDMException.h"
00017 #include "FWCore/Utilities/interface/DictionaryTools.h"
00018 #include "FWCore/Utilities/interface/TimeOfDay.h"
00019 #include "FWCore/Utilities/interface/WrappedClassName.h"
00020 
00021 #include "TTree.h"
00022 #include "TBranchElement.h"
00023 #include "TObjArray.h"
00024 #include "RVersion.h"
00025 
00026 #include <fstream>
00027 #include <iomanip>
00028 #include <sstream>
00029 
00030 namespace edm {
00031   PoolOutputModule::PoolOutputModule(ParameterSet const& pset) :
00032     OutputModule(pset),
00033     rootServiceChecker_(),
00034     auxItems_(),
00035     selectedOutputItemList_(),
00036     fileName_(pset.getUntrackedParameter<std::string>("fileName")),
00037     logicalFileName_(pset.getUntrackedParameter<std::string>("logicalFileName")),
00038     catalog_(pset.getUntrackedParameter<std::string>("catalog")),
00039     maxFileSize_(pset.getUntrackedParameter<int>("maxSize")),
00040     compressionLevel_(pset.getUntrackedParameter<int>("compressionLevel")),
00041 #if ROOT_VERSION_CODE >= ROOT_VERSION(5,30,0)
00042     compressionAlgorithm_(pset.getUntrackedParameter<std::string>("compressionAlgorithm")),
00043 #else
00044     compressionAlgorithm_("ZLIB"),
00045 #endif
00046     basketSize_(pset.getUntrackedParameter<int>("basketSize")),
00047     eventAutoFlushSize_(pset.getUntrackedParameter<int>("eventAutoFlushCompressedSize")),
00048     splitLevel_(std::min<int>(pset.getUntrackedParameter<int>("splitLevel") + 1, 99)),
00049     basketOrder_(pset.getUntrackedParameter<std::string>("sortBaskets")),
00050     treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
00051     whyNotFastClonable_(pset.getUntrackedParameter<bool>("fastCloning") ? FileBlock::CanFastClone : FileBlock::DisabledInConfigFile),
00052     dropMetaData_(DropNone),
00053     moduleLabel_(pset.getParameter<std::string>("@module_label")),
00054     initializedFromInput_(false),
00055     outputFileCount_(0),
00056     inputFileCount_(0),
00057     childIndex_(0U),
00058     numberOfDigitsInIndex_(0U),
00059     overrideInputFileSplitLevels_(pset.getUntrackedParameter<bool>("overrideInputFileSplitLevels")),
00060     rootOutputFile_(),
00061     statusFileName_() {
00062 
00063       if (pset.getUntrackedParameter<bool>("writeStatusFile")) {
00064         std::ostringstream statusfilename;
00065         statusfilename << moduleLabel_ << '_' << getpid();
00066         statusFileName_ = statusfilename.str();
00067       }
00068 
00069       std::string dropMetaData(pset.getUntrackedParameter<std::string>("dropMetaData"));
00070       if(dropMetaData.empty()) dropMetaData_ = DropNone;
00071       else if(dropMetaData == std::string("NONE")) dropMetaData_ = DropNone;
00072       else if(dropMetaData == std::string("DROPPED")) dropMetaData_ = DropDroppedPrior;
00073       else if(dropMetaData == std::string("PRIOR")) dropMetaData_ = DropPrior;
00074       else if(dropMetaData == std::string("ALL")) dropMetaData_ = DropAll;
00075       else {
00076         throw edm::Exception(errors::Configuration, "Illegal dropMetaData parameter value: ")
00077             << dropMetaData << ".\n"
00078             << "Legal values are 'NONE', 'DROPPED', 'PRIOR', and 'ALL'.\n";
00079       }
00080 
00081     if (!wantAllEvents()) {
00082       whyNotFastClonable_+= FileBlock::EventSelectionUsed;
00083     }
00084 
00085     // We don't use this next parameter, but we read it anyway because it is part
00086     // of the configuration of this module.  An external parser creates the
00087     // configuration by reading this source code.
00088     pset.getUntrackedParameterSet("dataset");
00089   }
00090 
00091   void PoolOutputModule::beginJob() {
00092     for(int i = InEvent; i < NumBranchTypes; ++i) {
00093       BranchType branchType = static_cast<BranchType>(i);
00094       Selections const& keptVector = keptProducts()[branchType];
00095       for(Selections::const_iterator it = keptVector.begin(), itEnd = keptVector.end(); it != itEnd; ++it) {
00096         BranchDescription const& prod = **it;
00097         checkDictionaries(prod.fullClassName(), true);
00098         checkDictionaries(wrappedClassName(prod.fullClassName()), true);
00099       }
00100     }
00101   }
00102 
  // Returns the file name reported by the current RootOutputFile.
  // rootOutputFile_ is dereferenced without a null check.
  // NOTE(review): presumably callers must only call this while a file is open
  // (see isFileOpen()) -- confirm.
  std::string const& PoolOutputModule::currentFileName() const {
    return rootOutputFile_->fileName();
  }
00106 
  // Default constructor: the basket size starts out as
  // BranchDescription::invalidBasketSize until fillSelectedItemList() sets it.
  PoolOutputModule::AuxItem::AuxItem() :
        basketSize_(BranchDescription::invalidBasketSize) {}
00109 
  // Default constructor: no branch description, no product, and the split
  // level and basket size set to BranchDescription's "invalid" markers.
  PoolOutputModule::OutputItem::OutputItem() :
        branchDescription_(0),
        product_(0),
        splitLevel_(BranchDescription::invalidSplitLevel),
        basketSize_(BranchDescription::invalidBasketSize) {}
00115 
  // Constructs an item for the branch described by `bd` with the given split
  // level and basket size.  The pointer `bd` is stored as-is (not copied) and
  // product_ starts out as 0.
  PoolOutputModule::OutputItem::OutputItem(BranchDescription const* bd, int splitLevel, int basketSize) :
        branchDescription_(bd),
        product_(0),
        splitLevel_(splitLevel),
        basketSize_(basketSize) {}
00121 
00122 
00123   PoolOutputModule::OutputItem::Sorter::Sorter(TTree* tree) : treeMap_(new std::map<std::string, int>) {
00124     // Fill a map mapping branch names to an index specifying the order in the tree.
00125     if(tree != 0) {
00126       TObjArray* branches = tree->GetListOfBranches();
00127       for(int i = 0; i < branches->GetEntries(); ++i) {
00128         TBranchElement* br = (TBranchElement*)branches->At(i);
00129         treeMap_->insert(std::make_pair(std::string(br->GetName()), i));
00130       }
00131     }
00132   }
00133 
00134   bool
00135   PoolOutputModule::OutputItem::Sorter::operator()(OutputItem const& lh, OutputItem const& rh) const {
00136     // Provides a comparison for sorting branches according to the index values in treeMap_.
00137     // Branches not found are always put at the end (i.e. not found > found).
00138     if(treeMap_->empty()) return lh < rh;
00139     std::string const& lstring = lh.branchDescription_->branchName();
00140     std::string const& rstring = rh.branchDescription_->branchName();
00141     std::map<std::string, int>::const_iterator lit = treeMap_->find(lstring);
00142     std::map<std::string, int>::const_iterator rit = treeMap_->find(rstring);
00143     bool lfound = (lit != treeMap_->end());
00144     bool rfound = (rit != treeMap_->end());
00145     if(lfound && rfound) {
00146       return lit->second < rit->second;
00147     } else if(lfound) {
00148       return true;
00149     } else if(rfound) {
00150       return false;
00151     }
00152     return lh < rh;
00153   }
00154 
00155   void PoolOutputModule::fillSelectedItemList(BranchType branchType, TTree* theInputTree) {
00156 
00157     Selections const& keptVector = keptProducts()[branchType];
00158     OutputItemList&   outputItemList = selectedOutputItemList_[branchType];
00159     AuxItem&   auxItem = auxItems_[branchType];
00160 
00161     // Fill AuxItem
00162     if (theInputTree != 0 && !overrideInputFileSplitLevels_) {
00163       TBranch* auxBranch = theInputTree->GetBranch(BranchTypeToAuxiliaryBranchName(branchType).c_str());
00164       if (auxBranch) {
00165         auxItem.basketSize_ = auxBranch->GetBasketSize();
00166       } else {
00167         auxItem.basketSize_ = basketSize_;
00168       }
00169     } else {
00170       auxItem.basketSize_ = basketSize_;
00171     }
00172 
00173     // Fill outputItemList with an entry for each branch.
00174     for(Selections::const_iterator it = keptVector.begin(), itEnd = keptVector.end(); it != itEnd; ++it) {
00175       int splitLevel = BranchDescription::invalidSplitLevel;
00176       int basketSize = BranchDescription::invalidBasketSize;
00177 
00178       BranchDescription const& prod = **it;
00179       TBranch* theBranch = ((!prod.produced() && theInputTree != 0 && !overrideInputFileSplitLevels_) ? theInputTree->GetBranch(prod.branchName().c_str()) : 0);
00180 
00181       if(theBranch != 0) {
00182         splitLevel = theBranch->GetSplitLevel();
00183         basketSize = theBranch->GetBasketSize();
00184       } else {
00185         splitLevel = (prod.splitLevel() == BranchDescription::invalidSplitLevel ? splitLevel_ : prod.splitLevel());
00186         basketSize = (prod.basketSize() == BranchDescription::invalidBasketSize ? basketSize_ : prod.basketSize());
00187       }
00188       outputItemList.emplace_back(&prod, splitLevel, basketSize);
00189     }
00190 
00191     // Sort outputItemList to allow fast copying.
00192     // The branches in outputItemList must be in the same order as in the input tree, with all new branches at the end.
00193     sort_all(outputItemList, OutputItem::Sorter(theInputTree));
00194   }
00195 
  // Forwards the new input file block, together with remainingEvents(), to the
  // current output file.  Does nothing when no output file is open.
  void PoolOutputModule::beginInputFile(FileBlock const& fb) {
    if(isFileOpen()) {
      rootOutputFile_->beginInputFile(fb, remainingEvents());
    }
  }
00201 
00202   void PoolOutputModule::openFile(FileBlock const& fb) {
00203     if(!isFileOpen()) {
00204       doOpenFile();
00205       beginInputFile(fb);
00206     }
00207   }
00208 
00209   void PoolOutputModule::respondToOpenInputFile(FileBlock const& fb) {
00210     if(!initializedFromInput_) {
00211       for(int i = InEvent; i < NumBranchTypes; ++i) {
00212         BranchType branchType = static_cast<BranchType>(i);
00213         TTree* theInputTree = (branchType == InEvent ? fb.tree() :
00214                               (branchType == InLumi ? fb.lumiTree() :
00215                                fb.runTree()));
00216         fillSelectedItemList(branchType, theInputTree);
00217       }
00218       initializedFromInput_ = true;
00219     }
00220     ++inputFileCount_;
00221     beginInputFile(fb);
00222   }
00223 
  // Forwards the input-file-closed notification to the current output file,
  // if there is one.
  void PoolOutputModule::respondToCloseInputFile(FileBlock const& fb) {
    if(rootOutputFile_) rootOutputFile_->respondToCloseInputFile(fb);
  }
00227 
00228   void PoolOutputModule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
00229     childIndex_ = iChildIndex;
00230     while (iNumberOfChildren != 0) {
00231       ++numberOfDigitsInIndex_;
00232       iNumberOfChildren /= 10;
00233     }
00234     if (numberOfDigitsInIndex_ == 0) {
00235       numberOfDigitsInIndex_ = 3; // Protect against zero iNumberOfChildren
00236     }
00237   }
00238 
  // Destructor.  Has no explicit work to do; members are destroyed by their
  // own destructors.
  PoolOutputModule::~PoolOutputModule() {
  }
00241 
00242   void PoolOutputModule::write(EventPrincipal const& e) {
00243       rootOutputFile_->writeOne(e);
00244       if (!statusFileName_.empty()) {
00245         std::ofstream statusFile(statusFileName_.c_str());
00246         statusFile << e.id() << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
00247         statusFile.close();
00248       }
00249   }
00250 
  // Writes the luminosity block to the current output file and reports its
  // run and luminosity block numbers to the JobReport service.
  void PoolOutputModule::writeLuminosityBlock(LuminosityBlockPrincipal const& lb) {
      rootOutputFile_->writeLuminosityBlock(lb);
      Service<JobReport> reportSvc;
      reportSvc->reportLumiSection(lb.id().run(), lb.id().luminosityBlock());
  }
00256 
  // Writes the run to the current output file and reports its run number to
  // the JobReport service.
  void PoolOutputModule::writeRun(RunPrincipal const& r) {
      rootOutputFile_->writeRun(r);
      Service<JobReport> reportSvc;
      reportSvc->reportRunNumber(r.run());
  }
00262 
  // Intentionally empty for now.
  // At some later date, we may move functionality from finishEndFile() to here.
  void PoolOutputModule::startEndFile() { }
00265 
00266 
  // The following one-line members forward to the RootOutputFile member of the
  // same name.  They dereference rootOutputFile_ without a null check.
  // NOTE(review): presumably only called while a file is open -- confirm.
  void PoolOutputModule::writeFileFormatVersion() { rootOutputFile_->writeFileFormatVersion(); }
  void PoolOutputModule::writeFileIdentifier() { rootOutputFile_->writeFileIdentifier(); }
  void PoolOutputModule::writeIndexIntoFile() { rootOutputFile_->writeIndexIntoFile(); }
  void PoolOutputModule::writeProcessConfigurationRegistry() { rootOutputFile_->writeProcessConfigurationRegistry(); }
  void PoolOutputModule::writeProcessHistoryRegistry() { rootOutputFile_->writeProcessHistoryRegistry(); }
  void PoolOutputModule::writeParameterSetRegistry() { rootOutputFile_->writeParameterSetRegistry(); }
  void PoolOutputModule::writeProductDescriptionRegistry() { rootOutputFile_->writeProductDescriptionRegistry(); }
  void PoolOutputModule::writeParentageRegistry() { rootOutputFile_->writeParentageRegistry(); }
  void PoolOutputModule::writeBranchIDListRegistry() { rootOutputFile_->writeBranchIDListRegistry(); }
  void PoolOutputModule::writeProductDependencies() { rootOutputFile_->writeProductDependencies(); }
  // Finishes the current file, then releases it so that isFileOpen() becomes false.
  void PoolOutputModule::finishEndFile() { rootOutputFile_->finishEndFile(); rootOutputFile_.reset(); }
  // True while rootOutputFile_ holds a file object.
  bool PoolOutputModule::isFileOpen() const { return rootOutputFile_.get() != 0; }
  // Forwards the decision to the current output file.
  bool PoolOutputModule::shouldWeCloseFile() const { return rootOutputFile_->shouldWeCloseFile(); }
00280 
00281   void PoolOutputModule::doOpenFile() {
00282       if(inputFileCount_ == 0) {
00283         throw edm::Exception(errors::LogicError)
00284           << "Attempt to open output file before input file. "
00285           << "Please report this to the core framework developers.\n";
00286       }
00287       std::string suffix(".root");
00288       std::string::size_type offset = fileName().rfind(suffix);
00289       bool ext = (offset == fileName().size() - suffix.size());
00290       if(!ext) suffix.clear();
00291       std::string fileBase(ext ? fileName().substr(0, offset) : fileName());
00292       std::ostringstream ofilename;
00293       std::ostringstream lfilename;
00294       ofilename << fileBase;
00295       lfilename << logicalFileName();
00296       if(numberOfDigitsInIndex_) {
00297         ofilename << '_' << std::setw(numberOfDigitsInIndex_) << std::setfill('0') << childIndex_;
00298         if(!logicalFileName().empty()) {
00299           lfilename << '_' << std::setw(numberOfDigitsInIndex_) << std::setfill('0') << childIndex_;
00300         }
00301       }
00302       if(outputFileCount_) {
00303         ofilename << std::setw(3) << std::setfill('0') << outputFileCount_;
00304         if(!logicalFileName().empty()) {
00305           lfilename << std::setw(3) << std::setfill('0') << outputFileCount_;
00306         }
00307       }
00308       ofilename << suffix;
00309       rootOutputFile_.reset(new RootOutputFile(this, ofilename.str(), lfilename.str()));
00310       ++outputFileCount_;
00311   }
00312 
  // Declares the configuration parameters of this module, with their defaults
  // and comments, and registers the resulting description under the name
  // "edmOutput".  The parameters declared here are the ones read by the
  // constructor; parameters of the base class are added by
  // OutputModule::fillDescription().
  void
  PoolOutputModule::fillDescriptions(ConfigurationDescriptions & descriptions) {
    // Empty string used as the default of several optional string parameters.
    std::string defaultString;
    ParameterSetDescription desc;
    desc.setComment("Writes runs, lumis, and events into EDM/ROOT files.");
    // "fileName" has no default value.
    desc.addUntracked<std::string>("fileName")
        ->setComment("Name of output file.");
    desc.addUntracked<std::string>("logicalFileName", defaultString)
        ->setComment("Passed to job report. Otherwise unused by module.");
    desc.addUntracked<std::string>("catalog", defaultString)
        ->setComment("Passed to job report. Otherwise unused by module.");
    desc.addUntracked<int>("maxSize", 0x7f000000)
        ->setComment("Maximum output file size, in kB.\n"
                     "If over maximum, new output file will be started at next input file transition.");
    desc.addUntracked<int>("compressionLevel", 7)
        ->setComment("ROOT compression level of output file.");
    // The algorithm is only configurable for ROOT 5.30 and later; the
    // constructor uses "ZLIB" otherwise.
#if ROOT_VERSION_CODE >= ROOT_VERSION(5,30,0)
    desc.addUntracked<std::string>("compressionAlgorithm", "ZLIB")
        ->setComment("Algorithm used to compress data in the ROOT output file, allowed values are ZLIB and LZMA");
#endif
    desc.addUntracked<int>("basketSize", 16384)
        ->setComment("Default ROOT basket size in output file.");
    desc.addUntracked<int>("eventAutoFlushCompressedSize",-1)->setComment("Set ROOT auto flush stored data size (in bytes) for event TTree. The value sets how large the compressed buffer is allowed to get. The uncompressed buffer can be quite a bit larger than this depending on the average compression ratio. The value of -1 just uses ROOT's default value. The value of 0 turns off this feature.");
    desc.addUntracked<int>("splitLevel", 99)
        ->setComment("Default ROOT branch split level in output file.");
    desc.addUntracked<std::string>("sortBaskets", std::string("sortbasketsbyoffset"))
        ->setComment("Legal values: 'sortbasketsbyoffset', 'sortbasketsbybranch', 'sortbasketsbyentry'.\n"
                     "Used by ROOT when fast copying. Affects performance.");
    desc.addUntracked<int>("treeMaxVirtualSize", -1)
        ->setComment("Size of ROOT TTree TBasket cache.  Affects performance.");
    desc.addUntracked<bool>("fastCloning", true)
        ->setComment("True:  Allow fast copying, if possible.\n"
                     "False: Disable fast copying.");
    desc.addUntracked<bool>("overrideInputFileSplitLevels", false)
        ->setComment("False: Use branch split levels and basket sizes from input file, if possible.\n"
                     "True:  Always use specified or default split levels and basket sizes.");
    desc.addUntracked<bool>("writeStatusFile", false)
        ->setComment("Write a status file. Intended for use by workflow management.");
    desc.addUntracked<std::string>("dropMetaData", defaultString)
        ->setComment("Determines handling of per product per event metadata.  Options are:\n"
                     "'NONE':    Keep all of it.\n"
                     "'DROPPED': Keep it for products produced in current process and all kept products. Drop it for dropped products produced in prior processes.\n"
                     "'PRIOR':   Keep it for products produced in current process. Drop it for products produced in prior processes.\n"
                     "'ALL':     Drop all of it.");
    // "dataset" accepts arbitrary content; the constructor reads it but does
    // not use it.
    ParameterSetDescription dataSet;
    dataSet.setAllowAnything();
    desc.addUntracked<ParameterSetDescription>("dataset", dataSet)
     ->setComment("PSet is only used by Data Operations and not by this module.");

    // Add the parameters declared by the OutputModule base class.
    OutputModule::fillDescription(desc);

    descriptions.add("edmOutput", desc);
  }
00366 }