5 import socket, fcntl, select, atexit, signal, asyncore
6 import sys, os, time, datetime
12 sys.stderr.write(
"m: " + s +
"\n");
17 return time.mktime(dt.timetuple())
21 if hasattr(obj,
'to_json'):
24 return json.JSONEncoder.default(self, obj)
34 "hostname": socket.gethostname(),
36 "cmdline": args.pargs,
40 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
41 self.
doc[
"type"] =
"dqm-source-state"
45 c = self.
doc[
"cmdline"]
48 t = os.path.basename(l)
49 t = t.replace(
".py",
"")
50 t = t.replace(
"_cfg",
"")
54 if len(pr) > 1
and pr[0] ==
"runNumber" and pr[1].isdigit():
64 self.
doc[
"stdout_fn"] = os.readlink(
"/proc/self/fd/1")
65 self.
doc[
"stderr_fn"] = os.readlink(
"/proc/self/fd/2")
70 "environ": dict(os.environ)
79 for key, value
in new_obj.items():
80 if (key
in old_obj
and
81 isinstance(value, dict)
and
82 isinstance(old_obj[key], dict)):
94 fn =
"/proc/%d/status" % pid
98 k, v = line.strip().
split(
":", 1)
99 d[k.strip()] = v.strip()
102 self.
update_doc({
'extra': {
'ps_info': d } })
108 key =
str(time.time())
110 pid =
int(self.
doc[
"pid"])
111 fn =
"/proc/%d/statm" % pid
113 dct = { key: f.read().
strip() }
116 self.
update_doc({
'extra': {
'mem_info': dct } })
122 self.
doc[
"report_timestamp"] = time.time()
125 m_path = self.
args.path
127 if not os.path.isdir(m_path):
129 log(
"File not written, because report directory does not exist: %s." % m_path)
136 fn_id = self.
doc[
"_id"] +
".jsn"
138 fn = os.path.join(m_path, fn_id)
139 fn_tmp = os.path.join(m_path, fn_id +
".tmp")
141 with open(fn_tmp,
"w")
as f:
142 json.dump(self.
doc, f, indent=
True, cls=JsonEncoder)
144 os.rename(fn_tmp, fn)
147 log(
"File %s written." % fn)
160 def __init__(self, max_bytes=16*1024, max_lines=256):
164 self.
buf = collections.deque()
168 elm = self.
buf.popleft()
169 self.
size -= len(elm)
173 self.
size += len(rbuf)
176 line_size = len(line)
191 LineHistoryEnd.__init__(self, *kargs, **kwargs)
221 rbuf = self.recv(1024*16)
245 class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
250 super(AsyncLineReaderTimeoutMixin, self).
__init__()
254 AsyncLineReaderMixin.handle_read(self)
259 self.handle_timeout()
261 return super(AsyncLineReaderTimeoutMixin, self).
readable()
265 AsyncLineReaderMixin.__init__(self)
266 asyncore.dispatcher.__init__(self, sock)
277 doc = json.loads(line)
279 for k
in [
"pid",
"run",
"lumi"]:
283 self.
es.update_doc_recursive(self.
es.doc, doc)
286 log(
"cannot deserialize json len: %d content: %s" % (len(line), line))
296 asyncore.dispatcher.__init__(self)
303 if os.path.isdir(self.
args.path):
304 prefix = self.
args.path
306 base =
".es_monitoring_pid%08d" % os.getpid()
307 self.
fn = os.path.join(prefix, base)
310 log(
"Socket path: %s" % self.
fn)
312 if os.path.exists(self.
fn):
315 self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
316 oldmask = os.umask(0o077)
327 if self.
fn is not None:
328 if os.path.exists(self.
fn):
341 def __init__(self, fd, es, zlog, close_socket=None):
342 AsyncLineReaderTimeoutMixin.__init__(self, 5)
343 asyncore.file_dispatcher.__init__(self, fd)
352 self.
es.update_doc({
'extra': {
'stdlog_start': self.
start } })
353 self.
es.update_doc({
'extra': {
'stdlog_end': self.
end } })
359 if self.
zlog is not None:
362 sys.stdout.write(line)
372 if self.
zlog is not None:
382 if self.
zlog is not None:
391 env = os.environ.copy()
392 env[
"DQM2_SOCKET"] = json_handler.fn
398 libc = ctypes.CDLL(
"libc.so.6")
400 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
402 log(
"Failed to setup PR_SET_PDEATHSIG.")
405 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=
True, env=env)
406 CURRENT_PROC.append(p)
411 relpath = os.path.dirname(__file__)
412 sys.path.append(relpath)
413 from ztee
import GZipLog
415 zlog_ = GZipLog(log_file=args.zlog)
416 es.update_doc({
"stdlog_gzip": args.zlog })
418 log(
"Open gzip log file: %s" % args.zlog)
420 except Exception
as e:
421 log(
"Failed to setup zlog file: " +
str(e))
423 es.update_doc({
"pid": p.pid })
424 es.update_doc({
"monitoring_pid": os.getpid() })
425 es.update_doc({
"monitoring_socket": json_handler.fn })
429 log_handler =
FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
430 log_handler.handle_line(
"-- starting process: %s --\n" %
str(args.pargs))
434 asyncore.loop(timeout=5)
435 except select.error
as e:
438 log(
"Select error (we will terminate): " +
str(e))
443 log_handler.handle_line(
"\n-- process exit: %s --\n" %
str(r))
446 es.update_doc({
"exit_code": r })
449 CURRENT_PROC.remove(p)
453 for proc
in CURRENT_PROC:
454 proc.send_signal(signum)
456 if __name__ ==
"__main__":
457 parser = argparse.ArgumentParser(description=
"Monitor a child process and produce es documents.")
458 parser.add_argument(
'--debug',
'-d', action=
'store_true', help=
"Debug mode")
459 parser.add_argument(
'--zlog',
'-z', type=str, default=
None, help=
"Don't output anything, zip the log file (uses ztee.py).")
460 parser.add_argument(
'--path',
'-p', type=str, default=
"/tmp/dqm_monitoring/", help=
"Path for the monitoring output.")
461 parser.add_argument(
'pargs', nargs=argparse.REMAINDER)
462 args = parser.parse_args()
467 elif args.pargs[0] ==
"--":
469 args.pargs = args.pargs[1:]
472 signal.signal(signal.SIGINT, handle_signal)
473 signal.signal(signal.SIGTERM, handle_signal)