CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 
17 namespace dqmservices {
18 
20  const std::string& filename, int lumiNumber, unsigned int datafn_position) {
21  boost::property_tree::ptree pt;
22  read_json(filename, pt);
23 
25  lumi.filename = filename;
26 
27  // We rely on n_events to be the first item on the array...
28  lumi.n_events = std::next(pt.get_child("data").begin(), 1)
29  ->second.get_value<std::size_t>();
30 
31  lumi.ls = lumiNumber;
32  lumi.datafilename = std::next(pt.get_child("data").begin(), datafn_position)
33  ->second.get_value<std::string>();
34 
35  lumi.loaded = true;
36  return lumi;
37 }
38 
39 // Contents of the EoR json file are ignored for the moment.
40 // This function is currently not called.
42  const std::string& filename) {
43  boost::property_tree::ptree pt;
44  read_json(filename, pt);
45 
46  EorEntry eor;
47  eor.filename = filename;
48 
49  // We rely on n_events to be the first item on the array...
50  eor.n_events = std::next(pt.get_child("data").begin(), 1)
51  ->second.get_value<std::size_t>();
52  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)
53  ->second.get_value<std::size_t>();
54  eor.datafilename = std::next(pt.get_child("data").begin(), 2)
55  ->second.get_value<std::string>();
56 
57  eor.loaded = true;
58  return eor;
59 }
60 
62  : state_(EOR) {
63 
64  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
65  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
66  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
67  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
68  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
70  pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
71 
72  reset();
73 }
74 
76 
78  runPath_ = str(boost::format("%s/run%06d") % runInputDir_ % runNumber_);
79 
80  eor_.loaded = false;
81  state_ = State::OPEN;
82  currentLumi_ = 1;
83  lumiSeen_.clear();
84 
86 
87  collect(true);
88  update_state();
89 }
90 
92 
94  return lumiSeen_[currentLumi_];
95 }
96 
99 }
100 
102  if (lumiSeen_.find(currentLumi_) != lumiSeen_.end()) {
103  return true;
104  }
105 
106  return false;
107 }
108 
109 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
110 
112  if (!lumiSeen_.empty()) {
113  return lumiSeen_.rbegin()->first;
114  }
115 
116  return 1;
117 }
118 
120  using boost::property_tree::ptree;
121  using boost::str;
122 
123  unsigned int prev_lumi = currentLumi_;
124 
125  currentLumi_ = lumi;
127 
128  // report the successful lumi file open
129  if (mon_.isAvailable()) {
130  ptree children;
131 
132  auto iter = lumiSeen_.begin();
133  for (; iter != lumiSeen_.end(); ++iter) {
134  children.put(std::to_string(iter->first), iter->second.filename);
135  }
136 
137  mon_->registerExtra("lumiSeen", children);
138  mon_->reportLumiSection(runNumber_, prev_lumi);
139  }
140 }
141 
143  if (boost::starts_with(lumi.datafilename, "/")) return lumi.datafilename;
144 
146  p /= lumi.datafilename;
147  return p.string();
148 }
149 
150 void DQMFileIterator::collect(bool ignoreTimers) {
151  // search filesystem to find available lumi section files
152  // or the end of run files
153 
155  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
157 
158  // don't refresh if it's too soon
159  if ((!ignoreTimers) && (last_ms < 100)) {
160  return;
161  } else {
163  }
164 
165  // check if directory changed
166  std::time_t t = boost::filesystem::last_write_time(runPath_);
167 
168  if ((!ignoreTimers) && (t <= runPathMTime_)) {
169  //logFileAction("Directory hasn't changed.");
170  return;
171  } else {
172  //logFileAction("Directory changed, updating.");
173  runPathMTime_ = t;
174  }
175 
176  using boost::filesystem::directory_iterator;
177  using boost::filesystem::directory_entry;
178 
179  std::string fn_eor;
180 
181  directory_iterator dend;
182  for (directory_iterator di(runPath_); di != dend; ++di) {
183  const boost::regex fn_re("run(\\d+)_ls(\\d+)(_.*).jsn");
184 
185  const std::string filename = di->path().filename().string();
186  const std::string fn = di->path().string();
187 
188  boost::smatch result;
189  if (boost::regex_match(filename, result, fn_re)) {
190  unsigned int run = std::stoi(result[1]);
191  unsigned int lumi = std::stoi(result[2]);
192  std::string label = result[3];
193 
194  if (run != runNumber_) continue;
195 
196  // check if this is EoR
197  // for various reasons we have to load it after all other files
198  if ((lumi == 0) && (label == "_EoR") && (!eor_.loaded)) {
199  fn_eor = fn;
200  continue;
201  }
202 
203  // check if lumi is loaded
204  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
205  continue; // already loaded
206  }
207 
208  // check if this belongs to us
209  if (label != streamLabel_) {
210  logFileAction("Found and skipped json file (stream label mismatch): ",
211  fn);
212  continue;
213  }
214 
215  LumiEntry lumi_jsn = LumiEntry::load_json(fn, lumi, datafnPosition_);
216  lumiSeen_.emplace(lumi, lumi_jsn);
217  logFileAction("Found and loaded json file: ", fn);
218  }
219  }
220 
221  if (!fn_eor.empty()) {
222  logFileAction("EoR file found: ", fn_eor);
223 
224  // @TODO load EoR files correctly
225  // eor_ = EorEntry::load_json(fn_eor);
226  // logFileAction("Loaded eor file: ", fn_eor);
227 
228  // for now , set n_lumi to the highest _found_ lumi
229  eor_.loaded = true;
230 
231  if (lumiSeen_.empty()) {
232  eor_.n_lumi = 0;
233  } else {
234  eor_.n_lumi = lumiSeen_.rbegin()->first;
235  }
236  }
237 }
238 
240  using std::chrono::high_resolution_clock;
241  using std::chrono::duration_cast;
242  using std::chrono::milliseconds;
243 
244  collect(false);
245 
246  // now update the state
247  State old_state = state_;
248 
249  if ((state_ == State::OPEN) && (eor_.loaded)) {
250  state_ = State::EOR_CLOSING;
251  }
252 
253  // special case for missing lumi files
254  // skip to the next available, but after the timeout
255  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
256  auto iter = lumiSeen_.lower_bound(currentLumi_);
257  if ((iter != lumiSeen_.end()) && iter->first != currentLumi_) {
258 
259  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
260  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
261 
262  if (elapsed_ms >= nextLumiTimeoutMillis_) {
263  std::string msg("Timeout reached, skipping lumisection(s) ");
264  msg += std::to_string(currentLumi_) + " .. " +
265  std::to_string(iter->first - 1);
266  msg += ", currentLumi_ is now " + std::to_string(iter->first);
267 
268  logFileAction(msg);
269 
270  currentLumi_ = iter->first;
271  }
272  }
273  }
274 
275  if (state_ == State::EOR_CLOSING) {
276  // check if we parsed all lumis
277  // n_lumi is both last lumi and the number of lumi
278  // since lumis are indexed from 1
279 
280  // after all lumi have been pop()'ed
281  // current lumi will become larger than the last lumi
282  if (currentLumi_ > eor_.n_lumi) {
283  state_ = State::EOR;
284  }
285  }
286 
287  if (state_ != old_state) {
288  logFileAction("Streamer state changed: ",
289  std::to_string(old_state) + "->" + std::to_string(state_));
290  }
291 }
292 
294  const std::string& fileName) const {
295  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay()
296  << " " << msg << fileName;
298 }
299 
301  const char* x = getenv("WATCHDOG_FD");
302  if (x) {
303  int fd = atoi(x);
304  write(fd, ".\n", 2);
305  }
306 }
307 
309  //logFileAction("Streamer waiting for the next LS.");
310 
311  updateWatchdog();
312  usleep(delayMillis_ * 1000);
313  updateWatchdog();
314 }
315 
317 
318  desc.addUntracked<unsigned int>("runNumber")
319  ->setComment("Run number passed via configuration file.");
320 
321  desc.addUntracked<unsigned int>("datafnPosition", 3)
322  ->setComment("Data filename position in the positional arguments array 'data' in json file.");
323 
324  desc.addUntracked<std::string>("streamLabel")
325  ->setComment("Stream label used in json discovery.");
326 
327  desc.addUntracked<uint32_t>("delayMillis")
328  ->setComment("Number of milliseconds to wait between file checks.");
329 
330  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)->setComment(
331  "Number of milliseconds to wait before switching to the next lumi "
332  "section if the current is missing, -1 to disable.");
333 
334  desc.addUntracked<std::string>("runInputDir")
335  ->setComment("Directory where the DQM files will appear.");
336 }
337 
338 } /* end of namespace */
T getUntrackedParameter(std::string const &, T const &) const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void FlushMessageLog()
edm::Service< DQMMonitoringService > mon_
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, LumiEntry > lumiSeen_
string format
Some error handling for the usage.
void advanceToLumi(unsigned int lumi)
std::string make_path_data(const LumiEntry &lumi)
U second(std::pair< T, U > const &p)
std::string to_string(const T &t)
Definition: Logger.cc:26
DQMFileIterator(edm::ParameterSet const &pset)
tuple path
else: Piece not in the list, fine.
tuple result
Definition: query.py:137
void logFileAction(const std::string &msg, const std::string &fileName="") const
static EorEntry load_json(const std::string &filename)
std::chrono::high_resolution_clock::time_point lastLumiLoad_
static LumiEntry load_json(const std::string &filename, int lumiNumber, unsigned int datafn_position)
tuple filename
Definition: lut2db_cfg.py:20
std::chrono::high_resolution_clock::time_point runPathLastCollect_
Definition: DDAxes.h:10
static void fillDescription(edm::ParameterSetDescription &d)
void collect(bool ignoreTimers)