5 import socket, fcntl, select, atexit, signal, asyncore
6 import sys, os, time, datetime
12 sys.stderr.write(
"m: " + s +
"\n");
17 return time.mktime(dt.timetuple())
21 if hasattr(obj,
'to_json'):
24 return json.JSONEncoder.default(self, obj)
34 "hostname": socket.gethostname(),
36 "cmdline": args.pargs,
40 self.
id_format =
u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
41 self.
doc[
"type"] =
"dqm-source-state"
45 c = self.
doc[
"cmdline"]
48 t = os.path.basename(l)
49 t = t.replace(
".py",
"")
50 t = t.replace(
"_cfg",
"")
54 if len(pr) > 1
and pr[0] ==
"runNumber" and pr[1].isdigit():
64 self.
doc[
"stdout_fn"] = os.readlink(
"/proc/self/fd/1")
65 self.
doc[
"stderr_fn"] = os.readlink(
"/proc/self/fd/2")
70 "environ":
dict(os.environ)
79 for key, value
in new_obj.items():
80 if (key
in old_obj
and
81 isinstance(value, dict)
and
82 isinstance(old_obj[key], dict)):
93 pid = int(self.
doc[
"pid"])
94 fn =
"/proc/%d/status" % pid
98 k, v = line.strip().
split(
":", 1)
99 d[k.strip()] = v.strip()
102 self.
update_doc({
'extra': {
'ps_info': d } })
108 key = str(time.time())
110 pid = int(self.
doc[
"pid"])
111 fn =
"/proc/%d/statm" % pid
113 dct = { key: f.read().strip() }
116 self.update_doc({ 'extra': {
'mem_info': dct } })
122 self.
doc[
"report_timestamp"] = time.time()
125 m_path = self.args.path
127 if not os.path.isdir(m_path):
129 log(
"File not written, because report directory does not exist: %s." % m_path)
136 fn_id = self.
doc[
"_id"] +
".jsn"
138 fn = os.path.join(m_path, fn_id)
139 fn_tmp = os.path.join(m_path, fn_id +
".tmp")
141 with open(fn_tmp,
"w")
as f:
142 json.dump(self.
doc, f, indent=
True, cls=JsonEncoder)
144 os.rename(fn_tmp, fn)
147 log(
"File %s written." % fn)
160 def __init__(self, max_bytes=16*1024, max_lines=256):
164 self.
buf = collections.deque()
168 elm = self.buf.popleft()
169 self.
size -= len(elm)
172 self.buf.append(rbuf)
173 self.
size += len(rbuf)
176 line_size = len(line)
191 LineHistoryEnd.__init__(self, *kargs, **kwargs)
221 rbuf = self.recv(1024*16)
227 self.line_buf.append(rbuf)
245 class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
250 super(AsyncLineReaderTimeoutMixin, self).
__init__()
254 AsyncLineReaderMixin.handle_read(self)
259 self.handle_timeout()
261 return super(AsyncLineReaderTimeoutMixin, self).
readable()
265 AsyncLineReaderMixin.__init__(self)
266 asyncore.dispatcher.__init__(self, sock)
277 doc = json.loads(line)
279 for k
in [
"pid",
"run",
"lumi"]:
283 self.es.update_doc_recursive(self.es.doc, doc)
286 log(
"cannot deserialize json len: %d content: %s" % (len(line), line))
296 asyncore.dispatcher.__init__(self)
303 if os.path.isdir(self.args.path):
304 prefix = self.args.path
306 base =
".es_monitoring_pid%08d" % os.getpid()
307 self.
fn = os.path.join(prefix, base)
310 log(
"Socket path: %s" % self.
fn)
312 if os.path.exists(self.
fn):
315 self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
316 oldmask = os.umask(0o077)
327 if self.
fn is not None:
328 if os.path.exists(self.
fn):
341 def __init__(self, fd, es, zlog, close_socket=None):
342 AsyncLineReaderTimeoutMixin.__init__(self, 5)
343 asyncore.file_dispatcher.__init__(self, fd)
352 self.es.update_doc({
'extra': {
'stdlog_start': self.
start } })
353 self.es.update_doc({
'extra': {
'stdlog_end': self.
end } })
359 if self.
zlog is not None:
360 self.zlog.write(line)
362 sys.stdout.write(line)
365 self.start.write(line)
372 if self.
zlog is not None:
373 self.zlog.handle_timeout()
379 self.close_socket.handle_close()
382 if self.
zlog is not None:
391 env = os.environ.copy()
392 env[
"DQM2_SOCKET"] = json_handler.fn
398 libc = ctypes.CDLL(
"libc.so.6")
400 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
402 log(
"Failed to setup PR_SET_PDEATHSIG.")
405 p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=
True, env=env)
406 CURRENT_PROC.append(p)
411 relpath = os.path.dirname(__file__)
412 sys.path.append(relpath)
413 from ztee
import GZipLog
415 zlog_ = GZipLog(log_file=args.zlog)
416 es.update_doc({
"stdlog_gzip": args.zlog })
418 log(
"Open gzip log file: %s" % args.zlog)
420 except Exception
as e:
421 log(
"Failed to setup zlog file: " + str(e))
423 es.update_doc({
"pid": p.pid })
424 es.update_doc({
"monitoring_pid": os.getpid() })
425 es.update_doc({
"monitoring_socket": json_handler.fn })
429 log_handler =
FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
430 log_handler.handle_line(
"-- starting process: %s --\n" % str(args.pargs))
434 asyncore.loop(timeout=5)
435 except select.error
as e:
438 log(
"Select error (we will terminate): " + str(e))
443 log_handler.handle_line(
"\n-- process exit: %s --\n" % str(r))
446 es.update_doc({
"exit_code": r })
449 CURRENT_PROC.remove(p)
453 for proc
in CURRENT_PROC:
454 proc.send_signal(signum)
456 if __name__ ==
"__main__":
458 parser.add_argument(
'--debug',
'-d', action=
'store_true', help=
"Debug mode")
459 parser.add_argument(
'--zlog',
'-z', type=str, default=
None, help=
"Don't output anything, zip the log file (uses ztee.py).")
460 parser.add_argument(
'--path',
'-p', type=str, default=
"/tmp/dqm_monitoring/", help=
"Path for the monitoring output.")
461 parser.add_argument(
'pargs', nargs=argparse.REMAINDER)
462 args = parser.parse_args()
467 elif args.pargs[0] ==
"--":
469 args.pargs = args.pargs[1:]
472 signal.signal(signal.SIGINT, handle_signal)
473 signal.signal(signal.SIGTERM, handle_signal)
line_buf
not needed, since asyncore automatically handles close if len(rbuf) == 0: self.handle_close() return ...
static std::string join(char **cmd)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run