CMS 3D CMS Logo

PoolOutputModule.cc
Go to the documentation of this file.
2 
4 
24 
25 #include "TTree.h"
26 #include "TBranchElement.h"
27 #include "TObjArray.h"
28 #include "RVersion.h"
29 
30 #include <fstream>
31 #include <iomanip>
32 #include <sstream>
33 #include "boost/algorithm/string.hpp"
34 
35 
36 namespace edm {
// Member-initializer list and body of a constructor that configures the module from `pset`.
// NOTE(review): the constructor's signature line(s) are absent from this listing.
// Most members are read straight from untracked parameters of the same (or a similar) name.
40  rootServiceChecker_(),
41  auxItems_(),
42  selectedOutputItemList_(),
43  fileName_(pset.getUntrackedParameter<std::string>("fileName")),
44  logicalFileName_(pset.getUntrackedParameter<std::string>("logicalFileName")),
45  catalog_(pset.getUntrackedParameter<std::string>("catalog")),
46  maxFileSize_(pset.getUntrackedParameter<int>("maxSize")),
47  compressionLevel_(pset.getUntrackedParameter<int>("compressionLevel")),
48  compressionAlgorithm_(pset.getUntrackedParameter<std::string>("compressionAlgorithm")),
49  basketSize_(pset.getUntrackedParameter<int>("basketSize")),
50  eventAutoFlushSize_(pset.getUntrackedParameter<int>("eventAutoFlushCompressedSize")),
// The stored split level is the configured "splitLevel" plus one, capped at 99.
51  splitLevel_(std::min<int>(pset.getUntrackedParameter<int>("splitLevel") + 1, 99)),
52  basketOrder_(pset.getUntrackedParameter<std::string>("sortBaskets")),
53  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
// "fastCloning" == false is recorded as the reason fast cloning is unavailable.
54  whyNotFastClonable_(pset.getUntrackedParameter<bool>("fastCloning") ? FileBlock::CanFastClone : FileBlock::DisabledInConfigFile),
55  dropMetaData_(DropNone),
56  moduleLabel_(pset.getParameter<std::string>("@module_label")),
57  initializedFromInput_(false),
58  outputFileCount_(0),
59  inputFileCount_(0),
60  childIndex_(0U),
61  numberOfDigitsInIndex_(0U),
62  branchParents_(),
63  branchChildren_(),
64  overrideInputFileSplitLevels_(pset.getUntrackedParameter<bool>("overrideInputFileSplitLevels")),
65  rootOutputFile_(),
66  statusFileName_() {
67 
// When requested, the status file is named "<module label>_<process id>".
68  if (pset.getUntrackedParameter<bool>("writeStatusFile")) {
69  std::ostringstream statusfilename;
70  statusfilename << moduleLabel_ << '_' << getpid();
71  statusFileName_ = statusfilename.str();
72  }
73 
// Map the textual drop-metadata setting onto the DropMetaData enum; an empty string means DropNone,
// and any unrecognized value is a configuration error.
// NOTE(review): the declaration of the local `dropMetaData` is on a line missing from this listing;
// presumably it is read from the "dropMetaData" parameter — confirm against the real source.
75  if(dropMetaData.empty()) dropMetaData_ = DropNone;
76  else if(dropMetaData == std::string("NONE")) dropMetaData_ = DropNone;
77  else if(dropMetaData == std::string("DROPPED")) dropMetaData_ = DropDroppedPrior;
78  else if(dropMetaData == std::string("PRIOR")) dropMetaData_ = DropPrior;
79  else if(dropMetaData == std::string("ALL")) dropMetaData_ = DropAll;
80  else {
81  throw edm::Exception(errors::Configuration, "Illegal dropMetaData parameter value: ")
82  << dropMetaData << ".\n"
83  << "Legal values are 'NONE', 'DROPPED', 'PRIOR', and 'ALL'.\n";
84  }
85 
// NOTE(review): the body of this branch (listing line 87) is missing from this listing.
86  if (!wantAllEvents()) {
88  }
89 
// Build one SpecialSplitLevelForBranch per entry of "overrideBranchesSplitLevel",
// from that entry's "branch" string and "splitLevel" integer.
90  auto const& specialSplit {pset.getUntrackedParameterSetVector("overrideBranchesSplitLevel")};
91 
92  specialSplitLevelForBranches_.reserve(specialSplit.size());
93  for(auto const& s: specialSplit) {
94  specialSplitLevelForBranches_.emplace_back(s.getUntrackedParameter<std::string>("branch"),
95  s.getUntrackedParameter<int>("splitLevel"));
96  }
97 
98  // We don't use this next parameter, but we read it anyway because it is part
99  // of the configuration of this module. An external parser creates the
100  // configuration by reading this source code.
101  pset.getUntrackedParameterSet("dataset");
102  }
103 
// Collects into producedBranches_ the BranchID of every registry entry that is
// marked produced(), belongs to the InEvent branch type, and is not an alias.
// NOTE(review): the function signature and the declaration of `reg` are on lines
// missing from this listing.
106  for(auto const& prod : reg->productList()) {
107  BranchDescription const& desc = prod.second;
108  if (desc.produced() && desc.branchType() == InEvent && !desc.isAlias()) {
109  producedBranches_.emplace_back(desc.branchID());
110  }
111  }
112  }
113 
115  return rootOutputFile_->fileName();
116  }
117 
119  basketSize_(BranchDescription::invalidBasketSize) {}
120 
122  branchDescription_(0),
123  token_(),
124  product_(0),
125  splitLevel_(BranchDescription::invalidSplitLevel),
126  basketSize_(BranchDescription::invalidBasketSize) {}
127 
129  branchDescription_(bd),
130  token_(token),
131  product_(0),
132  splitLevel_(splitLevel),
133  basketSize_(basketSize) {}
134 
135 
// NOTE(review): the signature and initializer list of this definition are missing from
// this listing; `tree` and `treeMap_` are declared there.
137  // Fill a map mapping branch names to an index specifying the order in the tree.
// A null tree leaves the map untouched, so every later lookup misses.
138  if(tree != nullptr) {
139  TObjArray* branches = tree->GetListOfBranches();
140  for(int i = 0; i < branches->GetEntries(); ++i) {
141  TBranchElement* br = (TBranchElement*)branches->At(i);
// insert() keeps an existing entry, so a repeated branch name retains its first index.
142  treeMap_->insert(std::make_pair(std::string(br->GetName()), i));
143  }
144  }
145  }
146 
// Ordering predicate over two output items `lh` and `rh`.
// NOTE(review): the line carrying the function name and parameters is missing from this listing.
147  bool
149  // Provides a comparison for sorting branches according to the index values in treeMap_.
150  // Branches not found are always put at the end (i.e. not found > found).
// With an empty map there is no tree order to follow, so use the items' own operator<.
151  if(treeMap_->empty()) return lh < rh;
152  std::string const& lstring = lh.branchDescription_->branchName();
153  std::string const& rstring = rh.branchDescription_->branchName();
154  std::map<std::string, int>::const_iterator lit = treeMap_->find(lstring);
155  std::map<std::string, int>::const_iterator rit = treeMap_->find(rstring);
156  bool lfound = (lit != treeMap_->end());
157  bool rfound = (rit != treeMap_->end());
158  if(lfound && rfound) {
// Both names are in the map: order by their recorded indices.
159  return lit->second < rit->second;
160  } else if(lfound) {
161  return true;
162  } else if(rfound) {
163  return false;
164  }
// Neither name is in the map: fall back to the items' own operator<.
165  return lh < rh;
166  }
167 
168  inline bool PoolOutputModule::SpecialSplitLevelForBranch::match( std::string const& iBranchName) const {
169  return std::regex_match(iBranchName,branch_);
170  }
171 
172  std::regex PoolOutputModule::SpecialSplitLevelForBranch::convert( std::string const& iGlobBranchExpression) const {
173  std::string tmp(iGlobBranchExpression);
174  boost::replace_all(tmp, "*", ".*");
175  boost::replace_all(tmp, "?", ".");
176  return std::regex(tmp);
177  }
178 
180 
// Builds the list of output items (one per kept product) for `branchType`, choosing a
// split level and basket size for each, and sets the auxiliary item's basket size.
// NOTE(review): the signature and the declarations of `outputItemList`, `splitLevel` and
// `basketSize` are on lines missing from this listing.
181  SelectedProducts const& keptVector = keptProducts()[branchType];
183  AuxItem& auxItem = auxItems_[branchType];
184 
185  // Fill AuxItem
// The auxiliary branch's basket size is copied from the input tree only when a tree is
// available, overriding is off, and the branch exists; otherwise the configured default is used.
186  if (theInputTree != nullptr && !overrideInputFileSplitLevels_) {
187  TBranch* auxBranch = theInputTree->GetBranch(BranchTypeToAuxiliaryBranchName(branchType).c_str());
188  if (auxBranch) {
189  auxItem.basketSize_ = auxBranch->GetBasketSize();
190  } else {
191  auxItem.basketSize_ = basketSize_;
192  }
193  } else {
194  auxItem.basketSize_ = basketSize_;
195  }
196 
197  // Fill outputItemList with an entry for each branch.
198  for(auto const& kept : keptVector) {
201 
202  BranchDescription const& prod = *kept.first;
// The input branch is looked up only for products not marked produced(), and only when
// an input tree exists and overriding is off; otherwise theBranch stays null.
203  TBranch* theBranch = ((!prod.produced() && theInputTree != nullptr && !overrideInputFileSplitLevels_) ? theInputTree->GetBranch(prod.branchName().c_str()) : 0);
204 
205  if(theBranch != nullptr) {
// Reuse the input branch's layout.
206  splitLevel = theBranch->GetSplitLevel();
207  basketSize = theBranch->GetBasketSize();
208  } else {
// Use the product's own setting when valid, else the module default.
209  splitLevel = (prod.splitLevel() == BranchDescription::invalidSplitLevel ? splitLevel_ : prod.splitLevel());
// The loop does not stop at the first match, so the last matching override wins.
210  for(auto const& b: specialSplitLevelForBranches_) {
211  if(b.match(prod.branchName())) {
212  splitLevel =b.splitLevel_;
213  }
214  }
215  basketSize = (prod.basketSize() == BranchDescription::invalidBasketSize ? basketSize_ : prod.basketSize());
216  }
217  outputItemList.emplace_back(&prod, kept.second, splitLevel, basketSize);
218  }
219 
220  // Sort outputItemList to allow fast copying.
221  // The branches in outputItemList must be in the same order as in the input tree, with all new branches at the end.
222  sort_all(outputItemList, OutputItem::Sorter(theInputTree));
223  }
224 
// If an output file is open, merges the input file block's parent-to-children branch map
// into branchChildren_ and then notifies the output file that a new input file has begun.
// Does nothing when no output file is open.
// NOTE(review): the signature line is missing from this listing; `fb` is declared there.
226  if(isFileOpen()) {
227  //Faster to read ChildrenBranches directly from input
228  // file than to build it every event
229  auto const& branchToChildMap = fb.branchChildren().childLookup();
230  for (auto const& parentToChildren : branchToChildMap) {
231  for (auto const& child : parentToChildren.second) {
232  branchChildren_.insertChild(parentToChildren.first, child);
233  }
234  }
235  rootOutputFile_->beginInputFile(fb, remainingEvents());
236  }
237  }
238 
240  if(!isFileOpen()) {
241  reallyOpenFile();
242  beginInputFile(fb);
243  }
244  }
245 
// On the first call only, builds the selected-item list for every branch type from the
// matching input tree; on every call, counts the input file and runs beginInputFile(fb).
// NOTE(review): the signature line is missing from this listing; `fb` is declared there.
247  if(!initializedFromInput_) {
248  for(int i = InEvent; i < NumBranchTypes; ++i) {
249  BranchType branchType = static_cast<BranchType>(i);
// Event and lumi branch types use their own trees; every other branch type uses the run tree.
250  TTree* theInputTree = (branchType == InEvent ? fb.tree() :
251  (branchType == InLumi ? fb.lumiTree() :
252  fb.runTree()));
253  fillSelectedItemList(branchType, theInputTree);
254  }
255  initializedFromInput_ = true;
256  }
257  ++inputFileCount_;
258  beginInputFile(fb);
259  }
260 
262  if(rootOutputFile_) rootOutputFile_->respondToCloseInputFile(fb);
263  }
264 
// Records this process's child index and prepares numberOfDigitsInIndex_, which the
// file-naming code uses as the zero-padded field width when appending childIndex_.
265  void PoolOutputModule::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
266  childIndex_ = iChildIndex;
// NOTE(review): the statement that belongs between the loop header and the division
// (listing line 268) is absent from this listing. As shown, the loop only reduces
// iNumberOfChildren to zero. Presumably the missing line counts decimal digits into
// numberOfDigitsInIndex_ — confirm against the real source.
267  while (iNumberOfChildren != 0) {
269  iNumberOfChildren /= 10;
270  }
271  if (numberOfDigitsInIndex_ == 0) {
272  numberOfDigitsInIndex_ = 3; // Protect against zero iNumberOfChildren
273  }
274  }
275 
277  }
278 
// Writes one event `e` to the output file and, if a status file name was configured,
// rewrites the status file with the event id and the current time of day.
// NOTE(review): the signature and the line(s) before writeOne() are missing from this listing.
281  rootOutputFile_->writeOne(e);
282  if (!statusFileName_.empty()) {
// Opening with std::ofstream's default mode truncates, so the file only ever holds the latest entry.
283  std::ofstream statusFile(statusFileName_.c_str());
284  statusFile << e.id() << " time: " << std::setprecision(3) << TimeOfDay() << '\n';
285  statusFile.close();
286  }
287  }
288 
290  rootOutputFile_->writeLuminosityBlock(lb);
291  }
292 
294  rootOutputFile_->writeRun(r);
295  }
296 
299  branchParents_.clear();
300  startEndFile();
310  writeProductDependencies(); //branchChildren used here
312  finishEndFile();
313 
315  }
316 
317 
318  // At some later date, we may move functionality from finishEndFile() to here.
320 
321  void PoolOutputModule::writeFileFormatVersion() { rootOutputFile_->writeFileFormatVersion(); }
322  void PoolOutputModule::writeFileIdentifier() { rootOutputFile_->writeFileIdentifier(); }
323  void PoolOutputModule::writeIndexIntoFile() { rootOutputFile_->writeIndexIntoFile(); }
324  void PoolOutputModule::writeProcessHistoryRegistry() { rootOutputFile_->writeProcessHistoryRegistry(); }
325  void PoolOutputModule::writeParameterSetRegistry() { rootOutputFile_->writeParameterSetRegistry(); }
326  void PoolOutputModule::writeProductDescriptionRegistry() { rootOutputFile_->writeProductDescriptionRegistry(); }
327  void PoolOutputModule::writeParentageRegistry() { rootOutputFile_->writeParentageRegistry(); }
328  void PoolOutputModule::writeBranchIDListRegistry() { rootOutputFile_->writeBranchIDListRegistry(); }
329  void PoolOutputModule::writeThinnedAssociationsHelper() { rootOutputFile_->writeThinnedAssociationsHelper(); }
330  void PoolOutputModule::writeProductDependencies() { rootOutputFile_->writeProductDependencies(); }
331  void PoolOutputModule::finishEndFile() { rootOutputFile_->finishEndFile(); rootOutputFile_ = nullptr; } // propagate_const<T> has no reset() function
333  bool PoolOutputModule::isFileOpen() const { return rootOutputFile_.get() != nullptr; }
334  bool PoolOutputModule::shouldWeCloseFile() const { return rootOutputFile_->shouldWeCloseFile(); }
335 
// Builds the pair (physical file name, logical file name) for the next output file.
// The physical name is fileName() with optional index fields inserted before a trailing
// ".root"; the logical name starts from logicalFileName() and receives the same index
// fields only when it is non-empty.
// NOTE(review): several lines of this definition (the function-name line, the start of the
// throw statement, the condition guarding the child-index block, and one line before the
// return) are missing from this listing.
336  std::pair<std::string, std::string>
338  if(inputFileCount_ == 0) {
340  << "Attempt to open output file before input file. "
341  << "Please report this to the core framework developers.\n";
342  }
// Detect a trailing ".root" so that indices can be inserted before the extension.
343  std::string suffix(".root");
344  std::string::size_type offset = fileName().rfind(suffix);
// NOTE(review): size() - suffix.size() is unsigned arithmetic. For a 4-character fileName()
// with no ".root", rfind() yields npos and the subtraction wraps to the same value, so `ext`
// becomes true and ".root" is appended to a name that never had it. Worth guarding with a
// length check — confirm against the real source.
345  bool ext = (offset == fileName().size() - suffix.size());
346  if(!ext) suffix.clear();
347  std::string fileBase(ext ? fileName().substr(0, offset) : fileName());
348  std::ostringstream ofilename;
349  std::ostringstream lfilename;
350  ofilename << fileBase;
351  lfilename << logicalFileName();
// Child index: '_' followed by childIndex_, zero-padded to numberOfDigitsInIndex_ characters.
353  ofilename << '_' << std::setw(numberOfDigitsInIndex_) << std::setfill('0') << childIndex_;
354  if(!logicalFileName().empty()) {
355  lfilename << '_' << std::setw(numberOfDigitsInIndex_) << std::setfill('0') << childIndex_;
356  }
357  }
// Files after the first get a three-digit, zero-padded sequence number (no separator).
358  if(outputFileCount_) {
359  ofilename << std::setw(3) << std::setfill('0') << outputFileCount_;
360  if(!logicalFileName().empty()) {
361  lfilename << std::setw(3) << std::setfill('0') << outputFileCount_;
362  }
363  }
364  ofilename << suffix;
366 
367  return std::make_pair(ofilename.str(), lfilename.str());
368  }
369 
372  rootOutputFile_ = std::make_unique<RootOutputFile>(this, names.first, names.second); // propagate_const<T> has no reset() function
373  }
374 
// Records, for `branchID`, the parentage id reported by the provenance retriever.
// Nothing is recorded when the retriever returns no provenance for the branch.
// NOTE(review): the line carrying the function name is missing from this listing.
375  void
377  ProductProvenanceRetriever const* provRetriever,
378  BranchID const& branchID) {
379 
380  ProductProvenance const* provenance = provRetriever->branchIDToProvenanceForProducedOnly(branchID);
381  if (provenance != nullptr) {
// Create the (initially empty) set for this branch on first sight, then add the id to it.
382  BranchParents::iterator it = branchParents_.find(branchID);
383  if (it == branchParents_.end()) {
384  it = branchParents_.insert(std::make_pair(branchID,
385  std::set<ParentageID>())).first;
386  }
387  it->second.insert(provenance->parentageID());
388  }
389  }
390 
// Updates branchParents_ for every branch in producedBranches_ and, when `helper` is
// non-null, for every branch listed by subProcessParentageHelper()->producedProducts().
// NOTE(review): the function-name line and the declarations of `provRetriever` and `helper`
// are on lines missing from this listing.
391  void
393 
395  for (auto const& bid : producedBranches_) {
396  updateBranchParentsForOneBranch(provRetriever, bid);
397  }
399  if (helper) {
400  for (auto const& bid : subProcessParentageHelper()->producedProducts()) {
401  updateBranchParentsForOneBranch(provRetriever, bid);
402  }
403  }
404  }
405 
406  void
407  PoolOutputModule::preActionBeforeRunEventAsync(WaitingTask* iTask, ModuleCallingContext const& iModuleCallingContext, Principal const& iPrincipal) const {
408  if(DropAll != dropMetaData_ ) {
409  auto const* ep = dynamic_cast<EventPrincipal const*>(&iPrincipal);
410  if(ep)
411  {
412  auto pr = ep->productProvenanceRetrieverPtr();
413  if(pr) {
414  pr->readProvenanceAsync(iTask,&iModuleCallingContext);
415  }
416  }
417  }
418  }
419 
// Walks branchParents_: for each child branch and each of its recorded parentage ids,
// looks up the Parentage in the ParentageRegistry and iterates over its parent branches.
// NOTE(review): the function-name line and the body of the innermost loop (listing line 430)
// are missing from this listing.
420  void
422  for(auto const& branchParent : branchParents_) {
423  BranchID const& child = branchParent.first;
424  std::set<ParentageID> const& eIds = branchParent.second;
425  for(auto const& eId : eIds) {
426  Parentage entryDesc;
// NOTE(review): the result of getMapped() is not checked; if the id is not registered,
// entryDesc presumably stays default-constructed — confirm that is intended.
427  ParentageRegistry::instance()->getMapped(eId, entryDesc);
428  std::vector<BranchID> const& parents = entryDesc.parents();
429  for(auto const& parent : parents) {
431  }
432  }
433  }
434  }
435 
// Declares this module's configuration parameters on `desc`, each with its help text and,
// where one is given, its default value.
// "fileName" and the "branch"/"splitLevel" members of "overrideBranchesSplitLevel" are added
// without a default (presumably making them mandatory — confirm).
// NOTE(review): the function-name line and listing line 490 are missing from this listing.
436  void
438  std::string defaultString;
439 
440  desc.setComment("Writes runs, lumis, and events into EDM/ROOT files.");
441  desc.addUntracked<std::string>("fileName")
442  ->setComment("Name of output file.");
443  desc.addUntracked<std::string>("logicalFileName", defaultString)
444  ->setComment("Passed to job report. Otherwise unused by module.");
445  desc.addUntracked<std::string>("catalog", defaultString)
446  ->setComment("Passed to job report. Otherwise unused by module.");
447  desc.addUntracked<int>("maxSize", 0x7f000000)
448  ->setComment("Maximum output file size, in kB.\n"
449  "If over maximum, new output file will be started at next input file transition.");
450  desc.addUntracked<int>("compressionLevel", 7)
451  ->setComment("ROOT compression level of output file.");
452  desc.addUntracked<std::string>("compressionAlgorithm", "ZLIB")
453  ->setComment("Algorithm used to compress data in the ROOT output file, allowed values are ZLIB and LZMA");
454  desc.addUntracked<int>("basketSize", 16384)
455  ->setComment("Default ROOT basket size in output file.");
456  desc.addUntracked<int>("eventAutoFlushCompressedSize",-1)->setComment("Set ROOT auto flush stored data size (in bytes) for event TTree. The value sets how large the compressed buffer is allowed to get. The uncompressed buffer can be quite a bit larger than this depending on the average compression ratio. The value of -1 just uses ROOT's default value. The value of 0 turns off this feature.");
457  desc.addUntracked<int>("splitLevel", 99)
458  ->setComment("Default ROOT branch split level in output file.");
459  desc.addUntracked<std::string>("sortBaskets", std::string("sortbasketsbyoffset"))
460  ->setComment("Legal values: 'sortbasketsbyoffset', 'sortbasketsbybranch', 'sortbasketsbyentry'.\n"
461  "Used by ROOT when fast copying. Affects performance.");
462  desc.addUntracked<int>("treeMaxVirtualSize", -1)
463  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
464  desc.addUntracked<bool>("fastCloning", true)
465  ->setComment("True: Allow fast copying, if possible.\n"
466  "False: Disable fast copying.");
467  desc.addUntracked<bool>("overrideInputFileSplitLevels", false)
468  ->setComment("False: Use branch split levels and basket sizes from input file, if possible.\n"
469  "True: Always use specified or default split levels and basket sizes.");
470  desc.addUntracked<bool>("writeStatusFile", false)
471  ->setComment("Write a status file. Intended for use by workflow management.");
472  desc.addUntracked<std::string>("dropMetaData", defaultString)
473  ->setComment("Determines handling of per product per event metadata. Options are:\n"
474  "'NONE': Keep all of it.\n"
475  "'DROPPED': Keep it for products produced in current process and all kept products. Drop it for dropped products produced in prior processes.\n"
476  "'PRIOR': Keep it for products produced in current process. Drop it for products produced in prior processes.\n"
477  "'ALL': Drop all of it.");
// "dataset" accepts arbitrary contents; this module does not interpret it.
478  {
479  ParameterSetDescription dataSet;
480  dataSet.setAllowAnything();
481  desc.addUntracked<ParameterSetDescription>("dataset", dataSet)
482  ->setComment("PSet is only used by Data Operations and not by this module.");
483  }
// Per-branch split-level overrides; the list defaults to empty.
484  {
485  ParameterSetDescription specialSplit;
486  specialSplit.addUntracked<std::string>("branch")->setComment("Name of branch needing a special split level. The name can contain wildcards '*' and '?'");
487  specialSplit.addUntracked<int>("splitLevel")->setComment("The special split level for the branch");
488  desc.addVPSetUntracked("overrideBranchesSplitLevel",specialSplit, std::vector<ParameterSet>());
489  }
491  }
492 
493  void
497  descriptions.add("edmOutput", desc);
498  }
499 }
virtual void openFile(FileBlock const &fb) override
virtual std::pair< std::string, std::string > physicalAndLogicalNameForNewFile()
T getUntrackedParameter(std::string const &, T const &) const
virtual bool shouldWeCloseFile() const override
allow inheriting classes to override but still be able to call this method in the overridden version ...
std::string const & branchName() const
SubProcessParentageHelper const * subProcessParentageHelper() const
std::string const & BranchTypeToAuxiliaryBranchName(BranchType const &branchType)
Definition: BranchType.cc:115
BranchDescription const * branchDescription_
int const & basketSize() const
EventID const & id() const
TPRegexp parents
Definition: eve_filter.cc:21
BranchType const & branchType() const
Definition: helper.py:1
virtual void write(EventForOutput const &e) override
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
static const HistoName names[]
edm::propagate_const< std::unique_ptr< RootOutputFile > > rootOutputFile_
static int const invalidSplitLevel
void setAllowAnything()
allow any parameter label/value pairs
std::vector< SpecialSplitLevelForBranch > specialSplitLevelForBranches_
static int const invalidBasketSize
void updateBranchParents(EventForOutput const &e)
DropMetaData const & dropMetaData() const
BranchChildren branchChildren_
std::vector< OutputItem > OutputItemList
std::string const & fileName() const
void insertChild(BranchID parent, BranchID child)
BranchChildren const & branchChildren() const
Definition: FileBlock.h:113
std::string const moduleLabel_
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
bool int lh
Definition: SIMDVec.h:21
uint16_t size_type
virtual void doExtrasAfterCloseFile()
std::string const & logicalFileName() const
BranchType
Definition: BranchType.h:11
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
void fillSelectedItemList(BranchType branchtype, TTree *theInputTree)
std::vector< BranchID > const & parents() const
Definition: Parentage.h:44
std::regex convert(std::string const &iGlobBranchExpression) const
PoolOutputModule(ParameterSet const &ps)
void setComment(std::string const &value)
void updateBranchParentsForOneBranch(ProductProvenanceRetriever const *provRetriever, BranchID const &branchID)
bool operator()(OutputItem const &lh, OutputItem const &rh) const
std::string const & currentFileName() const
bool getMapped(key_type const &k, value_type &result) const
OutputItemListArray selectedOutputItemList_
map_t const & childLookup() const
std::vector< BranchID > producedBranches_
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &iModuleCallingContext, Principal const &iPrincipal) const override
void readProvenanceAsync(WaitingTask *task, ModuleCallingContext const *moduleCallingContext) const
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
SelectedProductsForBranchType const & keptProducts() const
int const & splitLevel() const
BranchID const & branchID() const
EventID const & min(EventID const &lh, EventID const &rh)
Definition: EventID.h:137
virtual void reallyOpenFile() override
BranchParents branchParents_
ProductProvenanceRetriever const * productProvenanceRetrieverPtr() const
unsigned int numberOfDigitsInIndex_
virtual void writeRun(RunForOutput const &r) override
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
virtual bool isFileOpen() const override
TTree * lumiTree() const
Definition: FileBlock.h:99
virtual void respondToCloseInputFile(FileBlock const &fb) override
virtual void beginJob() override
virtual void reallyCloseFile() override
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
ProductProvenance const * branchIDToProvenanceForProducedOnly(BranchID const &bid) const
ParentageID const & parentageID() const
std::vector< std::vector< double > > tmp
Definition: MVATrainer.cc:100
HLT enums.
virtual void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) override
std::shared_ptr< std::map< std::string, int > > treeMap_
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
void beginInputFile(FileBlock const &fb)
Definition: tree.py:1
virtual void respondToOpenInputFile(FileBlock const &fb) override
Definition: memstream.h:15
static void fillDescriptions(ConfigurationDescriptions &descriptions)
bool match(std::string const &iBranchName) const
static void fillDescription(ParameterSetDescription &desc)
static ParentageRegistry * instance()
TTree * runTree() const
Definition: FileBlock.h:101
TTree * tree() const
Definition: FileBlock.h:97
ParameterDescriptionBase * addVPSetUntracked(U const &iLabel, ParameterSetDescription const &validator, std::vector< ParameterSet > const &defaults)
def branchType(schema, name)
Definition: revisionDML.py:112
virtual void writeLuminosityBlock(LuminosityBlockForOutput const &lb) override