56 void updateImpl(std::chrono::steady_clock::duration sinceLastUpdate);
79 std::atomic<std::uint_least64_t>
m_runs;
98 : m_debug(
pset.getUntrackedParameter(
"debug",
false)),
105 if (not
pset.getUntrackedParameter(
"enable",
true)) {
122 if (
pset.exists(
"updateIntervalSeconds")) {
125 if (
pset.exists(
"EMAInterval")) {
128 if (
pset.exists(
"tag")) {
170 std::vector<edm::LuminosityBlockRange> toProcess =
pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
171 "lumisToProcess", std::vector<LuminosityBlockRange>());
174 for (
auto const &
range : toProcess) {
181 lumiCount += (
range.endLumi() -
range.startLumi());
184 unsigned int eventsPerLumi =
pset.getUntrackedParameter<
unsigned int>(
"numberEventsInLuminosityBlock", 0);
185 if ((lumiCount == 0) && (
maxEvents > 0) && (eventsPerLumi > 0)) {
186 lumiCount =
static_cast<unsigned int>(
std::ceil(static_cast<float>(
maxEvents) / static_cast<float>(eventsPerLumi)));
190 pset.getUntrackedParameter<std::vector<std::string>>(
"fileNames", std::vector<std::string>());
191 std::stringstream ss_max_files;
197 maxLumis = lumiCount;
199 if (maxLumis > static_cast<int>(lumiCount)) {
200 maxLumis = lumiCount;
204 std::stringstream ss_max_events;
209 std::stringstream ss_max_lumis;
210 ss_max_lumis << maxLumis;
223 return std::getenv(
"_CONDOR_CHIRP_CONFIG") &&
updateChirp(
"Elapsed",
"0");
230 updateImpl(std::chrono::steady_clock::duration());
260 edm::LogWarning(
"CondorStatusService") <<
"At post, ResourceInformationService is NOT available.\n";
284 auto jobTime = std::chrono::duration<double, std::ratio<1, 1>>(
now -
m_beginJob).
count();
304 float ema_coeff = 1 -
std::exp(-static_cast<float>(secsSinceLastUpdate) /
306 if (secsSinceLastUpdate > 0) {
309 (1.0 - ema_coeff) *
m_rate;
319 site.erase(std::remove_if(site.begin(), site.end(), [](
char x) {
return !isalnum(
x) && (
x !=
'_'); }),
321 auto &iostats = iter.second;
322 updateChirp(
"IOSite_" + site +
"_ReadBytes", iostats.bytesRead);
324 std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
338 const auto token = StorageAccount::tokenForStorageClassName(
"tstoragefile");
339 for (
const auto &storage :
stats) {
344 if (storage.first ==
token.value()) {
347 for (
const auto &
counter : storage.second) {
349 readOps +=
counter.second.successes;
351 readBytes +=
counter.second.amount;
352 readTimeTotal +=
counter.second.timeTotal;
353 }
else if (
counter.first == static_cast<int>(StorageAccount::Operation::readv)) {
354 readVOps +=
counter.second.successes;
355 readSegs +=
counter.second.vector_count;
356 readBytes +=
counter.second.amount;
357 readTimeTotal +=
counter.second.timeTotal;
359 (
counter.first ==
static_cast<int>(StorageAccount::Operation::writev))) {
360 writeBytes +=
counter.second.amount;
361 writeTimeTotal +=
counter.second.timeTotal;
369 updateChirp(
"ReadTimeMsecs", readTimeTotal / (1000 * 1000));
371 updateChirp(
"WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
374 template <
typename T>
376 std::stringstream
ss;
388 value_copy.begin(), value_copy.end(), [](
const char &
c) {
return !isascii(
c) || (
c ==
'"') || (
c ==
'\\'); }),
394 std::stringstream
ss;
395 ss <<
"ChirpCMSSW" <<
m_tag << key_suffix;
398 std::cout <<
"condor_chirp set_job_attr_delayed " <<
key <<
" " <<
value << std::endl;
401 posix_spawn_file_actions_t file_actions;
402 int devnull_fd = open(
"/dev/null", O_RDWR);
403 if (devnull_fd == -1) {
406 posix_spawn_file_actions_init(&file_actions);
407 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
408 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
410 const std::string set_job_attr =
"set_job_attr_delayed";
411 std::vector<const char *>
argv;
412 argv.push_back(chirp_name.c_str());
413 argv.push_back(set_job_attr.c_str());
416 argv.push_back(
nullptr);
417 int status = posix_spawnp(&pid,
"condor_chirp", &file_actions,
nullptr, const_cast<char *const *>(&
argv[0]), environ);
419 posix_spawn_file_actions_destroy(&file_actions);
423 while ((waitpid(pid, &
status, 0) == -1) && errno == -EINTR) {
430 desc.setComment(
"Service to update HTCondor with the current CMSSW status.");
431 desc.addOptionalUntracked<
unsigned int>(
433 ->setComment(
"Interval, in seconds, for HTCondor updates");
434 desc.addOptionalUntracked<
bool>(
"debug",
false)->setComment(
"Enable debugging of this service");
436 ->setComment(
"Interval, in seconds, to calculate event rate over (using EMA)");
438 "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
439 desc.addOptionalUntracked<
bool>(
"enable",
true)->setComment(
"Enable this service");
440 descriptions.
add(
"CondorStatusService",
desc);
bool updateChirpImpl(std::string const &key, std::string const &value)
constexpr int32_t ceil(float num)
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
std::chrono::steady_clock::time_point m_beginJob
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool isProcessWideService(TFileService const *)
ParameterSet const & getParameterSet(std::string const &) const
void lumiPost(GlobalContext const &)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::chrono::steady_clock::duration m_updateInterval
std::atomic< std::uint_least64_t > m_lumis
Guid const & processGUID()
T getUntrackedParameter(std::string const &, T const &) const
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
void filePost(std::string const &)
CondorStatusService & operator=(const CondorStatusService &)=delete
static constexpr float m_defaultEmaInterval
std::atomic< std::uint_least64_t > m_runs
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
virtual std::vector< std::pair< std::string, CondorIOStats > > condorUpdate()=0
std::uint_least64_t m_lastEventCount
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::atomic< std::chrono::steady_clock::time_point > m_lastUpdate
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
edm::ParameterSetID m_processParameterSetID
ParameterSet const & getParameterSet(ParameterSetID const &id)
virtual double getTotalCPU() const =0
static constexpr std::chrono::minutes m_defaultUpdateInterval
bool updateChirp(const std::string &key_suffix, const T &value)
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
std::atomic_flag m_shouldUpdate
static std::chrono::steady_clock::time_point jobStartTime()
Log< level::Warning, false > LogWarning
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void updateImpl(std::chrono::steady_clock::duration sinceLastUpdate)
ParameterSetID const & parameterSetID() const
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files