CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include <filesystem>
2 #include <iterator>
3 #include <memory>
4 #include <string>
5 
6 #include <boost/algorithm/string.hpp>
7 #include <boost/algorithm/string/predicate.hpp>
8 #include <boost/property_tree/json_parser.hpp>
9 #include <boost/property_tree/ptree.hpp>
10 #include <boost/range.hpp>
11 #include <boost/regex.hpp>
12 #include <fmt/printf.h>
13 
16 #include "DQMFileIterator.h"
17 
18 namespace dqmservices {
19 
21  const std::string& filename,
22  int lumiNumber,
23  int datafn_position) {
24  boost::property_tree::ptree pt;
25  read_json(filename, pt);
26 
28  lumi.filename = filename;
29  lumi.run_path = run_path;
30 
31  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
32 
33  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
34 
35  lumi.file_ls = lumiNumber;
36 
37  if (datafn_position >= 0) {
38  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
39  }
40 
41  return lumi;
42  }
43 
45  if (boost::starts_with(datafn, "/"))
46  return datafn;
47 
48  std::filesystem::path p(run_path);
49  p /= datafn;
50  return p.string();
51  }
52 
54 
55  // Contents of Eor json file are ignored for the moment.
56  // This function will not be called.
58  const std::string& filename) {
59  boost::property_tree::ptree pt;
60  read_json(filename, pt);
61 
62  EorEntry eor;
63  eor.filename = filename;
64  eor.run_path = run_path;
65 
66 // We rely on n_events and n_lumi being at (zero-based) positions 1 and 2 of the "data" array...
67  eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
68  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
69 
70  eor.loaded = true;
71  return eor;
72  }
73 
75  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
76  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
77  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
78  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
79  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
80  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
81 
82 // scan once mode
83  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
84 
86  reset();
87  }
88 
90 
92  runPath_.clear();
93 
94  std::vector<std::string> tokens;
95  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
96 
97  for (const auto& token : tokens) {
98  runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
99  }
100 
101  eor_.loaded = false;
102  state_ = State::OPEN;
103  nextLumiNumber_ = 1;
104  lumiSeen_.clear();
105  filesSeen_.clear();
106 
108 
109  collect(true);
110  update_state();
111 
112  if (mon_.isAvailable()) {
113  ptree doc;
114  doc.put("run", runNumber_);
115  doc.put("next_lumi", nextLumiNumber_);
116  doc.put("fi_state", std::to_string(state_));
117  mon_->outputUpdate(doc);
118  }
119  }
120 
122 
125  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
126  return lumi;
127  }
128 
130  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
131  return true;
132  }
133 
134  return false;
135  }
136 
137  unsigned int DQMFileIterator::runNumber() { return runNumber_; }
138 
140  if (!lumiSeen_.empty()) {
141  return lumiSeen_.rbegin()->first;
142  }
143 
144  return 1;
145  }
146 
148  using boost::str;
149  using boost::property_tree::ptree;
150 
151  unsigned int currentLumi = nextLumiNumber_;
152 
155 
156  auto iter = lumiSeen_.lower_bound(currentLumi);
157 
158  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
159  iter->second.state = reason;
160  monUpdateLumi(iter->second);
161 
162  ++iter;
163  }
164 
165  if (mon_.isAvailable()) {
166  // report the successful lumi file open
167  ptree doc;
168  doc.put("next_lumi", nextLumiNumber_);
169  mon_->outputUpdate(doc);
170  }
171  }
172 
174  if (!mon_.isAvailable())
175  return;
176 
177  ptree doc;
178  doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
179  mon_->outputUpdate(doc);
180  }
181 
182  unsigned DQMFileIterator::mtimeHash() const {
183  unsigned mtime_now = 0;
184 
185  for (const auto& path : runPath_) {
186  if (!std::filesystem::exists(path))
187  continue;
188 
189  auto write_time = std::filesystem::last_write_time(path);
190  mtime_now =
191  mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
192  }
193 
194  return mtime_now;
195  }
196 
// Scans every directory in runPath_ for json index files named
// "run<digits>_ls<digits>_<label>[_<anything>].jsn" and loads the ones that
// belong to runNumber_ and streamLabel_ into lumiSeen_ (keyed by lumi number).
// A file with lumi 0 and label "EoR" is remembered and handled after the scan.
//
// ignoreTimers: when true, both early returns below (scan requested less than
//   100 ms after the previous one / directories unchanged) are bypassed and
//   the directories are always rescanned.
//
// NOTE(review): `now`, used to compute last_ms, has no declaration in the
// visible lines (listing lines 201 and 220 are absent) - confirm against the
// complete file.
197 void DQMFileIterator::collect(bool ignoreTimers) {
198 // search filesystem to find available lumi section files
199 // or the end of run files
200 
202 auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
203 
204 // don't refresh if it's too soon
205 if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
206 return;
207 }
208 
209 // check if directory changed
210 auto mtime_now = mtimeHash();
211 
// skip the scan only while the forced re-check timeout has not elapsed
// and the modification-time fingerprint is unchanged
212 if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
213 // logFileAction("Directory hasn't changed.");
214 return;
215 } else {
216 // logFileAction("Directory changed, updating.");
217 }
218 
219 runPathMTime_ = mtime_now;
221 
222 using std::filesystem::directory_entry;
223 using std::filesystem::directory_iterator;
224 
// full path of the EoR file, if one is found during this scan
225 std::string fn_eor;
226 
227 for (const auto& runPath : runPath_) {
228 if (!std::filesystem::exists(runPath)) {
229 logFileAction("Directory does not exist: ", runPath);
230 
231 continue;
232 }
233 
234 directory_iterator dend;
235 for (directory_iterator di(runPath); di != dend; ++di) {
// capture groups: 1 = run number, 2 = lumi number, 3 = stream label,
// 4 = optional "_..." suffix (unused below)
236 const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
237 
// filename: bare file name (used for matching and as the filesSeen_ key)
// fn: full path (used for loading and logging)
238 const std::string filename = di->path().filename().string();
239 const std::string fn = di->path().string();
240 
// already examined in an earlier scan; the key is the bare file name, so a
// same-named file in another run directory is skipped as well
241 if (filesSeen_.find(filename) != filesSeen_.end()) {
242 continue;
243 }
244 
245 boost::smatch result;
246 if (boost::regex_match(filename, result, fn_re)) {
247 unsigned int run = std::stoi(result[1]);
248 unsigned int lumi = std::stoi(result[2]);
249 std::string label = result[3];
250 
// every file matching the pattern is marked as seen, even if it is
// rejected by the checks below
251 filesSeen_.insert(filename);
252 
// file belongs to a different run
253 if (run != runNumber_)
254 continue;
255 
256 // check if this is EoR
257 // for various reasons we have to load it after all other files
258 if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
259 fn_eor = fn;
260 continue;
261 }
262 
263 // check if lumi is loaded
264 if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
265 continue; // already loaded
266 }
267 
268 // check if this belongs to us
269 if (label != streamLabel_) {
270 std::string msg("Found and skipped json file (stream label mismatch, ");
271 msg += label + " [files] != " + streamLabel_ + " [config]";
272 msg += "): ";
273 logFileAction(msg, fn);
274 continue;
275 }
276 
277 try {
278 LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
279 lumiSeen_.emplace(lumi, lumi_jsn);
280 logFileAction("Found and loaded json file: ", fn);
281 
282 monUpdateLumi(lumi_jsn);
283 } catch (const std::exception& e) {
284 // don't reset the mtime, keep it waiting
// forget the file so that a later scan tries to load it again
285 filesSeen_.erase(filename);
286 
287 std::string msg("Found, tried to load the json, but failed (");
288 msg += e.what();
289 msg += "): ";
290 logFileAction(msg, fn);
291 }
292 }
293 }
294 }
295 
// in scanOnce mode the end of run is declared even if no EoR file was found
296 if ((!fn_eor.empty()) or flagScanOnce_) {
297 if (!fn_eor.empty()) {
298 logFileAction("EoR file found: ", fn_eor);
299 }
300 
301 // @TODO load EoR files correctly
302 // eor_ = EorEntry::load_json(fn_eor);
303 // logFileAction("Loaded eor file: ", fn_eor);
304 
305 // for now , set n_lumi to the highest _found_ lumi
306 eor_.loaded = true;
307 
308 if (lumiSeen_.empty()) {
309 eor_.n_lumi = 0;
310 } else {
// lumiSeen_ is ordered by lumi number, so rbegin() is the highest one
311 eor_.n_lumi = lumiSeen_.rbegin()->first;
312 }
313 }
314 }
315 
317  using std::chrono::duration_cast;
318  using std::chrono::high_resolution_clock;
319  using std::chrono::milliseconds;
320 
321  State old_state = state_;
322 
323  // in scanOnce mode we don't do repeated scans
324 // whatever was found at reset() will be used
325  if (!flagScanOnce_) {
326  collect(false);
327  }
328 
329  if ((state_ == State::OPEN) && (eor_.loaded)) {
330  state_ = State::EOR_CLOSING;
331  }
332 
333  // special case for missing lumi files
334  // skip to the next available, but after the timeout
335  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
336  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
337  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
338  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
339  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
340 
341  if (elapsed_ms >= nextLumiTimeoutMillis_) {
342  std::string msg("Timeout reached, skipping lumisection(s) ");
343  msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
344  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
346 
347  advanceToLumi(iter->first, "skipped: timeout");
348  }
349  }
350  }
351 
352  if (state_ == State::EOR_CLOSING) {
353  // check if we parsed all lumis
354  // n_lumi is both last lumi and the number of lumi
355  // since lumis are indexed from 1
356 
357  // after all lumi have been pop()'ed
358  // current lumi will become larger than the last lumi
359  if (nextLumiNumber_ > eor_.n_lumi) {
360  state_ = State::EOR;
361  }
362  }
363 
364  if (state_ != old_state) {
365  logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
366 
367  if (mon_) {
368  ptree doc;
369  doc.put("fi_state", std::to_string(state_));
370  mon_->outputUpdate(doc);
371  }
372  }
373  }
374 
376  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
378  }
379 
381  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
382  lumiSeen_[lumi.file_ls].state = msg;
383 
384  monUpdateLumi(lumiSeen_[lumi.file_ls]);
385  } else {
386  logFileAction("Internal error: referenced lumi is not the map.");
387  }
388  }
389 
391  if (mon_.isAvailable())
392  mon_->keepAlive();
393 
394  usleep(delayMillis_ * 1000);
395  }
396 
398  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
399 
400  desc.addUntracked<unsigned int>("datafnPosition", 3)
401  ->setComment(
402  "Data filename position in the positional arguments array 'data' in "
403  "json file.");
404 
405  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
406 
407  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
408 
409  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
410  ->setComment(
411  "Number of milliseconds to wait before switching to the next lumi "
412  "section if the current is missing, -1 to disable.");
413 
414  desc.addUntracked<bool>("scanOnce", false)
415  ->setComment(
416  "Don't repeat file scans: use what was found during the initial scan. "
417  "EOR file is ignored and the state is set to 'past end of run'.");
418 
419  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
420  }
421 
422 } // namespace dqmservices
dqmservices::DQMFileIterator::runNumber_
unsigned int runNumber_
Definition: DQMFileIterator.h:93
dqmservices::DQMFileIterator::EorEntry::n_events
std::size_t n_events
Definition: DQMFileIterator.h:44
dqmservices::DQMFileIterator::EorEntry
Definition: DQMFileIterator.h:39
dqmservices::DQMFileIterator::runNumber
unsigned int runNumber()
Definition: DQMFileIterator.cc:137
dqmservices::DQMFileIterator::state_
State state_
Definition: DQMFileIterator.h:106
common_cff.doc
doc
Definition: common_cff.py:54
dqmservices::DQMFileIterator::EorEntry::load_json
static EorEntry load_json(const std::string &run_path, const std::string &filename)
Definition: DQMFileIterator.cc:57
dqmservices::DQMFileIterator::logFileAction
void logFileAction(const std::string &msg, const std::string &fileName="") const
Definition: DQMFileIterator.cc:375
MessageLogger.h
dqmservices::DQMFileIterator::reset
void reset()
Definition: DQMFileIterator.cc:91
dqmservices
Definition: DQMFileIterator.cc:18
edm::TimeOfDay
Definition: TimeOfDay.h:9
DiDispStaMuonMonitor_cfi.pt
pt
Definition: DiDispStaMuonMonitor_cfi.py:39
dqmservices::DQMFileIterator::lumiSeen_
std::map< unsigned int, LumiEntry > lumiSeen_
Definition: DQMFileIterator.h:109
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
dqmservices::DQMFileIterator::collect
void collect(bool ignoreTimers)
Definition: DQMFileIterator.cc:197
dqmservices::DQMFileIterator::LumiEntry::load_json
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
Definition: DQMFileIterator.cc:20
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
dqmservices::DQMFileIterator::delayMillis_
unsigned long delayMillis_
Definition: DQMFileIterator.h:96
dqmservices::DQMFileIterator::filesSeen_
std::unordered_set< std::string > filesSeen_
Definition: DQMFileIterator.h:110
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
dqmservices::DQMFileIterator::open
LumiEntry open()
Definition: DQMFileIterator.cc:123
dqmservices::DQMFileIterator::flagScanOnce_
bool flagScanOnce_
Definition: DQMFileIterator.h:99
DQMFileIterator.h
dqmservices::DQMFileIterator::EOR
Definition: DQMFileIterator.h:53
mps_check.msg
tuple msg
Definition: mps_check.py:285
dqmservices::DQMFileIterator::streamLabel_
std::string streamLabel_
Definition: DQMFileIterator.h:95
dqmservices::DQMFileIterator::runPathLastCollect_
std::chrono::high_resolution_clock::time_point runPathLastCollect_
Definition: DQMFileIterator.h:115
dqmservices::DQMFileIterator::delay
void delay()
Definition: DQMFileIterator.cc:390
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
dqmservices::DQMFileIterator::datafnPosition_
unsigned int datafnPosition_
Definition: DQMFileIterator.h:102
dqmservices::DQMFileIterator::LumiEntry
Definition: DQMFileIterator.h:20
dqmservices::DQMFileIterator::nextLumiNumber_
unsigned int nextLumiNumber_
Definition: DQMFileIterator.h:108
dqmservices::DQMFileIterator::EorEntry::run_path
std::string run_path
Definition: DQMFileIterator.h:42
dqmservices::DQMFileIterator::fillDescription
static void fillDescription(edm::ParameterSetDescription &d)
Definition: DQMFileIterator.cc:397
dqmservices::DQMFileIterator::LumiEntry::get_json_path
std::string get_json_path() const
Definition: DQMFileIterator.cc:53
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
dqmservices::DQMFileIterator::mon_
edm::Service< DQMMonitoringService > mon_
Definition: DQMFileIterator.h:125
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
str
#define str(s)
Definition: TestProcessor.cc:52
dqmservices::DQMFileIterator::DQMFileIterator
DQMFileIterator(edm::ParameterSet const &pset)
Definition: DQMFileIterator.cc:74
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
dqmservices::DQMFileIterator::mtimeHash
unsigned mtimeHash() const
Definition: DQMFileIterator.cc:182
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
edm::FlushMessageLog
void FlushMessageLog()
Definition: MessageLogger.cc:34
dqmservices::DQMFileIterator::LumiEntry::get_data_path
std::string get_data_path() const
Definition: DQMFileIterator.cc:44
edm::ParameterSet
Definition: ParameterSet.h:47
dqmservices::DQMFileIterator::state
State state()
Definition: DQMFileIterator.cc:121
dqmservices::DQMFileIterator::lumiReady
bool lumiReady()
Definition: DQMFileIterator.cc:129
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
dqmservices::DQMFileIterator::eor_
EorEntry eor_
Definition: DQMFileIterator.h:105
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
dqmservices::DQMFileIterator::advanceToLumi
void advanceToLumi(unsigned int lumi, std::string reason)
Definition: DQMFileIterator.cc:147
dqmservices::DQMFileIterator::runPathMTime_
unsigned runPathMTime_
Definition: DQMFileIterator.h:114
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
writedatasetfile.run
run
Definition: writedatasetfile.py:27
dqmservices::DQMFileIterator::runInputDir_
std::string runInputDir_
Definition: DQMFileIterator.h:94
dqmservices::DQMFileIterator::EorEntry::filename
std::string filename
Definition: DQMFileIterator.h:41
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
personalPlayback.fn
fn
Definition: personalPlayback.py:515
dqmservices::DQMFileIterator::update_state
void update_state()
Definition: DQMFileIterator.cc:316
dqmservices::DQMFileIterator::nextLumiTimeoutMillis_
long nextLumiTimeoutMillis_
Definition: DQMFileIterator.h:97
dqmservices::DQMFileIterator::monUpdateLumi
void monUpdateLumi(const LumiEntry &lumi)
Definition: DQMFileIterator.cc:173
mps_fire.result
result
Definition: mps_fire.py:311
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
TimeOfDay.h
dqmservices::DQMFileIterator::runPath_
std::vector< std::string > runPath_
Definition: DQMFileIterator.h:103
dqmservices::DQMFileIterator::LumiEntry::run_path
std::string run_path
Definition: DQMFileIterator.h:22
dqmservices::DQMFileIterator::EorEntry::loaded
bool loaded
Definition: DQMFileIterator.h:40
dqmservices::DQMFileIterator::forceFileCheckTimeoutMillis_
long forceFileCheckTimeoutMillis_
Definition: DQMFileIterator.h:98
dqmservices::DQMFileIterator::EorEntry::n_lumi
std::size_t n_lumi
Definition: DQMFileIterator.h:45
dqmservices::DQMFileIterator::LumiEntry::filename
std::string filename
Definition: DQMFileIterator.h:21
lumi
Definition: LumiSectionData.h:20
dqmservices::DQMFileIterator::logLumiState
void logLumiState(const LumiEntry &lumi, const std::string &msg)
Definition: DQMFileIterator.cc:380
label
const char * label
Definition: PFTauDecayModeTools.cc:11
dqmservices::DQMFileIterator::lastLumiFound
unsigned int lastLumiFound()
Definition: DQMFileIterator.cc:139
dqmservices::DQMFileIterator::State
State
Definition: DQMFileIterator.h:50
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
dqmservices::DQMFileIterator::~DQMFileIterator
~DQMFileIterator()
Definition: DQMFileIterator.cc:89
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
dqmservices::DQMFileIterator::lastLumiLoad_
std::chrono::high_resolution_clock::time_point lastLumiLoad_
Definition: DQMFileIterator.h:118