CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
tagInventory.py
Go to the documentation of this file.
1 import coral
2 import CommonUtils, IdGenerator, Node, DBImpl
4  """Class manages tag inventory
5  """
def __init__(self, session):
    """Store the session handle and describe the inventory table layout.

    Input: coral session handle
    """
    self.__session = session
    # Both table names are supplied by CommonUtils.
    self.__tagInventoryTableName = CommonUtils.inventoryTableName()
    self.__tagInventoryIDName = CommonUtils.inventoryIDTableName()
    # Column name -> column type name, as passed to insertColumn() later on.
    self.__tagInventoryTableColumns = {
        'tagid': 'unsigned long',
        'tagname': 'string',
        'pfn': 'string',
        'recordname': 'string',
        'objectname': 'string',
        'labelname': 'string',
    }
    # Every column except tagid and labelname gets a NOT NULL constraint.
    self.__tagInventoryTableNotNullColumns = [
        'tagname',
        'pfn',
        'recordname',
        'objectname',
    ]
    # A plain string: the value was never a tuple, even when written ('tagid').
    self.__tagInventoryTablePK = 'tagid'
def dropme(self):
    """Drop inventory related tables.

    Drops the inventory ID table and then the inventory table (each only
    if it exists) inside a single transaction.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    # Fetch the handle before entering the try block, as the query methods
    # of this class do, so the except clause can always roll back instead
    # of failing on an unbound name.
    transaction = self.__session.transaction()
    try:
        transaction.start(False)
        schema = self.__session.nominalSchema()
        schema.dropIfExistsTable(self.__tagInventoryIDName)
        schema.dropIfExistsTable(self.__tagInventoryTableName)
        transaction.commit()
    except Exception as er:
        transaction.rollback()
        raise Exception(str(er))
30 
def existInventoryTable(self):
    """Check if inventory table exists.

    Output: whatever schema.existsTable() reports for the inventory table.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    # Fetch the handle before the try block so the except clause can
    # always roll back instead of failing on an unbound name.
    transaction = self.__session.transaction()
    try:
        transaction.start(True)
        schema = self.__session.nominalSchema()
        result = schema.existsTable(self.__tagInventoryTableName)
        transaction.commit()
    except Exception as er:
        transaction.rollback()
        raise Exception(str(er))
    return result
