CMS 3D CMS Logo

conddb_version_mgr.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import cx_Oracle
4 import datetime
5 import calendar
6 import sys
7 import logging
8 import CondCore.Utilities.conddb_serialization_metadata as sm
9 import CondCore.Utilities.credentials as auth
10 import os
11 
# Passed to auth.get_credentials() as the first argument
# (presumably the name of an environment variable - confirm in CondCore.Utilities.credentials).
authPathEnvVar = 'COND_AUTH_PATH'

# Each service is a pair: (connection alias used after '@' in the connect
# string, {access type: key used to look up the credentials}).
# Access types are 'w' (write) and 'r' (read); the ADG service has no 'w' entry.
prod_db_service = (
    'cms_orcon_prod',
    {
        'w': 'cms_orcon_prod/cms_cond_general_w',
        'r': 'cms_orcon_prod/cms_cond_general_r',
    },
)
adg_db_service = (
    'cms_orcon_adg',
    {
        'r': 'cms_orcon_adg/cms_cond_general_r',
    },
)
dev_db_service = (
    'cms_orcoff_prep',
    {
        'w': 'cms_orcoff_prep/cms_cond_general_w',
        'r': 'cms_orcoff_prep/cms_cond_general_r',
    },
)

# Schema selected on the connection right after connecting.
schema_name = 'CMS_CONDITIONS'

# Console logging configuration shared by the tool.
fmt_str = "[%(asctime)s] %(levelname)s: %(message)s"
logLevel = logging.INFO
logFormatter = logging.Formatter(fmt_str)
21 
def print_table( headers, table ):
    """Print `table` to stdout as left-aligned, space-separated columns.

    The output is a header line, a dashed separator line and one line per row.
    Every column is as wide as its longest cell (header included) and is
    followed by a single space.

    headers -- sequence of column titles; its length fixes the column count.
    table   -- iterable of rows (sequences). Cells beyond the header count
               are ignored; missing trailing cells are printed as empty.

    Cells are converted with str() both for measuring and for printing, so
    the padding always matches the text shown. Formatting the raw object
    instead would hand the alignment spec to the object's own __format__
    (a datetime treats it as a strftime pattern, None rejects it).
    """
    ncols = len(headers)
    titles = [str(h) for h in headers]
    rows = [[str(cell) for cell in row][:ncols] for row in table]

    widths = [len(title) for title in titles]
    for cells in rows:
        for idx, text in enumerate(cells):
            if len(text) > widths[idx]:
                widths[idx] = len(text)

    def render( cells ):
        # Pad short rows so that every line has exactly `ncols` fields.
        padded = cells + [''] * (ncols - len(cells))
        return ''.join(text.ljust(width) + ' ' for text, width in zip(padded, widths))

    # print(<single string>) behaves identically as a Python 2 statement
    # and as the Python 3 function.
    print(render( titles ))
    print(''.join('-' * width + ' ' for width in widths))
    for cells in rows:
        print(render( cells ))
