CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include "DQMFileIterator.h"
4 
5 #include <boost/regex.hpp>
6 #include <boost/format.hpp>
7 #include <boost/range.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/algorithm/string/predicate.hpp>
10 
11 #include <memory>
12 #include <string>
13 #include <iterator>
14 #include <boost/property_tree/json_parser.hpp>
15 #include <boost/property_tree/ptree.hpp>
16 #include <boost/algorithm/string.hpp>
17 
18 namespace dqmservices {
19 
21  const std::string& filename,
22  int lumiNumber,
23  int datafn_position) {
24  boost::property_tree::ptree pt;
25  read_json(filename, pt);
26 
28  lumi.filename = filename;
29  lumi.run_path = run_path;
30 
31  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
32 
33  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
34 
35  lumi.file_ls = lumiNumber;
36 
37  if (datafn_position >= 0) {
38  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
39  }
40 
41  return lumi;
42  }
43 
45  if (boost::starts_with(datafn, "/"))
46  return datafn;
47 
48  boost::filesystem::path p(run_path);
49  p /= datafn;
50  return p.string();
51  }
52 
54 
55  // Contents of Eor json file are ignored for the moment.
56  // This function will not be called.
58  const std::string& filename) {
59  boost::property_tree::ptree pt;
60  read_json(filename, pt);
61 
62  EorEntry eor;
63  eor.filename = filename;
64  eor.run_path = run_path;
65 
66  // We rely on the positional layout of the 'data' array: n_events is item 1 (the second) and n_lumi is item 2 (the third)...
67  eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
68  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
69 
70  eor.loaded = true;
71  return eor;
72  }
73 
75  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
76  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
77  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
78  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
79  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
80  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
81 
82  // scan one mode
83  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
84 
86  reset();
87  }
88 
90 
92  runPath_.clear();
93 
94  std::vector<std::string> tokens;
95  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
96 
97  for (auto token : tokens) {
98  runPath_.push_back(boost::str(boost::format("%s/run%06d") % token % runNumber_));
99  }
100 
101  eor_.loaded = false;
102  state_ = State::OPEN;
103  nextLumiNumber_ = 1;
104  lumiSeen_.clear();
105  filesSeen_.clear();
106 
108 
109  collect(true);
110  update_state();
111 
112  if (mon_.isAvailable()) {
113  ptree doc;
114  doc.put("run", runNumber_);
115  doc.put("next_lumi", nextLumiNumber_);
116  doc.put("fi_state", std::to_string(state_));
117  mon_->outputUpdate(doc);
118  }
119  }
120 
122 
125  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
126  return lumi;
127  }
128 
130  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
131  return true;
132  }
133 
134  return false;
135  }
136 
137  unsigned int DQMFileIterator::runNumber() { return runNumber_; }
138 
140  if (!lumiSeen_.empty()) {
141  return lumiSeen_.rbegin()->first;
142  }
143 
144  return 1;
145  }
146 
148  using boost::str;
149  using boost::property_tree::ptree;
150 
151  unsigned int currentLumi = nextLumiNumber_;
152 
155 
156  auto iter = lumiSeen_.lower_bound(currentLumi);
157 
158  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
159  iter->second.state = reason;
160  monUpdateLumi(iter->second);
161 
162  ++iter;
163  }
164 
165  if (mon_.isAvailable()) {
166  // report the successful lumi file open
167  ptree doc;
168  doc.put("next_lumi", nextLumiNumber_);
169  mon_->outputUpdate(doc);
170  }
171  }
172 
174  if (!mon_.isAvailable())
175  return;
176 
177  ptree doc;
178  doc.put(str(boost::format("extra.lumi_seen.lumi%06d") % lumi.file_ls), lumi.state);
179  mon_->outputUpdate(doc);
180  }
181 
182  std::time_t DQMFileIterator::mtimeHash() const {
183  std::time_t mtime_now = 0;
184 
185  for (auto path : runPath_) {
186  if (!boost::filesystem::exists(path))
187  continue;
188 
189  mtime_now = mtime_now ^ boost::filesystem::last_write_time(path);
190  }
191 
192  return mtime_now;
193  }
194 
195  void DQMFileIterator::collect(bool ignoreTimers) {
196  // search filesystem to find available lumi section files
197  // or the end of run files
198 
200  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
201 
202  // don't refresh if it's too soon
203  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
204  return;
205  }
206 
207  // check if directory changed
208  std::time_t mtime_now = mtimeHash();
209 
210  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
211  // logFileAction("Directory hasn't changed.");
212  return;
213  } else {
214  // logFileAction("Directory changed, updating.");
215  }
216 
217  runPathMTime_ = mtime_now;
219 
220  using boost::filesystem::directory_entry;
221  using boost::filesystem::directory_iterator;
222 
223  std::string fn_eor;
224 
225  for (auto runPath : runPath_) {
226  if (!boost::filesystem::exists(runPath)) {
227  logFileAction("Directory does not exist: ", runPath);
228 
229  continue;
230  }
231 
232  directory_iterator dend;
233  for (directory_iterator di(runPath); di != dend; ++di) {
234  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
235 
236  const std::string filename = di->path().filename().string();
237  const std::string fn = di->path().string();
238 
239  if (filesSeen_.find(filename) != filesSeen_.end()) {
240  continue;
241  }
242 
243  boost::smatch result;
244  if (boost::regex_match(filename, result, fn_re)) {
245  unsigned int run = std::stoi(result[1]);
246  unsigned int lumi = std::stoi(result[2]);
247  std::string label = result[3];
248 
249  filesSeen_.insert(filename);
250 
251  if (run != runNumber_)
252  continue;
253 
254  // check if this is EoR
255  // for various reasons we have to load it after all other files
256  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
257  fn_eor = fn;
258  continue;
259  }
260 
261  // check if lumi is loaded
262  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
263  continue; // already loaded
264  }
265 
266  // check if this belongs to us
267  if (label != streamLabel_) {
268  std::string msg("Found and skipped json file (stream label mismatch, ");
269  msg += label + " [files] != " + streamLabel_ + " [config]";
270  msg += "): ";
271  logFileAction(msg, fn);
272  continue;
273  }
274 
275  try {
276  LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
277  lumiSeen_.emplace(lumi, lumi_jsn);
278  logFileAction("Found and loaded json file: ", fn);
279 
280  monUpdateLumi(lumi_jsn);
281  } catch (const std::exception& e) {
282  // don't reset the mtime, keep it waiting
283  filesSeen_.erase(filename);
284 
285  std::string msg("Found, tried to load the json, but failed (");
286  msg += e.what();
287  msg += "): ";
288  logFileAction(msg, fn);
289  }
290  }
291  }
292  }
293 
294  if ((!fn_eor.empty()) or flagScanOnce_) {
295  if (!fn_eor.empty()) {
296  logFileAction("EoR file found: ", fn_eor);
297  }
298 
299  // @TODO load EoR files correctly
300  // eor_ = EorEntry::load_json(fn_eor);
301  // logFileAction("Loaded eor file: ", fn_eor);
302 
303  // for now, set n_lumi to the highest _found_ lumi
304  eor_.loaded = true;
305 
306  if (lumiSeen_.empty()) {
307  eor_.n_lumi = 0;
308  } else {
309  eor_.n_lumi = lumiSeen_.rbegin()->first;
310  }
311  }
312  }
313 
315  using std::chrono::duration_cast;
316  using std::chrono::high_resolution_clock;
317  using std::chrono::milliseconds;
318 
319  State old_state = state_;
320 
321  // in scanOnce mode we don't do repeated scans
322  // whatever was found at reset() is used
323  if (!flagScanOnce_) {
324  collect(false);
325  }
326 
327  if ((state_ == State::OPEN) && (eor_.loaded)) {
328  state_ = State::EOR_CLOSING;
329  }
330 
331  // special case for missing lumi files
332  // skip to the next available, but after the timeout
333  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
334  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
335  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
336  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
337  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
338 
339  if (elapsed_ms >= nextLumiTimeoutMillis_) {
340  std::string msg("Timeout reached, skipping lumisection(s) ");
341  msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
342  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
344 
345  advanceToLumi(iter->first, "skipped: timeout");
346  }
347  }
348  }
349 
350  if (state_ == State::EOR_CLOSING) {
351  // check if we parsed all lumis
352  // n_lumi is both last lumi and the number of lumi
353  // since lumis are indexed from 1
354 
355  // after all lumi have been pop()'ed
356  // current lumi will become larger than the last lumi
357  if (nextLumiNumber_ > eor_.n_lumi) {
358  state_ = State::EOR;
359  }
360  }
361 
362  if (state_ != old_state) {
363  logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
364 
365  if (mon_) {
366  ptree doc;
367  doc.put("fi_state", std::to_string(state_));
368  mon_->outputUpdate(doc);
369  }
370  }
371  }
372 
374  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
376  }
377 
379  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
380  lumiSeen_[lumi.file_ls].state = msg;
381 
382  monUpdateLumi(lumiSeen_[lumi.file_ls]);
383  } else {
384  logFileAction("Internal error: referenced lumi is not the map.");
385  }
386  }
387 
389  if (mon_.isAvailable())
390  mon_->keepAlive();
391 
392  usleep(delayMillis_ * 1000);
393  }
394 
396  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
397 
398  desc.addUntracked<unsigned int>("datafnPosition", 3)
399  ->setComment(
400  "Data filename position in the positional arguments array 'data' in "
401  "json file.");
402 
403  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
404 
405  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
406 
407  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
408  ->setComment(
409  "Number of milliseconds to wait before switching to the next lumi "
410  "section if the current is missing, -1 to disable.");
411 
412  desc.addUntracked<bool>("scanOnce", false)
413  ->setComment(
414  "Don't repeat file scans: use what was found during the initial scan. "
415  "EOR file is ignored and the state is set to 'past end of run'.");
416 
417  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
418  }
419 
420 } // namespace dqmservices
dqmservices::DQMFileIterator::runNumber_
unsigned int runNumber_
Definition: DQMFileIterator.h:93
edm::LogAbsolute
Definition: MessageLogger.h:469
dqmservices::DQMFileIterator::EorEntry::n_events
std::size_t n_events
Definition: DQMFileIterator.h:44
dqmservices::DQMFileIterator::EorEntry
Definition: DQMFileIterator.h:39
dqmservices::DQMFileIterator::runNumber
unsigned int runNumber()
Definition: DQMFileIterator.cc:137
dqmservices::DQMFileIterator::state_
State state_
Definition: DQMFileIterator.h:106
common_cff.doc
doc
Definition: common_cff.py:54
dqmservices::DQMFileIterator::EorEntry::load_json
static EorEntry load_json(const std::string &run_path, const std::string &filename)
Definition: DQMFileIterator.cc:57
dqmservices::DQMFileIterator::logFileAction
void logFileAction(const std::string &msg, const std::string &fileName="") const
Definition: DQMFileIterator.cc:373
MessageLogger.h
dqmservices::DQMFileIterator::reset
void reset()
Definition: DQMFileIterator.cc:91
dqmservices
Definition: DQMFileIterator.cc:18
edm::TimeOfDay
Definition: TimeOfDay.h:9
DiDispStaMuonMonitor_cfi.pt
pt
Definition: DiDispStaMuonMonitor_cfi.py:39
dqmservices::DQMFileIterator::lumiSeen_
std::map< unsigned int, LumiEntry > lumiSeen_
Definition: DQMFileIterator.h:109
dqmservices::DQMFileIterator::collect
void collect(bool ignoreTimers)
Definition: DQMFileIterator.cc:195
dqmservices::DQMFileIterator::LumiEntry::load_json
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
Definition: DQMFileIterator.cc:20
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
dqmservices::DQMFileIterator::delayMillis_
unsigned long delayMillis_
Definition: DQMFileIterator.h:96
dqmservices::DQMFileIterator::filesSeen_
std::unordered_set< std::string > filesSeen_
Definition: DQMFileIterator.h:110
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:215
dqmservices::DQMFileIterator::open
LumiEntry open()
Definition: DQMFileIterator.cc:123
dqmservices::DQMFileIterator::flagScanOnce_
bool flagScanOnce_
Definition: DQMFileIterator.h:99
DQMFileIterator.h
dqmservices::DQMFileIterator::EOR
Definition: DQMFileIterator.h:53
mps_check.msg
tuple msg
Definition: mps_check.py:285
dqmservices::DQMFileIterator::streamLabel_
std::string streamLabel_
Definition: DQMFileIterator.h:95
dqmservices::DQMFileIterator::mtimeHash
std::time_t mtimeHash() const
Definition: DQMFileIterator.cc:182
dqmservices::DQMFileIterator::runPathLastCollect_
std::chrono::high_resolution_clock::time_point runPathLastCollect_
Definition: DQMFileIterator.h:115
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
dqmservices::DQMFileIterator::delay
void delay()
Definition: DQMFileIterator.cc:388
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
dqmservices::DQMFileIterator::datafnPosition_
unsigned int datafnPosition_
Definition: DQMFileIterator.h:102
dqmservices::DQMFileIterator::LumiEntry
Definition: DQMFileIterator.h:20
dqmservices::DQMFileIterator::nextLumiNumber_
unsigned int nextLumiNumber_
Definition: DQMFileIterator.h:108
dqmservices::DQMFileIterator::EorEntry::run_path
std::string run_path
Definition: DQMFileIterator.h:42
dqmservices::DQMFileIterator::fillDescription
static void fillDescription(edm::ParameterSetDescription &d)
Definition: DQMFileIterator.cc:395
dqmservices::DQMFileIterator::LumiEntry::get_json_path
std::string get_json_path() const
Definition: DQMFileIterator.cc:53
fileCollector.now
now
Definition: fileCollector.py:207
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
dqmservices::DQMFileIterator::mon_
edm::Service< DQMMonitoringService > mon_
Definition: DQMFileIterator.h:125
dqm-mbProfile.format
format
Definition: dqm-mbProfile.py:16
str
#define str(s)
Definition: TestProcessor.cc:48
dqmservices::DQMFileIterator::DQMFileIterator
DQMFileIterator(edm::ParameterSet const &pset)
Definition: DQMFileIterator.cc:74
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
dqmservices::DQMFileIterator::runPathMTime_
std::time_t runPathMTime_
Definition: DQMFileIterator.h:114
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
edm::ParameterSetDescription::addUntracked
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
Definition: ParameterSetDescription.h:100
edm::FlushMessageLog
void FlushMessageLog()
Definition: MessageLogger.cc:94
dqmservices::DQMFileIterator::LumiEntry::get_data_path
std::string get_data_path() const
Definition: DQMFileIterator.cc:44
edm::ParameterSet
Definition: ParameterSet.h:36
dqmservices::DQMFileIterator::state
State state()
Definition: DQMFileIterator.cc:121
dqmservices::DQMFileIterator::lumiReady
bool lumiReady()
Definition: DQMFileIterator.cc:129
dqmservices::DQMFileIterator::eor_
EorEntry eor_
Definition: DQMFileIterator.h:105
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
dqmservices::DQMFileIterator::advanceToLumi
void advanceToLumi(unsigned int lumi, std::string reason)
Definition: DQMFileIterator.cc:147
writedatasetfile.run
run
Definition: writedatasetfile.py:27
dqmservices::DQMFileIterator::runInputDir_
std::string runInputDir_
Definition: DQMFileIterator.h:94
dqmservices::DQMFileIterator::EorEntry::filename
std::string filename
Definition: DQMFileIterator.h:41
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
personalPlayback.fn
fn
Definition: personalPlayback.py:515
dqmservices::DQMFileIterator::update_state
void update_state()
Definition: DQMFileIterator.cc:314
dqmservices::DQMFileIterator::nextLumiTimeoutMillis_
long nextLumiTimeoutMillis_
Definition: DQMFileIterator.h:97
dqmservices::DQMFileIterator::monUpdateLumi
void monUpdateLumi(const LumiEntry &lumi)
Definition: DQMFileIterator.cc:173
mps_fire.result
result
Definition: mps_fire.py:303
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
TimeOfDay.h
dqmservices::DQMFileIterator::runPath_
std::vector< std::string > runPath_
Definition: DQMFileIterator.h:103
dqmservices::DQMFileIterator::LumiEntry::run_path
std::string run_path
Definition: DQMFileIterator.h:22
dqmservices::DQMFileIterator::EorEntry::loaded
bool loaded
Definition: DQMFileIterator.h:40
dqmservices::DQMFileIterator::forceFileCheckTimeoutMillis_
long forceFileCheckTimeoutMillis_
Definition: DQMFileIterator.h:98
dqmservices::DQMFileIterator::EorEntry::n_lumi
std::size_t n_lumi
Definition: DQMFileIterator.h:45
dqmservices::DQMFileIterator::LumiEntry::filename
std::string filename
Definition: DQMFileIterator.h:21
lumi
Definition: LumiSectionData.h:20
dqmservices::DQMFileIterator::logLumiState
void logLumiState(const LumiEntry &lumi, const std::string &msg)
Definition: DQMFileIterator.cc:378
label
const char * label
Definition: PFTauDecayModeTools.cc:11
dqmservices::DQMFileIterator::lastLumiFound
unsigned int lastLumiFound()
Definition: DQMFileIterator.cc:139
dqmservices::DQMFileIterator::State
State
Definition: DQMFileIterator.h:50
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
dqmservices::DQMFileIterator::~DQMFileIterator
~DQMFileIterator()
Definition: DQMFileIterator.cc:89
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
dqmservices::DQMFileIterator::lastLumiLoad_
std::chrono::high_resolution_clock::time_point lastLumiLoad_
Definition: DQMFileIterator.h:118