CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
fff_monitoring.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import sys
5 import logging
6 import re
7 import datetime
8 import subprocess
9 import socket
10 import time
11 import select
12 import json
13 
14 log = logging.getLogger(__name__)
15 
17  # minihack
18  sys.path.append('/opt/hltd/python')
19  sys.path.append('/opt/hltd/lib')
20 
21  global inotify, watcher, es_client
22 
23  import _inotify as inotify
24  import watcher
25  import pyelasticsearch.client as es_client
26 
27 
    def __init__(self, top_path, rescan_timeout=30):
        """Watch *top_path* and index dropped .jsn documents into Elasticsearch.

        :param top_path: directory to watch for monitoring documents
        :param rescan_timeout: maximum seconds between full directory rescans
        """
        self.path = top_path
        self.rescan_timeout = rescan_timeout
        # Local Elasticsearch node used as the document store.
        self.es = es_client.ElasticSearch("http://127.0.0.1:9200")
        self.index_name = "dqm_online_monitoring"

        # Create the watch directory if needed.  NOTE(review): this also
        # swallows genuine OSErrors (e.g. permissions), not just EEXIST —
        # pre-existing behaviour, kept as-is.
        try:
            os.makedirs(self.path)
        except OSError:
            pass

        # Wake up when a file is finished (closed after write) or moved in.
        self.mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO
        self.w = watcher.Watcher()
        self.w.add(self.path, self.mask)
43 
44  def recreate_index(self):
45  self.delete_index()
46  self.create_index()
47 
48  def delete_index(self):
49  log.info("Deleting index: %s", self.index_name)
50  self.es.delete_index(self.index_name)
51 
52  def create_index(self):
53  log.info("Creating index: %s", self.index_name)
54 
55  self.settings = {
56  "analysis": {
57  "analyzer": {
58  "prefix-test-analyzer": {
59  "type": "custom",
60  "tokenizer": "prefix-test-tokenizer"
61  }
62  },
63  "tokenizer": {
64  "prefix-test-tokenizer": {
65  "type": "path_hierarchy",
66  "delimiter": "_"
67  }
68  }
69  },
70  "index":{
71  'number_of_shards' : 16,
72  'number_of_replicas' : 1
73  }
74  }
75 
76  self.mappings = {
77  'dqm-source-state' : {
78  'properties' : {
79  'type' : {'type' : 'string' },
80  'pid' : { 'type' : 'integer' },
81  'hostname' : { 'type' : 'string' },
82  'sequence' : { 'type' : 'integer', "index" : "not_analyzed" },
83  'run' : { 'type' : 'integer' },
84  'lumi' : { 'type' : 'integer' },
85  },
86  '_timestamp' : { 'enabled' : True, 'store' : True, },
87  '_ttl' : { 'enabled' : True, 'default' : '24h' }
88  },
89  'dqm-diskspace' : {
90  'properties' : {
91  'type' : {'type' : 'string' },
92  'pid' : { 'type' : 'integer' },
93  'hostname' : { 'type' : 'string' },
94  'sequence' : { 'type' : 'integer', "index" : "not_analyzed" },
95  },
96  '_timestamp' : { 'enabled' : True, 'store' : True, },
97  '_ttl' : { 'enabled' : True, 'default' : '24h' }
98  },
99  }
100 
101  try:
102  self.es.create_index(self.index_name, settings={ 'settings': self.settings, 'mappings': self.mappings })
103  except es_client.IndexAlreadyExistsError:
104  logger.info("Index already exists.", exc_info=True)
105  pass
106  except:
107  logger.warning("Cannot create index", exc_info=True)
108 
109  log.info("Created index: %s", self.index_name)
110 
111  def upload_file(self, fp):
112  log.info("Uploading: %s", fp)
113 
114  try:
115  f = open(fp, "r")
116  document = json.load(f)
117  f.close()
118 
119  ret = self.es.index(self.index_name, document["type"], document, id=document["_id"])
120  print ret
121 
122  except:
123  log.warning("Failure to upload the document: %s", fp, exc_info=True)
124 
125  def process_file(self, fp):
126  fname = os.path.basename(fp)
127 
128  if fname.startswith("."):
129  return
130 
131  if not fname.endswith(".jsn"):
132  return
133 
134  self.upload_file(fp)
135  os.unlink(fp)
136 
137  def process_dir(self):
138  for f in os.listdir(self.path):
139  fp = os.path.join(self.path, f)
140  self.process_file(fp)
141 
    def run(self):
        """Run one iteration of the watch loop.

        Waits up to ``rescan_timeout`` seconds for inotify activity on the
        watched directory, drains any queued events, then rescans the whole
        directory — so files are processed even if an event was missed.
        """
        poll = select.poll()
        poll.register(self.w, select.POLLIN)
        poll.poll(self.rescan_timeout*1000)

        # clear the events
        # (events only serve as a wake-up signal; process_dir() below does
        # the actual discovery)
        for event in self.w.read(bufsize=0):
            pass
            #print event

        self.process_dir()
