CMS 3D CMS Logo

SourceWithWaits.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Integration
4 // Class : SourceWithWaits
5 //
6 // Original Author: W. David Dagenhart
7 // Created: 12 October 2023
8 
9 // This source allows configuring both a time per lumi section
10 // and events per lumi. Calls to usleep are inserted in the
11 // getNextItemType function in the amount
12 //
13 // (time per lumi) / (events per lumi + 1)
14 //
15 // The sleeps occur before getNextItemType returns when
16 // an event is next and also when a lumi is next (excluding
17 // the first lumi). The total time sleeping that elapses per
18 // lumi is approximately equal to the configured amount.
19 // The algorithm accomplishing this is not perfect and
20 // if the events take enough time to process, then the lumis
21 // will last longer than configured amount (just because
22 // that was a lot easier to implement and good enough for
23 // the test this is used for).
24 //
25 // The time per lumi is the same for all lumis. events per lumi
26 // can be different each lumi. You can also configure a single
27 // value for lumis per run if you want multiple runs.
28 //
29 // The job will stop when the end of the vector specifying
30 // events per lumi is reached (it might end earlier if maxEvents
31 // is also configured).
32 //
33 // In some ways this source is like EmptySource. It does not produce
34 // or read anything. The initial intent is to use for tests of
35 // some issues we are facing with concurrent lumis in the online
36 // source. It emulates the relevant behavior of that source without
37 // all the associated complexity.
38 
52 
53 #include <cassert>
54 #include <memory>
55 #include <unistd.h>
56 #include <vector>
57 
58 namespace edmtest {
60  public:
62  ~SourceWithWaits() override;
64 
65  private:
67  std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
68  std::shared_ptr<edm::LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
69  void readEvent_(edm::EventPrincipal&) override;
70 
71  double timePerLumi_; // seconds
72  double sleepAfterStartOfRun_; // seconds
73  std::vector<unsigned int> eventsPerLumi_;
74  unsigned int lumisPerRun_;
75  unsigned int multipleEntriesForRun_;
79 
83  unsigned int currentFile_ = 0;
84  unsigned int eventInCurrentLumi_ = 0;
85  unsigned int lumiInCurrentRun_ = 0;
86  bool startedNewRun_ = false;
87  bool lastEventOfLumi_ = false;
88  bool noEventsInLumi_ = false;
89  };
90 
92  : edm::InputSource(pset, desc),
93  timePerLumi_(pset.getUntrackedParameter<double>("timePerLumi")),
94  sleepAfterStartOfRun_(pset.getUntrackedParameter<double>("sleepAfterStartOfRun")),
95  eventsPerLumi_(pset.getUntrackedParameter<std::vector<unsigned int>>("eventsPerLumi")),
96  lumisPerRun_(pset.getUntrackedParameter<unsigned int>("lumisPerRun")),
97  multipleEntriesForRun_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForRun")),
98  multipleEntriesForLumi_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForLumi")),
99  declareLast_(pset.getUntrackedParameter<bool>("declareLast")),
100  declareAllLast_(pset.getUntrackedParameter<bool>("declareAllLast")) {}
101 
103 
106  desc.addUntracked<double>("timePerLumi");
107  desc.addUntracked<double>("sleepAfterStartOfRun");
108  desc.addUntracked<std::vector<unsigned int>>("eventsPerLumi");
109  desc.addUntracked<unsigned int>("lumisPerRun");
110  desc.addUntracked<unsigned int>("multipleEntriesForRun", 0);
111  desc.addUntracked<unsigned int>("multipleEntriesForLumi", 0);
112  desc.addUntracked<bool>("declareLast", false);
113  desc.addUntracked<bool>("declareAllLast", false);
114  descriptions.add("source", desc);
115  }
116 
118  constexpr unsigned int secondsToMicroseconds = 1000000;
119 
120  if (startedNewRun_) {
121  usleep(secondsToMicroseconds * sleepAfterStartOfRun_);
122  startedNewRun_ = false;
123  }
124 
126  usleep(secondsToMicroseconds * timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1));
127  lastEventOfLumi_ = false;
128  noEventsInLumi_ = false;
129  }
130 
131  // First three cases are for the initial file, run, and lumi transitions
132  // Note that there will always be at exactly one file and at least
133  // one run from this test source.
134  if (currentFile_ == 0u) {
135  ++currentFile_;
136  return ItemType::IsFile;
137  }
138  // First Run
139  else if (currentRun_ == 0u) {
140  ++currentRun_;
142  startedNewRun_ = true;
143  auto const position =
146  } else {
147  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
148  // if the Framework detects the potential InputSource bug and throws an exception.
151  }
152  }
153  // If configured, a second Entry for the same run number and reduced ProcessHistoryID
154  else if (currentRun_ == multipleEntriesForRun_) {
156  startedNewRun_ = true;
157  auto const position =
160  }
161  // First lumi
162  else if (currentLumi_ == 0u && lumisPerRun_ != 0) {
163  ++currentLumi_;
165  // The job will stop when we hit the end of the eventsPerLumi vector
166  // unless maxEvents stopped it earlier.
167  if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
168  return ItemType::IsStop;
169  }
171  if (eventsPerLumi_[currentLumi_ - 1] == 0) {
172  noEventsInLumi_ = true;
173  }
174  auto const position =
177  } else {
178  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
179  // if the Framework detects the potential InputSource bug and throws an exception.
182  }
183  }
184  // If configured, a second Entry for the same lumi number in the same run
185  else if (currentLumi_ == multipleEntriesForLumi_ && lumisPerRun_ != 0) {
187  if (eventsPerLumi_[currentLumi_ - 1] == 0) {
188  noEventsInLumi_ = true;
189  }
190  auto const position =
193  }
194  // Handle events in the current lumi
196  // note the argument to usleep is microseconds, timePerLumi_ is in seconds
197  usleep(secondsToMicroseconds * timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1));
199  ++currentEvent_;
201  lastEventOfLumi_ = true;
202  }
203  return ItemType::IsEvent;
204  }
205  // Next lumi
206  else if (lumiInCurrentRun_ < lumisPerRun_) {
207  ++currentLumi_;
209  // The job will stop when we hit the end of the eventsPerLumi vector
210  // unless maxEvents stopped it earlier.
211  if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
212  return ItemType::IsStop;
213  }
216  if (eventsPerLumi_[currentLumi_ - 1] == 0) {
217  noEventsInLumi_ = true;
218  }
219  auto const position =
222  } else {
223  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
224  // if the Framework detects the potential InputSource bug and throws an exception.
227  }
228  }
229  // Next run
230  else {
231  // The job will stop when we hit the end of the eventsPerLumi vector
232  // unless maxEvents stopped it earlier. Don't start the run if
233  // it will end with no lumis in it.
234  if (currentLumi_ >= eventsPerLumi_.size()) {
235  return ItemType::IsStop;
236  }
237  ++currentRun_;
238  // Avoid infinite job if lumisPerRun_ is 0
239  if (currentRun_ > 100) {
240  return ItemType::IsStop;
241  }
242  lumiInCurrentRun_ = 0;
244  startedNewRun_ = true;
245  auto const position =
248  } else {
249  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
250  // if the Framework detects the potential InputSource bug and throws an exception.
253  }
254  }
255  // Should be impossible to get here
256  assert(false);
257  // return something so it will compile
258  return ItemType::IsStop;
259  }
260 
261  std::shared_ptr<edm::RunAuxiliary> SourceWithWaits::readRunAuxiliary_() {
263  return std::make_shared<edm::RunAuxiliary>(currentRun_, ts, edm::Timestamp::invalidTimestamp());
264  }
265 
266  std::shared_ptr<edm::LuminosityBlockAuxiliary> SourceWithWaits::readLuminosityBlockAuxiliary_() {
268  return std::make_shared<edm::LuminosityBlockAuxiliary>(
270  }
271 
273  bool isRealData = false;
276  auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
277  eventPrincipal.fillEventPrincipal(aux, history);
278  }
279 
280 } // namespace edmtest
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
SourceWithWaits(edm::ParameterSet const &, edm::InputSourceDescription const &)
unsigned int multipleEntriesForLumi_
unsigned long long EventNumber_t
unsigned int eventInCurrentLumi_
edm::InputSource::ItemTypeInfo getNextItemType() override
unsigned int multipleEntriesForRun_
assert(be >=bs)
unsigned int LuminosityBlockNumber_t
std::vector< unsigned int > eventsPerLumi_
static void fillDescriptions(edm::ConfigurationDescriptions &)
edm::RunNumber_t currentRun_
#define DEFINE_FWK_INPUT_SOURCE(type)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:226
void readEvent_(edm::EventPrincipal &) override
std::shared_ptr< edm::RunAuxiliary > readRunAuxiliary_() override
std::shared_ptr< edm::LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
void add(std::string const &label, ParameterSetDescription const &psetDescription)
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
edm::LuminosityBlockNumber_t currentLumi_
edm::EventNumber_t currentEvent_
HLT enums.
static int position[264][3]
Definition: ReadPGInfo.cc:289
unsigned int RunNumber_t
Helper class to handle FWLite file input sources.