5 import socket, fcntl, select, atexit, signal, asyncore
6 import sys, os, time, datetime
12 sys.stderr.write(
"m: " + s +
"\n");
17 return time.mktime(dt.timetuple())
21 if hasattr(obj,
'to_json'):
23 return json.JSONEncoder.encode(self, x)
25 return json.JSONEncoder.default(self, obj)
37 "hostname": socket.gethostname(),
39 "cmdline": args.pargs,
43 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
44 self.
doc[
"type"] =
"dqm-source-state"
48 c = self.
doc[
"cmdline"]
51 t = os.path.basename(l)
52 t = t.replace(
".py",
"")
53 t = t.replace(
"_cfg",
"")
57 if len(pr) > 1
and pr[0] ==
"runNumber" and pr[1].isdigit():
67 self.
doc[
"stdout_fn"] = os.readlink(
"/proc/self/fd/1")
68 self.
doc[
"stderr_fn"] = os.readlink(
"/proc/self/fd/2")
73 "environ":
dict(os.environ)
82 for key, value
in new_obj.items():
83 if (old_obj.has_key(key)
and
84 isinstance(value, dict)
and
85 isinstance(old_obj[key], dict)):
96 pid = int(self.
doc[
"pid"])
97 fn =
"/proc/%d/status" % pid
101 k, v = line.strip().
split(
":", 1)
102 d[k.strip()] = v.strip()
105 self.
update_doc({
'extra': {
'ps_info': d } })
111 key = str(time.time())
113 pid = int(self.
doc[
"pid"])
114 fn =
"/proc/%d/statm" % pid
116 dct = { key: f.read().strip() }
119 self.update_doc({ 'extra': {
'mem_info': dct } })
125 self.
doc[
"report_timestamp"] = time.time()
128 if not os.path.isdir(self.
s_path):
135 fn_id = self.
doc[
"_id"] +
".jsn"
137 fn = os.path.join(self.
s_path, fn_id)
138 fn_tmp = os.path.join(self.
s_path, fn_id +
".tmp")
140 with open(fn_tmp,
"w")
as f:
141 json.dump(self.
doc, f, indent=
True, cls=JsonEncoder)
143 os.rename(fn_tmp, fn)
146 log(
"File %s written." % fn)
159 def __init__(self, max_bytes=16*1024, max_lines=256):
163 self.
buf = collections.deque()
167 elm = self.buf.popleft()
168 self.
size -= len(elm)
171 self.buf.append(rbuf)
172 self.
size += len(rbuf)
175 line_size = len(line)
190 LineHistoryEnd.__init__(self, *kargs, **kwargs)
220 rbuf = self.recv(1024*16)
226 self.line_buf.append(rbuf)
244 class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
249 super(AsyncLineReaderTimeoutMixin, self).
__init__()
253 AsyncLineReaderMixin.handle_read(self)
258 self.handle_timeout()
260 return super(AsyncLineReaderTimeoutMixin, self).
readable()
264 AsyncLineReaderMixin.__init__(self)
265 asyncore.dispatcher.__init__(self, sock)
276 doc = json.loads(line)
278 for k
in [
"pid",
"run",
"lumi"]:
282 self.es.update_doc_recursive(self.es.doc, doc)
285 log(
"cannot deserialize json len: %d content: %s" % (len(line), line))
295 asyncore.dispatcher.__init__(self)
301 if os.path.isdir(
"/tmp/dqm_monitoring"):
302 prefix =
"/tmp/dqm_monitoring"
304 base =
".es_monitoring_pid%08d" % os.getpid()
305 self.
fn = os.path.join(prefix, base)
307 if os.path.exists(self.
fn):
310 self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
311 oldmask = os.umask(0077)
322 if self.
fn is not None:
323 if os.path.exists(self.
fn):
336 def __init__(self, fd, es, zlog, close_socket=None):
337 AsyncLineReaderTimeoutMixin.__init__(self, 5)
338 asyncore.file_dispatcher.__init__(self, fd)
347 self.es.update_doc({
'extra': {
'stdlog_start': self.
start } })
348 self.es.update_doc({
'extra': {
'stdlog_end': self.
end } })
354 if self.
zlog is not None:
355 self.zlog.write(line)
357 sys.stdout.write(line)
360 self.start.write(line)
367 if self.
zlog is not None:
368 self.zlog.handle_timeout()
374 self.close_socket.handle_close()
377 if self.
zlog is not None:
386 env = os.environ.copy()
387 env[
"DQM2_SOCKET"] = json_handler.fn
393 libc = ctypes.CDLL(
"libc.so.6")
395 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
397 log(
"Failed to setup PR_SET_PDEATHSIG.")
400 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=
True, env=env)
401 CURRENT_PROC.append(p)
406 relpath = os.path.dirname(__file__)
407 sys.path.append(relpath)
408 from ztee
import GZipLog
410 zlog_ = GZipLog(log_file=args.zlog)
411 es.update_doc({
"stdlog_gzip": args.zlog })
413 log(
"Open gzip log file: %s" % args.zlog)
416 log(
"Failed to setup zlog file: " + str(e))
418 es.update_doc({
"pid": p.pid })
422 log_handler =
FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
423 log_handler.handle_line(
"-- starting process: %s --\n" % str(args.pargs))
427 asyncore.loop(timeout=5)
428 except select.error, e:
431 log(
"Select error (we will terminate): " + str(e))
436 log_handler.handle_line(
"\n-- process exit: %s --\n" % str(r))
439 es.update_doc({
"exit_code": r })
442 CURRENT_PROC.remove(p)
446 for proc
in CURRENT_PROC:
447 proc.send_signal(signum)
449 if __name__ ==
"__main__":
451 parser.add_argument(
'--debug',
'-d', action=
'store_true', help=
"Debug mode")
452 parser.add_argument(
'--zlog',
'-z', type=str, default=
None, help=
"Don't output anything, zip the log file (uses ztee.py).")
453 parser.add_argument(
'pargs', nargs=argparse.REMAINDER)
454 args = parser.parse_args()
459 elif args.pargs[0] ==
"--":
461 args.pargs = args.pargs[1:]
464 signal.signal(signal.SIGINT, handle_signal)
465 signal.signal(signal.SIGTERM, handle_signal)
line_buf
not needed, since asyncore automatically handles close if len(rbuf) == 0: self.handle_close() return ...
static std::string join(char **cmd)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run