14 log = logging.getLogger(__name__)
18 sys.path.append(
'/opt/hltd/python')
19 sys.path.append(
'/opt/hltd/lib')
21 global inotify, watcher, es_client
23 import _inotify
as inotify
25 import pyelasticsearch.client
as es_client
29 def __init__(self, top_path, rescan_timeout=30):
32 self.
es = es_client.ElasticSearch(
"http://127.0.0.1:9200")
36 os.makedirs(self.
path)
40 self.
mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO
41 self.
w = watcher.Watcher()
49 log.info(
"Deleting index: %s", self.
index_name)
53 log.info(
"Creating index: %s", self.
index_name)
58 "prefix-test-analyzer": {
60 "tokenizer":
"prefix-test-tokenizer"
64 "prefix-test-tokenizer": {
65 "type":
"path_hierarchy",
71 'number_of_shards' : 16,
72 'number_of_replicas' : 1
77 'dqm-source-state' : {
79 'type' : {
'type' :
'string' },
80 'pid' : {
'type' :
'integer' },
81 'hostname' : {
'type' :
'string' },
82 'sequence' : {
'type' :
'integer',
"index" :
"not_analyzed" },
83 'run' : {
'type' :
'integer' },
84 'lumi' : {
'type' :
'integer' },
86 '_timestamp' : {
'enabled' :
True,
'store' :
True, },
87 '_ttl' : {
'enabled' :
True,
'default' :
'24h' }
91 'type' : {
'type' :
'string' },
92 'pid' : {
'type' :
'integer' },
93 'hostname' : {
'type' :
'string' },
94 'sequence' : {
'type' :
'integer',
"index" :
"not_analyzed" },
96 '_timestamp' : {
'enabled' :
True,
'store' :
True, },
97 '_ttl' : {
'enabled' :
True,
'default' :
'24h' }
103 except es_client.IndexAlreadyExistsError:
104 logger.info(
"Index already exists.", exc_info=
True)
107 logger.warning(
"Cannot create index", exc_info=
True)
109 log.info(
"Created index: %s", self.
index_name)
112 log.info(
"Uploading: %s", fp)
116 document = json.load(f)
119 ret = self.es.index(self.index_name, document["type"], document, id=document[
"_id"])
123 log.warning(
"Failure to upload the document: %s", fp, exc_info=
True)
126 fname = os.path.basename(fp)
128 if fname.startswith(
"."):
131 if not fname.endswith(
".jsn"):
138 for f
in os.listdir(self.
path):
139 fp = os.path.join(self.
path, f)
144 poll.register(self.
w, select.POLLIN)
148 for event
in self.w.read(bufsize=0):
164 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
166 sock.bind(
'\0' + pname)
183 fl = open(logfile,
"a")
192 f = open(pidfile,
"w")
193 f.write(
"%d\n" % os.getpid())
196 if __name__ ==
"__main__":
198 if len(sys.argv) > 1
and sys.argv[1] ==
"reindex":
204 sys.stderr.write(
"Already running, exitting.\n")
210 daemonize(
"/var/log/fff_monitoring.log",
"/var/run/fff_monitoring.pid")
213 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
214 flog_ch = logging.StreamHandler()
215 flog_ch.setLevel(logging.INFO)
216 flog_ch.setFormatter(formatter)
217 log.setLevel(logging.INFO)
218 log.addHandler(flog_ch)
221 log.info(
"Pid is %d", os.getpid())
226 top_path =
"/tmp/dqm_monitoring/",
230 service.recreate_index()