CMS 3D CMS Logo

tier0.py
Go to the documentation of this file.
1 from __future__ import print_function
2 
3 #-toDo: move this to common?
4 
5 import logging
6 import json
7 import os
8 import sys
9 import time
10 import subprocess
11 
12 import pycurl
13 
14 tier0Url = os.getenv('TIER0_API_URL', 'https://cmsweb.cern.ch/t0wmadatasvc/prod/')
15 
17  '''Tier0 exception.
18  '''
19 
20  def __init__(self, message):
21  self.args = (message, )
22 
23 
24 def unique(seq, keepstr=True):
25  t = type(seq)
26  if t is str:
27  t = (list, t('').join)[bool(keepstr)]
28  try:
29  remaining = set(seq)
30  seen = set()
31  return t(c for c in seq if (c in remaining and not remaining.remove(c)))
32  except TypeError: # hashing didn't work, see if seq is sortable
33  try:
34  from itertools import groupby
35  s = sorted(enumerate(seq),key=lambda i_v1:(i_v1[1],i_v1[0]))
36  return t(next(g) for k,g in groupby(s, lambda i_v: i_v[1]))
37  except: # not sortable, use brute force
38  seen = []
39  return t(c for c in seq if not (c in seen or seen.append(c)))
40 
41 #note: this exception seems unused
43 
44  def __init__( self, curl, response, proxy, timeout, maxTime ):
45  super( ResponseError, self ).__init__( response )
46  self.args += ( curl, proxy )
47  self.timeout = timeout
48  self.maxTime = maxTime
49 
50  def __str__(self):
51  errStr = f'Wrong response for curl connection to Tier0DataSvc'\
52  f' from URL "{self.args[1].getinfo(self.args[1].EFFECTIVE_URL)}"'
53  if self.args[-1]:
54  errStr += f' using proxy "{str(self.args[-1])}"'
55  errStr += f' with connection-timeout "{self.timeout}", max-time "{self.maxtime}"'\
56  f' with error code "{self.args[1].getinfo(self.args[1].RESPONSE_CODE)}".'
57  if '<p>' in self.args[0]:
58  full_response = self.args[0].partition('<p>')[-1].rpartition('</p>')[0]
59  errStr += f'\nFull response: "{full_response}".'
60  else:
61  errStr += f'\nFull response: "{self.args[0]}".'
62 
63  return errStr
64 
65 #TODO: Add exceptions for each category of HTTP error codes
66 #TODO: check response code and raise corresponding exceptions
67 #note: this function seems to be unused
68 def _raise_http_error( curl, response, proxy, timeout, maxTime ):
69  raise ResponseError( curl, response, proxy, timeout, maxTime )
70 
72 
73  def __init__( self, uri, timeOut, maxTime, retries, retryPeriod, proxy, debug ):
74  """
75  Parameters:
76  uri: Tier0DataSvc URI;
77  timeOut: time out for connection of Tier0DataSvc HTTPS calls [seconds];
78  maxTime: maximum time for Tier0DataSvc HTTPS calls (including data transfer) [seconds];
79  retries: maximum retries for Tier0DataSvc HTTPS calls;
80  retryPeriod: sleep time between two Tier0DataSvc HTTPS calls [seconds];
81  proxy: HTTP proxy for accessing Tier0DataSvc HTTPS calls;
82  debug: if set to True, enables debug information.
83  """
84  self._uri = uri
85  self._timeOut = timeOut
86  self._maxTime = maxTime
87  self._retries = retries
88  self._retryPeriod = retryPeriod
89  self._proxy = proxy
90  self._debug = debug
91 
92  def setDebug( self ):
93  self._debug = True
94 
95  def unsetDebug( self ):
96  self._debug = False
97 
98  def setProxy( self, proxy ):
99  self._proxy = proxy
100 
101  def _getCerts( self ) -> str:
102  cert_path = os.getenv('X509_USER_CERT', '')
103  key_path = os.getenv('X509_USER_KEY', '')
104 
105  certs = ""
106  if cert_path:
107  certs += f' --cert {cert_path}'
108  else:
109  logging.warning("No certificate provided for Tier0 access, use X509_USER_CERT and"
110  " optionally X509_USER_KEY env variables to specify the path to the cert"
111  " (and the key unless included in the cert file)")
112  if key_path:
113  certs += f' --key {key_path}'
114  return certs
115 
116  def _curlQueryTier0( self, url:str, force_debug:bool = False, force_cert:bool = False):
117  userAgent = "User-Agent: ConditionWebServices/1.0 python/%d.%d.%d PycURL/%s" \
118  % ( sys.version_info[ :3 ] + ( pycurl.version_info()[ 1 ], ) )
119  debug = "-v" if self._debug or force_debug else "-s -S"
120 
121  proxy = f"--proxy {self._proxy}" if self._proxy else ""
122  certs = self._getCerts() if not self._proxy or force_cert else ""
123 
124  cmd = f'/usr/bin/curl -k -L --user-agent "{userAgent}" {proxy}'\
125  f' --connect-timeout {self._timeOut} --max-time {self._maxTime} --retry {self._retries}'\
126  f' {debug} {url} {certs}'
127 
128  # time the curl to understand if re-tries have been carried out
129  start = time.time()
130  process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
131  (stdoutdata, stderrdata) = process.communicate()
132  end = time.time()
133  return process.returncode, stdoutdata, stderrdata, end-start
134 
135  def _queryTier0DataSvc( self, url ):
136  """
137  Queries Tier0DataSvc.
138  url: Tier0DataSvc URL.
139  @returns: dictionary, from whence the required information must be retrieved according to the API call.
140  Raises if connection error, bad response, or timeout after retries occur.
141  """
142 
143  retcode, stdoutdata, stderrdata, query_time = self._curlQueryTier0(url)
144 
145  if retcode != 0 or stderrdata:
146 
147  # if the first curl has failed, logg its stderror and prepare and independent retry
148  msg = "looks like curl returned an error: retcode=%s and took %s seconds" % (retcode, query_time,)
149  msg += ' msg = "'+str(stderrdata)+'"'
150  logging.error(msg)
151  if self._proxy:
152  logging.info("before assumed proxy provides authentication, now trying with both proxy and certificate")
153 
154  time.sleep(self._retryPeriod)
155  retcode, stdoutdata, stderrdata, query_time = self._curlQueryTier0(url, force_debug=True, force_cert=True)
156  if retcode != 0:
157  msg = "looks like curl returned an error for the second time: retcode=%s" % (retcode,)
158  msg += ' msg = "'+str(stderrdata)+'"'
159  logging.error(msg)
160  raise Tier0Error(msg)
161  else:
162  msg = "curl returned ok upon the second try"
163  logging.info(msg)
164  resp = json.loads( ''.join(stdoutdata.decode()).replace( "'", '"').replace(' None', ' "None"') )
165  return resp
166 
167 
168  def getFirstSafeRun( self ):
169  """
170  Queries Tier0DataSvc to get the first condition safe run.
171  Parameters:
172  @returns: integer, the run number.
173  Raises if connection error, bad response, timeout after retries occur, or if the run number is not available.
174  """
175  firstConditionSafeRunAPI = "firstconditionsaferun"
176  safeRunDict = self._queryTier0DataSvc( os.path.join( self._uri, firstConditionSafeRunAPI ) )
177  if safeRunDict is None:
178  errStr = """First condition safe run is not available in Tier0DataSvc from URL \"%s\"""" \
179  %( os.path.join( self._uri, firstConditionSafeRunAPI ), )
180  if self._proxy:
181  errStr += """ using proxy \"%s\".""" %( str( self._proxy ), )
182  raise Tier0Error( errStr )
183  return int(safeRunDict['result'][0])
184 
185  def getGlobalTag( self, config ):
186  """
187  Queries Tier0DataSvc to get the most recent Global Tag for a given workflow.
188  Parameters:
189  config: Tier0DataSvc API call for the workflow to be looked for;
190  @returns: a string with the Global Tag name.
191  Raises if connection error, bad response, timeout after retries occur, or if no Global Tags are available.
192  """
193  data = self._queryTier0DataSvc( os.path.join( self._uri, config ) )
194  gtnames = sorted(unique( [ str( di['global_tag'] ) for di in data['result'] if di['global_tag'] is not None ] ))
195  try:
196  recentGT = gtnames[-1]
197  return recentGT
198  except IndexError:
199  errStr = """No Global Tags for \"%s\" are available in Tier0DataSvc from URL \"%s\"""" \
200  %( config, os.path.join( self._uri, config ) )
201  if self._proxy:
202  errStr += """ using proxy \"%s\".""" %( str( self._proxy ), )
203  raise Tier0Error( errStr )
204 
205 
206 def test( url ):
207  t0 = Tier0Handler( url, 1, 5, 1, 10, None, debug=False)
208 
209  print(' fcsr = %s (%s)' % (t0.getFirstSafeRun(), type(t0.getFirstSafeRun()) ))
210  print(' reco_config = %s' % t0.getGlobalTag('reco_config'))
211  print(' express_config = %s' % t0.getGlobalTag('express_config'))
212  print('\n')
213 
214 
215 if __name__ == '__main__':
216  test( tier0Url )
def _raise_http_error(curl, response, proxy, timeout, maxTime)
Definition: tier0.py:68
def __str__(self)
Definition: tier0.py:50
def _getCerts(self)
Definition: tier0.py:101
def getFirstSafeRun(self)
Definition: tier0.py:168
def __init__(self, message)
Definition: tier0.py:20
def replace(string, replacements)
def __init__(self, uri, timeOut, maxTime, retries, retryPeriod, proxy, debug)
Definition: tier0.py:73
def _queryTier0DataSvc(self, url)
Definition: tier0.py:135
def __init__(self, curl, response, proxy, timeout, maxTime)
Definition: tier0.py:44
def getGlobalTag(self, config)
Definition: tier0.py:185
def setDebug(self)
Definition: tier0.py:92
def unique(seq, keepstr=True)
Definition: tier0.py:24
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def test(url)
Definition: tier0.py:206
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def unsetDebug(self)
Definition: tier0.py:95
#define str(s)
def _curlQueryTier0
Definition: tier0.py:116
def setProxy(self, proxy)
Definition: tier0.py:98