CMS 3D CMS Logo

CondorStatusUpdater.cc
Go to the documentation of this file.
1 
18 
19 #include <fcntl.h>
20 #include <unistd.h>
21 #include <sys/wait.h>
22 #include <spawn.h>
23 #include <iostream>
24 #include <fstream>
25 #include <sstream>
26 #include <cmath>
27 #include <chrono>
28 #include <sstream>
29 #include <atomic>
30 #include <string>
31 #include <set>
32 
33 namespace edm {
34 
35  namespace service {
36 
38  public:
41  CondorStatusService(const CondorStatusService &) = delete;
43 
44  static void fillDescriptions(ConfigurationDescriptions &descriptions);
45 
46  private:
47  bool isChirpSupported();
48  template <typename T>
49  bool updateChirp(const std::string &key_suffix, const T &value);
50  bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
51  bool updateChirpImpl(std::string const &key, std::string const &value);
52  inline void update();
53  void firstUpdate();
54  void secondUpdate();
55  void lastUpdate();
56  void updateImpl(std::chrono::steady_clock::duration sinceLastUpdate);
57 
58  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
59  void eventPost(StreamContext const &iContext);
60  void lumiPost(GlobalContext const &);
61  void runPost(GlobalContext const &);
62  void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext);
63  void beginPost();
64  void endPost();
65  void filePost(std::string const &);
66 
67  bool m_debug;
68  std::atomic_flag m_shouldUpdate;
69  std::chrono::steady_clock::time_point m_beginJob;
70  std::chrono::steady_clock::duration m_updateInterval =
71  std::chrono::duration_cast<std::chrono::steady_clock::duration>(m_defaultUpdateInterval);
73  float m_rate = 0;
74  static constexpr float m_defaultEmaInterval = 15 * 60; // Time in seconds to average EMA over for event rate.
75  static constexpr std::chrono::minutes m_defaultUpdateInterval = std::chrono::minutes(3);
76  std::atomic<std::chrono::steady_clock::time_point> m_lastUpdate;
77  std::atomic<std::uint_least64_t> m_events;
78  std::atomic<std::uint_least64_t> m_lumis;
79  std::atomic<std::uint_least64_t> m_runs;
80  std::atomic<std::uint_least64_t> m_files;
83 
84  std::uint_least64_t m_lastEventCount = 0;
85  };
86  inline bool isProcessWideService(CondorStatusService const *) { return true; }
87 
88  } // namespace service
89 
90 } // namespace edm
91 
92 using namespace edm::service;
93 
96 
98  : m_debug(pset.getUntrackedParameter("debug", false)),
99  m_lastUpdate(),
100  m_events(0),
101  m_lumis(0),
102  m_runs(0),
103  m_files(0) {
104  m_shouldUpdate.clear();
105  if (not pset.getUntrackedParameter("enable", true)) {
106  return;
107  }
108  if (!isChirpSupported()) {
109  return;
110  }
111 
112  firstUpdate();
113 
121 
122  if (pset.exists("updateIntervalSeconds")) {
123  m_updateInterval = std::chrono::seconds(pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds"));
124  }
125  if (pset.exists("EMAInterval")) {
126  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
127  }
128  if (pset.exists("tag")) {
129  m_tag = pset.getUntrackedParameter<std::string>("tag");
130  }
131 }
132 
134  m_events++;
135  update();
136 }
137 
139  m_lumis++;
140  update();
141 }
142 
144  m_runs++;
145  update();
146 }
147 
149  m_files++;
150  update();
151 }
152 
154  secondUpdate();
156  m_processParameterSetID = processContext.parameterSetID();
157  }
158 }
159 
161  ParameterSet const &processParameterSet = edm::getParameterSet(m_processParameterSetID);
162  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
163  // PSet info from edm::ScheduleItems
164  int maxEvents =
165  processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
166  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet())
167  .getUntrackedParameter<int>("input", -1);
168 
169  // lumisToProcess from EventSkipperByID (PoolSource and similar)
170  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
171  "lumisToProcess", std::vector<LuminosityBlockRange>());
172  edm::sortAndRemoveOverlaps(toProcess);
173  uint64_t lumiCount = 0;
174  for (auto const &range : toProcess) {
175  if (range.startRun() != range.endRun()) {
176  break;
177  }
179  break;
180  }
181  lumiCount += (range.endLumi() - range.startLumi());
182  }
183  // Handle sources deriving from ProducerSourceBase
184  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
185  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0)) {
186  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
187  }
188 
189  std::vector<std::string> fileNames =
190  pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
191  std::stringstream ss_max_files;
192  ss_max_files << fileNames.size();
193  updateChirp("MaxFiles", ss_max_files.str());
194 
195  if (lumiCount > 0) {
196  if (maxLumis < 0) {
197  maxLumis = lumiCount;
198  }
199  if (maxLumis > static_cast<int>(lumiCount)) {
200  maxLumis = lumiCount;
201  }
202  }
203  if (maxEvents > 0) {
204  std::stringstream ss_max_events;
205  ss_max_events << maxEvents;
206  updateChirp("MaxEvents", ss_max_events.str());
207  }
208  if (maxLumis > 0) {
209  std::stringstream ss_max_lumis;
210  ss_max_lumis << maxLumis;
211  updateChirp("MaxLumis", ss_max_lumis.str());
212  }
213  update();
214 }
215 
217 
219  if (m_debug) {
220  return true;
221  }
222 
223  return std::getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
224 }
225 
227  // Note we always update all our statistics to 0 / false / -1
228  // This allows us to overwrite the activities of a previous cmsRun process
229  // within this HTCondor job.
230  updateImpl(std::chrono::steady_clock::duration());
231  updateChirp("MaxFiles", "-1");
232  updateChirp("MaxEvents", "-1");
233  updateChirp("MaxLumis", "-1");
234  updateChirp("Done", "false");
237  if (m_beginJob == decltype(m_beginJob)()) {
239  }
240 }
241 
243  edm::Service<edm::ResourceInformation> resourceInformationService;
244  if (resourceInformationService.isAvailable()) {
245  std::string models = resourceInformationService->cpuModelsFormatted();
246  double avgSpeed = resourceInformationService->cpuAverageSpeed();
247  if (!models.empty()) {
248  updateChirpQuoted("CPUModels", models);
249  updateChirp("CPUSpeed", avgSpeed);
250  }
251  }
252 }
253 
256  updateImpl(now - m_lastUpdate.load());
257  updateChirp("Done", "true");
258  edm::Service<edm::ResourceInformation> resourceInformationService;
259  if (!resourceInformationService.isAvailable()) {
260  edm::LogWarning("CondorStatusService") << "At post, ResourceInformationService is NOT available.\n";
261  }
262 }
263 
266  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval) {
267  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire)) {
268  // Caught exception is rethrown
269  CMS_SA_ALLOW try {
270  auto sinceLastUpdate = now - m_lastUpdate.load();
271  m_lastUpdate = now;
272  updateImpl(sinceLastUpdate);
273  m_shouldUpdate.clear(std::memory_order_release);
274  } catch (...) {
275  m_shouldUpdate.clear(std::memory_order_release);
276  throw;
277  }
278  }
279  }
280 }
281 
282 void CondorStatusService::updateImpl(std::chrono::steady_clock::duration sinceLastUpdate) {
284  auto jobTime = std::chrono::duration<double, std::ratio<1, 1>>(now - m_beginJob).count();
285  auto secsSinceLastUpdate = std::chrono::duration_cast<std::chrono::seconds>(sinceLastUpdate).count();
286 
288  if (timingsvc.isAvailable()) {
289  updateChirp("TotalCPU", timingsvc->getTotalCPU());
290  }
291 
292  updateChirp("LastUpdate", std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
293 
294  if (!m_events || (m_events > m_lastEventCount)) {
295  updateChirp("Events", m_events);
296  }
297 
298  updateChirp("Lumis", m_lumis);
299 
300  updateChirp("Runs", m_runs);
301 
302  updateChirp("Files", m_files);
303 
304  float ema_coeff = 1 - std::exp(-static_cast<float>(secsSinceLastUpdate) /
305  std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
306  if (secsSinceLastUpdate > 0) {
307  updateChirp("Elapsed", jobTime);
308  m_rate = ema_coeff * static_cast<float>(m_events - m_lastEventCount) / static_cast<float>(secsSinceLastUpdate) +
309  (1.0 - ema_coeff) * m_rate;
311  updateChirp("EventRate", m_rate);
312  }
313 
314  // If Xrootd was used, pull the statistics from there.
316  if (xrdsvc.isAvailable()) {
317  for (auto const &iter : xrdsvc->condorUpdate()) {
318  std::string site = iter.first;
319  site.erase(std::remove_if(site.begin(), site.end(), [](char x) { return !isalnum(x) && (x != '_'); }),
320  site.end());
321  auto &iostats = iter.second;
322  updateChirp("IOSite_" + site + "_ReadBytes", iostats.bytesRead);
323  updateChirp("IOSite_" + site + "_ReadTimeMS",
324  std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
325  }
326  }
327 
328  using namespace edm::storage;
329  // Update storage account information
330  auto const &stats = StorageAccount::summary();
331  uint64_t readOps = 0;
332  uint64_t readVOps = 0;
333  uint64_t readSegs = 0;
334  uint64_t readBytes = 0;
335  uint64_t readTimeTotal = 0;
336  uint64_t writeBytes = 0;
337  uint64_t writeTimeTotal = 0;
338  const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
339  for (const auto &storage : stats) {
340  // StorageAccount records statistics for both the TFile layer and the
341  // StorageFactory layer. However, the StorageFactory statistics tend to
342  // be more accurate as various backends may alter the incoming read requests
343  // (such as when lazy-download is used).
344  if (storage.first == token.value()) {
345  continue;
346  }
347  for (const auto &counter : storage.second) {
348  if (counter.first == static_cast<int>(StorageAccount::Operation::read)) {
349  readOps += counter.second.successes;
350  readSegs++;
351  readBytes += counter.second.amount;
352  readTimeTotal += counter.second.timeTotal;
353  } else if (counter.first == static_cast<int>(StorageAccount::Operation::readv)) {
354  readVOps += counter.second.successes;
355  readSegs += counter.second.vector_count;
356  readBytes += counter.second.amount;
357  readTimeTotal += counter.second.timeTotal;
358  } else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) ||
359  (counter.first == static_cast<int>(StorageAccount::Operation::writev))) {
360  writeBytes += counter.second.amount;
361  writeTimeTotal += counter.second.timeTotal;
362  }
363  }
364  }
365  updateChirp("ReadOps", readOps);
366  updateChirp("ReadVOps", readVOps);
367  updateChirp("ReadSegments", readSegs);
368  updateChirp("ReadBytes", readBytes);
369  updateChirp("ReadTimeMsecs", readTimeTotal / (1000 * 1000));
370  updateChirp("WriteBytes", writeBytes);
371  updateChirp("WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
372 }
373 
374 template <typename T>
375 bool CondorStatusService::updateChirp(const std::string &key_suffix, const T &value) {
376  std::stringstream ss;
377  ss << value;
378  return updateChirpImpl(key_suffix, ss.str());
379 }
380 
382  std::string value_copy = value;
383  // Remove double-quotes or the \ character (as it has special escaping semantics in ClassAds).
384  // Make sure we have ASCII characters.
385  // Otherwise, remainder is allowed (including tabs, newlines, single-quotes).
386  value_copy.erase(
387  remove_if(
388  value_copy.begin(), value_copy.end(), [](const char &c) { return !isascii(c) || (c == '"') || (c == '\\'); }),
389  value_copy.end());
390  return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
391 }
392 
394  std::stringstream ss;
395  ss << "ChirpCMSSW" << m_tag << key_suffix;
396  std::string key = ss.str();
397  if (m_debug) {
398  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
399  }
400  int pid = 0;
401  posix_spawn_file_actions_t file_actions;
402  int devnull_fd = open("/dev/null", O_RDWR);
403  if (devnull_fd == -1) {
404  return false;
405  }
406  posix_spawn_file_actions_init(&file_actions);
407  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
408  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
409  const std::string chirp_name = "condor_chirp";
410  const std::string set_job_attr = "set_job_attr_delayed";
411  std::vector<const char *> argv;
412  argv.push_back(chirp_name.c_str());
413  argv.push_back(set_job_attr.c_str());
414  argv.push_back(key.c_str());
415  argv.push_back(value.c_str());
416  argv.push_back(nullptr);
417  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, nullptr, const_cast<char *const *>(&argv[0]), environ);
418  close(devnull_fd);
419  posix_spawn_file_actions_destroy(&file_actions);
420  if (status) {
421  return false;
422  }
423  while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {
424  }
425  return status == 0;
426 }
427 
430  desc.setComment("Service to update HTCondor with the current CMSSW status.");
431  desc.addOptionalUntracked<unsigned int>(
432  "updateIntervalSeconds", std::chrono::duration_cast<std::chrono::seconds>(m_defaultUpdateInterval).count())
433  ->setComment("Interval, in seconds, for HTCondor updates");
434  desc.addOptionalUntracked<bool>("debug", false)->setComment("Enable debugging of this service");
435  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
436  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
437  desc.addOptionalUntracked<std::string>("tag")->setComment(
438  "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
439  desc.addOptionalUntracked<bool>("enable", true)->setComment("Enable this service");
440  descriptions.add("CondorStatusService", desc);
441 }
442 
444 
bool updateChirpImpl(std::string const &key, std::string const &value)
constexpr int32_t ceil(float num)
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
virtual double cpuAverageSpeed() const =0
#define CMS_SA_ALLOW
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:102
double seconds()
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
std::chrono::steady_clock::time_point m_beginJob
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
bool isProcessWideService(TFileService const *)
Definition: TFileService.h:98
ParameterSet const & getParameterSet(std::string const &) const
void lumiPost(GlobalContext const &)
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: models.py:1
std::chrono::steady_clock::duration m_updateInterval
std::atomic< std::uint_least64_t > m_lumis
Guid const & processGUID()
Definition: processGUID.cc:4
bool isValid() const
Definition: Hash.h:141
T getUntrackedParameter(std::string const &, T const &) const
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
void filePost(std::string const &)
CondorStatusService & operator=(const CondorStatusService &)=delete
static constexpr float m_defaultEmaInterval
std::string toString(const char *format,...)
Definition: xdaq_compat.cc:4
std::atomic< std::uint_least64_t > m_runs
key
prepare the HTCondor submission files and eventually submit them
double f[11][100]
virtual std::string const & cpuModelsFormatted() const =0
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
Definition: value.py:1
virtual std::vector< std::pair< std::string, CondorIOStats > > condorUpdate()=0
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:13
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::atomic< std::chrono::steady_clock::time_point > m_lastUpdate
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
ParameterSet const & getParameterSet(ParameterSetID const &id)
virtual double getTotalCPU() const =0
static constexpr std::chrono::minutes m_defaultUpdateInterval
bool updateChirp(const std::string &key_suffix, const T &value)
HLT enums.
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:98
float x
bool isAvailable() const
Definition: Service.h:40
static std::chrono::steady_clock::time_point jobStartTime()
Log< level::Warning, false > LogWarning
long double T
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void updateImpl(std::chrono::steady_clock::duration sinceLastUpdate)
ParameterSetID const & parameterSetID() const
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files