esMonitoring.py

#!/usr/bin/env python3

import argparse
import subprocess
import socket, fcntl, select, atexit, signal, asyncore
import sys, os, time, datetime
import collections
import json
import zlib

# note: asyncore is deprecated and was removed in Python 3.12; this script predates that

def log(s):
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()

def dt2time(dt):
    # convert a datetime timestamp to unix time
    return time.mktime(dt.timetuple())

class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, 'to_json'):
            return obj.to_json()

        return json.JSONEncoder.default(self, obj)

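# Illustrative example (not in the original): JsonEncoder lets json.dump()
# serialize any object exposing a to_json() method, e.g. the line-history
# buffers defined below:
#   >>> h = LineHistoryEnd()
#   >>> h.write("hello\n")
#   >>> json.dumps({"log": h}, cls=JsonEncoder)
#   '{"log": ["hello\\n"]}'
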
class ElasticReport(object):
    def __init__(self, args):
        self.last_make_report = None
        self.make_report_timer = 30  # seconds between periodic reports
        self.seq = 0
        self.args = args

        self.doc = {
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

    def defaults(self):
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # figure out the tag
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                t = os.path.basename(l)
                t = t.replace(".py", "")
                t = t.replace("_cfg", "")
                self.doc["tag"] = t

            pr = l.split("=")
            if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
                run = int(pr[1])
                self.doc["run"] = run

        self.make_id()

        #if os.environ.has_key("GZIP_LOG"):
        #    self.doc["stdlog_gzip"] = os.environ["GZIP_LOG"]

        try:
            self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
            self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
        except Exception:
            pass

        self.update_doc({ "extra": {
            "environ": dict(os.environ)
        }})

    def make_id(self):
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

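    # Illustrative: with the defaults above, a run number of 321177 and a
    # hypothetical host/pid, make_id() yields an _id such as
    #   dqm-source-state-run321177-hostdqmworker01-pid012345
    # so every report from the same process rewrites one document.
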
    def update_doc_recursive(self, old_obj, new_obj):
        for key, value in new_obj.items():
            if (key in old_obj and
                isinstance(value, dict) and
                isinstance(old_obj[key], dict)):

                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

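    # Illustrative: update_doc_recursive() deep-merges nested dictionaries;
    # merging {"extra": {"b": 2}} into {"extra": {"a": 1}} yields
    # {"extra": {"a": 1, "b": 2}}, while non-dict values are overwritten.
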
    def update_doc(self, keys):
        self.update_doc_recursive(self.doc, keys)

    def update_ps_status(self):
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            f = open(fn, "r")
            d = {}
            for line in f:
                k, v = line.strip().split(":", 1)
                d[k.strip()] = v.strip()
            f.close()

            self.update_doc({ 'extra': { 'ps_info': d } })
        except Exception:
            pass

    def update_mem_status(self):
        try:
            key = str(time.time())

            pid = int(self.doc["pid"])
            fn = "/proc/%d/statm" % pid
            f = open(fn, "r")
            dct = { key: f.read().strip() }
            f.close()

            self.update_doc({ 'extra': { 'mem_info': dct } })
        except Exception:
            pass

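    # Illustrative: each update_mem_status() call adds one entry keyed by the
    # current unix time, so doc["extra"]["mem_info"] accumulates a time series
    # of /proc/<pid>/statm snapshots (the sample value below is made up):
    #   {"1700000000.0": "12345 678 90 1 0 234 0", ...}
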
    def make_report(self):
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()
        self.make_id()

        m_path = self.args.path

        if not os.path.isdir(m_path):
            if self.args.debug:
                log("File not written because the report directory does not exist: %s." % m_path)
            # don't make a report if the directory is not available
            return

        self.update_ps_status()
        self.update_mem_status()

        fn_id = self.doc["_id"] + ".jsn"

        fn = os.path.join(m_path, fn_id)
        fn_tmp = os.path.join(m_path, fn_id + ".tmp")

        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True, cls=JsonEncoder)

        os.rename(fn_tmp, fn)

        if self.args.debug:
            log("File %s written." % fn)

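    # Design note: the report is first written to a ".tmp" file and then
    # renamed into place; os.rename() within one filesystem is atomic on
    # POSIX, so consumers of the .jsn file never see a partial document.
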
    def try_update(self):
        # first time
        if self.last_make_report is None:
            return self.make_report()

        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()

class LineHistoryEnd(object):
    def __init__(self, max_bytes=16*1024, max_lines=256):
        self.max_bytes = max_bytes
        self.max_lines = max_lines

        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        elm = self.buf.popleft()
        self.size -= len(elm)

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, line):
        line_size = len(line)

        while len(self.buf) and ((self.size + line_size) > self.max_bytes):
            self.pop()

        while (len(self.buf) + 1) > self.max_lines:
            self.pop()

        self.push(line)

    def to_json(self):
        return list(self.buf)

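# Illustrative: LineHistoryEnd keeps only the most recent lines, evicting from
# the front whenever max_bytes or max_lines would be exceeded:
#   >>> h = LineHistoryEnd(max_bytes=8, max_lines=256)
#   >>> h.write("first\n"); h.write("second\n")
#   >>> h.to_json()
#   ['second\n']
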
class LineHistoryStart(LineHistoryEnd):
    def __init__(self, *kargs, **kwargs):
        LineHistoryEnd.__init__(self, *kargs, **kwargs)
        self.done = False

    def write(self, line):
        if self.done:
            return

        if ((self.size + len(line)) > self.max_bytes):
            self.done = True
            return

        if (len(self.buf) > self.max_lines):
            self.done = True
            return

        self.push(line)
class AsyncLineReaderMixin(object):
    def __init__(self):
        self.line_buf = []

    def handle_close(self):
        # closing fd
        if len(self.line_buf):
            self.handle_line("".join(self.line_buf))
            self.line_buf = []

        self.close()

    def handle_read(self):
        rbuf = self.recv(1024*16)
        rbuf = rbuf.decode('utf-8')

        ## not needed, since asyncore automatically handles close
        #if len(rbuf) == 0:
        #    self.handle_close()
        #    return

        self.line_buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have
            spl = "".join(self.line_buf).split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.handle_line(line + "\n")

            if len(spl[0]):
                self.line_buf = [spl[0]]
            else:
                self.line_buf = []

    def handle_line(self, line):
        # override this!
        pass

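# Illustrative: handle_read() buffers partial reads and emits only complete
# lines; receiving the chunks "foo\nba" and then "r\n" results in the calls
# handle_line("foo\n") followed by handle_line("bar\n").
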
class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_read = time.time()

        super(AsyncLineReaderTimeoutMixin, self).__init__()

    def handle_read(self):
        self.last_read = time.time()
        AsyncLineReaderMixin.handle_read(self)

    def readable(self):
        if (time.time() - self.last_read) >= self.timeout_secs:
            self.last_read = time.time()
            self.handle_timeout()

        return super(AsyncLineReaderTimeoutMixin, self).readable()

class FDJsonHandler(AsyncLineReaderMixin, asyncore.dispatcher):
    def __init__(self, sock, es):
        AsyncLineReaderMixin.__init__(self)
        asyncore.dispatcher.__init__(self, sock)

        self.es = es

    def handle_line(self, line):
        if len(line) < 4:
            # keep alive 'ping'
            self.es.try_update()
            return

        try:
            doc = json.loads(line)

            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])

            self.es.update_doc_recursive(self.es.doc, doc)
            self.es.try_update()
        except Exception:
            log("cannot deserialize json len: %d content: %s" % (len(line), line))

    def handle_write(self):
        pass

    def writable(self):
        return False

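# Illustrative: a client writes one JSON document per line to the socket, e.g.
#   {"run": 321177, "lumi": 17, "extra": {"state": "running"}}
# The pid/run/lumi fields are coerced to int and the document is deep-merged
# into the report; anything shorter than 4 bytes (e.g. a bare newline) is a
# keep-alive ping that merely refreshes the periodic report.
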
class FDJsonServer(asyncore.file_dispatcher):
    def __init__(self, es, args):
        # deliberately skip file_dispatcher.__init__ (it requires an fd);
        # plain dispatcher init is enough, the socket is created below
        asyncore.dispatcher.__init__(self)

        self.fn = None
        self.es = es
        self.args = args

        prefix = "/tmp"
        if os.path.isdir(self.args.path):
            prefix = self.args.path

        base = ".es_monitoring_pid%08d" % os.getpid()
        self.fn = os.path.join(prefix, base)

        if self.args.debug:
            log("Socket path: %s" % self.fn)

        if os.path.exists(self.fn):
            os.unlink(self.fn)

        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        oldmask = os.umask(0o077)
        try:
            self.bind(self.fn)
            self.listen(5)
        finally:
            os.umask(oldmask)

        atexit.register(self.cleanup)

    def cleanup(self):
        if self.fn is not None:
            if os.path.exists(self.fn):
                os.unlink(self.fn)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            # the handler registers itself with the asyncore loop
            FDJsonHandler(pair[0], self.es)

    def handle_close(self):
        self.close()
        self.cleanup()

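# Illustrative sketch (not part of this script): a child process finds the
# socket through the DQM2_SOCKET environment variable set in
# launch_monitoring() below, and can push state updates like so:
#   import os, socket
#   s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   s.connect(os.environ["DQM2_SOCKET"])
#   s.sendall(b'{"extra": {"state": "running"}}\n')
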
class FDOutputListener(AsyncLineReaderTimeoutMixin, asyncore.file_dispatcher):
    def __init__(self, fd, es, zlog, close_socket=None):
        AsyncLineReaderTimeoutMixin.__init__(self, 5)
        asyncore.file_dispatcher.__init__(self, fd)

        self.es = es
        self.zlog = zlog
        self.close_socket = close_socket

        self.start = LineHistoryStart()
        self.end = LineHistoryEnd()

        self.es.update_doc({ 'extra': { 'stdlog_start': self.start } })
        self.es.update_doc({ 'extra': { 'stdlog_end': self.end } })

    def writable(self):
        return False

    def handle_line(self, line):
        if self.zlog is not None:
            self.zlog.write(line)
        else:
            sys.stdout.write(line)
            sys.stdout.flush()

        self.start.write(line)
        self.end.write(line)
        self.es.try_update()

    def handle_timeout(self):
        self.es.try_update()

        if self.zlog is not None:
            self.zlog.handle_timeout()

    def handle_close(self):
        super(FDOutputListener, self).handle_close()

        if self.close_socket is not None:
            self.close_socket.handle_close()

    def finish(self):
        if self.zlog is not None:
            self.zlog.finish()

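# Design note: FDOutputListener tees the child's combined stdout/stderr either
# to our stdout or into the gzip log, while also feeding the LineHistoryStart/
# LineHistoryEnd buffers embedded in the report; its 5-second read timeout
# guarantees periodic reports even while the child is silent.
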

CURRENT_PROC = []

def launch_monitoring(args):
    es = ElasticReport(args=args)

    json_handler = FDJsonServer(es=es, args=args)
    env = os.environ.copy()
    env["DQM2_SOCKET"] = json_handler.fn

    def preexec():
        try:
            # ensure the child dies if we are SIGKILL'ed
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except Exception:
            log("Failed to setup PR_SET_PDEATHSIG.")

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, env=env)
    CURRENT_PROC.append(p)

    zlog = None
    if args.zlog:
        try:
            relpath = os.path.dirname(__file__)
            sys.path.append(relpath)
            from ztee import GZipLog

            zlog_ = GZipLog(log_file=args.zlog)
            es.update_doc({ "stdlog_gzip": args.zlog })

            log("Open gzip log file: %s" % args.zlog)
            zlog = zlog_
        except Exception as e:
            log("Failed to setup zlog file: " + str(e))

    es.update_doc({ "pid": p.pid })
    es.update_doc({ "monitoring_pid": os.getpid() })
    es.update_doc({ "monitoring_socket": json_handler.fn })
    es.defaults()
    es.make_report()

    log_handler = FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
    log_handler.handle_line("-- starting process: %s --\n" % str(args.pargs))

    try:
        #manager.event_loop(timeout=5, exit_fd=p.stdout.fileno())
        asyncore.loop(timeout=5)
    except select.error as e:
        # we end up here on ctrl+c; just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the child process has exited
    r = p.wait()
    log_handler.handle_line("\n-- process exit: %s --\n" % str(r))
    log_handler.finish()

    es.update_doc({ "exit_code": r })
    es.make_report()

    CURRENT_PROC.remove(p)
    return r

def handle_signal(signum, frame):
    # forward the signal to the monitored child process
    for proc in CURRENT_PROC:
        proc.send_signal(signum)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process and produce es documents.")
    parser.add_argument('--debug', '-d', action='store_true', help="Debug mode")
    parser.add_argument('--zlog', '-z', type=str, default=None, help="Don't write the child's output to stdout; gzip it into the given log file instead (uses ztee.py).")
    parser.add_argument('--path', '-p', type=str, default="/tmp/dqm_monitoring/", help="Path for the monitoring output.")
    parser.add_argument('pargs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    if not args.pargs:
        parser.print_help()
        sys.exit(-1)
    elif args.pargs[0] == "--":
        # compat with python 2.6 argparse, which leaves the "--" separator in
        args.pargs = args.pargs[1:]

    # forward termination signals to the child
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
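
# Example invocation (illustrative; "config_cfg.py" stands in for a real CMSSW
# configuration file):
#   ./esMonitoring.py -p /tmp/dqm_monitoring/ -- cmsRun config_cfg.py runNumber=321177
# This runs cmsRun as a child, mirrors its output, and periodically rewrites a
# dqm-source-state JSON document under /tmp/dqm_monitoring/.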