50 #include "Compression.h" 59 bool sorterForJobReportHash(BranchDescription
const*
lh, BranchDescription
const* rh) {
60 return lh->fullClassName() < rh->fullClassName() ?
true 61 :
lh->fullClassName() > rh->fullClassName() ?
false 62 :
lh->moduleLabel() < rh->moduleLabel() ?
true 63 :
lh->moduleLabel() > rh->moduleLabel() ?
false 64 :
lh->productInstanceName() < rh->productInstanceName() ?
true 65 :
lh->productInstanceName() > rh->productInstanceName() ?
false 66 :
lh->processName() < rh->processName() ?
true 73 if (
e != std::exception_ptr()) {
75 std::rethrow_exception(
e);
84 std::vector<std::string>
const& processesWithSelectedMergeableRunProducts,
87 logicalFile_(logicalFileName),
90 whyNotFastClonable_(om_->whyNotFastClonable()),
91 canFastCloneAux_(
false),
94 eventEntryNumber_(0
LL),
95 lumiEntryNumber_(0
LL),
98 storedMergeableRunProductMetadata_(processesWithSelectedMergeableRunProducts),
100 metaDataTree_(nullptr),
101 parameterSetsTree_(nullptr),
102 parentageTree_(nullptr),
106 pLumiAux_(&lumiAux_),
108 eventEntryInfoVector_(),
109 pEventEntryInfoVector_(&eventEntryInfoVector_),
110 pBranchListIndexes_(nullptr),
111 pEventSelectionIDs_(nullptr),
114 runTree_(filePtr(),
InRun, om_->
splitLevel(), om_->treeMaxVirtualSize()),
115 dataTypeReported_(
false),
116 processHistoryRegistry_(),
118 branchesWithStoredHistory_(),
119 wrapperBaseTClass_(TClass::GetClass(
"edm::WrapperBase")) {
120 std::vector<std::string>
const& processesWithProcessBlockProducts =
121 om_->outputProcessBlockHelper().processesWithProcessBlockProducts();
122 for (
auto const&
processName : processesWithProcessBlockProducts) {
128 filePtr_->SetCompressionAlgorithm(ROOT::kZLIB);
130 filePtr_->SetCompressionAlgorithm(ROOT::kLZMA);
132 filePtr_->SetCompressionAlgorithm(ROOT::kZSTD);
134 filePtr_->SetCompressionAlgorithm(ROOT::kLZ4);
137 <<
"PoolOutputModule configured with unknown compression algorithm '" <<
om_->compressionAlgorithm() <<
"'\n" 138 <<
"Allowed compression algorithms are ZLIB, LZMA, LZ4, and ZSTD\n";
143 if (
om_->compactEventAuxiliary()) {
161 if (
om_->outputProcessBlockHelper().productsFromInputKept()) {
182 for (
auto&
item :
om_->selectedOutputItemList()[
i]) {
183 item.setProduct(
nullptr);
190 item.branchDescription()->produced());
200 if (overrideGUID.empty()) {
205 <<
"GUID to be used for output file is not valid (is '" << overrideGUID <<
"')";
216 std::vector<BranchDescription const*> branches;
221 branches.push_back(
item.branchDescription());
224 sort_all(branches, sorterForJobReportHash);
226 std::ostringstream oss;
227 char const underscore =
'_';
228 for (
auto const&
branch : branches) {
230 oss <<
bd.fullClassName() << underscore <<
bd.moduleLabel() << underscore <<
bd.productInstanceName()
231 << underscore <<
bd.processName() << underscore;
252 void maybeIssueWarning(
int whyNotFastClonable,
std::string const& ifileName,
std::string const& ofileName) {
262 bool isWarning =
true;
263 std::ostringstream message;
264 message <<
"Fast copying of file " << ifileName <<
" to file " << ofileName <<
" is disabled because:\n";
266 message <<
"a SecondaryFileSequence was specified.\n";
271 message <<
"the input file is in an old format.\n";
276 message <<
"events need to be sorted.\n";
280 message <<
"a run or a lumi is not contiguous in the input file.\n";
284 message <<
"events or lumis were selected or skipped by ID.\n";
289 message <<
"initial events, lumis or runs were skipped.\n";
294 message <<
"some events were skipped because of duplicate checking.\n";
298 message <<
"some events were not copied because of maxEvents limit.\n";
303 message <<
"some events were not copied because of maxLumis limit.\n";
308 message <<
"parallel processing was specified.\n";
313 message <<
"an EventSelector was specified.\n";
318 message <<
"some events were not copied because of maxEvents output limit.\n";
323 message <<
"the split level or basket size of a branch or branches was modified.\n";
327 message <<
"The format of a data product has changed.\n";
332 LogWarning(
"FastCloningDisabled") << message.str();
334 LogInfo(
"FastCloningDisabled") << message.str();
344 if (fb.
tree() !=
nullptr) {
347 if (remainingEvents >= 0 && remainingEvents < fb.
tree()->GetEntries()) {
353 if (
om_->overrideInputFileSplitLevels()) {
364 <<
"Merge failure because input file " <<
file_ <<
" has different ROOT split levels or basket sizes\n" 365 <<
"than previous files. To allow merging in spite of this, use the configuration parameter\n" 366 <<
"overrideInputFileSplitLevels=cms.untracked.bool(True)\n" 367 <<
"in every PoolOutputModule.\n";
379 constexpr
auto setSubBranchBasketConditions =
384 if (
om_->inputFileCount() == 1) {
385 if (
om_->mergeJob()) {
389 filePtr_->SetCompressionSettings(
infile->GetCompressionSettings());
444 if (
om_->compactEventAuxiliary() &&
447 long long int reserve = remainingEvents;
448 if (fb.
tree() !=
nullptr) {
449 reserve = reserve > 0 ?
std::min(fb.
tree()->GetEntries(), reserve) : fb.
tree()->GetEntries();
459 if (not
om_->compactEventAuxiliary()) {
467 unsigned int const oneK = 1024;
469 return (
size >=
om_->maxFileSize());
489 if (reg->anyProductProduced() || !
om_->wantAllEvents()) {
490 esids.push_back(
om_->selectorConfig());
495 unsigned int ttreeIndex =
InEvent;
517 if (
om_->compactEventAuxiliary()) {
523 reportSvc->eventWrittenToFile(
reportToken_,
e.id().run(),
e.id().event());
540 unsigned int ttreeIndex =
InLumi;
563 unsigned int ttreeIndex =
InRun;
573 std::vector<std::string>
const& processesWithProcessBlockProducts =
574 om_->outputProcessBlockHelper().processesWithProcessBlockProducts();
575 std::vector<std::string>::const_iterator it =
576 std::find(processesWithProcessBlockProducts.cbegin(), processesWithProcessBlockProducts.cend(),
processName);
577 if (it == processesWithProcessBlockProducts.cend()) {
582 treePointers_[ttreeIndex]->optimizeBaskets(10ULL * 1024 * 1024);
595 orderedIDs[parentageID.second] = parentageID.first;
598 for (
auto const& orderedID : orderedIDs) {
626 ex <<
"The number of entries in at least one output TBranch whose entries\n" 627 "were copied from the input does not match the number of events\n" 628 "recorded in IndexIntoFile. This might (or might not) indicate a\n" 629 "problem related to fast copy.";
630 ex.
addContext(
"Calling RootOutputFile::writeIndexIntoFile");
677 ProductList& pList =
const_cast<ProductList&
>(pReg.productList());
678 for (
auto const&
prod : pList) {
679 if (
prod.second.branchID() !=
prod.second.originalBranchID()) {
686 for (ProductList::iterator it = pList.begin(); it != pList.end();) {
689 ProductList::iterator itCopy = it;
720 constexpr std::size_t maxEaBasketSize = 4 * 1024 * 1024;
722 if (
om_->compactEventAuxiliary()) {
726 tree->SetBranchStatus(bname,
true);
730 tree->SetBasketSize(bname, basketsize);
731 auto b =
tree->GetBranch(bname);
735 LogDebug(
"writeEventAuxiliary") <<
"EventAuxiliary ratio extras/GUIDs/all = " 740 const auto ea =
aux.eventAuxiliary();
750 if (!
om_->outputProcessBlockHelper().processesWithProcessBlockProducts().empty()) {
752 om_->outputProcessBlockHelper().processesWithProcessBlockProducts());
753 om_->outputProcessBlockHelper().fillCacheIndices(storedProcessBlockHelper);
789 treePointer->close();
790 treePointer =
nullptr;
803 if (
tree &&
tree->GetNbranches() != 0) {
825 std::set<BranchID>
const& iProducedIDs,
826 std::set<StoredProductProvenance>& oToFill) {
832 for (
auto const& parentID : parentIDs) {
837 (iProducedIDs.end() != iProducedIDs.find(
info->branchID()))) {
849 unsigned int ttreeIndex,
852 std::vector<std::unique_ptr<WrapperBase> > dummies;
856 bool const doProvenance =
861 std::set<StoredProductProvenance> provenanceToKeep;
867 std::set<BranchID> producedBranches;
870 for (
auto bd : preg->allBranchDescriptions()) {
871 if (
bd->produced() &&
bd->branchType() ==
InEvent) {
872 producedBranches.insert(
bd->branchID());
879 BranchID const&
id =
item.branchDescription()->branchID();
882 bool produced =
item.branchDescription()->produced();
885 bool keepProvenance = doProvenance && (produced || keepProvenanceForPrior);
891 product =
result.wrapper();
892 if (
result.isValid() && keepProvenance) {
893 productProvenance =
result.provenance()->productProvenance();
895 if (product ==
nullptr) {
898 TClass*
cp =
item.branchDescription()->wrappedType().getClass();
903 product =
dummy.get();
906 item.setProduct(product);
908 if (keepProvenance && productProvenance ==
nullptr) {
911 if (productProvenance) {
913 insertAncestors(*productProvenance, provRetriever, produced, producedBranches, provenanceToKeep);
918 productProvenanceVecPtr->assign(provenanceToKeep.begin(), provenanceToKeep.end());
921 productProvenanceVecPtr->clear();
925 std::set<edm::StoredProductProvenance>& oToInsert) {
928 std::set<edm::StoredProductProvenance>::iterator itFound = oToInsert.find(toStore);
929 if (itFound == oToInsert.end()) {
931 std::pair<std::map<edm::ParentageID, unsigned int>::iterator,
bool>
i =
936 <<
"RootOutputFile::insertProductProvenance\n" 938 <<
" is out of bounds. The maximum value is currently " <<
parentageIDs_.size() - 1 <<
".\n" 939 <<
"This should never happen.\n" 940 <<
"Please report this to the framework developers.";
943 oToInsert.insert(toStore);
size_type guidsSize() const
PoolOutputModule::OutputItemList OutputItemList
std::string const & metaDataTreeName()
void beginInputFile(FileBlock const &fb, int remainingEvents)
LuminosityBlockNumber_t luminosityBlock() const
LuminosityBlockAuxiliary lumiAux_
std::string const & fid() const
std::string const & fileIdentifierBranchName()
ParentageID const & parentageID() const
std::string const & mergeableRunProductMetadataBranchName()
std::vector< BranchIDList > BranchIDLists
std::string const & processBlockHelperBranchName()
EventNumber_t event() const
void writeProcessHistoryRegistry()
bool checkEntriesInReadBranches(Long64_t expectedNumberOfEntries) const
size_type extrasSize() const
void push_back(const EventAuxiliary &ea)
void setSubBranchBasketSizes(TTree *inputTree) const
void setBranchAliases(TTree *tree, SelectedProducts const &branches, std::string const &processName) const
LuminosityBlockNumber_t luminosityBlock() const
std::string const & fileName() const
RootOutputFile(PoolOutputModule *om, std::string const &fileName, std::string const &logicalFileName, std::vector< std::string > const &processesWithSelectedMergeableRunProducts, std::string const &overrideGUID)
BranchType const & branchType() const
void insertAncestors(ProductProvenance const &iGetParents, ProductProvenanceRetriever const *iMapper, bool produced, std::set< BranchID > const &producedBranches, std::set< StoredProductProvenance > &oToFill)
void fillParameterSetBranch(TTree *parameterSetsTree, int basketSize)
std::map< BranchKey, BranchDescription > ProductList
bool registerProcessHistory(ProcessHistory const &processHistory)
std::shared_ptr< TFile const > filePtr() const
std::map< ParentageID, unsigned int > parentageIDs_
void fillProcessHistoryBranch(TTree *metaDataTree, int basketSize, ProcessHistoryRegistry const &processHistoryRegistry)
void writeEventAuxiliary()
edm::propagate_const< TTree * > metaDataTree_
std::string const & processName() const
unsigned long nEventsInLumi_
ProcessHistoryRegistry processHistoryRegistry_
std::vector< BranchID > const & parents() const
std::string const & productDependenciesBranchName()
void writeLuminosityBlock(LuminosityBlockForOutput const &lb)
void reserve(std::size_t size)
void setAutoFlush(Long64_t size)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
BasicHandle getByToken(EDGetToken token, TypeID const &typeID) const
std::string const & fileFormatVersionBranchName()
void writeProcessBlockHelper()
std::set< BranchID > branchesWithStoredHistory_
std::vector< edm::propagate_const< RootOutputTree * > > treePointers_
std::vector< EventSelectionID > EventSelectionIDVector
std::string const & productDescriptionBranchName()
void sortVector_Run_Or_Lumi_Entries()
std::string const & branchIDListBranchName()
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
constexpr element_type const * get() const
bool getMapped(key_type const &k, value_type &result) const
bool checkSplitLevelsAndBasketSizes(TTree *inputTree) const
void writeThinnedAssociationsHelper()
unsigned int parentageIDIndex_
RootOutputTree eventTree_
std::vector< BranchListIndex > BranchListIndexes
void addBranch(std::string const &branchName, std::string const &className, void const *&pProd, int splitLevel, int basketSize, bool produced)
edm::propagate_const< PoolOutputModule * > om_
std::string const & branchName() const
IndexIntoFile::EntryNumber_t eventEntryNumber_
LuminosityBlockAuxiliary const * pLumiAux_
std::string const & productInstanceName() const
edm::propagate_const< std::shared_ptr< TFile > > filePtr_
std::string const & parameterSetsTreeName()
LuminosityBlockNumber_t luminosityBlock() const
void writeOne(EventForOutput const &e)
std::vector< OutputItemList > const & selectedOutputItemList() const
std::string const & BranchTypeToProductProvenanceBranchName(BranchType const &BranchType)
int getFileFormatVersion()
ProcessHistoryID const & reducedProcessHistoryID(ProcessHistoryID const &fullID) const
void fillBranches(BranchType const &branchType, OccurrenceForOutput const &occurrence, unsigned int ttreeIndex, StoredProductProvenanceVector *productProvenanceVecPtr=nullptr, ProductProvenanceRetriever const *provRetriever=nullptr)
ProductProvenance const * branchIDToProvenance(BranchID const &bid) const
std::string const & parentageBranchName()
void setProcessHistoryID(ProcessHistoryID const &phid)
std::string const & processName() const
std::string const & eventToProcessBlockIndexesBranchName()
CompactEventAuxiliaryVector compactEventAuxiliary_
void addEntry(ProcessHistoryID const &processHistoryID, RunNumber_t run, LuminosityBlockNumber_t lumi, EventNumber_t event, EntryNumber_t entry)
edm::propagate_const< TClass * > wrapperBaseTClass_
void writeProductDependencies()
void writeBranchIDListRegistry()
static TTree * makeTTree(TFile *filePtr, std::string const &name, int splitLevel)
ProcessHistoryID const & processHistoryID() const
EventSelectionIDVector const * pEventSelectionIDs_
ProcessHistoryID const & processHistoryID() const
std::string createGlobalIdentifier(bool binary=false)
Log< level::Info, false > LogInfo
edm::propagate_const< TTree * > parameterSetsTree_
StoredMergeableRunProductMetadata storedMergeableRunProductMetadata_
void writeStoredMergeableRunProductMetadata()
void writeIndexIntoFile()
BranchID const & branchID() const
void writeFileIdentifier()
std::string const & eventSelectionsBranchName()
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
edm::propagate_const< StoredProductProvenanceVector * > pEventEntryInfoVector_
std::string const & parentageTreeName()
LuminosityBlockID const & id() const
void addContext(std::string const &context)
LuminosityBlockAuxiliary const & luminosityBlockAuxiliary() const
edm::propagate_const< TTree * > parentageTree_
bool insertProductProvenance(const ProductProvenance &, std::set< StoredProductProvenance > &oToInsert)
void setException(std::exception_ptr e)
void addAuxiliary(std::string const &branchName, T const *&pAux, int bufSize, bool allowCloning=true)
std::vector< StoredProductProvenance > StoredProductProvenanceVector
void writeParameterSetRegistry()
void respondToCloseInputFile(FileBlock const &fb)
IndexIntoFile indexIntoFile_
IndexIntoFile::EntryNumber_t runEntryNumber_
bool isValidGlobalIdentifier(std::string const &guid)
virtual ProcessHistory const & processHistory() const
static void writeTTree(TTree *tree)
TTree const * tree() const
std::string moduleName(StableProvenance const &provenance, ProcessHistory const &history)
EventAuxiliary const * pEventAux_
StoredProductProvenanceVector const * pEventEntryInfoVector() const
void writeParentageRegistry()
std::exception_ptr getException()
void writeProductDescriptionRegistry()
int eventAutoFlushSize() const
void writeProcessBlock(ProcessBlockForOutput const &)
std::unique_ptr< WrapperBase > getWrapperBasePtr(void *p, int offset)
std::string const & branchListIndexesBranchName()
void setProcessHistoryID(ProcessHistoryID const &phid)
void writeFileFormatVersion()
BranchListIndexes const * pBranchListIndexes_
Log< level::Warning, false > LogWarning
void maybeFastCloneTree(bool canFastClone, bool canFastCloneAux, TTree *tree, std::string const &option)
int whyNotFastClonable() const
std::string const & indexIntoFileBranchName()
std::string const & BranchTypeToAuxiliaryBranchName(BranchType const &branchType)
std::vector< edm::propagate_const< std::unique_ptr< RootOutputTree > > > processBlockTrees_
static ParentageRegistry * instance()
bool shouldWeCloseFile() const
IndexIntoFile::EntryNumber_t lumiEntryNumber_
void optimizeBaskets(ULong64_t size)
void writeRun(RunForOutput const &r)
std::string const & moduleLabel() const
std::string toString() const
RunAuxiliary const * pRunAux_
bool checkIfFastClonable(TTree *inputTree) const
JobReport::Token reportToken_
std::set< std::string > const & branchAliases() const
EventToProcessBlockIndexes const * pEventToProcessBlockIndexes_
std::string const & thinnedAssociationsHelperBranchName()
std::string match(BranchDescription const &a, BranchDescription const &b, std::string const &fileName)
Parentage const & parentage() const