CMS 3D CMS Logo

CondorStatusUpdater.cc
Go to the documentation of this file.
1 
16 
17 #include <fcntl.h>
18 #include <unistd.h>
19 #include <sys/wait.h>
20 #include <spawn.h>
21 #include <iostream>
22 #include <fstream>
23 #include <sstream>
24 #include <cmath>
25 #include <chrono>
26 #include <sstream>
27 #include <atomic>
28 #include <string>
29 #include <set>
30 
// Declaration of the CondorStatusService members inside edm::service.
// NOTE(review): this is a numbered listing extract; listing lines 35, 40-43,
// 70-71 and 80-81 are absent (the numbering skips them), so the class header,
// the constructor declarations and some data members are not visible here.
31 namespace edm {
32 
33  namespace service {
34 
36  {
37 
38  public:
39 
44 
 // Registers the parameters this service accepts (see the definition below).
45  static void fillDescriptions(ConfigurationDescriptions &descriptions);
46 
47  private:
48 
 // Publishing helpers: each one ends up in updateChirpImpl(), which runs
 // "condor_chirp set_job_attr_delayed <key> <value>".
49  bool isChirpSupported();
50  template<typename T> bool updateChirp(const std::string &key_suffix, const T &value);
51  bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
52  bool updateChirpImpl(std::string const &key, std::string const &value);
 // update() is the rate-limited entry point; firstUpdate()/lastUpdate() and
 // updateImpl() publish unconditionally.
53  inline void update();
54  void firstUpdate();
55  void lastUpdate();
56  void updateImpl(time_t secsSinceLastUpdate);
57 
 // Callbacks. The *Post counters below each increment one counter and call update().
58  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
59  void eventPost(StreamContext const& iContext);
60  void lumiPost(GlobalContext const&);
61  void runPost(GlobalContext const&);
62  void beginPre(PathsAndConsumesOfModulesBase const&, ProcessContext const& processContext);
63  void beginPost();
64  void endPost();
65  void filePost(std::string const &, bool);
66 
 // When true, every key/value pair is echoed to std::cout and chirp is treated as supported.
67  bool m_debug;
 // Claimed with test_and_set() in update() so that only one caller publishes at a time.
68  std::atomic_flag m_shouldUpdate;
 // Wall-clock time recorded at the end of beginPost(); base for the "Elapsed" value.
69  time_t m_beginJob = 0;
 // Event-rate estimate, maintained by updateImpl() as an exponential moving average.
72  float m_rate = 0;
73  static constexpr float m_defaultEmaInterval = 15*60; // Time in seconds to average EMA over for event rate.
 // Default offered for the "updateIntervalSeconds" parameter in fillDescriptions().
74  static constexpr unsigned int m_defaultUpdateInterval = 3*60;
 // Time of the last rate-limited publish, written by update().
75  std::atomic<time_t> m_lastUpdate;
 // Running totals incremented by eventPost/lumiPost/runPost/filePost.
76  std::atomic<std::uint_least64_t> m_events;
77  std::atomic<std::uint_least64_t> m_lumis;
78  std::atomic<std::uint_least64_t> m_runs;
79  std::atomic<std::uint_least64_t> m_files;
82 
 // Event count compared against m_events in updateImpl() (republish decision and rate delta).
83  std::uint_least64_t m_lastEventCount = 0;
84  };
85 
86  }
87 
88 }
89 
90 using namespace edm::service;
91 
94 
// Constructor body: zero-initialises the counters, decides whether chirp can
// be used, publishes the initial attribute values and reads the optional
// configuration parameters.
// NOTE(review): the declarator (listing line 95) and listing lines 113-119 are
// absent from this extract, so whatever happens between firstUpdate() and the
// parameter reads is not visible here.
96  :
97  m_debug(false),
98  m_lastUpdate(0),
99  m_events(0),
100  m_lumis(0),
101  m_runs(0),
102  m_files(0)
103 {
104  m_shouldUpdate.clear();
 // NOTE(review): debug mode is switched on by the mere presence of the
 // "debug" parameter; its boolean value is never read, so debug=false
 // would still enable it - confirm this is intended.
105  if (pset.exists("debug"))
106  {
107  m_debug = true;
108  }
 // Without chirp support nothing below runs, including the parameter reads.
109  if (!isChirpSupported()) {return;}
110 
 // NOTE(review): firstUpdate() (and the probe inside isChirpSupported())
 // publish before m_tag is read further down, and updateChirpImpl() builds
 // its key from m_tag - so these first attributes are written without the
 // configured tag. Confirm whether the "tag" read should move up.
111  firstUpdate();
112 
120 
121  if (pset.exists("updateIntervalSeconds"))
122  {
123  m_updateInterval = pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds");
124  }
125  if (pset.exists("EMAInterval"))
126  {
127  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
128  }
129  if (pset.exists("tag"))
130  {
131  m_tag = pset.getUntrackedParameter<std::string>("tag");
132  }
133 }
134 
135 
136 void
138 {
139  m_events++;
140  update();
141 }
142 
143 
144 void
146 {
147  m_lumis++;
148  update();
149 }
150 
151 
152 void
154 {
155  m_runs++;
156  update();
157 }
158 
159 
160 void
161 CondorStatusService::filePost(std::string const & /*lfn*/, bool /*usedFallback*/)
162 {
163  m_files++;
164  update();
165 }
166 
167 
// Remembers the process parameter-set ID taken from the ProcessContext so
// that beginPost() can look the parameter set up later.
// NOTE(review): the declarator (listing line 169) and the statement guarding
// the inner block (listing line 171) are absent from this extract, so the
// condition under which the ID is stored is not visible here.
168 void
170 {
172  {
173  m_processParameterSetID = processContext.parameterSetID();
174  }
175 }
176 
177 
178 void
180 {
181  ParameterSet const& processParameterSet = edm::getParameterSet(m_processParameterSetID);
182  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
183  // PSet info from edm::ScheduleItems
184  int maxEvents = processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
185  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1);
186 
187  // lumisToProcess from EventSkipperByID (PoolSource and similar)
188  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange> >("lumisToProcess", std::vector<LuminosityBlockRange>());
189  edm::sortAndRemoveOverlaps(toProcess);
190  uint64_t lumiCount = 0;
191  for (auto const &range : toProcess)
192  {
193  if (range.startRun() != range.endRun()) {break;}
194  if (range.endLumi() >= edm::LuminosityBlockID::maxLuminosityBlockNumber()) {break;}
195  lumiCount += (range.endLumi()-range.startLumi());
196  }
197  // Handle sources deriving from ProducerSourceBase
198  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
199  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0))
200  {
201  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
202  }
203 
204  std::vector<std::string> fileNames = pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
205  std::stringstream ss_max_files; ss_max_files << fileNames.size();
206  updateChirp("MaxFiles", ss_max_files.str());
207 
208  if (lumiCount > 0)
209  {
210  if (maxLumis < 0) {maxLumis = lumiCount;}
211  if (maxLumis > static_cast<int>(lumiCount))
212  {
213  maxLumis = lumiCount;
214  }
215  }
216  if (maxEvents > 0)
217  {
218  std::stringstream ss_max_events; ss_max_events << maxEvents;
219  updateChirp("MaxEvents", ss_max_events.str());
220  }
221  if (maxLumis > 0)
222  {
223  std::stringstream ss_max_lumis; ss_max_lumis << maxLumis;
224  updateChirp("MaxLumis", ss_max_lumis.str());
225  }
226 
227  m_beginJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
228  update();
229 }
230 
231 
232 void
234 {
235  lastUpdate();
236 }
237 
238 
239 bool
241 {
242  if (m_debug) {return true;}
243 
244  return getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
245 }
246 
247 
// Publishes the initial attribute values: the regular statistics via
// updateImpl(0), the three Max* limits as -1, "Done" as false, and - when the
// CPU service can supply them - the CPU models (quoted) and average speed.
// NOTE(review): the declarator (listing line 249) and listing lines 260-261
// are absent from this extract; `cpusvc` and `models` are declared there.
248 void
250 {
251  // Note we always update all our statistics to 0 / false / -1
252  // This allows us to overwrite the activities of a previous cmsRun process
253  // within this HTCondor job.
254  updateImpl(0);
255  updateChirp("MaxFiles", "-1");
256  updateChirp("MaxEvents", "-1");
257  updateChirp("MaxLumis", "-1");
258  updateChirp("Done", "false");
259 
262  double avgSpeed;
 // CPU details are published only if the service exists and cpuInfo() succeeds.
263  if (cpusvc.isAvailable() && cpusvc->cpuInfo(models, avgSpeed)) {
264  updateChirpQuoted("CPUModels", models);
265  updateChirp("CPUSpeed", avgSpeed);
266  }
267 }
268 
269 
// Publishes a final set of statistics (time since m_lastUpdate is passed to
// updateImpl) and then marks the job as finished with "Done" = true.
// NOTE(review): the declarator (listing line 271) and listing line 276 are
// absent from this extract; `cpusvc` is declared on the missing line.
270 void
272 {
273  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
274  updateImpl(now - m_lastUpdate);
275  updateChirp("Done", "true");
 // NOTE(review): this message goes to std::cout regardless of m_debug - confirm intended.
277  if (!cpusvc.isAvailable()) {
278  std::cout << "At post, CPU service is NOT available.\n";
279  }
280 }
281 
282 
283 void
285 {
286  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
287  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval)
288  {
289  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire))
290  {
291  try
292  {
293  time_t sinceLastUpdate = now - m_lastUpdate;
294  m_lastUpdate = now;
295  updateImpl(sinceLastUpdate);
296  m_shouldUpdate.clear(std::memory_order_release);
297  }
298  catch (...)
299  {
300  m_shouldUpdate.clear(std::memory_order_release);
301  throw;
302  }
303  }
304  }
305 }
306 
307 
// Publishes the current statistics: CPU time (if the timing service is
// available), LastUpdate, the event/lumi/run/file counters, elapsed time and
// event rate, per-site Xrootd I/O figures, and aggregated storage read/write
// totals.
//
// sinceLastUpdate is the number of seconds since the previous publish; when it
// is not positive, "Elapsed" and "EventRate" are skipped.
// NOTE(review): listing lines 314, 337 and 342 are absent from this extract;
// `timingsvc` and `xrdsvc` are used below but declared on lines not shown, and
// whatever statement sat on line 337 (between the rate computation and its
// publication) is not visible.
308 void
309 CondorStatusService::updateImpl(time_t sinceLastUpdate)
310 {
311  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
312  time_t jobTime = now-m_beginJob;
313 
315  if (timingsvc.isAvailable()) {
316  updateChirp("TotalCPU", timingsvc->getTotalCPU());
317  }
318 
319  updateChirp("LastUpdate", now);
320 
 // "Events" is written when the count is still zero or has grown past m_lastEventCount.
321  if (!m_events || (m_events > m_lastEventCount))
322  {
323  updateChirp("Events", m_events);
324  }
325 
326  updateChirp("Lumis", m_lumis);
327 
328  updateChirp("Runs", m_runs);
329 
330  updateChirp("Files", m_files);
331 
 // Smoothing weight 1 - exp(-dt / T), where T is the smaller of m_emaInterval
 // and the job's age, but never less than one second.
332  float ema_coeff = 1 - std::exp(-static_cast<float>(sinceLastUpdate)/std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
333  if (sinceLastUpdate > 0)
334  {
335  updateChirp("Elapsed", jobTime);
 // New rate = weight * (events since last count / dt) + (1 - weight) * old rate.
336  m_rate = ema_coeff*static_cast<float>(m_events-m_lastEventCount)/static_cast<float>(sinceLastUpdate) + (1.0-ema_coeff)*m_rate;
338  updateChirp("EventRate", m_rate);
339  }
340 
341  // If Xrootd was used, pull the statistics from there.
343  if (xrdsvc.isAvailable()) {
344  for (auto const &iter : xrdsvc->condorUpdate()) {
 // Keep only alphanumerics and '_' so the site name is usable inside an attribute key.
 // NOTE(review): isalnum() is given a plain char; for negative char values
 // that is undefined behaviour in C - consider casting to unsigned char.
345  std::string site = iter.first;
346  site.erase(std::remove_if(site.begin(), site.end(),
347  [](char x){return !isalnum(x) && (x != '_');}),
348  site.end());
349  auto & iostats = iter.second;
350  updateChirp("IOSite_" + site + "_ReadBytes", iostats.bytesRead);
351  updateChirp("IOSite_" + site + "_ReadTimeMS",
352  std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
353  }
354  }
355 
356  // Update storage account information
357  auto const& stats = StorageAccount::summary();
358  uint64_t readOps = 0;
359  uint64_t readVOps = 0;
360  uint64_t readSegs = 0;
361  uint64_t readBytes = 0;
362  uint64_t readTimeTotal = 0;
363  uint64_t writeBytes = 0;
364  uint64_t writeTimeTotal = 0;
365  const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
366  for (const auto & storage : stats)
367  {
368  // StorageAccount records statistics for both the TFile layer and the
369  // StorageFactory layer. However, the StorageFactory statistics tend to
370  // be more accurate as various backends may alter the incoming read requests
371  // (such as when lazy-download is used).
372  if (storage.first == token.value()) {continue;}
373  for (const auto & counter : storage.second)
374  {
375  if (counter.first == static_cast<int>(StorageAccount::Operation::read))
376  {
377  readOps += counter.second.successes;
 // NOTE(review): segments grow by one per counter entry here, not by the
 // number of successful reads, unlike the readv branch which adds
 // vector_count - confirm this asymmetry is intended.
378  readSegs++;
379  readBytes += counter.second.amount;
380  readTimeTotal += counter.second.timeTotal;
381  }
382  else if (counter.first == static_cast<int>(StorageAccount::Operation::readv))
383  {
384  readVOps += counter.second.successes;
385  readSegs += counter.second.vector_count;
386  readBytes += counter.second.amount;
387  readTimeTotal += counter.second.timeTotal;
388  }
389  else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) || (counter.first == static_cast<int>(StorageAccount::Operation::writev)))
390  {
391  writeBytes += counter.second.amount;
392  writeTimeTotal += counter.second.timeTotal;
393  }
394  }
395  }
396  updateChirp("ReadOps", readOps);
397  updateChirp("ReadVOps", readVOps);
398  updateChirp("ReadSegments", readSegs);
399  updateChirp("ReadBytes", readBytes);
 // The time totals are divided by 10^6 before being published under *Msecs keys
 // (presumably timeTotal is in nanoseconds - TODO confirm against StorageAccount).
400  updateChirp("ReadTimeMsecs", readTimeTotal/(1000*1000));
401  updateChirp("WriteBytes", writeBytes);
402  updateChirp("WriteTimeMsecs", writeTimeTotal/(1000*1000));
403 }
404 
405 
406 template<typename T> bool
408 {
409  std::stringstream ss; ss << value;
410  return updateChirpImpl(key_suffix, ss.str());
411 }
412 
413 bool
415 {
416  std::string value_copy = value;
417  // Remove double-quotes or the \ character (as it has special escaping semantics in ClassAds).
418  // Make sure we have ASCII characters.
419  // Otherwise, remainder is allowed (including tabs, newlines, single-quotes).
420  value_copy.erase(remove_if(value_copy.begin(), value_copy.end(), [](const char &c) {return !isascii(c) || (c == '"') || (c == '\\');}), value_copy.end());
421  return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
422 }
423 
424 
425 bool
427 {
428  std::stringstream ss; ss << "ChirpCMSSW" << m_tag << key_suffix;
429  std::string key = ss.str();
430  if (m_debug)
431  {
432  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
433  }
434  int pid = 0;
435  posix_spawn_file_actions_t file_actions;
436  int devnull_fd = open("/dev/null", O_RDWR);
437  if (devnull_fd == -1) {return false;}
438  posix_spawn_file_actions_init(&file_actions);
439  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
440  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
441  const std::string chirp_name = "condor_chirp";
442  const std::string set_job_attr = "set_job_attr_delayed";
443  std::vector<const char *> argv;
444  argv.push_back(chirp_name.c_str());
445  argv.push_back(set_job_attr.c_str());
446  argv.push_back(key.c_str());
447  argv.push_back(value.c_str());
448  argv.push_back(nullptr);
449  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, nullptr, const_cast<char* const*>(&argv[0]), environ);
450  close(devnull_fd);
451  posix_spawn_file_actions_destroy(&file_actions);
452  if (status)
453  {
454  return false;
455  }
456  while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {}
457  return status == 0;
458 }
459 
460 
// Describes the service's configuration: four optional untracked parameters
// (updateIntervalSeconds, debug, EMAInterval, tag), registered under the label
// "CondorStatusService".
// NOTE(review): the declarator (listing line 462) and listing line 464 are
// absent from this extract; `desc` is declared on the missing line.
460 void
463 {
465  desc.setComment("Service to update HTCondor with the current CMSSW status.");
466  desc.addOptionalUntracked<unsigned int>("updateIntervalSeconds", m_defaultUpdateInterval)
467  ->setComment("Interval, in seconds, for HTCondor updates");
468  desc.addOptionalUntracked<bool>("debug", false)
469  ->setComment("Enable debugging of this service");
470  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
471  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
 // "tag" is registered without a default value.
472  desc.addOptionalUntracked<std::string>("tag")
473  ->setComment("Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
474  descriptions.add("CondorStatusService", desc);
475 }
476 
477 
480 
bool updateChirpImpl(std::string const &key, std::string const &value)
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:117
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
virtual bool cpuInfo(std::string &models, double &avgSpeed)=0
CPU information - the models present and average speed.
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
static const StorageStats & summary(void)
void lumiPost(GlobalContext const &)
bool exists(std::string const &parameterName) const
checks if a parameter exists
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: models.py:1
std::atomic< std::uint_least64_t > m_lumis
std::vector< std::pair< std::string, CondorIOStats > > condorUpdate()
#define constexpr
ParameterSetID const & parameterSetID() const
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
CondorStatusService & operator=(const CondorStatusService &)=delete
std::atomic< std::uint_least64_t > m_runs
bool isAvailable() const
Definition: Service.h:46
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
Definition: value.py:1
T min(T a, T b)
Definition: MathUtil.h:58
virtual double getTotalCPU() const =0
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:15
ParameterSet const & getParameterSet(std::string const &) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
bool updateChirp(const std::string &key_suffix, const T &value)
HLT enums.
bool isValid() const
Definition: Hash.h:154
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:102
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
long double T
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)