CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 #include <boost/algorithm/string.hpp>
17 
18 namespace dqmservices {
19 
21  const std::string& run_path,
22  const std::string& filename, int lumiNumber, int datafn_position) {
23  boost::property_tree::ptree pt;
24  read_json(filename, pt);
25 
27  lumi.filename = filename;
28  lumi.run_path = run_path;
29 
30  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)
31  ->second.get_value<std::size_t>();
32 
33  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)
34  ->second.get_value<std::size_t>();
35 
36  lumi.file_ls = lumiNumber;
37 
38  if (datafn_position >= 0) {
39  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)
40  ->second.get_value<std::string>();
41  }
42 
43  return lumi;
44 }
45 
47  if (boost::starts_with(datafn, "/")) return datafn;
48 
50  p /= datafn;
51  return p.string();
52 }
53 
55  return filename;
56 }
57 
58 // Contents of Eor json file are ignored for the moment.
59 // This function will not be called.
61  const std::string& run_path,
62  const std::string& filename) {
63  boost::property_tree::ptree pt;
64  read_json(filename, pt);
65 
66  EorEntry eor;
67  eor.filename = filename;
68  eor.run_path = run_path;
69 
70  // We rely on n_events to be the first item on the array...
71  eor.n_events = std::next(pt.get_child("data").begin(), 1)
72  ->second.get_value<std::size_t>();
73  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)
74  ->second.get_value<std::size_t>();
75 
76  eor.loaded = true;
77  return eor;
78 }
79 
81  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
82  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
83  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
84  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
85  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
87  pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
88 
89  // scan once mode
90  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
91 
93  reset();
94 }
95 
97 
99  runPath_.clear();
100 
101  std::vector<std::string> tokens;
102  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
103 
104  for (auto token: tokens) {
105  runPath_.push_back(boost::str(boost::format("%s/run%06d") % token % runNumber_));
106  }
107 
108  eor_.loaded = false;
109  state_ = State::OPEN;
110  nextLumiNumber_ = 1;
111  lumiSeen_.clear();
112  filesSeen_.clear();
113 
115 
116  collect(true);
117  update_state();
118 
119  if (mon_.isAvailable()) {
120  ptree doc;
121  doc.put("run", runNumber_);
122  doc.put("next_lumi", nextLumiNumber_);
123  doc.put("fi_state", std::to_string(state_));
124  mon_->outputUpdate(doc);
125  }
126 }
127 
129 
132  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
133  return lumi;
134 }
135 
137  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
138  return true;
139  }
140 
141  return false;
142 }
143 
144 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
145 
147  if (!lumiSeen_.empty()) {
148  return lumiSeen_.rbegin()->first;
149  }
150 
151  return 1;
152 }
153 
// Tags every recorded lumisection between the previous position and
// nextLumiNumber_ with `reason`, pushes each of them to the monitoring
// service, and finally reports the current next_lumi.
//
// lumi:   target lumisection number.
// reason: text stored into LumiEntry::state of every entry stepped over.
//
// NOTE(review): listing lines 160-161 are absent from this extract; nothing
// visible here reads `lumi`, so presumably nextLumiNumber_ is advanced to
// `lumi` in the missing lines -- confirm against the real source file.
154 void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
155  using boost::property_tree::ptree;
156  using boost::str;
157 
   // position before advancing; used as the lower bound of the walk below
158  unsigned int currentLumi = nextLumiNumber_;
159 
162 
   // first recorded lumi whose number is >= the old position
163  auto iter = lumiSeen_.lower_bound(currentLumi);
164 
   // walk the recorded lumis in [currentLumi, nextLumiNumber_)
