CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 
17 namespace dqmservices {
18 
20  const std::string& filename, int lumiNumber, unsigned int datafn_position) {
21  boost::property_tree::ptree pt;
22  read_json(filename, pt);
23 
25  lumi.filename = filename;
26 
27  // We rely on n_events to be the first item on the array...
28  lumi.n_events = std::next(pt.get_child("data").begin(), 1)
29  ->second.get_value<std::size_t>();
30 
31  lumi.file_ls = lumiNumber;
32  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)
33  ->second.get_value<std::string>();
34 
35  return lumi;
36 }
37 
38 // Contents of Eor json file are ignored for the moment.
39 // This function will not be called.
41  const std::string& filename) {
42  boost::property_tree::ptree pt;
43  read_json(filename, pt);
44 
45  EorEntry eor;
46  eor.filename = filename;
47 
48  // We rely on n_events to be the first item on the array...
49  eor.n_events = std::next(pt.get_child("data").begin(), 1)
50  ->second.get_value<std::size_t>();
51  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)
52  ->second.get_value<std::size_t>();
53 
54  eor.loaded = true;
55  return eor;
56 }
57 
59  : state_(EOR) {
  // Constructor body: copies the untracked configuration parameters
  // (declared in fillDescription()) into members, then calls reset() to
  // initialise the bookkeeping and perform the first directory scan.
60 
61  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
62  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
63  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
64  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
65  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
  // NOTE(review): the assignment target of the next read is not visible here;
  // presumably nextLumiTimeoutMillis_ (compared in update_state()) -- confirm.
67  pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
68 
  // NOTE(review): forceFileCheckTimeoutMillis_ (compared in collect()) is not
  // assigned in the visible code. reset() calls update_state(), which calls
  // collect(false), so confirm it is initialised before this point.
70  reset();
71 }
72 
74 
  // Re-initialises the iterator for run runNumber_:
  //  - runPath_ becomes "<runInputDir_>/run<runNumber_ zero-padded to 6>";
  //  - EoR, lumi and file bookkeeping are cleared, state_ is set to OPEN and
  //    the next lumi to deliver is 1;
  //  - collect(true) scans the directory ignoring the refresh timers, then
  //    update_state() recomputes the state;
  //  - if the monitoring service is available, run, next_lumi and fi_state
  //    are published.
76  runPath_ = str(boost::format("%s/run%06d") % runInputDir_ % runNumber_);
77 
78  eor_.loaded = false;
79  state_ = State::OPEN;
80  nextLumiNumber_ = 1;
81  lumiSeen_.clear();
82  filesSeen_.clear();
83 
  // NOTE(review): lastLumiLoad_ (the reference time for the missing-lumi
  // timeout in update_state()) is not reset in the visible code -- confirm.
85 
86  collect(true);
87  update_state();
88 
89  if (mon_.isAvailable()) {
90  ptree doc;
91  doc.put("run", runNumber_);
92  doc.put("next_lumi", nextLumiNumber_);
93  doc.put("fi_state", std::to_string(state_));
94  mon_->outputUpdate(doc);
95  }
96 
97 }
98 
100 
  // Moves the iterator past the current lumi (recording the state
  // "open: file iterator" via advanceToLumi()) and returns `lumi` by value.
  // NOTE(review): the declaration of `lumi` is not visible here; presumably
  // it is the lumiSeen_ entry for nextLumiNumber_ taken before advancing --
  // confirm.
103  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
104  return lumi;
105 }
106 
  // Returns true when an entry for nextLumiNumber_ is already present in
  // lumiSeen_ (entries are added by collect()), false otherwise.
108  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
109  return true;
110  }
111 
112  return false;
113 }
114 
115 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
116 
  // Returns the highest lumi number recorded in lumiSeen_. lumiSeen_ is a
  // std::map, so rbegin() holds the largest key. Returns 1 when no lumi has
  // been found yet.
118  if (!lumiSeen_.empty()) {
119  return lumiSeen_.rbegin()->first;
120  }
121 
122  return 1;
123 }
124 
// Moves the iterator so that `lumi` becomes the next lumisection to deliver.
// Every already-seen lumi in [old nextLumiNumber_, new nextLumiNumber_) gets
// its state set to `reason` and is re-published via monUpdateLumi(). The new
// next_lumi is then reported to the monitoring service if it is available.
//
// @param lumi   the lumisection number to advance to
// @param reason state string recorded for every lumi that is passed over
125 void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
126  using boost::property_tree::ptree;
  // NOTE(review): `str` is not used in the visible body of this function.
