6 #include <boost/algorithm/string.hpp>
7 #include <boost/algorithm/string/predicate.hpp>
8 #include <boost/property_tree/json_parser.hpp>
9 #include <boost/property_tree/ptree.hpp>
10 #include <boost/range.hpp>
11 #include <boost/regex.hpp>
12 #include <fmt/printf.h>
23 int datafn_position) {
24 boost::property_tree::ptree
pt;
31 lumi.n_events_processed =
std::next(
pt.get_child(
"data").begin(), 0)->
second.get_value<std::size_t>();
35 lumi.file_ls = lumiNumber;
37 if (datafn_position >= 0) {
45 if (boost::starts_with(datafn,
"/"))
59 boost::property_tree::ptree
pt;
75 runNumber_ =
pset.getUntrackedParameter<
unsigned int>(
"runNumber");
94 std::vector<std::string> tokens;
97 for (
const auto&
token : tokens) {
112 if (
mon_.isAvailable()) {
116 doc.put(
"fi_state", std::to_string(
state_));
149 using boost::property_tree::ptree;
156 auto iter =
lumiSeen_.lower_bound(currentLumi);
159 iter->second.state =
reason;
165 if (
mon_.isAvailable()) {
174 if (!
mon_.isAvailable())
178 doc.put(fmt::sprintf(
"extra.lumi_seen.lumi%06d",
lumi.file_ls),
lumi.state);
183 unsigned mtime_now = 0;
186 if (!std::filesystem::exists(
path))
189 auto write_time = std::filesystem::last_write_time(
path);
191 mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).
count();
205 if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
222 using std::filesystem::directory_entry;
223 using std::filesystem::directory_iterator;
227 for (
const auto& runPath :
runPath_) {
228 if (!std::filesystem::exists(runPath)) {
234 directory_iterator dend;
235 for (directory_iterator di(runPath); di != dend; ++di) {
236 const boost::regex fn_re(
"run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
270 std::string msg(
"Found and skipped json file (stream label mismatch, ");
297 if (!fn_eor.empty()) {
317 using std::chrono::duration_cast;
318 using std::chrono::high_resolution_clock;
319 using std::chrono::milliseconds;
330 state_ = State::EOR_CLOSING;
339 auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
344 msg +=
", nextLumiNumber_ is now " + std::to_string(iter->first);
352 if (
state_ == State::EOR_CLOSING) {
364 if (
state_ != old_state) {
365 logFileAction(
"Streamer state changed: ", std::to_string(old_state) +
"->" + std::to_string(
state_));
369 doc.put(
"fi_state", std::to_string(
state_));
386 logFileAction(
"Internal error: referenced lumi is not the map.");
391 if (
mon_.isAvailable())
398 desc.addUntracked<
unsigned int>(
"runNumber")->setComment(
"Run number passed via configuration file.");
400 desc.addUntracked<
unsigned int>(
"datafnPosition", 3)
402 "Data filename position in the positional arguments array 'data' in "
405 desc.addUntracked<
std::string>(
"streamLabel")->setComment(
"Stream label used in json discovery.");
407 desc.addUntracked<uint32_t>(
"delayMillis")->setComment(
"Number of milliseconds to wait between file checks.");
409 desc.addUntracked<int32_t>(
"nextLumiTimeoutMillis", -1)
411 "Number of milliseconds to wait before switching to the next lumi "
412 "section if the current is missing, -1 to disable.");
414 desc.addUntracked<
bool>(
"scanOnce",
false)
416 "Don't repeat file scans: use what was found during the initial scan. "
417 "EOR file is ignored and the state is set to 'past end of run'.");
419 desc.addUntracked<
std::string>(
"runInputDir")->setComment(
"Directory where the DQM files will appear.");