esMonitoring.py
#!/usr/bin/env python

import argparse
import subprocess
import socket, fcntl, select, atexit, signal
import sys, os, time, datetime
import collections
import json

def log(s):
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()

def dt2time(dt):
    # convert a datetime timestamp to unix time
    return time.mktime(dt.timetuple())

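# Illustrative example (not executed): dt2time(datetime.datetime(2015, 1, 1))
# returns a float unix timestamp; the exact value depends on the local
# timezone, since time.mktime() interprets the tuple as local time.
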
class ElasticReport(object):
    def __init__(self, pid, history, json, args):
        self.s_history = history
        self.s_json = json
        self.s_path = "/tmp/dqm_monitoring/"

        self.last_make_report = None
        # minimum interval (seconds) between two written reports
        self.make_report_timer = 30
        self.seq = 0
        self.args = args

        self.doc = {
            "pid": pid,
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

        self.defaults()

    def defaults(self):
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # figure out the tag from the configuration file name
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                t = os.path.basename(l)
                t = t.replace(".py", "")
                t = t.replace("_cfg", "")
                self.doc["tag"] = t

            pr = l.split("=")
            if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
                self.doc["run"] = int(pr[1])

        self.make_id()
        self.find_stdout()

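    # Illustrative example (not executed; file name is hypothetical): an
    # argument "dqm_sourceclient-live_cfg.py" yields the tag
    # "dqm_sourceclient-live", and "runNumber=123456" sets run to 123456.
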
    def find_stdout(self):
        # resolve where our stdout/stderr actually point (via /proc)
        try:
            self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
            self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
        except OSError:
            pass

    def make_id(self):
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

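    # Illustrative example (not executed; host and pid are made up): with
    # the defaults above make_id() produces something like
    # "dqm-source-state-run000123-hostsomehost-pid012345".
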
    def update_doc_recursive(self, old_obj, new_obj):
        # merge new_obj into old_obj, descending into values that are
        # dicts on both sides instead of overwriting them
        for key, value in new_obj.items():
            if (key in old_obj and
                isinstance(value, dict) and
                isinstance(old_obj[key], dict)):

                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

    def update_doc(self, keys):
        self.update_doc_recursive(self.doc, keys)

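    # Illustrative example (not executed): merging {"extra": {"b": 2}} into
    # {"extra": {"a": 1}} gives {"extra": {"a": 1, "b": 2}}; any non-dict
    # value is simply replaced.
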
    def update_from_json(self):
        while self.s_json.have_docs():
            doc = self.s_json.get_doc()

            # convert some values to integers
            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])

            self.update_doc_recursive(self.doc, doc)

    def update_ps_status(self):
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            f = open(fn, "r")
            d = {}
            for line in f:
                k, v = line.strip().split(":", 1)
                d[k.strip()] = v.strip()
            f.close()

            self.update_doc({ 'extra': { 'ps_info': d } })
        except Exception:
            pass

    def update_mem_status(self):
        try:
            key = str(time.time())

            pid = int(self.doc["pid"])
            fn = "/proc/%d/statm" % pid
            f = open(fn, "r")
            dct = { key: f.read().strip() }
            f.close()

            self.update_doc({ 'extra': { 'mem_info': dct } })
        except Exception:
            pass

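    # Illustrative example (not executed): a /proc/<pid>/status line such as
    # "VmRSS:    123456 kB" ends up in the report as
    # {"extra": {"ps_info": {"VmRSS": "123456 kB"}}}.
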
    def update_stdlog(self):
        if self.s_history:
            txt = self.s_history.read()
            self.update_doc({ 'extra': { 'stdlog': txt } })

    def make_report(self):
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()
        self.update_from_json()
        self.make_id()

        if not os.path.isdir(self.s_path):
            # don't make a report if the directory is not available
            return

        self.update_ps_status()
        self.update_mem_status()
        self.update_stdlog()

        fn_id = self.doc["_id"] + ".jsn"

        if self.args.debug:
            tm = "%.06f+" % time.time()
            fn_id = tm + fn_id

        fn = os.path.join(self.s_path, fn_id)
        fn_tmp = os.path.join(self.s_path, fn_id + ".tmp")

        # write to a temporary file and rename it into place, so a reader
        # never sees a partially written document
        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True)

        os.rename(fn_tmp, fn)

    def try_update(self):
        # first time
        if self.last_make_report is None:
            return self.make_report()

        # if the json stream has updates, apply them; the actual report
        # still goes through the timer below
        if self.s_json and self.s_json.have_docs():
            self.update_from_json()

        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()

    # file-like interface, so the report can be used as a write_files sink
    def write(self, rbuf):
        self.try_update()

    def flush(self):
        self.try_update()

class History(object):
    # fixed-size buffer holding the most recent output chunks
    def __init__(self, history_size=8*1024):
        self.max_size = history_size
        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        if not len(self.buf):
            return None

        elm = self.buf.popleft()
        self.size -= len(elm)

        return elm

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, rbuf):
        l = len(rbuf)
        # evict the oldest chunks until the new one fits
        while (self.size + l) >= self.max_size:
            self.pop()

        self.push(rbuf)

    def read(self):
        return "".join(self.buf)

    def flush(self):
        pass

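# Illustrative sketch (defined but never called): History keeps only the
# most recent writes, evicting whole chunks once the size limit is hit.
def _demo_history():
    h = History(history_size=8)
    h.write("aaaa")
    h.write("bbbb")  # 4 + 4 reaches the limit, so "aaaa" is evicted
    assert h.read() == "bbbb"
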
class JsonInput(object):
    # accumulates raw chunks and parses complete, newline-terminated
    # json documents out of them
    def __init__(self):
        self.buf = []
        self.docs = []

    def parse_line(self, line):
        if not line.strip():
            # an empty line is a keep-alive; not yet implemented
            return

        try:
            doc = json.loads(line)
            self.docs.append(doc)
        except ValueError:
            log("cannot deserialize json: %s" % line)

    def get_doc(self):
        return self.docs.pop(0)

    def have_docs(self):
        return len(self.docs) > 0

    def write(self, rbuf):
        self.buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have accumulated so far
            all = "".join(self.buf)
            spl = all.split("\n")

            # every element but the last one is a complete line
            while len(spl) > 1:
                line = spl.pop(0)
                self.parse_line(line)

            # keep the trailing partial line for the next write
            self.buf = [spl[0]]

    def flush(self):
        pass

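# Illustrative sketch (defined but never called): a document only becomes
# available once its terminating newline has arrived.
def _demo_jsoninput():
    j = JsonInput()
    j.write('{"run": 1')
    assert not j.have_docs()  # incomplete line, nothing parsed yet
    j.write('}\n')
    assert j.get_doc() == {"run": 1}
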
class DescriptorCapture(object):
    def __init__(self, f, write_files=[]):
        self.f = f
        self.fd = f.fileno()
        self.write_files = write_files

    def read_in(self, rbuf):
        # fan the chunk out to every attached sink
        for f in self.write_files:
            f.write(rbuf)
            f.flush()

    def close_in(self):
        log("closed fd %d" % self.fd)
        self.f.close()

    @staticmethod
    def event_loop(descriptors, timeout, timeout_call=None):
        fd_map = {}
        p = select.poll()

        for desc in descriptors:
            fd_map[desc.fd] = desc
            p.register(desc.fd, select.POLLIN)

        # poll until every registered descriptor has reached eof
        while len(fd_map) > 0:
            events = p.poll(timeout)
            if len(events) == 0:
                if timeout_call:
                    timeout_call()

            for fd, ev in events:
                rbuf = os.read(fd, 1024)
                if len(rbuf) == 0:
                    # a zero-length read means eof on this descriptor
                    fd_map[fd].close_in()

                    p.unregister(fd)
                    del fd_map[fd]
                else:
                    fd_map[fd].read_in(rbuf)


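# Illustrative sketch (defined but never called, python 2): event_loop can
# drive anything exposing .fd, .read_in() and .close_in(); here a plain
# pipe is echoed to stdout until its write end closes.
def _demo_event_loop():
    rfd, wfd = os.pipe()
    os.write(wfd, "hello\n")
    os.close(wfd)
    cap = DescriptorCapture(os.fdopen(rfd, "r"), write_files=[sys.stdout])
    DescriptorCapture.event_loop([cap], timeout=1000)
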
def create_fifo():
    prefix = "/tmp"
    if os.path.isdir("/tmp/dqm_monitoring"):
        prefix = "/tmp/dqm_monitoring"

    base = ".es_monitoring_pid%08d" % os.getpid()
    fn = os.path.join(prefix, base)

    if os.path.exists(fn):
        os.unlink(fn)

    os.mkfifo(fn, 0o600)
    if not os.path.exists(fn):
        log("Failed to create fifo file: %s" % fn)
        sys.exit(-1)

    # clean the fifo up when the monitor itself exits
    atexit.register(os.unlink, fn)
    return fn

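# Illustrative sketch (defined but never called): how a monitored child
# could push a json document to the monitor through the fifo that is
# exported below as DQMMON_UPDATE_PIPE.
def _demo_fifo_writer():
    fn = os.environ.get("DQMMON_UPDATE_PIPE")
    if fn:
        with open(fn, "w") as f:
            f.write(json.dumps({"run": 123456, "lumi": 1}) + "\n")
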
CURRENT_PROC = []
def launch_monitoring(args):
    fifo = create_fifo()
    mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)

    def preexec():
        # runs in the child just before exec; the monitoring fd should
        # only stay open in the parent
        os.close(mon_fd)

        # open the fifo once (hack), so there is *always* at least one
        # writer, which closes together with the executable
        os.open(fifo, os.O_WRONLY)

        try:
            # ensure the child dies if we are SIGKILL'ed
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except Exception:
            log("Failed to setup PR_SET_PDEATHSIG.")

    env = os.environ
    env["DQMMON_UPDATE_PIPE"] = fifo

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    CURRENT_PROC.append(p)

    mon_file = os.fdopen(mon_fd)
    s_hist = History()
    s_json = JsonInput()
    report_sink = ElasticReport(pid=p.pid, history=s_hist, json=s_json, args=args)

    stdout_cap = DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink])
    stderr_cap = DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink])
    stdmon_cap = DescriptorCapture(mon_file, write_files=[s_json, report_sink])

    fs = [stdout_cap, stderr_cap, stdmon_cap]
    try:
        DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
    except select.error as e:
        # we get this on ctrl+c; just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the child is dead
    r = p.wait()
    CURRENT_PROC.remove(p)

    report_sink.update_doc({ "exit_code": r })
    report_sink.make_report()

    return r

def handle_signal(signum, frame):
    # forward the signal to the monitored child
    for proc in CURRENT_PROC:
        proc.send_signal(signum)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process - produces elastic search documents.")
    parser.add_argument("-t", type=int, default=2, help="Timeout in seconds.")
    parser.add_argument("--debug", "-d", action='store_true', default=False, help="Enables debugging mode: es documents will have a timestamp in the name.")
    parser.add_argument("pargs", nargs=argparse.REMAINDER)
    args = parser.parse_args()

    # forward ctrl+c / termination requests to the child
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
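
# Example invocation (the configuration name and run number are made up):
#   ./esMonitoring.py -d cmsRun dqm_sourceclient-live_cfg.py runNumber=123456
# runs cmsRun under the monitor and drops json state documents into
# /tmp/dqm_monitoring/ for as long as the job is alive.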