CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
fff_monitoring.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import sys
5 import logging
6 import re
7 import datetime
8 import subprocess
9 import socket
10 import time
11 import select
12 import json
13 import datetime
14 
# Module-level logger; its handler and level are configured in the
# ``__main__`` section at the bottom of the file.
log = logging.getLogger(__name__)
16 
    # minihack: make the modules installed under /opt/hltd importable.
    sys.path.append('/opt/hltd/python')
    sys.path.append('/opt/hltd/lib')

    # Bind the imported modules as module-level globals: the monitor class
    # (e.g. its __init__ and create_index) refers to them by these names.
    global inotify, watcher, es_client

    import _inotify as inotify
    import watcher
    import pyelasticsearch.client as es_client
27 
28 
30  def __init__(self, top_path, rescan_timeout=30):
31  self.path = top_path
32  self.rescan_timeout = rescan_timeout
33  self.es = es_client.ElasticSearch("http://127.0.0.1:9200")
34  self.index_name = "dqm_online_monitoring"
35 
36  try:
37  os.makedirs(self.path)
38  except OSError:
39  pass
40 
41  self.mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO
42  self.w = watcher.Watcher()
43  self.w.add(self.path, self.mask)
44 
45  def recreate_index(self):
46  self.delete_index()
47  self.create_index()
48 
49  def delete_index(self):
50  log.info("Deleting index: %s", self.index_name)
51  self.es.delete_index(self.index_name)
52 
53  def create_index(self):
54  log.info("Creating index: %s", self.index_name)
55 
56  self.settings = {
57  "analysis": {
58  "analyzer": {
59  "prefix-test-analyzer": {
60  "type": "custom",
61  "tokenizer": "prefix-test-tokenizer"
62  }
63  },
64  "tokenizer": {
65  "prefix-test-tokenizer": {
66  "type": "path_hierarchy",
67  "delimiter": "_"
68  }
69  }
70  },
71  "index":{
72  'number_of_shards' : 16,
73  'number_of_replicas' : 1
74  }
75  }
76 
77  self.mappings = {
78  'dqm-source-state' : {
79  'properties' : {
80  'type' : {'type' : 'string' },
81  'pid' : { 'type' : 'integer' },
82  'hostname' : { 'type' : 'string' },
83  'sequence' : { 'type' : 'integer', "index" : "not_analyzed" },
84  'run' : { 'type' : 'integer' },
85  'lumi' : { 'type' : 'integer' },
86  },
87  '_timestamp' : { 'enabled' : True, 'store' : True, },
88  '_ttl' : { 'enabled' : True, 'default' : '15d' }
89  },
90  'dqm-diskspace' : {
91  'properties' : {
92  'type' : {'type' : 'string' },
93  'pid' : { 'type' : 'integer' },
94  'hostname' : { 'type' : 'string' },
95  'sequence' : { 'type' : 'integer', "index" : "not_analyzed" },
96  },
97  '_timestamp' : { 'enabled' : True, 'store' : True, },
98  '_ttl' : { 'enabled' : True, 'default' : '15d' }
99  },
100  }
101 
102  try:
103  self.es.create_index(self.index_name, settings={ 'settings': self.settings, 'mappings': self.mappings })
104  except es_client.IndexAlreadyExistsError:
105  logger.info("Index already exists.", exc_info=True)
106  pass
107  except:
108  logger.warning("Cannot create index", exc_info=True)
109 
110  log.info("Created index: %s", self.index_name)
111 
112  def upload_file(self, fp, preprocess=None):
113  log.info("Uploading: %s", fp)
114 
115  try:
116  f = open(fp, "r")
117  document = json.load(f)
118  f.close()
119 
120  if preprocess:
121  document = preprocess(document)
122 
123  ret = self.es.index(self.index_name, document["type"], document, id=document["_id"])
124  except:
125  log.warning("Failure to upload the document: %s", fp, exc_info=True)
126 
127  def process_file(self, fp):
128  fname = os.path.basename(fp)
129 
130  if fname.startswith("."):
131  return
132 
133  if not fname.endswith(".jsn"):
134  return
135 
136  self.upload_file(fp)
137  os.unlink(fp)
138 
139  def process_dir(self):
140  for f in os.listdir(self.path):
141  fp = os.path.join(self.path, f)
142  self.process_file(fp)
143 
144  def run(self):
145  poll = select.poll()
146  poll.register(self.w, select.POLLIN)
147  poll.poll(self.rescan_timeout*1000)
148 
149  # clear the events
150  for event in self.w.read(bufsize=0):
151  pass
152  #print event
153 
154  self.process_dir()
155 
156  def run_daemon(self):
157  self.process_dir()
158 
159  while True:
160  service.run()
161 
162  def run_playback(self, directory, scale=2):
163  files = os.listdir(directory)
164  todo = []
165  for f in files:
166  spl = f.split("+")
167  if len(spl) < 2:
168  continue
169 
170  date, seq = spl[0].split(".")
171  date, seq = datetime.datetime.fromtimestamp(long(date)), long(seq)
172 
173  todo.append({'date': date, 'seq': seq, 'f': os.path.join(directory, f)})
174 
175  def ts(td):
176  # because total_seconds() is missing in 2.6
177  return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
178 
179  todo.sort(key=lambda x: (x["date"], x["seq"], ))
180  m = todo[0]["date"]
181  for f in todo:
182  f["diff"] = ts(f["date"] - m) / scale
183 
184  def hotfix(doc):
185  doc["tag"] = os.path.basename(doc["tag"])
186  return doc
187 
188  start_time = datetime.datetime.now()
189  while todo:
190  elapsed = ts(datetime.datetime.now() - start_time)
191  if todo[0]["diff"] <= elapsed:
192  item = todo.pop(0)
193  self.upload_file(item["f"], preprocess=hotfix)
194  else:
195  time.sleep(0.2)
196 
# Use a Unix socket bound in the abstract namespace (name starting with a NUL
# byte) to check whether another instance is running.  Binding is atomic and
# no file is left behind on disk.
# From: http://stackoverflow.com/a/7758075
def lock(pname):
    """Try to take the single-instance lock named *pname*.

    Returns the bound socket on success; keep a reference to it for as long
    as the lock should be held (closing the socket releases the name).
    Returns ``None`` if the name is already bound by another socket.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind('\0' + pname)
    except socket.error:
        # Don't leak the descriptor of the socket we failed to bind.
        sock.close()
        return None
    return sock
207 
# Same as in fff_deleter.py.
def daemonize(logfile, pidfile):
    """Detach from the terminal with the classic double fork.

    After the first fork the child starts a new session (os.setsid) and
    replaces its standard streams: stdin is closed, and stdout and stderr
    both become *logfile* opened for appending.  After the second fork the
    grandchild writes its pid to *pidfile* when one is given.  Every parent
    exits with status 0, so only the grandchild returns from this call.
    """
    def detach():
        # The parent exits immediately; only the child continues.
        if os.fork() != 0:
            sys.exit(0)

    detach()
    os.setsid()

    for stream in (sys.stdin, sys.stdout, sys.stderr):
        stream.close()
    sink = open(logfile, "a")
    sys.stdout = sys.stderr = sink

    detach()

    if pidfile:
        handle = open(pidfile, "w")
        handle.write("%d\n" % os.getpid())
        handle.close()
232 
if __name__ == "__main__":
    # "reindex" and "playback" run in the foreground; anything else
    # (including no argument at all) starts the background daemon.
    mode_arg = sys.argv[1] if len(sys.argv) > 1 else None
    if mode_arg in ("reindex", "playback"):
        do_mode = mode_arg
        do_foreground = True
    else:
        do_mode = "daemon"
        do_foreground = False

    if not do_foreground:
        # Take the single-instance lock or quit.  ``sock`` stays referenced
        # at module level, so the lock is held for the process lifetime.
        sock = lock("fff_dqmmon")
        if sock is None:
            sys.stderr.write("Already running, exiting.\n")
            sys.stderr.flush()
            sys.exit(1)

        daemonize("/var/log/fff_monitoring.log", "/var/run/fff_monitoring.pid")

    # Log to stderr; in daemon mode daemonize() has redirected it to the
    # log file.
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    flog_ch = logging.StreamHandler()
    flog_ch.setLevel(logging.INFO)
    flog_ch.setFormatter(formatter)
    log.setLevel(logging.INFO)
    log.addHandler(flog_ch)

    # The pid file itself is written by daemonize(); here we only log it.
    log.info("Pid is %d", os.getpid())
265 
267 
268  service = DQMMonitor(
269  top_path = "/tmp/dqm_monitoring/",
270  )
271 
272  if do_mode == "reindex":
273  service.recreate_index()
274  elif do_mode == "playback":
275  #service.recreate_index()
276  service.run_playback(sys.argv[2])
277  else:
278  service.run_daemon()
list object
Definition: dbtoconf.py:77
double split
Definition: MVATrainer.cc:139