5 import socket, fcntl, select, atexit, signal
6 import sys, os, time, datetime
11 sys.stderr.write(
"m: " + s +
"\n");
16 return time.mktime(dt.timetuple())
19 def __init__(self, pid, cmdline, history, json):
29 "hostname": socket.gethostname(),
37 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
38 self.
doc[
"type"] =
"dqm-source-state"
42 c = self.
doc[
"cmdline"]
45 l = os.path.basename(l)
46 l = l.replace(
".py",
"")
47 l = l.replace(
"_cfg",
"")
58 for key, value
in keys.items():
62 while self.s_json.have_docs():
63 doc = self.s_json.get_doc()
66 for k
in [
"pid",
"run",
"lumi"]:
74 pid = int(self.
doc[
"pid"])
75 fn =
"/proc/%d/status" % pid
79 k, v = line.strip().
split(
":", 1)
80 d[k.strip()] = v.strip()
83 self.
doc[
"ps_info"] = d
89 self.
doc[
"stderr"] = self.s_history.read()
93 self.
doc[
"report_timestamp"] = time.time()
101 fn_id = self.
doc[
"_id"] +
".jsn"
102 fn = os.path.join(
"/tmp/dqm_monitoring/", fn_id)
103 fn_tmp = os.path.join(
"/tmp/dqm_monitoring/", fn_id +
".tmp")
105 with open(fn_tmp,
"w")
as f:
106 json.dump(self.
doc, f, indent=
True)
108 os.rename(fn_tmp, fn)
116 if self.
s_json and self.s_json.have_docs():
133 self.
buf = collections.deque()
137 if not len(self.
buf):
140 elm = self.buf.popleft()
141 self.
size -= len(elm)
146 self.buf.append(rbuf)
147 self.
size += len(rbuf)
174 doc = json.loads(line)
175 self.docs.append(doc)
177 log(
"cannot deserialize json: %s" % line)
180 return self.docs.pop(0)
183 return len(self.
docs) > 0
186 self.buf.append(rbuf)
190 spl = all.split(
"\n")
213 log(
"closed fd %d" % self.
fd)
222 fd_map[desc.fd] = desc
223 p.register(desc.fd, select.POLLIN)
225 while len(fd_map) > 0:
226 events = p.poll(timeout)
231 for fd, ev
in events:
232 rbuf = os.read(fd, 1024)
244 if os.path.isdir(
"/tmp/dqm_monitoring"):
245 prefix =
"/tmp/dqm_monitoring"
247 base =
".es_monitoring_pid%08d" % os.getpid()
248 fn = os.path.join(prefix, base)
250 if os.path.exists(fn):
254 if not os.path.exists(fn):
255 log(
"Failed to create fifo file: %s" % fn)
258 atexit.register(os.unlink, fn)
264 mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
273 os.open(fifo, os.O_WRONLY)
278 libc = ctypes.CDLL(
"libc.so.6")
280 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
282 log(
"Failed to setup PR_SET_PDEATHSIG.")
286 env[
"DQMMON_UPDATE_PIPE"] = fifo
288 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
289 CURRENT_PROC.append(p)
291 mon_file = os.fdopen(mon_fd)
294 report_sink =
ElasticReport(pid=p.pid, cmdline=args.pargs, history=s_hist, json=s_json)
296 stdout_cap =
DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink, ], )
297 stderr_cap =
DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink, ], )
300 fs = [stdout_cap, stderr_cap, stdmon_cap]
302 DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
303 except select.error, e:
306 log(
"Select error (we will terminate): " + str(e))
311 CURRENT_PROC.remove(p)
313 report_sink.update_doc({
"exit_code": r })
314 report_sink.make_report()
319 for proc
in CURRENT_PROC:
320 proc.send_signal(signum)
322 if __name__ ==
"__main__":
324 parser.add_argument(
"-t", type=int, default=
"2", help=
"Timeout in seconds.")
325 parser.add_argument(
"-s", type=int, default=
"2000", help=
"Signal to send.")
326 parser.add_argument(
"-r",
"--restart", action=
"store_true", default=
False, help=
"Restart the process after killing it.")
327 parser.add_argument(
"pargs", nargs=argparse.REMAINDER)
328 args = parser.parse_args()
331 signal.signal(signal.SIGINT, handle_signal)
332 signal.signal(signal.SIGTERM, handle_signal)
static std::string join(char **cmd)