def createInventoryTable(self):
    """Create tag inventory table. Existing table will be deleted.

    Builds the table from the column description held by this object,
    applies the NOT NULL constraints and the primary key, grants select
    to public and creates the associated ID table. The created table
    handle is kept on the instance.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    # Fetch the handle before the try block so the except clause can
    # always roll back instead of failing on an unbound name.
    transaction = self.__session.transaction()
    try:
        transaction.start()
        schema = self.__session.nominalSchema()
        schema.dropIfExistsTable(self.__tagInventoryTableName)
        description = coral.TableDescription()
        description.setName(self.__tagInventoryTableName)
        for columnName, columnType in self.__tagInventoryTableColumns.items():
            description.insertColumn(columnName, columnType)
        for columnName in self.__tagInventoryTableNotNullColumns:
            description.setNotNullConstraint(columnName, True)
        # No unique constraints are declared; the earlier attempts
        # (tagname alone, and combinations with pfn) were commented out.
        description.setPrimaryKey(self.__tagInventoryTablePK)
        self.__tagInventoryTableHandle = schema.createTable(description)
        self.__tagInventoryTableHandle.privilegeManager().grantToPublic(coral.privilege_Select)
        # Create also the associated id table.
        generator = IdGenerator.IdGenerator(schema)
        generator.createIDTable(self.__tagInventoryIDName, True)
        transaction.commit()
    except Exception as er:
        transaction.rollback()
        raise Exception(str(er))
def addEntry(self, leafNode):
    """Add entry into the inventory.

    Input: base tag info (tagname, pfn, objectname, recordname and
    labelname are read from leafNode).
    Output: tagid. When a row with the same tagname (and the same pfn,
    if leafNode.pfn is not empty) is already present, nothing is inserted
    and the tagid read from the matching row is returned; otherwise the
    newly generated tagid is returned.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    tagid = 0
    duplicate = False
    transaction = self.__session.transaction()
    try:
        # Step 1: look for an identical entry.
        transaction.start(True)
        schema = self.__session.nominalSchema()
        lookup = schema.tableHandle(self.__tagInventoryTableName).newQuery()
        whereClause = 'tagname=:tagname'
        whereData = coral.AttributeList()
        whereData.extend('tagname', 'string')
        whereData['tagname'].setData(leafNode.tagname)
        if len(leafNode.pfn) != 0:
            whereClause += ' AND pfn=:pfn'
            whereData.extend('pfn', 'string')
            whereData['pfn'].setData(leafNode.pfn)
        lookup.setCondition(whereClause, whereData)
        cursor = lookup.execute()
        while cursor.next():
            # With several matches the id of the last row read wins.
            duplicate = True
            tagid = cursor.currentRow()['tagid'].data()
        cursor.close()
        transaction.commit()
        del lookup
        # Step 2: nothing found, so insert under a freshly generated id.
        if not duplicate:
            transaction.start(False)
            generator = IdGenerator.IdGenerator(schema)
            tagid = generator.getNewID(self.__tagInventoryIDName)
            tabrowValueDict = {
                'tagid': tagid,
                'tagname': leafNode.tagname,
                'objectname': leafNode.objectname,
                'pfn': leafNode.pfn,
                'labelname': leafNode.labelname,
                'recordname': leafNode.recordname,
            }
            dbop = DBImpl.DBImpl(schema)
            # NOTE(review): the listing this code was taken from jumps from
            # line 112 to 114 inside this call, so an argument between the
            # table name and the value dict may be missing -- confirm
            # against DBImpl.insertOneRow before relying on it.
            dbop.insertOneRow(self.__tagInventoryTableName, tabrowValueDict)
            generator.incrementNextID(self.__tagInventoryIDName)
            transaction.commit()
        return tagid
    except Exception as er:
        transaction.rollback()
        raise Exception(str(er))
121 
def addEntriesReplaceService(self, newservicename):
    """Clone all existing entries; only the service name in pfn differs.

    Each stored pfn is split at its last '/'; the part before it is
    replaced by newservicename and the result is added through addEntry.

    Input: newservicename, the new prefix for every pfn.
    Output: list of (oldtagid, newtagid) pairs.

    Raises: Exception carrying the text of the original error (a pfn
    without '/' or an addEntry result of 0 are reported this way too).
    Entries added before a failure are not removed again.
    """
    newtaglinks = []
    results = []
    transaction = self.__session.transaction()
    try:
        transaction.start(True)
        query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
        for columnName in self.__tagInventoryTableColumns:
            query.addToOutputList(columnName)
        cursor = query.execute()
        while cursor.next():
            row = cursor.currentRow()
            pfn = row['pfn'].data()
            (servicename, schemaname) = pfn.rsplit('/', 1)
            newpfn = '/'.join([newservicename, schemaname])
            # (tagid, tagname, newpfn, objectname, recordname, labelname)
            results.append((row['tagid'].data(),
                            row['tagname'].data(),
                            newpfn,
                            row['objectname'].data(),
                            row['recordname'].data(),
                            row['labelname'].data()))
        transaction.commit()
        del query
    except Exception as er:
        transaction.rollback()
        raise Exception(str(er))

    try:
        for (oldtagid, tagname, newpfn, objname, recname, labname) in results:
            nd = Node.LeafNode()
            nd.tagname = tagname
            nd.pfn = newpfn
            nd.objectname = objname
            nd.recordname = recname
            nd.labelname = labname
            # A second inventory object on the same session is not needed;
            # this instance already holds the same state.
            n = self.addEntry(nd)
            if n == 0:
                # Was a string exception, which Python 2.6+ turns into a
                # TypeError and thereby loses this message.
                raise Exception("addEntry returns 0")
            newtaglinks.append((oldtagid, n))
        return newtaglinks
    except Exception as e:
        print(str(e))
        raise Exception(str(e))
