esMonitoring.py
#!/usr/bin/env python

import argparse
import subprocess
import socket, fcntl, select, atexit, signal
import sys, os, time, datetime
import collections
import json

def log(s):
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()

def dt2time(dt):
    # convert a datetime timestamp to unix time
    return time.mktime(dt.timetuple())
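
# Overview: this script wraps a command line (e.g. a cmsRun job), captures its
# stdout/stderr and a json control fifo, and periodically writes a
# "dqm-source-state" JSON document describing the process into
# /tmp/dqm_monitoring/. The pieces below are: ElasticReport (builds and writes
# the document), History (bounded buffer of recent output), JsonInput
# (line-delimited json parser for the fifo) and DescriptorCapture (poll-based
# fan-out of the captured streams).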

class ElasticReport(object):
    def __init__(self, pid, cmdline, history, json):
        self.s_history = history
        self.s_json = json

        self.last_make_report = None
        # seconds between periodic reports; the original value was lost in the
        # page extraction, 30 is an assumed placeholder
        self.make_report_timer = 30
        self.seq = 0

        self.doc = {
            "pid": pid,
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": cmdline,
        }

        self.defaults()

    def defaults(self):
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # figure out the tag from the configuration file on the command line
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                l = os.path.basename(l)
                l = l.replace(".py", "")
                l = l.replace("_cfg", "")
                self.doc["tag"] = l

        self.make_id()

    def make_id(self):
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

    def update_doc(self, keys):
        for key, value in keys.items():
            self.doc[key] = value

    def update_from_json(self):
        while self.s_json.have_docs():
            doc = self.s_json.get_doc()

            # convert some values to integers
            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])

            self.update_doc(doc)

    def update_ps_status(self):
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            f = open(fn, "r")
            d = {}
            for line in f:
                k, v = line.strip().split(":", 1)
                d[k.strip()] = v.strip()
            f.close()

            self.doc["ps_info"] = d
        except:
            pass

    def update_stderr(self):
        if self.s_history:
            self.doc["stderr"] = self.s_history.read()

    def make_report(self):
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()

        self.update_from_json()
        self.make_id()
        self.update_ps_status()
        self.update_stderr()

        # write the report atomically: dump to a .tmp file, then rename
        fn_id = self.doc["_id"] + ".jsn"
        fn = os.path.join("/tmp/dqm_monitoring/", fn_id)
        fn_tmp = os.path.join("/tmp/dqm_monitoring/", fn_id + ".tmp")

        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True)

        os.rename(fn_tmp, fn)

    def try_update(self):
        # first report is made immediately
        if self.last_make_report is None:
            return self.make_report()

        # report immediately if the json stream has updates
        if self.s_json and self.s_json.have_docs():
            return self.make_report()

        # otherwise report only once the timer has expired
        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()

    def write(self, rbuf):
        self.try_update()

    def flush(self):
        self.try_update()
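
# Illustrative example of the output (hypothetical values): make_report()
# writes /tmp/dqm_monitoring/<_id>.jsn, where <_id> follows id_format, e.g.
#
#   dqm-source-state-run000000-hostdqmworker-pid012345.jsn
#
# and the JSON body carries the keys assembled above, roughly:
#
#   {
#     "_id": "dqm-source-state-run000000-hostdqmworker-pid012345",
#     "type": "dqm-source-state",
#     "run": 0,
#     "pid": 12345,
#     "hostname": "dqmworker",
#     "tag": "myjob",
#     "sequence": 0,
#     "cmdline": ["cmsRun", "myjob_cfg.py"],
#     "report_timestamp": 1400000000.0,
#     "ps_info": {"Name": "cmsRun", "State": "R (running)"},
#     "stderr": "..."
#   }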

class History(object):
    def __init__(self, history_size=64*1024):
        self.max_size = history_size
        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        if not len(self.buf):
            return None

        elm = self.buf.popleft()
        self.size -= len(elm)

        return elm

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, rbuf):
        # drop old chunks until the new one fits into the size budget
        # (the guard on self.buf avoids spinning if a single chunk exceeds max_size)
        l = len(rbuf)
        while self.buf and (self.size + l) >= self.max_size:
            self.pop()

        self.push(rbuf)

    def read(self):
        return "".join(self.buf)

    def flush(self):
        pass
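
# History keeps roughly the last history_size bytes of captured output: write()
# appends the new chunk and drops whole chunks from the front until the total
# fits, so read() returns a bounded tail of the stream. A minimal sketch with
# hypothetical values:
#
#   h = History(history_size=16)
#   h.write("0123456789")
#   h.write("abcdefghij")   # the first chunk is dropped to stay under 16 bytes
#   h.read()                # -> "abcdefghij"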

class JsonInput(object):
    def __init__(self):
        self.buf = []
        self.docs = []

    def parse_line(self, line):
        if not line.strip():
            # this is a keep-alive
            # not yet implemented
            return

        try:
            doc = json.loads(line)
            self.docs.append(doc)
        except:
            log("cannot deserialize json: %s" % line)

    def get_doc(self):
        return self.docs.pop(0)

    def have_docs(self):
        return len(self.docs) > 0

    def write(self, rbuf):
        self.buf.append(rbuf)
        if "\n" in rbuf:
            # split whatever we have into complete lines
            all = "".join(self.buf)
            spl = all.split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.parse_line(line)

            self.buf = [spl[0]]

    def flush(self):
        pass
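
# JsonInput implements a simple line-delimited json protocol: the monitored
# process writes one json object per line into the fifo exported to it as
# DQMMON_UPDATE_PIPE (see launch_monitoring below); an empty line is treated
# as a keep-alive. A hedged sketch of the writer side (hypothetical snippet,
# not part of this script):
#
#   pipe = os.environ.get("DQMMON_UPDATE_PIPE")
#   if pipe:
#       with open(pipe, "w") as f:
#           f.write(json.dumps({"run": 123456, "lumi": 1}) + "\n")
#
# Keys such as "pid", "run" and "lumi" are coerced to integers in
# ElasticReport.update_from_json() before they are merged into the document.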

class DescriptorCapture(object):
    def __init__(self, f, write_files=[]):
        self.f = f
        self.fd = f.fileno()
        self.write_files = write_files

    def read_in(self, rbuf):
        # fan the captured chunk out to every attached sink
        for f in self.write_files:
            f.write(rbuf)
            f.flush()

    def close_in(self):
        log("closed fd %d" % self.fd)
        self.f.close()

    @staticmethod
    def event_loop(desc, timeout, timeout_call=None):
        fd_map = {}
        p = select.poll()

        for d in desc:
            fd_map[d.fd] = d
            p.register(d.fd, select.POLLIN)

        while len(fd_map) > 0:
            events = p.poll(timeout)
            if len(events) == 0:
                if timeout_call:
                    timeout_call()

            for fd, ev in events:
                rbuf = os.read(fd, 1024)
                if len(rbuf) == 0:
                    # EOF: close the descriptor and drop it from the loop
                    fd_map[fd].close_in()

                    p.unregister(fd)
                    del fd_map[fd]
                else:
                    fd_map[fd].read_in(rbuf)
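
# DescriptorCapture.event_loop() multiplexes all captured descriptors with
# poll(): `timeout` is in milliseconds, `timeout_call` fires whenever the poll
# returns no events (used below to flush periodic reports), EOF on a descriptor
# closes it and removes it from the map, and the loop ends once every
# descriptor has been closed.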

def create_fifo():
    prefix = "/tmp"
    if os.path.isdir("/tmp/dqm_monitoring"):
        prefix = "/tmp/dqm_monitoring"

    base = ".es_monitoring_pid%08d" % os.getpid()
    fn = os.path.join(prefix, base)

    if os.path.exists(fn):
        os.unlink(fn)

    os.mkfifo(fn, 0o600)
    if not os.path.exists(fn):
        log("Failed to create fifo file: %s" % fn)
        sys.exit(-1)

    atexit.register(os.unlink, fn)
    return fn
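
# The fifo ends up as <prefix>/.es_monitoring_pid<pid>, with <prefix> being
# /tmp/dqm_monitoring when that directory exists and /tmp otherwise; it is
# unlinked again at interpreter exit via atexit.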

CURRENT_PROC = []

def launch_monitoring(args):
    fifo = create_fifo()
    mon_fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)

    def preexec():
        # this fd should only be open in the parent
        os.close(mon_fd)

        # open the fifo once (hack)
        # so there is *always* at least one writer,
        # which goes away together with the executable
        os.open(fifo, os.O_WRONLY)

        try:
            # ensure the child dies if we are SIGKILLED
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except:
            log("Failed to setup PR_SET_PDEATHSIG.")
            pass

    env = os.environ
    env["DQMMON_UPDATE_PIPE"] = fifo

    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    CURRENT_PROC.append(p)

    mon_file = os.fdopen(mon_fd)
    s_hist = History()
    s_json = JsonInput()
    report_sink = ElasticReport(pid=p.pid, cmdline=args.pargs, history=s_hist, json=s_json)

    stdout_cap = DescriptorCapture(p.stdout, write_files=[sys.stdout, s_hist, report_sink, ], )
    stderr_cap = DescriptorCapture(p.stderr, write_files=[sys.stderr, s_hist, report_sink, ], )
    stdmon_cap = DescriptorCapture(mon_file, write_files=[s_json, report_sink, ],)

    fs = [stdout_cap, stderr_cap, stdmon_cap]
    try:
        DescriptorCapture.event_loop(fs, timeout=1000, timeout_call=report_sink.flush)
    except select.error as e:
        # we get here on ctrl+c,
        # just terminate the child
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # at this point the child is dead
    r = p.wait()
    CURRENT_PROC.remove(p)

    report_sink.update_doc({ "exit_code": r })
    report_sink.make_report()

    return r
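
# Wiring summary for launch_monitoring(): the child's stdout and stderr are
# teed to our own stdout/stderr, to the History buffer and to the report sink,
# while the fifo feeds JsonInput and the report sink. The event loop runs with
# a 1000 ms timeout and calls report_sink.flush() whenever a second passes
# without output; output chunks themselves also reach report_sink.write(), so
# try_update() runs regularly either way, and the child's exit code is
# recorded in one final report.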

def handle_signal(signum, frame):
    # forward the signal to the monitored child process(es)
    for proc in CURRENT_PROC:
        proc.send_signal(signum)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Kill/restart the child process if it doesn't output the required string.")
    parser.add_argument("-t", type=int, default="2", help="Timeout in seconds.")
    parser.add_argument("-s", type=int, default="2000", help="Signal to send.")
    parser.add_argument("-r", "--restart", action="store_true", default=False, help="Restart the process after killing it.")
    parser.add_argument("pargs", nargs=argparse.REMAINDER)
    args = parser.parse_args()

    # forward SIGINT/SIGTERM to the child instead of dying ourselves
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))
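
# Example invocation (illustrative, the config name is hypothetical):
#
#   ./esMonitoring.py cmsRun myjob_cfg.py
#
# Everything after the recognized options is collected via argparse.REMAINDER
# and executed as the monitored child process; its state then appears as
# dqm-source-state-*.jsn documents under /tmp/dqm_monitoring/.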