76 std::atomic<std::uint_least64_t>
m_runs;
94 : m_debug(
false), m_lastUpdate(0), m_events(0), m_lumis(0), m_runs(0), m_files(0) {
96 if (
pset.exists(
"debug")) {
113 if (
pset.exists(
"updateIntervalSeconds")) {
116 if (
pset.exists(
"EMAInterval")) {
119 if (
pset.exists(
"tag")) {
160 std::vector<edm::LuminosityBlockRange> toProcess =
pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
161 "lumisToProcess", std::vector<LuminosityBlockRange>());
164 for (
auto const &
range : toProcess) {
171 lumiCount += (
range.endLumi() -
range.startLumi());
174 unsigned int eventsPerLumi =
pset.getUntrackedParameter<
unsigned int>(
"numberEventsInLuminosityBlock", 0);
175 if ((lumiCount == 0) && (
maxEvents > 0) && (eventsPerLumi > 0)) {
176 lumiCount = static_cast<unsigned int>(
std::ceil(static_cast<float>(
maxEvents) / static_cast<float>(eventsPerLumi)));
180 pset.getUntrackedParameter<std::vector<std::string>>(
"fileNames", std::vector<std::string>());
181 std::stringstream ss_max_files;
187 maxLumis = lumiCount;
189 if (maxLumis > static_cast<int>(lumiCount)) {
190 maxLumis = lumiCount;
194 std::stringstream ss_max_events;
199 std::stringstream ss_max_lumis;
200 ss_max_lumis << maxLumis;
215 return std::getenv(
"_CONDOR_CHIRP_CONFIG") &&
updateChirp(
"Elapsed",
"0");
243 std::cout <<
"At post, CPU service is NOT available.\n";
286 float ema_coeff = 1 -
std::exp(-static_cast<float>(sinceLastUpdate) /
288 if (sinceLastUpdate > 0) {
291 (1.0 - ema_coeff) *
m_rate;
301 site.erase(std::remove_if(site.begin(), site.end(), [](
char x) {
return !isalnum(x) && (x !=
'_'); }),
303 auto &iostats = iter.second;
304 updateChirp(
"IOSite_" + site +
"_ReadBytes", iostats.bytesRead);
306 std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
320 for (
const auto &storage :
stats) {
325 if (storage.first ==
token.value()) {
328 for (
const auto &
counter : storage.second) {
330 readOps +=
counter.second.successes;
332 readBytes +=
counter.second.amount;
333 readTimeTotal +=
counter.second.timeTotal;
335 readVOps +=
counter.second.successes;
336 readSegs +=
counter.second.vector_count;
337 readBytes +=
counter.second.amount;
338 readTimeTotal +=
counter.second.timeTotal;
341 writeBytes +=
counter.second.amount;
342 writeTimeTotal +=
counter.second.timeTotal;
350 updateChirp(
"ReadTimeMsecs", readTimeTotal / (1000 * 1000));
352 updateChirp(
"WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
355 template <
typename T>
357 std::stringstream
ss;
369 value_copy.begin(), value_copy.end(), [](
const char &
c) {
return !isascii(
c) || (
c ==
'"') || (
c ==
'\\'); }),
375 std::stringstream
ss;
376 ss <<
"ChirpCMSSW" <<
m_tag << key_suffix;
379 std::cout <<
"condor_chirp set_job_attr_delayed " <<
key <<
" " <<
value << std::endl;
382 posix_spawn_file_actions_t file_actions;
383 int devnull_fd = open(
"/dev/null", O_RDWR);
384 if (devnull_fd == -1) {
387 posix_spawn_file_actions_init(&file_actions);
388 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
389 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
391 const std::string set_job_attr =
"set_job_attr_delayed";
392 std::vector<const char *>
argv;
393 argv.push_back(chirp_name.c_str());
394 argv.push_back(set_job_attr.c_str());
397 argv.push_back(
nullptr);
398 int status = posix_spawnp(&pid,
"condor_chirp", &file_actions,
nullptr, const_cast<char *const *>(&
argv[0]), environ);
400 posix_spawn_file_actions_destroy(&file_actions);
404 while ((waitpid(pid, &
status, 0) == -1) && errno == -EINTR) {
411 desc.
setComment(
"Service to update HTCondor with the current CMSSW status.");
413 ->setComment(
"Interval, in seconds, for HTCondor updates");
414 desc.
addOptionalUntracked<
bool>(
"debug",
false)->setComment(
"Enable debugging of this service");
416 ->setComment(
"Interval, in seconds, to calculate event rate over (using EMA)");
418 "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
419 descriptions.
add(
"CondorStatusService", desc);