CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
esMonitoring.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import argparse
4 import subprocess
5 import socket, fcntl, select, atexit, signal
6 import sys, os, time, datetime
7 import collections
8 import json
9 
def log(s):
    """Write a monitor-prefixed message to stderr and flush immediately."""
    # flush so messages interleave correctly with the child's own stderr
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()
13 
def dt2time(dt):
    """Convert a datetime timestamp to unix time (seconds since epoch)."""
    tt = dt.timetuple()
    return time.mktime(tt)
17 
def __init__(self, pid, history, json, args):
    """Collects process state into an elasticsearch-style json document.

    pid: pid of the monitored child process.
    history: History buffer holding captured stdout/stderr (may be None).
    json: JsonInput stream of documents sent by the child (may be None).
    args: parsed command-line options; args.pargs is the child cmdline.
    """
    self.s_history = history
    self.s_json = json
    self.s_path = "/tmp/dqm_monitoring/"

    self.last_make_report = None
    self.seq = 0
    self.args = args

    # base document; defaults() fills in type/run/tag and builds the _id
    self.doc = {
        "pid": pid,
        "hostname": socket.gethostname(),
        "sequence": self.seq,
        "cmdline": args.pargs,
    }

    self.defaults()
37 
def defaults(self):
    """Reset the document's identity fields from the child command line.

    Derives "tag" from the basename of a *.py config file and "run" from
    a runNumber=N argument, then rebuilds the document _id.
    """
    self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
    self.doc["type"] = "dqm-source-state"
    self.doc["run"] = 0

    # figure out the tag
    c = self.doc["cmdline"]
    for l in c:
        if l.endswith(".py"):
            t = os.path.basename(l)
            t = t.replace(".py", "")
            t = t.replace("_cfg", "")
            self.doc["tag"] = t

        pr = l.split("=")
        if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
            # int() handles arbitrarily large values; the original long()
            # was removed in python3
            run = int(pr[1])
            self.doc["run"] = run

    self.make_id()
58 
def make_id(self):
    """Build the document _id from id_format, store it in the doc, return it."""
    doc_id = self.id_format % self.doc
    self.doc["_id"] = doc_id
    return doc_id
63 
def update_doc_recursive(self, old_obj, new_obj):
    """Merge new_obj into old_obj in place.

    Values that are dicts on both sides are merged recursively; any
    other value in new_obj overwrites the corresponding old value.
    """
    for key, value in new_obj.items():
        # dict.has_key() was removed in python3; "in" works everywhere
        if (key in old_obj and
            isinstance(value, dict) and
            isinstance(old_obj[key], dict)):

            self.update_doc_recursive(old_obj[key], value)
        else:
            old_obj[key] = value
73 
def update_doc(self, keys):
    """Merge the given mapping into the main document (recursively)."""
    self.update_doc_recursive(self.doc, keys)
76 
def update_from_json(self):
    """Drain all pending documents from the json input stream into doc."""
    while self.s_json.have_docs():
        doc = self.s_json.get_doc()

        # convert some values to integers
        for k in ["pid", "run", "lumi"]:
            # dict.has_key() was removed in python3; "in" works everywhere
            if k in doc:
                doc[k] = int(doc[k])

        self.update_doc_recursive(self.doc, doc)
87 
def update_ps_status(self):
    """Best-effort read of /proc/<pid>/status into extra.ps_info."""
    try:
        pid = int(self.doc["pid"])
        fn = "/proc/%d/status" % pid
        d = {}
        # "with" guarantees the handle is closed even if a line fails to parse
        # (the original leaked the file object on any exception)
        with open(fn, "r") as f:
            for line in f:
                k, v = line.strip().split(":", 1)
                d[k.strip()] = v.strip()

        self.update_doc({'extra': {'ps_info': d}})
    except Exception:
        # deliberate best-effort: /proc entry may be gone or malformed;
        # "except Exception" (not bare except) keeps Ctrl+C working
        pass
102 
def update_stderr(self):
    """Copy the captured stdout/stderr history into extra.stdlog."""
    if not self.s_history:
        return

    self.update_doc({'extra': {'stdlog': self.s_history.read()}})
