5 import socket, fcntl, select, atexit, signal, asyncore
6 import sys, os, time, datetime
12 sys.stderr.write(
"m: " + s +
"\n");
17 return time.mktime(dt.timetuple())
21 if hasattr(obj,
'to_json'):
24 return json.JSONEncoder.default(self, obj)
34 "hostname": socket.gethostname(),
36 "cmdline": args.pargs,
40 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d" 41 self.
doc[
"type"] =
"dqm-source-state" 45 c = self.
doc[
"cmdline"]
48 t = os.path.basename(l)
49 t = t.replace(
".py",
"")
50 t = t.replace(
"_cfg",
"")
54 if len(pr) > 1
and pr[0] ==
"runNumber" and pr[1].isdigit():
64 self.
doc[
"stdout_fn"] = os.readlink(
"/proc/self/fd/1")
65 self.
doc[
"stderr_fn"] = os.readlink(
"/proc/self/fd/2")
70 "environ": dict(os.environ)
79 for key, value
in new_obj.items():
80 if (key
in old_obj
and 81 isinstance(value, dict)
and 82 isinstance(old_obj[key], dict)):
94 fn =
"/proc/%d/status" % pid
98 k, v = line.strip().
split(
":", 1)
99 d[k.strip()] = v.strip()
102 self.
update_doc({
'extra': {
'ps_info': d } })
108 key =
str(time.time())
110 pid =
int(self.
doc[
"pid"])
111 fn =
"/proc/%d/statm" % pid
113 dct = { key: f.read().strip() } 116 self.update_doc({ 'extra': {
'mem_info': dct } })
122 self.
doc[
"report_timestamp"] = time.time()
125 m_path = self.
args.path
127 if not os.path.isdir(m_path):
129 log(
"File not written, because report directory does not exist: %s." % m_path)
136 fn_id = self.
doc[
"_id"] +
".jsn" 138 fn = os.path.join(m_path, fn_id)
139 fn_tmp = os.path.join(m_path, fn_id +
".tmp")
141 with open(fn_tmp,
"w")
as f:
142 json.dump(self.
doc, f, indent=
True, cls=JsonEncoder)
144 os.rename(fn_tmp, fn)
147 log(
"File %s written." % fn)
160 def __init__(self, max_bytes=16*1024, max_lines=256):
164 self.
buf = collections.deque()
168 elm = self.
buf.popleft()
169 self.
size -= len(elm)
173 self.
size += len(rbuf)
176 line_size = len(line)
187 return list(self.
buf)
191 LineHistoryEnd.__init__(self, *kargs, **kwargs)
221 rbuf = self.recv(1024*16)
222 rbuf = rbuf.decode(
'utf-8')
246 class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
251 super(AsyncLineReaderTimeoutMixin, self).
__init__()
255 AsyncLineReaderMixin.handle_read(self)
260 self.handle_timeout()
262 return super(AsyncLineReaderTimeoutMixin, self).
readable()
266 AsyncLineReaderMixin.__init__(self)
267 asyncore.dispatcher.__init__(self, sock)
278 doc = json.loads(line)
280 for k
in [
"pid",
"run",
"lumi"]:
284 self.
es.update_doc_recursive(self.
es.doc, doc)
287 log(
"cannot deserialize json len: %d content: %s" % (len(line), line))
297 asyncore.dispatcher.__init__(self)
304 if os.path.isdir(self.
args.path):
305 prefix = self.
args.path
307 base =
".es_monitoring_pid%08d" % os.getpid()
308 self.
fn = os.path.join(prefix, base)
311 log(
"Socket path: %s" % self.
fn)
313 if os.path.exists(self.
fn):
316 self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
317 oldmask = os.umask(0o077)
328 if self.
fn is not None:
329 if os.path.exists(self.
fn):
342 def __init__(self, fd, es, zlog, close_socket=None):
343 AsyncLineReaderTimeoutMixin.__init__(self, 5)
344 asyncore.file_dispatcher.__init__(self, fd)
353 self.
es.update_doc({
'extra': {
'stdlog_start': self.
start } })
354 self.
es.update_doc({
'extra': {
'stdlog_end': self.
end } })
360 if self.
zlog is not None:
363 sys.stdout.write(line)
373 if self.
zlog is not None:
383 if self.
zlog is not None:
392 env = os.environ.copy()
393 env[
"DQM2_SOCKET"] = json_handler.fn
399 libc = ctypes.CDLL(
"libc.so.6")
401 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
403 log(
"Failed to setup PR_SET_PDEATHSIG.")
406 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=
True, env=env)
407 CURRENT_PROC.append(p)
412 relpath = os.path.dirname(__file__)
413 sys.path.append(relpath)
414 from ztee
import GZipLog
416 zlog_ = GZipLog(log_file=args.zlog)
417 es.update_doc({
"stdlog_gzip": args.zlog })
419 log(
"Open gzip log file: %s" % args.zlog)
421 except Exception
as e:
422 log(
"Failed to setup zlog file: " +
str(e))
424 es.update_doc({
"pid": p.pid })
425 es.update_doc({
"monitoring_pid": os.getpid() })
426 es.update_doc({
"monitoring_socket": json_handler.fn })
430 log_handler =
FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
431 log_handler.handle_line(
"-- starting process: %s --\n" %
str(args.pargs))
435 asyncore.loop(timeout=5)
436 except select.error
as e:
439 log(
"Select error (we will terminate): " +
str(e))
444 log_handler.handle_line(
"\n-- process exit: %s --\n" %
str(r))
447 es.update_doc({
"exit_code": r })
450 CURRENT_PROC.remove(p)
454 for proc
in CURRENT_PROC:
455 proc.send_signal(signum)
457 if __name__ ==
"__main__":
458 parser = argparse.ArgumentParser(description=
"Monitor a child process and produce es documents.")
459 parser.add_argument(
'--debug',
'-d', action=
'store_true', help=
"Debug mode")
460 parser.add_argument(
'--zlog',
'-z', type=str, default=
None, help=
"Don't output anything, zip the log file (uses ztee.py).")
461 parser.add_argument(
'--path',
'-p', type=str, default=
"/tmp/dqm_monitoring/", help=
"Path for the monitoring output.")
462 parser.add_argument(
'pargs', nargs=argparse.REMAINDER)
463 args = parser.parse_args()
468 elif args.pargs[0] ==
"--":
470 args.pargs = args.pargs[1:]
473 signal.signal(signal.SIGINT, handle_signal)
474 signal.signal(signal.SIGTERM, handle_signal)
def __init__(self, kargs, kwargs)
def __init__(self, sock, es)
def __init__(self, fd, es, zlog, close_socket=None)
def update_doc_recursive(self, old_obj, new_obj)
line_buf
not needed, since asyncore automatically handles close if len(rbuf) == 0: self.handle_close() return ...
def __init__(self, es, args)
def update_doc(self, keys)
def split(sequence, size)
static std::string join(char **cmd)
def update_ps_status(self)
def handle_line(self, line)
def __init__(self, max_bytes=16 *1024, max_lines=256)
def __init__(self, timeout_secs)
def handle_signal(signum, frame)
def update_mem_status(self)
def launch_monitoring(args)
def handle_line(self, line)