CMS 3D CMS Logo

CondorStatusUpdater.cc
Go to the documentation of this file.
1 
15 
16 #include <fcntl.h>
17 #include <unistd.h>
18 #include <sys/wait.h>
19 #include <spawn.h>
20 #include <iostream>
21 #include <fstream>
22 #include <sstream>
23 #include <cmath>
24 #include <chrono>
25 #include <sstream>
26 #include <atomic>
27 #include <string>
28 #include <set>
29 
30 namespace edm {
31 
32  namespace service {
33 
35  {
36 
37  public:
38 
43 
44  static void fillDescriptions(ConfigurationDescriptions &descriptions);
45 
46  private:
47 
48  bool isChirpSupported();
49  template<typename T> bool updateChirp(const std::string &key_suffix, const T &value);
50  bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
51  bool updateChirpImpl(std::string const &key, std::string const &value);
52  inline void update();
53  void firstUpdate();
54  void lastUpdate();
55  void updateImpl(time_t secsSinceLastUpdate);
56 
57  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
58  void eventPost(StreamContext const& iContext);
59  void lumiPost(GlobalContext const&);
60  void runPost(GlobalContext const&);
61  void beginPre(PathsAndConsumesOfModulesBase const&, ProcessContext const& processContext);
62  void beginPost();
63  void endPost();
64  void filePost(std::string const &, bool);
65 
66  bool m_debug;
67  std::atomic_flag m_shouldUpdate;
68  time_t m_beginJob = 0;
71  float m_rate = 0;
72  static constexpr float m_defaultEmaInterval = 15*60; // Time in seconds to average EMA over for event rate.
73  static constexpr unsigned int m_defaultUpdateInterval = 3*60;
74  std::atomic<time_t> m_lastUpdate;
75  std::atomic<std::uint_least64_t> m_events;
76  std::atomic<std::uint_least64_t> m_lumis;
77  std::atomic<std::uint_least64_t> m_runs;
78  std::atomic<std::uint_least64_t> m_files;
81 
82  std::uint_least64_t m_lastEventCount = 0;
83  };
84 
85  }
86 
87 }
88 
89 using namespace edm::service;
90 
93 
95  :
96  m_debug(false),
97  m_lastUpdate(0),
98  m_events(0),
99  m_lumis(0),
100  m_runs(0),
101  m_files(0)
102 {
103  m_shouldUpdate.clear();
104  if (pset.exists("debug"))
105  {
106  m_debug = true;
107  }
108  if (!isChirpSupported()) {return;}
109 
110  firstUpdate();
111 
119 
120  if (pset.exists("updateIntervalSeconds"))
121  {
122  m_updateInterval = pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds");
123  }
124  if (pset.exists("EMAInterval"))
125  {
126  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
127  }
128  if (pset.exists("tag"))
129  {
130  m_tag = pset.getUntrackedParameter<std::string>("tag");
131  }
132 }
133 
134 
135 void
137 {
138  m_events++;
139  update();
140 }
141 
142 
143 void
145 {
146  m_lumis++;
147  update();
148 }
149 
150 
151 void
153 {
154  m_runs++;
155  update();
156 }
157 
158 
159 void
160 CondorStatusService::filePost(std::string const & /*lfn*/, bool /*usedFallback*/)
161 {
162  m_files++;
163  update();
164 }
165 
166 
167 void
169 {
171  {
172  m_processParameterSetID = processContext.parameterSetID();
173  }
174 }
175 
176 
177 void
179 {
180  ParameterSet const& processParameterSet = edm::getParameterSet(m_processParameterSetID);
181  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
182  // PSet info from edm::ScheduleItems
183  int maxEvents = processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
184  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1);
185 
186  // lumisToProcess from EventSkipperByID (PoolSource and similar)
187  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange> >("lumisToProcess", std::vector<LuminosityBlockRange>());
188  edm::sortAndRemoveOverlaps(toProcess);
189  uint64_t lumiCount = 0;
190  for (auto const &range : toProcess)
191  {
192  if (range.startRun() != range.endRun()) {break;}
193  if (range.endLumi() >= edm::LuminosityBlockID::maxLuminosityBlockNumber()) {break;}
194  lumiCount += (range.endLumi()-range.startLumi());
195  }
196  // Handle sources deriving from ProducerSourceBase
197  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
198  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0))
199  {
200  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
201  }
202 
203  std::vector<std::string> fileNames = pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
204  std::stringstream ss_max_files; ss_max_files << fileNames.size();
205  updateChirp("MaxFiles", ss_max_files.str());
206 
207  if (lumiCount > 0)
208  {
209  if (maxLumis < 0) {maxLumis = lumiCount;}
210  if (maxLumis > static_cast<int>(lumiCount))
211  {
212  maxLumis = lumiCount;
213  }
214  }
215  if (maxEvents > 0)
216  {
217  std::stringstream ss_max_events; ss_max_events << maxEvents;
218  updateChirp("MaxEvents", ss_max_events.str());
219  }
220  if (maxLumis > 0)
221  {
222  std::stringstream ss_max_lumis; ss_max_lumis << maxLumis;
223  updateChirp("MaxLumis", ss_max_lumis.str());
224  }
225 
226  m_beginJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
227  update();
228 }
229 
230 
231 void
233 {
234  lastUpdate();
235 }
236 
237 
238 bool
240 {
241  if (m_debug) {return true;}
242 
243  return getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
244 }
245 
246 
247 void
249 {
250  // Note we always update all our statistics to 0 / false / -1
251  // This allows us to overwrite the activities of a previous cmsRun process
252  // within this HTCondor job.
253  updateImpl(0);
254  updateChirp("MaxFiles", "-1");
255  updateChirp("MaxEvents", "-1");
256  updateChirp("MaxLumis", "-1");
257  updateChirp("Done", "false");
258 
261  double avgSpeed;
262  if (cpusvc.isAvailable() && cpusvc->cpuInfo(models, avgSpeed)) {
263  updateChirpQuoted("CPUModels", models);
264  updateChirp("CPUSpeed", avgSpeed);
265  }
266 }
267 
268 
269 void
271 {
272  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
273  updateImpl(now - m_lastUpdate);
274  updateChirp("Done", "true");
276  if (!cpusvc.isAvailable()) {
277  std::cout << "At post, CPU service is NOT available.\n";
278  }
279 }
280 
281 
282 void
284 {
285  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
286  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval)
287  {
288  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire))
289  {
290  try
291  {
292  time_t sinceLastUpdate = now - m_lastUpdate;
293  m_lastUpdate = now;
294  updateImpl(sinceLastUpdate);
295  m_shouldUpdate.clear(std::memory_order_release);
296  }
297  catch (...)
298  {
299  m_shouldUpdate.clear(std::memory_order_release);
300  throw;
301  }
302  }
303  }
304 }
305 
306 
307 void
308 CondorStatusService::updateImpl(time_t sinceLastUpdate)
309 {
310  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
311  time_t jobTime = now-m_beginJob;
312 
314  if (timingsvc.isAvailable()) {
315  updateChirp("TotalCPU", timingsvc->getTotalCPU());
316  }
317 
318  updateChirp("LastUpdate", now);
319 
320  if (!m_events || (m_events > m_lastEventCount))
321  {
322  updateChirp("Events", m_events);
323  }
324 
325  updateChirp("Lumis", m_lumis);
326 
327  updateChirp("Runs", m_runs);
328 
329  updateChirp("Files", m_files);
330 
331  float ema_coeff = 1 - std::exp(-static_cast<float>(sinceLastUpdate)/std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
332  if (sinceLastUpdate > 0)
333  {
334  updateChirp("Elapsed", jobTime);
335  m_rate = ema_coeff*static_cast<float>(m_events-m_lastEventCount)/static_cast<float>(sinceLastUpdate) + (1.0-ema_coeff)*m_rate;
337  updateChirp("EventRate", m_rate);
338  }
339 
340  // Update storage account information
341  auto const& stats = StorageAccount::summary();
342  uint64_t readOps = 0;
343  uint64_t readVOps = 0;
344  uint64_t readSegs = 0;
345  uint64_t readBytes = 0;
346  uint64_t readTimeTotal = 0;
347  uint64_t writeBytes = 0;
348  uint64_t writeTimeTotal = 0;
349  const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
350  for (const auto & storage : stats)
351  {
352  // StorageAccount records statistics for both the TFile layer and the
353  // StorageFactory layer. However, the StorageFactory statistics tend to
354  // be more accurate as various backends may alter the incoming read requests
355  // (such as when lazy-download is used).
356  if (storage.first == token.value()) {continue;}
357  for (const auto & counter : storage.second)
358  {
359  if (counter.first == static_cast<int>(StorageAccount::Operation::read))
360  {
361  readOps += counter.second.successes;
362  readSegs++;
363  readBytes += counter.second.amount;
364  readTimeTotal += counter.second.timeTotal;
365  }
366  else if (counter.first == static_cast<int>(StorageAccount::Operation::readv))
367  {
368  readVOps += counter.second.successes;
369  readSegs += counter.second.vector_count;
370  readBytes += counter.second.amount;
371  readTimeTotal += counter.second.timeTotal;
372  }
373  else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) || (counter.first == static_cast<int>(StorageAccount::Operation::writev)))
374  {
375  writeBytes += counter.second.amount;
376  writeTimeTotal += counter.second.timeTotal;
377  }
378  }
379  }
380  updateChirp("ReadOps", readOps);
381  updateChirp("ReadVOps", readVOps);
382  updateChirp("ReadSegments", readSegs);
383  updateChirp("ReadBytes", readBytes);
384  updateChirp("ReadTimeMsecs", readTimeTotal/(1000*1000));
385  updateChirp("WriteBytes", writeBytes);
386  updateChirp("WriteTimeMsecs", writeTimeTotal/(1000*1000));
387 }
388 
389 
390 template<typename T> bool
392 {
393  std::stringstream ss; ss << value;
394  return updateChirpImpl(key_suffix, ss.str());
395 }
396 
397 bool
399 {
400  std::string value_copy = value;
401  // Remove double-quotes or the \ character (as it has special escaping semantics in ClassAds).
402  // Make sure we have ASCII characters.
403  // Otherwise, remainder is allowed (including tabs, newlines, single-quotes).
404  value_copy.erase(remove_if(value_copy.begin(), value_copy.end(), [](const char &c) {return !isascii(c) || (c == '"') || (c == '\\');}), value_copy.end());
405  return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
406 }
407 
408 
409 bool
411 {
412  std::stringstream ss; ss << "ChirpCMSSW" << m_tag << key_suffix;
413  std::string key = ss.str();
414  if (m_debug)
415  {
416  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
417  }
418  int pid = 0;
419  posix_spawn_file_actions_t file_actions;
420  int devnull_fd = open("/dev/null", O_RDWR);
421  if (devnull_fd == -1) {return false;}
422  posix_spawn_file_actions_init(&file_actions);
423  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
424  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
425  const std::string chirp_name = "condor_chirp";
426  const std::string set_job_attr = "set_job_attr_delayed";
427  std::vector<const char *> argv;
428  argv.push_back(chirp_name.c_str());
429  argv.push_back(set_job_attr.c_str());
430  argv.push_back(key.c_str());
431  argv.push_back(value.c_str());
432  argv.push_back(NULL);
433  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, NULL, const_cast<char* const*>(&argv[0]), environ);
434  close(devnull_fd);
435  posix_spawn_file_actions_destroy(&file_actions);
436  if (status)
437  {
438  return false;
439  }
440  while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {}
441  return status == 0;
442 }
443 
444 
445 void
447 {
449  desc.setComment("Service to update HTCondor with the current CMSSW status.");
450  desc.addOptionalUntracked<unsigned int>("updateIntervalSeconds", m_defaultUpdateInterval)
451  ->setComment("Interval, in seconds, for HTCondor updates");
452  desc.addOptionalUntracked<bool>("debug", false)
453  ->setComment("Enable debugging of this service");
454  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
455  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
456  desc.addOptionalUntracked<std::string>("tag")
457  ->setComment("Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
458  descriptions.add("CondorStatusService", desc);
459 }
460 
461 
464 
bool updateChirpImpl(std::string const &key, std::string const &value)
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:117
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
virtual bool cpuInfo(std::string &models, double &avgSpeed)=0
CPU information - the models present and average speed.
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
static const StorageStats & summary(void)
void lumiPost(GlobalContext const &)
bool exists(std::string const &parameterName) const
checks if a parameter exists
#define NULL
Definition: scimark2.h:8
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
Definition: models.py:1
std::atomic< std::uint_least64_t > m_lumis
#define constexpr
ParameterSetID const & parameterSetID() const
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
bool updateChirpQuoted(const std::string &key_suffix, const std::string &value)
CondorStatusService & operator=(const CondorStatusService &)=delete
std::atomic< std::uint_least64_t > m_runs
bool isAvailable() const
Definition: Service.h:46
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
Definition: value.py:1
T min(T a, T b)
Definition: MathUtil.h:58
virtual double getTotalCPU() const =0
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:15
ParameterSet const & getParameterSet(std::string const &) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
bool updateChirp(const std::string &key_suffix, const T &value)
HLT enums.
bool isValid() const
Definition: Hash.h:151
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:102
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
long double T
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)