5 import socket, fcntl, select, atexit, signal
6 import sys, os, time, datetime
11 sys.stderr.write(
"m: " + s +
"\n");
16 return time.mktime(dt.timetuple())
30 "hostname": socket.gethostname(),
32 "cmdline": args.pargs,
38 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
39 self.
doc[
"type"] =
"dqm-source-state"
43 c = self.
doc[
"cmdline"]
46 t = os.path.basename(l)
47 t = t.replace(
".py",
"")
48 t = t.replace(
"_cfg",
"")
52 if len(pr) > 1
and pr[0] ==
"runNumber" and pr[1].isdigit():
64 for key, value
in new_obj.items():
65 if (old_obj.has_key(key)
and
66 isinstance(value, dict)
and
67 isinstance(old_obj[key], dict)):
77 while self.s_json.have_docs():
78 doc = self.s_json.get_doc()
81 for k
in [
"pid",
"run",
"lumi"]:
89 pid = int(self.
doc[
"pid"])
90 fn =
"/proc/%d/status" % pid
94 k, v = line.strip().
split(
":", 1)
95 d[k.strip()] = v.strip()
104 txt = self.s_history.read()
105 self.
update_doc({
'extra': {
'stdlog': txt } })
109 self.
doc[
"report_timestamp"] = time.time()
116 fn_id = self.
doc[
"_id"] +
".jsn"
119 tm =
"%.06f+" % time.time()
122 fn = os.path.join(
"/tmp/dqm_monitoring/", fn_id)
123 fn_tmp = os.path.join(
"/tmp/dqm_monitoring/", fn_id +
".tmp")
125 with open(fn_tmp,
"w")
as f:
126 json.dump(self.
doc, f, indent=
True)
128 os.rename(fn_tmp, fn)
136 if self.
s_json and self.s_json.have_docs():
153 self.
buf = collections.deque()
157 if not len(self.
buf):
160 elm = self.buf.popleft()
161 self.
size -= len(elm)
166 self.buf.append(rbuf)
167 self.
size += len(rbuf)
194 doc = json.loads(line)
195 self.docs.append(doc)
197 log(
"cannot deserialize json: %s" % line)
200 return self.docs.pop(0)
203 return len(self.
docs) > 0
206 self.buf.append(rbuf)
210 spl = all.split(
"\n")
233 log(
"closed fd %d" % self.
fd)
242 fd_map[desc.fd] = desc
243 p.register(desc.fd, select.POLLIN)
245 while len(fd_map) > 0:
246 events = p.poll(timeout)
251 for fd, ev
in events:
252 rbuf = os.read(fd, 1024)
264 if os.path.isdir(
"/tmp/dqm_monitoring"):
265 prefix =
"/tmp/dqm_monitoring"
267 base =
".es_monitoring_pid%08d" % os.getpid()
268 fn = os.path.join(prefix, base)
270 if os.path.exists(fn):
274 if not os.path.exists(fn):
275 log(
"Failed to create fifo file: %s" % fn)
278 atexit.register(os.unlink, fn)
284 mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
293 os.open(fifo, os.O_WRONLY)
298 libc = ctypes.CDLL(
"libc.so.6")
300 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
302 log(
"Failed to setup PR_SET_PDEATHSIG.")
306 env[
"DQMMON_UPDATE_PIPE"] = fifo
308 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
309 CURRENT_PROC.append(p)
311 mon_file = os.fdopen(mon_fd)
314 report_sink =
ElasticReport(pid=p.pid, history=s_hist, json=s_json, args=args)
316 stdout_cap =
DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink, ], )
317 stderr_cap =
DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink, ], )
320 fs = [stdout_cap, stderr_cap, stdmon_cap]
322 DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
323 except select.error, e:
326 log(
"Select error (we will terminate): " + str(e))
331 CURRENT_PROC.remove(p)
333 report_sink.update_doc({
"exit_code": r })
334 report_sink.make_report()
339 for proc
in CURRENT_PROC:
340 proc.send_signal(signum)
342 if __name__ ==
"__main__":
344 parser.add_argument(
"-t", type=int, default=
"2", help=
"Timeout in seconds.")
345 parser.add_argument(
"--debug",
"-d", action=
'store_true', default=
False, help=
"Enables debugging mode: es documents will have timestamp in the name.")
346 parser.add_argument(
"pargs", nargs=argparse.REMAINDER)
347 args = parser.parse_args()
350 signal.signal(signal.SIGINT, handle_signal)
351 signal.signal(signal.SIGTERM, handle_signal)
static std::string join(char **cmd)