CMS 3D CMS Logo

CondorStatusUpdater.cc
Go to the documentation of this file.
1 
16 
17 #include <fcntl.h>
18 #include <unistd.h>
19 #include <sys/wait.h>
20 #include <spawn.h>
21 #include <iostream>
22 #include <fstream>
23 #include <sstream>
24 #include <cmath>
25 #include <chrono>
26 #include <sstream>
27 #include <atomic>
28 #include <string>
29 #include <set>
30 
31 namespace edm {
32 
33  namespace service {
34 
36  public:
39  CondorStatusService(const CondorStatusService &) = delete;
41 
42  static void fillDescriptions(ConfigurationDescriptions &descriptions);
43 
44  private:
45  bool isChirpSupported();
46  template <typename T>
47  bool updateChirp(const std::string &key_suffix, const T &value);
48  bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
49  bool updateChirpImpl(std::string const &key, std::string const &value);
50  inline void update();
51  void firstUpdate();
52  void lastUpdate();
53  void updateImpl(time_t secsSinceLastUpdate);
54 
55  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
56  void eventPost(StreamContext const &iContext);
57  void lumiPost(GlobalContext const &);
58  void runPost(GlobalContext const &);
59  void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext);
60  void beginPost();
61  void endPost();
62  void filePost(std::string const &, bool);
63 
64  bool m_debug;
65  std::atomic_flag m_shouldUpdate;
66  time_t m_beginJob = 0;
69  float m_rate = 0;
70  static constexpr float m_defaultEmaInterval = 15 * 60; // Time in seconds to average EMA over for event rate.
71  static constexpr unsigned int m_defaultUpdateInterval = 3 * 60;
72  std::atomic<time_t> m_lastUpdate;
73  std::atomic<std::uint_least64_t> m_events;
74  std::atomic<std::uint_least64_t> m_lumis;
75  std::atomic<std::uint_least64_t> m_runs;
76  std::atomic<std::uint_least64_t> m_files;
79 
80  std::uint_least64_t m_lastEventCount = 0;
81  };
82 
83  } // namespace service
84 
85 } // namespace edm
86 
87 using namespace edm::service;
88 
91 
93  : m_debug(false), m_lastUpdate(0), m_events(0), m_lumis(0), m_runs(0), m_files(0) {
94  m_shouldUpdate.clear();
95  if (pset.exists("debug")) {
96  m_debug = true;
97  }
98  if (!isChirpSupported()) {
99  return;
100  }
101 
102  firstUpdate();
103 
111 
112  if (pset.exists("updateIntervalSeconds")) {
113  m_updateInterval = pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds");
114  }
115  if (pset.exists("EMAInterval")) {
116  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
117  }
118  if (pset.exists("tag")) {
119  m_tag = pset.getUntrackedParameter<std::string>("tag");
120  }
121 }
122 
124  m_events++;
125  update();
126 }
127 
129  m_lumis++;
130  update();
131 }
132 
134  m_runs++;
135  update();
136 }
137 
138 void CondorStatusService::filePost(std::string const & /*lfn*/, bool /*usedFallback*/) {
139  m_files++;
140  update();
141 }
142 
145  m_processParameterSetID = processContext.parameterSetID();
146  }
147 }
148 
150  ParameterSet const &processParameterSet = edm::getParameterSet(m_processParameterSetID);
151  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
152  // PSet info from edm::ScheduleItems
153  int maxEvents =
154  processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
155  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet())
156  .getUntrackedParameter<int>("input", -1);
157 
158  // lumisToProcess from EventSkipperByID (PoolSource and similar)
159  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
160  "lumisToProcess", std::vector<LuminosityBlockRange>());
161  edm::sortAndRemoveOverlaps(toProcess);
162  uint64_t lumiCount = 0;
163  for (auto const &range : toProcess) {
164  if (range.startRun() != range.endRun()) {
165  break;
166  }
168  break;
169  }
170  lumiCount += (range.endLumi() - range.startLumi());
171  }
172  // Handle sources deriving from ProducerSourceBase
173  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
174  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0)) {
175  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
176  }
177 
178  std::vector<std::string> fileNames =
179  pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
180  std::stringstream ss_max_files;
181  ss_max_files << fileNames.size();
182  updateChirp("MaxFiles", ss_max_files.str());
183 
184  if (lumiCount > 0) {
185  if (maxLumis < 0) {
186  maxLumis = lumiCount;
187  }
188  if (maxLumis > static_cast<int>(lumiCount)) {
189  maxLumis = lumiCount;
190  }
191  }
192  if (maxEvents > 0) {
193  std::stringstream ss_max_events;
194  ss_max_events << maxEvents;
195  updateChirp("MaxEvents", ss_max_events.str());
196  }
197  if (maxLumis > 0) {
198  std::stringstream ss_max_lumis;
199  ss_max_lumis << maxLumis;
200  updateChirp("MaxLumis", ss_max_lumis.str());
201  }
202 
203  m_beginJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
204  update();
205 }
206 
208 
210  if (m_debug) {
211  return true;
212  }
213 
214  return std::getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
215 }
216 
218  // Note we always update all our statistics to 0 / false / -1
219  // This allows us to overwrite the activities of a previous cmsRun process
220  // within this HTCondor job.
221  updateImpl(0);
222  updateChirp("MaxFiles", "-1");
223  updateChirp("MaxEvents", "-1");
224  updateChirp("MaxLumis", "-1");
225  updateChirp("Done", "false");
226 
229  double avgSpeed;
230  if (cpusvc.isAvailable() && cpusvc->cpuInfo(models, avgSpeed)) {
231  updateChirpQuoted("CPUModels", models);
232  updateChirp("CPUSpeed", avgSpeed);
233  }
234 }
235 
237  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
238  updateImpl(now - m_lastUpdate);
239  updateChirp("Done", "true");
241  if (!cpusvc.isAvailable()) {
242  std::cout << "At post, CPU service is NOT available.\n";
243  }
244 }
245 
247  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
248  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval) {
249  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire)) {
250  try {
251  time_t sinceLastUpdate = now - m_lastUpdate;
252  m_lastUpdate = now;
253  updateImpl(sinceLastUpdate);
254  m_shouldUpdate.clear(std::memory_order_release);
255  } catch (...) {
256  m_shouldUpdate.clear(std::memory_order_release);
257  throw;
258  }
259  }
260  }
261 }
262 
263 void CondorStatusService::updateImpl(time_t sinceLastUpdate) {
264  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
265  time_t jobTime = now - m_beginJob;
266 
268  if (timingsvc.isAvailable()) {
269  updateChirp("TotalCPU", timingsvc->getTotalCPU());
270  }
271 
272  updateChirp("LastUpdate", now);
273 
274  if (!m_events || (m_events > m_lastEventCount)) {
275  updateChirp("Events", m_events);
276  }
277 
278  updateChirp("Lumis", m_lumis);
279 
280  updateChirp("Runs", m_runs);
281 
282  updateChirp("Files", m_files);
283 
284  float ema_coeff = 1 - std::exp(-static_cast<float>(sinceLastUpdate) /
285  std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
286  if (sinceLastUpdate > 0) {
287  updateChirp("Elapsed", jobTime);
288  m_rate = ema_coeff * static_cast<float>(m_events - m_lastEventCount) / static_cast<float>(sinceLastUpdate) +
289  (1.0 - ema_coeff) * m_rate;
291  updateChirp("EventRate", m_rate);
292  }
293 
294  // If Xrootd was used, pull the statistics from there.
296  if (xrdsvc.isAvailable()) {
297  for (auto const &iter : xrdsvc->condorUpdate()) {
298  std::string site = iter.first;
299  site.erase(std::remove_if(site.begin(), site.end(), [](char x) { return !isalnum(x) && (x != '_'); }),
300  site.end());
301  auto &iostats = iter.second;
302  updateChirp("IOSite_" + site + "_ReadBytes", iostats.bytesRead);
303  updateChirp("IOSite_" + site + "_ReadTimeMS",
304  std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
305  }
306  }
307 
308  // Update storage account information
309  auto const &stats = StorageAccount::summary();
310  uint64_t readOps = 0;
311  uint64_t readVOps = 0;
312  uint64_t readSegs = 0;
313  uint64_t readBytes = 0;
314  uint64_t readTimeTotal = 0;
315  uint64_t writeBytes = 0;
316  uint64_t writeTimeTotal = 0;
317  const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
318  for (const auto &storage : stats) {
319  // StorageAccount records statistics for both the TFile layer and the
320  // StorageFactory layer. However, the StorageFactory statistics tend to
321  // be more accurate as various backends may alter the incoming read requests
322  // (such as when lazy-download is used).
323  if (storage.first == token.value()) {
324  continue;
325  }
326  for (const auto &counter : storage.second) {
327  if (counter.first == static_cast<int>(StorageAccount::Operation::read)) {
328  readOps += counter.second.successes;
329  readSegs++;
330  readBytes += counter.second.amount;
331  readTimeTotal += counter.second.timeTotal;
332  } else if (counter.first == static_cast<int>(StorageAccount::Operation::readv)) {
333  readVOps += counter.second.successes;
334  readSegs += counter.second.vector_count;
335  readBytes += counter.second.amount;
336  readTimeTotal += counter.second.timeTotal;
337  } else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) ||
338  (counter.first == static_cast<int>(StorageAccount::Operation::writev))) {
339  writeBytes += counter.second.amount;
340  writeTimeTotal += counter.second.timeTotal;
341  }
342  }
343  }
344  updateChirp("ReadOps", readOps);
345  updateChirp("ReadVOps", readVOps);
346  updateChirp("ReadSegments", readSegs);
347  updateChirp("ReadBytes", readBytes);
348  updateChirp("ReadTimeMsecs", readTimeTotal / (1000 * 1000));
349  updateChirp("WriteBytes", writeBytes);
350  updateChirp("WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
351 }
352 
353 template <typename T>
354 bool CondorStatusService::updateChirp(const std::string &key_suffix, const T &value) {
355  std::stringstream ss;
356  ss << value;
357  return updateChirpImpl(key_suffix, ss.str());
358 }
359 
361  std::string value_copy = value;
362  // Remove double-quotes or the \ character (as it has special escaping semantics in ClassAds).
363  // Make sure we have ASCII characters.
364  // Otherwise, remainder is allowed (including tabs, newlines, single-quotes).
365  value_copy.erase(
366  remove_if(
367  value_copy.begin(), value_copy.end(), [](const char &c) { return !isascii(c) || (c == '"') || (c == '\\'); }),
368  value_copy.end());
369  return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
370 }
371 
373  std::stringstream ss;
374  ss << "ChirpCMSSW" << m_tag << key_suffix;
375  std::string key = ss.str();
376  if (m_debug) {
377  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
378  }
379  int pid = 0;
380  posix_spawn_file_actions_t file_actions;
381  int devnull_fd = open("/dev/null", O_RDWR);
382  if (devnull_fd == -1) {
383  return false;
384  }
385  posix_spawn_file_actions_init(&file_actions);
386  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
387  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
388  const std::string chirp_name = "condor_chirp";
389  const std::string set_job_attr = "set_job_attr_delayed";
390  std::vector<const char *> argv;
391  argv.push_back(chirp_name.c_str());
392  argv.push_back(set_job_attr.c_str());
393  argv.push_back(key.c_str());
394  argv.push_back(value.c_str());
395  argv.push_back(nullptr);
396  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, nullptr, const_cast<char *const *>(&argv[0]), environ);
397  close(devnull_fd);
398  posix_spawn_file_actions_destroy(&file_actions);
399  if (status) {
400  return false;
401  }
402  while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {
403  }
404  return status == 0;
405 }
406 
409  desc.setComment("Service to update HTCondor with the current CMSSW status.");
410  desc.addOptionalUntracked<unsigned int>("updateIntervalSeconds", m_defaultUpdateInterval)
411  ->setComment("Interval, in seconds, for HTCondor updates");
412  desc.addOptionalUntracked<bool>("debug", false)->setComment("Enable debugging of this service");
413  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
414  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
415  desc.addOptionalUntracked<std::string>("tag")->setComment(
416  "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
417  descriptions.add("CondorStatusService", desc);
418 }
419 
bool updateChirpImpl(std::string const &key, std::string const &value)
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
virtual bool cpuInfo(std::string &models, double &avgSpeed)=0
CPU information - the models present and average speed.
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
static const StorageStats & summary(void)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:109
void lumiPost(GlobalContext const &)
bool exists(std::string const &parameterName) const
checks if a parameter exists
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: models.py:1
std::atomic< std::uint_least64_t > m_lumis
std::vector< std::pair< std::string, CondorIOStats > > condorUpdate()
constexpr int32_t ceil(float num)
ParameterSetID const & parameterSetID() const
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
CondorStatusService & operator=(const CondorStatusService &)=delete
std::atomic< std::uint_least64_t > m_runs
bool isAvailable() const
Definition: Service.h:40
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
Definition: value.py:1
T min(T a, T b)
Definition: MathUtil.h:58
virtual double getTotalCPU() const =0
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:13
ParameterSet const & getParameterSet(std::string const &) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
bool updateChirp(const std::string &key_suffix, const T &value)
HLT enums.
bool isValid() const
Definition: Hash.h:141
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:98
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
long double T
static void fillDescriptions(ConfigurationDescriptions &descriptions)
#define constexpr
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)