CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
fff_deleter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import logging
5 import re
6 import datetime
7 import subprocess
8 import socket
9 import time
10 import json
11 from StringIO import StringIO
12 
13 log = logging.getLogger(__name__)
14 
# Matches paths relative to the top directory, of the form
# "run<N>/run<N>_ls<N>_<anything>.dat" or ".raw", optionally followed by
# ".deleted".  The pattern is only anchored at the start of the string.
re_files = re.compile(r"^run(?P<run>\d+)/run(?P<runf>\d+)_ls(?P<ls>\d+)_.+\.(dat|raw)+(\.deleted)*")


def parse_file_name(rl):
    """Return the sort key encoded in a relative data-file path.

    Parameters:
        rl: path of the file relative to the top directory.

    Returns:
        A tuple ``(run, runf, ls)`` of integers taken from the directory name
        and the file name, or ``None`` when the path does not match
        ``re_files``.
    """
    m = re_files.match(rl)
    if not m:
        return None

    d = m.groupdict()
    sort_key = (int(d["run"]), int(d["runf"]), int(d["ls"]), )
    return sort_key
24 
def iterate(top, stopSize, action):
    """Apply ``action`` to data files under ``top`` until ``stopSize`` bytes are covered.

    Files whose relative path is recognised by ``parse_file_name`` are
    collected, sorted by their (run, runf, ls) key in ascending order and
    passed one by one to ``action``.  After every call the file size is
    subtracted from ``stopSize``; processing stops once it drops to zero
    or below.

    Parameters:
        top: directory to walk.
        stopSize: number of bytes that should be covered.
        action: callable taking the full file path.

    NOTE(review): the size is subtracted whether or not ``action`` actually
    did anything with the file.
    """
    # entry format (sort_key, path, size)
    collected = []

    for root, dirs, files in os.walk(top, topdown=True):
        for name in files:
            fp = os.path.join(root, name)
            sort_key = parse_file_name(os.path.relpath(fp, top))
            if not sort_key:
                continue

            try:
                fsize = os.stat(fp).st_size
            except OSError:
                # The file went away (or became inaccessible) between the
                # directory walk and the stat call; skip it instead of
                # aborting the whole pass.
                continue

            # empty files cannot free any space
            if fsize == 0:
                continue

            collected.append((sort_key, fp, fsize, ))

    # for now just use simple sort
    collected.sort(key=lambda entry: entry[0])

    # do the action
    for sort_key, fp, fsize in collected:
        if stopSize <= 0:
            break

        action(fp)
        stopSize = stopSize - fsize
53 
def cleanup_threshold(top, threshold, action, string):
    """Run ``action`` over old files if disk usage exceeds ``threshold`` percent.

    Parameters:
        top: directory whose filesystem is inspected and walked.
        threshold: allowed usage, as a percentage of the total size.
        action: callable handed to ``iterate`` for every selected file.
        string: label of the threshold, used only in log messages.
    """
    vfs = os.statvfs(top)
    capacity = vfs.f_blocks * vfs.f_frsize
    in_use = capacity - (vfs.f_bavail * vfs.f_frsize)

    # number of bytes by which the usage exceeds the allowed share
    excess = in_use - float(capacity * threshold) / 100

    in_use_pc = float(in_use) * 100 / capacity
    excess_pc = float(excess) * 100 / capacity
    log.info("Using %d (%.02f%%) of %d space, %d (%.02f%%) above %s threshold.",
        in_use, in_use_pc, capacity, excess, excess_pc, string)

    if not excess > 0:
        log.info("Threshold %s not reached, doing nothing.", string)
        return

    iterate(top, excess, action)
    log.info("Done cleaning up for %s threshold.", string)
71 
def diskusage(top):
    """Return the used share of the filesystem holding ``top``, in percent.

    "Used" is everything that is not available to unprivileged users
    (``f_bavail``), relative to the total size of the filesystem.
    """
    vfs = os.statvfs(top)
    capacity = vfs.f_blocks * vfs.f_frsize
    available = vfs.f_bavail * vfs.f_frsize
    return float(capacity - available) * 100 / capacity
