CMS 3D CMS Logo

DQMFileIterator.cc
Go to the documentation of this file.
1 #include <filesystem>
2 #include <iterator>
3 #include <memory>
4 #include <string>
5 
6 #include <boost/algorithm/string.hpp>
7 #include <boost/algorithm/string/predicate.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/property_tree/json_parser.hpp>
10 #include <boost/property_tree/ptree.hpp>
11 #include <boost/range.hpp>
12 #include <boost/regex.hpp>
13 #include <fmt/printf.h>
14 
17 #include "DQMFileIterator.h"
18 
19 namespace dqmservices {
20 
22  const std::string& filename,
23  int lumiNumber,
24  int datafn_position) {
25  boost::property_tree::ptree pt;
26  read_json(filename, pt);
27 
29  lumi.filename = filename;
30  lumi.run_path = run_path;
31 
32  lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
33 
34  lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
35 
36  lumi.file_ls = lumiNumber;
37 
38  if (datafn_position >= 0) {
39  lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
40  }
41 
42  return lumi;
43  }
44 
46  if (boost::starts_with(datafn, "/"))
47  return datafn;
48 
49  std::filesystem::path p(run_path);
50  p /= datafn;
51  return p.string();
52  }
53 
55 
56  // Contents of Eor json file are ignored for the moment.
57  // This function will not be called.
59  const std::string& filename) {
60  boost::property_tree::ptree pt;
61  read_json(filename, pt);
62 
63  EorEntry eor;
64  eor.filename = filename;
65  eor.run_path = run_path;
66 
67  // We rely on n_events being at index 1 and n_lumi at index 2 of the "data" array...
68  eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
69  eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
70 
71  eor.loaded = true;
72  return eor;
73  }
74 
76  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
77  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
78  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
79  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
80  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
81  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
82 
83  // scan-once mode
84  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
85 
87  reset();
88  }
89 
91 
93  runPath_.clear();
94 
95  std::vector<std::string> tokens;
96  boost::split(tokens, runInputDir_, boost::is_any_of(":"));
97 
98  for (auto token : tokens) {
99  runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
100  }
101 
102  eor_.loaded = false;
103  state_ = State::OPEN;
104  nextLumiNumber_ = 1;
105  lumiSeen_.clear();
106  filesSeen_.clear();
107 
109 
110  collect(true);
111  update_state();
112 
113  if (mon_.isAvailable()) {
114  ptree doc;
115  doc.put("run", runNumber_);
116  doc.put("next_lumi", nextLumiNumber_);
117  doc.put("fi_state", std::to_string(state_));
118  mon_->outputUpdate(doc);
119  }
120  }
121 
123 
126  advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
127  return lumi;
128  }
129 
131  if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
132  return true;
133  }
134 
135  return false;
136  }
137 
138  unsigned int DQMFileIterator::runNumber() { return runNumber_; }
139 
141  if (!lumiSeen_.empty()) {
142  return lumiSeen_.rbegin()->first;
143  }
144 
145  return 1;
146  }
147 
149  using boost::str;
150  using boost::property_tree::ptree;
151 
152  unsigned int currentLumi = nextLumiNumber_;
153 
156 
157  auto iter = lumiSeen_.lower_bound(currentLumi);
158 
159  while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
160  iter->second.state = reason;
161  monUpdateLumi(iter->second);
162 
163  ++iter;
164  }
165 
166  if (mon_.isAvailable()) {
167  // report the successful lumi file open
168  ptree doc;
169  doc.put("next_lumi", nextLumiNumber_);
170  mon_->outputUpdate(doc);
171  }
172  }
173 
175  if (!mon_.isAvailable())
176  return;
177 
178  ptree doc;
179  doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
180  mon_->outputUpdate(doc);
181  }
182 
183  unsigned DQMFileIterator::mtimeHash() const {
184  unsigned mtime_now = 0;
185 
186  for (auto path : runPath_) {
187  if (!std::filesystem::exists(path))
188  continue;
189 
190  auto write_time = std::filesystem::last_write_time(path);
191  mtime_now =
192  mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
193  }
194 
195  return mtime_now;
196  }
197 
// Scans every directory in runPath_ for json files named
// run<digits>_ls<digits>_<label>[_...].jsn, loads the not-yet-seen lumi files
// that match runNumber_ and streamLabel_ into lumiSeen_, and marks the
// end-of-run entry as loaded when an EoR file is found or flagScanOnce_ is set.
//
// ignoreTimers: when true, both early-return shortcuts below (the 100 ms
// throttle and the "directory unchanged" check) are bypassed and a full scan
// is always performed.
//
// NOTE(review): `now` is used below but its declaration is not part of this
// listing (listing lines 202 and 221 are absent); presumably it is a
// time_point compatible with runPathLastCollect_ - confirm against the
// real source file.
198  void DQMFileIterator::collect(bool ignoreTimers) {
199  // search filesystem to find available lumi section files
200  // or the end of run files
201 
     // milliseconds elapsed between runPathLastCollect_ and `now`
203  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
204 
205  // don't refresh if it's too soon
     // (a negative last_ms does not trigger this throttle)
206  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
207  return;
208  }
209 
210  // check if directory changed
211  auto mtime_now = mtimeHash();
212 
     // Skip the scan only while the fingerprint is unchanged AND the forced
     // re-check interval (forceFileCheckTimeoutMillis_) has not yet elapsed.