173 
def modifyEntriesReplaceService(self, newservicename):
    """Replace the service name in the pfn of all existing entries.

    Every stored pfn is split at its last '/'; the part before it is
    replaced by newservicename and the rows holding the old pfn are
    updated. No other column is changed.

    Input: newservicename, the new prefix for every pfn.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    transaction = self.__session.transaction()
    allpfns = []
    try:
        transaction.start(True)
        query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
        query.addToOutputList('pfn')
        cursor = query.execute()
        while cursor.next():
            allpfns.append(cursor.currentRow()['pfn'].data())
        transaction.commit()
        del query
    except Exception as er:
        # No 'del query' here: when the failure happens before the query
        # is created, the name is unbound and the del would raise and hide
        # the real error.
        transaction.rollback()
        raise Exception(str(er))
    try:
        transaction.start(False)
        editor = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).dataEditor()
        inputData = coral.AttributeList()
        inputData.extend('newpfn', 'string')
        inputData.extend('oldpfn', 'string')
        for pfn in allpfns:
            (servicename, schemaname) = pfn.rsplit('/', 1)
            newpfn = '/'.join([newservicename, schemaname])
            inputData['newpfn'].setData(newpfn)
            inputData['oldpfn'].setData(pfn)
            editor.updateRows("pfn = :newpfn", "pfn = :oldpfn", inputData)
        transaction.commit()
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
210 
def cloneEntry(self, sourcetagid, pfn):
    """Clone an existing entry with a different pfn parameter.

    Input: sourcetagid, pfn.
    Output: tagid of the new entry. sourcetagid itself is returned, and
    nothing is inserted, when pfn is empty, when no entry with
    sourcetagid is found (the looked-up node has tagid 0) or when the
    source entry already has this pfn.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    newtagid = sourcetagid
    if len(pfn) == 0:
        return newtagid
    # getEntryById runs its own transaction and does its own rollback, so
    # it stays outside the try block below. Inside it, a failure reached
    # the except clause before 'transaction' was bound and surfaced as an
    # UnboundLocalError instead of the real error.
    nd = self.getEntryById(sourcetagid)
    if nd.tagid == 0:
        return newtagid
    if nd.pfn == pfn:
        return nd.tagid
    transaction = self.__session.transaction()
    try:
        transaction.start(False)
        schema = self.__session.nominalSchema()
        generator = IdGenerator.IdGenerator(schema)
        newtagid = generator.getNewID(self.__tagInventoryIDName)
        tabrowValueDict = {
            'tagid': newtagid,
            'tagname': nd.tagname,
            'objectname': nd.objectname,
            'pfn': pfn,
            'labelname': nd.labelname,
            'recordname': nd.recordname,
        }
        dbop = DBImpl.DBImpl(schema)
        # NOTE(review): the listing this code was taken from jumps from
        # line 233 to 235 inside this call, so an argument between the
        # table name and the value dict may be missing -- confirm against
        # DBImpl.insertOneRow before relying on it.
        dbop.insertOneRow(self.__tagInventoryTableName, tabrowValueDict)
        generator.incrementNextID(self.__tagInventoryIDName)
        transaction.commit()
        return newtagid
    except Exception as er:
        transaction.rollback()
        raise Exception(str(er))