77 
def diskusage_bytes(top):
    """Return ``(used, free, total)`` in bytes for the filesystem holding ``top``.

    ``free`` is the space available to unprivileged users (``f_bavail``);
    ``used`` is the remainder of ``total``.
    """
    vfs = os.statvfs(top)
    block = vfs.f_frsize
    total = vfs.f_blocks * block
    free = vfs.f_bavail * block
    return total - free, free, total
85 
class FileDeleter(object):
    """Keeps the disk usage of a directory below configurable thresholds.

    Files are first renamed (a ".deleted" suffix is appended) and, at a
    higher threshold, renamed files are truncated to zero length.  A
    notification e-mail is sent when the usage exceeds the "email"
    threshold.
    """

    def __init__(self, top, thresholds, email_to, report_directory, fake=True, ):
        """Store the configuration.

        Parameters:
            top: directory to monitor and clean.
            thresholds: mapping with the keys 'rename', 'delete' and 'email',
                each a percentage of the total disk size.
            email_to: iterable of recipient addresses.
            report_directory: directory where ``make_report`` writes its files.
            fake: when true, actions are only logged and not performed.
        """
        self.top = top
        self.fake = fake
        self.email_to = email_to
        self.thresholds = thresholds
        self.report_directory = report_directory
        self.sequence = 0

        # time of the last notification and the minimal gap between two of them
        self.last_email = None
        self.min_interval = datetime.timedelta(seconds=60*10)
        self.hostname = socket.gethostname()

    def rename(self, f):
        """Mark ``f`` for deletion by appending ".deleted" to its name.

        Files that already carry the suffix are left alone.  In fake mode
        the rename is only logged.
        """
        if f.endswith(".deleted"):
            return

        fn = f + ".deleted"

        if self.fake:
            log.warning("Renaming file (fake): %s -> %s", f,
                os.path.relpath(fn, os.path.dirname(f)))
        else:
            log.warning("Renaming file: %s -> %s", f,
                os.path.relpath(fn, os.path.dirname(f)))

            os.rename(f, fn)

    def delete(self, f):
        """Truncate ``f`` to zero length if it was previously renamed.

        Only files ending in ".deleted" are touched.  In fake mode the
        truncation is only logged.
        """
        if not f.endswith(".deleted"):
            return

        if self.fake:
            log.warning("Truncating file (fake): %s", f)
        else:
            log.warning("Truncating file: %s", f)
            # opening for writing truncates the file; the entry itself stays
            with open(f, "w"):
                pass

    def send_smg(self, used_pc):
        """Notify every recipient that the disk usage reached ``used_pc`` percent.

        Nothing is sent if the previous notification is younger than
        ``self.min_interval``.
        """
        now = datetime.datetime.now()

        if (self.last_email is not None):
            if (now - self.last_email) < self.min_interval:
                return

        self.last_email = now

        subject = "Disk out of space (%.02f%%) on %s." % (used_pc, self.hostname)

        # sms service does not accept an email with a several recipients
        # so we send one-by-one
        for email in self.email_to:
            # mail2sms addresses get an empty body, the others a copy of the subject
            if "mail2sms" in email:
                text = ""
            else:
                text = subject

            log.info("Sending email: %s", repr(["/bin/mail", "-s", subject, email]))
            p = subprocess.Popen(["/bin/mail", "-s", subject, email], stdin=subprocess.PIPE, shell=False)
            p.communicate(input=text)

    def make_report(self, logout):
        """Write a JSON status document into ``self.report_directory``.

        The document contains memory figures read from /proc/meminfo, the
        disk usage of ``self.top`` (-1 for every field when the directory
        does not exist) and ``logout`` split into lines.  It is written to a
        hidden temporary file first and then renamed to its final name.

        Parameters:
            logout: captured log text to embed in the report.
        """
        if not os.path.isdir(self.report_directory):
            log.warning("Directory %s does not exists. Reports disabled.", self.report_directory)
            return

        def entry_to_dict(line):
            # "MemTotal:  123 kB" -> ("MemTotal", 123)
            key, value = line.split()[:2]
            value = int(value)
            return (key.strip(":"), value, )

        with open("/proc/meminfo", "r") as meminfo_file:
            meminfo = dict(entry_to_dict(line) for line in meminfo_file)

        if os.path.isdir(self.top):
            used, free, total = diskusage_bytes(self.top)
        else:
            used, free, total = -1, -1, -1

        # memory values are multiplied by 1024 (kB -> bytes)
        doc = {
            "sequence": self.sequence,
            "memory_used": (meminfo["MemTotal"] - meminfo["MemFree"]) * 1024,
            "memory_free": meminfo["MemFree"] * 1024,
            "memory_total": meminfo["MemTotal"] * 1024,
            "disk_used": used,
            "disk_free": free,
            "disk_total": total,
            "hostname": self.hostname,
            "extra": {
                "meminfo": meminfo,
                "last_log": logout.split("\n"),
            },
            "pid": os.getpid(),
            "_id": "dqm-diskspace-%s" % self.hostname,
            "type": "dqm-diskspace"
        }

        fn = "dqm-diskspace-seq%06d" % (doc["sequence"], )
        tmp_fp = os.path.join(self.report_directory, "." + fn + ".tmp")
        final_fp = os.path.join(self.report_directory, fn + ".jsn")

        with open(tmp_fp, "w") as fd:
            json.dump(doc, fd, indent=True)
            fd.write("\n")

        # readers never observe a partially written report
        os.rename(tmp_fp, final_fp)

    def run(self):
        """Perform one cleanup pass and send a notification if needed."""
        self.sequence += 1
        if not os.path.isdir(self.top):
            log.warning("Directory %s does not exists.", self.top)
            return

        cleanup_threshold(self.top, self.thresholds['rename'], self.rename, "rename")
        cleanup_threshold(self.top, self.thresholds['delete'], self.delete, "delete")

        du = diskusage(self.top)
        if du > self.thresholds['email']:
            # use this instance, not a module-level variable named "deleter"
            self.send_smg(du)
