CMS 3D CMS Logo

SourceWithWaits.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: FWCore/Integration
4 // Class : SourceWithWaits
5 //
6 // Original Author: W. David Dagenhart
7 // Created: 12 October 2023
8 
9 // This source allows configuring both a time per lumi section
10 // and events per lumi. Calls to std::this_thread::sleep_for are inserted in the
11 // getNextItemType function in the amount
12 //
13 // (time per lumi) / (events per lumi + 1)
14 //
15 // The sleeps occur before getNextItemType returns when
16 // an event is next and also when a lumi is next (excluding
17 // the first lumi). The total time sleeping that elapses per
18 // lumi is approximately equal to the configured amount.
19 // The algorithm accomplishing this is not perfect and
20 // if the events take enough time to process, then the lumis
21 // will last longer than the configured amount (just because
22 // that was a lot easier to implement and good enough for
23 // the test this is used for).
24 //
25 // The time per lumi is the same for all lumis. The number of events
26 // per lumi can be different for each lumi. You can also configure a single
27 // value for lumis per run if you want multiple runs.
28 //
29 // The job will stop when the end of the vector specifying
30 // events per lumi is reached (it might end earlier if maxEvents
31 // is also configured).
32 //
33 // In some ways this source is like EmptySource. It does not produce
34 // or read anything. The initial intent is to use it for tests of
35 // some issues we are facing with concurrent lumis in the online
36 // source. It emulates the relevant behavior of that source without
37 // all the associated complexity.
38 
52 
53 #include <cassert>
54 #include <memory>
55 #include <unistd.h>
56 #include <vector>
57 
58 namespace edmtest {
60  public:
62  ~SourceWithWaits() override;
64 
65  private:
67  std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
68  std::shared_ptr<edm::LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
69  void readEvent_(edm::EventPrincipal&) override;
70 
71  double timePerLumi_; // seconds
72  double sleepAfterStartOfRun_; // seconds
73  std::vector<unsigned int> eventsPerLumi_;
74  unsigned int lumisPerRun_;
75  unsigned int multipleEntriesForRun_;
79 
83  unsigned int currentFile_ = 0;
84  unsigned int eventInCurrentLumi_ = 0;
85  unsigned int lumiInCurrentRun_ = 0;
86  bool startedNewRun_ = false;
87  bool lastEventOfLumi_ = false;
88  bool noEventsInLumi_ = false;
89  };
90 
92  : edm::InputSource(pset, desc),
93  timePerLumi_(pset.getUntrackedParameter<double>("timePerLumi")),
94  sleepAfterStartOfRun_(pset.getUntrackedParameter<double>("sleepAfterStartOfRun")),
95  eventsPerLumi_(pset.getUntrackedParameter<std::vector<unsigned int>>("eventsPerLumi")),
96  lumisPerRun_(pset.getUntrackedParameter<unsigned int>("lumisPerRun")),
97  multipleEntriesForRun_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForRun")),
98  multipleEntriesForLumi_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForLumi")),
99  declareLast_(pset.getUntrackedParameter<bool>("declareLast")),
100  declareAllLast_(pset.getUntrackedParameter<bool>("declareAllLast")) {}
101 
103 
106  desc.addUntracked<double>("timePerLumi");
107  desc.addUntracked<double>("sleepAfterStartOfRun");
108  desc.addUntracked<std::vector<unsigned int>>("eventsPerLumi");
109  desc.addUntracked<unsigned int>("lumisPerRun");
110  desc.addUntracked<unsigned int>("multipleEntriesForRun", 0);
111  desc.addUntracked<unsigned int>("multipleEntriesForLumi", 0);
112  desc.addUntracked<bool>("declareLast", false);
113  desc.addUntracked<bool>("declareAllLast", false);
114  descriptions.add("source", desc);
115  }
116 
118  if (startedNewRun_) {
119  std::this_thread::sleep_for(std::chrono::duration<double>(sleepAfterStartOfRun_));
120  startedNewRun_ = false;
121  }
122 
124  std::this_thread::sleep_for(std::chrono::duration<double>(timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1)));
125  lastEventOfLumi_ = false;
126  noEventsInLumi_ = false;
127  }
128 
129  // First three cases are for the initial file, run, and lumi transitions
130  // Note that there will always be exactly one file and at least
131  // one run from this test source.
132  if (currentFile_ == 0u) {
133  ++currentFile_;
134  return ItemType::IsFile;
135  }
136  // First Run
137  else if (currentRun_ == 0u) {
138  ++currentRun_;
140  startedNewRun_ = true;
141  auto const position =
144  } else {
145  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
146  // if the Framework detects the potential InputSource bug and throws an exception.
149  }
150  }
151  // If configured, a second Entry for the same run number and reduced ProcessHistoryID
152  else if (currentRun_ == multipleEntriesForRun_) {
154  startedNewRun_ = true;
155  auto const position =
158  }
159  // First lumi
160  else if (currentLumi_ == 0u && lumisPerRun_ != 0) {
161  ++currentLumi_;
163  // The job will stop when we hit the end of the eventsPerLumi vector
164  // unless maxEvents stopped it earlier.
165  if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
166  return ItemType::IsStop;
167  }
169  if (eventsPerLumi_[currentLumi_ - 1] == 0) {
170  noEventsInLumi_ = true;
171  }
172  auto const position =
175  } else {
176  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
177  // if the Framework detects the potential InputSource bug and throws an exception.
180  }
181  }
182  // If configured, a second Entry for the same lumi number in the same run
183  else if (currentLumi_ == multipleEntriesForLumi_ && lumisPerRun_ != 0) {
185  if (eventsPerLumi_[currentLumi_ - 1] == 0) {
186  noEventsInLumi_ = true;
187  }
188  auto const position =
191  }
192  // Handle events in the current lumi
194  std::this_thread::sleep_for(std::chrono::duration<double>(timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1)));
196  ++currentEvent_;
198  lastEventOfLumi_ = true;
199  }
200  return ItemType::IsEvent;
201  }
202  // Next lumi
203  else if (lumiInCurrentRun_ < lumisPerRun_) {
204  ++currentLumi_;
206  // The job will stop when we hit the end of the eventsPerLumi vector
207  // unless maxEvents stopped it earlier.
208  if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
209  return ItemType::IsStop;
210  }
213  if (eventsPerLumi_[currentLumi_ - 1] == 0) {
214  noEventsInLumi_ = true;
215  }
216  auto const position =
219  } else {
220  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
221  // if the Framework detects the potential InputSource bug and throws an exception.
224  }
225  }
226  // Next run
227  else {
228  // The job will stop when we hit the end of the eventsPerLumi vector
229  // unless maxEvents stopped it earlier. Don't start the run if
230  // it will end with no lumis in it.
231  if (currentLumi_ >= eventsPerLumi_.size()) {
232  return ItemType::IsStop;
233  }
234  ++currentRun_;
235  // Avoid infinite job if lumisPerRun_ is 0
236  if (currentRun_ > 100) {
237  return ItemType::IsStop;
238  }
239  lumiInCurrentRun_ = 0;
241  startedNewRun_ = true;
242  auto const position =
245  } else {
246  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
247  // if the Framework detects the potential InputSource bug and throws an exception.
250  }
251  }
252  // Should be impossible to get here
253  assert(false);
254  // return something so it will compile
255  return ItemType::IsStop;
256  }
257 
258  std::shared_ptr<edm::RunAuxiliary> SourceWithWaits::readRunAuxiliary_() {
260  return std::make_shared<edm::RunAuxiliary>(currentRun_, ts, edm::Timestamp::invalidTimestamp());
261  }
262 
263  std::shared_ptr<edm::LuminosityBlockAuxiliary> SourceWithWaits::readLuminosityBlockAuxiliary_() {
265  return std::make_shared<edm::LuminosityBlockAuxiliary>(
267  }
268 
270  bool isRealData = false;
273  auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
274  eventPrincipal.fillEventPrincipal(aux, history);
275  }
276 
277 } // namespace edmtest
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
SourceWithWaits(edm::ParameterSet const &, edm::InputSourceDescription const &)
unsigned int multipleEntriesForLumi_
unsigned long long EventNumber_t
unsigned int eventInCurrentLumi_
edm::InputSource::ItemTypeInfo getNextItemType() override
unsigned int multipleEntriesForRun_
assert(be >=bs)
unsigned int LuminosityBlockNumber_t
std::vector< unsigned int > eventsPerLumi_
static void fillDescriptions(edm::ConfigurationDescriptions &)
edm::RunNumber_t currentRun_
#define DEFINE_FWK_INPUT_SOURCE(type)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:226
void readEvent_(edm::EventPrincipal &) override
std::shared_ptr< edm::RunAuxiliary > readRunAuxiliary_() override
std::shared_ptr< edm::LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_() override
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
void add(std::string const &label, ParameterSetDescription const &psetDescription)
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
edm::LuminosityBlockNumber_t currentLumi_
edm::EventNumber_t currentEvent_
HLT enums.
static int position[264][3]
Definition: ReadPGInfo.cc:289
unsigned int RunNumber_t
Helper class to handle FWLite file input sources.