242 
def getEntryByName(self, tagName, pfn):
    """Get basic tag from inventory by tagName+pfn. pfn can be empty.

    Input: tagname, pfn
    Output: leafNode. When no row matches, the freshly constructed
    LeafNode is returned without any field being set here.

    Raises: Exception carrying the text of the original error. This
    includes the case of more than one matching row, whose ValueError is
    caught below and re-raised as a plain Exception.
    """
    leafnode = Node.LeafNode()
    transaction = self.__session.transaction()
    try:
        transaction.start(True)
        table = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName)
        query = table.newQuery()
        for columnName in self.__tagInventoryTableColumns:
            query.addToOutputList(columnName)
        conditionData = coral.AttributeList()
        conditionData.extend('tagname', 'string')
        conditionData['tagname'].setData(tagName)
        condition = "tagname=:tagname"
        if len(pfn) != 0:
            # Narrow the search only when the caller supplied a pfn.
            condition += " AND pfn=:pfn"
            conditionData.extend('pfn', 'string')
            conditionData['pfn'].setData(pfn)
        query.setCondition(condition, conditionData)
        cursor = query.execute()
        alreadyFound = False
        while cursor.next():
            if alreadyFound:
                raise ValueError("tagName "+tagName+" is not unique, please further specify parameter pfn")
            alreadyFound = True
            row = cursor.currentRow()
            leafnode.tagid = row['tagid'].data()
            leafnode.tagname = row['tagname'].data()
            leafnode.objectname = row['objectname'].data()
            leafnode.pfn = row['pfn'].data()
            leafnode.labelname = row['labelname'].data()
            leafnode.recordname = row['recordname'].data()
        transaction.commit()
        del query
        return leafnode
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
def getEntryById(self, tagId):
    """Get basic tag from inventory by id.

    Input: tagid
    Output: leafNode. When no row matches, the freshly constructed
    LeafNode is returned without any field being set here.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    leafnode = Node.LeafNode()
    transaction = self.__session.transaction()
    try:
        transaction.start(True)
        table = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName)
        query = table.newQuery()
        for columnName in self.__tagInventoryTableColumns:
            query.addToOutputList(columnName)
        conditionData = coral.AttributeList()
        conditionData.extend('tagid', 'unsigned long')
        conditionData['tagid'].setData(tagId)
        query.setCondition("tagid=:tagid", conditionData)
        cursor = query.execute()
        while cursor.next():
            row = cursor.currentRow()
            leafnode.tagid = row['tagid'].data()
            leafnode.tagname = row['tagname'].data()
            leafnode.objectname = row['objectname'].data()
            leafnode.pfn = row['pfn'].data()
            leafnode.labelname = row['labelname'].data()
            leafnode.recordname = row['recordname'].data()
        transaction.commit()
        del query
        return leafnode
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
def getAllEntries(self):
    """Get all entries in the inventory.

    Output: list of leafNode objects, one per row, in the order the
    cursor delivers them.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    result = []
    transaction = self.__session.transaction()
    try:
        transaction.start(True)
        table = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName)
        query = table.newQuery()
        for columnName in self.__tagInventoryTableColumns:
            query.addToOutputList(columnName)
        cursor = query.execute()
        while cursor.next():
            row = cursor.currentRow()
            # A new node per row, so the list does not alias one object.
            leafnode = Node.LeafNode()
            leafnode.tagid = row['tagid'].data()
            leafnode.tagname = row['tagname'].data()
            leafnode.objectname = row['objectname'].data()
            leafnode.pfn = row['pfn'].data()
            leafnode.recordname = row['recordname'].data()
            leafnode.labelname = row['labelname'].data()
            result.append(leafnode)
        transaction.commit()
        del query
        return result
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
def getIDsByName(self, name):
    """Get the tagids that correspond to a given tag name.

    Input: tag name
    Output: list of tagids of all rows whose tagname equals name (empty
    when there is none).

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    transaction = self.__session.transaction()
    ids = []
    try:
        transaction.start(True)
        query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
        condition = 'tagname = :tagname'
        conditionBindData = coral.AttributeList()
        conditionBindData.extend('tagname', 'string')
        conditionBindData['tagname'].setData(name)
        # The column name must be passed as a string. The bare name tagid
        # used here before referred to a local variable that is only
        # assigned further down, so every call failed.
        query.addToOutputList('tagid')
        query.setCondition(condition, conditionBindData)
        cursor = query.execute()
        while cursor.next():
            tagid = cursor.currentRow()['tagid'].data()
            ids.append(tagid)
        transaction.commit()
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
    return ids
def deleteAllEntries(self):
    """Delete all entries in the inventory.

    Calls DBImpl.deleteRows on the inventory table with an empty
    condition and an empty bind list.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    transaction = self.__session.transaction()
    try:
        transaction.start(False)
        dbop = DBImpl.DBImpl(self.__session.nominalSchema())
        noBindData = coral.AttributeList()
        dbop.deleteRows(self.__tagInventoryTableName, '', noBindData)
        transaction.commit()
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
381 
def deleteEntryByName(self, tagname):
    """Delete entry with given tag name.

    Input: tagname; all rows whose tagname column equals it are passed
    to DBImpl.deleteRows.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    # Fetch the handle before the try block so the except clause can
    # always roll back instead of failing on an unbound name.
    transaction = self.__session.transaction()
    try:
        transaction.start(False)
        schema = self.__session.nominalSchema()
        dbop = DBImpl.DBImpl(schema)
        inputData = coral.AttributeList()
        inputData.extend("tagname", "string")
        # Addressed by name, as the other methods of this class do.
        inputData['tagname'].setData(tagname)
        dbop.deleteRows(self.__tagInventoryTableName,
                        'tagname=:tagname',
                        inputData)
        transaction.commit()
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
400 
def replaceTagLabel(self, tagname, label):
    """Replace the run time label of the given tag.

    Input: tagname selects the rows, label is the new labelname value.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    # Fetch the handle before the try block so the except clause can
    # always roll back instead of failing on an unbound name.
    transaction = self.__session.transaction()
    try:
        transaction.start(False)
        schema = self.__session.nominalSchema()
        inputData = coral.AttributeList()
        inputData.extend("labelname", "string")
        inputData.extend("tagname", "string")
        # Addressed by name, as the other methods of this class do.
        inputData['labelname'].setData(label)
        inputData['tagname'].setData(tagname)
        editor = schema.tableHandle(self.__tagInventoryTableName).dataEditor()
        editor.updateRows("labelname=:labelname", "tagname=:tagname", inputData)
        transaction.commit()
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
419 
def bulkInsertEntries(self, entries):
    """Insert a chunk of entries.

    Input: entries [{tagid:unsigned long, tagname:string, pfn:string,
    recordname:string, objectname:string, labelname:string}]. The list
    is only read; it is no longer modified.
    Output: dict. For every inserted entry it maps the entry's input
    tagid to the newly assigned tagid. For an input entry whose
    (tagname, pfn) is already stored, it maps the stored tagid to itself
    and nothing is inserted.

    The inventory table is created first when it does not exist yet.

    Raises: Exception carrying the text of the original error; the
    transaction is rolled back before it is raised.
    """
    transaction = self.__session.transaction()
    results = {}
    ihad = []
    try:
        if self.existInventoryTable():
            ihad = self.getAllEntries()
        else:
            self.createInventoryTable()
        # Separate duplicates from new entries without touching the input
        # list. Removing from 'entries' while looping over it skipped the
        # element after each removal and raised ValueError on a second
        # match of the same entry.
        pending = []
        for e in entries:
            matches = [n for n in ihad
                       if n.tagname == e['tagname'] and n.pfn == e['pfn']]
            if matches:
                # NOTE(review): keyed by the stored tagid, not by
                # e['tagid'] -- kept as it was; confirm what callers expect.
                for n in matches:
                    results[n.tagid] = n.tagid
            else:
                pending.append(e)
        if not pending:
            # Nothing to insert: skip the write transaction altogether.
            return results
        transaction.start(False)
        # Read the next free id, asking for an update lock on the id table.
        query = self.__session.nominalSchema().tableHandle(self.__tagInventoryIDName).newQuery()
        query.addToOutputList('nextID')
        query.setForUpdate()
        cursor = query.execute()
        nextid = 0
        while cursor.next():
            nextid = cursor.currentRow()[0].data()
        idEditor = self.__session.nominalSchema().tableHandle(self.__tagInventoryIDName).dataEditor()
        inputData = coral.AttributeList()
        inputData.extend('delta', 'unsigned long')

        delta = len(pending)
        if nextid == 0:
            nextid = 1
            delta = 1

        # Reserve the id range in one update.
        inputData['delta'].setData(delta)
        idEditor.updateRows('nextID = nextID + :delta', '', inputData)

        dataEditor = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).dataEditor()
        insertdata = coral.AttributeList()
        insertdata.extend('tagid', 'unsigned long')
        insertdata.extend('tagname', 'string')
        insertdata.extend('pfn', 'string')
        insertdata.extend('recordname', 'string')
        insertdata.extend('objectname', 'string')
        insertdata.extend('labelname', 'string')
        bulkOperation = dataEditor.bulkInsert(insertdata, delta)
        for entry in pending:
            insertdata['tagid'].setData(nextid)
            insertdata['tagname'].setData(entry['tagname'])
            insertdata['pfn'].setData(entry['pfn'])
            insertdata['recordname'].setData(entry['recordname'])
            insertdata['objectname'].setData(entry['objectname'])
            insertdata['labelname'].setData(entry['labelname'])
            bulkOperation.processNextIteration()
            results[entry['tagid']] = nextid
            nextid = nextid + 1
        bulkOperation.flush()
        transaction.commit()
        del bulkOperation
        del query
        return results
    except Exception as e:
        transaction.rollback()
        raise Exception(str(e))
