CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 
17 namespace dqmservices {
18 
20  const std::string& filename, int lumiNumber, int datafn_position) {
21  boost::property_tree::ptree pt;
22  read_json(filename, pt);
23 
25  lumi.filename = filename;
26 
27  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)
28  ->second.get_value<std::size_t>();
29 
30  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)
31  ->second.get_value<std::size_t>();
32 
33  lumi.file_ls = lumiNumber;
34 
35  if (datafn_position >= 0) {
36  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)
37  ->second.get_value<std::string>();
38  }
39 
40  return lumi;
41 }
42 
43 // Contents of Eor json file are ignored for the moment.
44 // This function will not be called.
46  const std::string& filename) {
47  boost::property_tree::ptree pt;
48  read_json(filename, pt);
49 
50  EorEntry eor;
51  eor.filename = filename;
52 
53  // We rely on n_events to be the first item on the array...
54  eor.n_events = std::next(pt.get_child("data").begin(), 1)
55  ->second.get_value<std::size_t>();
56  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)
57  ->second.get_value<std::size_t>();
58 
59  eor.loaded = true;
60  return eor;
61 }
62 
64  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
65  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
66  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
67  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
68  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
70  pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
71 
72  // scan once mode
73  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
74 
76  reset();
77 }
78 
80 
82  runPath_ = str(boost::format("%s/run%06d") % runInputDir_ % runNumber_);
83 
84  eor_.loaded = false;
85  state_ = State::OPEN;
86  nextLumiNumber_ = 1;
87  lumiSeen_.clear();
88  filesSeen_.clear();
89 
91 
92  collect(true);
93  update_state();
94 
95  if (mon_.isAvailable()) {
96  ptree doc;
97  doc.put("run", runNumber_);
98  doc.put("next_lumi", nextLumiNumber_);
99  doc.put("fi_state", std::to_string(state_));
100  mon_->outputUpdate(doc);
101  }
102 }
103 
105 
108  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
109  return lumi;
110 }
111 
113  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
114  return true;
115  }
116 
117  return false;
118 }
119 
// Accessor: returns the run number stored in runNumber_.
120 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
121 
123  if (!lumiSeen_.empty()) {
124  return lumiSeen_.rbegin()->first;
125  }
126 
127  return 1;
128 }
129 
// Stamps already-seen lumisections with `reason` and reports the next lumi
// number to the monitoring service.
//
// lumi   - target lumi number. It is not referenced in the lines visible
//          here; NOTE(review): listing lines 135-137 are missing from this
//          extract, presumably nextLumiNumber_ is set from `lumi` there -
//          confirm against the original source.
// reason - text written into LumiEntry::state for every entry that is
//          passed over.
130 void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
131  using boost::property_tree::ptree;
132  using boost::str;
133 
  // Remember where we were before nextLumiNumber_ is changed.
134  unsigned int currentLumi = nextLumiNumber_;
135 
138 
  // First seen lumi whose number is >= the previous position.
139  auto iter = lumiSeen_.lower_bound(currentLumi);
140 
  // Every seen lumi in [currentLumi, nextLumiNumber_) gets `reason` as its
  // state and is pushed to monitoring via monUpdateLumi().
141  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
142  iter->second.state = reason;
143  monUpdateLumi(iter->second);
144 
145  ++iter;
146  }
147 
  // Publish the new "next_lumi" value when the monitoring service exists.
148  if (mon_.isAvailable()) {
149  // report the successful lumi file open
150  ptree doc;
151  doc.put("next_lumi", nextLumiNumber_);
152  mon_->outputUpdate(doc);
153  }
154 }
155 
157  if (!mon_.isAvailable()) return;
158 
159  ptree doc;
160  doc.put(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls),
161  lumi.state);
162  mon_->outputUpdate(doc);
163 }
164 
166  if (boost::starts_with(fn, "/")) return fn;
167 
169  p /= fn;
170  return p.string();
171 }
172 
// Scans runPath_ for json files named run<N>_ls<N>_<label>[_...].jsn, loads
// lumi entries that have not been seen yet into lumiSeen_, and marks the end
// of run (eor_.loaded) when an EoR file is found or flagScanOnce_ is set.
//
// ignoreTimers - when true, both early-return checks below (the 100 ms
//                throttle and the "directory unchanged" check) are bypassed.
//
// NOTE(review): listing lines 177, 179 and 198 are missing from this
// extract, so the statement that computes last_ms is truncated here and
// whatever line 198 did is not visible.
173 void DQMFileIterator::collect(bool ignoreTimers) {
174  // search filesystem to find available lumi section files
175  // or the end of run files
176 
178  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
180 
181  // don't refresh if it's too soon
182  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
183  return;
184  }
185 
186  // check if directory changed
187  std::time_t mtime_now = boost::filesystem::last_write_time(runPath_);
188 
  // Skip the scan while the directory mtime is unchanged, unless
  // forceFileCheckTimeoutMillis_ has elapsed since the last collect.
