CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RootPrimaryFileSequence.cc
Go to the documentation of this file.
1 /*----------------------------------------------------------------------
2 ----------------------------------------------------------------------*/
3 #include "DuplicateChecker.h"
4 #include "InputFile.h"
5 #include "PoolSource.h"
6 #include "RootFile.h"
8 #include "RootTree.h"
9 
19 
20 namespace edm {
22  ParameterSet const& pset,
25  unsigned int nStreams) :
26  RootInputFileSequence(pset, catalog),
27  input_(input),
28  firstFile_(true),
29  branchesMustMatch_(BranchDescription::Permissive),
30  orderedProcessHistoryIDs_(),
31  nStreams_(nStreams),
32  eventSkipperByID_(EventSkipperByID::create(pset).release()),
33  // The default value provided as the second argument to the getUntrackedParameter function call
34  // is not used when the ParameterSet has been validated and the parameters are not optional
35  // in the description. This is currently true when PoolSource is the primary input source.
36  // The modules that use PoolSource as a SecSource have not defined their fillDescriptions function
37  // yet, so the ParameterSet does not get validated yet. As soon as all the modules with a SecSource
38  // have defined descriptions, the defaults in the getUntrackedParameter function calls can
39  // and should be deleted from the code.
40  initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents", 0U)),
41  noEventSort_(pset.getUntrackedParameter<bool>("noEventSort", true)),
42  skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles", false)),
43  bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck", false)),
44  treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize", roottree::defaultCacheSize) : 0U),
45  treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize", -1)),
46  setRun_(pset.getUntrackedParameter<unsigned int>("setRunNumber", 0U)),
47  productSelectorRules_(pset, "inputCommands", "InputSource"),
48  duplicateChecker_(new DuplicateChecker(pset)),
49  dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches", true)),
50  labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC", true)),
51  usingGoToEvent_(false),
52  enablePrefetching_(false) {
53 
54  // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
56  if(pSLC.isAvailable()) {
57  if(treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
58  treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
59  }
60  enablePrefetching_ = pSLC->enablePrefetching();
61  }
62 
63  std::string branchesMustMatch = pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
64  if(branchesMustMatch == std::string("strict")) branchesMustMatch_ = BranchDescription::Strict;
65 
67 
68  // Prestage the files
70  factory->activateTimeout(fileName());
71  factory->stagein(fileName());
72  }
73  // Open the first file.
76  if(rootFile()) break;
77  }
78  if(rootFile()) {
79  input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
82  }
83  }
84  }
85 
87  }
88 
89  void
91  closeFile_();
92  }
93 
94  std::unique_ptr<FileBlock>
96  if(firstFile_) {
97  // The first input file has already been opened.
98  firstFile_ = false;
99  if(!rootFile()) {
101  }
102  } else {
103  if(!nextFile()) {
104  assert(0);
105  }
106  }
107  if(!rootFile()) {
108  return std::unique_ptr<FileBlock>(new FileBlock);
109  }
110  return rootFile()->createFileBlock();
111  }
112 
113  void
115  // close the currently open file, if any, and delete the RootFile object.
116  if(rootFile()) {
117  std::unique_ptr<InputSource::FileCloseSentry>
119  rootFile()->close();
120  if(duplicateChecker_) duplicateChecker_->inputFileClosed();
121  rootFile().reset();
122  }
123  }
124 
125  void
127  // If we are not duplicate checking across files and we are not using random access to find events,
128  // then we can delete the IndexIntoFile for the file we are closing.
129  // If we can't delete all of it, then we can delete the parts we do not need.
130  bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() && !duplicateChecker_->checkDisabled());
131  initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
132  }
133 
135  RootPrimaryFileSequence::makeRootFile(std::shared_ptr<InputFile> filePtr) {
136  size_t currentIndexIntoFile = sequenceNumberOfFile();
137  return std::make_shared<RootFile>(
138  fileName(),
140  logicalFileName(),
141  filePtr,
144  remainingEvents(),
146  nStreams_,
150  setRun_,
151  noEventSort_,
156  std::vector<BranchID>(), // associationsFromSecondary_
161  currentIndexIntoFile,
167  }
168 
170  if(!noMoreFiles()) setAtNextFile();
171  if(noMoreFiles()) {
172  return false;
173  }
174 
176 
177  if(rootFile()) {
178  // make sure the new product registry is compatible with the main one
179  std::string mergeInfo = input_.productRegistryUpdate().merge(*rootFile()->productRegistry(),
180  fileName(),
182  if(!mergeInfo.empty()) {
183  throw Exception(errors::MismatchedInputFiles,"RootPrimaryFileSequence::nextFile()") << mergeInfo;
184  }
185  }
186  return true;
187  }
188 
190  if(atFirstFile()) {
191  return false;
192  }
194 
195  initFile(false);
196 
197  if(rootFile()) {
198  // make sure the new product registry is compatible with the main one
199  std::string mergeInfo = input_.productRegistryUpdate().merge(*rootFile()->productRegistry(),
200  fileName(),
202  if(!mergeInfo.empty()) {
203  throw Exception(errors::MismatchedInputFiles,"RootPrimaryFileSequence::previousEvent()") << mergeInfo;
204  }
205  }
206  if(rootFile()) rootFile()->setToLastEntry();
207  return true;
208  }
209 
212  if(noMoreFiles()) {
213  return InputSource::IsStop;
214  }
215  if(firstFile_) {
216  return InputSource::IsFile;
217  }
218  if(rootFile()) {
219  IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
220  if(entryType == IndexIntoFile::kEvent) {
221  return InputSource::IsEvent;
222  } else if(entryType == IndexIntoFile::kLumi) {
223  return InputSource::IsLumi;
224  } else if(entryType == IndexIntoFile::kRun) {
225  return InputSource::IsRun;
226  }
227  assert(entryType == IndexIntoFile::kEnd);
228  }
229  if(atLastFile()) {
230  return InputSource::IsStop;
231  }
232  return InputSource::IsFile;
233  }
234 
235  // Rewind to before the first event that was read.
236  void
238  if(!atFirstFile()) {
239  closeFile_();
240  setAtFirstFile();
241  }
242  if(!rootFile()) {
243  initFile(false);
244  }
245  rewindFile();
246  firstFile_ = true;
247  if(rootFile()) {
250  }
251  }
252  }
253 
254  // Rewind to the beginning of the current file
255  void
257  if(rootFile()) rootFile()->rewind();
258  }
259 
260  // Advance "offset" events. Offset can be positive or negative (or zero).
261  bool
263  assert(rootFile());
264  while(offset != 0) {
265  bool atEnd = rootFile()->skipEvents(offset);
266  if((offset > 0 || atEnd) && !nextFile()) {
267  return false;
268  }
269  if(offset < 0 && !previousFile()) {
270  setNoMoreFiles();
271  return false;
272  }
273  }
274  return true;
275  }
276 
277  bool
279  usingGoToEvent_ = true;
280  if(rootFile()) {
281  if(rootFile()->goToEvent(eventID)) {
282  return true;
283  }
284  // If only one input file, give up now, to save time.
285  if(rootFile() && indexesIntoFiles().size() == 1) {
286  return false;
287  }
288  // Save the current file and position so that we can restore them
289  // if we fail to find the desired event
290  bool closedOriginalFile = false;
291  size_t const originalFileSequenceNumber = sequenceNumberOfFile();
292  IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
293 
294  // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
295  typedef std::vector<std::shared_ptr<IndexIntoFile> >::const_iterator Iter;
296  for(Iter it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
297  if(*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
298  // We found it. Close the currently open file, and open the correct one.
300  initFile(false);
301  // Now get the item from the correct file.
302  assert(rootFile());
303  bool found = rootFile()->goToEvent(eventID);
304  assert(found);
305  return true;
306  }
307  }
308  // Look for item in files not yet opened.
309  for(Iter it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
310  if(!*it) {
312  initFile(false);
313  closedOriginalFile = true;
314  if((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
315  assert(rootFile());
316  if(rootFile()->goToEvent(eventID)) {
317  return true;
318  }
319  }
320  }
321  }
322  if(closedOriginalFile) {
323  setAtFileSequenceNumber(originalFileSequenceNumber);
324  initFile(false);
325  assert(rootFile());
326  rootFile()->setPosition(originalPosition);
327  }
328  }
329  return false;
330  }
331 
332  int
334  return input_.remainingEvents();
335  }
336 
337  int
340  }
341 
342  void
344  desc.addUntracked<unsigned int>("skipEvents", 0U)
345  ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
346  desc.addUntracked<bool>("noEventSort", true)
347  ->setComment("True: Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
348  "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note 3).\n"
349  "Note 1: Events within the same lumi will always be processed contiguously.\n"
350  "Note 2: Lumis within the same run will always be processed contiguously.\n"
351  "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
352  desc.addUntracked<bool>("skipBadFiles", false)
353  ->setComment("True: Ignore any missing or unopenable input file.\n"
354  "False: Throw exception if missing or unopenable input file.");
355  desc.addUntracked<bool>("bypassVersionCheck", false)
356  ->setComment("True: Bypass release version check.\n"
357  "False: Throw exception if reading file in a release prior to the release in which the file was written.");
358  desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
359  ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
360  desc.addUntracked<int>("treeMaxVirtualSize", -1)
361  ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
362  desc.addUntracked<unsigned int>("setRunNumber", 0U)
363  ->setComment("If non-zero, change number of first run to this number. Apply same offset to all runs. Allowed only for simulation.");
364  desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
365  ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
366  std::string defaultString("permissive");
367  desc.addUntracked<std::string>("branchesMustMatch", defaultString)
368  ->setComment("'strict': Branches in each input file must match those in the first file.\n"
369  "'permissive': Branches in each input file may be any subset of those in the first file.");
370  desc.addUntracked<bool>("labelRawDataLikeMC", true)
371  ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
372 
373  ProductSelectorRules::fillDescription(desc, "inputCommands");
376  }
377 
380  if(rootFile()) {
381  if(!rootFile()->wasLastEventJustRead()) {
383  }
384  if(noMoreFiles() || atLastFile()) {
386  } else {
388  }
389  }
391  }
392 
395  if(rootFile()) {
396  if(!rootFile()->wasFirstEventJustRead()) {
398  }
399  if(!atFirstFile()) {
401  }
403  }
405  }
406 
407 }
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
T getUntrackedParameter(std::string const &, T const &) const
std::string const & logicalFileName() const
void stagein(const std::string &url)
static void fillDescription(ParameterSetDescription &desc, char const *parameterName)
void initFile(bool skipBadFiles)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::string const & fileName() const
InputSource::ItemType getNextItemType(RunNumber_t &run, LuminosityBlockNumber_t &lumi, EventNumber_t &event)
void setAtFileSequenceNumber(size_t offset)
virtual void initFile_(bool skipBadFiles) override
ProcessingController::ForwardState forwardState() const
tuple lumi
Definition: fjr2json.py:35
assert(m_qm.get())
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper() const
Accessor for thinnedAssociationsHelper.
Definition: InputSource.h:178
unsigned long long EventNumber_t
std::vector< std::shared_ptr< IndexIntoFile > > const & indexesIntoFiles() const
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:251
static void fillDescription(ParameterSetDescription &desc)
std::shared_ptr< DuplicateChecker > duplicateChecker_
static void fillDescription(ParameterSetDescription &desc)
std::unique_ptr< FileBlock > readFile_()
LuminosityBlockNumber_t luminosityBlock() const
Definition: EventID.h:40
RootPrimaryFileSequence(ParameterSet const &pset, PoolSource &input, InputFileCatalog const &catalog, unsigned int nStreams)
unsigned int const defaultCacheSize
Definition: RootTree.h:37
unsigned int LuminosityBlockNumber_t
static std::string const input
Definition: EdmProvDump.cc:43
std::shared_ptr< EventSkipperByID > eventSkipperByID_
ProcessingController::ReverseState reverseState() const
static StorageFactory * get(void)
bool goToEvent(EventID const &eventID)
int remainingEvents() const
Definition: InputSource.h:195
std::string const & lfn() const
std::shared_ptr< BranchIDListHelper > branchIDListHelper() const
Accessor for branchIDListHelper.
Definition: InputSource.h:175
virtual RootFileSharedPtr makeRootFile(std::shared_ptr< InputFile > filePtr) override
void initTheFile(bool skipBadFiles, bool deleteIndexIntoFile, InputSource *input, char const *inputTypeName, InputType inputType)
RootFileSharedPtr const & rootFile() const
std::vector< ProcessHistoryID > orderedProcessHistoryIDs_
bool isAvailable() const
Definition: Service.h:46
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
#define end
Definition: vmac.h:37
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
std::shared_ptr< RootFile > RootFileSharedPtr
ProductSelectorRules productSelectorRules_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:345
BranchDescription::MatchMode branchesMustMatch_
tuple skipBadFiles
Definition: example_cfg.py:64
static void fillDescription(ParameterSetDescription &desc)
#define begin
Definition: vmac.h:30
ProcessHistoryRegistry & processHistoryRegistryForUpdate() const
Definition: InputSource.h:346
void updateFromInput(ProductList const &other)
void activateTimeout(const std::string &url)
int remainingLuminosityBlocks() const
Definition: InputSource.h:203
unsigned int RunNumber_t
volatile std::atomic< bool > shutdown_flag false
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:209
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.