CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
CondorStatusUpdater.cc
Go to the documentation of this file.
1 
11 
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <sys/wait.h>
15 #include <spawn.h>
16 #include <iostream>
17 #include <cmath>
18 #include <chrono>
19 #include <sstream>
20 #include <atomic>
21 #include <string>
22 
23 namespace edm {
24 
25  namespace service {
26 
// NOTE(review): the class header line (listing line 27) is missing from this
// listing; the members below belong to CondorStatusService, the service that
// publishes job progress to HTCondor through the condor_chirp tool.
// Listing lines 33-35, 60-61 and 70 are missing as well. The members
// m_updateInterval, m_emaInterval and m_processParameterSetID used by the
// implementation are presumably declared there -- confirm against the
// original file.
28  {
29 
30  public:
31 
32  explicit CondorStatusService(ParameterSet const& pset, edm::ActivityRegistry& ar);
36 
// Describes the configuration parameters accepted by this service.
37  static void fillDescriptions(ConfigurationDescriptions &descriptions);
38 
39  private:
40 
// Chirp plumbing: probe for support, set one job attribute, and the
// first / periodic / last status updates built on top of it.
41  bool isChirpSupported();
42  bool updateChirp(std::string const &key, std::string const &value);
43  inline void update();
44  void firstUpdate();
45  void lastUpdate();
46  void updateImpl(time_t secsSinceLastUpdate);
47 
// Framework callbacks; the counter callbacks bump a counter and call update().
48  void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
49  void eventPost(StreamContext const& iContext);
50  void lumiPost(GlobalContext const&);
51  void runPost(GlobalContext const&);
52  void beginPre(PathsAndConsumesOfModulesBase const&, ProcessContext const& processContext);
53  void beginPost();
54  void endPost();
55  void filePost(std::string const &, bool);
56 
// When true, every chirp command is echoed to std::cout and chirp is
// treated as supported without probing.
57  bool m_debug;
// Guard flag: update() uses test_and_set on it so that only one caller at a
// time runs updateImpl().
58  std::atomic_flag m_shouldUpdate;
// Wall-clock time recorded at the end of the begin-job callback.
59  time_t m_beginJob = 0;
// Exponential moving average of the event rate (events per second).
62  float m_rate = 0;
63  static constexpr float m_defaultEmaInterval = 15*60; // Time in seconds to average EMA over for event rate.
// Default minimum number of seconds between two periodic updates.
64  static constexpr unsigned int m_defaultUpdateInterval = 3*60;
// Time of the most recent periodic update, and the progress counters that
// are reported to HTCondor.
65  std::atomic<time_t> m_lastUpdate;
66  std::atomic<std::uint_least64_t> m_events;
67  std::atomic<std::uint_least64_t> m_lumis;
68  std::atomic<std::uint_least64_t> m_runs;
69  std::atomic<std::uint_least64_t> m_files;
71 
// Event count used as the baseline when computing the event rate.
72  std::uint_least64_t m_lastEventCount = 0;
73  };
74 
75  }
76 
77 }
78 
79 using namespace edm::service;
80 
// Constructor body of CondorStatusService.
// NOTE(review): the signature line (listing line 81) is missing from this
// listing; per the class declaration it takes (ParameterSet const& pset,
// edm::ActivityRegistry& ar).
//
// Zero-initialises the counters, reads the "debug" flag, and returns early
// if condor_chirp is not usable. Otherwise it publishes the initial
// attribute values and reads the optional tuning parameters.
82  :
83  m_debug(false),
84  m_lastUpdate(0),
85  m_events(0),
86  m_lumis(0),
87  m_runs(0),
88  m_files(0)
89 {
90  m_shouldUpdate.clear();
91  if (pset.exists("debug"))
92  {
// Honour the configured boolean value. Previously the mere presence of the
// parameter enabled debugging, so "debug = False" still turned it on.
93  m_debug = pset.getUntrackedParameter<bool>("debug");
94  }
// Without chirp support nothing below is needed; the service stays inert.
95  if (!isChirpSupported()) {return;}
96 
97  firstUpdate();
98 
// NOTE(review): listing lines 99-105 are missing here; presumably the
// ActivityRegistry callback registrations -- confirm against the original file.
106 
107  if (pset.exists("updateIntervalSeconds"))
108  {
109  m_updateInterval = pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds");
110  }
111  if (pset.exists("EMAInterval"))
112  {
113  m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
114  }
115 }
116 
117 
// Counts one processed event, then gives update() a chance to publish.
// NOTE(review): the signature line (listing line 119) is missing from this
// listing; presumably CondorStatusService::eventPost -- confirm.
118 void
120 {
121  m_events++;
122  update();
123 }
124 
125 
// Counts one finished luminosity block, then gives update() a chance to publish.
// NOTE(review): the signature line (listing line 127) is missing from this
// listing; presumably CondorStatusService::lumiPost -- confirm.
126 void
128 {
129  m_lumis++;
130  update();
131 }
132 
133 
// Counts one finished run, then gives update() a chance to publish.
// NOTE(review): the signature line (listing line 135) is missing from this
// listing; presumably CondorStatusService::runPost -- confirm.
134 void
136 {
137  m_runs++;
138  update();
139 }
140 
141 
// File callback: records one more file in the atomic file counter and lets
// update() decide whether a status refresh is due. Both arguments are unused.
142 void
143 CondorStatusService::filePost(std::string const & /*lfn*/, bool /*usedFallback*/)
144 {
145  m_files.fetch_add(1);
146  update();
147 }
148 
149 
// Remembers the ID of the process parameter set so that the begin-job
// callback can look the parameter set up later.
// NOTE(review): the signature line (listing line 151) and the condition
// guarding the inner block (listing line 153) are missing from this listing;
// presumably this is CondorStatusService::beginPre -- confirm.
150 void
152 {
154  {
155  m_processParameterSetID = processContext.parameterSetID();
156  }
157 }
158 
159 
// Publishes the expected amount of work (max files / events / lumis) derived
// from the process configuration, records the job start time and triggers an
// update.
// NOTE(review): the signature line (listing line 161) is missing from this
// listing; presumably CondorStatusService::beginPost -- confirm.
160 void
162 {
163  ParameterSet const& processParameterSet = edm::getParameterSet(m_processParameterSetID);
164  const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
165  // PSet info from edm::ScheduleItems
// Both limits default to -1 when they are not configured.
166  int maxEvents = processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
167  int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet()).getUntrackedParameter<int>("input", -1);
168 
169  // lumisToProcess from EventSkipperByID (PoolSource and similar)
170  std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange> >("lumisToProcess", std::vector<LuminosityBlockRange>());
171  edm::sortAndRemoveOverlaps(toProcess);
172  uint64_t lumiCount = 0;
// Counting stops at the first range that spans several runs or is open-ended,
// since its size cannot be computed from the lumi numbers alone.
173  for (auto const &range : toProcess)
174  {
175  if (range.startRun() != range.endRun()) {break;}
176  if (range.endLumi() >= edm::LuminosityBlockID::maxLuminosityBlockNumber()) {break;}
// NOTE(review): if the range is inclusive at both ends this undercounts by
// one lumi per range -- confirm against LuminosityBlockRange semantics.
177  lumiCount += (range.endLumi()-range.startLumi());
178  }
179  // Handle sources deriving from ProducerSourceBase
180  unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
181  if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0))
182  {
// Round up so that a partially filled last lumi is still counted.
183  lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
184  }
185 
186  std::vector<std::string> fileNames = pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
187  std::stringstream ss_max_files; ss_max_files << fileNames.size();
188  updateChirp("ChirpCMSSWMaxFiles", ss_max_files.str());
189 
// The reported lumi limit is the smaller of the configured maximum and the
// count derived above; an unset maximum takes the derived count.
190  if (lumiCount > 0)
191  {
192  if (maxLumis < 0) {maxLumis = lumiCount;}
193  if (maxLumis > static_cast<int>(lumiCount))
194  {
195  maxLumis = lumiCount;
196  }
197  }
// Limits are only published when they are positive.
198  if (maxEvents > 0)
199  {
200  std::stringstream ss_max_events; ss_max_events << maxEvents;
201  updateChirp("ChirpCMSSWMaxEvents", ss_max_events.str());
202  }
203  if (maxLumis > 0)
204  {
205  std::stringstream ss_max_lumis; ss_max_lumis << maxLumis;
206  updateChirp("ChirpCMSSWMaxLumis", ss_max_lumis.str());
207  }
208 
209  m_beginJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
210  update();
211 }
212 
213 
// Publishes the final status by delegating to lastUpdate().
// NOTE(review): the signature line (listing line 215) is missing from this
// listing; presumably CondorStatusService::endPost -- confirm.
214 void
216 {
217  lastUpdate();
218 }
219 
220 
// Reports whether status updates can be sent.
// In debug mode this is always true. Otherwise the environment variable
// _CONDOR_CHIRP_CONFIG must be set and a probe update of "ChirpCMSSWElapsed"
// must succeed.
// NOTE(review): the signature line (listing line 222) is missing from this
// listing; presumably CondorStatusService::isChirpSupported -- confirm.
221 bool
223 {
224  if (m_debug) {return true;}
225 
226  return getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("ChirpCMSSWElapsed", "0");
227 }
228 
229 
// Publishes the initial attribute values: the counters via updateImpl(0),
// then -1 for the three maxima and "false" for the done flag.
// NOTE(review): the signature line (listing line 231) is missing from this
// listing; presumably CondorStatusService::firstUpdate -- confirm.
230 void
232 {
233  // Note we always update all our statistics to 0 / false / -1
234  // This allows us to overwrite the activities of a previous cmsRun process
235  // within this HTCondor job.
236  updateImpl(0);
237  updateChirp("ChirpCMSSWMaxFiles", "-1");
238  updateChirp("ChirpCMSSWMaxEvents", "-1");
239  updateChirp("ChirpCMSSWMaxLumis", "-1");
240  updateChirp("ChirpCMSSWDone", "false");
241 }
242 
243 
// Publishes a final set of statistics, using the time elapsed since the last
// periodic update, and then marks the job as done.
// NOTE(review): the signature line (listing line 245) is missing from this
// listing; presumably CondorStatusService::lastUpdate -- confirm.
244 void
246 {
247  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
248  updateImpl(now - m_lastUpdate);
249  updateChirp("ChirpCMSSWDone", "true");
250 }
251 
252 
// Rate-limited update: calls updateImpl() only when more than
// m_updateInterval seconds have passed since m_lastUpdate, and only for the
// one caller that wins the m_shouldUpdate flag.
// NOTE(review): the signature line (listing line 254) is missing from this
// listing; presumably CondorStatusService::update -- confirm.
253 void
255 {
256  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// Cheap relaxed read first, so callers return quickly when no update is due.
257  if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval)
258  {
// test_and_set returns the previous value: only the caller that finds the
// flag clear proceeds; any other caller skips the update entirely.
259  if (!m_shouldUpdate.test_and_set(std::memory_order_acquire))
260  {
261  try
262  {
263  time_t sinceLastUpdate = now - m_lastUpdate;
264  m_lastUpdate = now;
265  updateImpl(sinceLastUpdate);
266  m_shouldUpdate.clear(std::memory_order_release);
267  }
268  catch (...)
269  {
// Release the flag before rethrowing so that later updates are not blocked.
270  m_shouldUpdate.clear(std::memory_order_release);
271  throw;
272  }
273  }
274  }
275 }
276 
277 
// Publishes the current statistics through updateChirp(): the timestamp, the
// lumi / run / file counters and, when sinceLastUpdate is positive, the
// elapsed job time and the exponentially averaged event rate.
//
// sinceLastUpdate: seconds since the previous update; 0 skips the elapsed
// time and event rate attributes.
278 void
279 CondorStatusService::updateImpl(time_t sinceLastUpdate)
280 {
281  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
282  time_t jobTime = now-m_beginJob;
283 
284  std::stringstream ss_now; ss_now << now;
285  updateChirp("ChirpCMSSWLastUpdate", ss_now.str());
286 
// NOTE(review): listing line 287 is missing here; presumably the condition
// guarding the event-count update below -- confirm against the original file.
288  {
289  std::stringstream ss_events; ss_events << m_events;
290  updateChirp("ChirpCMSSWEvents", ss_events.str());
291  }
292 
293  std::stringstream ss_lumis; ss_lumis << m_lumis;
294  updateChirp("ChirpCMSSWLumis", ss_lumis.str());
295 
296  std::stringstream ss_runs; ss_runs << m_runs;
297  updateChirp("ChirpCMSSWRuns", ss_runs.str());
298 
299  std::stringstream ss_files; ss_files << m_files;
300  updateChirp("ChirpCMSSWFiles", ss_files.str());
301 
// EMA weight of the newest sample: 1 - exp(-dt / m_emaInterval).
302  float ema_coeff = 1 - std::exp(-static_cast<float>(sinceLastUpdate)/m_emaInterval);
303  if (sinceLastUpdate > 0)
304  {
305  std::stringstream ss_elapsed; ss_elapsed << jobTime;
306  updateChirp("ChirpCMSSWElapsed", ss_elapsed.str());
// Blend the rate over the last interval with the previous average.
307  m_rate = ema_coeff*static_cast<float>(m_events-m_lastEventCount)/static_cast<float>(sinceLastUpdate) + (1.0-ema_coeff)*m_rate;
// NOTE(review): listing line 308 is missing here; presumably it refreshes
// m_lastEventCount -- confirm against the original file.
309  std::stringstream ss_rate; ss_rate << m_rate;
310  updateChirp("ChirpCMSSWEventRate", ss_rate.str());
311  }
312 }
313 
314 
// Sets one HTCondor job attribute by running
//   condor_chirp set_job_attr_delayed <key> <value>
// as a child process with stdout and stderr redirected to /dev/null.
//
// key:   attribute name.
// value: attribute value, already formatted as text.
// Returns true when the child was spawned and its wait status is 0;
// false when /dev/null cannot be opened, the spawn fails, or the child
// reports a non-zero status.
// NOTE(review): the signature line (listing line 316) is missing from this
// listing; per the class declaration it is
// updateChirp(std::string const &key, std::string const &value).
315 bool
317 {
318  if (m_debug)
319  {
320  std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
321  }
322  int pid = 0;
323  posix_spawn_file_actions_t file_actions;
324  int devnull_fd = open("/dev/null", O_RDWR);
325  if (devnull_fd == -1) {return false;}
326  posix_spawn_file_actions_init(&file_actions);
// Silence the child: point its stdout (1) and stderr (2) at /dev/null.
327  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
328  posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
329  const std::string chirp_name = "condor_chirp";
330  const std::string set_job_attr = "set_job_attr_delayed";
331  std::vector<const char *> argv;
332  argv.push_back(chirp_name.c_str());
333  argv.push_back(set_job_attr.c_str());
334  argv.push_back(key.c_str());
335  argv.push_back(value.c_str());
336  argv.push_back(NULL);
337  int status = posix_spawnp(&pid, "condor_chirp", &file_actions, NULL, const_cast<char* const*>(&argv[0]), environ);
// The parent's copy of the descriptor is no longer needed once the spawn
// call has returned, whether or not it succeeded.
338  close(devnull_fd);
339  posix_spawn_file_actions_destroy(&file_actions);
340  if (status)
341  {
342  return false;
343  }
// Retry waitpid when a signal interrupts it. errno holds positive error
// codes, so the comparison must be against EINTR; the previous "-EINTR"
// never matched and an interrupted wait was silently abandoned.
344  while ((waitpid(pid, &status, 0) == -1) && errno == EINTR) {}
345  return status == 0;
346 }
347 
348 
// Registers the configuration description of the service under the label
// "CondorStatusService": three optional untracked parameters
// (updateIntervalSeconds, debug, EMAInterval) with their defaults.
// NOTE(review): the signature line (listing line 350) and the declaration of
// `desc` (listing line 352) are missing from this listing; presumably this is
// CondorStatusService::fillDescriptions -- confirm.
349 void
351 {
353  desc.setComment("Service to update HTCondor with the current CMSSW status.");
354  desc.addOptionalUntracked<unsigned int>("updateIntervalSeconds", m_defaultUpdateInterval)
355  ->setComment("Interval, in seconds, for HTCondor updates");
356  desc.addOptionalUntracked<bool>("debug", false)
357  ->setComment("Enable debugging of this service");
358  desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
359  ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
360  descriptions.add("CondorStatusService", desc);
361 }
362 
363 
366 
T getUntrackedParameter(std::string const &, T const &) const
CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown)
#define DEFINE_FWK_SERVICE_MAKER(concrete, maker)
Definition: ServiceMaker.h:117
void runPost(GlobalContext const &)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext)
void updateImpl(time_t secsSinceLastUpdate)
std::atomic< std::uint_least64_t > m_events
edm::serviceregistry::AllArgsMaker< edm::service::CondorStatusService > CondorStatusServiceMaker
ParameterSet const & getParameterSet(ParameterSetID const &id)
void watchPostEvent(PostEvent::slot_type const &iSlot)
static LuminosityBlockNumber_t maxLuminosityBlockNumber()
void lumiPost(GlobalContext const &)
bool exists(std::string const &parameterName) const
checks if a parameter exists
#define NULL
Definition: scimark2.h:8
ParameterSet getUntrackedParameterSet(std::string const &name, ParameterSet const &defaultValue) const
std::atomic< std::uint_least64_t > m_lumis
#define constexpr
ParameterSetID const & parameterSetID() const
void setComment(std::string const &value)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
bool updateChirp(std::string const &key, std::string const &value)
CondorStatusService & operator=(const CondorStatusService &)=delete
std::atomic< std::uint_least64_t > m_runs
void watchPostGlobalEndLumi(PostGlobalEndLumi::slot_type const &iSlot)
void eventPost(StreamContext const &iContext)
unsigned long long uint64_t
Definition: Time.h:15
ParameterSet const & getParameterSet(std::string const &) const
tuple pid
Definition: sysUtil.py:22
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
bool isValid() const
Definition: Hash.h:150
std::vector< EventRange > & sortAndRemoveOverlaps(std::vector< EventRange > &eventRange)
Definition: EventRange.cc:102
tuple cout
Definition: gather_cfg.py:121
ParameterDescriptionBase * addOptionalUntracked(U const &iLabel, T const &value)
tuple fileNames
Definition: LaserDQM_cfg.py:34
volatile std::atomic< bool > shutdown_flag false
tuple status
Definition: ntuplemaker.py:245
static void fillDescriptions(ConfigurationDescriptions &descriptions)
void watchPostBeginJob(PostBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::atomic< std::uint_least64_t > m_files
void filePost(std::string const &, bool)