127  using boost::str;
128 
129  unsigned int currentLumi = nextLumiNumber_;
130 
  // NOTE(review): as shown, nothing between here and the loop assigns
  // nextLumiNumber_ = lumi. If that were true, the loop below could never
  // run, because iter->first >= currentLumi == nextLumiNumber_. Presumably
  // the update (and a lastLumiLoad_ refresh) happens here -- confirm.
133 
134  auto iter = lumiSeen_.lower_bound(currentLumi);
135 
136  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
137  iter->second.state = reason;
138  monUpdateLumi(iter->second);
139 
140  ++iter;
141  }
142 
143  if (mon_.isAvailable()) {
144  // report the successful lumi file open
145  ptree doc;
146  doc.put("next_lumi", nextLumiNumber_);
147  mon_->outputUpdate(doc);
148  }
149 }
150 
  // Publishes one lumi's filename, file_ls and state to the monitoring
  // service. The entry goes under the key "extra.lumi_seen.lumiNNNNNN", where
  // file_ls is zero-padded to 6 digits; ptree's default '.' separator makes
  // these nested nodes. Does nothing if the service is not available.
152  if (! mon_.isAvailable())
153  return;
154 
155  ptree lumi_doc;
156  lumi_doc.put("filename", lumi.filename);
157  lumi_doc.put("file_ls", lumi.file_ls);
158  lumi_doc.put("state", lumi.state);
159 
160  ptree doc;
161  doc.add_child(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls), lumi_doc);
162  mon_->outputUpdate(doc);
163 }
164 
  // Returns fn unchanged when it is absolute (starts with '/'); otherwise
  // appends fn to the base path `p` and returns the joined path as a string.
  // NOTE(review): the declaration of `p` is not visible here; presumably it is
  // a boost::filesystem::path built from runPath_ -- confirm.