486 
if __name__ == "__main__":
    # Self-test against a local sqlite file; it walks through the public
    # methods of tagInventory in turn.
    # The messages are built with '%' formatting so the output stays
    # byte-identical to the former print statements: those put no space
    # after a string ending in a tab and one space after a string ending
    # in a blank.
    svc = coral.ConnectionService()
    session = svc.connect('sqlite_file:testInventory.db',
                          accessMode=coral.access_Update)
    try:
        inv = tagInventory(session)
        inv.createInventoryTable()

        # Two sample entries, added through the same node object.
        tagentry = Node.LeafNode()
        tagentry.tagname = 'ecalpedestalsfromonline'
        tagentry.objectname = 'EcalPedestals'
        tagentry.pfn = 'oracle://devdb10/CMS_COND_ECAL'
        tagentry.recordname = 'EcalPedestalsRcd'
        tagentry.labelname = ''
        inv.addEntry(tagentry)
        tagentry.tagname = 'crap'
        tagentry.objectname = 'MyPedestals'
        tagentry.pfn = 'oracle://devdb10/CMS_COND_ME'
        tagentry.recordname = 'MyPedestalsRcd'
        tagentry.labelname = 'mylabel'
        inv.addEntry(tagentry)

        result = inv.getAllEntries()
        print('get all##\t%s' % (result,))
        result = inv.getEntryByName('ecalpedestalsfromonline', 'oracle://devdb10/CMS_COND_ECAL')
        print('get ecalpedestalsfromonline##\t%s' % (result,))
        result = inv.getEntryByName('crap', 'oracle://devdb10/CMS_COND_ME')
        print('get crap##\t%s' % (result,))
        result = inv.getEntryById(0)
        print('get by id 0##\t%s' % (result,))
        inv.deleteEntryByName('ecalpedestalsfromonline')

        print('TESTING getEntryByName')
        result = inv.getEntryByName('ecalpedestalsfromonline', 'oracle://devdb10/CMS_COND_ECAL')
        print('get ecalpedestalsfromonline##\t%s' % (result,))
        result = inv.getEntryByName('crap', 'oracle://devdb10/CMS_COND_ME')
        print('get crap##\t%s' % (result,))

        print('TESTING cloneEntry')
        newid = inv.cloneEntry(result.tagid, 'fontier://crap/crap')
        print('newid  %s' % (newid,))

        print('TESTING addEntriesReplaceService')
        newtagids = inv.addEntriesReplaceService('oracle://cms_orcoff_int')
        print('new tag ids  %s' % (newtagids,))

        print('TESTING modifyEntriesReplaceService')
        inv.modifyEntriesReplaceService('oracle://cms_orcoff_int9r')

        print('TESTING bulkInsertEntries')
        entries = [
            {'tagid':10,'tagname':'tag1','pfn':'dbdb','recordname':'myrcd','objectname':'bobo','labelname':''},
            {'tagid':11,'tagname':'tag2','pfn':'dbdb','recordname':'mdrcd','objectname':'bobo','labelname':''},
            {'tagid':12,'tagname':'tag3','pfn':'dbdb','recordname':'ndrcd','objectname':'bobo','labelname':''},
            {'tagid':13,'tagname':'tag4','pfn':'dbdb','recordname':'ndrcd','objectname':'bobo','labelname':''},
        ]
        a = inv.bulkInsertEntries(entries)
        print(a)
        del session

    except Exception as e:
        print("Failed in unit test")
        print(str(e))
        del session
545 
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
list object
Definition: dbtoconf.py:77
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82