75 std::atomic<std::uint_least64_t>
m_runs;
95 if (pset.
exists(
"debug")) {
112 if (pset.
exists(
"updateIntervalSeconds")) {
115 if (pset.
exists(
"EMAInterval")) {
159 std::vector<edm::LuminosityBlockRange> toProcess = pset.
getUntrackedParameter<std::vector<LuminosityBlockRange>>(
160 "lumisToProcess", std::vector<LuminosityBlockRange>());
163 for (
auto const &range : toProcess) {
164 if (range.startRun() != range.endRun()) {
170 lumiCount += (range.endLumi() - range.startLumi());
173 unsigned int eventsPerLumi = pset.
getUntrackedParameter<
unsigned int>(
"numberEventsInLuminosityBlock", 0);
174 if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0)) {
175 lumiCount =
static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
180 std::stringstream ss_max_files;
181 ss_max_files << fileNames.size();
186 maxLumis = lumiCount;
188 if (maxLumis > static_cast<int>(lumiCount)) {
189 maxLumis = lumiCount;
193 std::stringstream ss_max_events;
198 std::stringstream ss_max_lumis;
199 ss_max_lumis << maxLumis;
214 return getenv(
"_CONDOR_CHIRP_CONFIG") &&
updateChirp(
"Elapsed",
"0");
242 std::cout <<
"At post, CPU service is NOT available.\n";
284 float ema_coeff = 1 -
std::exp(-static_cast<float>(sinceLastUpdate) /
286 if (sinceLastUpdate > 0) {
289 (1.0 - ema_coeff) *
m_rate;
299 site.erase(std::remove_if(site.begin(), site.end(), [](
char x) {
return !isalnum(x) && (x !=
'_'); }),
301 auto &iostats = iter.second;
302 updateChirp(
"IOSite_" + site +
"_ReadBytes", iostats.bytesRead);
304 std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
318 for (
const auto &storage : stats) {
323 if (storage.first == token.value()) {
326 for (
const auto &
counter : storage.second) {
328 readOps +=
counter.second.successes;
330 readBytes +=
counter.second.amount;
331 readTimeTotal +=
counter.second.timeTotal;
333 readVOps +=
counter.second.successes;
334 readSegs +=
counter.second.vector_count;
335 readBytes +=
counter.second.amount;
336 readTimeTotal +=
counter.second.timeTotal;
339 writeBytes +=
counter.second.amount;
340 writeTimeTotal +=
counter.second.timeTotal;
348 updateChirp(
"ReadTimeMsecs", readTimeTotal / (1000 * 1000));
350 updateChirp(
"WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
353 template <
typename T>
355 std::stringstream ss;
367 value_copy.begin(), value_copy.end(), [](
const char &
c) {
return !isascii(
c) || (
c ==
'"') || (
c ==
'\\'); }),
373 std::stringstream ss;
374 ss <<
"ChirpCMSSW" <<
m_tag << key_suffix;
377 std::cout <<
"condor_chirp set_job_attr_delayed " << key <<
" " << value << std::endl;
380 posix_spawn_file_actions_t file_actions;
381 int devnull_fd = open(
"/dev/null", O_RDWR);
382 if (devnull_fd == -1) {
385 posix_spawn_file_actions_init(&file_actions);
386 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
387 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
389 const std::string set_job_attr =
"set_job_attr_delayed";
390 std::vector<const char *>
argv;
391 argv.push_back(chirp_name.c_str());
392 argv.push_back(set_job_attr.c_str());
393 argv.push_back(key.c_str());
394 argv.push_back(value.c_str());
395 argv.push_back(
nullptr);
396 int status = posix_spawnp(&pid,
"condor_chirp", &file_actions,
nullptr, const_cast<char *const *>(&argv[0]), environ);
398 posix_spawn_file_actions_destroy(&file_actions);
402 while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {
409 desc.
setComment(
"Service to update HTCondor with the current CMSSW status.");
411 ->setComment(
"Interval, in seconds, for HTCondor updates");
412 desc.
addOptionalUntracked<
bool>(
"debug",
false)->setComment(
"Enable debugging of this service");
414 ->setComment(
"Interval, in seconds, to calculate event rate over (using EMA)");
416 "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
417 descriptions.
add(
"CondorStatusService", desc);
std::atomic< time_t > m_lastUpdate
static float m_defaultEmaInterval
bool updateChirpImpl(std::string const &key, std::string const &value)
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
virtual bool cpuInfo(std::string &models, double &avgSpeed)=0
CPU information - the models present and average speed.
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
static const StorageStats & summary(void)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
void lumiPost(GlobalContext const &)
bool exists(std::string const ¶meterName) const
checks if a parameter exists
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::atomic< std::uint_least64_t > m_lumis
std::vector< std::pair< std::string, CondorIOStats > > condorUpdate()
ParameterSetID const & parameterSetID() const
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
CondorStatusService & operator=(const CondorStatusService &)=delete
std::atomic< std::uint_least64_t > m_runs
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
virtual double getTotalCPU() const =0
std::uint_least64_t m_lastEventCount
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
ParameterSet const & getParameterSet(std::string const &) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
static unsigned int m_defaultUpdateInterval
edm::ParameterSetID m_processParameterSetID
bool updateChirp(const std::string &key_suffix, const T &value)
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
std::atomic_flag m_shouldUpdate
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)