CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
CondorStatusUpdater.cc
Go to the documentation of this file.
1 
12 
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <sys/wait.h>
16 #include <spawn.h>
17 #include <iostream>
18 #include <cmath>
19 #include <chrono>
20 #include <sstream>
21 #include <atomic>
22 #include <string>
23 
// Declaration of the service that reports the current CMSSW status to HTCondor
// by setting job attributes through condor_chirp (see updateChirp).
24 namespace edm {
25 
26  namespace service {
27 
29  {
30 
31  public:
32 
37 
 // Registers the configuration description under the label "CondorStatusService".
38  static void fillDescriptions(ConfigurationDescriptions &descriptions);
39 
40  private:
41 
 // True in debug mode, or when _CONDOR_CHIRP_CONFIG is set and a probe write succeeds.
42  bool isChirpSupported();
 // Formats `value` with stream insertion and forwards to the string overload.
43  template<typename T> bool updateChirp(const std::string &key_suffix, const T &value);
 // Runs `condor_chirp set_job_attr_delayed` for one attribute; returns whether it succeeded.
44  bool updateChirp(std::string const &key, std::string const &value);
 // Rate-limited entry point: calls updateImpl at most once per update interval.
45  inline void update();
 // Publishes reset values (0 / -1 / false) for all attributes.
46  void firstUpdate();
 // Publishes final statistics and sets "Done" to "true".
47  void lastUpdate();
 // Publishes counters, elapsed time, event rate and storage statistics.
48  void updateImpl(time_t secsSinceLastUpdate);
49 
 // Framework callbacks; the counting callbacks bump a counter and call update().
50  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
51  void eventPost(StreamContext const& iContext);
52  void lumiPost(GlobalContext const&);
53  void runPost(GlobalContext const&);
54  void beginPre(PathsAndConsumesOfModulesBase const&, ProcessContext const& processContext);
55  void beginPost();
56  void endPost();
57  void filePost(std::string const &, bool);
58 
 // When set, every chirp command is also echoed to std::cout.
59  bool m_debug;
 // Guard taken with test_and_set in update() so only one caller runs updateImpl at a time.
60  std::atomic_flag m_shouldUpdate;
 // Wall-clock time recorded in beginPost; used to compute the "Elapsed" attribute.
61  time_t m_beginJob = 0;
 // Exponential moving average of the event rate (events per second).
64  float m_rate = 0;
65  static constexpr float m_defaultEmaInterval = 15*60; // Time in seconds to average EMA over for event rate.
 // Default for the "updateIntervalSeconds" parameter (seconds between HTCondor updates).
66  static constexpr unsigned int m_defaultUpdateInterval = 3*60;
 // Time of the most recent periodic update performed by update().
67  std::atomic<time_t> m_lastUpdate;
 // Running totals published as "Events", "Lumis", "Runs" and "Files".
68  std::atomic<std::uint_least64_t> m_events;
69  std::atomic<std::uint_least64_t> m_lumis;
70  std::atomic<std::uint_least64_t> m_runs;
71  std::atomic<std::uint_least64_t> m_files;
74 
 // Event count used as the baseline when computing the rate in updateImpl.
75  std::uint_least64_t m_lastEventCount = 0;
76  };
77 
78  }
79 
80 }
81 
82 using namespace edm::service;
83 
86 
88  :
89  m_debug(false),
90  m_lastUpdate(0),
91  m_events(0),
92  m_lumis(0),
93  m_runs(0),
94  m_files(0)
95 {
 // Start with the update guard released.
96  m_shouldUpdate.clear();
 // NOTE(review): debug mode is switched on whenever the "debug" parameter is
 // present, regardless of its boolean value - confirm this is intended.
97  if (pset.exists("debug"))
98  {
99  m_debug = true;
100  }
 // Without a usable chirp channel the rest of the setup is skipped.
101  if (!isChirpSupported()) {return;}
102 
 // NOTE(review): m_tag is only assigned further down, so the attributes written
 // here (and by the probe inside isChirpSupported) appear to be keyed without
 // the configured tag - verify.
103  firstUpdate();
104 
112 
 // Optional overrides of the update interval, the EMA window and the key tag.
113  if (pset.exists("updateIntervalSeconds"))
114  {
115  m_updateInterval = pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds");
116  }
117  if (pset.exists("EMAInterval"))
118  {
119  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
120  }
121  if (pset.exists("tag"))
122  {
123  m_tag = pset.getUntrackedParameter<std::string>("tag");
124  }
125 }
126 
127 
128 void
130 {
131  m_events++;
132  update();
133 }
134 
135 
136 void
138 {
139  m_lumis++;
140  update();
141 }
142 
143 
144 void
146 {
147  m_runs++;
148  update();
149 }
150 
151 
152 void
153 CondorStatusService::filePost(std::string const & /*lfn*/, bool /*usedFallback*/)
154 {
155  m_files++;
156  update();
157 }
158 
159 
160 void
162 {
 // Remember the ID of the process-level ParameterSet; beginPost resolves it
 // again through edm::getParameterSet(m_processParameterSetID).
164  {
165  m_processParameterSetID = processContext.parameterSetID();
166  }
167 }
168 
169 
170 void
172 {
173  ParameterSet const& processParameterSet = edm::getParameterSet(m_processParameterSetID);
174  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
175  // PSet info from edm::ScheduleItems
176  int maxEvents = processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
177  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1);
178 
179  // lumisToProcess from EventSkipperByID (PoolSource and similar)
180  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange> >("lumisToProcess", std::vector<LuminosityBlockRange>());
181  edm::sortAndRemoveOverlaps(toProcess);
182  uint64_t lumiCount = 0;
183  for (auto const &range : toProcess)
184  {
185  if (range.startRun() != range.endRun()) {break;}
186  if (range.endLumi() >= edm::LuminosityBlockID::maxLuminosityBlockNumber()) {break;}
187  lumiCount += (range.endLumi()-range.startLumi());
188  }
189  // Handle sources deriving from ProducerSourceBase
190  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
191  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0))
192  {
193  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
194  }
195 
196  std::vector<std::string> fileNames = pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
197  std::stringstream ss_max_files; ss_max_files << fileNames.size();
198  updateChirp("MaxFiles", ss_max_files.str());
199 
200  if (lumiCount > 0)
201  {
202  if (maxLumis < 0) {maxLumis = lumiCount;}
203  if (maxLumis > static_cast<int>(lumiCount))
204  {
205  maxLumis = lumiCount;
206  }
207  }
208  if (maxEvents > 0)
209  {
210  std::stringstream ss_max_events; ss_max_events << maxEvents;
211  updateChirp("MaxEvents", ss_max_events.str());
212  }
213  if (maxLumis > 0)
214  {
215  std::stringstream ss_max_lumis; ss_max_lumis << maxLumis;
216  updateChirp("MaxLumis", ss_max_lumis.str());
217  }
218 
219  m_beginJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
220  update();
221 }
222 
223 
224 void
226 {
227  lastUpdate();
228 }
229 
230 
231 bool
233 {
234  if (m_debug) {return true;}
235 
236  return getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
237 }
238 
239 
240 void
242 {
243  // Note we always update all our statistics to 0 / false / -1
244  // This allows us to overwrite the activities of a previous cmsRun process
245  // within this HTCondor job.
246  updateImpl(0);
247  updateChirp("MaxFiles", "-1");
248  updateChirp("MaxEvents", "-1");
249  updateChirp("MaxLumis", "-1");
250  updateChirp("Done", "false");
251 }
252 
253 
254 void
256 {
257  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
258  updateImpl(now - m_lastUpdate);
259  updateChirp("Done", "true");
260 }
261 
262 
263 void
265 {
266  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
267  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval)
268  {
269  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire))
270  {
271  try
272  {
273  time_t sinceLastUpdate = now - m_lastUpdate;
274  m_lastUpdate = now;
275  updateImpl(sinceLastUpdate);
276  m_shouldUpdate.clear(std::memory_order_release);
277  }
278  catch (...)
279  {
280  m_shouldUpdate.clear(std::memory_order_release);
281  throw;
282  }
283  }
284  }
285 }
286 
287 
// Publishes the current job statistics through updateChirp: timestamps, the
// event/lumi/run/file counters, the averaged event rate and the accumulated
// storage I/O totals.
//
// sinceLastUpdate: seconds since the previous update. When it is not positive,
// the "Elapsed" and "EventRate" attributes are not written.
288 void
289 CondorStatusService::updateImpl(time_t sinceLastUpdate)
290 {
291  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
 // Seconds since m_beginJob.
292  time_t jobTime = now-m_beginJob;
293 
294  updateChirp("LastUpdate", now);
295 
 // "Events" is written only while the count is still zero or has grown past
 // m_lastEventCount.
296  if (!m_events || (m_events > m_lastEventCount))
297  {
298  updateChirp("Events", m_events);
299  }
300 
301  updateChirp("Lumis", m_lumis);
302 
303  updateChirp("Runs", m_runs);
304 
305  updateChirp("Files", m_files);
306 
 // EMA weight: 1 - exp(-dt / window), where the window is the smaller of the
 // configured EMA interval and the job time so far, but at least one second.
307  float ema_coeff = 1 - std::exp(-static_cast<float>(sinceLastUpdate)/std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
308  if (sinceLastUpdate > 0)
309  {
310  updateChirp("Elapsed", jobTime);
 // Blend the rate observed since the last update into the running average.
311  m_rate = ema_coeff*static_cast<float>(m_events-m_lastEventCount)/static_cast<float>(sinceLastUpdate) + (1.0-ema_coeff)*m_rate;
313  updateChirp("EventRate", m_rate);
314  }
315 
316  // Update storage account information
317  auto const& stats = StorageAccount::summary();
318  uint64_t readOps = 0;
319  uint64_t readVOps = 0;
320  uint64_t readSegs = 0;
321  uint64_t readBytes = 0;
322  uint64_t readTimeTotal = 0;
323  uint64_t writeBytes = 0;
324  uint64_t writeTimeTotal = 0;
325  const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
326  for (const auto & storage : stats)
327  {
328  // StorageAccount records statistics for both the TFile layer and the
329  // StorageFactory layer. However, the StorageFactory statistics tend to
330  // be more accurate as various backends may alter the incoming read requests
331  // (such as when lazy-download is used).
332  if (storage.first == token.value()) {continue;}
333  for (const auto & counter : storage.second)
334  {
335  if (counter.first == static_cast<int>(StorageAccount::Operation::read))
336  {
337  readOps += counter.second.successes;
 // Each matching read counter contributes a single segment.
338  readSegs++;
339  readBytes += counter.second.amount;
340  readTimeTotal += counter.second.timeTotal;
341  }
342  else if (counter.first == static_cast<int>(StorageAccount::Operation::readv))
343  {
344  readVOps += counter.second.successes;
 // Vector reads contribute their recorded vector_count as segments.
345  readSegs += counter.second.vector_count;
346  readBytes += counter.second.amount;
347  readTimeTotal += counter.second.timeTotal;
348  }
349  else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) || (counter.first == static_cast<int>(StorageAccount::Operation::writev)))
350  {
351  writeBytes += counter.second.amount;
352  writeTimeTotal += counter.second.timeTotal;
353  }
354  }
355  }
356  updateChirp("ReadOps", readOps);
357  updateChirp("ReadVOps", readVOps);
358  updateChirp("ReadSegments", readSegs);
359  updateChirp("ReadBytes", readBytes);
 // NOTE(review): dividing by 10^6 to obtain milliseconds implies timeTotal is
 // in nanoseconds - confirm against StorageAccount.
