15 log = logging.getLogger(__name__)
19 sys.path.append(
'/opt/hltd/python')
20 sys.path.append(
'/opt/hltd/lib')
22 global inotify, watcher, es_client
24 import _inotify
as inotify
26 import pyelasticsearch.client
as es_client
# DQMMonitor.__init__ fragment (extraction gaps: original lines 31-32, 34-36
# and 38-40 are missing from this view).
# Sets up a node-local Elasticsearch client and an inotify watcher on the
# monitored directory.
30 def __init__(self, top_path, rescan_timeout=30):
# Client for the Elasticsearch REST endpoint on this host.
33 self.
es = es_client.ElasticSearch(
"http://127.0.0.1:9200")
# Create the watched directory; presumably guarded by try/except in the
# missing surrounding lines -- TODO confirm against the full source.
37 os.makedirs(self.
path)
# React only to files fully written or moved into the directory.
41 self.
mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO
42 self.
w = watcher.Watcher()
# delete_index fragment: logs the deletion; the actual ES delete call is in
# lines missing from this view.
50 log.info(
"Deleting index: %s", self.
index_name)
# create_index fragment: the settings/mappings literal handed to the
# Elasticsearch create-index call (extraction gaps between original lines).
54 log.info(
"Creating index: %s", self.
index_name)
# Custom analyzer backed by a path_hierarchy tokenizer, so path-like values
# can be matched by prefix.
59 "prefix-test-analyzer": {
61 "tokenizer":
"prefix-test-tokenizer"
65 "prefix-test-tokenizer": {
66 "type":
"path_hierarchy",
# Index layout: 16 shards, 1 replica.
72 'number_of_shards' : 16,
73 'number_of_replicas' : 1
# Mapping for 'dqm-source-state' documents.
78 'dqm-source-state' : {
80 'type' : {
'type' :
'string' },
81 'pid' : {
'type' :
'integer' },
82 'hostname' : {
'type' :
'string' },
# 'sequence' is stored un-analyzed so it is matched exactly, not tokenized.
83 'sequence' : {
'type' :
'integer',
"index" :
"not_analyzed" },
84 'run' : {
'type' :
'integer' },
85 'lumi' : {
'type' :
'integer' },
# Store a timestamp per document and expire documents after 15 days.
87 '_timestamp' : {
'enabled' :
True,
'store' :
True, },
88 '_ttl' : {
'enabled' :
True,
'default' :
'15d' }
# Second mapping (its document-type name is on a line missing from this
# view); fields mirror 'dqm-source-state' minus run/lumi.
92 'type' : {
'type' :
'string' },
93 'pid' : {
'type' :
'integer' },
94 'hostname' : {
'type' :
'string' },
95 'sequence' : {
'type' :
'integer',
"index" :
"not_analyzed" },
# Same timestamp/15-day TTL policy as the first mapping.
97 '_timestamp' : {
'enabled' :
True,
'store' :
True, },
98 '_ttl' : {
'enabled' :
True,
'default' :
'15d' }
104 except es_client.IndexAlreadyExistsError:
105 logger.info(
"Index already exists.", exc_info=
True)
108 logger.warning(
"Cannot create index", exc_info=
True)
110 log.info(
"Created index: %s", self.
index_name)
# upload_file fragment: load a JSON document from disk, preprocess it, and
# index it in ES under its embedded "type" and "_id" fields.
113 log.info(
"Uploading: %s", fp)
117 document = json.load(f)
121 document = preprocess(document)
123 ret = self.es.index(self.
index_name, document[
"type"], document, id=document[
"_id"])
# Best-effort upload: a failed document is logged with traceback and skipped.
125 log.warning(
"Failure to upload the document: %s", fp, exc_info=
True)
# Filename-filter fragment: ignore hidden files and anything that is not a
# ".jsn" snapshot file.
128 fname = os.path.basename(fp)
130 if fname.startswith(
"."):
133 if not fname.endswith(
".jsn"):
# Rescan/event-loop fragment: first process every file already present in
# the watched directory, then poll the inotify watcher for new events.
140 for f
in os.listdir(self.
path):
141 fp = os.path.join(self.
path, f)
146 poll.register(self.
w, select.POLLIN)
# bufsize=0 appears to request a non-blocking drain of queued inotify
# events -- TODO confirm against the python-inotify watcher API.
150 for event
in self.w.read(bufsize=0):
# run_playback fragment: replay archived documents, preserving their
# original relative timing (optionally scaled).
163 files = os.listdir(directory)
# File names encode "<unix-timestamp>.<sequence>"; long() marks this as
# Python 2 code.
170 date, seq = spl[0].
split(
".")
171 date, seq = datetime.datetime.fromtimestamp(long(date)), long(seq)
173 todo.append({
'date': date,
'seq': seq,
'f': os.path.join(directory, f)})
# timedelta -> float seconds (pre-total_seconds() idiom).
177 return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
# Replay strictly in (timestamp, sequence) order.
179 todo.sort(key=
lambda x: (x[
"date"], x[
"seq"], ))
# Offset of each file from the earliest entry, divided by the playback
# scale factor.
182 f[
"diff"] = ts(f[
"date"] - m) / scale
185 doc[
"tag"] = os.path.basename(doc[
"tag"])
188 start_time = datetime.datetime.now()
# Emit each file once its scheduled offset has elapsed in wall-clock time.
190 elapsed = ts(datetime.datetime.now() - start_time)
191 if todo[0][
"diff"] <= elapsed:
# Single-instance lock fragment: binding an abstract unix-domain socket
# (leading NUL byte in the name, Linux-specific) fails with EADDRINUSE if
# another copy of the daemon already holds it.
201 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
203 sock.bind(
'\0' + pname)
# daemonize fragment: append-mode log file for the daemon's output and a
# pid file recording the daemonized process id.
220 fl = open(logfile,
"a")
229 f = open(pidfile,
"w")
230 f.write(
"%d\n" % os.getpid())
# Script entry fragment: parse the mode from argv, daemonize unless running
# in the foreground, configure logging, then dispatch to reindex/playback.
233 if __name__ ==
"__main__":
236 do_foreground =
False
# "reindex" and "playback" modes; both presumably also set do_foreground
# in lines missing from this view -- TODO confirm.
237 if len(sys.argv) > 1
and sys.argv[1] ==
"reindex":
241 if len(sys.argv) > 1
and sys.argv[1] ==
"playback":
245 if not do_foreground:
# The abstract-socket lock is already taken: refuse to start a second copy.
249 sys.stderr.write(
"Already running, exitting.\n")
253 daemonize(
"/var/log/fff_monitoring.log",
"/var/run/fff_monitoring.pid")
# Stream handler at INFO level for the module logger.
256 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
257 flog_ch = logging.StreamHandler()
258 flog_ch.setLevel(logging.INFO)
259 flog_ch.setFormatter(formatter)
260 log.setLevel(logging.INFO)
261 log.addHandler(flog_ch)
264 log.info(
"Pid is %d", os.getpid())
# Default watched directory for the monitor service.
269 top_path =
"/tmp/dqm_monitoring/",
272 if do_mode ==
"reindex":
273 service.recreate_index()
274 elif do_mode ==
"playback":
276 service.run_playback(sys.argv[2])