165  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
166  iter->second.state = reason;
167  monUpdateLumi(iter->second);
168 
169  ++iter;
170  }
171 
172  if (mon_.isAvailable()) {
173  // report the successful lumi file open
174  ptree doc;
175  doc.put("next_lumi", nextLumiNumber_);
176  mon_->outputUpdate(doc);
177  }
178 }
179 
181  if (!mon_.isAvailable()) return;
182 
183  ptree doc;
184  doc.put(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls),
185  lumi.state);
186  mon_->outputUpdate(doc);
187 }
188 
189 std::time_t DQMFileIterator::mtimeHash() const {
190  std::time_t mtime_now = 0;
191 
192  for (auto path : runPath_) {
193  if (!boost::filesystem::exists(path))
194  continue;
195 
196  mtime_now = mtime_now ^ boost::filesystem::last_write_time(path);
197  }
198 
199  return mtime_now;
200 }
201 
// Scans every directory listed in runPath_ for json files named
// run<N>_ls<N>_<label>[_<suffix>].jsn, loads the ones that belong to this run
// and stream into lumiSeen_, and detects the end-of-run (EoR) marker file.
//
// ignoreTimers: when true, both early returns below (the "too soon" throttle
//               and the "directory unchanged" shortcut) are bypassed and the
//               scan always runs.
//
// NOTE(review): listing lines 206, 208 and 227 are absent from this extract
// (the statement starting on line 207 is unterminated). Presumably they take
// the current time, finish computing last_ms and refresh the last-collect
// timestamp -- confirm against the real source file.
202 void DQMFileIterator::collect(bool ignoreTimers) {
203  // search filesystem to find available lumi section files
204  // or the end of run files
205 
207  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
209 
210  // don't refresh if it's too soon
   // (last_ms is a millisecond count per the duration_cast above, so the
   // threshold is 100 ms)
211  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
212  return;
213  }
214 
215  // check if directory changed
216  std::time_t mtime_now = mtimeHash();
217 
   // skip the scan while the mtime hash is unchanged, unless
   // forceFileCheckTimeoutMillis_ has elapsed since the last scan
218  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) &&
219  (mtime_now == runPathMTime_)) {
220  // logFileAction("Directory hasn't changed.");
221  return;
222  } else {
223  // logFileAction("Directory changed, updating.");
224  }
225 
226  runPathMTime_ = mtime_now;
228 
229  using boost::filesystem::directory_iterator;
230  using boost::filesystem::directory_entry;
231 
   // full path of the EoR file if one is found during this scan
232  std::string fn_eor;
233 
234  for (auto runPath : runPath_) {
235  if (!boost::filesystem::exists(runPath)) {
236  logFileAction("Directory does not exist: ", runPath);
237 
238  continue;
239  }
240 
   // a default-constructed directory_iterator is the end marker
241  directory_iterator dend;
242  for (directory_iterator di(runPath); di != dend; ++di) {
   // capture groups: 1 = run number, 2 = lumi number, 3 = stream label,
   // 4 = optional "_..." suffix
   // NOTE(review): the regex is rebuilt for every directory entry; it is
   // loop-invariant and could be hoisted.
243  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
244 
   // filename: last path component only; fn: the complete path
245  const std::string filename = di->path().filename().string();
246  const std::string fn = di->path().string();
247 
   // NOTE(review): filesSeen_ is keyed by the bare file name, so an
   // identically named file in another run directory is skipped too.
248  if (filesSeen_.find(filename) != filesSeen_.end()) {
249  continue;
250  }
251 
252  boost::smatch result;
253  if (boost::regex_match(filename, result, fn_re)) {
254  unsigned int run = std::stoi(result[1]);
255  unsigned int lumi = std::stoi(result[2]);
256  std::string label = result[3];
257 
   // remember the name before any filtering, so files of other runs or
   // streams are not re-examined on later scans
258  filesSeen_.insert(filename);
259 
260  if (run != runNumber_) continue;
261 
262  // check if this is EoR
263  // for various reasons we have to load it after all other files
264  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
265  fn_eor = fn;
266  continue;
267  }
268 
269  // check if lumi is loaded
270  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
271  continue; // already loaded
272  }
273 
274  // check if this belongs to us
275  if (label != streamLabel_) {
276  std::string msg("Found and skipped json file (stream label mismatch, ");
277  msg += label + " [files] != " + streamLabel_ + " [config]";
278  msg += "): ";
279  logFileAction(msg, fn);
280  continue;
281  }
282 
283  try {
284  LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
285  lumiSeen_.emplace(lumi, lumi_jsn);
286  logFileAction("Found and loaded json file: ", fn);
287 
288  monUpdateLumi(lumi_jsn);
289  } catch (const std::exception& e) {
290  // don't reset the mtime, keep it waiting
   // forget the file name so that a later scan tries to load it again
291  filesSeen_.erase(filename);
292 
293  std::string msg("Found, tried to load the json, but failed (");
294  msg += e.what();
295  msg += "): ";
296  logFileAction(msg, fn);
297  }
298  }
299  }
300  }
301 
   // in scanOnce mode the run is marked as ended even without an EoR file