213  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
214  // logFileAction("Directory hasn't changed.");
215  return;
216  } else {
     // this branch is intentionally empty: only the disabled log call remains
217  // logFileAction("Directory changed, updating.");
218  }
219 
     // remember the fingerprint this scan is based on
220  runPathMTime_ = mtime_now;
222 
223  using std::filesystem::directory_entry;
224  using std::filesystem::directory_iterator;
225 
     // Path of an end-of-run file found during this scan; it is handled after
     // all directories have been walked. If several are found, the last wins.
226  std::string fn_eor;
227 
     // NOTE(review): `auto runPath` copies each path string per iteration.
228  for (auto runPath : runPath_) {
229  if (!std::filesystem::exists(runPath)) {
230  logFileAction("Directory does not exist: ", runPath);
231 
232  continue;
233  }
234 
     // a default-constructed directory_iterator serves as the end sentinel
235  directory_iterator dend;
236  for (directory_iterator di(runPath); di != dend; ++di) {
     // capture groups: 1 = run number, 2 = lumi number, 3 = stream label
     // NOTE(review): the regex is rebuilt for every directory entry although
     // it does not depend on the loop variable.
237  const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
238 
     // `filename` is the bare file name (key into filesSeen_),
     // `fn` is the full path used for loading and logging
239  const std::string filename = di->path().filename().string();
240  const std::string fn = di->path().string();
241 
     // files already handled in an earlier scan are not examined again
242  if (filesSeen_.find(filename) != filesSeen_.end()) {
243  continue;
244  }
245 
246  boost::smatch result;
247  if (boost::regex_match(filename, result, fn_re)) {
     // NOTE(review): std::stoi yields int and may throw (e.g. out of range);
     // these two calls are outside the try block further down.
248  unsigned int run = std::stoi(result[1]);
249  unsigned int lumi = std::stoi(result[2]);
250  std::string label = result[3];
251 
     // Recorded before the checks below, so files rejected by any of the
     // `continue` statements are also remembered and skipped on later scans.
252  filesSeen_.insert(filename);
253 
254  if (run != runNumber_)
255  continue;
256 
257  // check if this is EoR
258  // for various reasons we have to load it after all other files
259  if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
260  fn_eor = fn;
261  continue;
262  }
263 
264  // check if lumi is loaded
265  if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
266  continue; // already loaded
267  }
268 
269  // check if this belongs to us
270  if (label != streamLabel_) {
271  std::string msg("Found and skipped json file (stream label mismatch, ");
272  msg += label + " [files] != " + streamLabel_ + " [config]";
273  msg += "): ";
274  logFileAction(msg, fn);
275  continue;
276  }
277 
278  try {
     // parse the json file and register the lumi under its number
279  LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
280  lumiSeen_.emplace(lumi, lumi_jsn);
281  logFileAction("Found and loaded json file: ", fn);
282 
283  monUpdateLumi(lumi_jsn);
284  } catch (const std::exception& e) {
285  // don't reset the mtime, keep it waiting
     // forgetting the file name lets a later scan try to load it again
286  filesSeen_.erase(filename);
287 
288  std::string msg("Found, tried to load the json, but failed (");
289  msg += e.what();
290  msg += "): ";
291  logFileAction(msg, fn);
292  }
293  }
294  }
295  }
296 
     // End-of-run handling: entered when an EoR file was found in this scan,
     // or unconditionally when flagScanOnce_ is set.
