5 import socket, fcntl, select, atexit, signal
6 import sys, os, time, datetime
11 sys.stderr.write(
"m: " + s +
"\n");
16 return time.mktime(dt.timetuple())
31 "hostname": socket.gethostname(),
33 "cmdline": args.pargs,
39 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
40 self.
doc[
"type"] =
"dqm-source-state"
44 c = self.
doc[
"cmdline"]
47 t = os.path.basename(l)
48 t = t.replace(
".py",
"")
49 t = t.replace(
"_cfg",
"")
53 if len(pr) > 1
and pr[0] ==
"runNumber" and pr[1].isdigit():
62 self.
doc[
"stdout_fn"] = os.readlink(
"/proc/self/fd/1")
63 self.
doc[
"stderr_fn"] = os.readlink(
"/proc/self/fd/2")
74 for key, value
in new_obj.items():
75 if (old_obj.has_key(key)
and
76 isinstance(value, dict)
and
77 isinstance(old_obj[key], dict)):
87 while self.s_json.have_docs():
88 doc = self.s_json.get_doc()
91 for k
in [
"pid",
"run",
"lumi"]:
99 pid = int(self.
doc[
"pid"])
100 fn =
"/proc/%d/status" % pid
104 k, v = line.strip().
split(
":", 1)
105 d[k.strip()] = v.strip()
108 self.
update_doc({
'extra': {
'ps_info': d } })
114 key = str(time.time())
116 pid = int(self.
doc[
"pid"])
117 fn =
"/proc/%d/statm" % pid
119 dct = { key: f.read().strip() }
122 self.update_doc({ 'extra': {
'mem_info': dct } })
128 txt = self.s_history.read()
129 self.
update_doc({
'extra': {
'stdlog': txt } })
133 self.
doc[
"report_timestamp"] = time.time()
137 if not os.path.isdir(self.
s_path):
145 fn_id = self.
doc[
"_id"] +
".jsn"
148 tm =
"%.06f+" % time.time()
151 fn = os.path.join(self.
s_path, fn_id)
152 fn_tmp = os.path.join(self.
s_path, fn_id +
".tmp")
154 with open(fn_tmp,
"w")
as f:
155 json.dump(self.
doc, f, indent=
True)
157 os.rename(fn_tmp, fn)
165 if self.
s_json and self.s_json.have_docs():
183 self.
buf = collections.deque()
187 if not len(self.
buf):
190 elm = self.buf.popleft()
191 self.
size -= len(elm)
196 self.buf.append(rbuf)
197 self.
size += len(rbuf)
224 doc = json.loads(line)
225 self.docs.append(doc)
227 log(
"cannot deserialize json: %s" % line)
230 return self.docs.pop(0)
233 return len(self.
docs) > 0
236 self.buf.append(rbuf)
240 spl = all.split(
"\n")
263 log(
"closed fd %d" % self.
fd)
272 fd_map[desc.fd] = desc
273 p.register(desc.fd, select.POLLIN)
275 while len(fd_map) > 0:
276 events = p.poll(timeout)
281 for fd, ev
in events:
282 rbuf = os.read(fd, 1024)
294 if os.path.isdir(
"/tmp/dqm_monitoring"):
295 prefix =
"/tmp/dqm_monitoring"
297 base =
".es_monitoring_pid%08d" % os.getpid()
298 fn = os.path.join(prefix, base)
300 if os.path.exists(fn):
304 if not os.path.exists(fn):
305 log(
"Failed to create fifo file: %s" % fn)
308 atexit.register(os.unlink, fn)
314 mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
323 os.open(fifo, os.O_WRONLY)
328 libc = ctypes.CDLL(
"libc.so.6")
330 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
332 log(
"Failed to setup PR_SET_PDEATHSIG.")
336 env[
"DQMMON_UPDATE_PIPE"] = fifo
338 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
339 CURRENT_PROC.append(p)
341 mon_file = os.fdopen(mon_fd)
344 report_sink =
ElasticReport(pid=p.pid, history=s_hist, json=s_json, args=args)
346 stdout_cap =
DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink, ], )
347 stderr_cap =
DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink, ], )
350 fs = [stdout_cap, stderr_cap, stdmon_cap]
352 DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
353 except select.error, e:
356 log(
"Select error (we will terminate): " + str(e))
361 CURRENT_PROC.remove(p)
363 report_sink.update_doc({
"exit_code": r })
364 report_sink.make_report()
369 for proc
in CURRENT_PROC:
370 proc.send_signal(signum)
372 if __name__ ==
"__main__":
374 parser.add_argument(
"-t", type=int, default=
"2", help=
"Timeout in seconds.")
375 parser.add_argument(
"--debug",
"-d", action=
'store_true', default=
False, help=
"Enables debugging mode: es documents will have timestamp in the name.")
376 parser.add_argument(
"pargs", nargs=argparse.REMAINDER)
377 args = parser.parse_args()
380 signal.signal(signal.SIGINT, handle_signal)
381 signal.signal(signal.SIGTERM, handle_signal)
static std::string join(char **cmd)