6 #include <boost/algorithm/string.hpp>
7 #include <boost/algorithm/string/predicate.hpp>
8 #include <boost/filesystem.hpp>
9 #include <boost/property_tree/json_parser.hpp>
10 #include <boost/property_tree/ptree.hpp>
11 #include <boost/range.hpp>
12 #include <boost/regex.hpp>
13 #include <fmt/printf.h>
24 int datafn_position) {
25 boost::property_tree::ptree
pt;
32 lumi.n_events_processed =
std::next(
pt.get_child(
"data").begin(), 0)->
second.get_value<std::size_t>();
36 lumi.file_ls = lumiNumber;
38 if (datafn_position >= 0) {
46 if (boost::starts_with(datafn,
"/"))
60 boost::property_tree::ptree
pt;
76 runNumber_ =
pset.getUntrackedParameter<
unsigned int>(
"runNumber");
95 std::vector<std::string> tokens;
98 for (
auto token : tokens) {
113 if (
mon_.isAvailable()) {
117 doc.put(
"fi_state", std::to_string(
state_));
150 using boost::property_tree::ptree;
157 auto iter =
lumiSeen_.lower_bound(currentLumi);
160 iter->second.state =
reason;
166 if (
mon_.isAvailable()) {
175 if (!
mon_.isAvailable())
179 doc.put(fmt::sprintf(
"extra.lumi_seen.lumi%06d",
lumi.file_ls),
lumi.state);
184 unsigned mtime_now = 0;
187 if (!std::filesystem::exists(
path))
190 auto write_time = std::filesystem::last_write_time(
path);
192 mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).
count();
206 if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
223 using std::filesystem::directory_entry;
224 using std::filesystem::directory_iterator;
229 if (!std::filesystem::exists(runPath)) {
235 directory_iterator dend;
236 for (directory_iterator di(runPath); di != dend; ++di) {
237 const boost::regex fn_re(
"run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
271 std::string msg(
"Found and skipped json file (stream label mismatch, ");
298 if (!fn_eor.empty()) {
318 using std::chrono::duration_cast;
319 using std::chrono::high_resolution_clock;
320 using std::chrono::milliseconds;
331 state_ = State::EOR_CLOSING;
340 auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
345 msg +=
", nextLumiNumber_ is now " + std::to_string(iter->first);
353 if (
state_ == State::EOR_CLOSING) {
365 if (
state_ != old_state) {
366 logFileAction(
"Streamer state changed: ", std::to_string(old_state) +
"->" + std::to_string(
state_));
370 doc.put(
"fi_state", std::to_string(
state_));
387 logFileAction(
"Internal error: referenced lumi is not the map.");
392 if (
mon_.isAvailable())
399 desc.addUntracked<
unsigned int>(
"runNumber")->setComment(
"Run number passed via configuration file.");
401 desc.addUntracked<
unsigned int>(
"datafnPosition", 3)
403 "Data filename position in the positional arguments array 'data' in "
406 desc.addUntracked<
std::string>(
"streamLabel")->setComment(
"Stream label used in json discovery.");
408 desc.addUntracked<uint32_t>(
"delayMillis")->setComment(
"Number of milliseconds to wait between file checks.");
410 desc.addUntracked<int32_t>(
"nextLumiTimeoutMillis", -1)
412 "Number of milliseconds to wait before switching to the next lumi "
413 "section if the current is missing, -1 to disable.");
415 desc.addUntracked<
bool>(
"scanOnce",
false)
417 "Don't repeat file scans: use what was found during the initial scan. "
418 "EOR file is ignored and the state is set to 'past end of run'.");
420 desc.addUntracked<
std::string>(
"runInputDir")->setComment(
"Directory where the DQM files will appear.");