CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 
17 namespace dqmservices {
18 
20  const std::string& filename, int lumiNumber, unsigned int datafn_position) {
21  boost::property_tree::ptree pt;
22  read_json(filename, pt);
23 
25  lumi.filename = filename;
26 
27  // We rely on n_events being the item at index 1 (the second entry) of the "data" array...
28  lumi.n_events = std::next(pt.get_child("data").begin(), 1)
29  ->second.get_value<std::size_t>();
30 
31  lumi.file_ls = lumiNumber;
32  lumi.datafilename = std::next(pt.get_child("data").begin(), datafn_position)
33  ->second.get_value<std::string>();
34 
35  return lumi;
36 }
37 
38 // Contents of the EoR json file are ignored for the moment.
39 // This function is currently not called.
41  const std::string& filename) {
42  boost::property_tree::ptree pt;
43  read_json(filename, pt);
44 
45  EorEntry eor;
46  eor.filename = filename;
47 
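48  // We rely on n_events being the item at index 1 (the second entry) of the "data" array...
 // NOTE(review): n_lumi and datafilename below are both read from index 2 - confirm the intended positions.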
49  eor.n_events = std::next(pt.get_child("data").begin(), 1)
50  ->second.get_value<std::size_t>();
51  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)
52  ->second.get_value<std::size_t>();
53  eor.datafilename = std::next(pt.get_child("data").begin(), 2)
54  ->second.get_value<std::string>();
55 
56  eor.loaded = true;
57  return eor;
58 }
59 
61  : state_(EOR) {
62 
63  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
64  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
65  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
66  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
67  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
69  pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
70 
72  reset();
73 
74  if (mon_.isAvailable()) {
75  ptree doc;
76  doc.put("run", runNumber_);
77  mon_->outputUpdate(doc);
79  }
80 }
81 
83 
85  runPath_ = str(boost::format("%s/run%06d") % runInputDir_ % runNumber_);
86 
87  eor_.loaded = false;
88  state_ = State::OPEN;
89  nextLumiNumber_ = 1;
90  lumiSeen_.clear();
91 
93 
94  collect(true);
95  update_state();
96 }
97 
99 
103 
104  lumi.state = "open: file iterator";
105  return lumi;
106 }
107 
109  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
110  return true;
111  }
112 
113  return false;
114 }
115 
116 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
117 
119  if (!lumiSeen_.empty()) {
120  return lumiSeen_.rbegin()->first;
121  }
122 
123  return 1;
124 }
125 
127  using boost::property_tree::ptree;
128  using boost::str;
129 
130  unsigned int currentLumi = nextLumiNumber_;
131 
134 
135  if (mon_.isAvailable()) {
136  // report the successful lumi file open
137  ptree doc;
138  doc.put("lumi", currentLumi);
139  mon_->outputUpdate(doc);
141  }
142 }
143 
145  if (! mon_.isAvailable())
146  return;
147 
148  ptree children;
149  auto iter = lumiSeen_.begin();
150  for (; iter != lumiSeen_.end(); ++iter) {
151  ptree lumi;
152  lumi.put("filename", iter->second.filename);
153  lumi.put("file_ls", iter->second.file_ls);
154  lumi.put("state", iter->second.state);
155 
156  children.add_child(std::to_string(iter->first), lumi);
157  }
158 
159  ptree doc;
160  doc.add_child("lumiSeen", children);
161  mon_->outputUpdate(doc);
162 }
163 
165  if (boost::starts_with(lumi.datafilename, "/")) return lumi.datafilename;
166 
168  p /= lumi.datafilename;
169  return p.string();
170 }
171 
/// Scans the run directory (runPath_) for json files and records what it finds.
///
/// File names are matched against "run<digits>_ls<digits>_<label>[_...].jsn".
/// For every match belonging to runNumber_:
///   - lumi 0 with label "EoR" (while eor_ is not yet loaded) is remembered as
///     the end-of-run marker and handled after the directory walk;
///   - lumis already present in lumiSeen_ are skipped;
///   - files whose label differs from streamLabel_ are logged and skipped;
///   - everything else is parsed with LumiEntry::load_json and stored in
///     lumiSeen_ under its lumi number.
///
/// @param ignoreTimers when true, neither of the two early-return throttling
///        checks below is applied and the directory is always walked.
172 void DQMFileIterator::collect(bool ignoreTimers) {
173  // search filesystem to find available lumi section files
174  // or the end of run files
175 
 // NOTE(review): this listing omits source lines 176 and 178 (and 196 further
 // down), so the expression that produces last_ms is truncated here.
 // Presumably it is the time in milliseconds since the previous collect -
 // confirm against the real source file.
177  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
179 
180  // don't refresh if it's too soon
 // (i.e. last_ms is in the range [0, 100) and timers are not being ignored)
181  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
182  return;
183  }
184 
185  // check if directory changed
186  std::time_t mtime_now = boost::filesystem::last_write_time(runPath_);
187 
 // Skip the walk when the directory's modification time is unchanged, unless
 // timers are ignored or last_ms has reached forceFileCheckTimeoutMillis_.
188  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
189  //logFileAction("Directory hasn't changed.");
190  return;
191  } else {
192  //logFileAction("Directory changed, updating.");
193  }
194 
 // Remember the modification time that this walk corresponds to.
195  runPathMTime_ = mtime_now;
197 
198  using boost::filesystem::directory_iterator;
199  using boost::filesystem::directory_entry;
200 
 // Path of the end-of-run json file, if one is seen during this walk.
201  std::string fn_eor;
202 
203  directory_iterator dend;
204  for (directory_iterator di(runPath_); di != dend; ++di) {
 // Capture groups: 1 = run number, 2 = lumi number, 3 = stream label.
 // NOTE(review): the regex object is constructed anew for every directory
 // entry; it does not depend on the loop variable.
205  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
206 
 // filename: last path component, used for matching.
 // fn: the full path, used for loading and logging.
207  const std::string filename = di->path().filename().string();
208  const std::string fn = di->path().string();
209 
210  boost::smatch result;
211  if (boost::regex_match(filename, result, fn_re)) {
 // NOTE(review): std::stoi yields a signed int that is then converted to
 // unsigned int.
212  unsigned int run = std::stoi(result[1]);
213  unsigned int lumi = std::stoi(result[2]);
214  std::string label = result[3];
215 
 // Ignore files that belong to a different run.
216  if (run != runNumber_) continue;
217 
218  // check if this is EoR
219  // for various reasons we have to load it after all other files
220  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
221  fn_eor = fn;
222  continue;
223  }
224 
225  // check if lumi is loaded
226  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
227  continue; // already loaded
228  }
229 
230  // check if this belongs to us
231  if (label != streamLabel_) {
232  std::string msg("Found and skipped json file (stream label mismatch, ");
233  msg += label + " [files] != " + streamLabel_ + " [config]";
234  msg += "): ";
235  logFileAction(msg, fn);
236  continue;
237  }
238 
 // Parse the json and register the lumi; a parse failure is logged and the
 // lumi stays absent from lumiSeen_, so a later walk will try it again.
239  try {
240  LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, datafnPosition_);
241  lumiSeen_.emplace(lumi, lumi_jsn);
242  logFileAction("Found and loaded json file: ", fn);
243  } catch (const std::exception& e) {
244  // don't reset the mtime, keep it waiting
 // NOTE(review): runPathMTime_ was already set to mtime_now before this
 // loop and is not restored here - confirm this comment still matches the
 // intended retry behaviour.
245  std::string msg("Found, tried to load the json, but failed (");
246  msg += e.what();
247  msg += "): ";
248  logFileAction(msg, fn);
249  }
250  }
251  }
252 
 // The EoR marker is processed only after every other file in the directory
 // has been considered.
253  if (!fn_eor.empty()) {
254  logFileAction("EoR file found: ", fn_eor);
255 
256  // @TODO load EoR files correctly
257  // eor_ = EorEntry::load_json(fn_eor);
258  // logFileAction("Loaded eor file: ", fn_eor);
259 
260  // for now , set n_lumi to the highest _found_ lumi
261  eor_.loaded = true;
262 
 // lumiSeen_ is keyed by lumi number, so rbegin() refers to the largest
 // lumi recorded so far; with nothing recorded n_lumi becomes 0.
263  if (lumiSeen_.empty()) {
264  eor_.n_lumi = 0;
265  } else {
266  eor_.n_lumi = lumiSeen_.rbegin()->first;
267  }
268  }
269 }
270 
272  using std::chrono::high_resolution_clock;
273  using std::chrono::duration_cast;
274  using std::chrono::milliseconds;
275 
276  collect(false);
277 
278  // now update the state
279  State old_state = state_;
280 
281  if ((state_ == State::OPEN) && (eor_.loaded)) {
282  state_ = State::EOR_CLOSING;
283  }
284 
285  // special case for missing lumi files
286  // skip to the next available, but after the timeout
287  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
288  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
289  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
290 
291  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
292  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
293 
294  if (elapsed_ms >= nextLumiTimeoutMillis_) {
295  std::string msg("Timeout reached, skipping lumisection(s) ");
296  msg += std::to_string(nextLumiNumber_) + " .. " +
297  std::to_string(iter->first - 1);
298  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
299 
300  logFileAction(msg);
301 
302  nextLumiNumber_ = iter->first;
303  }
304  }
305  }
306 
307  if (state_ == State::EOR_CLOSING) {
308  // check if we parsed all lumis
309  // n_lumi is both last lumi and the number of lumi
310  // since lumis are indexed from 1
311 
312  // after all lumi have been pop()'ed
313  // current lumi will become larger than the last lumi
314  if (nextLumiNumber_ > eor_.n_lumi) {
315  state_ = State::EOR;
316  }
317  }
318 
319  if (state_ != old_state) {
320  logFileAction("Streamer state changed: ",
321  std::to_string(old_state) + "->" + std::to_string(state_));
322 
324  }
325 }
326 
328  const std::string& fileName) const {
329  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay()
330  << " " << msg << fileName;
332 }
333 
335  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
336  lumiSeen_[lumi.file_ls].state = msg;
337  } else {
338  logFileAction("Internal error: referenced lumi is not the map.");
339  }
340 }
341 
343  //logFileAction("Streamer waiting for the next LS.");
344 
345  if (mon_.isAvailable())
346  mon_->keepAlive();
347 
348  usleep(delayMillis_ * 1000);
349 
350  if (mon_.isAvailable())
351  mon_->keepAlive();
352 }
353 
355 
356  desc.addUntracked<unsigned int>("runNumber")
357  ->setComment("Run number passed via configuration file.");
358 
359  desc.addUntracked<unsigned int>("datafnPosition", 3)
360  ->setComment("Data filename position in the positional arguments array 'data' in json file.");
361 
362  desc.addUntracked<std::string>("streamLabel")
363  ->setComment("Stream label used in json discovery.");
364 
365  desc.addUntracked<uint32_t>("delayMillis")
366  ->setComment("Number of milliseconds to wait between file checks.");
367 
368  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
369  "Number of milliseconds to wait before switching to the next lumi "
370  "section if the current is missing, -1 to disable.");
371 
372  desc.addUntracked<std::string>("runInputDir")
373  ->setComment("Directory where the DQM files will appear.");
374 }
375 
376 } /* end of namespace */
T getUntrackedParameter(std::string const &, T const &) const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
void logLumiState(const LumiEntry &lumi, const std::string &msg)
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, LumiEntry > lumiSeen_
string format
Some error handling for the usage.
void advanceToLumi(unsigned int lumi)
std::string make_path_data(const LumiEntry &lumi)
U second(std::pair< T, U > const &p)
std::string to_string(const T &t)
Definition: Logger.cc:26
DQMFileIterator(edm::ParameterSet const &pset)
tuple path
else: Piece not in the list, fine.
tuple result
Definition: query.py:137
void logFileAction(const std::string &msg, const std::string &fileName="") const
tuple doc
Definition: asciidump.py:381
static EorEntry load_json(const std::string &filename)
std::chrono::high_resolution_clock::time_point lastLumiLoad_
static LumiEntry load_json(const std::string &filename, int lumiNumber, unsigned int datafn_position)
tuple filename
Definition: lut2db_cfg.py:20
std::chrono::high_resolution_clock::time_point runPathLastCollect_
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)