26 #include "TBranchElement.h" 27 #include "TObjArray.h" 33 #include "boost/algorithm/string.hpp" 39 rootServiceChecker_(),
41 selectedOutputItemList_(),
42 fileName_(
pset.getUntrackedParameter<
std::
string>(
"fileName")),
43 logicalFileName_(
pset.getUntrackedParameter<
std::
string>(
"logicalFileName")),
45 maxFileSize_(
pset.getUntrackedParameter<
int>(
"maxSize")),
46 compressionLevel_(
pset.getUntrackedParameter<
int>(
"compressionLevel")),
47 compressionAlgorithm_(
pset.getUntrackedParameter<
std::
string>(
"compressionAlgorithm")),
48 basketSize_(
pset.getUntrackedParameter<
int>(
"basketSize")),
49 eventAuxBasketSize_(
pset.getUntrackedParameter<
int>(
"eventAuxiliaryBasketSize")),
50 eventAutoFlushSize_(
pset.getUntrackedParameter<
int>(
"eventAutoFlushCompressedSize")),
51 splitLevel_(
std::
min<
int>(
pset.getUntrackedParameter<
int>(
"splitLevel") + 1, 99)),
52 basketOrder_(
pset.getUntrackedParameter<
std::
string>(
"sortBaskets")),
53 treeMaxVirtualSize_(
pset.getUntrackedParameter<
int>(
"treeMaxVirtualSize")),
54 whyNotFastClonable_(
pset.getUntrackedParameter<
bool>(
"fastCloning") ?
FileBlock::CanFastClone
56 dropMetaData_(DropNone),
57 moduleLabel_(
pset.getParameter<
std::
string>(
"@module_label")),
58 initializedFromInput_(
false),
63 overrideInputFileSplitLevels_(
pset.getUntrackedParameter<
bool>(
"overrideInputFileSplitLevels")),
64 compactEventAuxiliary_(
pset.getUntrackedParameter<
bool>(
"compactEventAuxiliary")),
65 mergeJob_(
pset.getUntrackedParameter<
bool>(
"mergeJob")),
68 overrideGUID_(
pset.getUntrackedParameter<
std::
string>(
"overrideGUID")) {
69 if (
pset.getUntrackedParameter<
bool>(
"writeStatusFile")) {
70 std::ostringstream statusfilename;
89 <<
"Legal values are 'NONE', 'DROPPED', 'PRIOR', and 'ALL'.\n";
96 auto const& specialSplit{
pset.getUntrackedParameterSetVector(
"overrideBranchesSplitLevel")};
99 for (
auto const&
s : specialSplit) {
101 s.getUntrackedParameter<
int>(
"splitLevel"));
107 pset.getUntrackedParameterSet(
"dataset");
112 for (
auto const&
prod : reg->productList()) {
132 if (
tree !=
nullptr) {
133 TObjArray* branches =
tree->GetListOfBranches();
134 for (
int i = 0;
i < branches->GetEntries(); ++
i) {
135 TBranchElement*
br = (TBranchElement*)branches->At(
i);
144 if (treeMap_->empty())
146 std::string const& lstring =
lh.branchDescription_->branchName();
148 std::map<std::string, int>::const_iterator lit = treeMap_->find(lstring);
149 std::map<std::string, int>::const_iterator rit = treeMap_->find(rstring);
150 bool lfound = (lit != treeMap_->end());
151 bool rfound = (rit != treeMap_->end());
152 if (lfound && rfound) {
153 return lit->second < rit->second;
163 return std::regex_match(iBranchName, branch_);
168 boost::replace_all(
tmp,
"*",
".*");
169 boost::replace_all(
tmp,
"?",
".");
170 return std::regex(
tmp);
198 for (
auto const& kept : keptVector) {
207 ? theInputTree->GetBranch(
prod.branchName().c_str())
210 if (theBranch !=
nullptr) {
216 if (
b.match(
prod.branchName())) {
235 for (
auto const& parentToChildren : branchToChildMap) {
236 for (
auto const&
child : parentToChildren.second) {
253 std::vector<std::string>
const& processesWithProcessBlockProducts =
255 unsigned int numberOfProcessesWithProcessBlockProducts = processesWithProcessBlockProducts.size();
263 TTree* theInputTree =
269 for (
unsigned int k =
InProcess;
k < numberOfTTrees; ++
k) {
299 statusFile <<
e.id() <<
" time: " << std::setprecision(3) <<
TimeOfDay() <<
'\n';
364 <<
"Please report this to the core framework developers.\n";
372 std::ostringstream ofilename;
373 std::ostringstream lfilename;
374 ofilename << fileBase;
385 return std::make_pair(ofilename.str(), lfilename.str());
403 if (provenance !=
nullptr) {
431 auto pr =
ep->productProvenanceRetrieverPtr();
433 pr->readProvenanceAsync(iTask, &iModuleCallingContext);
442 std::set<ParentageID>
const& eIds = branchParent.second;
443 for (
auto const& eId : eIds) {
457 desc.setComment(
"Writes runs, lumis, and events into EDM/ROOT files.");
458 desc.addUntracked<
std::string>(
"fileName")->setComment(
"Name of output file.");
460 ->setComment(
"Passed to job report. Otherwise unused by module.");
462 ->setComment(
"Passed to job report. Otherwise unused by module.");
463 desc.addUntracked<
int>(
"maxSize", 0x7f000000)
465 "Maximum output file size, in kB.\n" 466 "If over maximum, new output file will be started at next input file transition.");
467 desc.addUntracked<
int>(
"compressionLevel", 4)->setComment(
"ROOT compression level of output file.");
470 "Algorithm used to compress data in the ROOT output file, allowed values are ZLIB, LZMA, LZ4, and ZSTD");
471 desc.addUntracked<
int>(
"basketSize", 16384)->setComment(
"Default ROOT basket size in output file.");
472 desc.addUntracked<
int>(
"eventAuxiliaryBasketSize", 16384)
473 ->setComment(
"Default ROOT basket size in output file for EventAuxiliary branch.");
474 desc.addUntracked<
int>(
"eventAutoFlushCompressedSize", 20 * 1024 * 1024)
476 "Set ROOT auto flush stored data size (in bytes) for event TTree. The value sets how large the compressed " 477 "buffer is allowed to get. The uncompressed buffer can be quite a bit larger than this depending on the " 478 "average compression ratio. The value of -1 just uses ROOT's default value. The value of 0 turns off this " 480 desc.addUntracked<
int>(
"splitLevel", 99)->setComment(
"Default ROOT branch split level in output file.");
483 "Legal values: 'sortbasketsbyoffset', 'sortbasketsbybranch', 'sortbasketsbyentry'.\n" 484 "Used by ROOT when fast copying. Affects performance.");
485 desc.addUntracked<
int>(
"treeMaxVirtualSize", -1)
486 ->setComment(
"Size of ROOT TTree TBasket cache. Affects performance.");
487 desc.addUntracked<
bool>(
"fastCloning",
true)
489 "True: Allow fast copying, if possible.\n" 490 "False: Disable fast copying.");
491 desc.addUntracked(
"mergeJob",
false)
493 "If set to true and fast copying is disabled, copy input file compression and basket sizes to the output " 495 desc.addUntracked<
bool>(
"compactEventAuxiliary",
false)
497 "False: Write EventAuxiliary as we go like any other event metadata branch.\n" 498 "True: Optimize the file layout by deferring writing the EventAuxiliary branch until the output file is " 500 desc.addUntracked<
bool>(
"overrideInputFileSplitLevels",
false)
502 "False: Use branch split levels and basket sizes from input file, if possible.\n" 503 "True: Always use specified or default split levels and basket sizes.");
504 desc.addUntracked<
bool>(
"writeStatusFile",
false)
505 ->setComment(
"Write a status file. Intended for use by workflow management.");
508 "Determines handling of per product per event metadata. Options are:\n" 509 "'NONE': Keep all of it.\n" 510 "'DROPPED': Keep it for products produced in current process and all kept products. Drop it for dropped " 511 "products produced in prior processes.\n" 512 "'PRIOR': Keep it for products produced in current process. Drop it for products produced in prior " 514 "'ALL': Drop all of it.");
517 "Allows to override the GUID of the file. Intended to be used only in Tier0 for re-creating files.\n" 518 "The GUID needs to be of the proper format. If a new output file is started (see maxSize), the GUID of\n" 519 "the first file only is overridden, i.e. the subsequent output files have different, generated GUID.");
524 ->setComment(
"PSet is only used by Data Operations and not by this module.");
529 "Name of branch needing a special split level. The name can contain wildcards '*' and '?'");
530 specialSplit.
addUntracked<
int>(
"splitLevel")->setComment(
"The special split level for the branch");
531 desc.addVPSetUntracked(
"overrideBranchesSplitLevel", specialSplit, std::vector<ParameterSet>());
533 OutputModule::fillDescription(
desc);
539 descriptions.
add(
"edmOutput",
desc);
void writeParentageRegistry()
void openFile(FileBlock const &fb) override
virtual std::pair< std::string, std::string > physicalAndLogicalNameForNewFile()
BranchID branchID() const
BranchDescription const * branchDescription_
ParentageID const & parentageID() const
void write(EventForOutput const &e) override
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
edm::propagate_const< std::unique_ptr< RootOutputFile > > rootOutputFile_
BranchChildren const & branchChildren() const
static int const invalidSplitLevel
std::vector< SpecialSplitLevelForBranch > specialSplitLevelForBranches_
OutputProcessBlockHelper const & outputProcessBlockHelper() const
static int const invalidBasketSize
void setProcessesWithSelectedMergeableRunProducts(std::set< std::string > const &) override
void updateBranchParents(EventForOutput const &e)
constexpr unsigned int numberOfRunLumiEventProductTrees
BranchChildren branchChildren_
TTree * processBlockTree(std::string const &processName) const
void writeRun(RunForOutput const &) override
bool initializedFromInput_
void insertChild(BranchID parent, BranchID child)
std::string const moduleLabel_
std::vector< BranchID > const & parents() const
std::string overrideGUID_
std::vector< std::string > const & processesWithProcessBlockProducts() const
virtual void doExtrasAfterCloseFile()
const std::string names[nVars_]
void writeParameterSetRegistry()
std::string const & currentFileName() const
bool wantAllEvents() const
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
void writeProcessBlockHelper()
bool getMapped(key_type const &k, value_type &result) const
PoolOutputModule(ParameterSet const &ps)
void updateBranchParentsForOneBranch(ProductProvenanceRetriever const *provRetriever, BranchID const &branchID)
bool overrideInputFileSplitLevels_
void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const &iModuleCallingContext, Principal const &iPrincipal) const override
void writeFileFormatVersion()
std::string const & branchName() const
std::vector< BranchID > producedBranches_
std::vector< OutputItemList > selectedOutputItemList_
std::regex convert(std::string const &iGlobBranchExpression) const
ProductProvenance const * branchIDToProvenanceForProducedOnly(BranchID const &bid) const
OutputItem(BranchDescription const *bd, EDGetToken const &token, int splitLevel, int basketSize)
map_t const & childLookup() const
void writeStoredMergeableRunProductMetadata()
void writeEventAuxiliary()
EventID const & min(EventID const &lh, EventID const &rh)
void writeProductDependencies()
BranchParents branchParents_
DropMetaData const & dropMetaData() const
void writeIndexIntoFile()
void writeThinnedAssociationsHelper()
bool operator()(OutputItem const &lh, OutputItem const &rh) const
SelectedProductsForBranchType const & keptProducts() const
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
SubProcessParentageHelper const * subProcessParentageHelper() const
void writeBranchIDListRegistry()
void writeProcessHistoryRegistry()
void fillDependencyGraph()
bool shouldWeCloseFile() const override
allow inheriting classes to override but still be able to call this method in the overridden version ...
void respondToCloseInputFile(FileBlock const &fb) override
DropMetaData dropMetaData_
bool isFileOpen() const override
void reallyCloseFile() override
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void writeFileIdentifier()
std::string const & processName() const
bool match(std::string const &iBranchName) const
std::shared_ptr< std::map< std::string, int > > treeMap_
std::string statusFileName_
std::string const & logicalFileName() const
~PoolOutputModule() override
void beginInputFile(FileBlock const &fb)
std::string const & fileName() const
void writeProductDescriptionRegistry()
int const eventAuxBasketSize_
std::vector< OutputItem > OutputItemList
int remainingEvents() const
void respondToOpenInputFile(FileBlock const &fb) override
void writeProcessBlock(ProcessBlockForOutput const &) override
std::string const & BranchTypeToAuxiliaryBranchName(BranchType const &branchType)
void fillSelectedItemList(BranchType branchtype, std::string const &processName, TTree *theInputTree, OutputItemList &)
static void fillDescriptions(ConfigurationDescriptions &descriptions)
static void fillDescription(ParameterSetDescription &desc)
static ParentageRegistry * instance()
std::vector< std::string > processesWithSelectedMergeableRunProducts_
void writeLuminosityBlock(LuminosityBlockForOutput const &) override