166  if (boost::starts_with(fn, "/")) return fn;
167 
169  p /= fn;
170  return p.string();
171 }
172 
// Scans runPath_ for files named run<run>_ls<lumi>_<label>[_<suffix>].jsn.
//  - Files from other runs or with a label other than streamLabel_ are
//    remembered in filesSeen_ and not examined again.
//  - Matching lumi files are parsed with LumiEntry::load_json(), stored in
//    lumiSeen_ and reported via monUpdateLumi(). A file that fails to load is
//    removed from filesSeen_ so that a later scan retries it.
//  - An ls0000 file labelled "EoR" is handled after the loop. Its contents are
//    not parsed; eor_.n_lumi is set to the highest lumi found so far.
//
// Unless ignoreTimers is true, the scan is skipped when the previous one was
// less than 100 ms ago, or when the directory mtime is unchanged and less
// than forceFileCheckTimeoutMillis_ has elapsed.
//
// NOTE(review): boost::filesystem::last_write_time() and directory_iterator
// are called without a try block here, so a missing runPath_ presumably
// throws out of this function -- confirm that callers expect this.
173 void DQMFileIterator::collect(bool ignoreTimers) {
174  // search filesystem to find available lumi section files
175  // or the end of run files
176 
  // NOTE(review): the argument of duration_cast below is not visible here;
  // presumably the elapsed time since runPathLastCollect_ -- confirm.
178  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
180 
181  // don't refresh if it's too soon
182  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
183  return;
184  }
185 
186  // check if directory changed
187  std::time_t mtime_now = boost::filesystem::last_write_time(runPath_);
188 
189  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
190  //logFileAction("Directory hasn't changed.");
191  return;
192  } else {
193  //logFileAction("Directory changed, updating.");
194  }
195 
196  runPathMTime_ = mtime_now;
198 
199  using boost::filesystem::directory_iterator;
200  using boost::filesystem::directory_entry;
201 
202  std::string fn_eor;
203 
204  directory_iterator dend;
205  for (directory_iterator di(runPath_); di != dend; ++di) {
  // NOTE(review): this regex does not depend on the loop variable but is
  // compiled again for every directory entry; it could be hoisted out of the
  // loop (e.g. as a static const).
206  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
207 
208  const std::string filename = di->path().filename().string();
209  const std::string fn = di->path().string();
210 
211  if (filesSeen_.find(filename) != filesSeen_.end()) {
212  continue;
213  }
214 
215  boost::smatch result;
216  if (boost::regex_match(filename, result, fn_re)) {
  // NOTE(review): std::stoi throws std::out_of_range for digit runs that do
  // not fit in an int. These calls are outside the try block below, so such
  // a file name would propagate an exception out of collect().
217  unsigned int run = std::stoi(result[1]);
218  unsigned int lumi = std::stoi(result[2]);
219  std::string label = result[3];
220 
  // Remembered before the run/label checks, so files that are skipped below
  // are not re-examined on later scans.
221  filesSeen_.insert(filename);
222 
223  if (run != runNumber_) continue;
224 
225  // check if this is EoR
226  // for various reasons we have to load it after all other files
227  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
228  fn_eor = fn;
229  continue;
230  }
231 
232  // check if lumi is loaded
233  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
234  continue; // already loaded
235  }
236 
237  // check if this belongs to us
238  if (label != streamLabel_) {
239  std::string msg("Found and skipped json file (stream label mismatch, ");
240  msg += label + " [files] != " + streamLabel_ + " [config]";
241  msg += "): ";
242  logFileAction(msg, fn);
243  continue;
244  }
245 
246  try {
247  LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, datafnPosition_);
248  lumiSeen_.emplace(lumi, lumi_jsn);
249  logFileAction("Found and loaded json file: ", fn);
250 
251  monUpdateLumi(lumi_jsn);
252  } catch (const std::exception& e) {
  // NOTE(review): runPathMTime_ has already been updated above. The retry
  // therefore happens only once the directory mtime changes again or
  // forceFileCheckTimeoutMillis_ elapses.
253  // don't reset the mtime, keep it waiting
254  filesSeen_.erase(filename);
255 
256  std::string msg("Found, tried to load the json, but failed (");
257  msg += e.what();
258  msg += "): ";
259  logFileAction(msg, fn);
260  }
261  }
262  }
263 
264  if (!fn_eor.empty()) {
265  logFileAction("EoR file found: ", fn_eor);
266 
267  // @TODO load EoR files correctly
268  // eor_ = EorEntry::load_json(fn_eor);
269  // logFileAction("Loaded eor file: ", fn_eor);
270 
271  // for now , set n_lumi to the highest _found_ lumi
272  eor_.loaded = true;
273 
274  if (lumiSeen_.empty()) {
275  eor_.n_lumi = 0;
276  } else {
277  eor_.n_lumi = lumiSeen_.rbegin()->first;
278  }
279  }
280 }
281 
  // Refreshes the file list with collect(false), which respects the refresh
  // timers, and then advances the state machine:
  //  - OPEN -> EOR_CLOSING once the EoR file has been seen (eor_.loaded);
  //  - when nextLumiTimeoutMillis_ >= 0 and the state is not EOR: if
  //    nextLumiNumber_ is missing but a later lumi exists, and at least
  //    nextLumiTimeoutMillis_ ms have passed since lastLumiLoad_, skip ahead to
  //    that later lumi;
  //  - EOR_CLOSING -> EOR once nextLumiNumber_ > eor_.n_lumi.
  // Any state change is logged and published as "fi_state".
283  using std::chrono::high_resolution_clock;
284  using std::chrono::duration_cast;
285  using std::chrono::milliseconds;
286 
287  collect(false);
288 
289  // now update the state
290  State old_state = state_;
291 
292  if ((state_ == State::OPEN) && (eor_.loaded)) {
293  state_ = State::EOR_CLOSING;
294  }
295 
296  // special case for missing lumi files
297  // skip to the next available, but after the timeout
298  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
  // First seen lumi >= nextLumiNumber_; if it is not nextLumiNumber_ itself,
  // the lumis in between are missing.
