CMS 3D CMS Logo

esMonitoring.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import argparse
4 import subprocess
5 import socket, fcntl, select, atexit, signal, asyncore
6 import sys, os, time, datetime
7 import collections
8 import json
9 import zlib
10 
def log(s):
    """Write a diagnostic message to stderr, prefixed with ``m: ``.

    The stream is flushed right away so the message is not held back in a
    buffer.
    """
    err = sys.stderr
    err.write("m: " + s + "\n")
    err.flush()
14 
def dt2time(dt):
    """Convert a ``datetime`` into a Unix timestamp (float seconds).

    The value is interpreted as local time by ``time.mktime``; sub-second
    precision is lost because ``timetuple()`` has no microsecond field.
    """
    as_struct = dt.timetuple()
    return time.mktime(as_struct)
18 
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that understands objects exposing a ``to_json()`` method.

    Such objects are replaced by whatever ``to_json()`` returns. Any other
    unsupported object is handed to the stock encoder, which raises
    ``TypeError``.
    """

    def default(self, obj):
        if not hasattr(obj, 'to_json'):
            return json.JSONEncoder.default(self, obj)
        return obj.to_json()
25 
27  def __init__(self, args):
28  self.last_make_report = None
30  self.seq = 0
31  self.args = args
32 
33  self.doc = {
34  "hostname": socket.gethostname(),
35  "sequence": self.seq,
36  "cmdline": args.pargs,
37  }
38 
def defaults(self):
    """Populate the report document with default and derived fields.

    Sets the id format, the document ``type`` and a default ``run`` of 0.
    It then scans the child's command line (``self.doc["cmdline"]``):

    * an argument ending in ``.py`` sets ``tag`` to its basename with
      ``.py`` and ``_cfg`` removed;
    * an argument ``runNumber=<digits>`` sets ``run``.

    Next the document id is rebuilt. ``id_format`` references ``pid`` and
    ``hostname``, so both must already be in the document. Then the targets
    of this process's stdout/stderr are recorded when ``/proc/self/fd`` can
    be read. Finally a copy of the environment is stored under
    ``extra.environ``.
    """
    self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
    self.doc["type"] = "dqm-source-state"
    self.doc["run"] = 0

    # figure out the tag (and the run number, if given)
    for arg in self.doc["cmdline"]:
        if arg.endswith(".py"):
            tag = os.path.basename(arg)
            tag = tag.replace(".py", "")
            tag = tag.replace("_cfg", "")
            self.doc["tag"] = tag

        parts = arg.split("=")
        if len(parts) > 1 and parts[0] == "runNumber" and parts[1].isdigit():
            # int() instead of the Python 2-only long(). Python 2 promotes
            # large values to long automatically; Python 3 has no long().
            self.doc["run"] = int(parts[1])

    self.make_id()

    try:
        self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
        self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
    except OSError:
        # Best effort: /proc may be unavailable or the fds may not be links.
        pass

    self.update_doc({"extra": {
        "environ": dict(os.environ)
    }})
72 
def make_id(self):
    """Render ``self.id_format`` against the document and store it as ``_id``.

    Returns the rendered id string.
    """
    new_id = self.id_format % self.doc
    self.doc.update(_id=new_id)
    return new_id
77 
def update_doc_recursive(self, old_obj, new_obj):
    """Deep-merge ``new_obj`` into ``old_obj`` in place.

    When both sides hold a dict under the same key, the merge recurses into
    it. Otherwise the value from ``new_obj`` overwrites (or adds) the entry.
    """
    for key, incoming in new_obj.items():
        both_dicts = (isinstance(incoming, dict)
                      and key in old_obj
                      and isinstance(old_obj[key], dict))
        if not both_dicts:
            old_obj[key] = incoming
            continue
        self.update_doc_recursive(old_obj[key], incoming)
87 
def update_doc(self, keys):
    """Deep-merge the mapping ``keys`` into the report document ``self.doc``."""
    merge = self.update_doc_recursive
    merge(self.doc, keys)
90 
def update_ps_status(self):
    """Record the child's ``/proc/<pid>/status`` fields under ``extra.ps_info``.

    Each ``Key: value`` line becomes one dict entry, with both sides
    stripped. This is best effort: if the document has no usable ``pid`` or
    the status file cannot be read, the document is left untouched.
    """
    try:
        pid = int(self.doc["pid"])
        # ``with`` closes the file even when reading fails part-way through.
        with open("/proc/%d/status" % pid, "r") as f:
            lines = f.readlines()
    except (EnvironmentError, KeyError, TypeError, ValueError):
        # EnvironmentError covers both IOError and OSError on Python 2.
        return

    info = {}
    for line in lines:
        key, sep, value = line.partition(":")
        if not sep:
            # Skip a line without a colon instead of discarding the whole
            # snapshot.
            continue
        info[key.strip()] = value.strip()

    self.update_doc({'extra': {'ps_info': info}})
105 
def update_mem_status(self):
    """Record a ``/proc/<pid>/statm`` sample under ``extra.mem_info``.

    The sample is keyed by ``str(time.time())``. Because ``update_doc``
    merges dicts recursively, later calls add new samples next to the
    earlier ones. This is best effort: if the document has no usable
    ``pid`` or the file cannot be read, the document is left untouched.
    """
    try:
        key = str(time.time())
        pid = int(self.doc["pid"])
        # ``with`` closes the file even when the read fails.
        with open("/proc/%d/statm" % pid, "r") as f:
            sample = f.read().strip()
    except (EnvironmentError, KeyError, TypeError, ValueError):
        # EnvironmentError covers both IOError and OSError on Python 2.
        return

    self.update_doc({'extra': {'mem_info': {key: sample}}})
119 
def make_report(self):
    """Write the current document to ``<args.path>/<_id>.jsn``.

    Refreshes ``report_timestamp`` and ``_id`` and samples the child's
    ``/proc`` status and memory. It then dumps the document to a ``.tmp``
    file and renames it into place; on POSIX, rename within one directory
    is atomic, so readers see either the old or the new report.

    If ``args.path`` is not an existing directory nothing is written. The
    timestamp is still updated, so ``try_update`` will not retry right away.
    If the dump or rename fails, the temporary file is removed and the
    error propagates.
    """
    now = time.time()
    # One clock reading, so both timestamps agree exactly.
    self.last_make_report = now
    self.doc["report_timestamp"] = now
    self.make_id()

    m_path = self.args.path

    if not os.path.isdir(m_path):
        if self.args.debug:
            log("File not written, because report directory does not exist: %s." % m_path)
        # don't make a report if the directory is not available
        return

    self.update_ps_status()
    self.update_mem_status()

    fn_id = self.doc["_id"] + ".jsn"
    fn = os.path.join(m_path, fn_id)
    fn_tmp = os.path.join(m_path, fn_id + ".tmp")

    written = False
    try:
        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True, cls=JsonEncoder)
        os.rename(fn_tmp, fn)
        written = True
    finally:
        if not written:
            # Do not leave a stale .tmp file behind; the original error
            # still propagates.
            try:
                os.unlink(fn_tmp)
            except OSError:
                pass

    if self.args.debug:
        log("File %s written." % fn)
148 
def try_update(self):
    """Write a report if none exists yet or ``make_report_timer`` has elapsed.

    Returns ``make_report()``'s result when a report is triggered, otherwise
    ``None``.
    """
    last = self.last_make_report
    if last is not None:
        elapsed = time.time() - last
        if not (elapsed > self.make_report_timer):
            # Reported recently enough; nothing to do yet.
            return None
    return self.make_report()
158 
def __init__(self, max_bytes=16*1024, max_lines=256):
    """Create an empty line buffer.

    ``max_bytes`` caps the total ``len()`` of the stored lines and
    ``max_lines`` caps how many lines are kept.
    """
    self.buf, self.size = collections.deque(), 0
    self.max_bytes, self.max_lines = max_bytes, max_lines
166 
def pop(self):
    """Drop the oldest line and subtract its length from ``self.size``."""
    self.size -= len(self.buf.popleft())
170 
def push(self, rbuf):
    """Append ``rbuf`` as the newest line and add its length to ``self.size``."""
    buf = self.buf
    buf.append(rbuf)
    self.size = self.size + len(rbuf)
174 
def write(self, line):
    """Append ``line``, evicting the oldest lines first to respect both caps.

    Oldest lines are dropped until ``line`` fits within ``max_bytes``. A line
    that is larger than ``max_bytes`` on its own is still stored once the
    buffer is empty. More lines are then dropped until one more fits under
    ``max_lines``.
    """
    incoming = len(line)
    while self.buf and self.size + incoming > self.max_bytes:
        self.pop()
    while len(self.buf) + 1 > self.max_lines:
        self.pop()
    self.push(line)
185 
def to_json(self):
    """Return the buffered lines, oldest first, as a plain list for JSON output."""
    lines = list(self.buf)
    return lines
188 
def __init__(self, *kargs, **kwargs):
    """Create an empty head buffer; all arguments go to ``LineHistoryEnd``."""
    # Becomes True once a write would exceed a cap; later writes are ignored.
    self.done = False
    LineHistoryEnd.__init__(self, *kargs, **kwargs)
193 
def write(self, line):
    """Store ``line`` only while the beginning of the output still fits.

    This buffer never evicts. The first line that would exceed
    ``max_bytes`` or ``max_lines`` sets ``done``, and every later line is
    ignored, so the buffer keeps the head of the output.
    """
    if self.done:
        return

    # ``>=`` so that at most ``max_lines`` lines are kept, matching the
    # limit LineHistoryEnd.write enforces. The previous ``>`` kept one extra.
    if (self.size + len(line) > self.max_bytes
            or len(self.buf) >= self.max_lines):
        self.done = True
        return

    self.push(line)
207 
def __init__(self):
    """Start with no partial line buffered."""
    # Chunks received since the last newline; joined when a newline arrives.
    self.line_buf = list()
211 
def handle_close(self):
    """Pass any unterminated trailing text to ``handle_line``, then close."""
    # closing fd
    if self.line_buf:
        remainder = "".join(self.line_buf)
        self.handle_line(remainder)
        self.line_buf = []
    self.close()
219 
def handle_read(self):
    """Read one chunk and pass every complete line to ``handle_line``.

    Each line is passed with its trailing newline. Text after the last
    newline stays in ``self.line_buf`` until more data arrives, or until
    ``handle_close`` flushes it.
    """
    rbuf = self.recv(1024*16)

    # An empty read (EOF) needs no special case here: asyncore handles the
    # close itself.

    self.line_buf.append(rbuf)
    if "\n" not in rbuf:
        # Earlier chunks held no newline either, so no line is complete yet.
        return

    data = "".join(self.line_buf)
    complete, _, rest = data.rpartition("\n")
    # Iterate once instead of repeatedly calling list.pop(0), which was
    # quadratic in the number of lines per chunk.
    for line in complete.split("\n"):
        self.handle_line(line + "\n")

    self.line_buf = [rest] if rest else []
240 
def handle_line(self, line):
    """Hook called with each complete line (newline included); override it.

    The base implementation ignores the line. It accepts ``line`` so that a
    subclass which does not override it no longer fails with ``TypeError``
    when ``handle_read`` or ``handle_close`` passes a line in.
    """
    pass
244 
class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
    """Line reader that also calls ``handle_timeout`` after a quiet period.

    Whenever ``readable()`` is polled and at least ``timeout_secs`` have
    passed since the last read (or the last timeout), ``handle_timeout()``
    is invoked. This class does not define ``handle_timeout``; subclasses
    must provide it.
    """

    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_read = time.time()

        super(AsyncLineReaderTimeoutMixin, self).__init__()

    def handle_read(self):
        # Incoming data restarts the quiet-period clock.
        self.last_read = time.time()
        AsyncLineReaderMixin.handle_read(self)

    def readable(self):
        idle = time.time() - self.last_read
        if idle >= self.timeout_secs:
            self.last_read = time.time()
            self.handle_timeout()

        return super(AsyncLineReaderTimeoutMixin, self).readable()
262 
class FDJsonHandler(AsyncLineReaderMixin, asyncore.dispatcher):
    """Per-connection handler for the JSON monitoring socket.

    Each received line is one of two things. A line shorter than four
    characters is a keep-alive ping and only triggers ``es.try_update()``.
    Anything else must be a JSON object: it is deep-merged into the report
    document (``pid``, ``run`` and ``lumi`` are coerced to int first), and
    ``es.try_update()`` is then called.
    """

    def __init__(self, sock, es):
        AsyncLineReaderMixin.__init__(self)
        asyncore.dispatcher.__init__(self, sock)

        self.es = es

    def handle_line(self, line):
        if len(line) < 4:
            # keep alive 'ping'
            self.es.try_update()
            return

        try:
            doc = json.loads(line)
            for k in ("pid", "run", "lumi"):
                if k in doc:
                    doc[k] = int(doc[k])
        except (ValueError, TypeError, OverflowError):
            # Malformed JSON, a non-container payload, or a non-integer id.
            doc = None

        if not isinstance(doc, dict):
            log("cannot deserialize json len: %d content: %s" % (len(line), line))
            return

        try:
            self.es.update_doc_recursive(self.es.doc, doc)
            self.es.try_update()
        except Exception as e:
            # Keep the connection alive, but report the real failure instead
            # of blaming the JSON payload.
            log("cannot update report: %s" % e)

    def handle_write(self):
        pass

    def writable(self):
        return False
293 
class FDJsonServer(asyncore.file_dispatcher):
    """Listening Unix-domain socket through which the child sends JSON updates.

    The socket is created as ``.es_monitoring_pid<pid>`` inside
    ``args.path`` when that is an existing directory, otherwise inside
    ``/tmp``. Its path is exposed as ``self.fn``. Every accepted connection
    is served by an ``FDJsonHandler`` that shares the report object ``es``.
    The socket file is removed when the server closes and again at
    interpreter exit.
    """

    def __init__(self, es, args):
        # NOTE(review): the class derives from file_dispatcher but is a
        # socket server, so only the plain dispatcher initialiser is run.
        asyncore.dispatcher.__init__(self)

        self.fn = None
        self.es = es
        self.args = args

        prefix = "/tmp"
        if os.path.isdir(self.args.path):
            prefix = self.args.path

        base = ".es_monitoring_pid%08d" % os.getpid()
        self.fn = os.path.join(prefix, base)

        if self.args.debug:
            log("Socket path: %s" % self.fn)

        # bind() fails if the path already exists, so remove any leftover.
        if os.path.exists(self.fn):
            os.unlink(self.fn)

        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Restrictive umask so the socket file is accessible to the owner only.
        oldmask = os.umask(0o077)
        try:
            self.bind(self.fn)
            self.listen(5)
        finally:
            os.umask(oldmask)

        atexit.register(self.cleanup)

    def cleanup(self):
        """Remove the socket file. Safe to call more than once."""
        if self.fn is None:
            return
        try:
            os.unlink(self.fn)
        except OSError:
            # Already removed (handle_close runs before the atexit hook) or
            # not removable. Never raise during shutdown.
            pass

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            # The handler registers itself in asyncore's socket map, so no
            # reference needs to be kept here.
            FDJsonHandler(pair[0], self.es)

    def handle_close(self):
        self.close()
        self.cleanup()
339 
340 class FDOutputListener(AsyncLineReaderTimeoutMixin, asyncore.file_dispatcher):
341  def __init__(self, fd, es, zlog, close_socket=None):
342  AsyncLineReaderTimeoutMixin.__init__(self, 5)
343  asyncore.file_dispatcher.__init__(self, fd)
344 
345  self.es = es
346  self.zlog = zlog
347  self.close_socket = close_socket
348 
351 
352  self.es.update_doc({ 'extra': { 'stdlog_start': self.start } })
353  self.es.update_doc({ 'extra': { 'stdlog_end': self.end } })
354 
def writable(self):
    """Never ask for write events; this listener only reads the child's output."""
    wants_write = False
    return wants_write