297  if ((!fn_eor.empty()) or flagScanOnce_) {
298  if (!fn_eor.empty()) {
299  logFileAction("EoR file found: ", fn_eor);
300  }
301 
302  // @TODO load EoR files correctly
303  // eor_ = EorEntry::load_json(fn_eor);
304  // logFileAction("Loaded eor file: ", fn_eor);
305 
306  // for now , set n_lumi to the highest _found_ lumi
307  eor_.loaded = true;
308 
     // 0 when no lumi was found, otherwise the key of the last lumiSeen_ entry
309  if (lumiSeen_.empty()) {
310  eor_.n_lumi = 0;
311  } else {
312  eor_.n_lumi = lumiSeen_.rbegin()->first;
313  }
314  }
315  }
316 
318  using std::chrono::duration_cast;
319  using std::chrono::high_resolution_clock;
320  using std::chrono::milliseconds;
321 
322  State old_state = state_;
323 
324  // in scanOnce mode we don't do repeated scans
325  // whatever was found at reset() is used
326  if (!flagScanOnce_) {
327  collect(false);
328  }
329 
330  if ((state_ == State::OPEN) && (eor_.loaded)) {
331  state_ = State::EOR_CLOSING;
332  }
333 
334  // special case for missing lumi files
335  // skip to the next available, but after the timeout
336  if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
337  auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
338  if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
339  auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
340  auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
341 
342  if (elapsed_ms >= nextLumiTimeoutMillis_) {
343  std::string msg("Timeout reached, skipping lumisection(s) ");
344  msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
345  msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
347 
348  advanceToLumi(iter->first, "skipped: timeout");
349  }
350  }
351  }
352 
353  if (state_ == State::EOR_CLOSING) {
354  // check if we parsed all lumis
355  // n_lumi is both last lumi and the number of lumi
356  // since lumis are indexed from 1
357 
358  // after all lumi have been pop()'ed
359  // current lumi will become larger than the last lumi
360  if (nextLumiNumber_ > eor_.n_lumi) {
361  state_ = State::EOR;
362  }
363  }
364 
365  if (state_ != old_state) {
366  logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
367 
368  if (mon_) {
369  ptree doc;
370  doc.put("fi_state", std::to_string(state_));
371  mon_->outputUpdate(doc);
372  }
373  }
374  }
375 
377  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
379  }
380 
382  if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
383  lumiSeen_[lumi.file_ls].state = msg;
384 
385  monUpdateLumi(lumiSeen_[lumi.file_ls]);
386  } else {
387  logFileAction("Internal error: referenced lumi is not the map.");
388  }
389  }
390 
392  if (mon_.isAvailable())
393  mon_->keepAlive();
394 
395  usleep(delayMillis_ * 1000);
396  }
397 
399  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
400 
401  desc.addUntracked<unsigned int>("datafnPosition", 3)
402  ->setComment(
403  "Data filename position in the positional arguments array 'data' in "
404  "json file.");
405 
406  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
407 
408  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
409 
410  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
411  ->setComment(
412  "Number of milliseconds to wait before switching to the next lumi "
413  "section if the current is missing, -1 to disable.");
414 
415  desc.addUntracked<bool>("scanOnce", false)
416  ->setComment(
417  "Don't repeat file scans: use what was found during the initial scan. "
418  "EOR file is ignored and the state is set to 'past end of run'.");
419 
420  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
421  }
422 
423 } // namespace dqmservices
dqmservices::DQMFileIterator::runNumber_
unsigned int runNumber_
Definition: DQMFileIterator.h:93
dqmservices::DQMFileIterator::EorEntry::n_events
std::size_t n_events
Definition: DQMFileIterator.h:44
dqmservices::DQMFileIterator::EorEntry
Definition: DQMFileIterator.h:39
dqmservices::DQMFileIterator::runNumber
unsigned int runNumber()
Definition: DQMFileIterator.cc:138
dqmservices::DQMFileIterator::state_
State state_
Definition: DQMFileIterator.h:106
common_cff.doc
doc
Definition: common_cff.py:54
dqmservices::DQMFileIterator::EorEntry::load_json
static EorEntry load_json(const std::string &run_path, const std::string &filename)
Definition: DQMFileIterator.cc:58
dqmservices::DQMFileIterator::logFileAction
void logFileAction(const std::string &msg, const std::string &fileName="") const
Definition: DQMFileIterator.cc:376
MessageLogger.h
dqmservices::DQMFileIterator::reset
void reset()
Definition: DQMFileIterator.cc:92
dqmservices
Definition: DQMFileIterator.cc:19
edm::TimeOfDay
Definition: TimeOfDay.h:9
DiDispStaMuonMonitor_cfi.pt
pt
Definition: DiDispStaMuonMonitor_cfi.py:39
dqmservices::DQMFileIterator::lumiSeen_
std::map< unsigned int, LumiEntry > lumiSeen_
Definition: DQMFileIterator.h:109
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
dqmservices::DQMFileIterator::collect
void collect(bool ignoreTimers)
Definition: DQMFileIterator.cc:198
dqmservices::DQMFileIterator::LumiEntry::load_json
static LumiEntry load_json(const std::string &run_path, const std::string &filename, int lumiNumber, int datafn_position)
Definition: DQMFileIterator.cc:21
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
dqmservices::DQMFileIterator::delayMillis_
unsigned long delayMillis_
Definition: DQMFileIterator.h:96
dqmservices::DQMFileIterator::filesSeen_
std::unordered_set< std::string > filesSeen_
Definition: DQMFileIterator.h:110
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
dqmservices::DQMFileIterator::open
LumiEntry open()
Definition: DQMFileIterator.cc:124
dqmservices::DQMFileIterator::flagScanOnce_
bool flagScanOnce_
Definition: DQMFileIterator.h:99
DQMFileIterator.h
dqmservices::DQMFileIterator::EOR
Definition: DQMFileIterator.h:53
mps_check.msg
tuple msg
Definition: mps_check.py:285
dqmservices::DQMFileIterator::streamLabel_
std::string streamLabel_
Definition: DQMFileIterator.h:95
dqmservices::DQMFileIterator::runPathLastCollect_
std::chrono::high_resolution_clock::time_point runPathLastCollect_
Definition: DQMFileIterator.h:115
dqmservices::DQMFileIterator::delay
void delay()
Definition: DQMFileIterator.cc:391
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
dqmservices::DQMFileIterator::datafnPosition_
unsigned int datafnPosition_
Definition: DQMFileIterator.h:102
dqmservices::DQMFileIterator::LumiEntry
Definition: DQMFileIterator.h:20
dqmservices::DQMFileIterator::nextLumiNumber_
unsigned int nextLumiNumber_
Definition: DQMFileIterator.h:108
dqmservices::DQMFileIterator::EorEntry::run_path
std::string run_path
Definition: DQMFileIterator.h:42
dqmservices::DQMFileIterator::fillDescription
static void fillDescription(edm::ParameterSetDescription &d)
Definition: DQMFileIterator.cc:398
dqmservices::DQMFileIterator::LumiEntry::get_json_path
std::string get_json_path() const
Definition: DQMFileIterator.cc:54
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
dqmservices::DQMFileIterator::mon_
edm::Service< DQMMonitoringService > mon_
Definition: DQMFileIterator.h:125
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
str
#define str(s)
Definition: TestProcessor.cc:51
dqmservices::DQMFileIterator::DQMFileIterator
DQMFileIterator(edm::ParameterSet const &pset)
Definition: DQMFileIterator.cc:75
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
dqmservices::DQMFileIterator::mtimeHash
unsigned mtimeHash() const
Definition: DQMFileIterator.cc:183
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
edm::FlushMessageLog
void FlushMessageLog()
Definition: MessageLogger.cc:34
dqmservices::DQMFileIterator::LumiEntry::get_data_path
std::string get_data_path() const
Definition: DQMFileIterator.cc:45
edm::ParameterSet
Definition: ParameterSet.h:47
dqmservices::DQMFileIterator::state
State state()
Definition: DQMFileIterator.cc:122
dqmservices::DQMFileIterator::lumiReady
bool lumiReady()
Definition: DQMFileIterator.cc:130
edm::LogAbsolute
Log< level::System, true > LogAbsolute
Definition: MessageLogger.h:134
dqmservices::DQMFileIterator::eor_
EorEntry eor_
Definition: DQMFileIterator.h:105
PixelMapPlotter.reason
reason
Definition: PixelMapPlotter.py:509
dqmservices::DQMFileIterator::advanceToLumi
void advanceToLumi(unsigned int lumi, std::string reason)
Definition: DQMFileIterator.cc:148
dqmservices::DQMFileIterator::runPathMTime_
unsigned runPathMTime_
Definition: DQMFileIterator.h:114
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
writedatasetfile.run
run
Definition: writedatasetfile.py:27
dqmservices::DQMFileIterator::runInputDir_
std::string runInputDir_
Definition: DQMFileIterator.h:94
dqmservices::DQMFileIterator::EorEntry::filename
std::string filename
Definition: DQMFileIterator.h:41
or
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
personalPlayback.fn
fn
Definition: personalPlayback.py:515
dqmservices::DQMFileIterator::update_state
void update_state()
Definition: DQMFileIterator.cc:317
dqmservices::DQMFileIterator::nextLumiTimeoutMillis_
long nextLumiTimeoutMillis_
Definition: DQMFileIterator.h:97
dqmservices::DQMFileIterator::monUpdateLumi
void monUpdateLumi(const LumiEntry &lumi)
Definition: DQMFileIterator.cc:174
mps_fire.result
result
Definition: mps_fire.py:311
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
TimeOfDay.h
dqmservices::DQMFileIterator::runPath_
std::vector< std::string > runPath_
Definition: DQMFileIterator.h:103
dqmservices::DQMFileIterator::LumiEntry::run_path
std::string run_path
Definition: DQMFileIterator.h:22
dqmservices::DQMFileIterator::EorEntry::loaded
bool loaded
Definition: DQMFileIterator.h:40
dqmservices::DQMFileIterator::forceFileCheckTimeoutMillis_
long forceFileCheckTimeoutMillis_
Definition: DQMFileIterator.h:98
dqmservices::DQMFileIterator::EorEntry::n_lumi
std::size_t n_lumi
Definition: DQMFileIterator.h:45
dqmservices::DQMFileIterator::LumiEntry::filename
std::string filename
Definition: DQMFileIterator.h:21
lumi
Definition: LumiSectionData.h:20
dqmservices::DQMFileIterator::logLumiState
void logLumiState(const LumiEntry &lumi, const std::string &msg)
Definition: DQMFileIterator.cc:381
label
const char * label
Definition: PFTauDecayModeTools.cc:11
dqmservices::DQMFileIterator::lastLumiFound
unsigned int lastLumiFound()
Definition: DQMFileIterator.cc:140
dqmservices::DQMFileIterator::State
State
Definition: DQMFileIterator.h:50
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
GetRecoTauVFromDQM_MC_cff.next
next
Definition: GetRecoTauVFromDQM_MC_cff.py:31
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
dqmservices::DQMFileIterator::~DQMFileIterator
~DQMFileIterator()
Definition: DQMFileIterator.cc:90
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:318
dqmservices::DQMFileIterator::lastLumiLoad_
std::chrono::high_resolution_clock::time_point lastLumiLoad_
Definition: DQMFileIterator.h:118