189  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) &&
190  (mtime_now == runPathMTime_)) {
191  // logFileAction("Directory hasn't changed.");
192  return;
193  } else {
194  // logFileAction("Directory changed, updating.");
195  }
196 
197  runPathMTime_ = mtime_now;
199 
200  using boost::filesystem::directory_iterator;
201  using boost::filesystem::directory_entry;
202 
  // Path of the EoR file, if one is found; it is handled after the loop.
203  std::string fn_eor;
204 
205  directory_iterator dend;
206  for (directory_iterator di(runPath_); di != dend; ++di) {
  // Capture groups: 1 = run number, 2 = lumi number, 3 = alphanumeric
  // label, 4 = optional "_..." suffix. regex_match below requires the whole
  // filename to match.
  // NOTE(review): the regex is loop-invariant but rebuilt on every iteration.
207  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
208 
  // `filename` is the bare name (key for filesSeen_); `fn` is the full path.
209  const std::string filename = di->path().filename().string();
210  const std::string fn = di->path().string();
211 
212  if (filesSeen_.find(filename) != filesSeen_.end()) {
213  continue;
214  }
215 
216  boost::smatch result;
217  if (boost::regex_match(filename, result, fn_re)) {
218  unsigned int run = std::stoi(result[1]);
219  unsigned int lumi = std::stoi(result[2]);
220  std::string label = result[3];
221 
  // Only names that match the pattern are recorded; the file is recorded
  // before the run/label checks, so rejected files are not examined again.
222  filesSeen_.insert(filename);
223 
224  if (run != runNumber_) continue;
225 
226  // check if this is EoR
227  // for various reasons we have to load it after all other files
228  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
229  fn_eor = fn;
230  continue;
231  }
232 
233  // check if lumi is loaded
234  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
235  continue; // already loaded
236  }
237 
238  // check if this belongs to us
239  if (label != streamLabel_) {
240  std::string msg("Found and skipped json file (stream label mismatch, ");
241  msg += label + " [files] != " + streamLabel_ + " [config]";
242  msg += "): ";
243  logFileAction(msg, fn);
244  continue;
245  }
246 
247  try {
248  LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, datafnPosition_);
249  lumiSeen_.emplace(lumi, lumi_jsn);
250  logFileAction("Found and loaded json file: ", fn);
251 
252  monUpdateLumi(lumi_jsn);
253  } catch (const std::exception& e) {
  // Forget the file again so a later scan will attempt to load it once more.
254  // don't reset the mtime, keep it waiting
255  filesSeen_.erase(filename);
256 
257  std::string msg("Found, tried to load the json, but failed (");
258  msg += e.what();
259  msg += "): ";
260  logFileAction(msg, fn);
261  }
262  }
263  }
264 
  // End of run is declared either because an EoR file was found or because
  // scanOnce mode is active (in which case fn_eor may be empty).