357 
def handle_line(self, line):
    """Forward one line of child output and give the report a chance to refresh.

    The line goes to the gzip log when ``self.zlog`` is set, otherwise
    straight to stdout (flushed). It is also written to ``self.start`` and
    ``self.end``, and ``es.try_update()`` is called afterwards.
    """
    zlog = self.zlog
    if zlog is None:
        sys.stdout.write(line)
        sys.stdout.flush()
    else:
        zlog.write(line)

    for history in (self.start, self.end):
        history.write(line)
    self.es.try_update()
368 
def handle_timeout(self):
    """After a quiet period, refresh the report, then notify the gzip log if one exists."""
    self.es.try_update()
    if self.zlog is None:
        return
    self.zlog.handle_timeout()
374 
375  def handle_close(self):
376  super(FDOutputListener, self).handle_close()
377 
378  if self.close_socket is not None:
380 
def finish(self):
    """Finalise the gzip log, if one is in use."""
    zlog = self.zlog
    if zlog is not None:
        zlog.finish()
384 
385 
# Child processes currently being monitored. launch_monitoring appends and
# removes entries; handle_signal (installed for SIGINT and SIGTERM)
# forwards signals to every entry.
CURRENT_PROC = []
388  es = ElasticReport(args=args)
389 
390  json_handler = FDJsonServer(es=es, args=args)
391  env = os.environ.copy()
392  env["DQM2_SOCKET"] = json_handler.fn
393 
394  def preexec():
395  try:
396  # ensure the child dies if we are SIGKILLED
397  import ctypes
398  libc = ctypes.CDLL("libc.so.6")
399  PR_SET_PDEATHSIG = 1
400  libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
401  except:
402  log("Failed to setup PR_SET_PDEATHSIG.")
403  pass
404 
405  p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, env=env)
406  CURRENT_PROC.append(p)
407 
408  zlog = None
409  if args.zlog:
410  try:
411  relpath = os.path.dirname(__file__)
412  sys.path.append(relpath)
413  from ztee import GZipLog
414 
415  zlog_ = GZipLog(log_file=args.zlog)
416  es.update_doc({ "stdlog_gzip": args.zlog })
417 
418  log("Open gzip log file: %s" % args.zlog)
419  zlog = zlog_
420  except Exception as e:
421  log("Failed to setup zlog file: " + str(e))
422 
423  es.update_doc({ "pid": p.pid })
424  es.update_doc({ "monitoring_pid": os.getpid() })
425  es.update_doc({ "monitoring_socket": json_handler.fn })
426  es.defaults()
427  es.make_report()
428 
429  log_handler = FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
430  log_handler.handle_line("-- starting process: %s --\n" % str(args.pargs))
431 
432  try:
433  #manager.event_loop(timeout=5, exit_fd=p.stdout.fileno())
434  asyncore.loop(timeout=5)
435  except select.error as e:
436  # we have this on ctrl+c
437  # just terminate the child
438  log("Select error (we will terminate): " + str(e))
439  p.terminate()
440 
441  # at this point the program is dead
442  r = p.wait()
443  log_handler.handle_line("\n-- process exit: %s --\n" % str(r))
444  log_handler.finish()
445 
446  es.update_doc({ "exit_code": r })
447  es.make_report()
448 
449  CURRENT_PROC.remove(p)
450  return r
451 
def handle_signal(signum, frame):
    """Signal handler that forwards ``signum`` to every monitored child.

    ``frame`` is unused; it is required by the ``signal.signal`` handler
    signature.
    """
    for child in CURRENT_PROC:
        child.send_signal(signum)