107 
def make_report(self):
    """Write the current state document as <id>.jsn into the report directory."""
    now = time.time()
    self.last_make_report = now
    self.doc["report_timestamp"] = now
    self.update_from_json()
    self.make_id()

    if not os.path.isdir(self.s_path):
        # don't make a report if the directory is not available
        return

    self.update_ps_status()
    self.update_stderr()

    fn_id = self.doc["_id"] + ".jsn"

    # use the instance's own args (the original read the module-level
    # global "args", which only exists when run as a script)
    if self.args.debug:
        tm = "%.06f+" % time.time()
        fn_id = tm + fn_id

    fn = os.path.join(self.s_path, fn_id)
    fn_tmp = os.path.join(self.s_path, fn_id + ".tmp")

    # write to a temp file and rename so readers never see a partial json
    with open(fn_tmp, "w") as f:
        json.dump(self.doc, f, indent=True)

    os.rename(fn_tmp, fn)
134 
def try_update(self):
    """Regenerate the report when new data arrived or the timer expired."""
    # never reported yet: do it now
    if self.last_make_report is None:
        return self.make_report()

    # the json stream has pending updates
    if self.s_json and self.s_json.have_docs():
        return self.make_report()

    # periodic refresh once make_report_timer seconds have passed
    elapsed = time.time() - self.last_make_report
    if elapsed > self.make_report_timer:
        return self.make_report()
148 
def write(self, rbuf):
    """File-like hook: captured output gives a chance to refresh the report."""
    self.try_update()
151 
def flush(self):
    """File-like hook: flushes also give a chance to refresh the report."""
    self.try_update()
154 
def __init__(self, history_size=64*1024):
    """Ring buffer keeping roughly the last history_size characters."""
    self.max_size = history_size
    self.size = 0                    # total length of all buffered chunks
    self.buf = collections.deque()   # chunks, oldest on the left
160 
def pop(self):
    """Remove and return the oldest chunk, or None when the buffer is empty."""
    if not self.buf:
        return None

    chunk = self.buf.popleft()
    self.size -= len(chunk)
    return chunk
169 
def push(self, rbuf):
    """Append a chunk and account for its length."""
    self.size += len(rbuf)
    self.buf.append(rbuf)
173 
def write(self, rbuf):
    """Add rbuf to the history, evicting old chunks to stay under max_size.

    The eviction loop stops when the buffer is empty: the original spun
    forever if a single chunk alone was >= max_size, because pop() on an
    empty deque returns None without shrinking size.
    """
    l = len(rbuf)
    while (self.size + l) >= self.max_size and self.buf:
        self.pop()

    self.push(rbuf)
180 
def read(self):
    """Return the entire buffered history as a single string."""
    return "".join(self.buf)
183 
def flush(self):
    """File-like no-op; the history needs no flushing."""
    pass
186 
class JsonInput(object):
    """Accumulates raw fifo bytes and parses newline-delimited json documents."""

    def __init__(self):
        self.buf = []    # fragments of a not-yet-complete line
        self.docs = []   # parsed documents, oldest first

    def parse_line(self, line):
        """Parse one complete line; blank lines are keep-alives and skipped."""
        if not line.strip():
            # this is keep alive
            # not yet implemented
            return

        try:
            doc = json.loads(line)
            self.docs.append(doc)
        except ValueError:
            # json.loads raises ValueError (JSONDecodeError subclasses it);
            # the original bare except would also swallow KeyboardInterrupt
            log("cannot deserialize json: %s" % line)

    def get_doc(self):
        """Pop and return the oldest parsed document."""
        return self.docs.pop(0)

    def have_docs(self):
        """Return True if at least one parsed document is waiting."""
        return len(self.docs) > 0

    def write(self, rbuf):
        """File-like sink: buffer rbuf and parse any lines it completes."""
        self.buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have
            all = "".join(self.buf)
            spl = all.split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.parse_line(line)

            # keep the trailing (possibly empty) partial line for next time
            self.buf = [spl[0]]

    def flush(self):
        """File-like no-op."""
        pass