265  if ((!fn_eor.empty()) or flagScanOnce_) {
266  if (!fn_eor.empty()) {
267  logFileAction("EoR file found: ", fn_eor);
268  }
269 
270  // @TODO load EoR files correctly
271  // eor_ = EorEntry::load_json(fn_eor);
272  // logFileAction("Loaded eor file: ", fn_eor);
273 
274  // for now , set n_lumi to the highest _found_ lumi
275  eor_.loaded = true;
276 
  // lumiSeen_ is an ordered map, so rbegin() is the highest lumi number.
277  if (lumiSeen_.empty()) {
278  eor_.n_lumi = 0;
279  } else {
280  eor_.n_lumi = lumiSeen_.rbegin()->first;
281  }
282  }
283 }
284 
286  using std::chrono::high_resolution_clock;
287  using std::chrono::duration_cast;
288  using std::chrono::milliseconds;
289 
290  State old_state = state_;
291 
292  // in scanOnce mode we don't do repeated scans
293  // whatever was found at reset() will be used
294  if (!flagScanOnce_) {
295  collect(false);
296  }
297 
298  if ((state_ == State::OPEN) && (eor_.loaded)) {
299  state_ = State::EOR_CLOSING;
300  }
301 
302  // special case for missing lumi files
303  // skip to the next available, but after the timeout
304  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
305  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
306  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
307  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
308  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
309 
310  if (elapsed_ms >= nextLumiTimeoutMillis_) {
311  std::string msg("Timeout reached, skipping lumisection(s) ");
312  msg += std::to_string(nextLumiNumber_) + " .. " +
313  std::to_string(iter->first - 1);
314  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
315  logFileAction(msg);
316 
317  advanceToLumi(iter->first, "skipped: timeout");
318  }
319  }
320  }
321 
322  if (state_ == State::EOR_CLOSING) {
323  // check if we parsed all lumis
324  // n_lumi is both last lumi and the number of lumi
325  // since lumis are indexed from 1
326 
327  // after all lumi have been pop()'ed
328  // current lumi will become larger than the last lumi
329  if (nextLumiNumber_ > eor_.n_lumi) {
330  state_ = State::EOR;
331  }
332  }
333 
334  if (state_ != old_state) {
335  logFileAction("Streamer state changed: ",
336  std::to_string(old_state) + "->" + std::to_string(state_));
337 
338  if (mon_) {
339  ptree doc;
340  doc.put("fi_state", std::to_string(state_));
341  mon_->outputUpdate(doc);
342  }
343  }
344 }
345 
347  const std::string& fileName) const {
348  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay()
349  << " " << msg << fileName;
351 }
352 
354  const std::string& msg) {
355  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
356  lumiSeen_[lumi.file_ls].state = msg;
357 
359  } else {
360  logFileAction("Internal error: referenced lumi is not the map.");
361  }
362 }
363 
365  if (mon_.isAvailable()) mon_->keepAlive();
366 
367  usleep(delayMillis_ * 1000);
368 }
369 
371  desc.addUntracked<unsigned int>("runNumber")
372  ->setComment("Run number passed via configuration file.");
373 
374  desc.addUntracked<unsigned int>("datafnPosition", 3)
375  ->setComment(
376  "Data filename position in the positional arguments array 'data' in "
377  "json file.");
378 
379  desc.addUntracked<std::string>("streamLabel")
380  ->setComment("Stream label used in json discovery.");
381 
382  desc.addUntracked<uint32_t>("delayMillis")
383  ->setComment("Number of milliseconds to wait between file checks.");
384 
385  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
386  ->setComment(
387  "Number of milliseconds to wait before switching to the next lumi "
388  "section if the current is missing, -1 to disable.");
389 
390  desc.addUntracked<bool>("scanOnce", false)
391  ->setComment(
392  "Don't repeat file scans: use what was found during the initial scan. "
393  "EOR file is ignored and the state is set to 'past end of run'.");
394 
395  desc.addUntracked<std::string>("runInputDir")
396  ->setComment("Directory where the DQM files will appear.");
397 }
398 
399 } /* end of namespace */
T getUntrackedParameter(std::string const &, T const &) const
std::string make_path(const std::string &fn)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, LumiEntry > lumiSeen_
string format
Some error handling for the usage.
U second(std::pair< T, U > const &p)
DQMFileIterator(edm::ParameterSet const &pset)
tuple result
Definition: query.py:137
void logFileAction(const std::string &msg, const std::string &fileName="") const
static EorEntry load_json(const std::string &filename)
std::chrono::high_resolution_clock::time_point lastLumiLoad_
static LumiEntry load_json(const std::string &filename, int lumiNumber, int datafn_position)
tuple filename
Definition: lut2db_cfg.py:20
void monUpdateLumi(const LumiEntry &lumi)
std::chrono::high_resolution_clock::time_point runPathLastCollect_
std::unordered_set< std::string > filesSeen_
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)