8 from __future__
import print_function
9 __author__ =
"Valentin Kuznetsov" 15 if sys.version_info < (2, 6):
16 raise Exception(
"DAS requires python 2.6 or greater")
18 DAS_CLIENT =
'das-client/1.1::python/%s.%s' % sys.version_info[:2]
29 from optparse
import OptionParser
31 from types
import GeneratorType
54 Simple HTTPS client authentication class based on provided 57 def __init__(self, key=None, cert=None, capath=None, level=0):
59 urllib2.HTTPSHandler.__init__(self, debuglevel=1)
61 urllib2.HTTPSHandler.__init__(self)
67 """Open request method""" 74 """Connection method""" 76 return httplib.HTTPSConnection(host, key_file=self.
key,
79 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
80 context.load_verify_locations(capath=self.
capath)
81 context.load_cert_chain(self.
cert)
82 return httplib.HTTPSConnection(host, context=context)
83 return httplib.HTTPSConnection(host)
86 "Helper function to get x509 either from env or tmp file" 87 proxy = os.environ.get(
'X509_USER_PROXY',
'')
89 proxy =
'/tmp/x509up_u%s' % pwd.getpwuid( os.getuid() ).pw_uid
90 if not os.path.isfile(proxy):
95 "Check glideine environment and exit if it is set" 96 glidein = os.environ.get(
'GLIDEIN_CMSSite',
'')
98 msg =
"ERROR: das_client is running from GLIDEIN environment, it is prohibited" 103 "Check if user runs das_client with key/cert and warn users to switch" 105 msg =
"WARNING: das_client is running without user credentials/X509 proxy, create proxy via 'voms-proxy-init -voms cms -rfc'" 106 print(msg, file=sys.stderr)
110 DAS cache client option parser 113 usage =
"Usage: %prog [options]\n" 114 usage +=
"For more help please visit https://cmsweb.cern.ch/das/faq" 116 self.
parser.add_option(
"-v",
"--verbose", action=
"store",
117 type=
"int", default=0, dest=
"verbose",
118 help=
"verbose output")
119 self.
parser.add_option(
"--query", action=
"store", type=
"string",
120 default=
False, dest=
"query",
121 help=
"specify query for your request")
122 msg =
"host name of DAS cache server, default is https://cmsweb.cern.ch" 123 self.
parser.add_option(
"--host", action=
"store", type=
"string",
124 default=
'https://cmsweb.cern.ch', dest=
"host", help=msg)
125 msg =
"start index for returned result set, aka pagination," 126 msg +=
" use w/ limit (default is 0)" 127 self.
parser.add_option(
"--idx", action=
"store", type=
"int",
128 default=0, dest=
"idx", help=msg)
129 msg =
"number of returned results (default is 10)," 130 msg +=
" use --limit=0 to show all results" 131 self.
parser.add_option(
"--limit", action=
"store", type=
"int",
132 default=10, dest=
"limit", help=msg)
133 msg =
'specify return data format (json or plain), default plain.' 134 self.
parser.add_option(
"--format", action=
"store", type=
"string",
135 default=
"plain", dest=
"format", help=msg)
136 msg =
'query waiting threshold in sec, default is 5 minutes' 137 self.
parser.add_option(
"--threshold", action=
"store", type=
"int",
138 default=300, dest=
"threshold", help=msg)
139 msg =
'specify private key file name, default $X509_USER_PROXY' 140 self.
parser.add_option(
"--key", action=
"store", type=
"string",
141 default=
x509(), dest=
"ckey", help=msg)
142 msg =
'specify private certificate file name, default $X509_USER_PROXY' 143 self.
parser.add_option(
"--cert", action=
"store", type=
"string",
144 default=
x509(), dest=
"cert", help=msg)
145 msg =
'specify CA path, default $X509_CERT_DIR' 146 self.
parser.add_option(
"--capath", action=
"store", type=
"string",
147 default=os.environ.get(
"X509_CERT_DIR",
""),
148 dest=
"capath", help=msg)
149 msg =
'specify number of retries upon busy DAS server message' 150 self.
parser.add_option(
"--retry", action=
"store", type=
"string",
151 default=0, dest=
"retry", help=msg)
152 msg =
'show DAS headers in JSON format' 153 msg +=
' (obsolete, keep for backward compatibility)' 154 self.
parser.add_option(
"--das-headers", action=
"store_true",
155 default=
False, dest=
"das_headers", help=msg)
156 msg =
'specify power base for size_format, default is 10 (can be 2)' 157 self.
parser.add_option(
"--base", action=
"store", type=
"int",
158 default=0, dest=
"base", help=msg)
160 msg =
'a file which contains a cached json dictionary for query -> files mapping' 161 self.
parser.add_option(
"--cache", action=
"store", type=
"string",
162 default=
None, dest=
"cache", help=msg)
164 msg =
'a query cache value' 165 self.
parser.add_option(
"--query-cache", action=
"store", type=
"int",
166 default=0, dest=
"qcache", help=msg)
167 msg =
'List DAS key/attributes, use "all" or specific DAS key value, e.g. site' 168 self.
parser.add_option(
"--list-attributes", action=
"store", type=
"string",
169 default=
"", dest=
"keys_attrs", help=msg)
172 Returns parse list of options 177 "Convert given timestamp into human readable format" 178 if isinstance(val, int)
or isinstance(val, float):
179 return time.strftime(
'%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
184 Format file size utility, it converts file size into KB, MB, GB, TB, PB units 190 except Exception
as _exc:
194 xlist = [
'',
'KiB',
'MiB',
'GiB',
'TiB',
'PiB']
197 xlist = [
'',
'KB',
'MB',
'GB',
'TB',
'PB']
200 return "%3.1f%s" % (num, xxx)
205 Unique filter drop duplicate rows. 214 del row_data[
'das_id']
215 del row_data[
'cache_id']
218 old_data = dict(old_row)
222 del old_data[
'das_id']
223 del old_data[
'cache_id']
226 if row_data == old_data:
234 """Generator which extracts row[key] value""" 235 if isinstance(row, dict)
and key
in row:
236 if key ==
'creation_time':
243 if isinstance(row, list)
or isinstance(row, GeneratorType):
249 """Filter data from a row for given list of filters""" 251 if ftr.find(
'>') != -1
or ftr.find(
'<') != -1
or ftr.find(
'=') != -1:
255 keys = ftr.split(
'.')
259 values += [json.dumps(i)
for i
in val]
268 "Expand path to full path" 269 if path
and path[0] ==
'~':
270 path = path.replace(
'~',
'')
271 path = path[1:]
if path[0] ==
'/' else path
272 path = os.path.join(os.environ[
'HOME'], path)
275 def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
276 cert=None, capath=None, qcache=0, das_headers=True):
277 """Contact DAS server and retrieve data for given DAS query""" 278 params = {
'input':query,
'idx':idx,
'limit':limit}
280 params[
'qcache'] = qcache
282 pat = re.compile(
'http[s]{0,1}://')
283 if not pat.match(host):
284 msg =
'Invalid hostname: %s' % host
287 client =
'%s (%s)' % (DAS_CLIENT, os.environ.get(
'USER',
''))
288 headers = {
"Accept":
"application/json",
"User-Agent": client}
289 encoded_data = urllib.urlencode(params, doseq=
True)
290 url +=
'?%s' % encoded_data
291 req = urllib2.Request(url=url, headers=headers)
296 elif cert
and capath:
300 http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
301 proxy_handler = urllib2.ProxyHandler({})
302 cookie_jar = cookielib.CookieJar()
303 cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
305 opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
306 fdesc = opener.open(req)
309 except urllib2.HTTPError
as error:
313 pat = re.compile(
r'^[a-z0-9]{32}')
314 if data
and isinstance(data, str)
and pat.match(data)
and len(data) == 32:
323 params.update({
'pid':data})
324 encoded_data = urllib.urlencode(params, doseq=
True)
325 url = host + path +
'?%s' % encoded_data
326 req = urllib2.Request(url=url, headers=headers)
328 fdesc = opener.open(req)
331 except urllib2.HTTPError
as err:
332 return {
"status":
"fail",
"reason":
str(err)}
333 if data
and isinstance(data, str)
and pat.match(data)
and len(data) == 32:
344 if (time.time()-time0) > threshold:
345 reason =
"client timeout after %s sec" %
int(time.time()-time0)
346 return {
"status":
"fail",
"reason":reason}
347 jsondict = json.loads(data)
351 """Extract primary key value from DAS record""" 352 prim_key = row[
'das'][
'primary_key']
353 if prim_key ==
'summary':
354 return row.get(prim_key,
None)
355 key, att = prim_key.split(
'.')
356 if isinstance(row[key], list):
357 for item
in row[key]:
366 "Print summary record information on stdout" 367 if 'summary' not in rec:
368 msg =
'Summary information is not found in record:\n', rec
370 for row
in rec[
'summary']:
371 keys = [k
for k
in row.keys()]
372 maxlen =
max([len(k)
for k
in keys])
373 for key, val
in row.items():
374 pkey =
'%s%s' % (key,
' '*(maxlen-len(key)))
375 print(
'%s: %s' % (pkey, val))
379 "print the list of files reading it from cache" 380 data = open(cache).
read()
381 jsondict = json.loads(data)
382 if query
in jsondict:
388 "Contact host for list of key/attributes pairs" 389 url =
'%s/das/keys?view=json' % host
390 headers = {
"Accept":
"application/json",
"User-Agent": DAS_CLIENT}
391 req = urllib2.Request(url=url, headers=headers)
397 http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
398 proxy_handler = urllib2.ProxyHandler({})
399 cookie_jar = cookielib.CookieJar()
400 cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
401 opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
402 fdesc = opener.open(req)
403 data = json.load(fdesc)
405 if oformat.lower() ==
'json':
407 print(json.dumps(data))
409 print(json.dumps({lkey:data[lkey]}))
411 for key, vdict
in data.items():
417 print(
"DAS key:", key)
418 for attr, examples
in vdict.items():
420 print(
'%s%s' % (prefix, attr))
421 for item
in examples:
422 print(
'%s%s%s' % (prefix, prefix, item))
427 opts, _ = optmgr.get_opt()
442 keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
445 print(
'Input query is missing')
447 if opts.format ==
'plain':
448 jsondict =
get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
449 cli_msg = jsondict.get(
'client_message',
None)
451 print(
"DAS CLIENT WARNING: %s" % cli_msg)
452 if 'status' not in jsondict
and opts.cache:
454 if 'status' not in jsondict:
455 print(
'DAS record without status field:\n%s' % jsondict)
456 sys.exit(EX_PROTOCOL)
457 if jsondict[
"status"] !=
'ok' and opts.cache:
459 if jsondict[
'status'] !=
'ok':
460 print(
"status: %s, reason: %s" \
461 % (jsondict.get(
'status'), jsondict.get(
'reason',
'N/A')))
464 for attempt
in xrange(1,
int(opts.retry)):
465 interval =
log(attempt)**5
466 print(
"Retry in %5.3f sec" % interval)
468 data =
get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
469 jsondict = json.loads(data)
470 if jsondict.get(
'status',
'fail') ==
'ok':
474 sys.exit(EX_TEMPFAIL)
476 sys.exit(EX_TEMPFAIL)
477 nres = jsondict.get(
'nresults', 0)
481 drange =
'%s-%s out of %s' % (idx+1, idx+limit, nres)
483 msg =
"\nShowing %s results" % drange
484 msg +=
", for more results use --idx/--limit options\n" 486 mongo_query = jsondict.get(
'mongo_query', {})
488 fdict = mongo_query.get(
'filters', {})
489 filters = fdict.get(
'grep', [])
490 aggregators = mongo_query.get(
'aggregators', [])
491 if 'unique' in fdict.keys():
493 if filters
and not aggregators:
494 data = jsondict[
'data']
495 if isinstance(data, dict):
496 rows = [r
for r
in get_value(data, filters, base)]
498 elif isinstance(data, list):
502 rows = [r
for r
in get_value(row, filters, base)]
503 types = [type(r)
for r
in rows]
506 elif isinstance(rows[0], list):
515 print(json.dumps(jsondict))
517 data = jsondict[
'data']
521 if row[
'key'].
find(
'size') != -1
and \
522 row[
'function'] ==
'sum':
525 val = row[
'result'][
'value']
527 % (row[
'function'], row[
'key'], val))
529 data = jsondict[
'data']
530 if isinstance(data, list):
534 prim_key = row.get(
'das', {}).get(
'primary_key',
None)
535 if prim_key ==
'summary':
545 if val != old
and not opts.limit:
547 elif isinstance(data, dict):
553 host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
554 print(json.dumps(jsondict))
559 if __name__ ==
'__main__':
def extract_value(row, key, base=10)
def get_value(data, filters, base=10)
def keys_attrs(lkey, oformat, host, ckey, cert, debug=0)
def get_data(host, query, idx, limit, debug, threshold=300, ckey=None, cert=None, capath=None, qcache=0, das_headers=True)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
def size_format(uinput, ibase=0)
def __init__(self, key=None, cert=None, capath=None, level=0)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def get_connection(self, host, timeout=300)
static std::string join(char **cmd)
def print_from_cache(cache, query)
def https_open(self, req)