CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
2 #include "DQMMonitoringService.h"
3 
8 
9 #include <filesystem>
#include <system_error>
10 
11 #include <boost/algorithm/string.hpp>
12 #include <boost/algorithm/string/predicate.hpp>
13 #include <boost/property_tree/json_parser.hpp>
14 #include <boost/property_tree/ptree.hpp>
15 #include <boost/regex.hpp>
16 
17 #include <fmt/printf.h>
18 
19 namespace dqmservices {
20 
22  const std::string& filename,
23  int lumiNumber,
24  int datafn_position) {
25  boost::property_tree::ptree pt;
26  read_json(filename, pt);
27 
29  lumi.filename = filename;
30  lumi.run_path = run_path;
31 
32  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
33 
34  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
35 
36  lumi.file_ls = lumiNumber;
37 
38  if (datafn_position >= 0) {
39  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
40  }
41 
42  return lumi;
43  }
44 
46  if (boost::starts_with(datafn, "/"))
47  return datafn;
48 
49  std::filesystem::path p(run_path);
50  p /= datafn;
51  return p.string();
52  }
53 
55 
56  // Contents of Eor json file are ignored for the moment.
57  // This function will not be called.
59  const std::string& filename) {
60  boost::property_tree::ptree pt;
61  read_json(filename, pt);
62 
63  EorEntry eor;
64  eor.filename = filename;
65  eor.run_path = run_path;
66 
67  // We rely on n_events to be the first item on the array...
68  eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
69  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
70 
71  eor.loaded = true;
72  return eor;
73  }
74 
76  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
77  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
78  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
79  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
80  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
81  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
82 
83  // scan one mode
84  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
85 
87  reset();
88  }
89 
91  runPath_.clear();
92 
93  std::vector<std::string> tokens;
94  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
95 
96  for (const auto& token : tokens) {
97  runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
98  }
99 
100  eor_.loaded = false;
101  state_ = State::OPEN;
102  nextLumiNumber_ = 1;
103  lumiSeen_.clear();
104  filesSeen_.clear();
105 
107 
108  collect(true);
109  update_state();
110 
111  if (mon_.isAvailable()) {
112  boost::property_tree::ptree doc;
113  doc.put("run", runNumber_);
114  doc.put("next_lumi", nextLumiNumber_);
115  doc.put("fi_state", std::to_string(state_));
116  mon_->outputUpdate(doc);
117  }
118  }
119 
122  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
123  return lumi;
124  }
125 
127  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
128  return true;
129  }
130 
131  return false;
132  }
133 
135  if (!lumiSeen_.empty()) {
136  return lumiSeen_.rbegin()->first;
137  }
138 
139  return 1;
140  }
141 
143  unsigned int currentLumi = nextLumiNumber_;
144 
147 
148  auto iter = lumiSeen_.lower_bound(currentLumi);
149 
150  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
151  iter->second.state = reason;
152  monUpdateLumi(iter->second);
153 
154  ++iter;
155  }
156 
157  if (mon_.isAvailable()) {
158  // report the successful lumi file open
159  boost::property_tree::ptree doc;
160  doc.put("next_lumi", nextLumiNumber_);
161  mon_->outputUpdate(doc);
162  }
163  }
164 
166  if (!mon_.isAvailable())
167  return;
168 
169  boost::property_tree::ptree doc;
170  doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
171  mon_->outputUpdate(doc);
172  }
173 
174  unsigned DQMFileIterator::mtimeHash() const {
175  unsigned mtime_now = 0;
176 
177  for (const auto& path : runPath_) {
178  if (!std::filesystem::exists(path))
179  continue;
180 
181  auto write_time = std::filesystem::last_write_time(path);
182  mtime_now =
183  mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
184  }
185 
186  return mtime_now;
187  }
188 
189  void DQMFileIterator::collect(bool ignoreTimers) {
190  // search filesystem to find available lumi section files
191  // or the end of run files
192 
194  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
195 
196  // don't refresh if it's too soon
197  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
198  return;
199  }
200 
201  // check if directory changed
202  auto mtime_now = mtimeHash();
203 
204  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
205  // logFileAction("Directory hasn't changed.");
206  return;
207  } else {
208  // logFileAction("Directory changed, updating.");
209  }
210 
211  runPathMTime_ = mtime_now;
213 
214  using std::filesystem::directory_entry;
215  using std::filesystem::directory_iterator;
216 
217  std::string fn_eor;
218 
219  for (const auto& runPath : runPath_) {
220  if (!std::filesystem::exists(runPath)) {
221  logFileAction("Directory does not exist: ", runPath);
222 
223  continue;
224  }
225 
226  directory_iterator dend;
227  for (directory_iterator di(runPath); di != dend; ++di) {
228  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
229 
230  const std::string filename = di->path().filename().string();
231  const std::string fn = di->path().string();
232 
233  if (filesSeen_.find(filename) != filesSeen_.end()) {
234  continue;
235  }
236 
237  boost::smatch result;
238  if (boost::regex_match(filename, result, fn_re)) {
239  unsigned int run = std::stoi(result[1]);
240  unsigned int lumi = std::stoi(result[2]);
241  std::string label = result[3];
242 
243  filesSeen_.insert(filename);
244 
245  if (run != runNumber_)
246  continue;
247 
248  // check if this is EoR
249  // for various reasons we have to load it after all other files
250  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
251  fn_eor = fn;
252  continue;
253  }
254 
255  // check if lumi is loaded
256  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
257  continue; // already loaded
258  }
259 
260  // check if this belongs to us
261  if (label != streamLabel_) {
262  std::string msg("Found and skipped json file (stream label mismatch, ");
263  msg += label + " [files] != " + streamLabel_ + " [config]";
264  msg += "): ";
265  logFileAction(msg, fn);
266  continue;
267  }
268 
269  try {
270  LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
271  lumiSeen_.emplace(lumi, lumi_jsn);
272  logFileAction("Found and loaded json file: ", fn);
273 
274  monUpdateLumi(lumi_jsn);
275  } catch (const std::exception& e) {
276  // don't reset the mtime, keep it waiting
277  filesSeen_.erase(filename);
278 
279  std::string msg("Found, tried to load the json, but failed (");
280  msg += e.what();
281  msg += "): ";
282  logFileAction(msg, fn);
283  }
284  }
285  }
286  }
287 
288  if ((!fn_eor.empty()) or flagScanOnce_) {
289  if (!fn_eor.empty()) {
290  logFileAction("EoR file found: ", fn_eor);
291  }
292 
293  // @TODO load EoR files correctly
294  // eor_ = EorEntry::load_json(fn_eor);
295  // logFileAction("Loaded eor file: ", fn_eor);
296 
297  // for now , set n_lumi to the highest _found_ lumi
298  eor_.loaded = true;
299 
300  if (lumiSeen_.empty()) {
301  eor_.n_lumi = 0;
302  } else {
303  eor_.n_lumi = lumiSeen_.rbegin()->first;
304  }
305  }
306  }
307 
309  using std::chrono::duration_cast;
310  using std::chrono::high_resolution_clock;
311  using std::chrono::milliseconds;
312 
313  State old_state = state_;
314 
315  // in scanOnce mode we don't do repeated scans
316  // whatever found at reset() is be used
317  if (!flagScanOnce_) {
318  collect(false);
319  }
320 
321  if ((state_ == State::OPEN) && (eor_.loaded)) {
322  state_ = State::EOR_CLOSING;
323  }
324 
325  // special case for missing lumi files
326  // skip to the next available, but after the timeout
327  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
328  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
329  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
330  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
331  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
332 
333  if (elapsed_ms >= nextLumiTimeoutMillis_) {
334  std::string msg("Timeout reached, skipping lumisection(s) ");
335  msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
336  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
338 
339  advanceToLumi(iter->first, "skipped: timeout");
340  }
341  }
342  }
343 
344  if (state_ == State::EOR_CLOSING) {
345  // check if we parsed all lumis
346  // n_lumi is both last lumi and the number of lumi
347  // since lumis are indexed from 1
348 
349  // after all lumi have been pop()'ed
350  // current lumi will become larger than the last lumi
351  if (nextLumiNumber_ > eor_.n_lumi) {
352  state_ = State::EOR;
353  }
354  }
355 
356  if (state_ != old_state) {
357  logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
358 
359  if (mon_) {
360  boost::property_tree::ptree doc;
361  doc.put("fi_state", std::to_string(state_));
362  mon_->outputUpdate(doc);
363  }
364  }
365  }
366 
368  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
370  }
371 
373  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
374  lumiSeen_[lumi.file_ls].state = msg;
375 
376  monUpdateLumi(lumiSeen_[lumi.file_ls]);
377  } else {
378  logFileAction("Internal error: referenced lumi is not the map.");
379  }
380  }
381 
383  if (mon_.isAvailable())
384  mon_->keepAlive();
385 
386  usleep(delayMillis_ * 1000);
387  }
388 
390  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
391 
392  desc.addUntracked<unsigned int>("datafnPosition", 3)
393  ->setComment(
394  "Data filename position in the positional arguments array 'data' in "
395  "json file.");
396 
397  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
398 
399  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
400 
401  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
402  ->setComment(
403  "Number of milliseconds to wait before switching to the next lumi "
404  "section if the current is missing, -1 to disable.");
405 
406  desc.addUntracked<bool>("scanOnce", false)
407  ->setComment(
408  "Don't repeat file scans: use what was found during the initial scan. "
409  "EOR file is ignored and the state is set to 'past end of run'.");
410 
411  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
412  }
413 
414 } // namespace dqmservices
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::map< unsigned int, LumiEntry > lumiSeen_
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
static std::string to_string(const XMLCh *ch)
U second(std::pair< T, U > const &p)
char const * label
DQMFileIterator(edm::ParameterSet const &pset)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry and is current per … Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application. Each possible callback has some defined signature, which we here list in angle brackets, e.g. < void, edm::EventID const &, edm::Timestamp const & >. We also list in braces which AR_WATCH_USING_METHOD_ is used for those callbacks, or …
Definition: Activities.doc:12
std::vector< std::string > runPath_
std::chrono::high_resolution_clock::time_point lastLumiLoad_
void logFileAction(const std::string &msg, const std::string &fileName="") const
static EorEntry load_json(const std::string &run_path, const std::string &filename)
tuple msg
Definition: mps_check.py:286
void monUpdateLumi(const LumiEntry &lumi)
std::chrono::high_resolution_clock::time_point runPathLastCollect_
std::unordered_set< std::string > filesSeen_
Log< level::System, true > LogAbsolute
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)