204 
# use a named socket check if we are running
# this is very clean and atomic and leave no files
# from: http://stackoverflow.com/a/7758075
def lock(pname):
    """Try to take a process-wide lock called ``pname``.

    The lock is a datagram unix socket bound to the name '\\0' + pname.

    Returns:
        The bound socket on success; the caller has to keep a reference to
        it for as long as the lock should be held.  ``None`` if binding
        failed, e.g. because the name is already taken.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind('\0' + pname)
    except socket.error:
        # do not leak the descriptor when the lock could not be taken
        sock.close()
        return None
    return sock
215 
def daemon(deleter, log_capture, delay_seconds=30):
    """Run ``deleter`` forever, pausing ``delay_seconds`` between passes.

    Parameters:
        deleter: object providing ``run()`` and ``make_report(text)``.
        log_capture: in-memory stream collecting the log output, or a false
            value to disable reports.  After every pass its content is
            handed to ``deleter.make_report`` and the stream is emptied.
        delay_seconds: sleep time between two passes.

    This function does not return; it only ends if an exception is raised.
    """
    while True:
        deleter.run()

        if log_capture:
            log_out = log_capture.getvalue()
            # Rewind before truncating: io.StringIO keeps the stream position
            # on truncate(), so the next write would otherwise pad the buffer
            # with NUL characters up to the old position.
            log_capture.seek(0)
            log_capture.truncate(0)
            deleter.make_report(log_out)

        time.sleep(delay_seconds)
226 
def daemonize(logfile, pidfile):
    """Detach the process with a double fork and redirect its output.

    Parameters:
        logfile: file opened in append mode; ``sys.stdout`` and ``sys.stderr``
            are replaced by it.
        pidfile: if not empty, the pid of the final process is written there.

    Only the process created by the second fork returns from this function;
    both intermediate parents exit with status 0.
    """
    # first fork: the original process exits
    if os.fork() != 0:
        sys.exit(0)

    # start a new session, then drop the inherited standard streams
    os.setsid()
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        stream.close()

    log_stream = open(logfile, "a")
    sys.stdout = log_stream
    sys.stderr = log_stream

    # second fork: the session leader exits as well
    if os.fork() != 0:
        sys.exit(0)

    if pidfile:
        with open(pidfile, "w") as pid_out:
            pid_out.write("%d\n" % os.getpid())
250 
251 import sys
if __name__ == "__main__":
    # NOTE: command line parsing is disabled; the settings are hard-coded below.
    #import argparse
    #parser = argparse.ArgumentParser(description="Delete files if disk space usage reaches critical level.")
    #parser.add_argument("-r", "--renameT", type=float, help="Percentage of total disk space used for file renaming.")
    #parser.add_argument("-d", "--deleteT", type=float, help="Percentage of total disk space used for file deletion.")
    #parser.add_argument("-t", "--top", type=str, help="Top level directory.", default="/fff/ramdisk/")
    #args = parser.parse_args()

    # Allow a single instance only.  The socket has to stay referenced for
    # the lifetime of the process, otherwise the lock is released.
    sock = lock("fff_deleter")
    if sock is None:
        sys.stderr.write("Already running, exitting.\n")
        sys.stderr.flush()
        sys.exit(1)

    # threshold rename and delete must be in order
    # in other words, always: delete > rename
    # this is because delete only deletes renamed files
    #
    # email threshold has no restrictions
    top = "/fff/ramdisk"
    thresholds = dict(rename=60, delete=80, email=90)

    # put "41XXXXXXXXX@mail2sms.cern.ch" to send the sms
    recipients = [
        "dmitrijus.bugelskis@cern.ch",
        "atanas.batinkov@cern.ch",
        "daniel.joseph.duggan@cern.ch",
    ]

    deleter = FileDeleter(
        top=top,
        thresholds=thresholds,
        email_to=recipients,
        report_directory="/tmp/dqm_monitoring/",
        fake=False,
    )

    # setup logging
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    log.setLevel(logging.INFO)

    # in-memory copy of the log, embedded into the periodic reports
    log_capture = StringIO()
    log_capture_ch = logging.StreamHandler(log_capture)
    log_capture_ch.setFormatter(formatter)
    log_capture_ch.setLevel(logging.INFO)
    log.addHandler(log_capture_ch)

    # run in the background; this also writes the pid file
    daemonize("/var/log/fff_deleter.log", "/var/run/fff_deleter.pid")

    # log to stderr (it might be redirected); created only after daemonize()
    # so that the handler picks up the replaced sys.stderr
    flog_ch = logging.StreamHandler()
    flog_ch.setFormatter(formatter)
    flog_ch.setLevel(logging.INFO)
    log.addHandler(flog_ch)

    # record the pid of the daemon in the log
    log.info("Pid is %d", os.getpid())
    daemon(deleter=deleter, log_capture=log_capture)
def diskusage_bytes
Definition: fff_deleter.py:78
def cleanup_threshold
Definition: fff_deleter.py:54
def parse_file_name
Definition: fff_deleter.py:16
list object
Definition: dbtoconf.py:77
How EventSelector::AcceptEvent() decides whether to accept an event for output; otherwise it is excluding the probing of: A single or multiple positive, and the trigger will pass if any such matching triggers are PASS or EXCEPTION. [A criterion that matches no triggers at all is detected and causes a throw.] A single negative, with an expectation of appropriate bit checking in the decision, and the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run