302  if ((!fn_eor.empty()) or flagScanOnce_) {
303  if (!fn_eor.empty()) {
304  logFileAction("EoR file found: ", fn_eor);
305  }
306 
307  // @TODO load EoR files correctly
308  // eor_ = EorEntry::load_json(fn_eor);
309  // logFileAction("Loaded eor file: ", fn_eor);
310 
311  // for now, set n_lumi to the highest _found_ lumi
312  eor_.loaded = true;
313 
314  if (lumiSeen_.empty()) {
315  eor_.n_lumi = 0;
316  } else {
   // lumiSeen_ is an ordered map, so rbegin() is the largest lumi number
317  eor_.n_lumi = lumiSeen_.rbegin()->first;
318  }
319  }
320 }
321 
323  using std::chrono::high_resolution_clock;
324  using std::chrono::duration_cast;
325  using std::chrono::milliseconds;
326 
327  State old_state = state_;
328 
329  // in scanOnce mode we don't do repeated scans:
330  // whatever was found at reset() is used
331  if (!flagScanOnce_) {
332  collect(false);
333  }
334 
335  if ((state_ == State::OPEN) && (eor_.loaded)) {
336  state_ = State::EOR_CLOSING;
337  }
338 
339  // special case for missing lumi files
340  // skip to the next available, but after the timeout
341  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
342  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
343  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
344  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
345  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
346 
347  if (elapsed_ms >= nextLumiTimeoutMillis_) {
348  std::string msg("Timeout reached, skipping lumisection(s) ");
349  msg += std::to_string(nextLumiNumber_) + " .. " +
350  std::to_string(iter->first - 1);
351  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
352  logFileAction(msg);
353 
354  advanceToLumi(iter->first, "skipped: timeout");
355  }
356  }
357  }
358 
359  if (state_ == State::EOR_CLOSING) {
360  // check if we parsed all lumis
361  // n_lumi is both last lumi and the number of lumi
362  // since lumis are indexed from 1
363 
364  // after all lumi have been pop()'ed
365  // current lumi will become larger than the last lumi
366  if (nextLumiNumber_ > eor_.n_lumi) {
367  state_ = State::EOR;
368  }
369  }
370 
371  if (state_ != old_state) {
372  logFileAction("Streamer state changed: ",
373  std::to_string(old_state) + "->" + std::to_string(state_));
374 
375  if (mon_) {
376  ptree doc;
377  doc.put("fi_state", std::to_string(state_));
378  mon_->outputUpdate(doc);
379  }
380  }
381 }
382 
384  const std::string& fileName) const {
385  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay()
386  << " " << msg << fileName;
388 }
389 
391  const std::string& msg) {
392  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
393  lumiSeen_[lumi.file_ls].state = msg;
394 
396  } else {
397  logFileAction("Internal error: referenced lumi is not the map.");
398  }
399 }
400 
402  if (mon_.isAvailable()) mon_->keepAlive();
403 
404  usleep(delayMillis_ * 1000);
405 }
406 
408  desc.addUntracked<unsigned int>("runNumber")
409  ->setComment("Run number passed via configuration file.");
410 
411  desc.addUntracked<unsigned int>("datafnPosition", 3)
412  ->setComment(
413  "Data filename position in the positional arguments array 'data' in "
414  "json file.");
415 
416  desc.addUntracked<std::string>("streamLabel")
417  ->setComment("Stream label used in json discovery.");
418 
419  desc.addUntracked<uint32_t>("delayMillis")
420  ->setComment("Number of milliseconds to wait between file checks.");
421 
422  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
423  ->setComment(
424  "Number of milliseconds to wait before switching to the next lumi "
425  "section if the current is missing, -1 to disable.");
426 
427  desc.addUntracked<bool>("scanOnce", false)
428  ->setComment(
429  "Don't repeat file scans: use what was found during the initial scan. "
430  "EOR file is ignored and the state is set to 'past end of run'.");
431 
432  desc.addUntracked<std::string>("runInputDir")
433  ->setComment("Directory where the DQM files will appear.");
434 }
435 
436 } /* end of namespace */
T getUntrackedParameter(std::string const &, T const &) const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::map< unsigned int, LumiEntry > lumiSeen_
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
U second(std::pair< T, U > const &p)
DQMFileIterator(edm::ParameterSet const &pset)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::vector< std::string > runPath_
void logFileAction(const std::string &msg, const std::string &fileName="") const
format
Some error handling for the usage.
std::chrono::high_resolution_clock::time_point lastLumiLoad_
static EorEntry load_json(const std::string &run_path, const std::string &filename)
tuple msg
Definition: mps_check.py:277
void monUpdateLumi(const LumiEntry &lumi)
std::chrono::high_resolution_clock::time_point runPathLastCollect_
std::unordered_set< std::string > filesSeen_
std::time_t mtimeHash() const
void advanceToLumi(unsigned int lumi, std::string reason)
double split
Definition: MVATrainer.cc:139
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)