CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 #include <boost/algorithm/string.hpp>
17 
18 namespace dqmservices {
19 
21  const std::string& filename,
22  int lumiNumber,
23  int datafn_position) {
24  boost::property_tree::ptree pt;
25  read_json(filename, pt);
26 
28  lumi.filename = filename;
29  lumi.run_path = run_path;
30 
31  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
32 
33  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
34 
35  lumi.file_ls = lumiNumber;
36 
37  if (datafn_position >= 0) {
38  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
39  }
40 
41  return lumi;
42  }
43 
45  if (boost::starts_with(datafn, "/"))
46  return datafn;
47 
49  p /= datafn;
50  return p.string();
51  }
52 
54 
55  // Contents of Eor json file are ignored for the moment.
56  // This function will not be called.
58  const std::string& filename) {
59  boost::property_tree::ptree pt;
60  read_json(filename, pt);
61 
62  EorEntry eor;
63  eor.filename = filename;
64  eor.run_path = run_path;
65 
66  // We rely on n_events to be the first item on the array...
67  eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
68  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
69 
70  eor.loaded = true;
71  return eor;
72  }
73 
75  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
76  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
77  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
78  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
79  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
80  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
81 
82  // scan-once mode
83  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
84 
86  reset();
87  }
88 
90 
92  runPath_.clear();
93 
94  std::vector<std::string> tokens;
95  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
96 
97  for (auto token : tokens) {
98  runPath_.push_back(boost::str(boost::format("%s/run%06d") % token % runNumber_));
99  }
100 
101  eor_.loaded = false;
102  state_ = State::OPEN;
103  nextLumiNumber_ = 1;
104  lumiSeen_.clear();
105  filesSeen_.clear();
106 
108 
109  collect(true);
110  update_state();
111 
112  if (mon_.isAvailable()) {
113  ptree doc;
114  doc.put("run", runNumber_);
115  doc.put("next_lumi", nextLumiNumber_);
116  doc.put("fi_state", std::to_string(state_));
117  mon_->outputUpdate(doc);
118  }
119  }
120 
122 
125  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
126  return lumi;
127  }
128 
130  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
131  return true;
132  }
133 
134  return false;
135  }
136 
137  unsigned int DQMFileIterator::runNumber() { return runNumber_; }
138 
140  if (!lumiSeen_.empty()) {
141  return lumiSeen_.rbegin()->first;
142  }
143 
144  return 1;
145  }
146 
148  using boost::str;
149  using boost::property_tree::ptree;
150 
151  unsigned int currentLumi = nextLumiNumber_;
152 
155 
156  auto iter = lumiSeen_.lower_bound(currentLumi);
157 
158  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
159  iter->second.state = reason;
160  monUpdateLumi(iter->second);
161 
162  ++iter;
163  }
164 
165  if (mon_.isAvailable()) {
166  // report the successful lumi file open
167  ptree doc;
168  doc.put("next_lumi", nextLumiNumber_);
169  mon_->outputUpdate(doc);
170  }
171  }
172 
174  if (!mon_.isAvailable())
175  return;
176 
177  ptree doc;
178  doc.put(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls), lumi.state);
179  mon_->outputUpdate(doc);
180  }
181 
182  std::time_t DQMFileIterator::mtimeHash() const {
183  std::time_t mtime_now = 0;
184 
185  for (auto path : runPath_) {
186  if (!boost::filesystem::exists(path))
187  continue;
188 
189  mtime_now = mtime_now ^ boost::filesystem::last_write_time(path);
190  }
191 
192  return mtime_now;
193  }
194 
// Scan every directory in runPath_ for json (.jsn) files of the configured
// run and record newly found lumisections in lumiSeen_.
//
// ignoreTimers: when true, both throttling checks below (minimum interval
// since the last scan, and "directory fingerprint unchanged") are bypassed
// and the directories are always scanned.
//
// Side effects visible here: updates runPathMTime_, filesSeen_, lumiSeen_
// and eor_, and reports each loaded lumi through monUpdateLumi().
195  void DQMFileIterator::collect(bool ignoreTimers) {
196  // search filesystem to find available lumi section files
197  // or the end of run files
198 
// NOTE(review): `now` is used on the next line but its definition is not
// present in this listing (a line appears to be missing) - confirm against
// the real source file.
200  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
201 
202  // don't refresh if it's too soon
203  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
204  return;
205  }
206 
207  // check if directory changed
208  std::time_t mtime_now = mtimeHash();
209 
// Skip the scan when the fingerprint is unchanged, unless
// forceFileCheckTimeoutMillis_ has elapsed since the last scan.
210  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
211  // logFileAction("Directory hasn't changed.");
212  return;
213  } else {
214  // logFileAction("Directory changed, updating.");
215  }
216 
217  runPathMTime_ = mtime_now;
// NOTE(review): a line is missing from the listing here; nothing visible in
// this block assigns runPathLastCollect_ - confirm against the real source.
219 
220  using boost::filesystem::directory_entry;
221  using boost::filesystem::directory_iterator;
222 
// Full path of the end-of-run marker, if one is found during this scan.
223  std::string fn_eor;
224 
225  for (auto runPath : runPath_) {
226  if (!boost::filesystem::exists(runPath)) {
227  logFileAction("Directory does not exist: ", runPath);
228 
229  continue;
230  }
231 
232  directory_iterator dend;
233  for (directory_iterator di(runPath); di != dend; ++di) {
// Capture groups: 1 = run number, 2 = lumi number, 3 = stream label.
234  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
235 
// `filename` is the bare file name (key for filesSeen_),
// `fn` is the full path used for loading and logging.
236  const std::string filename = di->path().filename().string();
237  const std::string fn = di->path().string();
238 
239  if (filesSeen_.find(filename) != filesSeen_.end()) {
240  continue;
241  }
242 
243  boost::smatch result;
244  if (boost::regex_match(filename, result, fn_re)) {
245  unsigned int run = std::stoi(result[1]);
246  unsigned int lumi = std::stoi(result[2]);
247  std::string label = result[3];
248 
// Mark the file as seen before any filtering, so files rejected below
// (other run, other stream, duplicate lumi) are not parsed again on the
// next scan.
249  filesSeen_.insert(filename);
250 
251  if (run != runNumber_)
252  continue;
253 
254  // check if this is EoR
255  // for various reasons we have to load it after all other files
256  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
257  fn_eor = fn;
258  continue;
259  }
260 
261  // check if lumi is loaded
262  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
263  continue; // already loaded
264  }
265 
266  // check if this belongs to us
267  if (label != streamLabel_) {
268  std::string msg("Found and skipped json file (stream label mismatch, ");
269  msg += label + " [files] != " + streamLabel_ + " [config]";
270  msg += "): ";
271  logFileAction(msg, fn);
272  continue;
273  }
274 
275  try {
276  LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
277  lumiSeen_.emplace(lumi, lumi_jsn);
278  logFileAction("Found and loaded json file: ", fn);
279 
280  monUpdateLumi(lumi_jsn);
281  } catch (const std::exception& e) {
282  // don't reset the mtime, keep it waiting
// Forget the file again so that a later scan retries loading it.
283  filesSeen_.erase(filename);
284 
285  std::string msg("Found, tried to load the json, but failed (");
286  msg += e.what();
287  msg += "): ";
288  logFileAction(msg, fn);
289  }
290  }
291  }
292  }
293 
// End-of-run handling: triggered either by an EoR file found above or by
// scan-once mode (flagScanOnce_), in which case no EoR file is required.
294  if ((!fn_eor.empty()) or flagScanOnce_) {
295  if (!fn_eor.empty()) {
296  logFileAction("EoR file found: ", fn_eor);
297  }
298 
299  // @TODO load EoR files correctly
300  // eor_ = EorEntry::load_json(fn_eor);
301  // logFileAction("Loaded eor file: ", fn_eor);
302 
303  // for now , set n_lumi to the highest _found_ lumi
304  eor_.loaded = true;
305 
306  if (lumiSeen_.empty()) {
307  eor_.n_lumi = 0;
308  } else {
// rbegin() is the entry with the largest lumi number present in lumiSeen_.
309  eor_.n_lumi = lumiSeen_.rbegin()->first;
310  }
311  }
312  }
313 
315  using std::chrono::duration_cast;
316  using std::chrono::high_resolution_clock;
317  using std::chrono::milliseconds;
318 
319  State old_state = state_;
320 
321  // in scanOnce mode we don't do repeated scans
322  // whatever was found at reset() is used
323  if (!flagScanOnce_) {
324  collect(false);
325  }
326 
327  if ((state_ == State::OPEN) && (eor_.loaded)) {
328  state_ = State::EOR_CLOSING;
329  }
330 
331  // special case for missing lumi files
332  // skip to the next available, but after the timeout
333  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
334  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
335  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
336  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
337  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
338 
339  if (elapsed_ms >= nextLumiTimeoutMillis_) {
340  std::string msg("Timeout reached, skipping lumisection(s) ");
341  msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
342  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
343  logFileAction(msg);
344 
345  advanceToLumi(iter->first, "skipped: timeout");
346  }
347  }
348  }
349 
350  if (state_ == State::EOR_CLOSING) {
351  // check if we parsed all lumis
352  // n_lumi is both last lumi and the number of lumi
353  // since lumis are indexed from 1
354 
355  // after all lumi have been pop()'ed
356  // current lumi will become larger than the last lumi
357  if (nextLumiNumber_ > eor_.n_lumi) {
358  state_ = State::EOR;
359  }
360  }
361 
362  if (state_ != old_state) {
363  logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
364 
365  if (mon_) {
366  ptree doc;
367  doc.put("fi_state", std::to_string(state_));
368  mon_->outputUpdate(doc);
369  }
370  }
371  }
372 
374  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
376  }
377 
379  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
380  lumiSeen_[lumi.file_ls].state = msg;
381 
383  } else {
384  logFileAction("Internal error: referenced lumi is not the map.");
385  }
386  }
387 
389  if (mon_.isAvailable())
390  mon_->keepAlive();
391 
392  usleep(delayMillis_ * 1000);
393  }
394 
396  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
397 
398  desc.addUntracked<unsigned int>("datafnPosition", 3)
399  ->setComment(
400  "Data filename position in the positional arguments array 'data' in "
401  "json file.");
402 
403  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
404 
405  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
406 
407  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
408  ->setComment(
409  "Number of milliseconds to wait before switching to the next lumi "
410  "section if the current is missing, -1 to disable.");
411 
412  desc.addUntracked<bool>("scanOnce", false)
413  ->setComment(
414  "Don't repeat file scans: use what was found during the initial scan. "
415  "EOR file is ignored and the state is set to 'past end of run'.");
416 
417  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
418  }
419 
420 } // namespace dqmservices
std::vector< std::string_view > split(std::string_view, const char *)
T getUntrackedParameter(std::string const &, T const &) const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
std::map< unsigned int, LumiEntry > lumiSeen_
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
U second(std::pair< T, U > const &p)
char const * label
DQMFileIterator(edm::ParameterSet const &pset)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
std::vector< std::string > runPath_
void logFileAction(const std::string &msg, const std::string &fileName="") const
std::chrono::high_resolution_clock::time_point lastLumiLoad_
static EorEntry load_json(const std::string &run_path, const std::string &filename)
tuple msg
Definition: mps_check.py:285
void monUpdateLumi(const LumiEntry &lumi)
std::chrono::high_resolution_clock::time_point runPathLastCollect_
std::unordered_set< std::string > filesSeen_
std::time_t mtimeHash() const
#define str(s)
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)