455 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process and produce es documents.")
    parser.add_argument('--debug', '-d', action='store_true', help="Debug mode")
    parser.add_argument('--zlog', '-z', type=str, default=None, help="Don't output anything, zip the log file (uses ztee.py).")
    parser.add_argument('--path', '-p', type=str, default="/tmp/dqm_monitoring/", help="Path for the monitoring output.")
    parser.add_argument('pargs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    if args.pargs and args.pargs[0] == "--":
        # compat with 2.6: REMAINDER may keep the "--" separator.
        args.pargs = args.pargs[1:]

    # Check emptiness after stripping "--", so a bare "--" with no command
    # prints help instead of launching an empty argv.
    if not args.pargs:
        parser.print_help()
        sys.exit(-1)

    # Route SIGINT/SIGTERM to handle_signal, which forwards them to the child.
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
esMonitoring.AsyncLineReaderTimeoutMixin.last_read
last_read
Definition: esMonitoring.py:248
esMonitoring.FDJsonHandler.handle_line
def handle_line(self, line)
Definition: esMonitoring.py:270
resolutioncreator_cfi.object
object
Definition: resolutioncreator_cfi.py:4
esMonitoring.FDJsonServer.args
args
Definition: esMonitoring.py:300
esMonitoring.LineHistoryEnd.to_json
def to_json(self)
Definition: esMonitoring.py:186
esMonitoring.ElasticReport.try_update
def try_update(self)
Definition: esMonitoring.py:149
esMonitoring.ElasticReport.doc
doc
Definition: esMonitoring.py:33
esMonitoring.FDJsonServer.cleanup
def cleanup(self)
Definition: esMonitoring.py:326
esMonitoring.LineHistoryEnd.write
def write(self, line)
Definition: esMonitoring.py:175
digitizers_cfi.strip
strip
Definition: digitizers_cfi.py:19
esMonitoring.AsyncLineReaderTimeoutMixin.timeout_secs
timeout_secs
Definition: esMonitoring.py:247
esMonitoring.LineHistoryStart.write
def write(self, line)
Definition: esMonitoring.py:194
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
esMonitoring.handle_signal
def handle_signal(signum, frame)
Definition: esMonitoring.py:452
esMonitoring.AsyncLineReaderMixin
Definition: esMonitoring.py:208
esMonitoring.FDOutputListener.end
end
Definition: esMonitoring.py:350
esMonitoring.FDJsonHandler.__init__
def __init__(self, sock, es)
Definition: esMonitoring.py:264
esMonitoring.FDJsonServer
Definition: esMonitoring.py:294
esMonitoring.log
def log(s)
Definition: esMonitoring.py:11
esMonitoring.JsonEncoder
Definition: esMonitoring.py:19
esMonitoring.AsyncLineReaderTimeoutMixin
Definition: esMonitoring.py:245
esMonitoring.ElasticReport
Definition: esMonitoring.py:26
esMonitoring.FDOutputListener.finish
def finish(self)
Definition: esMonitoring.py:381
esMonitoring.AsyncLineReaderMixin.handle_line
def handle_line(self)
Definition: esMonitoring.py:241
esMonitoring.AsyncLineReaderTimeoutMixin.handle_read
def handle_read(self)
Definition: esMonitoring.py:252
esMonitoring.FDJsonServer.handle_close
def handle_close(self)
Definition: esMonitoring.py:336
esMonitoring.LineHistoryStart.__init__
def __init__(self, *kargs, **kwargs)
Definition: esMonitoring.py:190
esMonitoring.FDOutputListener.close_socket
close_socket
Definition: esMonitoring.py:347
esMonitoring.FDOutputListener.handle_timeout
def handle_timeout(self)
Definition: esMonitoring.py:369
submitPVValidationJobs.split
def split(sequence, size)
Definition: submitPVValidationJobs.py:352
esMonitoring.LineHistoryEnd.max_lines
max_lines
Definition: esMonitoring.py:162
esMonitoring.FDJsonServer.es
es
Definition: esMonitoring.py:299
esMonitoring.ElasticReport.__init__
def __init__(self, args)
Definition: esMonitoring.py:27
esMonitoring.FDJsonHandler.writable
def writable(self)
Definition: esMonitoring.py:291
esMonitoring.launch_monitoring
def launch_monitoring(args)
Definition: esMonitoring.py:387
esMonitoring.dt2time
def dt2time(dt)
Definition: esMonitoring.py:15
esMonitoring.ElasticReport.make_report
def make_report(self)
Definition: esMonitoring.py:120
esMonitoring.ElasticReport.seq
seq
Definition: esMonitoring.py:30
esMonitoring.AsyncLineReaderMixin.__init__
def __init__(self)
Definition: esMonitoring.py:209
esMonitoring.FDOutputListener.__init__
def __init__(self, fd, es, zlog, close_socket=None)
Definition: esMonitoring.py:341
esMonitoring.LineHistoryEnd.push
def push(self, rbuf)
Definition: esMonitoring.py:171
esMonitoring.LineHistoryEnd
Definition: esMonitoring.py:159
esMonitoring.AsyncLineReaderMixin.handle_close
def handle_close(self)
Definition: esMonitoring.py:212
esMonitoring.FDOutputListener.handle_close
def handle_close(self)
Definition: esMonitoring.py:375
esMonitoring.ElasticReport.update_doc_recursive
def update_doc_recursive(self, old_obj, new_obj)
Definition: esMonitoring.py:78
esMonitoring.ElasticReport.update_doc
def update_doc(self, keys)
Definition: esMonitoring.py:88
esMonitoring.LineHistoryStart
Definition: esMonitoring.py:189
esMonitoring.AsyncLineReaderTimeoutMixin.readable
def readable(self)
Definition: esMonitoring.py:256
esMonitoring.ElasticReport.defaults
def defaults(self)
Definition: esMonitoring.py:39
esMonitoring.ElasticReport.args
args
Definition: esMonitoring.py:31
mps_setup.append
append
Definition: mps_setup.py:85
esMonitoring.FDOutputListener.start
start
Definition: esMonitoring.py:349
esMonitoring.ElasticReport.make_id
def make_id(self)
Definition: esMonitoring.py:73
createfilelist.int
int
Definition: createfilelist.py:10
esMonitoring.AsyncLineReaderTimeoutMixin.__init__
def __init__(self, timeout_secs)
Definition: esMonitoring.py:246
esMonitoring.JsonEncoder.default
def default(self, obj)
Definition: esMonitoring.py:20
esMonitoring.LineHistoryEnd.size
size
Definition: esMonitoring.py:165
esMonitoring.FDOutputListener.writable
def writable(self)
Definition: esMonitoring.py:355
esMonitoring.ElasticReport.last_make_report
last_make_report
Definition: esMonitoring.py:28
esMonitoring.str
str
Definition: esMonitoring.py:459
esMonitoring.FDOutputListener.handle_line
def handle_line(self, line)
Definition: esMonitoring.py:358
esMonitoring.LineHistoryEnd.__init__
def __init__(self, max_bytes=16 *1024, max_lines=256)
Definition: esMonitoring.py:160
esMonitoring.FDJsonServer.fn
fn
Definition: esMonitoring.py:298
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
esMonitoring.LineHistoryEnd.buf
buf
Definition: esMonitoring.py:164
esMonitoring.FDJsonHandler.handle_write
def handle_write(self)
Definition: esMonitoring.py:288
esMonitoring.AsyncLineReaderMixin.line_buf
line_buf
not needed, since asyncore automatically handles close if len(rbuf) == 0: self.handle_close() return
Definition: esMonitoring.py:210
esMonitoring.ElasticReport.id_format
id_format
Definition: esMonitoring.py:40
esMonitoring.ElasticReport.update_ps_status
def update_ps_status(self)
Definition: esMonitoring.py:91
esMonitoring.FDJsonServer.handle_accept
def handle_accept(self)
Definition: esMonitoring.py:331
esMonitoring.LineHistoryEnd.max_bytes
max_bytes
Definition: esMonitoring.py:161
esMonitoring.LineHistoryStart.done
done
Definition: esMonitoring.py:192
esMonitoring.FDOutputListener
Definition: esMonitoring.py:340
esMonitoring.LineHistoryEnd.pop
def pop(self)
Definition: esMonitoring.py:167
esMonitoring.ElasticReport.make_report_timer
make_report_timer
Definition: esMonitoring.py:29
esMonitoring.FDOutputListener.zlog
zlog
Definition: esMonitoring.py:346
esMonitoring.FDJsonHandler
Definition: esMonitoring.py:263
esMonitoring.FDOutputListener.es
es
Definition: esMonitoring.py:345
esMonitoring.AsyncLineReaderMixin.handle_read
def handle_read(self)
Definition: esMonitoring.py:220
esMonitoring.FDJsonHandler.es
es
Definition: esMonitoring.py:268
esMonitoring.ElasticReport.update_mem_status
def update_mem_status(self)
Definition: esMonitoring.py:106
esMonitoring.FDJsonServer.__init__
def __init__(self, es, args)
Definition: esMonitoring.py:295