esMonitoring.py
#!/usr/bin/env python
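
# Wraps a child process: captures the child's stdout/stderr and a JSON
# update fifo, and periodically publishes ElasticSearch-style JSON report
# documents under /tmp/dqm_monitoring/ (see make_report below).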

import argparse
import subprocess
import socket, fcntl, select, atexit, signal
import sys, os, time, datetime
import collections
import json

def log(s):
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()

def dt2time(dt):
    # convert a datetime timestamp to unix time
    return time.mktime(dt.timetuple())

class ElasticReport(object):
    def __init__(self, pid, history, json, args):
        self.s_history = history
        self.s_json = json

        self.last_make_report = None
        # seconds between periodic reports; this line was lost from the
        # extracted listing, 30 is an assumed value
        self.make_report_timer = 30
        self.seq = 0
        self.args = args

        self.doc = {
            "pid": pid,
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

        self.defaults()

    def defaults(self):
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # figure out the tag from the configuration file name
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                t = os.path.basename(l)
                t = t.replace(".py", "")
                t = t.replace("_cfg", "")
                self.doc["tag"] = t

            pr = l.split("=")
            if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
                run = int(pr[1])
                self.doc["run"] = run

        self.make_id()

    def make_id(self):
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

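    # An example generated _id (hostname and pid illustrative):
    #   "dqm-source-state-run000000-hostdqmworker-pid012345"
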
    def update_doc_recursive(self, old_obj, new_obj):
        for key, value in new_obj.items():
            if (key in old_obj and
                    isinstance(value, dict) and
                    isinstance(old_obj[key], dict)):

                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

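    # Illustrative merge: update_doc_recursive({"extra": {"a": 1}}, {"extra": {"b": 2}})
    # leaves the first dict as {"extra": {"a": 1, "b": 2}}: nested dicts are
    # merged key by key, anything else is overwritten.
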
    def update_doc(self, keys):
        self.update_doc_recursive(self.doc, keys)

    def update_from_json(self):
        while self.s_json.have_docs():
            doc = self.s_json.get_doc()

            # convert some values to integers
            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])

            self.update_doc_recursive(self.doc, doc)

    def update_ps_status(self):
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            f = open(fn, "r")
            d = {}
            for line in f:
                k, v = line.strip().split(":", 1)
                d[k.strip()] = v.strip()
            f.close()

            self.update_doc({ 'extra': { 'ps_info': d } })
        except:
            # the process may already have exited
            pass

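    # Lines in /proc/<pid>/status look like "VmRSS:   123456 kB", so ps_info
    # becomes a flat mapping such as {"VmRSS": "123456 kB", ...}.
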
    def update_stderr(self):
        if self.s_history:
            txt = self.s_history.read()
            self.update_doc({ 'extra': { 'stdlog': txt } })

    def make_report(self):
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()

        self.update_from_json()
        self.make_id()
        self.update_ps_status()
        self.update_stderr()

        fn_id = self.doc["_id"] + ".jsn"

        if self.args.debug:
            tm = "%.06f+" % time.time()
            fn_id = tm + fn_id

        fn = os.path.join("/tmp/dqm_monitoring/", fn_id)
        fn_tmp = os.path.join("/tmp/dqm_monitoring/", fn_id + ".tmp")

        # write to a temporary file and rename(): the rename is atomic, so
        # readers never see a partially written document
        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True)

        os.rename(fn_tmp, fn)

    def try_update(self):
        # first time
        if self.last_make_report is None:
            return self.make_report()

        # the json stream has updates
        if self.s_json and self.s_json.have_docs():
            return self.make_report()

        # otherwise, rate-limit reports to one per make_report_timer seconds
        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()

    def write(self, rbuf):
        self.try_update()

    def flush(self):
        self.try_update()

class History(object):
    # fixed-size buffer of output chunks; the oldest chunks are dropped
    # once the total size would exceed history_size
    def __init__(self, history_size=64*1024):
        self.max_size = history_size
        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        if not len(self.buf):
            return None

        elm = self.buf.popleft()
        self.size -= len(elm)

        return elm

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, rbuf):
        l = len(rbuf)
        while (self.size + l) >= self.max_size:
            self.pop()

        self.push(rbuf)

    def read(self):
        return "".join(self.buf)

    def flush(self):
        pass

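# e.g. with history_size=8: write("abcde") followed by write("fgh") drops
# "abcde" (5 + 3 >= 8), so read() afterwards returns just "fgh".
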
class JsonInput(object):
    def __init__(self):
        self.buf = []
        self.docs = []

    def parse_line(self, line):
        if not line.strip():
            # this is a keep-alive, not yet implemented
            return

        try:
            doc = json.loads(line)
            self.docs.append(doc)
        except:
            log("cannot deserialize json: %s" % line)

    def get_doc(self):
        return self.docs.pop(0)

    def have_docs(self):
        return len(self.docs) > 0

    def write(self, rbuf):
        self.buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have buffered into complete lines
            data = "".join(self.buf)
            spl = data.split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.parse_line(line)

            # keep the trailing partial line for the next write()
            self.buf = [spl[0]]

    def flush(self):
        pass

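# e.g. write('{"run": 1') then write('23}\n{"lumi"') parses {"run": 123} and
# keeps '{"lumi"' buffered until the rest of its line arrives.
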
class DescriptorCapture(object):
    def __init__(self, f, write_files=[]):
        self.f = f
        self.fd = f.fileno()
        self.write_files = write_files

    def read_in(self, rbuf):
        # fan out everything we read to the attached sinks
        for f in self.write_files:
            f.write(rbuf)
            f.flush()

    def close_in(self):
        log("closed fd %d" % self.fd)
        self.f.close()

    @staticmethod
    def event_loop(descriptors, timeout, timeout_call=None):
        fd_map = {}
        p = select.poll()

        for desc in descriptors:
            fd_map[desc.fd] = desc
            p.register(desc.fd, select.POLLIN)

        while len(fd_map) > 0:
            events = p.poll(timeout)
            if len(events) == 0:
                if timeout_call:
                    timeout_call()

            for fd, ev in events:
                rbuf = os.read(fd, 1024)
                if len(rbuf) == 0:
                    # EOF: the child closed this descriptor
                    fd_map[fd].close_in()

                    p.unregister(fd)
                    del fd_map[fd]
                else:
                    fd_map[fd].read_in(rbuf)

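# event_loop() multiplexes the child's descriptors with poll(): when no fd
# becomes readable within `timeout` ms it calls timeout_call, which is how
# periodic reports are flushed even while the child is silent.
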
def create_fifo():
    prefix = "/tmp"
    if os.path.isdir("/tmp/dqm_monitoring"):
        prefix = "/tmp/dqm_monitoring"

    base = ".es_monitoring_pid%08d" % os.getpid()
    fn = os.path.join(prefix, base)

    if os.path.exists(fn):
        os.unlink(fn)

    os.mkfifo(fn, 0o600)
    if not os.path.exists(fn):
        log("Failed to create fifo file: %s" % fn)
        sys.exit(-1)

    atexit.register(os.unlink, fn)
    return fn

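# e.g. the fifo appears as /tmp/dqm_monitoring/.es_monitoring_pid00012345
# (pid illustrative) and is removed again at exit.
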
CURRENT_PROC = []
def launch_monitoring(args):
    fifo = create_fifo()
    mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)

    def preexec():
        # this fd should only stay open in the parent
        os.close(mon_fd)

        # open the fifo once (hack),
        # so there is *always* at least one writer,
        # which closes together with the executable
        os.open(fifo, os.O_WRONLY)

        try:
            # ensure the child dies if we are SIGKILLed
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except:
            log("Failed to setup PR_SET_PDEATHSIG.")

    # os.environ is inherited by the child
    env = os.environ
    env["DQMMON_UPDATE_PIPE"] = fifo

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    CURRENT_PROC.append(p)

    mon_file = os.fdopen(mon_fd)
    s_hist = History()
    s_json = JsonInput()
    report_sink = ElasticReport(pid=p.pid, history=s_hist, json=s_json, args=args)

    stdout_cap = DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink])
    stderr_cap = DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink])
    stdmon_cap = DescriptorCapture(mon_file, write_files=[s_json, report_sink])

    fs = [stdout_cap, stderr_cap, stdmon_cap]
    try:
        DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
    except select.error as e:
        # we get this on ctrl+c:
        # just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the child is dead
    r = p.wait()
    CURRENT_PROC.remove(p)

    report_sink.update_doc({ "exit_code": r })
    report_sink.make_report()

    return r

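# A monitored child can push live updates by writing one JSON object per line
# to the fifo named in $DQMMON_UPDATE_PIPE, e.g. (illustrative):
#   echo '{"run": 123456, "lumi": 42}' > "$DQMMON_UPDATE_PIPE"
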
def handle_signal(signum, frame):
    # forward the signal to the monitored child
    for proc in CURRENT_PROC:
        proc.send_signal(signum)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process - produces elastic search documents.")
    parser.add_argument("-t", type=int, default=2, help="Timeout in seconds.")
    parser.add_argument("--debug", "-d", action='store_true', default=False, help="Enables debugging mode: es documents will have a timestamp in the name.")
    parser.add_argument("pargs", nargs=argparse.REMAINDER)
    args = parser.parse_args()

    # forward SIGINT/SIGTERM to the child, so it shuts down cleanly
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
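
# Example invocation (cmsRun and the config file name are illustrative):
#   ./esMonitoring.py -t 5 cmsRun myconfig_cfg.py runNumber=123456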