CMS 3D CMS Logo

InputSource.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_InputSource_h
2 #define FWCore_Framework_InputSource_h
3 
4 /*----------------------------------------------------------------------
5 
6 InputSource: Abstract interface for all input sources. Input
7 sources are responsible for creating an EventPrincipal, using data
8 controlled by the source, and external to the EventPrincipal itself.
9 
10 The InputSource is also responsible for dealing with the "process
11 name list" contained within the EventPrincipal. Each InputSource has
12 to know what "process" (HLT, PROD, USER, USER1, etc.) the program is
13 part of. The InputSource is responsible for pushing this process name
14 onto the end of the process name list.
15 
16 For now, we specify this process name to the constructor of the
17 InputSource. This should be improved.
18 
19  Some questions about this remain:
20 
21  1. What should happen if we "rerun" a process? i.e., if "USER1" is
22  already last in our input file, and we run again a job which claims
23  to be "USER1", what should happen? For now, we just quietly add
24  this to the history.
25 
26  2. Do we need to detect a problem with a history like:
27  HLT PROD USER1 PROD
28  or is it up to the user not to do something silly? Right now, there
29  is no protection against such silliness.
30 
31 Some examples of InputSource subclasses may be:
32 
33  1) EmptySource: creates EventPrincipals which contain no EDProducts.
34  2) PoolSource: creates EventPrincipals which "contain" the data
35  read from an EDM/ROOT file. This source should provide for delayed loading
36  of data, thus the quotation marks around contain.
37  3) DAQSource: creates EventPrincipals which contain raw data, as
38  delivered by the L1 trigger and event builder.
39 
40 ----------------------------------------------------------------------*/
41 
55 
56 #include <memory>
57 #include <string>
58 #include <chrono>
59 #include <mutex>
60 
61 namespace edm {
62  class ActivityRegistry;
63  class BranchIDListHelper;
65  class HistoryAppender;
66  class ParameterSet;
68  class ProcessContext;
69  class ProcessHistoryRegistry;
70  class ProductRegistry;
71  class StreamContext;
72  class ModuleCallingContext;
74  class ThinnedAssociationsHelper;
75 
76  class InputSource {
77  public:
79 
81 
83  explicit InputSource(ParameterSet const&, InputSourceDescription const&);
84 
86  virtual ~InputSource() noexcept(false);
87 
88  InputSource(InputSource const&) = delete; // Disallow copying and moving
89  InputSource& operator=(InputSource const&) = delete; // Disallow copying and moving
90 
91  static void fillDescriptions(ConfigurationDescriptions& descriptions);
92  static const std::string& baseType();
93  static void fillDescription(ParameterSetDescription& desc);
95 
98 
101 
103  bool readEvent(EventPrincipal& ep, EventID const&, StreamContext&);
104 
106  std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
107 
109  std::shared_ptr<RunAuxiliary> readRunAuxiliary();
110 
112  void readRun(RunPrincipal& runPrincipal, HistoryAppender& historyAppender);
113 
115  void readAndMergeRun(RunPrincipal& rp);
116 
118  void readLuminosityBlock(LuminosityBlockPrincipal& lumiPrincipal, HistoryAppender& historyAppender);
119 
122 
124  std::unique_ptr<FileBlock> readFile();
125 
127  void closeFile(FileBlock*, bool cleaningUpAfterException);
128 
131  void skipEvents(int offset);
132 
133  bool goToEvent(EventID const& eventID);
134 
136  void rewind();
137 
140 
143 
145  void issueReports(EventID const& eventID, StreamID streamID);
146 
148  virtual void registerProducts();
149 
151  std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
152  std::shared_ptr<ProductRegistry>& productRegistry() { return get_underlying_safe(productRegistry_); }
153 
157 
159  std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
161  }
162  std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
163 
165  std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
167  }
168  std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
170  }
171 
173  void repeat() {
176  }
177 
179  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();
180 
183  int maxEvents() const { return maxEvents_; }
184 
187  int remainingEvents() const { return remainingEvents_; }
188 
191  int maxLuminosityBlocks() const { return maxLumis_; }
192 
196 
199 
202 
204  std::string const& processGUID() const { return processGUID_; }
205 
207  void doBeginJob();
208 
210  void doEndJob();
211 
213  virtual void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*);
214 
216  virtual void doBeginRun(RunPrincipal& rp, ProcessContext const*);
217 
219  Timestamp const& timestamp() const { return time_; }
220 
225 
227  RunNumber_t run() const;
228 
231 
234 
236  std::shared_ptr<ActivityRegistry> actReg() const { return actReg_; }
237 
239  std::shared_ptr<RunAuxiliary> runAuxiliary() const { return runAuxiliary_; }
240 
242  std::shared_ptr<LuminosityBlockAuxiliary> luminosityBlockAuxiliary() const { return lumiAuxiliary_; }
243 
244  bool randomAccess() const;
247 
249  public:
252 
253  EventSourceSentry(EventSourceSentry const&) = delete; // Disallow copying and moving
254  EventSourceSentry& operator=(EventSourceSentry const&) = delete; // Disallow copying and moving
255 
256  private:
259  };
260 
262  public:
264  ~LumiSourceSentry();
265 
266  LumiSourceSentry(LumiSourceSentry const&) = delete; // Disallow copying and moving
267  LumiSourceSentry& operator=(LumiSourceSentry const&) = delete; // Disallow copying and moving
268 
269  private:
272  };
273 
275  public:
277  ~RunSourceSentry();
278 
279  RunSourceSentry(RunSourceSentry const&) = delete; // Disallow copying and moving
280  RunSourceSentry& operator=(RunSourceSentry const&) = delete; // Disallow copying and moving
281 
282  private:
285  };
286 
288  public:
290  explicit FileOpenSentry(InputSource const& source, std::string const& lfn, bool usedFallback);
291  ~FileOpenSentry();
292 
293  FileOpenSentry(FileOpenSentry const&) = delete; // Disallow copying and moving
294  FileOpenSentry& operator=(FileOpenSentry const&) = delete; // Disallow copying and moving
295 
296  private:
297  Sig& post_;
300  };
301 
303  public:
305  explicit FileCloseSentry(InputSource const& source, std::string const& lfn, bool usedFallback);
306  ~FileCloseSentry();
307 
308  FileCloseSentry(FileCloseSentry const&) = delete; // Disallow copying and moving
309  FileCloseSentry& operator=(FileCloseSentry const&) = delete; // Disallow copying and moving
310 
311  private:
312  Sig& post_;
315  };
316 
319 
320  protected:
321  virtual void skip(int offset);
322 
324  void setTimestamp(Timestamp const& theTime) { time_ = theTime; }
325 
328  ItemType state() const { return state_; }
330  runAuxiliary_.reset(rp);
331  newRun_ = newLumi_ = true;
332  }
334  lumiAuxiliary_.reset(lbp);
335  newLumi_ = true;
336  }
337  void resetRunAuxiliary(bool isNewRun = true) const {
338  runAuxiliary_.reset();
339  newRun_ = newLumi_ = isNewRun;
340  }
341  void resetLuminosityBlockAuxiliary(bool isNewLumi = true) const {
342  lumiAuxiliary_.reset();
343  newLumi_ = isNewLumi;
344  }
345  void reset() const {
348  state_ = IsInvalid;
349  }
350  bool newRun() const { return newRun_; }
351  void setNewRun() { newRun_ = true; }
352  void resetNewRun() { newRun_ = false; }
353  bool newLumi() const { return newLumi_; }
354  void setNewLumi() { newLumi_ = true; }
355  void resetNewLumi() { newLumi_ = false; }
356  bool eventCached() const { return eventCached_; }
358  void setEventCached() { eventCached_ = true; }
359  void resetEventCached() { eventCached_ = false; }
360 
363  void decreaseRemainingEventsBy(int iSkipped);
364 
365  private:
366  bool eventLimitReached() const { return remainingEvents_ == 0; }
367  bool lumiLimitReached() const {
368  if (remainingLumis_ == 0) {
369  return true;
370  }
371  if (maxSecondsUntilRampdown_ <= 0) {
372  return false;
373  }
375  auto elapsed = end - processingStart_;
376  if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > maxSecondsUntilRampdown_) {
377  return true;
378  }
379  return false;
380  }
381  bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
382  virtual ItemType getNextItemType() = 0;
384  virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
385  virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
386  virtual void readRun_(RunPrincipal& runPrincipal);
387  virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
388  virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
389  virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
390  virtual std::unique_ptr<FileBlock> readFile_();
391  virtual void closeFile_() {}
392  virtual bool goToEvent_(EventID const& eventID);
393  virtual void setRun(RunNumber_t r);
394  virtual void setLumi(LuminosityBlockNumber_t lb);
395  virtual void rewind_();
396  virtual void beginJob();
397  virtual void endJob();
398  virtual std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_();
399 
400  virtual bool randomAccess_() const;
403 
404  private:
405  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
412  std::chrono::time_point<std::chrono::steady_clock> processingStart_;
421  mutable bool newRun_;
422  mutable bool newLumi_;
424  mutable ItemType state_;
425  mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
426  mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
428 
430  };
431 } // namespace edm
432 
433 #endif
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:155
virtual void setRun(RunNumber_t r)
Definition: InputSource.cc:357
std::shared_ptr< RunAuxiliary > runAuxiliary_
Definition: InputSource.h:425
void doBeginJob()
Called by framework at beginning of job.
Definition: InputSource.cc:205
virtual std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()=0
std::shared_ptr< LuminosityBlockAuxiliary > lumiAuxiliary_
Definition: InputSource.h:426
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
virtual void closeFile_()
Definition: InputSource.h:391
void decreaseRemainingEventsBy(int iSkipped)
Definition: InputSource.cc:388
Timestamp time_
Definition: InputSource.h:420
static std::string const source("source")
virtual void doBeginRun(RunPrincipal &rp, ProcessContext const *)
Called by framework at beginning of run.
Definition: InputSource.cc:399
InputSource & operator=(InputSource const &)=delete
void doEndJob()
Called by framework at end of job.
Definition: InputSource.cc:207
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader()
Returns nullptr if no resource shared between the Source and a DelayedReader.
Definition: InputSource.cc:209
void readAndMergeRun(RunPrincipal &rp)
Read next run (same as a prior run)
Definition: InputSource.cc:246
std::string statusFileName_
Definition: InputSource.h:427
bool newLumi() const
Definition: InputSource.h:353
virtual void registerProducts()
Register any produced products.
Definition: InputSource.cc:217
virtual void setLumi(LuminosityBlockNumber_t lb)
Definition: InputSource.cc:363
void setLuminosityBlockNumber_t(LuminosityBlockNumber_t lb)
Set the luminosity block ID.
Definition: InputSource.h:142
std::shared_ptr< ProductRegistry > & productRegistry()
Definition: InputSource.h:152
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:337
virtual void rewind_()
Definition: InputSource.cc:382
std::shared_ptr< RunAuxiliary > readRunAuxiliary()
Read next run Auxiliary.
Definition: InputSource.cc:200
ProcessingMode processingMode() const
RunsLumisAndEvents (default), RunsAndLumis, or Runs.
Definition: InputSource.h:233
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
Definition: InputSource.h:324
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:436
unsigned int LuminosityBlockNumber_t
virtual void readEvent_(EventPrincipal &eventPrincipal)=0
void closeFile(FileBlock *, bool cleaningUpAfterException)
close current file
Definition: InputSource.cc:227
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:204
ProcessHistoryID const & reducedProcessHistoryID() const
Definition: InputSource.cc:431
bool limitReached() const
Definition: InputSource.h:381
int maxEvents() const
Definition: InputSource.h:183
virtual bool goToEvent_(EventID const &eventID)
Definition: InputSource.cc:375
std::chrono::time_point< std::chrono::steady_clock > processingStart_
Definition: InputSource.h:412
ProcessingController::ForwardState forwardState() const
Definition: InputSource.cc:407
unsigned int numberOfEventsBeforeBigSkip_
Definition: InputSource.h:429
EventSourceSentry(InputSource const &source, StreamContext &sc)
Definition: InputSource.cc:446
std::shared_ptr< BranchIDListHelper > & branchIDListHelper()
Definition: InputSource.h:162
virtual ProcessingController::ForwardState forwardState_() const
Definition: InputSource.cc:423
virtual std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader_()
Definition: InputSource.cc:213
int remainingEvents() const
Definition: InputSource.h:187
ProcessingController::ReverseState reverseState() const
Definition: InputSource.cc:412
bool lumiLimitReached() const
Definition: InputSource.h:367
void readRun(RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
Read next run (new run)
Definition: InputSource.cc:240
virtual void beginJob()
Definition: InputSource.cc:417
ProcessingMode processingMode_
Definition: InputSource.h:413
int maxSecondsUntilRampdown_
Definition: InputSource.h:411
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary()
Read next luminosity block Auxiliary.
Definition: InputSource.cc:195
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
virtual void doBeginLumi(LuminosityBlockPrincipal &lbp, ProcessContext const *)
Called by framework at beginning of lumi block.
Definition: InputSource.cc:401
edm::propagate_const< std::shared_ptr< ThinnedAssociationsHelper > > thinnedAssociationsHelper_
Definition: InputSource.h:418
virtual ItemType getNextItemType()=0
bool newRun() const
Definition: InputSource.h:350
static const std::string & baseType()
Definition: InputSource.cc:113
std::shared_ptr< ActivityRegistry > actReg_
Definition: InputSource.h:405
void issueReports(EventID const &eventID, StreamID streamID)
issue an event report
Definition: InputSource.cc:335
virtual bool readIt(EventID const &id, EventPrincipal &eventPrincipal, StreamContext &streamContext)
Definition: InputSource.cc:351
bool goToEvent(EventID const &eventID)
Definition: InputSource.cc:321
#define end
Definition: vmac.h:39
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
ProcessHistoryRegistry & processHistoryRegistry()
Definition: InputSource.h:156
ItemType state() const
Definition: InputSource.h:328
void setEventCached()
Marks the event as cached (sets eventCached_ to true).
Definition: InputSource.h:358
void resetEventCached()
Definition: InputSource.h:359
virtual ~InputSource() noexcept(false)
Destructor.
Definition: InputSource.cc:101
std::string processGUID_
Definition: InputSource.h:419
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:441
#define noexcept
bool eventLimitReached() const
Definition: InputSource.h:366
void readAndMergeLumi(LuminosityBlockPrincipal &lbp)
Read next luminosity block (same as a prior lumi)
Definition: InputSource.cc:260
void skipEvents(int offset)
Definition: InputSource.cc:317
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
edm::propagate_const< std::shared_ptr< ProductRegistry > > productRegistry_
Definition: InputSource.h:415
virtual void readRun_(RunPrincipal &runPrincipal)
Definition: InputSource.cc:269
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper()
Definition: InputSource.h:168
virtual std::unique_ptr< FileBlock > readFile_()
Definition: InputSource.cc:238
Timestamp const & timestamp() const
Accessor for the current time, as seen by the input source.
Definition: InputSource.h:219
static void fillDescriptions(ConfigurationDescriptions &descriptions)
Definition: InputSource.cc:103
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
void readEvent(EventPrincipal &ep, StreamContext &)
Read next event.
Definition: InputSource.cc:280
std::unique_ptr< FileBlock > readFile()
Read next file.
Definition: InputSource.cc:220
void setRunNumber(RunNumber_t r)
Set the run number.
Definition: InputSource.h:139
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
virtual bool randomAccess_() const
Definition: InputSource.cc:421
virtual std::shared_ptr< RunAuxiliary > readRunAuxiliary_()=0
edm::propagate_const< std::shared_ptr< BranchIDListHelper > > branchIDListHelper_
Definition: InputSource.h:417
ItemType nextItemType_()
Definition: InputSource.cc:134
HLT enums.
void reset() const
Definition: InputSource.h:345
void repeat()
Reset the remaining number of events/lumis to the maximum number.
Definition: InputSource.h:173
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
std::shared_ptr< ActivityRegistry > actReg() const
Accessor for Activity Registry.
Definition: InputSource.h:236
edm::propagate_const< std::unique_ptr< ProcessHistoryRegistry > > processHistoryRegistry_
Definition: InputSource.h:416
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:151
virtual ProcessingController::ReverseState reverseState_() const
Definition: InputSource.cc:427
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:165
bool randomAccess() const
Definition: InputSource.cc:403
EventSourceSentry & operator=(EventSourceSentry const &)=delete
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:115
void rewind()
Begin again at the first event.
Definition: InputSource.cc:326
int remainingLuminosityBlocks() const
Definition: InputSource.h:195
ModuleDescription const & moduleDescription() const
Accessor for 'module' description.
Definition: InputSource.h:198
unsigned int RunNumber_t
virtual void readLuminosityBlock_(LuminosityBlockPrincipal &lumiPrincipal)
Definition: InputSource.cc:276
signalslot::Signal< void(std::string const &, bool)> Sig
Definition: InputSource.h:304
void readLuminosityBlock(LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
Read next luminosity block (new lumi)
Definition: InputSource.cc:251
ProcessConfiguration const & processConfiguration() const
Accessor for Process Configuration.
Definition: InputSource.h:201
int maxLuminosityBlocks() const
Definition: InputSource.h:191
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
Definition: InputSource.h:317
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
ItemType nextItemType()
Advances the source to the next item.
Definition: InputSource.cc:149
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:46
virtual void endJob()
Definition: InputSource.cc:419
ModuleDescription const moduleDescription_
Definition: InputSource.h:414
static void prevalidate(ConfigurationDescriptions &)
Definition: InputSource.cc:109
virtual void skip(int offset)
Definition: InputSource.cc:369
ProcessConfiguration const & processConfiguration() const
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
Definition: InputSource.h:318
signalslot::Signal< void(std::string const &, bool)> Sig
Definition: InputSource.h:289
bool eventCached() const
Definition: InputSource.h:356