CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_2_9_HLT1_bphpatch4/src/IOPool/Input/src/RootInputFileSequence.cc

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------
00002 ----------------------------------------------------------------------*/
00003 #include "RootInputFileSequence.h"
00004 #include "PoolSource.h"
00005 #include "RootFile.h"
00006 #include "RootTree.h"
00007 #include "DuplicateChecker.h"
00008 
00009 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
00010 #include "FWCore/Framework/interface/EventPrincipal.h"
00011 #include "FWCore/Framework/interface/FileBlock.h"
00012 #include "FWCore/Framework/src/PrincipalCache.h"
00013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00014 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00015 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00016 #include "FWCore/ServiceRegistry/interface/Service.h"
00017 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
00018 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00019 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00020 
00021 #include "CLHEP/Random/RandFlat.h"
00022 #include "InputFile.h"
00023 #include "TSystem.h"
00024 
00025 namespace edm {
namespace {
  // Text searched for in the message of an exception raised while opening an
  // input file; initFile() reports a FileReadError instead of a FileOpenError
  // when the message contains it.
  std::string const streamerInfo("StreamerInfo");
}
00029   RootInputFileSequence::RootInputFileSequence(
00030                 ParameterSet const& pset,
00031                 PoolSource const& input,
00032                 InputFileCatalog const& catalog,
00033                 PrincipalCache& cache,
00034                 bool primaryFiles) :
00035     input_(input),
00036     catalog_(catalog),
00037     firstFile_(true),
00038     fileIterBegin_(fileCatalogItems().begin()),
00039     fileIterEnd_(fileCatalogItems().end()),
00040     fileIter_(fileIterEnd_),
00041     fileIterLastOpened_(fileIterEnd_),
00042     rootFile_(),
00043     parametersMustMatch_(BranchDescription::Permissive),
00044     branchesMustMatch_(BranchDescription::Permissive),
00045     flatDistribution_(),
00046     indexesIntoFiles_(fileCatalogItems().size()),
00047     orderedProcessHistoryIDs_(),
00048     eventSkipperByID_(primaryFiles ? EventSkipperByID::create(pset).release() : 0),
00049     eventsRemainingInFile_(0),
00050     // The default value provided as the second argument to the getUntrackedParameter function call
00051     // is not used when the ParameterSet has been validated and the parameters are not optional
00052     // in the description.  This is currently true when PoolSource is the primary input source.
00053     // The modules that use PoolSource as a SecSource have not defined their fillDescriptions function
00054     // yet, so the ParameterSet does not get validated yet.  As soon as all the modules with a SecSource
00055     // have defined descriptions, the defaults in the getUntrackedParameterSet function calls can
00056     // and should be deleted from the code.
00057     numberOfEventsToSkip_(primaryFiles ? pset.getUntrackedParameter<unsigned int>("skipEvents", 0U) : 0U),
00058     noEventSort_(primaryFiles ? pset.getUntrackedParameter<bool>("noEventSort", true) : false),
00059     skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles", false)),
00060     treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize", roottree::defaultCacheSize) : 0U),
00061     treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize", -1)),
00062     setRun_(pset.getUntrackedParameter<unsigned int>("setRunNumber", 0U)),
00063     groupSelectorRules_(pset, "inputCommands", "InputSource"),
00064     primaryFiles_(primaryFiles),
00065     duplicateChecker_(primaryFiles ? new DuplicateChecker(pset) : 0),
00066     dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches", primary())),
00067     usingGoToEvent_(false) {
00068 
00069     //we now allow the site local config to specify what the TTree cache size should be
00070     Service<SiteLocalConfig> pSLC;
00071     if(treeCacheSize_ != 0U && pSLC.isAvailable() && pSLC->sourceTTreeCacheSize()) {
00072       treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
00073     }
00074     
00075     if(primaryFiles_) {
00076       //NOTE: we do not want to stage in secondary files since we can be given a list of
00077       // thousands of files and prestaging all those files can cause a site to fail
00078       StorageFactory *factory = StorageFactory::get();
00079       for(fileIter_ = fileIterBegin_; fileIter_ != fileIterEnd_; ++fileIter_) {
00080         factory->activateTimeout(fileIter_->fileName());
00081         factory->stagein(fileIter_->fileName());
00082       }
00083     }
00084 
00085     std::string parametersMustMatch = pset.getUntrackedParameter<std::string>("parametersMustMatch", std::string("permissive"));
00086     if(parametersMustMatch == std::string("strict")) parametersMustMatch_ = BranchDescription::Strict;
00087 
00088     std::string branchesMustMatch = pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
00089     if(branchesMustMatch == std::string("strict")) branchesMustMatch_ = BranchDescription::Strict;
00090 
00091     if(primary()) {
00092       for(fileIter_ = fileIterBegin_; fileIter_ != fileIterEnd_; ++fileIter_) {
00093         initFile(skipBadFiles_);
00094         if(rootFile_) break;
00095       }
00096       if(rootFile_) {
00097         productRegistryUpdate().updateFromInput(rootFile_->productRegistry()->productList());
00098         if(numberOfEventsToSkip_ != 0) {
00099           skipEvents(numberOfEventsToSkip_, cache);
00100         }
00101       }
00102     }
00103   }
00104 
00105   std::vector<FileCatalogItem> const&
00106   RootInputFileSequence::fileCatalogItems() const {
00107     return catalog_.fileCatalogItems();
00108   }
00109 
00110   void
00111   RootInputFileSequence::endJob() {
00112     closeFile_();
00113   }
00114 
00115   boost::shared_ptr<FileBlock>
00116   RootInputFileSequence::readFile_(PrincipalCache& cache) {
00117     if(firstFile_) {
00118       // The first input file has already been opened.
00119       firstFile_ = false;
00120       if(!rootFile_) {
00121         initFile(skipBadFiles_);
00122       }
00123     } else {
00124       if(!nextFile(cache)) {
00125         assert(0);
00126       }
00127     }
00128     if(!rootFile_) {
00129       return boost::shared_ptr<FileBlock>(new FileBlock);
00130     }
00131     return rootFile_->createFileBlock();
00132   }
00133 
00134   void RootInputFileSequence::closeFile_() {
00135     // close the currently open file, if any, and delete the RootFile object.
00136     if(rootFile_) {
00137       if (primary()) {
00138         std::auto_ptr<InputSource::FileCloseSentry>
00139         sentry((primaryFiles_) ? new InputSource::FileCloseSentry(input_) : 0);
00140         rootFile_->close();
00141         if(duplicateChecker_) duplicateChecker_->inputFileClosed();
00142       }
00143       rootFile_.reset();
00144     }
00145   }
00146 
00147   void RootInputFileSequence::initFile(bool skipBadFiles) {
00148     // We are really going to close the open file.
00149 
00150     // If this is the primary sequence, we are not duplicate checking across files
00151     // and we are not using random access to find events, then we can delete the
00152     // IndexIntoFile for the file we are closing. If we can't delete all of it,
00153     // then we can delete the parts we do not need.
00154     if (fileIterLastOpened_ != fileIterEnd_) {
00155       size_t currentIndexIntoFile = fileIterLastOpened_ - fileIterBegin_;
00156       bool needIndexesForDuplicateChecker = duplicateChecker_ && duplicateChecker_->checkingAllFiles() && !duplicateChecker_->checkDisabled();
00157       bool deleteIndexIntoFile = primaryFiles_ &&
00158                                  !needIndexesForDuplicateChecker &&
00159                                  !usingGoToEvent_;
00160       if (deleteIndexIntoFile) {
00161               indexesIntoFiles_[currentIndexIntoFile].reset();
00162       } else {
00163               if (indexesIntoFiles_[currentIndexIntoFile]) indexesIntoFiles_[currentIndexIntoFile]->inputFileClosed();
00164       }
00165       fileIterLastOpened_ = fileIterEnd_;
00166     }
00167     closeFile_();
00168 
00169     // Check if the logical file name was found.
00170     if(fileIter_->fileName().empty()) {
00171       // LFN not found in catalog.
00172       InputFile::reportSkippedFile(fileIter_->fileName(), fileIter_->logicalFileName());
00173       if(!skipBadFiles) {
00174         throw cms::Exception("LogicalFileNameNotFound", "RootInputFileSequence::initFile()\n")
00175           << "Logical file name '" << fileIter_->logicalFileName() << "' was not found in the file catalog.\n"
00176           << "If you wanted a local file, you forgot the 'file:' prefix\n"
00177           << "before the file name in your configuration file.\n";
00178       }
00179       LogWarning("") << "Input logical file: " << fileIter_->logicalFileName() << " was not found in the catalog, and will be skipped.\n";
00180       return;
00181     }
00182 
00183     // Determine whether we have a fallback URL specified; if so, prepare it;
00184     // Only valid if it is non-empty and differs from the original filename.
00185     std::string fallbackName = fileIter_->fallbackFileName();
00186     bool hasFallbackUrl = (!fallbackName.empty()) || (fallbackName == fileIter_->fileName());
00187 
00188     boost::shared_ptr<InputFile> filePtr;
00189     try {
00190       std::auto_ptr<InputSource::FileOpenSentry>
00191         sentry(primaryFiles_ ? new InputSource::FileOpenSentry(input_) : 0);
00192       filePtr.reset(new InputFile(gSystem->ExpandPathName(fileIter_->fileName().c_str()), "  Initiating request to open file "));
00193     }
00194     catch (cms::Exception const& e) {
00195       if(!skipBadFiles  && !hasFallbackUrl) {
00196         InputFile::reportSkippedFile(fileIter_->fileName(), fileIter_->logicalFileName());
00197         if(e.explainSelf().find(streamerInfo) != std::string::npos) {
00198           throw Exception(errors::FileReadError) << e.explainSelf() << "\n" <<
00199             "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " could not be read properly.\n" <<
00200             "Possibly the format is incompatible with the current release.\n";
00201         }
00202         throw Exception(errors::FileOpenError) << e.explainSelf() << "\n" <<
00203            "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " was not found, could not be opened, or is corrupted.\n";
00204       }
00205     }
00206     if(!filePtr && (hasFallbackUrl)) {
00207       try {
00208         std::auto_ptr<InputSource::FileOpenSentry>
00209           sentry(primaryFiles_ ? new InputSource::FileOpenSentry(input_) : 0);
00210         filePtr.reset(new InputFile(gSystem->ExpandPathName(fallbackName.c_str()), "  Fallback request to file "));
00211       }
00212       catch (cms::Exception const& e) {
00213         if(!skipBadFiles) {
00214           InputFile::reportSkippedFile(fileIter_->fileName(), fileIter_->logicalFileName());
00215           if(e.explainSelf().find(streamerInfo) != std::string::npos) {
00216             throw Exception(errors::FileReadError) << e.explainSelf() << "\n" <<
00217               "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " could not be read properly.\n" <<
00218               "Possibly the format is incompatible with the current release.\n";
00219           }
00220           throw Exception(errors::FileOpenError) << e.explainSelf() << "\n" <<
00221              "RootInputFileSequence::initFile(): Input fallback file " << fallbackName << " was not found, could not be opened, or is corrupted.\n";
00222         }
00223       }
00224     }
00225     if(filePtr) {
00226       std::vector<boost::shared_ptr<IndexIntoFile> >::size_type currentIndexIntoFile = fileIter_ - fileIterBegin_;
00227       rootFile_ = RootFileSharedPtr(new RootFile(fileIter_->fileName(),
00228           processConfiguration(), fileIter_->logicalFileName(), filePtr,
00229           eventSkipperByID_, numberOfEventsToSkip_ != 0,
00230           remainingEvents(), remainingLuminosityBlocks(), treeCacheSize_, treeMaxVirtualSize_,
00231           input_.processingMode(),
00232           setRun_,
00233           noEventSort_,
00234           groupSelectorRules_, !primaryFiles_, duplicateChecker_, dropDescendants_,
00235                                                  indexesIntoFiles_, currentIndexIntoFile, orderedProcessHistoryIDs_, usingGoToEvent_));
00236 
00237       fileIterLastOpened_ = fileIter_;
00238       indexesIntoFiles_[currentIndexIntoFile] = rootFile_->indexIntoFileSharedPtr();
00239       rootFile_->reportOpened(primary() ?
00240          (primaryFiles_ ? "primaryFiles" : "secondaryFiles") : "mixingFiles");
00241     } else {
00242       InputFile::reportSkippedFile(fileIter_->fileName(), fileIter_->logicalFileName());
00243       if(!skipBadFiles) {
00244         throw Exception(errors::FileOpenError) <<
00245            "RootInputFileSequence::initFile(): Input file " << fileIter_->fileName() << " was not found or could not be opened.\n";
00246       }
00247       LogWarning("") << "Input file: " << fileIter_->fileName() << " was not found or could not be opened, and will be skipped.\n";
00248     }
00249   }
00250 
00251   boost::shared_ptr<ProductRegistry const>
00252   RootInputFileSequence::fileProductRegistry() const {
00253     return rootFile_->productRegistry();
00254   }
00255 
00256   bool RootInputFileSequence::nextFile(PrincipalCache& cache) {
00257     if(fileIter_ != fileIterEnd_) ++fileIter_;
00258     if(fileIter_ == fileIterEnd_) {
00259       if(primaryFiles_) {
00260         return false;
00261       } else {
00262         fileIter_ = fileIterBegin_;
00263       }
00264     }
00265 
00266     initFile(skipBadFiles_);
00267 
00268     if(primaryFiles_ && rootFile_) {
00269       size_t size = productRegistry()->size();
00270       // make sure the new product registry is compatible with the main one
00271       std::string mergeInfo = productRegistryUpdate().merge(*rootFile_->productRegistry(),
00272                                                             fileIter_->fileName(),
00273                                                             parametersMustMatch_,
00274                                                             branchesMustMatch_);
00275       if(!mergeInfo.empty()) {
00276         throw Exception(errors::MismatchedInputFiles,"RootInputFileSequence::nextFile()") << mergeInfo;
00277       }
00278       if (productRegistry()->size() > size) {
00279         cache.adjustIndexesAfterProductRegistryAddition();
00280       }
00281       cache.adjustEventToNewProductRegistry(productRegistry());
00282     }
00283     return true;
00284   }
00285 
00286   bool RootInputFileSequence::previousFile(PrincipalCache& cache) {
00287     if(fileIter_ == fileIterBegin_) {
00288       if(primaryFiles_) {
00289         return false;
00290       } else {
00291         fileIter_ = fileIterEnd_;
00292       }
00293     }
00294     --fileIter_;
00295 
00296     initFile(false);
00297 
00298     if(primaryFiles_ && rootFile_) {
00299       size_t size = productRegistry()->size();
00300       // make sure the new product registry is compatible to the main one
00301       std::string mergeInfo = productRegistryUpdate().merge(*rootFile_->productRegistry(),
00302                                                             fileIter_->fileName(),
00303                                                             parametersMustMatch_,
00304                                                             branchesMustMatch_);
00305       if(!mergeInfo.empty()) {
00306         throw Exception(errors::MismatchedInputFiles,"RootInputFileSequence::previousEvent()") << mergeInfo;
00307       }
00308       if (productRegistry()->size() > size) {
00309         cache.adjustIndexesAfterProductRegistryAddition();
00310       }
00311       cache.adjustEventToNewProductRegistry(productRegistry());
00312     }
00313     if(rootFile_) rootFile_->setToLastEntry();
00314     return true;
00315   }
00316 
00317   RootInputFileSequence::~RootInputFileSequence() {
00318   }
00319 
00320   boost::shared_ptr<RunAuxiliary>
00321   RootInputFileSequence::readRunAuxiliary_() {
00322     boost::shared_ptr<RunAuxiliary> aux = rootFile_->readRunAuxiliary_();
00323     return aux;
00324   }
00325 
00326   boost::shared_ptr<LuminosityBlockAuxiliary>
00327   RootInputFileSequence::readLuminosityBlockAuxiliary_() {
00328     boost::shared_ptr<LuminosityBlockAuxiliary> aux = rootFile_->readLuminosityBlockAuxiliary_();
00329     return aux;
00330   }
00331 
00332   boost::shared_ptr<RunPrincipal>
00333   RootInputFileSequence::readRun_(boost::shared_ptr<RunPrincipal> rpCache) {
00334     return rootFile_->readRun_(rpCache);
00335   }
00336 
00337   boost::shared_ptr<LuminosityBlockPrincipal>
00338   RootInputFileSequence::readLuminosityBlock_(boost::shared_ptr<LuminosityBlockPrincipal> lbCache) {
00339     return rootFile_->readLumi(lbCache);
00340   }
00341 
00342   // readEvent() is responsible for setting up the EventPrincipal.
00343   //
00344   //   1. create an EventPrincipal with a unique EventID
00345   //   2. For each entry in the provenance, put in one Group,
00346   //      holding the Provenance for the corresponding EDProduct.
00347   //   3. set up the caches in the EventPrincipal to know about this
00348   //      Group.
00349   //
00350   // We do *not* create the EDProduct instance (the equivalent of reading
00351   // the branch containing this EDProduct. That will be done by the Delayed Reader,
00352   //  when it is asked to do so.
00353   //
00354 
00355   EventPrincipal*
00356   RootInputFileSequence::readEvent(EventPrincipal& cache, boost::shared_ptr<LuminosityBlockPrincipal> lb) {
00357     return rootFile_->readEvent(cache, rootFile_, lb);
00358   }
00359 
00360   InputSource::ItemType
00361   RootInputFileSequence::getNextItemType() {
00362     if(fileIter_ == fileIterEnd_) {
00363       return InputSource::IsStop;
00364     }
00365     if(firstFile_) {
00366       return InputSource::IsFile;
00367     }
00368     if(rootFile_) {
00369       IndexIntoFile::EntryType entryType = rootFile_->getNextEntryTypeWanted();
00370       if(entryType == IndexIntoFile::kEvent) {
00371         return InputSource::IsEvent;
00372       } else if(entryType == IndexIntoFile::kLumi) {
00373         return InputSource::IsLumi;
00374       } else if(entryType == IndexIntoFile::kRun) {
00375         return InputSource::IsRun;
00376       }
00377       assert(entryType == IndexIntoFile::kEnd);
00378     }
00379     if(fileIter_ + 1 == fileIterEnd_) {
00380       return InputSource::IsStop;
00381     }
00382     return InputSource::IsFile;
00383   }
00384 
00385   // Rewind to before the first event that was read.
00386   void
00387   RootInputFileSequence::rewind_() {
00388     if (fileIter_ != fileIterBegin_) {
00389       closeFile_();
00390       fileIter_ = fileIterBegin_;
00391     }
00392     if (!rootFile_) {
00393       initFile(false);
00394     }
00395     rewindFile();
00396     firstFile_ = true;
00397   }
00398 
00399   // Rewind to the beginning of the current file
00400   void
00401   RootInputFileSequence::rewindFile() {
00402     rootFile_->rewind();
00403   }
00404 
00405   void
00406   RootInputFileSequence::reset(PrincipalCache& cache) {
00407     //NOTE: Need to handle duplicate checker
00408     // Also what if skipBadFiles_==true and the first time we succeeded but after a reset we fail?
00409     if(primary()) {
00410       firstFile_ = true;
00411       for(fileIter_ = fileIterBegin_; fileIter_ != fileIterEnd_; ++fileIter_) {
00412         initFile(skipBadFiles_);
00413         if(rootFile_) break;
00414       }
00415       if(rootFile_) {
00416         if(numberOfEventsToSkip_ != 0) {
00417           skipEvents(numberOfEventsToSkip_, cache);
00418         }
00419       }
00420     }
00421   }
00422 
00423   // Advance "offset" events.  Offset can be positive or negative (or zero).
00424   bool
00425   RootInputFileSequence::skipEvents(int offset, PrincipalCache& cache) {
00426     assert (numberOfEventsToSkip_ == 0 || numberOfEventsToSkip_ == offset);
00427     numberOfEventsToSkip_ = offset;
00428     while(numberOfEventsToSkip_ != 0) {
00429       bool atEnd = rootFile_->skipEvents(numberOfEventsToSkip_);
00430       if((numberOfEventsToSkip_ > 0 || atEnd) && !nextFile(cache)) {
00431         numberOfEventsToSkip_ = 0;
00432         return false;
00433       }
00434       if(numberOfEventsToSkip_ < 0 && !previousFile(cache)) {
00435         numberOfEventsToSkip_ = 0;
00436         fileIter_ = fileIterEnd_;
00437         return false;
00438       }
00439     }
00440     return true;
00441   }
00442 
00443   bool
00444   RootInputFileSequence::goToEvent(EventID const& eventID, PrincipalCache& cache) {
00445     usingGoToEvent_ = true;
00446     if (rootFile_) {
00447       if (rootFile_->goToEvent(eventID)) {
00448         return true;
00449       }
00450       // If only one input file, give up now, to save time.
00451       if(rootFile_ && indexesIntoFiles_.size() == 1) {
00452         return false;
00453       }
00454       // Save the current file and position so that we can restore them
00455       // if we fail to restore the desired event
00456       bool closedOriginalFile = false;
00457       std::vector<FileCatalogItem>::const_iterator originalFile = fileIter_;
00458       IndexIntoFile::IndexIntoFileItr originalPosition = rootFile_->indexIntoFileIter();
00459 
00460       // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
00461       typedef std::vector<boost::shared_ptr<IndexIntoFile> >::const_iterator Iter;
00462       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00463         if(*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
00464           // We found it. Close the currently open file, and open the correct one.
00465           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00466           initFile(false);
00467           // Now get the item from the correct file.
00468           bool found = rootFile_->goToEvent(eventID);
00469           assert (found);
00470           return true;
00471         }
00472       }
00473       // Look for item in files not yet opened.
00474       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00475         if(!*it) {
00476           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00477           initFile(false);
00478           closedOriginalFile = true;
00479           if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
00480             if  (rootFile_->goToEvent(eventID)) {
00481               return true;
00482             }
00483           }
00484         }
00485       }
00486       if (closedOriginalFile) {
00487         fileIter_ = originalFile;
00488         initFile(false);
00489         rootFile_->setPosition(originalPosition);
00490       }
00491     }
00492     return false;
00493   }
00494 
00495   bool
00496   RootInputFileSequence::skipToItem(RunNumber_t run, LuminosityBlockNumber_t lumi, EventNumber_t event) {
00497     // Attempt to find item in currently open input file.
00498     bool found = rootFile_ && rootFile_->setEntryAtItem(run, lumi, event);
00499     if(!found) {
00500       // If only one input file, give up now, to save time.
00501       if(rootFile_ && indexesIntoFiles_.size() == 1) {
00502         return false;
00503       }
00504       // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
00505       typedef std::vector<boost::shared_ptr<IndexIntoFile> >::const_iterator Iter;
00506       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00507         if(*it && (*it)->containsItem(run, lumi, event)) {
00508           // We found it. Close the currently open file, and open the correct one.
00509           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00510           initFile(false);
00511           // Now get the item from the correct file.
00512           found = rootFile_->setEntryAtItem(run, lumi, event);
00513           assert (found);
00514           return true;
00515         }
00516       }
00517       // Look for item in files not yet opened.
00518       for(Iter it = indexesIntoFiles_.begin(), itEnd = indexesIntoFiles_.end(); it != itEnd; ++it) {
00519         if(!*it) {
00520           fileIter_ = fileIterBegin_ + (it - indexesIntoFiles_.begin());
00521           initFile(false);
00522           found = rootFile_->setEntryAtItem(run, lumi, event);
00523           if(found) {
00524             return true;
00525           }
00526         }
00527       }
00528       // Not found
00529       return false;
00530     }
00531     return true;
00532   }
00533 
00534   bool const
00535   RootInputFileSequence::primary() const {
00536     return input_.primary();
00537   }
00538 
00539   ProcessConfiguration const&
00540   RootInputFileSequence::processConfiguration() const {
00541     return input_.processConfiguration();
00542   }
00543 
00544   int
00545   RootInputFileSequence::remainingEvents() const {
00546     return input_.remainingEvents();
00547   }
00548 
00549   int
00550   RootInputFileSequence::remainingLuminosityBlocks() const {
00551     return input_.remainingLuminosityBlocks();
00552   }
00553 
00554   ProductRegistry &
00555   RootInputFileSequence::productRegistryUpdate() const{
00556     return input_.productRegistryUpdate();
00557   }
00558 
00559   boost::shared_ptr<ProductRegistry const>
00560   RootInputFileSequence::productRegistry() const{
00561     return input_.productRegistry();
00562   }
00563 
00564   void
00565   RootInputFileSequence::dropUnwantedBranches_(std::vector<std::string> const& wantedBranches) {
00566     std::vector<std::string> rules;
00567     rules.reserve(wantedBranches.size() + 1);
00568     rules.push_back(std::string("drop *"));
00569     for(std::vector<std::string>::const_iterator it = wantedBranches.begin(), itEnd = wantedBranches.end();
00570         it != itEnd; ++it) {
00571       rules.push_back("keep " + *it + "_*");
00572     }
00573     ParameterSet pset;
00574     pset.addUntrackedParameter("inputCommands", rules);
00575     groupSelectorRules_ = GroupSelectorRules(pset, "inputCommands", "InputSource");
00576   }
00577 
00578   void
00579   RootInputFileSequence::readMany(int number, EventPrincipalVector& result) {
00580     for(int i = 0; i < number; ++i) {
00581       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00582       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00583       if(ev == 0) {
00584         return;
00585       }
00586       assert(ev == ep.get());
00587       result.push_back(ep);
00588       rootFile_->nextEventEntry();
00589     }
00590   }
00591 
00592   void
00593   RootInputFileSequence::readManyRandom(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
00594     if(0 != number && (fileIterEnd_ == fileIterBegin_) ) {
00595       throw Exception(errors::Configuration) << "RootInputFileSequence::readManyRandom(): no input files specified.\n";
00596     }
00597     result.reserve(number);
00598     if (!flatDistribution_) {
00599       Service<RandomNumberGenerator> rng;
00600       CLHEP::HepRandomEngine& engine = rng->getEngine();
00601       flatDistribution_.reset(new CLHEP::RandFlat(engine));
00602     }
00603     skipBadFiles_ = false;
00604     unsigned int currentSeqNumber = fileIter_ - fileIterBegin_;
00605     while(eventsRemainingInFile_ < number) {
00606       fileIter_ = fileIterBegin_ + flatDistribution_->fireInt(fileCatalogItems().size());
00607       unsigned int newSeqNumber = fileIter_ - fileIterBegin_;
00608       if(newSeqNumber != currentSeqNumber) {
00609         initFile(false);
00610         currentSeqNumber = newSeqNumber;
00611       }
00612       eventsRemainingInFile_ = rootFile_->eventTree().entries();
00613       if(eventsRemainingInFile_ == 0) {
00614         throw Exception(errors::NotFound) <<
00615            "RootInputFileSequence::readManyRandom_(): Secondary Input file " << fileIter_->fileName() << " contains no events.\n";
00616       }
00617       rootFile_->setAtEventEntry(flatDistribution_->fireInt(eventsRemainingInFile_));
00618     }
00619     fileSeqNumber = fileIter_ - fileIterBegin_;
00620     for(int i = 0; i < number; ++i) {
00621       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00622       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00623       if(ev == 0) {
00624         rewindFile();
00625         ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00626         assert(ev != 0);
00627       }
00628       assert(ev == ep.get());
00629       result.push_back(ep);
00630       --eventsRemainingInFile_;
00631       rootFile_->nextEventEntry();
00632     }
00633   }
00634 
00635   void
00636   RootInputFileSequence::readManySequential(int number, EventPrincipalVector& result, unsigned int& fileSeqNumber) {
00637     if(0 != number && (fileIterEnd_ == fileIterBegin_) ) {
00638       throw Exception(errors::Configuration) << "RootInputFileSequence::readManySequential(): no input files specified.\n";
00639     }
00640     result.reserve(number);
00641     skipBadFiles_ = false;
00642     if (fileIter_ == fileIterEnd_ || !rootFile_) {
00643       fileIter_ = fileIterBegin_;
00644       initFile(false);
00645       rootFile_->setAtEventEntry(0);
00646     }
00647     fileSeqNumber = fileIter_ - fileIterBegin_;
00648     unsigned int numberRead = 0;
00649     for(int i = 0; i < number; ++i) {
00650       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00651       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00652       if(ev == 0) {
00653         if (numberRead == 0) {
00654           ++fileIter_;
00655           fileSeqNumber = fileIter_ - fileIterBegin_;
00656           if (fileIter_ == fileIterEnd_) {
00657             return;
00658           }
00659           initFile(false);
00660           rootFile_->setAtEventEntry(0);
00661           return readManySequential(number, result, fileSeqNumber);
00662         }
00663         return;
00664       }
00665       assert(ev == ep.get());
00666       result.push_back(ep);
00667       ++numberRead;
00668       rootFile_->nextEventEntry();
00669     }
00670   }
00671 
00672   void
00673   RootInputFileSequence::readManySpecified(std::vector<EventID> const& events, EventPrincipalVector& result) {
00674     skipBadFiles_ = false;
00675     result.reserve(events.size());
00676     for (std::vector<EventID>::const_iterator it = events.begin(), itEnd = events.end(); it != itEnd; ++it) {
00677       bool found = skipToItem(it->run(), it->luminosityBlock(), it->event());
00678       if (!found) {
00679         throw Exception(errors::NotFound) <<
00680            "RootInputFileSequence::readManySpecified_(): Secondary Input file " <<
00681            fileIter_->fileName() <<
00682            " does not contain specified event:\n" << *it << "\n";
00683       }
00684       boost::shared_ptr<EventPrincipal> ep(new EventPrincipal(rootFile_->productRegistry(), processConfiguration()));
00685       EventPrincipal* ev = rootFile_->readCurrentEvent(*ep, rootFile_);
00686       if (ev == 0) {
00687         throw Exception(errors::EventCorruption) <<
00688            "RootInputFileSequence::readManySpecified_(): Secondary Input file " <<
00689            fileIter_->fileName() <<
00690            " contains specified event " << *it << " that cannot be read.\n";
00691       }
00692       assert(ev == ep.get());
00693       result.push_back(ep);
00694     }
00695   }
00696 
00697   void
00698   RootInputFileSequence::fillDescription(ParameterSetDescription & desc) {
00699     desc.addUntracked<unsigned int>("skipEvents", 0U)
00700         ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
00701     desc.addUntracked<bool>("noEventSort", true)
00702         ->setComment("True:  Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
00703                      "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note 3).\n"
00704                      "Note 1: Events within the same lumi will always be processed contiguously.\n"
00705                      "Note 2: Lumis within the same run will always be processed contiguously.\n"
00706                      "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
00707     desc.addUntracked<bool>("skipBadFiles", false)
00708         ->setComment("True:  Ignore any missing or unopenable input file.\n"
00709                      "False: Throw exception if missing or unopenable input file.");
00710     desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
00711         ->setComment("Size of ROOT TTree prefetch cache.  Affects performance.");
00712     desc.addUntracked<int>("treeMaxVirtualSize", -1)
00713         ->setComment("Size of ROOT TTree TBasket cache.  Affects performance.");
00714     desc.addUntracked<unsigned int>("setRunNumber", 0U)
00715         ->setComment("If non-zero, change number of first run to this number. Apply same offset to all runs.  Allowed only for simulation.");
00716     desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
00717         ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
00718     std::string defaultString("permissive");
00719     desc.addUntracked<std::string>("parametersMustMatch", defaultString)
00720         ->setComment("'strict':     Values of tracked parameters must be unique across all input files.\n"
00721                      "'permissive': Values of tracked parameters may differ across or within files.");
00722     desc.addUntracked<std::string>("branchesMustMatch", defaultString)
00723         ->setComment("'strict':     Branches in each input file must match those in the first file.\n"
00724                      "'permissive': Branches in each input file may be any subset of those in the first file.");
00725 
00726     GroupSelectorRules::fillDescription(desc, "inputCommands");
00727     EventSkipperByID::fillDescription(desc);
00728     DuplicateChecker::fillDescription(desc);
00729   }
00730 
00731   ProcessingController::ForwardState
00732   RootInputFileSequence::forwardState() const {
00733     if (rootFile_) {
00734       if (!rootFile_->wasLastEventJustRead()) {
00735         return ProcessingController::kEventsAheadInFile;
00736       }
00737       std::vector<FileCatalogItem>::const_iterator itr(fileIter_);
00738       if (itr != fileIterEnd_) ++itr;
00739       if (itr != fileIterEnd_) {
00740         return ProcessingController::kNextFileExists;
00741       }
00742       return ProcessingController::kAtLastEvent;
00743     }
00744     return ProcessingController::kUnknownForward;
00745   }
00746 
00747   ProcessingController::ReverseState
00748   RootInputFileSequence::reverseState() const {
00749     if (rootFile_) {
00750       if (!rootFile_->wasFirstEventJustRead()) {
00751         return ProcessingController::kEventsBackwardsInFile;
00752       }
00753       if (fileIter_ != fileIterBegin_) {
00754         return ProcessingController::kPreviousFileExists;
00755       }
00756       return ProcessingController::kAtFirstEvent;
00757     }
00758     return ProcessingController::kUnknownReverse;
00759   }
00760 }