225 
class DescriptorCapture(object):
    """Couples a readable file object with the sinks its data is copied to."""

    def __init__(self, f, write_files=[]):
        self.f = f
        self.fd = f.fileno()
        self.write_files = write_files

    def read_in(self, rbuf):
        """Fan the received buffer out to every sink, flushing each one."""
        for sink in self.write_files:
            sink.write(rbuf)
            sink.flush()

    def close_in(self):
        """Close the underlying file once the writer side has gone away."""
        log("closed fd %d" % self.fd)
        self.f.close()

    @staticmethod
    def event_loop(desc, timeout, timeout_call=None):
        """Poll the given descriptors until all of them reach EOF.

        timeout is in milliseconds; timeout_call (if given) runs on every
        poll timeout.
        """
        fd_map = {}
        poller = select.poll()

        # the original reused the parameter name as the loop variable
        for d in desc:
            fd_map[d.fd] = d
            poller.register(d.fd, select.POLLIN)

        while fd_map:
            events = poller.poll(timeout)
            if not events and timeout_call:
                timeout_call()

            for fd, ev in events:
                rbuf = os.read(fd, 1024)
                if rbuf:
                    fd_map[fd].read_in(rbuf)
                else:
                    # zero-length read means EOF: drop the descriptor
                    fd_map[fd].close_in()
                    poller.unregister(fd)
                    del fd_map[fd]
265 
266 
def create_fifo():
    """Create the monitoring fifo and return its path.

    Prefers /tmp/dqm_monitoring when that directory exists; the fifo is
    removed automatically at interpreter exit.
    """
    prefix = "/tmp"
    if os.path.isdir("/tmp/dqm_monitoring"):
        prefix = "/tmp/dqm_monitoring"

    base = ".es_monitoring_pid%08d" % os.getpid()
    fn = os.path.join(prefix, base)

    if os.path.exists(fn):
        os.unlink(fn)

    # 0o600: owner read/write only (the bare 0600 literal is a syntax
    # error under python3)
    os.mkfifo(fn, 0o600)
    if not os.path.exists(fn):
        log("Failed to create fifo file: %s" % fn)
        sys.exit(-1)

    atexit.register(os.unlink, fn)
    return fn
285 
CURRENT_PROC = []

def launch_monitoring(args):
    """Run the child command under monitoring and return its exit code.

    Captures the child's stdout/stderr and a private fifo the child can
    write json documents to, feeding everything into an ElasticReport
    that periodically dumps state files.
    """
    fifo = create_fifo()
    mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)

    def preexec():
        # this should only be open on a parent
        os.close(mon_fd)

        # open fifo once (hack)
        # so there is *always* at least one writer
        # which closes with the executable
        os.open(fifo, os.O_WRONLY)

        try:
            # ensure the child dies if we are SIGKILLED
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except Exception:
            # prctl is linux-only; monitoring still works without it
            log("Failed to setup PR_SET_PDEATHSIG.")

    env = os.environ
    env["DQMMON_UPDATE_PIPE"] = fifo

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    CURRENT_PROC.append(p)

    mon_file = os.fdopen(mon_fd)
    s_hist = History()
    s_json = JsonInput()
    report_sink = ElasticReport(pid=p.pid, history=s_hist, json=s_json, args=args)

    stdout_cap = DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink, ], )
    stderr_cap = DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink, ], )
    stdmon_cap = DescriptorCapture(mon_file, write_files=[s_json, report_sink, ],)

    fs = [stdout_cap, stderr_cap, stdmon_cap]
    try:
        DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
    except select.error as e:
        # "except select.error, e" is py2-only syntax; "as" works everywhere.
        # we get here on ctrl+c - just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the program is dead
    r = p.wait()
    CURRENT_PROC.remove(p)

    report_sink.update_doc({ "exit_code": r })
    report_sink.make_report()

    return r
342 
def handle_signal(signum, frame):
    """Forward a received signal to every monitored child process."""
    for child in CURRENT_PROC:
        child.send_signal(signum)
346 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process - produces elastic search documents.")
    # give argparse a real int default rather than the string "2"
    # (argparse only applies type= to string defaults, which is fragile)
    parser.add_argument("-t", type=int, default=2, help="Timeout in seconds.")
    parser.add_argument("--debug", "-d", action='store_true', default=False, help="Enables debugging mode: es documents will have timestamp in the name.")
    parser.add_argument("pargs", nargs=argparse.REMAINDER)
    args = parser.parse_args()

    # forward SIGINT/SIGTERM to the monitored child so it can shut down
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def launch_monitoring
list object
Definition: dbtoconf.py:77
double split
Definition: MVATrainer.cc:139