299  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
300  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
301 
302  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
303  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
304 
305  if (elapsed_ms >= nextLumiTimeoutMillis_) {
306  std::string msg("Timeout reached, skipping lumisection(s) ");
307  msg += std::to_string(nextLumiNumber_) + " .. " +
308  std::to_string(iter->first - 1);
309  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
310  logFileAction(msg);
311 
312  advanceToLumi(iter->first, "skipped: timeout");
313  }
314  }
315  }
316 
317  if (state_ == State::EOR_CLOSING) {
318  // check if we parsed all lumis
319  // n_lumi is both last lumi and the number of lumi
320  // since lumis are indexed from 1
321 
322  // after all lumi have been pop()'ed
323  // current lumi will become larger than the last lumi
324  if (nextLumiNumber_ > eor_.n_lumi) {
325  state_ = State::EOR;
326  }
327  }
328 
329  if (state_ != old_state) {
330  logFileAction("Streamer state changed: ",
331  std::to_string(old_state) + "->" + std::to_string(state_));
332 
  // NOTE(review): the rest of this file tests mon_.isAvailable(); this
  // boolean test is presumably equivalent -- confirm for edm::Service.
333  if (mon_) {
334  ptree doc;
335  doc.put("fi_state", std::to_string(state_));
336  mon_->outputUpdate(doc);
337  }
338  }
339 }
340 
342  const std::string& fileName) const {
  // Writes "<time of day> <msg><fileName>" to the "fileAction" category via
  // edm::LogAbsolute. fileName defaults to "" in the declaration, so msg can
  // be logged on its own.
  // NOTE(review): std::setprecision(0) presumably controls how TimeOfDay is
  // printed (e.g. no fractional seconds) -- confirm.
343  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay()
344  << " " << msg << fileName;
346 }
347 
  // Records msg as the state of the lumi identified by lumi.file_ls, provided
  // that lumi is present in lumiSeen_; otherwise logs an internal error.
  // NOTE(review): find() followed by operator[] looks the key up twice;
  // reusing the iterator from find() would avoid that. The error text below
  // is missing "in" ("is not in the map"), but it is a runtime string and is
  // left unchanged here.
349  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
350  lumiSeen_[lumi.file_ls].state = msg;
351 
353  } else {
354  logFileAction("Internal error: referenced lumi is not the map.");
355  }
356 }
357 
  // Sends a keep-alive to the monitoring service (if available), then sleeps
  // for delayMillis_ milliseconds.
  // NOTE(review): delayMillis_ is read as uint32_t in the constructor. If the
  // member is also 32-bit, delayMillis_ * 1000 wraps above ~4.29e6 ms. Older
  // POSIX also lets usleep() reject values of 1,000,000 us or more (EINVAL);
  // std::this_thread::sleep_for would avoid both issues.
359  if (mon_.isAvailable())
360  mon_->keepAlive();
361 
362  usleep(delayMillis_ * 1000);
363 }
364 
366 
  // Declares the untracked parameters read by the constructor. Only
  // datafnPosition (default 3) and nextLumiTimeoutMillis (default -1, which
  // disables lumi skipping) are given defaults here; runNumber, streamLabel,
  // delayMillis and runInputDir have none.
367  desc.addUntracked<unsigned int>("runNumber")
368  ->setComment("Run number passed via configuration file.");
369 
370  desc.addUntracked<unsigned int>("datafnPosition", 3)
371  ->setComment("Data filename position in the positional arguments array 'data' in json file.");
372 
373  desc.addUntracked<std::string>("streamLabel")
374  ->setComment("Stream label used in json discovery.");
375 
376  desc.addUntracked<uint32_t>("delayMillis")
377  ->setComment("Number of milliseconds to wait between file checks.");
378 
379  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
380  "Number of milliseconds to wait before switching to the next lumi "
381  "section if the current is missing, -1 to disable.");
382 
383  desc.addUntracked<std::string>("runInputDir")
384  ->setComment("Directory where the DQM files will appear.");
385 }
386 
387 } /* end of namespace */
T getUntrackedParameter(std::string const &, T const &) const
std::string make_path(const std::string &fn)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, LumiEntry > lumiSeen_
string format
Some error handling for the usage.
U second(std::pair< T, U > const &p)
std::string to_string(const T &t)
Definition: Logger.cc:26
DQMFileIterator(edm::ParameterSet const &pset)
tuple path
else: Piece not in the list, fine.
tuple result
Definition: query.py:137
void logFileAction(const std::string &msg, const std::string &fileName="") const
tuple doc
Definition: asciidump.py:381
static EorEntry load_json(const std::string &filename)
std::chrono::high_resolution_clock::time_point lastLumiLoad_
static LumiEntry load_json(const std::string &filename, int lumiNumber, unsigned int datafn_position)
tuple filename
Definition: lut2db_cfg.py:20
void monUpdateLumi(const LumiEntry &lumi)
std::chrono::high_resolution_clock::time_point runPathLastCollect_
std::unordered_set< std::string > filesSeen_
void advanceToLumi(unsigned int lumi, std::string reason)
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)