esMonitoring.py
#!/usr/bin/env python

import argparse
import subprocess
import socket, fcntl, select, atexit, signal, asyncore
import sys, os, time, datetime
import collections
import json
import zlib

def log(s):
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()

def dt2time(dt):
    # convert a datetime timestamp to unix time
    return time.mktime(dt.timetuple())

class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, 'to_json'):
            x = obj.to_json()
            return json.JSONEncoder.encode(self, x)

        return json.JSONEncoder.default(self, obj)

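# Example (not part of the original script): JsonEncoder picks up any
# object that exposes a to_json() method, which is how the LineHistory
# buffers defined below end up inside the report document. A minimal
# sketch; Tail and _example_json_encoder are hypothetical names:
def _example_json_encoder():
    class Tail(object):
        def to_json(self):
            return ["last line\n"]
    # default() above routes Tail through to_json()
    return json.dumps({"stdlog_end": Tail()}, cls=JsonEncoder)
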
class ElasticReport(object):
    def __init__(self, args):
        self.s_path = "/tmp/dqm_monitoring/"

        self.last_make_report = None
        self.make_report_timer = 30  # seconds between periodic reports
        self.seq = 0
        self.args = args

        self.doc = {
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

    def defaults(self):
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # figure out the tag
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                t = os.path.basename(l)
                t = t.replace(".py", "")
                t = t.replace("_cfg", "")
                self.doc["tag"] = t

            pr = l.split("=")
            if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
                run = long(pr[1])
                self.doc["run"] = run

        self.make_id()

        #if os.environ.has_key("GZIP_LOG"):
        #    self.doc["stdlog_gzip"] = os.environ["GZIP_LOG"]

        try:
            self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
            self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
        except:
            pass

        self.update_doc({ "extra": {
            "environ": dict(os.environ)
        }})

    def make_id(self):
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

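# Example (not part of the original script, hypothetical values): with the
# defaults above, make_id() expands id_format into a document id such as
# "dqm-source-state-run286520-hostexample-host-pid001234":
def _example_make_id():
    doc = {"type": "dqm-source-state", "run": 286520,
           "hostname": "example-host", "pid": 1234}
    return u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d" % doc
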
    def update_doc_recursive(self, old_obj, new_obj):
        for key, value in new_obj.items():
            if (old_obj.has_key(key) and
                isinstance(value, dict) and
                isinstance(old_obj[key], dict)):

                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

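# Example (not part of the original script): update_doc_recursive() merges
# nested dicts key by key and overwrites scalars, so repeated updates
# accumulate under "extra" instead of replacing it. Hypothetical sketch:
def _example_merge():
    es = ElasticReport(args=argparse.Namespace(pargs=[]))
    es.update_doc({"extra": {"ps_info": {"State": "R (running)"}}})
    es.update_doc({"extra": {"mem_info": {"0.0": "4321 123 77 6 0 321 0"}}})
    return es.doc["extra"]  # holds both ps_info and mem_info
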
    def update_doc(self, keys):
        self.update_doc_recursive(self.doc, keys)

    def update_ps_status(self):
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            f = open(fn, "r")
            d = {}
            for line in f:
                k, v = line.strip().split(":", 1)
                d[k.strip()] = v.strip()
            f.close()

            self.update_doc({ 'extra': { 'ps_info': d } })
        except:
            pass

    def update_mem_status(self):
        try:
            key = str(time.time())

            pid = int(self.doc["pid"])
            fn = "/proc/%d/statm" % pid
            f = open(fn, "r")
            dct = { key: f.read().strip() }
            f.close()

            self.update_doc({ 'extra': { 'mem_info': dct } })
        except:
            pass

    def make_report(self):
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()
        self.make_id()

        if not os.path.isdir(self.s_path):
            # don't make a report if the directory is not available
            return

        self.update_ps_status()
        self.update_mem_status()

        fn_id = self.doc["_id"] + ".jsn"

        fn = os.path.join(self.s_path, fn_id)
        fn_tmp = os.path.join(self.s_path, fn_id + ".tmp")

        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True, cls=JsonEncoder)

        # write-then-rename is atomic on the same filesystem, so readers
        # never see a partially written document
        os.rename(fn_tmp, fn)

        if self.args.debug:
            log("File %s written." % fn)

    def try_update(self):
        # first time
        if self.last_make_report is None:
            return self.make_report()

        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()

class LineHistoryEnd(object):
    def __init__(self, max_bytes=16*1024, max_lines=256):
        self.max_bytes = max_bytes
        self.max_lines = max_lines

        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        elm = self.buf.popleft()
        self.size -= len(elm)

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, line):
        line_size = len(line)

        while len(self.buf) and ((self.size + line_size) > self.max_bytes):
            self.pop()

        while (len(self.buf) + 1) > self.max_lines:
            self.pop()

        self.push(line)

    def to_json(self):
        return list(self.buf)

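# Example (not part of the original script): LineHistoryEnd keeps a rolling
# tail of the log, evicting the oldest lines once the byte or line budget
# is exceeded; to_json() lets JsonEncoder embed it in the report directly.
def _example_line_history():
    h = LineHistoryEnd(max_bytes=64, max_lines=4)
    for i in range(1000):
        h.write("line %d\n" % i)
    return h.to_json()  # only the last few lines survive
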
class LineHistoryStart(LineHistoryEnd):
    def __init__(self, *kargs, **kwargs):
        LineHistoryEnd.__init__(self, *kargs, **kwargs)
        self.done = False

    def write(self, line):
        if self.done:
            return

        if ((self.size + len(line)) > self.max_bytes):
            self.done = True
            return

        if (len(self.buf) > self.max_lines):
            self.done = True
            return

        self.push(line)

class AsyncLineReaderMixin(object):
    def __init__(self):
        self.line_buf = []

    def handle_close(self):
        # closing fd
        if len(self.line_buf):
            self.handle_line("".join(self.line_buf))
            self.line_buf = []

        self.close()

    def handle_read(self):
        rbuf = self.recv(1024*16)
        ## not needed, since asyncore automatically handles close
        #if len(rbuf) == 0:
        #    self.handle_close()
        #    return

        self.line_buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have
            spl = "".join(self.line_buf).split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.handle_line(line + "\n")

            if len(spl[0]):
                self.line_buf = [spl[0]]
            else:
                self.line_buf = []

    def handle_line(self, line):
        # override this!
        pass

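# Illustration (hypothetical chunks, not part of the original script):
# handle_read() accumulates partial reads and emits one handle_line() call
# per completed line; the chunks "he", "llo\nwor", "ld\n" produce
# handle_line("hello\n") followed by handle_line("world\n").
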
class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_read = time.time()

        super(AsyncLineReaderTimeoutMixin, self).__init__()

    def handle_read(self):
        self.last_read = time.time()
        AsyncLineReaderMixin.handle_read(self)

    def readable(self):
        if (time.time() - self.last_read) >= self.timeout_secs:
            self.last_read = time.time()
            self.handle_timeout()

        return super(AsyncLineReaderTimeoutMixin, self).readable()

class FDJsonHandler(AsyncLineReaderMixin, asyncore.dispatcher):
    def __init__(self, sock, es):
        AsyncLineReaderMixin.__init__(self)
        asyncore.dispatcher.__init__(self, sock)

        self.es = es

    def handle_line(self, line):
        if len(line) < 4:
            # keep alive 'ping'
            self.es.try_update()
            return

        try:
            doc = json.loads(line)

            for k in ["pid", "run", "lumi"]:
                if doc.has_key(k):
                    doc[k] = int(doc[k])

            self.es.update_doc_recursive(self.es.doc, doc)
            self.es.try_update()
        except:
            log("cannot deserialize json len: %d content: %s" % (len(line), line))

    def handle_write(self):
        pass

    def writable(self):
        return False

class FDJsonServer(asyncore.file_dispatcher):
    def __init__(self, es):
        # calls the plain dispatcher __init__, since file_dispatcher's
        # would require an already-open fd
        asyncore.dispatcher.__init__(self)

        self.fn = None
        self.es = es

        prefix = "/tmp"
        if os.path.isdir("/tmp/dqm_monitoring"):
            prefix = "/tmp/dqm_monitoring"

        base = ".es_monitoring_pid%08d" % os.getpid()
        self.fn = os.path.join(prefix, base)

        if os.path.exists(self.fn):
            os.unlink(self.fn)

        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        oldmask = os.umask(0077)
        try:
            self.bind(self.fn)
            self.listen(5)
        finally:
            os.umask(oldmask)

        atexit.register(self.cleanup)

    def cleanup(self):
        if self.fn is not None:
            if os.path.exists(self.fn):
                os.unlink(self.fn)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            handler = FDJsonHandler(pair[0], self.es)

    def handle_close(self):
        self.close()
        self.cleanup()

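# Example (not part of the original script): a monitored process can push
# state into the report by writing newline-terminated JSON documents to
# the Unix socket published in DQM2_SOCKET and served by FDJsonServer
# above. A hypothetical client sketch:
def _example_report_client():
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect(os.environ["DQM2_SOCKET"])
    s.sendall(json.dumps({"run": 286520, "lumi": 17}) + "\n")
    s.sendall("\n")  # lines shorter than 4 bytes act as keep-alive pings
    s.close()
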
class FDOutputListener(AsyncLineReaderTimeoutMixin, asyncore.file_dispatcher):
    def __init__(self, fd, es, zlog, close_socket=None):
        AsyncLineReaderTimeoutMixin.__init__(self, 5)
        asyncore.file_dispatcher.__init__(self, fd)

        self.es = es
        self.zlog = zlog
        self.close_socket = close_socket

        self.start = LineHistoryStart()
        self.end = LineHistoryEnd()

        self.es.update_doc({ 'extra': { 'stdlog_start': self.start } })
        self.es.update_doc({ 'extra': { 'stdlog_end': self.end } })

    def writable(self):
        return False

    def handle_line(self, line):
        if self.zlog is not None:
            self.zlog.write(line)
        else:
            sys.stdout.write(line)
            sys.stdout.flush()

        self.start.write(line)
        self.end.write(line)
        self.es.try_update()

    def handle_timeout(self):
        self.es.try_update()

        if self.zlog is not None:
            self.zlog.handle_timeout()

    def handle_close(self):
        super(FDOutputListener, self).handle_close()

        if self.close_socket is not None:
            self.close_socket.handle_close()

    def finish(self):
        if self.zlog is not None:
            self.zlog.finish()


CURRENT_PROC = []

def launch_monitoring(args):
    es = ElasticReport(args=args)

    json_handler = FDJsonServer(es=es)
    env = os.environ.copy()
    env["DQM2_SOCKET"] = json_handler.fn

    def preexec():
        try:
            # ensure the child dies if we are SIGKILLED
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except:
            log("Failed to setup PR_SET_PDEATHSIG.")

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, env=env)
    CURRENT_PROC.append(p)

    zlog = None
    if args.zlog:
        try:
            relpath = os.path.dirname(__file__)
            sys.path.append(relpath)
            from ztee import GZipLog

            zlog_ = GZipLog(log_file=args.zlog)
            es.update_doc({ "stdlog_gzip": args.zlog })

            log("Open gzip log file: %s" % args.zlog)
            zlog = zlog_
        except Exception, e:
            log("Failed to setup zlog file: " + str(e))

    es.update_doc({ "pid": p.pid })
    es.defaults()
    es.make_report()

    log_handler = FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
    log_handler.handle_line("-- starting process: %s --\n" % str(args.pargs))

    try:
        #manager.event_loop(timeout=5, exit_fd=p.stdout.fileno())
        asyncore.loop(timeout=5)
    except select.error, e:
        # we get this on ctrl+c
        # just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the child process is dead
    r = p.wait()
    log_handler.handle_line("\n-- process exit: %s --\n" % str(r))
    log_handler.finish()

    es.update_doc({ "exit_code": r })
    es.make_report()

    CURRENT_PROC.remove(p)
    return r

def handle_signal(signum, frame):
    for proc in CURRENT_PROC:
        proc.send_signal(signum)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process and produce es documents.")
    parser.add_argument('--debug', '-d', action='store_true', help="Debug mode")
    parser.add_argument('--zlog', '-z', type=str, default=None, help="Don't output anything, gzip the log file (uses ztee.py).")
    parser.add_argument('pargs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    if not args.pargs:
        parser.print_help()
        sys.exit(-1)
    elif args.pargs[0] == "--":
        # compat with python 2.6
        args.pargs = args.pargs[1:]

    # forward termination signals to the child process
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
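
# Example invocation (hypothetical config name): wrap a cmsRun job so that
# its output is monitored and JSON state documents land in
# /tmp/dqm_monitoring/:
#
#   ./esMonitoring.py -- cmsRun dqm_sourceclient-live_cfg.py runNumber=286520
#
# With -z the child's output is gzipped via ztee.py instead of echoed:
#
#   ./esMonitoring.py -z /tmp/log.gz -- cmsRun dqm_sourceclient-live_cfg.py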