CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include <filesystem>
2 #include <iterator>
3 #include <memory>
4 #include <string>
5 
6 #include <boost/algorithm/string.hpp>
7 #include <boost/algorithm/string/predicate.hpp>
8 #include <boost/property_tree/json_parser.hpp>
9 #include <boost/property_tree/ptree.hpp>
10 #include <boost/range.hpp>
11 #include <boost/regex.hpp>
12 #include <fmt/printf.h>
13 
16 #include "DQMFileIterator.h"
17 
18 namespace dqmservices {
19 
21  const std::string& filename,
22  int lumiNumber,
23  int datafn_position) {
24  boost::property_tree::ptree pt;
25  read_json(filename, pt);
26 
28  lumi.filename = filename;
29  lumi.run_path = run_path;
30 
31  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
32 
33  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
34 
35  lumi.file_ls = lumiNumber;
36 
37  if (datafn_position >= 0) {
38  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
39  }
40 
41  return lumi;
42  }
43 
45  if (boost::starts_with(datafn, "/"))
46  return datafn;
47 
48  std::filesystem::path p(run_path);
49  p /= datafn;
50  return p.string();
51  }
52 
54 
55  // Contents of Eor json file are ignored for the moment.
56  // This function will not be called.
58  const std::string& filename) {
59  boost::property_tree::ptree pt;
60  read_json(filename, pt);
61 
62  EorEntry eor;
63  eor.filename = filename;
64  eor.run_path = run_path;
65 
66  // We rely on n_events to be the first item on the array...
67  eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
68  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
69 
70  eor.loaded = true;
71  return eor;
72  }
73 
75  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
76  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
77  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
78  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
79  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
80  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
81 
82  // scan one mode
83  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
84 
86  reset();
87  }
88 
90 
92  runPath_.clear();
93 
94  std::vector<std::string> tokens;
95  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
96 
97  for (const auto& token : tokens) {
98  runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
99  }
100 
101  eor_.loaded = false;
102  state_ = State::OPEN;
103  nextLumiNumber_ = 1;
104  lumiSeen_.clear();
105  filesSeen_.clear();
106 
108 
109  collect(true);
110  update_state();
111 
112  if (mon_.isAvailable()) {
113  ptree doc;
114  doc.put("run", runNumber_);
115  doc.put("next_lumi", nextLumiNumber_);
116  doc.put("fi_state", std::to_string(state_));
117  mon_->outputUpdate(doc);
118  }
119  }
120 
122 
125  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
126  return lumi;
127  }
128 
130  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
131  return true;
132  }
133 
134  return false;
135  }
136 
137  unsigned int DQMFileIterator::runNumber() { return runNumber_; }
138 
140  if (!lumiSeen_.empty()) {
141  return lumiSeen_.rbegin()->first;
142  }
143 
144  return 1;
145  }
146 
148  using boost::str;
149  using boost::property_tree::ptree;
150 
151  unsigned int currentLumi = nextLumiNumber_;
152 
155 
156  auto iter = lumiSeen_.lower_bound(currentLumi);
157 
158  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
159  iter->second.state = reason;
160  monUpdateLumi(iter->second);
161 
162  ++iter;
163  }
164 
165  if (mon_.isAvailable()) {
166  // report the successful lumi file open
167  ptree doc;
168  doc.put("next_lumi", nextLumiNumber_);
169  mon_->outputUpdate(doc);
170  }
171  }
172 
174  if (!mon_.isAvailable())
175  return;
176 
177  ptree doc;
178  doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
179  mon_->outputUpdate(doc);
180  }
181 
182  unsigned DQMFileIterator::mtimeHash() const {
183  unsigned mtime_now = 0;
184 
185  for (const auto& path : runPath_) {
186  if (!std::filesystem::exists(path))
187  continue;
188 
189  auto write_time = std::filesystem::last_write_time(path);
190  mtime_now =
191  mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
192  }
193 
194  return mtime_now;
195  }
196 
197  void DQMFileIterator::collect(bool ignoreTimers) {
198  // search filesystem to find available lumi section files
199  // or the end of run files
200 
202  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
203 
204  // don't refresh if it's too soon
205  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
206  return;
207  }
208 
209  // check if directory changed
210  auto mtime_now = mtimeHash();
211 
212  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
213  // logFileAction("Directory hasn't changed.");
214  return;
215  } else {
216  // logFileAction("Directory changed, updating.");
217  }
218 
219  runPathMTime_ = mtime_now;
221 
222  using std::filesystem::directory_entry;
223  using std::filesystem::directory_iterator;
224 
225  std::string fn_eor;
226 
227  for (const auto& runPath : runPath_) {
228  if (!std::filesystem::exists(runPath)) {
229  logFileAction("Directory does not exist: ", runPath);
230 
231  continue;
232  }
233 
234  directory_iterator dend;
235  for (directory_iterator di(runPath); di != dend; ++di) {
236  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
237 
238  const std::string filename = di->path().filename().string();
239  const std::string fn = di->path().string();
240 
241  if (filesSeen_.find(filename) != filesSeen_.end()) {
242  continue;
243  }
244 
245  boost::smatch result;
246  if (boost::regex_match(filename, result, fn_re)) {
247  unsigned int run = std::stoi(result[1]);
248  unsigned int lumi = std::stoi(result[2]);
249  std::string label = result[3];
250 
251  filesSeen_.insert(filename);
252 
253  if (run != runNumber_)
254  continue;
255 
256  // check if this is EoR
257  // for various reasons we have to load it after all other files
258  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
259  fn_eor = fn;
260  continue;
261  }
262 
263  // check if lumi is loaded
264  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
265  continue; // already loaded
266  }
267 
268  // check if this belongs to us
269  if (label != streamLabel_) {
270  std::string msg("Found and skipped json file (stream label mismatch, ");
271  msg += label + " [files] != " + streamLabel_ + " [config]";
272  msg += "): ";
273  logFileAction(msg, fn);
274  continue;
275  }
276 
277  try {
278  LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
279  lumiSeen_.emplace(lumi, lumi_jsn);
280  logFileAction("Found and loaded json file: ", fn);
281 
282  monUpdateLumi(lumi_jsn);
283  } catch (const std::exception& e) {
284  // don't reset the mtime, keep it waiting
285  filesSeen_.erase(filename);
286 
287  std::string msg("Found, tried to load the json, but failed (");
288  msg += e.what();
289  msg += "): ";
290  logFileAction(msg, fn);
291  }
292  }
293  }
294  }
295 
296  if ((!fn_eor.empty()) or flagScanOnce_) {
297  if (!fn_eor.empty()) {
298  logFileAction("EoR file found: ", fn_eor);
299  }
300 
301  // @TODO load EoR files correctly
302  // eor_ = EorEntry::load_json(fn_eor);
303  // logFileAction("Loaded eor file: ", fn_eor);
304 
305  // for now , set n_lumi to the highest _found_ lumi
306  eor_.loaded = true;
307 
308  if (lumiSeen_.empty()) {
309  eor_.n_lumi = 0;
310  } else {
311  eor_.n_lumi = lumiSeen_.rbegin()->first;
312  }
313  }
314  }
315 
317  using std::chrono::duration_cast;
318  using std::chrono::high_resolution_clock;
319  using std::chrono::milliseconds;
320 
321  State old_state = state_;
322 
323  // in scanOnce mode we don't do repeated scans
324  // whatever found at reset() is be used
325  if (!flagScanOnce_) {
326  collect(false);
327  }
328 
329  if ((state_ == State::OPEN) && (eor_.loaded)) {
330  state_ = State::EOR_CLOSING;
331  }
332 
333  // special case for missing lumi files
334  // skip to the next available, but after the timeout
335  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
336  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
337  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
338  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
339  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
340 
341  if (elapsed_ms >= nextLumiTimeoutMillis_) {
342  std::string msg("Timeout reached, skipping lumisection(s) ");
343  msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
344  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
346 
347  advanceToLumi(iter->first, "skipped: timeout");
348  }
349  }
350  }
351 
352  if (state_ == State::EOR_CLOSING) {
353  // check if we parsed all lumis
354  // n_lumi is both last lumi and the number of lumi
355  // since lumis are indexed from 1
356 
357  // after all lumi have been pop()'ed
358  // current lumi will become larger than the last lumi
359  if (nextLumiNumber_ > eor_.n_lumi) {
360  state_ = State::EOR;
361  }
362  }
363 
364  if (state_ != old_state) {
365  logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
366 
367  if (mon_) {
368  ptree doc;
369  doc.put("fi_state", std::to_string(state_));
370  mon_->outputUpdate(doc);
371  }
372  }
373  }
374 
376  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
378  }
379 
381  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
382  lumiSeen_[lumi.file_ls].state = msg;
383 
384  monUpdateLumi(lumiSeen_[lumi.file_ls]);
385  } else {
386  logFileAction("Internal error: referenced lumi is not the map.");
387  }
388  }
389 
391  if (mon_.isAvailable())
392  mon_->keepAlive();
393 
394  usleep(delayMillis_ * 1000);
395  }
396 
398  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
399 
400  desc.addUntracked<unsigned int>("datafnPosition", 3)
401  ->setComment(
402  "Data filename position in the positional arguments array 'data' in "
403  "json file.");
404 
405  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
406 
407  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
408 
409  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
410  ->setComment(
411  "Number of milliseconds to wait before switching to the next lumi "
412  "section if the current is missing, -1 to disable.");
413 
414  desc.addUntracked<bool>("scanOnce", false)
415  ->setComment(
416  "Don't repeat file scans: use what was found during the initial scan. "
417  "EOR file is ignored and the state is set to 'past end of run'.");
418 
419  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
420  }
421 
422 } // namespace dqmservices
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::string to_string(const V &value)
Definition: OMSAccess.h:71
std::map< unsigned int, LumiEntry > lumiSeen_
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
U second(std::pair< T, U > const &p)
char const * label
DQMFileIterator(edm::ParameterSet const &pset)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::vector< std::string > runPath_
std::chrono::high_resolution_clock::time_point lastLumiLoad_
void logFileAction(const std::string &msg, const std::string &fileName="") const
static EorEntry load_json(const std::string &run_path, const std::string &filename)
tuple msg
Definition: mps_check.py:285
void monUpdateLumi(const LumiEntry &lumi)
std::chrono::high_resolution_clock::time_point runPathLastCollect_
std::unordered_set< std::string > filesSeen_
Log< level::System, true > LogAbsolute
#define str(s)
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)