CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <queue>
6 #include <boost/regex.hpp>
7 #include <boost/format.hpp>
8 #include <boost/range.hpp>
9 #include <boost/filesystem.hpp>
10 #include <boost/algorithm/string/predicate.hpp>
11 
12 namespace edm {
13 
// Builds a lumi entry from the JSON file `filename`: the file is parsed into a
// property tree, then filename, n_events, ls and datafilename are filled in,
// the entry is flagged as loaded and returned.
// NOTE(review): this listing jumps from source line 13 to 15 and from 18 to
// 20, so the declarator line and the declaration of `lumi` are not shown.
15  const std::string& filename, int lumiNumber, JsonType type) {
16  boost::property_tree::ptree pt;
17  read_json(filename, pt);
18 
20  lumi.filename = filename;
21 
22  // We rely on n_events being at offset 1 (the second item) of the "data" array.
23  lumi.n_events = std::next(pt.get_child("data").begin(), 1)
24  ->second.get_value<std::size_t>();
25 
26  lumi.ls = lumiNumber;
27 
// The data file name sits at a different position depending on the JSON type:
// offset 4 of "data" for JS_PROTOBUF, offset 3 for every other type.
28  if (type == JS_PROTOBUF) {
29  lumi.datafilename = std::next(pt.get_child("data").begin(), 4)
30  ->second.get_value<std::string>();
31  } else {
32  lumi.datafilename = std::next(pt.get_child("data").begin(), 3)
33  ->second.get_value<std::string>();
34  }
35 
36  lumi.loaded = true;
37  return lumi;
38 }
39 
40 // Contents of Eor json file are ignored for the moment.
41 // This function will not be called.
// Builds an end-of-run (EoR) entry from the JSON file `filename`, reading
// n_events, n_lumi and datafilename from the "data" array by position, then
// flags the entry as loaded and returns it.
// NOTE(review): n_lumi and datafilename are both read from offset 2, so
// datafilename receives the same element as n_lumi. One of the two offsets
// looks wrong; confirm against the EoR file layout before enabling this code.
// NOTE(review): source line 42 (the declarator) is missing from this listing.
43  const std::string& filename) {
44  boost::property_tree::ptree pt;
45  read_json(filename, pt);
46 
47  EorEntry eor;
48  eor.filename = filename;
49 
50  // We rely on n_events being at offset 1 (the second item) of the "data" array.
51  eor.n_events = std::next(pt.get_child("data").begin(), 1)
52  ->second.get_value<std::size_t>();
53  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)
54  ->second.get_value<std::size_t>();
55  eor.datafilename = std::next(pt.get_child("data").begin(), 2)
56  ->second.get_value<std::string>();
57 
58  eor.loaded = true;
59  return eor;
60 }
61 
// Constructor body: stores the JSON type in type_, starts in state EOR, copies
// the untracked configuration parameters into the matching members and
// finally calls reset().
// NOTE(review): source lines 62 and 69 are absent from this listing. Line 70
// is the right-hand side of an assignment whose target is therefore not shown
// (presumably nextLumiTimeoutMillis_ — confirm).
63  : type_(t), state_(EOR) {
64 
65  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
66  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
67  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
68  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
70  pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
71 
72  reset();
73 }
74 
76 
// Re-initialises the iterator: rebuilds runPath_ as
// "<runInputDir_>/run<runNumber_>" with the run number zero-padded to six
// digits, clears the EoR flag and the map of seen lumis, restarts at lumi 1 in
// state OPEN, then calls collect(true) (timers ignored) and update_state().
// NOTE(review): source lines 77 (the declarator) and 85 are missing from this
// listing.
78  runPath_ = str(boost::format("%s/run%06d") % runInputDir_ % runNumber_);
79 
80  eor_.loaded = false;
81  state_ = State::OPEN;
82  currentLumi_ = 1;
83  lumiSeen_.clear();
84 
86 
87  collect(true);
88  update_state();
89 }
90 
92 
// Returns the entry stored under currentLumi_ in lumiSeen_.
// NOTE(review): lumiSeen_ is a std::map, so operator[] inserts a
// default-constructed entry when currentLumi_ is absent; callers presumably
// check for presence first — confirm.
// NOTE(review): the declarator line (source line 93) is missing from this
// listing.
94  return lumiSeen_[currentLumi_];
95 }
96 
99 
// Moves on to the following lumisection by incrementing currentLumi_.
// NOTE(review): source lines 97-98 (the declarator and one statement) are
// missing from this listing.
100  currentLumi_ += 1;
101 }
102 
// Reports whether an entry for currentLumi_ is present in lumiSeen_:
// true when the key is found, false otherwise.
// NOTE(review): the declarator line (source line 103) is missing from this
// listing.
104  if (lumiSeen_.find(currentLumi_) != lumiSeen_.end()) {
105  return true;
106  }
107 
108  return false;
109 }
110 
// Returns the run number held in runNumber_.
111 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
112 
// Returns the highest lumi number present in lumiSeen_ (the last key of the
// ordered map), or 1 when no lumi has been recorded.
// NOTE(review): the declarator line (source line 113) is missing from this
// listing.
114  if (!lumiSeen_.empty()) {
115  return lumiSeen_.rbegin()->first;
116  }
117 
118  return 1;
119 }
120 
// Jumps directly to lumisection `lumi` by overwriting currentLumi_.
// NOTE(review): source lines 121 (the declarator) and 123 are missing from
// this listing.
122  currentLumi_ = lumi;
124 }
125 
// Resolves the data file name of `lumi` to a path string: a name starting with
// "/" is returned unchanged, anything else is appended to the path object `p`
// and the combined path is returned.
// NOTE(review): source lines 126 (the declarator) and 129 (the declaration of
// `p`, presumably built from runPath_ — confirm) are missing from this listing.
127  if (boost::starts_with(lumi.datafilename, "/")) return lumi.datafilename;
128 
130  p /= lumi.datafilename;
131  return p.string();
132 }
133 
// Scans runPath_ for json files named run<run>_ls<lumi><label>.jsn and loads
// every not-yet-seen lumi file whose label equals streamLabel_ into lumiSeen_.
// A file with lumi 0 and label "_EoR" marks the end of run: when one is found
// (and eor_ is not loaded yet), eor_ is flagged as loaded and eor_.n_lumi is
// set to the highest lumi seen so far, or 0 if none.
// With ignoreTimers == false the scan is skipped when last_ms is below 100 or
// when the directory's last-write time has not advanced past runPathMTime_.
// NOTE(review): source lines 138, 140 and 146 are missing from this listing,
// so the computation of last_ms and the else-branch body are not fully shown.
134 void DQMFileIterator::collect(bool ignoreTimers) {
135  // search filesystem to find available lumi section files
136  // or the end of run files
137 
139  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
141 
142  // don't refresh if it's too soon
143  if ((!ignoreTimers) && (last_ms < 100)) {
144  return;
145  } else {
147  }
148 
149  // check if directory changed
150  std::time_t t = boost::filesystem::last_write_time(runPath_);
151 
152  if ((!ignoreTimers) && (t <= runPathMTime_)) {
153  logFileAction("Directory hasn't changed.");
154  return;
155  } else {
156  logFileAction("Directory changed, updating.");
157  runPathMTime_ = t;
158  }
159 
160  using boost::filesystem::directory_iterator;
161  using boost::filesystem::directory_entry;
162 
163  std::string fn_eor;
164 
165  directory_iterator dend;
166  for (directory_iterator di(runPath_); di != dend; ++di) {
// Capture groups: 1 = run number, 2 = lumi number, 3 = label (with its
// leading underscore).
// NOTE(review): the '.' before "jsn" is unescaped and therefore matches any
// character; the pattern object is also rebuilt on every loop iteration.
167  const boost::regex fn_re("run(\\d+)_ls(\\d+)(_.*).jsn");
168 
169  const std::string filename = di->path().filename().string();
170  const std::string fn = di->path().string();
171 
172  boost::smatch result;
173  if (boost::regex_match(filename, result, fn_re)) {
174  unsigned int run = std::stoi(result[1]);
175  unsigned int lumi = std::stoi(result[2]);
176  std::string label = result[3];
177 
// Files belonging to a different run are ignored silently.
178  if (run != runNumber_) continue;
179 
180  // check if this is EoR
181  // for various reasons we have to load it after all other files
182  if ((lumi == 0) && (label == "_EoR") && (!eor_.loaded)) {
183  fn_eor = fn;
184  continue;
185  }
186 
187  // check if lumi is loaded
188  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
189  continue; // already loaded
190  }
191 
192  // check if this belongs to us
193  if (label != streamLabel_) {
194  logFileAction("Found and skipped json file (stream label mismatch): ",
195  fn);
196  continue;
197  }
198 
199  LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, type_);
200  lumiSeen_.emplace(lumi, lumi_jsn);
201  logFileAction("Found and loaded json file: ", fn);
202  }
203  }
204 
// The EoR marker is handled only after the whole directory has been scanned,
// so that n_lumi reflects every lumi file found in this pass.
205  if (!fn_eor.empty()) {
206  logFileAction("EoR file found: ", fn_eor);
207 
208  // @TODO load EoR files correctly
209  // eor_ = EorEntry::load_json(fn_eor);
210  // logFileAction("Loaded eor file: ", fn_eor);
211 
212  // for now , set n_lumi to the highest _found_ lumi
213  eor_.loaded = true;
214 
215  if (lumiSeen_.empty()) {
216  eor_.n_lumi = 0;
217  } else {
218  eor_.n_lumi = lumiSeen_.rbegin()->first;
219  }
220  }
221 }
222 
// Rescans the directory via collect(false) (timers respected) and then
// advances the state machine:
//  - OPEN becomes EOR_CLOSING once eor_ is loaded;
//  - EOR_CLOSING becomes EOR once currentLumi_ exceeds eor_.n_lumi.
// In between, if the entry for currentLumi_ is missing but a later lumi has
// been seen, and at least nextLumiTimeoutMillis_ ms have passed since
// lastLumiLoad_, currentLumi_ jumps to that later lumi. A negative
// nextLumiTimeoutMillis_ disables the jump. Any state change is logged.
// NOTE(review): the declarator line (source line 223) is missing from this
// listing.
224  using std::chrono::high_resolution_clock;
225  using std::chrono::duration_cast;
226  using std::chrono::milliseconds;
227 
228  collect(false);
229 
230  // now update the state
231  State old_state = state_;
232 
233  if ((state_ == State::OPEN) && (eor_.loaded)) {
234  state_ = State::EOR_CLOSING;
235  }
236 
237  // special case for missing lumi files
238  // skip to the next available, but after the timeout
239  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
// lower_bound yields the first seen lumi that is >= currentLumi_; if that is
// not currentLumi_ itself, there is a gap in front of it.
240  auto iter = lumiSeen_.lower_bound(currentLumi_);
241  if ((iter != lumiSeen_.end()) && iter->first != currentLumi_) {
242 
243  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
244  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
245 
246  if (elapsed_ms >= nextLumiTimeoutMillis_) {
247  std::string msg("Timeout reached, skipping lumisection(s) ");
248  msg += std::to_string(currentLumi_) + " .. " +
249  std::to_string(iter->first - 1);
250  msg += ", currentLumi_ is now " + std::to_string(iter->first);
251 
252  logFileAction(msg);
253 
254  currentLumi_ = iter->first;
255  }
256  }
257  }
258 
259  if (state_ == State::EOR_CLOSING) {
260  // check if we parsed all lumis
261  // n_lumi is both last lumi and the number of lumi
262  // since lumis are indexed from 1
263 
264  // after all lumi have been pop()'ed
265  // current lumi will become larger than the last lumi
266  if (currentLumi_ > eor_.n_lumi) {
267  state_ = State::EOR;
268  }
269  }
270 
// The states are logged through std::to_string, i.e. as numbers, not names.
271  if (state_ != old_state) {
272  logFileAction("Streamer state changed: ",
273  std::to_string(old_state) + "->" + std::to_string(state_));
274  }
275 }
276 
// Emits one log record through edm::LogAbsolute("fileAction"): the current
// time of day, a space, then `msg` immediately followed by `fileName`.
// NOTE(review): source lines 277 (the declarator start) and 281 are missing
// from this listing.
278  const std::string& fileName) const {
279  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay()
280  << " " << msg << fileName;
282 }
283 
// If the WATCHDOG_FD environment variable is set, writes the two bytes ".\n"
// to the file descriptor it names; does nothing when the variable is unset.
// NOTE(review): the result of write() is ignored, and atoi() yields 0 for
// non-numeric text, so a malformed value would target descriptor 0.
// NOTE(review): the declarator line (source line 284) is missing from this
// listing.
285  const char* x = getenv("WATCHDOG_FD");
286  if (x) {
287  int fd = atoi(x);
288  write(fd, ".\n", 2);
289  }
290 }
291 
// Logs that the streamer is waiting, then sleeps for delayMillis_
// milliseconds (usleep takes microseconds, hence the factor 1000), calling
// updateWatchdog() both before and after the sleep.
// NOTE(review): the declarator line (source line 292) is missing from this
// listing.
293  logFileAction("Streamer waiting for the next LS.");
294 
295  updateWatchdog();
296  usleep(delayMillis_ * 1000);
297  updateWatchdog();
298 }
299 
301 
// Declares on `desc` the five untracked parameters that the constructor reads
// (runNumber, streamLabel, delayMillis, nextLumiTimeoutMillis, runInputDir),
// each with an explanatory comment. Only nextLumiTimeoutMillis is given a
// default value (-1).
// NOTE(review): the declarator line (source line 300) is missing from this
// listing.
302  desc.addUntracked<unsigned int>("runNumber")
303  ->setComment("Run number passed via configuration file.");
304 
305  desc.addUntracked<std::string>("streamLabel")
306  ->setComment("Stream label used in json discovery.");
307 
308  desc.addUntracked<uint32_t>("delayMillis")
309  ->setComment("Number of milliseconds to wait between file checks.");
310 
311  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
312  "Number of milliseconds to wait before switching to the next lumi "
313  "section if the current is missing, -1 to disable.");
314 
315  desc.addUntracked<std::string>("runInputDir")
316  ->setComment("Directory where the DQM files will appear.");
317 }
318 
319 } /* end of namespace */
type
Definition: HCALResponse.h:21
T getUntrackedParameter(std::string const &, T const &) const
std::string make_path_data(const LumiEntry &lumi)
static LumiEntry load_json(const std::string &filename, int lumiNumber, JsonType type)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void FlushMessageLog()
tuple lumi
Definition: fjr2json.py:35
DQMFileIterator(ParameterSet const &pset, JsonType t)
void advanceToLumi(unsigned int lumi)
std::string runInputDir_
string format
Some error handling for the usage.
std::map< unsigned int, LumiEntry > lumiSeen_
U second(std::pair< T, U > const &p)
std::string to_string(const T &t)
Definition: Logger.cc:26
tuple path
else: Piece not in the list, fine.
const LumiEntry & front()
tuple result
Definition: query.py:137
void logFileAction(const std::string &msg, const std::string &fileName="") const
static void fillDescription(ParameterSetDescription &d)
unsigned int runNumber()
unsigned int lastLumiFound()
std::chrono::high_resolution_clock::time_point lastLumiLoad_
std::chrono::high_resolution_clock::time_point runPathLastCollect_
unsigned long delayMillis_
std::string streamLabel_
static EorEntry load_json(const std::string &filename)
tuple filename
Definition: lut2db_cfg.py:20
Definition: DDAxes.h:10
void collect(bool ignoreTimers)
unsigned int currentLumi_
unsigned int runNumber_