52 
54  def __init__(self, db ):
55  self.db = db
56  self.cmssw_boost_map = {}
57  self.boost_run_map = []
58 
59  def fetch_cmssw_boost_map( self ):
60  cursor = self.db.cursor()
61  cursor.execute('SELECT BOOST_VERSION, CMSSW_VERSION FROM CMSSW_BOOST_MAP');
62  rows = cursor.fetchall()
63  self.cmssw_boost_map = {}
64  for r in rows:
65  self.cmssw_boost_map[r[1]]=r[0]
66  return self.cmssw_boost_map
67 
68  def fetch_boost_run_map( self ):
69  cursor = self.db.cursor()
70  cursor.execute('SELECT RUN_NUMBER, RUN_START_TIME, BOOST_VERSION, INSERTION_TIME FROM BOOST_RUN_MAP ORDER BY RUN_NUMBER, INSERTION_TIME')
71  rows = cursor.fetchall()
72  self.boost_run_map = []
73  for r in rows:
74  self.boost_run_map.append( (r[0],r[1],r[2],str(r[3])) )
75  return self.boost_run_map
76 
77  def insert_boost_run_range( self, run, boost_version ):
78  cursor = self.db.cursor()
79  cursor.execute('SELECT MIN(RUN_NUMBER) FROM RUN_INFO WHERE RUN_NUMBER >= :RUN',(run,))
80  min_run = cursor.fetchone()[0]
81  cursor.execute('SELECT START_TIME FROM RUN_INFO WHERE RUN_NUMBER=:RUN',(min_run,))
82  min_run_time = cursor.fetchone()[0]
83  min_run_ts = calendar.timegm( min_run_time.utctimetuple() ) << 32
84  now = datetime.datetime.utcnow()
85  cursor.execute('INSERT INTO BOOST_RUN_MAP ( RUN_NUMBER, RUN_START_TIME, BOOST_VERSION, INSERTION_TIME ) VALUES (:RUN, :RUN_START_T, :BOOST, :TIME)',(run,min_run_ts,boost_version,now) )
86 
87  def insert_cmssw_boost( self, cmssw_version,boost_version ):
88  cursor = self.db.cursor()
89  cursor.execute('INSERT INTO CMSSW_BOOST_MAP ( CMSSW_VERSION, BOOST_VERSION ) VALUES ( :CMSSW_VERSION, :BOOST_VERSION )',(cmssw_version,boost_version))
90 
91  def lookup_boost_in_cmssw( self, cmssw_version ):
92  cmssw_v = sm.check_cmssw_version( cmssw_version )
93  the_arch = None
94  releaseRoot = None
95  if sm.is_release_cycle( cmssw_v ):
96  cmssw_v = sm.strip_cmssw_version( cmssw_v )
97  archs = sm.get_production_arch( cmssw_v )
98  for arch in archs:
99  path = sm.get_release_root( cmssw_v, arch )
100  if os.path.exists(os.path.join(path,cmssw_v)):
101  releaseRoot = path
102  the_arch = arch
103  break
104  if releaseRoot is None:
105  for arch in archs:
106  the_arch = arch
107  releaseRoot = sm.get_release_root( cmssw_v, arch )
108  for r in sorted (os.listdir( releaseRoot )):
109  if r.startswith(cmssw_v):
110  cmssw_v = r
111  logging.debug('Boost version will be verified in release %s' %cmssw_v)
112 
113  if cmssw_v in self.cmssw_boost_map.keys():
114  return self.cmssw_boost_map[cmssw_v]
115 
116  if releaseRoot is None:
117  archs = sm.get_production_arch( cmssw_v )
118  for arch in archs:
119  path = sm.get_release_root( cmssw_v, arch )
120  if os.path.exists(os.path.join(path,cmssw_v)):
121  releaseRoot = path
122  the_arch = arch
123  break
124  logging.debug('Release path: %s' %releaseRoot)
125  boost_version = sm.get_cmssw_boost( the_arch, '%s/%s' %(releaseRoot,cmssw_v) )
126  if not boost_version is None:
127  self.cmssw_boost_map[cmssw_v] = boost_version
128  self.insert_cmssw_boost( cmssw_v,boost_version )
129  return boost_version
130 
131  def populate_for_gts( self ):
132  cursor = self.db.cursor()
133  cursor.execute('SELECT DISTINCT(RELEASE) FROM GLOBAL_TAG')
134  rows = cursor.fetchall()
135  for r in rows:
136  self.lookup_boost_in_cmssw( r[0] )
137 
139  def __init__( self ):
140  self.db = None
141  self.version_db = None
142  self.args = None
143  self.logger = logging.getLogger()
144  self.logger.setLevel(logLevel)
145  consoleHandler = logging.StreamHandler(sys.stdout)
146  consoleHandler.setFormatter(logFormatter)
147  self.logger.addHandler(consoleHandler)
148  self.iovs = None
149  self.versionIovs = None
150 
151  def connect( self ):
152  if self.args.db is None:
153  self.args.db = 'pro'
154  if self.args.db == 'dev' or self.args.db == 'oradev' :
155  db_service = dev_db_service
156  elif self.args.db == 'orapro':
157  db_service = adg_db_service
158  elif self.args.db != 'onlineorapro' or self.args.db != 'pro':
159  db_service = prod_db_service
160  else:
161  raise Exception("Database '%s' is not known." %args.db )
162  if self.args.accessType not in db_service[1].keys():
163  raise Exception('The specified database connection %s does not support the requested action.' %db_service[0])
164  service = db_service[1][self.args.accessType]
165  creds = auth.get_credentials( authPathEnvVar, service, self.args.auth )
166  if creds is None:
167  raise Exception("Could not find credentials for service %s" %service)
168  (username, account, pwd) = creds
169  connStr = '%s/%s@%s' %(username,pwd,db_service[0])
170  self.db = cx_Oracle.connect(connStr)
171  logging.info('Connected to %s as user %s' %(db_service[0],username))
172  self.db.current_schema = schema_name
173 
174  def process_tag_boost_version( self, t, timetype, tagBoostVersion, timeCut ):
175  if self.iovs is None:
176  self.iovs = []
177  cursor = self.db.cursor()
178  stmt = 'SELECT IOV.SINCE SINCE, IOV.INSERTION_TIME INSERTION_TIME, P.STREAMER_INFO STREAMER_INFO FROM TAG, IOV, PAYLOAD P WHERE TAG.NAME = IOV.TAG_NAME AND P.HASH = IOV.PAYLOAD_HASH AND TAG.NAME = :TAG_NAME'
179  params = (t,)
180  stmt = stmt + ' ORDER BY SINCE'
181  logging.debug('Executing: "%s"' %stmt)
182  cursor.execute(stmt,params)
183  for r in cursor:
184  streamer_info = str(r[2].read())
185  self.iovs.append((r[0],r[1],streamer_info))
186  niovs = 0
187  self.versionIovs = []
188  lastBoost = None
189  update = False
190  if tagBoostVersion is not None:
191  update = True
192  for iov in self.iovs:
193  if timeCut is not None:
194  if tagBoostVersion is not None:
195  if timeCut > iov[1]:
196  continue
197  else:
198  if timeCut < iov[1]:
199  continue
200  niovs += 1
201  iovBoostVersion, tagBoostVersion = sm.update_tag_boost_version( tagBoostVersion, iov[2], iov[0], timetype, self.version_db.boost_run_map )
202  logging.debug('iov: %s - boost version: %s - streamer: %s' %(iov[0],iovBoostVersion,iov[2]))
203  if lastBoost is None or lastBoost!=iovBoostVersion:
204  self.versionIovs.append((iov[0],iovBoostVersion))
205  lastBoost = iovBoostVersion
206 
207  if tagBoostVersion is None:
208  if niovs == 0:
209  logging.warning( 'No iovs found. boost version cannot be determined.')
210  return None
211  else:
212  logging.error('Could not determine the tag boost version.' )
213  return None
214  else:
215  if niovs == 0:
216  logging.info('Tag boost version has not changed.')
217  else:
218  msg = 'Found tag boost version %s combining payloads from %s iovs' %(tagBoostVersion,niovs)
219  if timeCut is not None:
220  if update:
221  msg += ' (iov insertion time>%s)' %str(timeCut)
222  else:
223  msg += ' (iov insertion time<%s)' %str(timeCut)
224  logging.info( msg )
225  return tagBoostVersion
226 
227  def validate_boost_version( self, t, timetype, tagBoostVersion ):
228  cursor = self.db.cursor()
229  cursor.execute('SELECT GT.NAME, GT.RELEASE, GT.SNAPSHOT_TIME FROM GLOBAL_TAG GT, GLOBAL_TAG_MAP GTM WHERE GT.NAME = GTM.GLOBAL_TAG_NAME AND GTM.TAG_NAME = :TAG_NAME',(t,))
230  rows = cursor.fetchall()
231  invalid_gts = []
232  ngt = 0
233  gts = []
234  for r in rows:
235  gts.append((r[0],r[1],r[2]))
236  if len(gts)>0:
237  logging.info('validating %s gts.' %len(gts))
238  boost_snapshot_map = {}
239  for gt in gts:
240  ngt += 1
241  logging.debug('Validating for GT %s (release %s)' %(gt[0],gt[1]))
242  gtCMSSWVersion = sm.check_cmssw_version( gt[1] )
243  gtBoostVersion = self.version_db.lookup_boost_in_cmssw( gtCMSSWVersion )
244  if sm.cmp_boost_version( gtBoostVersion, tagBoostVersion )<0:
245  logging.warning( 'The boost version computed from all the iovs in the tag (%s) is incompatible with the gt [%s] %s (consuming ver: %s, snapshot: %s)' %(tagBoostVersion,ngt,gt[0],gtBoostVersion,str(gt[2])))
246  if str(gt[2]) not in boost_snapshot_map.keys():
247  tagSnapshotBoostVersion = None
248  tagSnapshotBoostVersion = self.process_tag_boost_version(t, timetype, tagSnapshotBoostVersion, gt[2])
249  if tagSnapshotBoostVersion is not None:
250  boost_snapshot_map[str(gt[2])] = tagSnapshotBoostVersion
251  else:
252  continue
253  else:
254  tagSnapshotBoostVersion = boost_snapshot_map[str(gt[2])]
255  if sm.cmp_boost_version( gtBoostVersion, tagSnapshotBoostVersion )<0:
256  logging.error('The snapshot from tag used by gt %s (consuming ver: %s) has an incompatible combined boost version %s' %(gt[0],gtBoostVersion,tagSnapshotBoostVersion))
257  invalid_gts.append( ( gt[0], gtBoostVersion ) )
258  if len(invalid_gts)==0:
259  if ngt>0:
260  logging.info('boost version for the tag validated in %s referencing Gts' %(ngt))
261  else:
262  logging.info('No GT referencing this tag found.')
263  else:
264  logging.error( 'boost version for the tag is invalid.')
265  return invalid_gts
266 
267  def update_tag_boost_version_in_db( self, t, tagBoostVersion, update ):
268  cursor = self.db.cursor()
269  now = datetime.datetime.utcnow()
270  if update:
271  cursor.execute('UPDATE TAG_METADATA SET MIN_SERIALIZATION_V=:BOOST_V, MODIFICATION_TIME=:NOW WHERE TAG_NAME = :NAME',( tagBoostVersion,now,t))
272  else:
273  cursor.execute('INSERT INTO TAG_METADATA ( TAG_NAME, MIN_SERIALIZATION_V, MODIFICATION_TIME ) VALUES ( :NAME, :BOOST_V, :NOW )',(t, tagBoostVersion,now))
274  logging.info('Minimum boost version for the tag updated.')
275 
276  def update_tags( self ):
277  cursor = self.db.cursor()
278  self.version_db = version_db( self.db )
279  self.version_db.fetch_cmssw_boost_map()
280  self.version_db.fetch_boost_run_map()
281  tags = {}
282  wpars = ()
283  if self.args.name is not None:
284  stmt0 = 'SELECT NAME FROM TAG WHERE NAME = :TAG_NAME'
285  wpars = (self.args.name,)
286  cursor.execute(stmt0,wpars);
287  rows = cursor.fetchall()
288  found = False
289  for r in rows:
290  found = True
291  break
292  if not found:
293  raise Exception('Tag %s does not exists in the database.' %self.args.name )
294  tags[self.args.name] = None
295  stmt1 = 'SELECT MIN_SERIALIZATION_V, MODIFICATION_TIME FROM TAG_METADATA WHERE TAG_NAME = :NAME'
296  cursor.execute(stmt1,wpars);
297  rows = cursor.fetchall()
298  for r in rows:
299  tags[self.args.name] = (r[0],r[1])
300  else:
301  #stmt = 'SELECT MAX(INSERTION_TIME) FROM IOV WHERE TAG_NAME= :TAG_NAME'
302  #cursor.execute(stmt)
303  #rows = cursor.fetchall()
304  #lastInsertionTime = None
305  #for r in rows:
306  # lastInsertionTime = r[0]
307  #if lastInsertionTime is None:
308  stmt0 = 'SELECT NAME FROM TAG WHERE NAME NOT IN ( SELECT TAG_NAME FROM TAG_METADATA) ORDER BY NAME'
309  nmax = 100
310  if self.args.max is not None:
311  nmax = self.args.max
312  if self.args.all:
313  nmax = -1
314  if nmax >=0:
315  stmt0 = 'SELECT NAME FROM (SELECT NAME FROM TAG WHERE NAME NOT IN ( SELECT TAG_NAME FROM TAG_METADATA ) ORDER BY NAME) WHERE ROWNUM<= :MAXR'
316  wpars = (nmax,)
317  cursor.execute(stmt0,wpars);
318  rows = cursor.fetchall()
319  for r in rows:
320  tags[r[0]] = None
321  stmt1 = 'SELECT T.NAME NAME, TM.MIN_SERIALIZATION_V MIN_SERIALIZATION_V, TM.MODIFICATION_TIME MODIFICATION_TIME FROM TAG T, TAG_METADATA TM WHERE T.NAME=TM.TAG_NAME AND TM.MODIFICATION_TIME < (SELECT MAX(INSERTION_TIME) FROM IOV WHERE IOV.TAG_NAME=TM.TAG_NAME) ORDER BY NAME'
322  nmax = nmax-len(tags)
323  if nmax >=0:
324  stmt1 = 'SELECT NAME, MIN_SERIALIZATION_V, MODIFICATION_TIME FROM (SELECT T.NAME NAME, TM.MIN_SERIALIZATION_V MIN_SERIALIZATION_V, TM.MODIFICATION_TIME MODIFICATION_TIME FROM TAG T, TAG_METADATA TM WHERE T.NAME=TM.TAG_NAME AND TM.MODIFICATION_TIME < (SELECT MAX(INSERTION_TIME) FROM IOV WHERE IOV.TAG_NAME=TM.TAG_NAME) ORDER BY NAME) WHERE ROWNUM<= :MAXR'
325  wpars = (nmax,)
326  cursor.execute(stmt1,wpars);
327  rows = cursor.fetchall()
328  i = 0
329  for r in rows:
330  i += 1
331  if nmax >=0 and i>nmax:
332  break
333  tags[r[0]] = (r[1],r[2])
334  logging.info( 'Processing boost version for %s tags' %len(tags))
335  count = 0
336  for t in sorted(tags.keys()):
337  count += 1
338  try:
339  update = False
340  cursor.execute('SELECT TIME_TYPE FROM TAG WHERE NAME= :TAG_NAME',(t,))
341  timetype = cursor.fetchone()[0]
342  self.iovs = None
343  logging.info('************************************************************************')
344  logging.info('Tag [%s] %s - timetype: %s' %(count,t,timetype))
345  tagBoostVersion = None
346  timeCut = None
347  if tags[t] is not None:
348  update = True
349  tagBoostVersion = tags[t][0]
350  timeCut = tags[t][1]
351  tagBoostVersion = self.process_tag_boost_version( t, timetype, tagBoostVersion, timeCut )
352  if tagBoostVersion is None:
353  continue
354  logging.debug('boost versions in the %s iovs: %s' %(len(self.iovs),str(self.versionIovs)))
355  invalid_gts = self.validate_boost_version( t, timetype, tagBoostVersion )
356  if len(invalid_gts)>0:
357  with open('invalid_tags_in_gts.txt','a') as error_file:
358  for gt in invalid_gts:
359  error_file.write('Tag %s (boost %s) is invalid for GT %s ( boost %s) \n' %(t,tagBoostVersion,gt[0],gt[1]))
360  self.update_tag_boost_version_in_db( t, tagBoostVersion, update )
361  self.db.commit()
362  except Exception as e:
363  logging.error(str(e))
364 
365  def insert_boost_run( self ):
366  cursor = self.db.cursor()
367  self.version_db = version_db( self.db )
368  self.version_db.insert_boost_run_range( self.args.since, self.args.label )
369  self.db.commit()
370  logging.info('boost version %s inserted with since %s' %(self.args.label,self.args.since))
371 
372  def list_boost_run( self ):
373  cursor = self.db.cursor()
374  self.version_db = version_db( self.db )
375  self.version_db.fetch_boost_run_map()
376  headers = ['Run','Run start time','Boost Version','Insertion time']
377  print_table( headers, self.version_db.boost_run_map )
378 
380  cursor = self.db.cursor()
381  tag = self.args.tag_name
382  cursor.execute('SELECT TIME_TYPE FROM TAG WHERE NAME= :TAG_NAME',(tag,))
383  rows = cursor.fetchall()
384  timeType = None
385  t_modificationTime = None
386  for r in rows:
387  timeType = r[0]
388  if timeType is None:
389  raise Exception("Tag %s does not exist in the database." %tag)
390  cursor.execute('SELECT MAX(INSERTION_TIME) FROM IOV WHERE TAG_NAME= :TAG_NAME',(tag,))
391  rows = cursor.fetchall()
392  for r in rows:
393  t_modificationTime = r[0]
394  if t_modificationTime is None:
395  raise Exception("Tag %s does not have any iov stored." %tag)
396  logging.info('Tag %s - timetype: %s' %(tag,timeType))
397  cursor.execute('SELECT MIN_SERIALIZATION_V, MODIFICATION_TIME FROM TAG_METADATA WHERE TAG_NAME= :TAG_NAME',(tag,))
398  rows = cursor.fetchall()
399  tagBoostVersion = None
400  v_modificationTime = None
401  for r in rows:
402  tagBoostVersion = r[0]
403  v_modificationTime = r[1]
404  if v_modificationTime is not None:
405  if t_modificationTime > v_modificationTime:
406  logging.warning('The minimum boost version stored is out of date.')
407  else:
408  logging.info('The minimum boost version stored is up to date.')
409  mt = '-'
410  if v_modificationTime is not None:
411  mt = str(v_modificationTime)
412  r_tagBoostVersion = None
413  if self.args.rebuild or self.args.full:
414  self.version_db = version_db( self.db )
415  self.version_db.fetch_boost_run_map()
416  timeCut = None
417  logging.info('Calculating minimum boost version for the available iovs...')
418  r_tagBoostVersion = self.process_tag_boost_version( tag, timeType, tagBoostVersion, timeCut )
419  print '# Currently stored: %s' %(tagBoostVersion)
420  print '# Last update: %s' %mt
421  print '# Last update on the iovs: %s' %str(t_modificationTime)
422  if self.args.rebuild or self.args.full:
423  print '# Based on the %s available IOVs: %s' %(len(self.iovs),r_tagBoostVersion)
424  if self.args.full:
425  headers = ['Run','Boost Version']
426  print_table( headers, self.versionIovs )
427 
428 import optparse
429 import argparse
430 
def main():
    """Command-line entry point: parse the arguments, connect, run the subcommand.

    Subcommands: update_tags, insert_boost_version, list_boost_versions,
    show_boost_version. Each one binds `func` (the tool method to run) and
    `accessType` ('r' or 'w') as argparse defaults.

    With -v the root logger is switched to DEBUG and errors propagate with
    their traceback; the subcommand result is returned. Without -v errors
    are logged and the process exits with status 1, otherwise with the
    subcommand result as exit status.
    """
    tool = conddb_tool()
    parser = argparse.ArgumentParser(description='CMS conddb command-line tool for serialiation metadata. For general help (manual page), use the help subcommand.')
    parser.add_argument('--db', type=str, help='The target database: pro ( for prod ) or dev ( for prep ). default=pro')
    parser.add_argument("--auth","-a", type=str, help="The path of the authentication file")
    # default=0: without it a 'count' option is None when the flag is absent,
    # and None cannot be compared with an int on Python 3.
    parser.add_argument('--verbose', '-v', action='count', default=0, help='The verbosity level')
    parser_subparsers = parser.add_subparsers(title='Available subcommands')
    parser_update_tags = parser_subparsers.add_parser('update_tags', description='Update the existing tags headers with the boost version')
    parser_update_tags.add_argument('--name', '-n', type=str, help='Name of the specific tag to process (default=None - in this case all of the tags will be processed.')
    parser_update_tags.add_argument('--max', '-m', type=int, help='the maximum number of tags processed',default=100)
    parser_update_tags.add_argument('--all',action='store_true', help='process all of the tags with boost_version = None')
    parser_update_tags.set_defaults(func=tool.update_tags,accessType='w')
    parser_insert_boost_version = parser_subparsers.add_parser('insert_boost_version', description='Insert a new boost version range in the run map')
    parser_insert_boost_version.add_argument('--label', '-l',type=str, help='The boost version label',required=True)
    parser_insert_boost_version.add_argument('--since', '-s',type=int, help='The since validity (run number)',required=True)
    parser_insert_boost_version.set_defaults(func=tool.insert_boost_run,accessType='w')
    parser_list_boost_versions = parser_subparsers.add_parser('list_boost_versions', description='list the boost versions in the run map')
    parser_list_boost_versions.set_defaults(func=tool.list_boost_run,accessType='r')
    parser_show_version = parser_subparsers.add_parser('show_boost_version', description='Display the minimum boost version for the specified tag (the value stored, by default)')
    parser_show_version.add_argument('tag_name',help='The name of the tag')
    parser_show_version.add_argument('--rebuild','-r',action='store_true',default=False,help='Re-calculate the minimum boost versio ')
    parser_show_version.add_argument('--full',action='store_true',default=False,help='Recalulate the minimum boost version, listing the versions in the iov sequence')
    parser_show_version.set_defaults(func=tool.show_tag_boost_version,accessType='r')
    args = parser.parse_args()
    if not hasattr(args, 'func'):
        # Python 3 argparse does not require a subcommand by default;
        # report it as a usage error instead of failing on args.func.
        parser.error('a subcommand is required')
    tool.args = args
    if args.verbose >=1:
        tool.logger.setLevel(logging.DEBUG)
        tool.connect()
        return args.func()
    else:
        try:
            tool.connect()
            sys.exit( args.func())
        except Exception as e:
            logging.error(e)
            sys.exit(1)

if __name__ == '__main__':
    main()
def print_table(headers, table)
def lookup_boost_in_cmssw(self, cmssw_version)
def insert_cmssw_boost(self, cmssw_version, boost_version)
def update_tag_boost_version_in_db(self, t, tagBoostVersion, update)
def validate_boost_version(self, t, timetype, tagBoostVersion)
def process_tag_boost_version(self, t, timetype, tagBoostVersion, timeCut)
def insert_boost_run_range(self, run, boost_version)
Definition: main.py:1