CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
CondorStatusUpdater.cc
Go to the documentation of this file.
1 
17 
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/wait.h>
21 #include <spawn.h>
22 #include <iostream>
23 #include <fstream>
24 #include <sstream>
25 #include <cmath>
26 #include <chrono>
27 #include <sstream>
28 #include <atomic>
29 #include <string>
30 #include <set>
31 
32 namespace edm {
33 
34  namespace service {
35 
37  public:
40  CondorStatusService(const CondorStatusService &) = delete;
42 
43  static void fillDescriptions(ConfigurationDescriptions &descriptions);
44 
45  private:
46  bool isChirpSupported();
47  template <typename T>
48  bool updateChirp(const std::string &key_suffix, const T &value);
49  bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
50  bool updateChirpImpl(std::string const &key, std::string const &value);
51  inline void update();
52  void firstUpdate();
53  void lastUpdate();
54  void updateImpl(time_t secsSinceLastUpdate);
55 
56  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
57  void eventPost(StreamContext const &iContext);
58  void lumiPost(GlobalContext const &);
59  void runPost(GlobalContext const &);
60  void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext);
61  void beginPost();
62  void endPost();
63  void filePost(std::string const &, bool);
64 
65  bool m_debug;
66  std::atomic_flag m_shouldUpdate;
67  time_t m_beginJob = 0;
70  float m_rate = 0;
71  static constexpr float m_defaultEmaInterval = 15 * 60; // Time in seconds to average EMA over for event rate.
72  static constexpr unsigned int m_defaultUpdateInterval = 3 * 60;
73  std::atomic<time_t> m_lastUpdate;
74  std::atomic<std::uint_least64_t> m_events;
75  std::atomic<std::uint_least64_t> m_lumis;
76  std::atomic<std::uint_least64_t> m_runs;
77  std::atomic<std::uint_least64_t> m_files;
80 
81  std::uint_least64_t m_lastEventCount = 0;
82  };
83 
84  } // namespace service
85 
86 } // namespace edm
87 
88 using namespace edm::service;
89 
92 
94  : m_debug(false), m_lastUpdate(0), m_events(0), m_lumis(0), m_runs(0), m_files(0) {
95  m_shouldUpdate.clear();
96  if (pset.exists("debug")) {
97  m_debug = true;
98  }
99  if (!isChirpSupported()) {
100  return;
101  }
102 
103  firstUpdate();
104 
112 
113  if (pset.exists("updateIntervalSeconds")) {
114  m_updateInterval = pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds");
115  }
116  if (pset.exists("EMAInterval")) {
117  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
118  }
119  if (pset.exists("tag")) {
120  m_tag = pset.getUntrackedParameter<std::string>("tag");
121  }
122 }
123 
125  m_events++;
126  update();
127 }
128 
130  m_lumis++;
131  update();
132 }
133 
135  m_runs++;
136  update();
137 }
138 
139 void CondorStatusService::filePost(std::string const & /*lfn*/, bool /*usedFallback*/) {
140  m_files++;
141  update();
142 }
143 
146  m_processParameterSetID = processContext.parameterSetID();
147  }
148 }
149 
151  ParameterSet const &processParameterSet = edm::getParameterSet(m_processParameterSetID);
152  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
153  // PSet info from edm::ScheduleItems
154  int maxEvents =
155  processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
156  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet())
157  .getUntrackedParameter<int>("input", -1);
158 
159  // lumisToProcess from EventSkipperByID (PoolSource and similar)
160  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
161  "lumisToProcess", std::vector<LuminosityBlockRange>());
162  edm::sortAndRemoveOverlaps(toProcess);
163  uint64_t lumiCount = 0;
164  for (auto const &range : toProcess) {
165  if (range.startRun() != range.endRun()) {
166  break;
167  }
169  break;
170  }
171  lumiCount += (range.endLumi() - range.startLumi());
172  }
173  // Handle sources deriving from ProducerSourceBase
174  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
175  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0)) {
176  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
177  }
178 
179  std::vector<std::string> fileNames =
180  pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
181  std::stringstream ss_max_files;
182  ss_max_files << fileNames.size();
183  updateChirp("MaxFiles", ss_max_files.str());
184 
185  if (lumiCount > 0) {
186  if (maxLumis < 0) {
187  maxLumis = lumiCount;
188  }
189  if (maxLumis > static_cast<int>(lumiCount)) {
190  maxLumis = lumiCount;
191  }
192  }
193  if (maxEvents > 0) {
194  std::stringstream ss_max_events;
195  ss_max_events << maxEvents;
196  updateChirp("MaxEvents", ss_max_events.str());
197  }
198  if (maxLumis > 0) {
199  std::stringstream ss_max_lumis;
200  ss_max_lumis << maxLumis;
201  updateChirp("MaxLumis", ss_max_lumis.str());
202  }
203 
204  m_beginJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
205  update();
206 }
207 
209 
211  if (m_debug) {
212  return true;
213  }
214 
215  return std::getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
216 }
217 
219  // Note we always update all our statistics to 0 / false / -1
220  // This allows us to overwrite the activities of a previous cmsRun process
221  // within this HTCondor job.
222  updateImpl(0);
223  updateChirp("MaxFiles", "-1");
224  updateChirp("MaxEvents", "-1");
225  updateChirp("MaxLumis", "-1");
226  updateChirp("Done", "false");
227 
229  std::string models;
230  double avgSpeed;
231  if (cpusvc.isAvailable() && cpusvc->cpuInfo(models, avgSpeed)) {
232  updateChirpQuoted("CPUModels", models);
233  updateChirp("CPUSpeed", avgSpeed);
234  }
235 }
236 
238  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
239  updateImpl(now - m_lastUpdate);
240  updateChirp("Done", "true");
242  if (!cpusvc.isAvailable()) {
243  std::cout << "At post, CPU service is NOT available.\n";
244  }
245 }
246 
248  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
249  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval) {
250  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire)) {
251  // Caught exception is rethrown
252  CMS_SA_ALLOW try {
253  time_t sinceLastUpdate = now - m_lastUpdate;
254  m_lastUpdate = now;
255  updateImpl(sinceLastUpdate);
256  m_shouldUpdate.clear(std::memory_order_release);
257  } catch (...) {
258  m_shouldUpdate.clear(std::memory_order_release);
259  throw;
260  }
261  }
262  }
263 }
264 
// Push the current job status to HTCondor via updateChirp(). The status covers:
//   - total CPU time and the last-update timestamp;
//   - the event/lumi/run/file counters;
//   - an exponentially averaged event rate;
//   - per-site Xrootd statistics;
//   - aggregated StorageAccount read/write statistics.
//
// sinceLastUpdate: seconds since the previous push. When it is 0 (firstUpdate() calls
// updateImpl(0)), the "Elapsed" and "EventRate" attributes are not written.
//
// NOTE(review): this listing omits source lines 269, 292 and 297. Judging from their use
// below, 269 and 297 presumably declare the `timingsvc` and `xrdsvc` service handles, and
// 292 presumably stores the current m_events into m_lastEventCount. Confirm against the
// full source file.
265 void CondorStatusService::updateImpl(time_t sinceLastUpdate) {
266  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// m_beginJob is initialised to 0, so jobTime is only a job duration once m_beginJob has been assigned.
267  time_t jobTime = now - m_beginJob;
268 
// Total CPU time is published only when the timing service is available.
270  if (timingsvc.isAvailable()) {
271  updateChirp("TotalCPU", timingsvc->getTotalCPU());
272  }
273 
274  updateChirp("LastUpdate", now);
275 
// Publish the event count while it is still zero, or whenever it exceeds m_lastEventCount.
276  if (!m_events || (m_events > m_lastEventCount)) {
277  updateChirp("Events", m_events);
278  }
279 
280  updateChirp("Lumis", m_lumis);
281 
282  updateChirp("Runs", m_runs);
283 
284  updateChirp("Files", m_files);
285 
// EMA weight of the newest sample: 1 - exp(-dt / window). The window is m_emaInterval,
// capped at the job's running time and floored at 1 second.
286  float ema_coeff = 1 - std::exp(-static_cast<float>(sinceLastUpdate) /
287  std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
288  if (sinceLastUpdate > 0) {
289  updateChirp("Elapsed", jobTime);
// Blend the rate observed since m_lastEventCount into the running average m_rate.
290  m_rate = ema_coeff * static_cast<float>(m_events - m_lastEventCount) / static_cast<float>(sinceLastUpdate) +
291  (1.0 - ema_coeff) * m_rate;
293  updateChirp("EventRate", m_rate);
294  }
295 
296  // If Xrootd was used, pull the statistics from there.
298  if (xrdsvc.isAvailable()) {
299  for (auto const &iter : xrdsvc->condorUpdate()) {
// Strip everything except alphanumerics and '_' before embedding the site name in the attribute key.
// NOTE(review): isalnum() on a plain char that may be negative is undefined behaviour;
// a cast to unsigned char would be safer.
300  std::string site = iter.first;
301  site.erase(std::remove_if(site.begin(), site.end(), [](char x) { return !isalnum(x) && (x != '_'); }),
302  site.end());
303  auto &iostats = iter.second;
304  updateChirp("IOSite_" + site + "_ReadBytes", iostats.bytesRead);
305  updateChirp("IOSite_" + site + "_ReadTimeMS",
306  std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
307  }
308  }
309 
310  using namespace edm::storage;
311  // Update storage account information
312  auto const &stats = StorageAccount::summary();
// Totals accumulated over every storage class except "tstoragefile" (see the comment in the loop).
313  uint64_t readOps = 0;
314  uint64_t readVOps = 0;
315  uint64_t readSegs = 0;
316  uint64_t readBytes = 0;
317  uint64_t readTimeTotal = 0;
318  uint64_t writeBytes = 0;
319  uint64_t writeTimeTotal = 0;
320  const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
321  for (const auto &storage : stats) {
322  // StorageAccount records statistics for both the TFile layer and the
323  // StorageFactory layer. However, the StorageFactory statistics tend to
324  // be more accurate as various backends may alter the incoming read requests
325  // (such as when lazy-download is used).
326  if (storage.first == token.value()) {
327  continue;
328  }
329  for (const auto &counter : storage.second) {
330  if (counter.first == static_cast<int>(StorageAccount::Operation::read)) {
331  readOps += counter.second.successes;
// NOTE(review): this adds 1 per (storage class, read) counter entry, not one per read.
// If every successful scalar read should count as one segment, this likely ought to be
// `readSegs += counter.second.successes`. Confirm intent.
332  readSegs++;
333  readBytes += counter.second.amount;
334  readTimeTotal += counter.second.timeTotal;
335  } else if (counter.first == static_cast<int>(StorageAccount::Operation::readv)) {
336  readVOps += counter.second.successes;
337  readSegs += counter.second.vector_count;
338  readBytes += counter.second.amount;
339  readTimeTotal += counter.second.timeTotal;
340  } else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) ||
341  (counter.first == static_cast<int>(StorageAccount::Operation::writev))) {
342  writeBytes += counter.second.amount;
343  writeTimeTotal += counter.second.timeTotal;
344  }
345  }
346  }
347  updateChirp("ReadOps", readOps);
348  updateChirp("ReadVOps", readVOps);
349  updateChirp("ReadSegments", readSegs);
350  updateChirp("ReadBytes", readBytes);
// Dividing by 10^6 yields milliseconds if timeTotal is in nanoseconds. The "Msecs" attribute
// names suggest this is the case, but confirm the unit of timeTotal.
351  updateChirp("ReadTimeMsecs", readTimeTotal / (1000 * 1000));
352  updateChirp("WriteBytes", writeBytes);
353  updateChirp("WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
354 }
355 
356 template <typename T>
357 bool CondorStatusService::updateChirp(const std::string &key_suffix, const T &value) {
358  std::stringstream ss;
359  ss << value;
360  return updateChirpImpl(key_suffix, ss.str());
361 }
362 
364  std::string value_copy = value;
365  // Remove double-quotes or the \ character (as it has special escaping semantics in ClassAds).
366  // Make sure we have ASCII characters.
367  // Otherwise, remainder is allowed (including tabs, newlines, single-quotes).
368  value_copy.erase(
369  remove_if(
370  value_copy.begin(), value_copy.end(), [](const char &c) { return !isascii(c) || (c == '"') || (c == '\\'); }),
371  value_copy.end());
372  return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
373 }
374 
376  std::stringstream ss;
377  ss << "ChirpCMSSW" << m_tag << key_suffix;
378  std::string key = ss.str();
379  if (m_debug) {
380  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
381  }
382  int pid = 0;
383  posix_spawn_file_actions_t file_actions;
384  int devnull_fd = open("/dev/null", O_RDWR);
385  if (devnull_fd == -1) {
386  return false;
387  }
388  posix_spawn_file_actions_init(&file_actions);
389  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
390  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
391  const std::string chirp_name = "condor_chirp";
392  const std::string set_job_attr = "set_job_attr_delayed";
393  std::vector<const char *> argv;
394  argv.push_back(chirp_name.c_str());
395  argv.push_back(set_job_attr.c_str());
396  argv.push_back(key.c_str());
397  argv.push_back(value.c_str());
398  argv.push_back(nullptr);
399  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, nullptr, const_cast<char *const *>(&argv[0]), environ);
400  close(devnull_fd);
401  posix_spawn_file_actions_destroy(&file_actions);
402  if (status) {
403  return false;
404  }
405  while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {
406  }
407  return status == 0;
408 }
409 
412  desc.setComment("Service to update HTCondor with the current CMSSW status.");
413  desc.addOptionalUntracked<unsigned int>("updateIntervalSeconds", m_defaultUpdateInterval)
414  ->setComment("Interval, in seconds, for HTCondor updates");
415  desc.addOptionalUntracked<bool>("debug", false)->setComment("Enable debugging of this service");
416  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
417  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
418  desc.addOptionalUntracked<std::string>("tag")->setComment(
419  "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
420  descriptions.add("CondorStatusService", desc);
421 }
422 
bool updateChirpImpl(std::string const &key, std::string const &value)
constexpr int32_t ceil(float num)
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
const edm::EventSetup & c
#define CMS_SA_ALLOW
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:100
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
virtual bool cpuInfo(std::string &models, double &avgSpeed)=0
CPU information - the models present and average speed.
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
list status
Definition: mps_update.py:107
void lumiPost(GlobalContext const &)
bool exists(std::string const &parameterName) const
checks if a parameter exists
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Exp< T >::type exp(const T &t)
Definition: Exp.h:22
std::atomic< std::uint_least64_t > m_lumis
ParameterSetID const & parameterSetID() const
const uint16_t range(const Frame &aFrame)
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
CondorStatusService & operator=(const CondorStatusService &)=delete
static constexpr float m_defaultEmaInterval
std::atomic< std::uint_least64_t > m_runs
bool isAvailable() const
Definition: Service.h:40
tuple key
prepare the HTCondor submission files and eventually submit them
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
T min(T a, T b)
Definition: MathUtil.h:58
virtual std::vector< std::pair< std::string, CondorIOStats > > condorUpdate()=0
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:13
ParameterSet const & getParameterSet(std::string const &) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
static constexpr unsigned int m_defaultUpdateInterval
virtual double getTotalCPU() const =0
bool updateChirp(const std::string &key_suffix, const T &value)
bool isValid() const
Definition: Hash.h:141
static std::atomic< unsigned int > counter
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:98
tuple cout
Definition: gather_cfg.py:144
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
tuple fileNames
Definition: LaserDQM_cfg.py:34
long double T
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)