CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/IOPool/Input/src/RootInputFileSequence.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "RootInputFileSequence.h"
00004 #include "PoolSource.h"
00005 #include "RootFile.h"
00006 #include "RootTree.h"
00007 #include "DuplicateChecker.h"
00008 
00009 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
00010 #include "FWCore/Framework/interface/EventPrincipal.h"
00011 #include "FWCore/Framework/interface/FileBlock.h"
00012 #include "FWCore/Framework/src/PrincipalCache.h"
00013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00014 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00015 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00016 #include "FWCore/ServiceRegistry/interface/Service.h"
00017 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00018 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00019 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00020 
00021 #include "CLHEP/Random/RandFlat.h"
00022 #include "InputFile.h"
00023 #include "TSystem.h"
00024 
00025 namespace edm {
00026   namespace {
00027     std::string const streamerInfo = std::string("StreamerInfo");
00028   }
00029   RootInputFileSequence::RootInputFileSequence(
00030                 ParameterSet const& pset,
00031                 PoolSource const& input,
00032                 InputFileCatalog const& catalog,
00033                 PrincipalCache& cache,
00034                 bool primaryFiles) :
00035     input_(input),
00036     catalog_(catalog),
00037     firstFile_(true),
00038     fileIterBegin_(fileCatalogItems().begin()),
00039     fileIterEnd_(fileCatalogItems().end()),
00040     fileIter_(fileIterEnd_),
00041     fileIterLastOpened_(fileIterEnd_),
00042     rootFile_(),
00043     parametersMustMatch_(BranchDescription::Permissive),
00044     branchesMustMatch_(BranchDescription::Permissive),
00045     flatDistribution_(),
00046     indexesIntoFiles_(fileCatalogItems().size()),
00047     orderedProcessHistoryIDs_(),
00048     eventSkipperByID_(primaryFiles ? EventSkipperByID::create(pset).release() : 0),
00049     eventsRemainingInFile_(0),
00050     // The default value provided as the second argument to the getUntrackedParameter function call
00051     // is not used when the ParameterSet has been validated and the parameters are not optional
00052     // in the description.  This is currently true when PoolSource is the primary input source.
00053     // The modules that use PoolSource as a SecSource have not defined their fillDescriptions function
00054     // yet, so the ParameterSet does not get validated yet.  As soon as all the modules with a SecSource
00055     // have defined descriptions, the defaults in the getUntrackedParameterSet function calls can
00056     // and should be deleted from the code.
00057     numberOfEventsToSkip_(primaryFiles ? pset.getUntrackedParameter<unsigned int>("skipEvents", 0U) : 0U),
00058     noEventSort_(primaryFiles ? pset.getUntrackedParameter<bool>("noEventSort", true) : false),
00059     skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles", false)),
00060     treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize", roottree::defaultCacheSize) : 0U),
00061     treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize", -1)),
00062     setRun_(pset.getUntrackedParameter<unsigned int>("setRunNumber", 0U)),
00063     groupSelectorRules_(pset, "inputCommands", "InputSource"),
00064     primaryFiles_(primaryFiles),
00065     duplicateChecker_(primaryFiles ? new DuplicateChecker(pset) : 0),
00066     dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches", primary())),
00067     usingGoToEvent_(false) {
00068 
00069     //we now allow the site local config to specify what the TTree cache size should be
00070     Service<SiteLocalConfig> pSLC;
00071     if(treeCacheSize_ != 0U && pSLC.isAvailable() && pSLC->sourceTTreeCacheSize()) {
00072       treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
00073     }
00074     
00075     if(primaryFiles_) {
00076       //NOTE: we do not want to stage in secondary files since we can be given a list of
00077       // thousands of files and prestaging all those files can cause a site to fail
00078       StorageFactory *factory = StorageFactory::get();
00079       for(fileIter_ = fileIterBegin_; fileIter_ != fileIterEnd_; ++fileIter_) {
00080         factory->activateTimeout(fileIter_->fileName());
00081         factory->stagein(fileIter_->fileName());
00082       }
00083     }
00084 
00085     std::string parametersMustMatch = pset.getUntrackedParameter<std::string>("parametersMustMatch", std::string("permissive"));
00086     if(parametersMustMatch == std::string("strict")) parametersMustMatch_ = BranchDescription::Strict;
00087 
00088     std::string branchesMustMatch = pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
00089     if(branchesMustMatch == std::string("strict")) branchesMustMatch_ = BranchDescription::Strict;
00090 
00091     if(primary()) {
00092       for(fileIter_ = fileIterBegin_; fileIter_ != fileIterEnd_; ++fileIter_) {
00093         initFile(skipBadFiles_);
00094         if(rootFile_) break;
00095       }
00096       if(rootFile_) {
00097         productRegistryUpdate().updateFromInput(rootFile_->productRegistry()->productList());
00098         if(numberOfEventsToSkip_ != 0) {
00099           skipEvents(numberOfEventsToSkip_, cache);
00100         }
00101       }
00102     }
00103   }
00104 
00105   std::vector<FileCatalogItem> const&
00106   RootInputFileSequence::fileCatalogItems() const {
00107     return catalog_.fileCatalogItems();
00108   }
00109 
00110   void
00111   RootInputFileSequence::endJob() {
00112     closeFile_();
00113   }
00114 
00115   boost::shared_ptr<FileBlock>
00116   RootInputFileSequence::readFile_(PrincipalCache& cache) {
00117     if(firstFile_) {
00118       // The first input file has already been opened.
00119       firstFile_ = false;
00120       if(!rootFile_) {
00121         initFile(skipBadFiles_);
00122       }
00123     } else {
00124       if(!nextFile(cache)) {
00125         assert(0);
00126       }
00127     }
00128     if(!rootFile_) {
00129       return boost::shared_ptr<FileBlock>(new FileBlock);
00130     }
00131     return rootFile_->createFileBlock();
00132   }
00133 
00134   void RootInputFileSequence::closeFile_() {
00135     // close the currently open file, if any, and delete the RootFile object.
00136     if(rootFile_) {
00137       if (primary()) {
00138         std::auto_ptr<InputSource::FileCloseSentry>
00139         sentry((primaryFiles_) ? new InputSource::FileCloseSentry(input_) : 0);
00140         rootFile_->close();
00141         if(duplicateChecker_) duplicateChecker_->inputFileClosed();
00142       }
00143       rootFile_.reset();
00144     }
00145   }
00146 
00147   void RootInputFileSequence::initFile(bool skipBadFiles) {
00148     // We are really going to close the open file.
00149 
00150     // If this is the primary sequence, we are not duplicate checking across files
00151     // and we are not using random access to find events, then we can delete the
00152     // IndexIntoFile for the file we are closing. If we can't delete all of it,
00153     // then we can delete the parts we do not need.
00154     if (fileIterLastOpened_ != fileIterEnd_) {
00155       size_t currentIndexIntoFile = fileIterLastOpened_ - fileIterBegin_;
00156       bool needIndexesForDuplicateChecker = duplicateChecker_ && duplicateChecker_->checkingAllFiles() && !duplicateChecker_->checkDisabled();
00157       bool deleteIndexIntoFile = primaryFiles_ &&
00158                                  !needIndexesForDuplicateChecker &&
00159                                  !usingGoToEvent_;
00160       if (deleteIndexIntoFile) {
00161               indexesIntoFiles_[currentIndexIntoFile].reset();
00162       } else {
00163               if (indexesIntoFiles_[currentIndexIntoFile]) indexesIntoFiles_[currentIndexIntoFile]->inputFileClosed();
00164       }
00165       fileIterLastOpened_ = fileIterEnd_;
00166     }
00167     closeFile_();
00168 
00169     // Determine whether we have a fallback URL specified; if so, prepare it;
00170     // Only valid if it is non-empty and differs from the original filename.
00171     std::string fallbackName = fileIter_->fallbackFileName();
00172     bool hasFallbackUrl = (!fallbackName.empty()) || (fallbackName == fileIter_->fileName());
00173 
00174     boost::shared_ptr<InputFile> filePtr;
00175     try {
00176       std::auto_ptr<InputSource::FileOpenSentry>
00177         sentry(primaryFiles_ ? new InputSource::FileOpenSentry(input_) : 0);
00178       filePtr.reset(new InputFile(gSystem->ExpandPathName(fileIter_->fileName().c_str()), "  Initiating request to open file "));
00179     }
00180     catch (cms::Exception const& e) {
00181       if(!skipBadFiles  && !hasFallbackUrl) {
00182         if(e.explainSelf().find(streamerInfo) != std::string::npos) {
00183           throw Exception(errors::FileReadError) << e.explainSelf() << "\n" <<
00184             "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " could not be read properly.\n" <<
00185             "Possibly the format is incompatible with the current release.\n";
00186         }
00187         throw Exception(errors::FileOpenError) << e.explainSelf() << "\n" <<
00188            "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " was not found, could not be opened, or is corrupted.\n";
00189       }
00190     }
00191     if(!filePtr && (hasFallbackUrl)) {
00192       try {
00193         std::auto_ptr<InputSource::FileOpenSentry>
00194           sentry(primaryFiles_ ? new InputSource::FileOpenSentry(input_) : 0);
00195         filePtr.reset(new InputFile(gSystem->ExpandPathName(fallbackName.c_str()), "  Fallback request to file "));
00196       }
00197       catch (cms::Exception const& e) {
00198         if(!skipBadFiles) {
00199           if(e.explainSelf().find(streamerInfo) != std::string::npos) {
00200             throw Exception(errors::FileReadError) << e.explainSelf() << "\n" <<
00201               "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " could not be read properly.\n" <<
00202               "Possibly the format is incompatible with the current release.\n";
00203           }
00204           throw Exception(errors::FileOpenError) << e.explainSelf() << "\n" <<
00205              "RootInputFileSequence::initFile(): Input fallback file " << fallbackName << " was not found, could not be opened, or is corrupted.\n";
00206         }
00207       }
00208     }
00209     if(filePtr) {
00210       std::vector<boost::shared_ptr<IndexIntoFile> >::size_type currentIndexIntoFile = fileIter_ - fileIterBegin_;
00211       rootFile_ = RootFileSharedPtr(new RootFile(fileIter_->fileName(),
00212           processConfiguration(), fileIter_->logicalFileName(), filePtr,
00213           eventSkipperByID_, numberOfEventsToSkip_ != 0,
00214           remainingEvents(), remainingLuminosityBlocks(), treeCacheSize_, treeMaxVirtualSize_,
00215           input_.processingMode(),
00216           setRun_,
00217           noEventSort_,
00218           groupSelectorRules_, !primaryFiles_, duplicateChecker_, dropDescendants_,
00219                                                  indexesIntoFiles_, currentIndexIntoFile, orderedProcessHistoryIDs_, usingGoToEvent_));
00220 
00221       fileIterLastOpened_ = fileIter_;
00222       indexesIntoFiles_[currentIndexIntoFile] = rootFile_->indexIntoFileSharedPtr();
00223       rootFile_->reportOpened(primary() ?
00224          (primaryFiles_ ? "primaryFiles" : "secondaryFiles") : "mixingFiles");
00225     } else {
00226       if(!skipBadFiles) {
00227         throw Exception(errors::FileOpenError) <<
00228            "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " was not found or could not be opened.\n";
00229       }
00230       LogWarning("") << "Input file: " << fileIter_->fileName() << " was not found or could not be opened, and will be skipped.\n";
00231     }
00232   }
00233 
00234   boost::shared_ptr<ProductRegistry const>
00235   RootInputFileSequence::fileProductRegistry() const {
00236     return rootFile_->productRegistry();
00237   }
00238 
00239   bool RootInputFileSequence::nextFile(PrincipalCache& cache) {
00240     if(fileIter_ != fileIterEnd_) ++fileIter_;
00241     if(fileIter_ == fileIterEnd_) {
00242       if(primaryFiles_) {
00243         return false;
00244       } else {
00245         fileIter_ = fileIterBegin_;
00246       }
00247     }
00248 
00249     initFile(skipBadFiles_);
00250 
00251     if(primaryFiles_ && rootFile_) {
00252       size_t size = productRegistry()->size();
00253       // make sure the new product registry is compatible with the main one
00254       std::string mergeInfo = productRegistryUpdate().merge(*rootFile_->productRegistry(),
00255                                                             fileIter_->fileName(),
00256                                                             parametersMustMatch_,
00257                                                             branchesMustMatch_);
00258       if(!mergeInfo.empty()) {
00259         throw Exception(errors::MismatchedInputFiles,"RootInputFileSequence::nextFile()") << mergeInfo;
00260       }
00261       if (productRegistry()->size() > size) {
00262         cache.adjustIndexesAfterProductRegistryAddition();
00263       }
00264       cache.adjustEventToNewProductRegistry(productRegistry());
00265     }
00266     return true;
00267   }
00268 
00269   bool RootInputFileSequence::previousFile(PrincipalCache& cache) {
00270     if(fileIter_ == fileIterBegin_) {
00271       if(primaryFiles_) {
00272         return false;
00273       } else {
00274         fileIter_ = fileIterEnd_;
00275       }
00276     }
00277     --fileIter_;
00278 
00279     initFile(false);
00280 
00281     if(primaryFiles_ && rootFile_) {
00282       size_t size = productRegistry()->size();
00283       // make sure the new product registry is compatible to the main one
00284       std::string mergeInfo = productRegistryUpdate().merge(*rootFile_->productRegistry(),
00285                                                             fileIter_->fileName(),
00286                                                             parametersMustMatch_,
00287                                                             branchesMustMatch_);
00288       if(!mergeInfo.empty()) {
00289         throw Exception(errors::MismatchedInputFiles,"RootInputFileSequence::previousEvent()") << mergeInfo;
00290       }
00291       if (productRegistry()->size() > size) {
00292         cache.adjustIndexesAfterProductRegistryAddition();
00293       }
00294       cache.adjustEventToNewProductRegistry(productRegistry());
00295     }
00296     if(rootFile_) rootFile_->setToLastEntry();
00297     return true;
00298   }
00299 
00300   RootInputFileSequence::~RootInputFileSequence() {
00301   }
00302 
00303   boost::shared_ptr<RunAuxiliary>
00304   RootInputFileSequence::readRunAuxiliary_() {
00305     boost::shared_ptr<RunAuxiliary> aux = rootFile_->readRunAuxiliary_();
00306     return aux;
00307   }
00308 
00309   boost::shared_ptr<LuminosityBlockAuxiliary>
00310   RootInputFileSequence::readLuminosityBlockAuxiliary_() {
00311     boost::shared_ptr<LuminosityBlockAuxiliary> aux = rootFile_->readLuminosityBlockAuxiliary_();
00312     return aux;
00313   }
00314 
00315   boost::shared_ptr<RunPrincipal>
00316   RootInputFileSequence::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00317     return rootFile_->readRun_(rpCache);
00318   }
00319 
00320   boost::shared_ptr<LuminosityBlockPrincipal>
00321   RootInputFileSequence::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00322     return rootFile_->readLumi(lbCache);
00323   }
00324 
00325   // readEvent() is responsible for setting up the EventPrincipal.
00326   //
00327   //   1. create an EventPrincipal with a unique EventID
00328   //   2. For each entry in the provenance, put in one Group,
00329   //      holding the Provenance for the corresponding EDProduct.
00330   //   3. set up the caches in the EventPrincipal to know about this
00331   //      Group.
00332   //
00333   // We do *not* create the EDProduct instance (the equivalent of reading
00334   // the branch containing this EDProduct. That will be done by the Delayed Reader,
00335   //  when it is asked to do so.
00336   //
00337 
00338   EventPrincipal*
00339   RootInputFileSequence::readEvent(EventPrincipal& cache, boost::shared_ptr<LuminosityBlockPrincipal> lb) {
00340     return rootFile_->readEvent(cache, rootFile_, lb);
00341   }
00342 
00343   InputSource::ItemType
00344   RootInputFileSequence::getNextItemType() {
00345     if(fileIter_ == fileIterEnd_) {
00346       return InputSource::IsStop;
00347     }
00348     if(firstFile_) {
00349       return InputSource::IsFile;
00350     }
00351     if(rootFile_) {
00352       IndexIntoFile::EntryType entryType = rootFile_->getNextEntryTypeWanted();
00353       if(entryType == IndexIntoFile::kEvent) {
00354         return InputSource::IsEvent;
00355       } else if(entryType == IndexIntoFile::kLumi) {
00356         return InputSource::IsLumi;
00357       } else if(entryType == IndexIntoFile::kRun) {
00358         return InputSource::IsRun;
00359       }
00360       assert(entryType == IndexIntoFile::kEnd);
00361     }
00362     if(fileIter_ + 1 == fileIterEnd_) {
00363       return InputSource::IsStop;
00364     }
00365     return InputSource::IsFile;
00366   }
00367 
00368   // Rewind to before the first event that was read.
00369   void
00370   RootInputFileSequence::rewind_() {
00371     if (fileIter_ != fileIterBegin_) {
00372       closeFile_();
00373       fileIter_ = fileIterBegin_;
00374     }
00375     if (!rootFile_) {
00376       initFile(false);
00377     }
00378     rewindFile();
00379     firstFile_ = true;
00380   }
00381 
00382   // Rewind to the beginning of the current file
00383   void
00384   RootInputFileSequence::rewindFile() {
00385     rootFile_->rewind();
00386   }
00387 
00388   void
00389   RootInputFileSequence::reset(PrincipalCache& cache) {
00390     //NOTE: Need to handle duplicate checker
00391     // Also what if skipBadFiles_==true and the first time we succeeded but after a reset we fail?
00392     if(primary()) {
00393       firstFile_ = true;
00394       for(fileIter_ = fileIterBegin_; fileIter_ != fileIterEnd_; ++fileIter_) {
00395         initFile(skipBadFiles_);
00396         if(rootFile_) break;
00397       }
00398       if(rootFile_) {
00399         if(numberOfEventsToSkip_ != 0) {
00400           skipEvents(numberOfEventsToSkip_, cache);
00401         }
00402       }
00403     }
00404   }
00405 
00406   // Advance "offset" events.  Offset can be positive or negative (or zero).
00407   bool
00408   RootInputFileSequence::skipEvents(int offset, PrincipalCache& cache) {
00409     assert (numberOfEventsToSkip_ == 0 || numberOfEventsToSkip_ == offset);
00410     numberOfEventsToSkip_ = offset;
00411     while(numberOfEventsToSkip_ != 0) {
00412       bool atEnd = rootFile_->skipEvents(numberOfEventsToSkip_);
00413       if((numberOfEventsToSkip_ > 0 || atEnd) && !nextFile(cache)) {
00414         numberOfEventsToSkip_ = 0;
00415         return false;
00416       }
00417       if(numberOfEventsToSkip_ < 0 && !previousFile(cache)) {
00418         numberOfEventsToSkip_ = 0;
00419         fileIter_ = fileIterEnd_;
00420         return false;
00421       }
00422     }
00423     return true;
00424   }
00425 
00426   bool
00427   RootInputFileSequence::goToEvent(EventID const& eventID, PrincipalCache& cache) {
00428     usingGoToEvent_ = true;
00429     if (rootFile_) {
00430       if (rootFile_->goToEvent(eventID)) {
00431         return true;
00432       }
00433       // If only one input file, give up now, to save time.
00434       if(rootFile_ && indexesIntoFiles_.size() == 1) {
00435         return false;
00436       }
00437       // Save the current file and position so that we can restore them
00438       // if we fail to restore the desired event
00439       bool closedOriginalFile = false;
00440       std::vector<FileCatalogItem>::const_iterator originalFile = fileIter_;
00441       IndexIntoFile::IndexIntoFileItr originalPosition = rootFile_->indexIntoFileIter();
00442 
00443       // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
00444       typedef std::vector<boost::shared_ptr<IndexIntoFile> >::const_iterator Iter;
00445       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00446         if(*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
00447           // We found it. Close the currently open file, and open the correct one.
00448           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00449           initFile(false);
00450           // Now get the item from the correct file.
00451           bool found = rootFile_->goToEvent(eventID);
00452           assert (found);
00453           return true;
00454         }
00455       }
00456       // Look for item in files not yet opened.
00457       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00458         if(!*it) {
00459           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00460           initFile(false);
00461           closedOriginalFile = true;
00462           if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
00463             if  (rootFile_->goToEvent(eventID)) {
00464               return true;
00465             }
00466           }
00467         }
00468       }
00469       if (closedOriginalFile) {
00470         fileIter_ = originalFile;
00471         initFile(false);
00472         rootFile_->setPosition(originalPosition);
00473       }
00474     }
00475     return false;
00476   }
00477 
00478   bool
00479   RootInputFileSequence::skipToItem(RunNumber_t run, LuminosityBlockNumber_t lumi, EventNumber_t event) {
00480     // Attempt to find item in currently open input file.
00481     bool found = rootFile_ && rootFile_->setEntryAtItem(run, lumi, event);
00482     if(!found) {
00483       // If only one input file, give up now, to save time.
00484       if(rootFile_ && indexesIntoFiles_.size() == 1) {
00485         return false;
00486       }
00487       // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
00488       typedef std::vector<boost::shared_ptr<IndexIntoFile> >::const_iterator Iter;
00489       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00490         if(*it && (*it)->containsItem(run, lumi, event)) {
00491           // We found it. Close the currently open file, and open the correct one.
00492           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00493           initFile(false);
00494           // Now get the item from the correct file.
00495           found = rootFile_->setEntryAtItem(run, lumi, event);
00496           assert (found);
00497           return true;
00498         }
00499       }
00500       // Look for item in files not yet opened.
00501       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00502         if(!*it) {
00503           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00504           initFile(false);
00505           found = rootFile_->setEntryAtItem(run, lumi, event);
00506           if(found) {
00507             return true;
00508           }
00509         }
00510       }
00511       // Not found
00512       return false;
00513     }
00514     return true;
00515   }
00516 
00517   bool const
00518   RootInputFileSequence::primary() const {
00519     return input_.primary();
00520   }
00521 
00522   ProcessConfiguration const&
00523   RootInputFileSequence::processConfiguration() const {
00524     return input_.processConfiguration();
00525   }
00526 
00527   int
00528   RootInputFileSequence::remainingEvents() const {
00529     return input_.remainingEvents();
00530   }
00531 
00532   int
00533   RootInputFileSequence::remainingLuminosityBlocks() const {
00534     return input_.remainingLuminosityBlocks();
00535   }
00536 
00537   ProductRegistry &
00538   RootInputFileSequence::productRegistryUpdate() const{
00539     return input_.productRegistryUpdate();
00540   }
00541 
00542   boost::shared_ptr<ProductRegistry const>
00543   RootInputFileSequence::productRegistry() const{
00544     return input_.productRegistry();
00545   }
00546 
00547   void
00548   RootInputFileSequence::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00549     std::vector<std::string> rules;
00550     rules.reserve(wantedBranches.size() + 1);
00551     rules.push_back(std::string("drop *"));
00552     for(std::vector<std::string>::const_iterator it = wantedBranches.begin(), itEnd = wantedBranches.end();
00553         it != itEnd; ++it) {
00554       rules.push_back("keep " + *it + "_*");
00555     }
00556     ParameterSet pset;
00557     pset.addUntrackedParameter("inputCommands", rules);
00558     groupSelectorRules_ = GroupSelectorRules(pset, "inputCommands", "InputSource");
00559   }
00560 
00561   void
00562   RootInputFileSequence::readMany(int number, EventPrincipalVector& result) {
00563     for(int i = 0; i < number; ++i) {
00564       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00565       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00566       if(ev == 0) {
00567         return;
00568       }
00569       assert(ev == ep.get());
00570       result.push_back(ep);
00571       rootFile_->nextEventEntry();
00572     }
00573   }
00574 
00575   void
00576   RootInputFileSequence::readManyRandom(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
00577     if(0 != number && (fileIterEnd_ == fileIterBegin_) ) {
00578       throw Exception(errors::Configuration) << "RootInputFileSequence::readManyRandom(): no input files specified.\n";
00579     }
00580     result.reserve(number);
00581     if (!flatDistribution_) {
00582       Service<RandomNumberGenerator> rng;
00583       CLHEP::HepRandomEngine& engine = rng->getEngine();
00584       flatDistribution_.reset(new CLHEP::RandFlat(engine));
00585     }
00586     skipBadFiles_ = false;
00587     unsigned int currentSeqNumber = fileIter_ - fileIterBegin_;
00588     while(eventsRemainingInFile_ < number) {
00589       fileIter_ = fileIterBegin_ + flatDistribution_->fireInt(fileCatalogItems().size());
00590       unsigned int newSeqNumber = fileIter_ - fileIterBegin_;
00591       if(newSeqNumber != currentSeqNumber) {
00592         initFile(false);
00593         currentSeqNumber = newSeqNumber;
00594       }
00595       eventsRemainingInFile_ = rootFile_->eventTree().entries();
00596       if(eventsRemainingInFile_ == 0) {
00597         throw Exception(errors::NotFound) <<
00598            "RootInputFileSequence::readManyRandom_(): Secondary Input file " << fileIter_->fileName() << " contains no events.\n";
00599       }
00600       rootFile_->setAtEventEntry(flatDistribution_->fireInt(eventsRemainingInFile_));
00601     }
00602     fileSeqNumber = fileIter_ - fileIterBegin_;
00603     for(int i = 0; i < number; ++i) {
00604       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00605       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00606       if(ev == 0) {
00607         rewindFile();
00608         ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00609         assert(ev != 0);
00610       }
00611       assert(ev == ep.get());
00612       result.push_back(ep);
00613       --eventsRemainingInFile_;
00614       rootFile_->nextEventEntry();
00615     }
00616   }
00617 
00618   void
00619   RootInputFileSequence::readManySequential(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
00620     if(0 != number && (fileIterEnd_ == fileIterBegin_) ) {
00621       throw Exception(errors::Configuration) << "RootInputFileSequence::readManySequential(): no input files specified.\n";
00622     }
00623     result.reserve(number);
00624     skipBadFiles_ = false;
00625     if (fileIter_ == fileIterEnd_ || !rootFile_) {
00626       fileIter_ = fileIterBegin_;
00627       initFile(false);
00628       rootFile_->setAtEventEntry(0);
00629     }
00630     fileSeqNumber = fileIter_ - fileIterBegin_;
00631     unsigned int numberRead = 0;
00632     for(int i = 0; i < number; ++i) {
00633       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00634       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00635       if(ev == 0) {
00636         if (numberRead == 0) {
00637           ++fileIter_;
00638           fileSeqNumber = fileIter_ - fileIterBegin_;
00639           if (fileIter_ == fileIterEnd_) {
00640             return;
00641           }
00642           initFile(false);
00643           rootFile_->setAtEventEntry(0);
00644           return readManySequential(number, result, fileSeqNumber);
00645         }
00646         return;
00647       }
00648       assert(ev == ep.get());
00649       result.push_back(ep);
00650       ++numberRead;
00651       rootFile_->nextEventEntry();
00652     }
00653   }
00654 
00655   void
00656   RootInputFileSequence::readManySpecified(std::vector<EventID> const& events, EventPrincipalVector& result) {
00657     skipBadFiles_ = false;
00658     result.reserve(events.size());
00659     for (std::vector<EventID>::const_iterator it = events.begin(), itEnd = events.end(); it != itEnd; ++it) {
00660       bool found = skipToItem(it->run(), it->luminosityBlock(), it->event());
00661       if (!found) {
00662         throw Exception(errors::NotFound) <<
00663            "RootInputFileSequence::readManySpecified_(): Secondary Input file " <<
00664            fileIter_->fileName() <<
00665            " does not contain specified event:\n" << *it << "\n";
00666       }
00667       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00668       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00669       if (ev == 0) {
00670         throw Exception(errors::EventCorruption) <<
00671            "RootInputFileSequence::readManySpecified_(): Secondary Input file " <<
00672            fileIter_->fileName() <<
00673            " contains specified event " << *it << " that cannot be read.\n";
00674       }
00675       assert(ev == ep.get());
00676       result.push_back(ep);
00677     }
00678   }
00679 
00680   void
00681   RootInputFileSequence::fillDescription(ParameterSetDescription & desc) {
00682     desc.addUntracked<unsigned int>("skipEvents", 0U)
00683         ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
00684     desc.addUntracked<bool>("noEventSort", true)
00685         ->setComment("True:  Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
00686                      "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note 3).\n"
00687                      "Note 1: Events within the same lumi will always be processed contiguously.\n"
00688                      "Note 2: Lumis within the same run will always be processed contiguously.\n"
00689                      "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
00690     desc.addUntracked<bool>("skipBadFiles", false)
00691         ->setComment("True:  Ignore any missing or unopenable input file.\n"
00692                      "False: Throw exception if missing or unopenable input file.");
00693     desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
00694         ->setComment("Size of ROOT TTree prefetch cache.  Affects performance.");
00695     desc.addUntracked<int>("treeMaxVirtualSize", -1)
00696         ->setComment("Size of ROOT TTree TBasket cache.  Affects performance.");
00697     desc.addUntracked<unsigned int>("setRunNumber", 0U)
00698         ->setComment("If non-zero, change number of first run to this number. Apply same offset to all runs.  Allowed only for simulation.");
00699     desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
00700         ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
00701     std::string defaultString("permissive");
00702     desc.addUntracked<std::string>("parametersMustMatch", defaultString)
00703         ->setComment("'strict':     Values of tracked parameters must be unique across all input files.\n"
00704                      "'permissive': Values of tracked parameters may differ across or within files.");
00705     desc.addUntracked<std::string>("branchesMustMatch", defaultString)
00706         ->setComment("'strict':     Branches in each input file must match those in the first file.\n"
00707                      "'permissive': Branches in each input file may be any subset of those in the first file.");
00708 
00709     GroupSelectorRules::fillDescription(desc, "inputCommands");
00710     EventSkipperByID::fillDescription(desc);
00711     DuplicateChecker::fillDescription(desc);
00712   }
00713 
00714   ProcessingController::ForwardState
00715   RootInputFileSequence::forwardState() const {
00716     if (rootFile_) {
00717       if (!rootFile_->wasLastEventJustRead()) {
00718         return ProcessingController::kEventsAheadInFile;
00719       }
00720       std::vector<FileCatalogItem>::const_iterator itr(fileIter_);
00721       if (itr != fileIterEnd_) ++itr;
00722       if (itr != fileIterEnd_) {
00723         return ProcessingController::kNextFileExists;
00724       }
00725       return ProcessingController::kAtLastEvent;
00726     }
00727     return ProcessingController::kUnknownForward;
00728   }
00729 
00730   ProcessingController::ReverseState
00731   RootInputFileSequence::reverseState() const {
00732     if (rootFile_) {
00733       if (!rootFile_->wasFirstEventJustRead()) {
00734         return ProcessingController::kEventsBackwardsInFile;
00735       }
00736       if (fileIter_ != fileIterBegin_) {
00737         return ProcessingController::kPreviousFileExists;
00738       }
00739       return ProcessingController::kAtFirstEvent;
00740     }
00741     return ProcessingController::kUnknownReverse;
00742   }
00743 }