das_client.py

#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=C0301,C0103,R0914,R0903

"""
DAS command line tool
"""
from __future__ import print_function
__author__ = "Valentin Kuznetsov"

# system modules
import os
import sys
import pwd
if sys.version_info < (2, 6):
    raise Exception("DAS requires python 2.6 or greater")

DAS_CLIENT = 'das-client/1.1::python/%s.%s' % sys.version_info[:2]

import re
import ssl
import time
import json
import urllib
import urllib2
import httplib
import cookielib
from optparse import OptionParser
from math import log
from types import GeneratorType

# define exit codes according to Linux sysexits.h
EX_OK = 0            # successful termination
EX__BASE = 64        # base value for error messages
EX_USAGE = 64        # command line usage error
EX_DATAERR = 65      # data format error
EX_NOINPUT = 66      # cannot open input
EX_NOUSER = 67       # addressee unknown
EX_NOHOST = 68       # host name unknown
EX_UNAVAILABLE = 69  # service unavailable
EX_SOFTWARE = 70     # internal software error
EX_OSERR = 71        # system error (e.g., can't fork)
EX_OSFILE = 72       # critical OS file missing
EX_CANTCREAT = 73    # can't create (user) output file
EX_IOERR = 74        # input/output error
EX_TEMPFAIL = 75     # temp failure; user is invited to retry
EX_PROTOCOL = 76     # remote error in protocol
EX_NOPERM = 77       # permission denied
EX_CONFIG = 78       # configuration error

class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    Simple HTTPS client authentication class based on provided
    key/ca information
    """
    def __init__(self, key=None, cert=None, capath=None, level=0):
        if level > 1:
            urllib2.HTTPSHandler.__init__(self, debuglevel=1)
        else:
            urllib2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert
        self.capath = capath

    def https_open(self, req):
        """Open request method"""
        # Rather than pass in a reference to a connection class, we pass in
        # a reference to a function which, for all intents and purposes,
        # will behave as a constructor
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """Connection method"""
        if self.key and self.cert and not self.capath:
            return httplib.HTTPSConnection(host, key_file=self.key,
                                           cert_file=self.cert)
        elif self.cert and self.capath:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.load_verify_locations(capath=self.capath)
            context.load_cert_chain(self.cert)
            return httplib.HTTPSConnection(host, context=context)
        return httplib.HTTPSConnection(host)

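# Illustrative sketch (not part of the original script): the handler above is
# meant to be installed into a urllib2 opener, as get_data() does further down.
# It is kept in comments so nothing runs at import time; the proxy path is
# whatever the x509() helper below returns, and the query string is only a
# placeholder:
#
#   proxy = x509()
#   handler = HTTPSClientAuthHandler(key=proxy, cert=proxy, level=0)
#   opener = urllib2.build_opener(handler)
#   response = opener.open('https://cmsweb.cern.ch/das/cache?input=...')
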
def x509():
    "Helper function to get x509 either from env or tmp file"
    proxy = os.environ.get('X509_USER_PROXY', '')
    if not proxy:
        proxy = '/tmp/x509up_u%s' % pwd.getpwuid(os.getuid()).pw_uid
        if not os.path.isfile(proxy):
            return ''
    return proxy

def check_glidein():
    "Check glidein environment and exit if it is set"
    glidein = os.environ.get('GLIDEIN_CMSSite', '')
    if glidein:
        msg = "ERROR: das_client is running from GLIDEIN environment, it is prohibited"
        print(msg)
        sys.exit(EX__BASE)

def check_auth(key):
    "Check if user runs das_client with key/cert and warn users to switch"
    if not key:
        msg = "WARNING: das_client is running without user credentials/X509 proxy, create proxy via 'voms-proxy-init -voms cms -rfc'"
        print(msg, file=sys.stderr)

class DASOptionParser(object):
    """
    DAS cache client option parser
    """
    def __init__(self):
        usage = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        self.parser.add_option("-v", "--verbose", action="store",
                               type="int", default=0, dest="verbose",
                               help="verbose output")
        self.parser.add_option("--query", action="store", type="string",
                               default=False, dest="query",
                               help="specify query for your request")
        msg = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
                               default='https://cmsweb.cern.ch', dest="host", help=msg)
        msg = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
                               default=0, dest="idx", help=msg)
        msg = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
                               default=10, dest="limit", help=msg)
        msg = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
                               default="plain", dest="format", help=msg)
        msg = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
                               default=300, dest="threshold", help=msg)
        msg = 'specify private key file name, default $X509_USER_PROXY'
        self.parser.add_option("--key", action="store", type="string",
                               default=x509(), dest="ckey", help=msg)
        msg = 'specify private certificate file name, default $X509_USER_PROXY'
        self.parser.add_option("--cert", action="store", type="string",
                               default=x509(), dest="cert", help=msg)
        msg = 'specify CA path, default $X509_CERT_DIR'
        self.parser.add_option("--capath", action="store", type="string",
                               default=os.environ.get("X509_CERT_DIR", ""),
                               dest="capath", help=msg)
        msg = 'specify number of retries upon busy DAS server message'
        self.parser.add_option("--retry", action="store", type="string",
                               default=0, dest="retry", help=msg)
        msg = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
                               default=False, dest="das_headers", help=msg)
        msg = 'specify power base for size_format, default is 10 (can be 2)'
        self.parser.add_option("--base", action="store", type="int",
                               default=0, dest="base", help=msg)

        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
                               default=None, dest="cache", help=msg)

        msg = 'a query cache value'
        self.parser.add_option("--query-cache", action="store", type="int",
                               default=0, dest="qcache", help=msg)
        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
                               default="", dest="keys_attrs", help=msg)

    def get_opt(self):
        """
        Returns parsed list of options
        """
        return self.parser.parse_args()

def convert_time(val):
    "Convert given timestamp into human readable format"
    if isinstance(val, int) or isinstance(val, float):
        return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
    return val

def size_format(uinput, ibase=0):
    """
    File size formatting utility: converts a file size into KB, MB, GB, TB, PB units
    """
    if not ibase:
        return uinput
    try:
        num = float(uinput)
    except Exception as _exc:
        return uinput
    if ibase == 2.: # power of 2
        base = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else: # default base is 10
        base = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for xxx in xlist:
        if num < base:
            return "%3.1f%s" % (num, xxx)
        num /= base

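# Illustrative sketch (not part of the original script): expected output of the
# two formatting helpers above, kept in comments so nothing runs at import time:
#
#   convert_time(1500000000)   -> '14/Jul/2017_02:40:00_GMT'
#   size_format(123456789, 10) -> '123.5MB'
#   size_format(123456789, 2)  -> '117.7MiB'
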
def unique_filter(rows):
    """
    Unique filter, drops duplicate rows.
    """
    old_row = {}
    row = None
    for row in rows:
        row_data = dict(row)
        try:
            del row_data['_id']
            del row_data['das']
            del row_data['das_id']
            del row_data['cache_id']
        except:
            pass
        old_data = dict(old_row)
        try:
            del old_data['_id']
            del old_data['das']
            del old_data['das_id']
            del old_data['cache_id']
        except:
            pass
        if row_data == old_data:
            continue
        if old_row:
            yield old_row
        old_row = row
    yield row

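# Illustrative sketch (not part of the original script): unique_filter() drops
# adjacent duplicates only, e.g. for a hypothetical pre-sorted result list:
#
#   list(unique_filter([{'a': 1}, {'a': 1}, {'a': 2}]))  -> [{'a': 1}, {'a': 2}]
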
def extract_value(row, key, base=10):
    """Generator which extracts row[key] value"""
    if isinstance(row, dict) and key in row:
        if key == 'creation_time':
            row = convert_time(row[key])
        elif key == 'size':
            row = size_format(row[key], base)
        else:
            row = row[key]
        yield row
    if isinstance(row, list) or isinstance(row, GeneratorType):
        for item in row:
            for vvv in extract_value(item, key, base):
                yield vvv

def get_value(data, filters, base=10):
    """Filter data from a row for given list of filters"""
    for ftr in filters:
        if ftr.find('>') != -1 or ftr.find('<') != -1 or ftr.find('=') != -1:
            continue
        row = dict(data)
        values = []
        keys = ftr.split('.')
        for key in keys:
            val = [v for v in extract_value(row, key, base)]
            if key == keys[-1]: # we collect all values at last key
                values += [json.dumps(i) for i in val]
            else:
                row = val
        if len(values) == 1:
            yield values[0]
        else:
            yield values

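# Illustrative sketch (not part of the original script): how a "grep" filter
# such as file.name is resolved by get_value(); the record layout below is a
# simplified assumption of a DAS row, kept as a comment:
#
#   record = {'file': [{'name': '/store/data/a.root', 'size': 1000}]}
#   list(get_value(record, ['file.name']))  -> ['"/store/data/a.root"']
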
def fullpath(path):
    "Expand path to full path"
    if path and path[0] == '~':
        path = path.replace('~', '')
        path = path[1:] if path[0] == '/' else path
        path = os.path.join(os.environ['HOME'], path)
    return path

def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
             cert=None, capath=None, qcache=0, das_headers=True):
    """Contact DAS server and retrieve data for given DAS query"""
    params = {'input': query, 'idx': idx, 'limit': limit}
    if qcache:
        params['qcache'] = qcache
    path = '/das/cache'
    pat = re.compile('http[s]{0,1}://')
    if not pat.match(host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    url = host + path
    client = '%s (%s)' % (DAS_CLIENT, os.environ.get('USER', ''))
    headers = {"Accept": "application/json", "User-Agent": client}
    encoded_data = urllib.urlencode(params, doseq=True)
    url += '?%s' % encoded_data
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    elif cert and capath:
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    try:
        opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
        fdesc = opener.open(req)
        data = fdesc.read()
        fdesc.close()
    except urllib2.HTTPError as error:
        print(error.read())
        sys.exit(1)

    pat = re.compile(r'^[a-z0-9]{32}')
    if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
        pid = data
    else:
        pid = None
    iwtime = 2  # initial waiting time in seconds
    wtime = 20  # final waiting time in seconds
    sleep = iwtime
    time0 = time.time()
    while pid:
        params.update({'pid': data})
        encoded_data = urllib.urlencode(params, doseq=True)
        url = host + path + '?%s' % encoded_data
        req = urllib2.Request(url=url, headers=headers)
        try:
            fdesc = opener.open(req)
            data = fdesc.read()
            fdesc.close()
        except urllib2.HTTPError as err:
            return {"status": "fail", "reason": str(err)}
        if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
            pid = data
        else:
            pid = None
        time.sleep(sleep)
        if sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime  # start new cycle
        else:
            sleep = wtime
        if (time.time() - time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time() - time0)
            return {"status": "fail", "reason": reason}
    jsondict = json.loads(data)
    return jsondict

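# Illustrative sketch (not part of the original script): get_data() may also be
# called from another Python 2 script; the host below is the default DAS server
# and the query string is only an example:
#
#   result = get_data('https://cmsweb.cern.ch', 'dataset dataset=/ZMM*/*/*',
#                     0, 10, 0, ckey=x509(), cert=x509())
#   if result.get('status') == 'ok':
#       for rec in result.get('data', []):
#           print(rec)
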
def prim_value(row):
    """Extract primary key value from DAS record"""
    prim_key = row['das']['primary_key']
    if prim_key == 'summary':
        return row.get(prim_key, None)
    key, att = prim_key.split('.')
    if isinstance(row[key], list):
        for item in row[key]:
            if att in item:
                return item[att]
    else:
        if key in row:
            if att in row[key]:
                return row[key][att]

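# Illustrative sketch (not part of the original script): prim_value() follows
# the record's das.primary_key pointer; the record below is a simplified
# assumption of a DAS row, kept as a comment:
#
#   row = {'das': {'primary_key': 'file.name'},
#          'file': [{'name': '/store/data/a.root'}]}
#   prim_value(row)  -> '/store/data/a.root'
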
def print_summary(rec):
    "Print summary record information on stdout"
    if 'summary' not in rec:
        msg = 'Summary information is not found in record:\n%s' % rec
        raise Exception(msg)
    for row in rec['summary']:
        keys = [k for k in row.keys()]
        maxlen = max([len(k) for k in keys])
        for key, val in row.items():
            pkey = '%s%s' % (key, ' ' * (maxlen - len(key)))
            print('%s: %s' % (pkey, val))
        print()

def print_from_cache(cache, query):
    "print the list of files reading it from cache"
    data = open(cache).read()
    jsondict = json.loads(data)
    if query in jsondict:
        print("\n".join(jsondict[query]))
        exit(0)
    exit(1)

def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    "Contact host for list of key/attributes pairs"
    url = '%s/das/keys?view=json' % host
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, level=debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = json.load(fdesc)
    fdesc.close()
    if oformat.lower() == 'json':
        if lkey == 'all':
            print(json.dumps(data))
        else:
            print(json.dumps({lkey: data[lkey]}))
        return
    for key, vdict in data.items():
        if lkey == 'all':
            pass
        elif lkey != key:
            continue
        print()
        print("DAS key:", key)
        for attr, examples in vdict.items():
            prefix = '    '
            print('%s%s' % (prefix, attr))
            for item in examples:
                print('%s%s%s' % (prefix, prefix, item))

def main():
    """Main function"""
    optmgr = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host = opts.host
    debug = opts.verbose
    query = opts.query
    idx = opts.idx
    limit = opts.limit
    thr = opts.threshold
    ckey = opts.ckey
    cert = opts.cert
    capath = opts.capath
    base = opts.base
    qcache = opts.qcache
    check_glidein()
    check_auth(ckey)
    if opts.keys_attrs:
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if not query:
        print('Input query is missing')
        sys.exit(EX_USAGE)
    if opts.format == 'plain':
        jsondict = get_data(host, query, idx, limit, debug, thr,
                            ckey, cert, capath, qcache)
        cli_msg = jsondict.get('client_message', None)
        if cli_msg:
            print("DAS CLIENT WARNING: %s" % cli_msg)
        if 'status' not in jsondict and opts.cache:
            print_from_cache(opts.cache, query)
        if 'status' not in jsondict:
            print('DAS record without status field:\n%s' % jsondict)
            sys.exit(EX_PROTOCOL)
        if jsondict["status"] != 'ok' and opts.cache:
            print_from_cache(opts.cache, query)
        if jsondict['status'] != 'ok':
            print("status: %s, reason: %s"
                  % (jsondict.get('status'), jsondict.get('reason', 'N/A')))
            if opts.retry:
                found = False
                for attempt in xrange(1, int(opts.retry)):
                    interval = log(attempt)**5
                    print("Retry in %5.3f sec" % interval)
                    time.sleep(interval)
                    jsondict = get_data(host, query, idx, limit, debug, thr,
                                        ckey, cert, capath, qcache)
                    if jsondict.get('status', 'fail') == 'ok':
                        found = True
                        break
            else:
                sys.exit(EX_TEMPFAIL)
            if not found:
                sys.exit(EX_TEMPFAIL)
        nres = jsondict.get('nresults', 0)
        if not limit:
            drange = '%s' % nres
        else:
            drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
        if opts.limit:
            msg = "\nShowing %s results" % drange
            msg += ", for more results use --idx/--limit options\n"
            print(msg)
        mongo_query = jsondict.get('mongo_query', {})
        unique = False
        fdict = mongo_query.get('filters', {})
        filters = fdict.get('grep', [])
        aggregators = mongo_query.get('aggregators', [])
        if 'unique' in fdict.keys():
            unique = True
        if filters and not aggregators:
            data = jsondict['data']
            if isinstance(data, dict):
                rows = [r for r in get_value(data, filters, base)]
                print(' '.join(rows))
            elif isinstance(data, list):
                if unique:
                    data = unique_filter(data)
                for row in data:
                    rows = [r for r in get_value(row, filters, base)]
                    types = [type(r) for r in rows]
                    if len(types) > 1: # mixed types, print as is
                        print(' '.join([str(r) for r in rows]))
                    elif isinstance(rows[0], list):
                        out = set()
                        for item in rows:
                            for elem in item:
                                out.add(elem)
                        print(' '.join(out))
                    else:
                        print(' '.join(rows))
            else:
                print(json.dumps(jsondict))
        elif aggregators:
            data = jsondict['data']
            if unique:
                data = unique_filter(data)
            for row in data:
                if row['key'].find('size') != -1 and \
                   row['function'] == 'sum':
                    val = size_format(row['result']['value'], base)
                else:
                    val = row['result']['value']
                print('%s(%s)=%s'
                      % (row['function'], row['key'], val))
        else:
            data = jsondict['data']
            if isinstance(data, list):
                old = None
                val = None
                for row in data:
                    prim_key = row.get('das', {}).get('primary_key', None)
                    if prim_key == 'summary':
                        print_summary(row)
                        return
                    val = prim_value(row)
                    if not opts.limit:
                        if val != old:
                            print(val)
                            old = val
                    else:
                        print(val)
                if val != old and not opts.limit:
                    print(val)
            elif isinstance(data, dict):
                print(prim_value(data))
            else:
                print(data)
    else:
        jsondict = get_data(host, query, idx, limit, debug, thr,
                            ckey, cert, capath, qcache)
        print(json.dumps(jsondict))

#
# main
#
if __name__ == '__main__':
    main()
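
# Illustrative sketch (not part of the original script): typical command-line
# invocations using the options defined above; the query strings are examples:
#
#   das_client.py --query="dataset=/ZMM*/*/*" --limit=0
#   das_client.py --query="file dataset=/ZMM*/*/RAW" --format=json
#   das_client.py --list-attributes=all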