360  updateChirp("ReadTimeMsecs", readTimeTotal/(1000*1000));
361  updateChirp("WriteBytes", writeBytes);
362  updateChirp("WriteTimeMsecs", writeTimeTotal/(1000*1000));
363 }
364 
365 
366 template<typename T> bool
368 {
369  std::stringstream ss; ss << value;
370  return updateChirp(key_suffix, ss.str());
371 }
372 
373 
374 bool
376 {
377  std::stringstream ss; ss << "ChirpCMSSW" << m_tag << key_suffix;
378  std::string key = ss.str();
379  if (m_debug)
380  {
381  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
382  }
383  int pid = 0;
384  posix_spawn_file_actions_t file_actions;
385  int devnull_fd = open("/dev/null", O_RDWR);
386  if (devnull_fd == -1) {return false;}
387  posix_spawn_file_actions_init(&file_actions);
388  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
389  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
390  const std::string chirp_name = "condor_chirp";
391  const std::string set_job_attr = "set_job_attr_delayed";
392  std::vector<const char *> argv;
393  argv.push_back(chirp_name.c_str());
394  argv.push_back(set_job_attr.c_str());
395  argv.push_back(key.c_str());
396  argv.push_back(value.c_str());
397  argv.push_back(NULL);
398  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, NULL, const_cast<char* const*>(&argv[0]), environ);
399  close(devnull_fd);
400  posix_spawn_file_actions_destroy(&file_actions);
401  if (status)
402  {
403  return false;
404  }
405  while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {}
406  return status == 0;
407 }
408 
409 
410 void
412 {
 // Describes the service's configuration: every parameter is optional and
 // untracked, and the description is registered as "CondorStatusService".
414  desc.setComment("Service to update HTCondor with the current CMSSW status.");
415  desc.addOptionalUntracked<unsigned int>("updateIntervalSeconds", m_defaultUpdateInterval)
416  ->setComment("Interval, in seconds, for HTCondor updates");
417  desc.addOptionalUntracked<bool>("debug", false)
418  ->setComment("Enable debugging of this service");
419  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
420  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
 // "tag" is declared without a default value.
421  desc.addOptionalUntracked<std::string>("tag")
422  ->setComment("Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
423  descriptions.add("CondorStatusService", desc);
424 }
425 
426 
429 
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:117
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
static const StorageStats & summary(void)
void lumiPost(GlobalContext const &)
bool exists(std::string const &parameterName) const
checks if a parameter exists
#define NULL
Definition: scimark2.h:8
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::atomic< std::uint_least64_t > m_lumis
#define constexpr
ParameterSetID const & parameterSetID() const
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
CondorStatusService & operator=(const CondorStatusService &)=delete
std::atomic< std::uint_least64_t > m_runs
double f[11][100]
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
T min(T a, T b)
Definition: MathUtil.h:58
string key
FastSim: produces sample of signal events, overlayed with premixed minbias events.
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:15
ParameterSet const & getParameterSet(std::string const &) const
tuple pid
Definition: sysUtil.py:22
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
bool updateChirp(const std::string &key_suffix, const T &value)
bool isValid() const
Definition: Hash.h:150
static std::atomic< unsigned int > counter
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:102
tuple cout
Definition: gather_cfg.py:145
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
tuple fileNames
Definition: LaserDQM_cfg.py:34
volatile std::atomic< bool > shutdown_flag false
long double T
static void fillDescriptions(ConfigurationDescriptions &descriptions)
tuple status
Definition: mps_update.py:57
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)