153 
154  def run_daemon(self):
155  self.process_dir()
156 
157  while True:
158  service.run()
159 
160 # use a named socket check if we are running
161 # this is very clean and atomic and leave no files
162 # from: http://stackoverflow.com/a/7758075
# use a named socket check if we are running
# this is very clean and atomic and leave no files
# from: http://stackoverflow.com/a/7758075
def lock(pname):
    """Take a host-wide lock named *pname* via a Linux abstract unix socket.

    Returns the bound socket on success (keep a reference to hold the lock)
    or None when another process already holds the name.  The leading NUL
    byte selects the abstract namespace: nothing appears on the filesystem
    and the name vanishes automatically when the process dies.
    """
    s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        s.bind('\0' + pname)
    except socket.error:
        return None
    return s
170 
171 # same as in fff_deleter.py
# same as in fff_deleter.py
def daemonize(logfile, pidfile):
    """Detach from the terminal and keep running in the background.

    Classic double-fork daemonization: the first fork plus setsid() detaches
    from the controlling terminal/session; the second fork ensures the
    process can never re-acquire one.  stdout/stderr are redirected to
    *logfile* (append mode) and, if *pidfile* is truthy, the final pid is
    written there.
    """
    # do the double fork
    pid = os.fork()
    if pid != 0:
        sys.exit(0)  # original parent exits

    os.setsid()  # become session leader, drop the controlling terminal
    sys.stdin.close()
    sys.stdout.close()
    sys.stderr.close()

    # further output (including logging's StreamHandler on stderr) goes to
    # the log file
    fl = open(logfile, "a")
    sys.stdout = fl
    sys.stderr = fl

    pid = os.fork()
    if pid != 0:
        sys.exit(0)  # first child exits; the grandchild carries on

    if pidfile:
        f = open(pidfile, "w")
        f.write("%d\n" % os.getpid())
        f.close()
195 
if __name__ == "__main__":
    # "reindex" mode drops and recreates the ES index in the foreground,
    # then exits; anything else runs the daemon.
    do_reindex = False
    if len(sys.argv) > 1 and sys.argv[1] == "reindex":
        do_reindex = True

    # try to take the lock or quit
    sock = lock("fff_dqmmon")
    if sock is None:
        sys.stderr.write("Already running, exitting.\n")
        sys.stderr.flush()
        sys.exit(1)

    # setup logging
    if not do_reindex:
        # daemonize only for normal operation, so reindexing stays attached
        # to the terminal
        daemonize("/var/log/fff_monitoring.log", "/var/run/fff_monitoring.pid")

    # log to stderr (it might be redirected)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    flog_ch = logging.StreamHandler()
    flog_ch.setLevel(logging.INFO)
    flog_ch.setFormatter(formatter)
    log.setLevel(logging.INFO)
    log.addHandler(flog_ch)

    # write the pid file
    log.info("Pid is %d", os.getpid())

    service = DQMMonitor(
        top_path = "/tmp/dqm_monitoring/",
    )

    if do_reindex:
        service.recreate_index()
        sys.exit(0)

    service.run_daemon()
list object
Definition: dbtoconf.py:77