esMonitoring.py

#!/usr/bin/env python

import argparse
import subprocess
import socket, fcntl, select, atexit, signal, asyncore
import sys, os, time, datetime
import collections
import json
import zlib

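# NOTE: this script is written for the Python 2 era; asyncore, used for the
# event loop below, was later deprecated and finally removed in Python 3.12.
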
def log(s):
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()

def dt2time(dt):
    # convert a datetime timestamp to unix time
    return time.mktime(dt.timetuple())

class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, 'to_json'):
            return obj.to_json()

        return json.JSONEncoder.default(self, obj)

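# Objects exposing a to_json() method (such as the line-history buffers
# below) are serialized through it when make_report() calls
# json.dump(..., cls=JsonEncoder).
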
class ElasticReport(object):
    def __init__(self, args):
        self.last_make_report = None
        self.make_report_timer = 30  # report interval in seconds (assumed default)
        self.seq = 0
        self.args = args

        self.doc = {
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

    def defaults(self):
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # figure out the tag
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                t = os.path.basename(l)
                t = t.replace(".py", "")
                t = t.replace("_cfg", "")
                self.doc["tag"] = t

            pr = l.split("=")
            if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
                run = int(pr[1])
                self.doc["run"] = run

        self.make_id()

        #if os.environ.has_key("GZIP_LOG"):
        #    self.doc["stdlog_gzip"] = os.environ["GZIP_LOG"]

        try:
            self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
            self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
        except:
            pass

        self.update_doc({ "extra": {
            "environ": dict(os.environ)
        }})

    def make_id(self):
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

    def update_doc_recursive(self, old_obj, new_obj):
        for key, value in new_obj.items():
            if (key in old_obj and
                isinstance(value, dict) and
                isinstance(old_obj[key], dict)):

                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

    def update_doc(self, keys):
        self.update_doc_recursive(self.doc, keys)

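    # Illustrative example of the merge semantics above: updating
    #   {"extra": {"a": 1}} with {"extra": {"b": 2}}
    # yields {"extra": {"a": 1, "b": 2}}: nested dicts are merged key by key,
    # anything else is overwritten in place.
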
    def update_ps_status(self):
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            d = {}
            with open(fn, "r") as f:
                for line in f:
                    k, v = line.strip().split(":", 1)
                    d[k.strip()] = v.strip()

            self.update_doc({ 'extra': { 'ps_info': d } })
        except:
            pass

    def update_mem_status(self):
        try:
            key = str(time.time())

            pid = int(self.doc["pid"])
            fn = "/proc/%d/statm" % pid
            with open(fn, "r") as f:
                dct = { key: f.read().strip() }

            self.update_doc({ 'extra': { 'mem_info': dct } })
        except:
            pass

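    # Note: /proc/<pid>/statm is a single line of whitespace-separated page
    # counts (size, resident, shared, text, lib, data, dt); it is stored raw
    # here, keyed by the sampling timestamp.
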
    def make_report(self):
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()
        self.make_id()

        m_path = self.args.path

        if not os.path.isdir(m_path):
            if self.args.debug:
                log("File not written, because report directory does not exist: %s." % m_path)
            # don't make a report if the directory is not available
            return

        self.update_ps_status()
        self.update_mem_status()

        fn_id = self.doc["_id"] + ".jsn"

        fn = os.path.join(m_path, fn_id)
        fn_tmp = os.path.join(m_path, fn_id + ".tmp")

        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True, cls=JsonEncoder)

        os.rename(fn_tmp, fn)

        if self.args.debug:
            log("File %s written." % fn)

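    # Writing to a ".tmp" file and then os.rename()-ing it into place makes the
    # report appear atomically: rename within one filesystem is atomic, so a
    # consumer polling the directory never sees a half-written document.
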
    def try_update(self):
        # first time
        if self.last_make_report is None:
            return self.make_report()

        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()

class LineHistoryEnd(object):
    def __init__(self, max_bytes=16*1024, max_lines=256):
        self.max_bytes = max_bytes
        self.max_lines = max_lines

        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        elm = self.buf.popleft()
        self.size -= len(elm)

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, line):
        line_size = len(line)

        while len(self.buf) and ((self.size + line_size) > self.max_bytes):
            self.pop()

        while (len(self.buf) + 1) > self.max_lines:
            self.pop()

        self.push(line)

    def to_json(self):
        return list(self.buf)

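# LineHistoryEnd keeps only the most recent max_bytes/max_lines of output;
# LineHistoryStart below keeps the first chunk and then stops accepting
# writes. Together they preserve the head and the tail of the child's log.
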
class LineHistoryStart(LineHistoryEnd):
    def __init__(self, *kargs, **kwargs):
        LineHistoryEnd.__init__(self, *kargs, **kwargs)
        self.done = False

    def write(self, line):
        if self.done:
            return

        if ((self.size + len(line)) > self.max_bytes):
            self.done = True
            return

        if (len(self.buf) > self.max_lines):
            self.done = True
            return

        self.push(line)

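# Mixin turning asyncore's raw handle_read() chunks into line-oriented
# callbacks; users override handle_line().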
class AsyncLineReaderMixin(object):
    def __init__(self):
        self.line_buf = []

    def handle_close(self):
        # closing fd
        if len(self.line_buf):
            self.handle_line("".join(self.line_buf))
            self.line_buf = []

        self.close()

    def handle_read(self):
        rbuf = self.recv(1024*16)
        ## not needed, since asyncore automatically handles close
        #if len(rbuf) == 0:
        #    self.handle_close()
        #    return

        self.line_buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have
            spl = "".join(self.line_buf).split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.handle_line(line + "\n")

            if len(spl[0]):
                self.line_buf = [spl[0]]
            else:
                self.line_buf = []

    def handle_line(self, line):
        # override this!
        pass

class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_read = time.time()

        super(AsyncLineReaderTimeoutMixin, self).__init__()

    def handle_read(self):
        self.last_read = time.time()
        AsyncLineReaderMixin.handle_read(self)

    def readable(self):
        if (time.time() - self.last_read) >= self.timeout_secs:
            self.last_read = time.time()
            self.handle_timeout()

        return super(AsyncLineReaderTimeoutMixin, self).readable()

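# Piggybacking the timeout check on readable() works because asyncore calls
# readable() once per poll iteration, which makes it a cheap periodic hook.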
class FDJsonHandler(AsyncLineReaderMixin, asyncore.dispatcher):
    def __init__(self, sock, es):
        AsyncLineReaderMixin.__init__(self)
        asyncore.dispatcher.__init__(self, sock)

        self.es = es

    def handle_line(self, line):
        if len(line) < 4:
            # keep-alive 'ping'
            self.es.try_update()
            return

        try:
            doc = json.loads(line)

            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])

            self.es.update_doc_recursive(self.es.doc, doc)
            self.es.try_update()
        except:
            log("cannot deserialize json len: %d content: %s" % (len(line), line))

    def handle_write(self):
        pass

    def writable(self):
        return False

class FDJsonServer(asyncore.file_dispatcher):
    def __init__(self, es, args):
        asyncore.dispatcher.__init__(self)

        self.fn = None
        self.es = es
        self.args = args

        prefix = "/tmp"
        if os.path.isdir(self.args.path):
            prefix = self.args.path

        base = ".es_monitoring_pid%08d" % os.getpid()
        self.fn = os.path.join(prefix, base)

        if self.args.debug:
            log("Socket path: %s" % self.fn)

        if os.path.exists(self.fn):
            os.unlink(self.fn)

        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        oldmask = os.umask(0o077)
        try:
            self.bind(self.fn)
            self.listen(5)
        finally:
            os.umask(oldmask)

        atexit.register(self.cleanup)

    def cleanup(self):
        if self.fn is not None:
            if os.path.exists(self.fn):
                os.unlink(self.fn)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            # the handler registers itself with asyncore's channel map
            handler = FDJsonHandler(pair[0], self.es)

    def handle_close(self):
        self.close()
        self.cleanup()

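# Illustrative (hypothetical) client, in the Python 2 style of this script:
# a process inside the monitored job can merge keys into the report by
# writing newline-terminated JSON to the unix socket exported via the
# DQM2_SOCKET environment variable:
#
#   import socket, json, os
#   s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   s.connect(os.environ["DQM2_SOCKET"])
#   s.sendall(json.dumps({"run": 123456, "lumi": 42}) + "\n")
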
class FDOutputListener(AsyncLineReaderTimeoutMixin, asyncore.file_dispatcher):
    def __init__(self, fd, es, zlog, close_socket=None):
        AsyncLineReaderTimeoutMixin.__init__(self, 5)
        asyncore.file_dispatcher.__init__(self, fd)

        self.es = es
        self.zlog = zlog
        self.close_socket = close_socket

        self.start = LineHistoryStart()
        self.end = LineHistoryEnd()

        self.es.update_doc({ 'extra': { 'stdlog_start': self.start } })
        self.es.update_doc({ 'extra': { 'stdlog_end': self.end } })

    def writable(self):
        return False

    def handle_line(self, line):
        if self.zlog is not None:
            self.zlog.write(line)
        else:
            sys.stdout.write(line)
            sys.stdout.flush()

        self.start.write(line)
        self.end.write(line)
        self.es.try_update()

    def handle_timeout(self):
        self.es.try_update()

        if self.zlog is not None:
            self.zlog.handle_timeout()

    def handle_close(self):
        super(FDOutputListener, self).handle_close()

        if self.close_socket is not None:
            self.close_socket.handle_close()

    def finish(self):
        if self.zlog is not None:
            self.zlog.finish()

CURRENT_PROC = []

def launch_monitoring(args):
    es = ElasticReport(args=args)

    json_handler = FDJsonServer(es=es, args=args)
    env = os.environ.copy()
    env["DQM2_SOCKET"] = json_handler.fn

    def preexec():
        try:
            # ensure the child dies if we are SIGKILLED
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except:
            log("Failed to setup PR_SET_PDEATHSIG.")

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, env=env)
    CURRENT_PROC.append(p)

    zlog = None
    if args.zlog:
        try:
            relpath = os.path.dirname(__file__)
            sys.path.append(relpath)
            from ztee import GZipLog

            zlog_ = GZipLog(log_file=args.zlog)
            es.update_doc({ "stdlog_gzip": args.zlog })

            log("Open gzip log file: %s" % args.zlog)
            zlog = zlog_
        except Exception as e:
            log("Failed to setup zlog file: " + str(e))

    es.update_doc({ "pid": p.pid })
    es.update_doc({ "monitoring_pid": os.getpid() })
    es.update_doc({ "monitoring_socket": json_handler.fn })
    es.defaults()
    es.make_report()

    log_handler = FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
    log_handler.handle_line("-- starting process: %s --\n" % str(args.pargs))

    try:
        #manager.event_loop(timeout=5, exit_fd=p.stdout.fileno())
        asyncore.loop(timeout=5)
    except select.error as e:
        # we get here on ctrl+c; just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the child process is dead
    r = p.wait()
    log_handler.handle_line("\n-- process exit: %s --\n" % str(r))
    log_handler.finish()

    es.update_doc({ "exit_code": r })
    es.make_report()

    CURRENT_PROC.remove(p)
    return r

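# Forward SIGINT/SIGTERM to the monitored child so it can shut down cleanly;
# the monitor itself finishes once the child's stdout closes and wait() returns.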
def handle_signal(signum, frame):
    for proc in CURRENT_PROC:
        proc.send_signal(signum)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process and produce es documents.")
    parser.add_argument('--debug', '-d', action='store_true', help="Debug mode")
    parser.add_argument('--zlog', '-z', type=str, default=None, help="Don't output anything, zip the log file (uses ztee.py).")
    parser.add_argument('--path', '-p', type=str, default="/tmp/dqm_monitoring/", help="Path for the monitoring output.")
    parser.add_argument('pargs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    if not args.pargs:
        parser.print_help()
        sys.exit(-1)
    elif args.pargs[0] == "--":
        # compat with python 2.6 argparse, which leaves the "--" separator in REMAINDER
        args.pargs = args.pargs[1:]

    # forward signals to the child
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
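
# Example invocation (illustrative; the config name is a placeholder):
#   ./esMonitoring.py -p /tmp/dqm_monitoring/ -- cmsRun some_dqm_cfg.py runNumber=123456
# The monitor runs the child, tails its output, and periodically rewrites
# /tmp/dqm_monitoring/dqm-source-state-run123456-host<hostname>-pid<pid>.jsn.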