00001 import coral
00002 import CommonUtils, IdGenerator, Node, DBImpl
00003 class tagInventory(object):
00004 """Class manages tag inventory
00005 """
00006 def __init__( self , session ):
00007 """Input: coral session handle
00008 """
00009 self.__session = session
00010 self.__tagInventoryTableName=CommonUtils.inventoryTableName()
00011 self.__tagInventoryIDName=CommonUtils.inventoryIDTableName()
00012 self.__tagInventoryTableColumns = {'tagid':'unsigned long', 'tagname':'string', 'pfn':'string','recordname':'string', 'objectname':'string', 'labelname':'string'}
00013 self.__tagInventoryTableNotNullColumns = ['tagname','pfn','recordname','objectname']
00014
00015 self.__tagInventoryTablePK = ('tagid')
00016 def dropme( self ):
00017 """Drop inventory related tables
00018 """
00019 try:
00020 transaction=self.__session.transaction()
00021 transaction.start(False)
00022 schema = self.__session.nominalSchema()
00023 schema.dropIfExistsTable( self.__tagInventoryIDName )
00024 schema.dropIfExistsTable( self.__tagInventoryTableName )
00025 transaction.commit()
00026 except Exception, er:
00027 transaction.rollback()
00028 raise Exception, str(er)
00029 return
00030
00031 def existInventoryTable( self ):
00032 """Check if inventory table exists
00033 """
00034 try:
00035 transaction=self.__session.transaction()
00036 transaction.start(True)
00037 schema = self.__session.nominalSchema()
00038 result=schema.existsTable(self.__tagInventoryTableName)
00039 transaction.commit()
00040
00041 except Exception, er:
00042 transaction.rollback()
00043 raise Exception, str(er)
00044 return result
00045 def createInventoryTable( self ):
00046 """Create tag inventory table. Existing table will be deleted.
00047 """
00048 try:
00049 transaction=self.__session.transaction()
00050 transaction.start()
00051 schema = self.__session.nominalSchema()
00052 schema.dropIfExistsTable( self.__tagInventoryTableName )
00053 description = coral.TableDescription();
00054 description.setName( self.__tagInventoryTableName )
00055 for columnName, columnType in self.__tagInventoryTableColumns.items():
00056 description.insertColumn(columnName, columnType)
00057 for columnName in self.__tagInventoryTableNotNullColumns :
00058 description.setNotNullConstraint(columnName,True)
00059
00060
00061
00062
00063
00064
00065 description.setPrimaryKey( self.__tagInventoryTablePK )
00066 self.__tagInventoryTableHandle = schema.createTable( description )
00067 self.__tagInventoryTableHandle.privilegeManager().grantToPublic( coral.privilege_Select )
00068
00069 generator=IdGenerator.IdGenerator(schema)
00070 generator.createIDTable(self.__tagInventoryIDName,True)
00071 transaction.commit()
00072 except Exception, er:
00073 transaction.rollback()
00074 raise Exception, str(er)
00075 def addEntry( self, leafNode ):
00076 """Add entry into the inventory.\n
00077 Input: base tag info. If identical data found already exists, do nothing
00078 Output: tagid. if tagid=0, there's no new entry added, i.e.no new tagid
00079
00080 """
00081 tagid=0
00082 transaction=self.__session.transaction()
00083 duplicate=False
00084 try:
00085 transaction.start(True)
00086 schema = self.__session.nominalSchema()
00087 query = schema.tableHandle(self.__tagInventoryTableName).newQuery()
00088 condition='tagname=:tagname'
00089 conditionbindDict=coral.AttributeList()
00090 conditionbindDict.extend('tagname','string')
00091 conditionbindDict['tagname'].setData(leafNode.tagname)
00092 if len(leafNode.pfn)!=0:
00093 condition+=' AND pfn=:pfn'
00094 conditionbindDict.extend('pfn','string')
00095 conditionbindDict['pfn'].setData(leafNode.pfn)
00096 query.setCondition(condition,conditionbindDict)
00097
00098 cursor=query.execute()
00099 while( cursor.next() ):
00100 duplicate=True
00101 tagid=cursor.currentRow()['tagid'].data()
00102 cursor.close()
00103 transaction.commit()
00104 del query
00105
00106 if duplicate is False:
00107 transaction.start(False)
00108 generator=IdGenerator.IdGenerator(schema)
00109 tagid=generator.getNewID(self.__tagInventoryIDName)
00110 tabrowValueDict={'tagid':tagid,'tagname':leafNode.tagname,'objectname':leafNode.objectname,'pfn':leafNode.pfn,'labelname':leafNode.labelname,'recordname':leafNode.recordname}
00111 dbop=DBImpl.DBImpl(schema)
00112 dbop.insertOneRow(self.__tagInventoryTableName,
00113 self.__tagInventoryTableColumns,
00114 tabrowValueDict)
00115 generator.incrementNextID(self.__tagInventoryIDName)
00116 transaction.commit()
00117 return tagid
00118 except Exception, er:
00119 transaction.rollback()
00120 raise Exception, str(er)
00121
00122 def addEntriesReplaceService( self, newservicename ):
00123 """ clone all existing entries only servicename in pfn are different
00124 return collection of new (oldtagid,newtagid) pair
00125 """
00126 newtaglinks=[]
00127 transaction=self.__session.transaction()
00128 try:
00129 results=[]
00130 transaction.start(True)
00131 query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
00132 for columnName in self.__tagInventoryTableColumns:
00133 query.addToOutputList(columnName)
00134 cursor=query.execute()
00135 while cursor.next():
00136 tagid=cursor.currentRow()['tagid'].data()
00137 tagname=cursor.currentRow()['tagname'].data()
00138 pfn=cursor.currentRow()['pfn'].data()
00139 (servicename,schemaname)=pfn.rsplit('/',1)
00140 newpfn=('/').join([newservicename,schemaname])
00141
00142 objname=cursor.currentRow()['objectname'].data()
00143 redname=cursor.currentRow()['recordname'].data()
00144 labname=cursor.currentRow()['labelname'].data()
00145
00146 r=(tagid,tagname,newpfn,objname,redname,labname)
00147 results.append(r)
00148 transaction.commit()
00149 del query
00150 except Exception, er:
00151 transaction.rollback()
00152 raise Exception, str(er)
00153
00154 inv=tagInventory(self.__session)
00155 try:
00156 for r in results:
00157 nd=Node.LeafNode()
00158 oldtagid=r[0]
00159 nd.tagname=r[1]
00160 nd.pfn=r[2]
00161 nd.objectname=r[3]
00162 nd.recordname=r[4]
00163
00164 nd.labelname=r[5]
00165 n=inv.addEntry(nd)
00166 if n==0:
00167 raise "addEntry returns 0"
00168 newtaglinks.append((oldtagid,n))
00169 return newtaglinks
00170 except Exception, e:
00171 print str(e)
00172 raise Exception, str(e)
00173
00174 def modifyEntriesReplaceService( self, newservicename ):
00175 """ replace all existing entries replace service name in pfn
00176 no change in other parameters
00177 """
00178 transaction=self.__session.transaction()
00179 try:
00180 allpfns=[]
00181 transaction.start(True)
00182 query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
00183 query.addToOutputList('pfn')
00184 cursor=query.execute()
00185 while cursor.next():
00186 pfn=cursor.currentRow()['pfn'].data()
00187 allpfns.append(pfn)
00188 transaction.commit()
00189 del query
00190 except Exception, er:
00191 transaction.rollback()
00192 del query
00193 raise Exception, str(er)
00194 try:
00195 transaction.start(False)
00196 editor = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).dataEditor()
00197 inputData = coral.AttributeList()
00198 inputData.extend('newpfn','string')
00199 inputData.extend('oldpfn','string')
00200 for pfn in allpfns:
00201 (servicename,schemaname)=pfn.rsplit('/',1)
00202 newpfn=('/').join([newservicename,schemaname])
00203 inputData['newpfn'].setData(newpfn)
00204 inputData['oldpfn'].setData(pfn)
00205 editor.updateRows( "pfn = :newpfn", "pfn = :oldpfn", inputData )
00206 transaction.commit()
00207 except Exception, e:
00208 transaction.rollback()
00209 raise Exception, str(e)
00210
00211 def cloneEntry( self, sourcetagid, pfn ):
00212 """ clone an existing entry with different pfn parameter
00213 Input: sourcetagid, pfn.
00214 Output: tagid of the new entry. Return 0 in case no new entry created or required entry already exists.
00215 """
00216 newtagid=sourcetagid
00217 if len(pfn)==0:
00218 return newtagid
00219 try:
00220 nd=self.getEntryById(sourcetagid)
00221 if nd.tagid==0:
00222 return newtagid
00223 oldpfn=nd.pfn
00224 if oldpfn==pfn:
00225 return nd.tagid
00226 transaction=self.__session.transaction()
00227 transaction.start(False)
00228 schema = self.__session.nominalSchema()
00229 generator=IdGenerator.IdGenerator(schema)
00230 newtagid=generator.getNewID(self.__tagInventoryIDName)
00231 tabrowValueDict={'tagid':newtagid,'tagname':nd.tagname,'objectname':nd.objectname,'pfn':pfn,'labelname':nd.labelname,'recordname':nd.recordname}
00232 dbop=DBImpl.DBImpl(schema)
00233 dbop.insertOneRow(self.__tagInventoryTableName,
00234 self.__tagInventoryTableColumns,
00235 tabrowValueDict)
00236 generator.incrementNextID(self.__tagInventoryIDName)
00237 transaction.commit()
00238 return newtagid
00239 except Exception, er:
00240 transaction.rollback()
00241 raise Exception, str(er)
00242
00243 def getEntryByName( self, tagName, pfn ):
00244 """Get basic tag from inventory by tagName+pfn. pfn can be empty\n
00245 Input: tagname,pfn
00246 Output: leafNode
00247 throw if more than one entry is found.
00248 """
00249 leafnode = Node.LeafNode()
00250 transaction=self.__session.transaction()
00251 try:
00252 transaction.start(True)
00253 query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
00254 for columnName in self.__tagInventoryTableColumns:
00255 query.addToOutputList(columnName)
00256 conditionData = coral.AttributeList()
00257 condition = "tagname=:tagname"
00258 conditionData.extend( 'tagname','string' )
00259 conditionData['tagname'].setData(tagName)
00260 if len(pfn)!=0 :
00261 condition += " AND pfn=:pfn"
00262 conditionData.extend( 'pfn','string' )
00263 conditionData['pfn'].setData(pfn)
00264 query.setCondition(condition,conditionData)
00265 cursor = query.execute()
00266 counter=0
00267 while ( cursor.next() ):
00268 if counter > 0 :
00269 raise ValueError, "tagName "+tagName+" is not unique, please further specify parameter pfn"
00270 counter+=1
00271 leafnode.tagid=cursor.currentRow()['tagid'].data()
00272 leafnode.tagname=cursor.currentRow()['tagname'].data()
00273 leafnode.objectname=cursor.currentRow()['objectname'].data()
00274 leafnode.pfn=cursor.currentRow()['pfn'].data()
00275 leafnode.labelname=cursor.currentRow()['labelname'].data()
00276 leafnode.recordname=cursor.currentRow()['recordname'].data()
00277 transaction.commit()
00278 del query
00279 return leafnode
00280 except Exception, e:
00281 transaction.rollback()
00282 raise Exception, str(e)
00283 def getEntryById( self, tagId ):
00284 """Get basic tag from inventory by id.\n
00285 Input: tagid
00286 Output: leafNode
00287 """
00288 leafnode = Node.LeafNode()
00289 transaction=self.__session.transaction()
00290 try:
00291 transaction.start(True)
00292 query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
00293 for columnName in self.__tagInventoryTableColumns:
00294 query.addToOutputList(columnName)
00295 condition = "tagid=:tagid"
00296 conditionData = coral.AttributeList()
00297 conditionData.extend( 'tagid','unsigned long' )
00298 conditionData['tagid'].setData(tagId)
00299 query.setCondition( condition, conditionData)
00300 cursor = query.execute()
00301 while ( cursor.next() ):
00302
00303 leafnode.tagid=cursor.currentRow()['tagid'].data()
00304 leafnode.tagname=cursor.currentRow()['tagname'].data()
00305 leafnode.objectname=cursor.currentRow()['objectname'].data()
00306 leafnode.pfn=cursor.currentRow()['pfn'].data()
00307 leafnode.labelname=cursor.currentRow()['labelname'].data()
00308 leafnode.recordname=cursor.currentRow()['recordname'].data()
00309 transaction.commit()
00310 del query
00311 return leafnode
00312 except Exception, e:
00313 transaction.rollback()
00314 raise Exception, str(e)
00315 def getAllEntries( self ):
00316 """Get all entries in the inventory
00317 Output: list of leafNode objects
00318 """
00319 result=[]
00320 transaction=self.__session.transaction()
00321 try:
00322 transaction.start(True)
00323 query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
00324 for columnName in self.__tagInventoryTableColumns:
00325 query.addToOutputList(columnName)
00326 cursor = query.execute()
00327 while ( cursor.next() ):
00328 leafnode = Node.LeafNode()
00329 leafnode.tagid=cursor.currentRow()['tagid'].data()
00330 leafnode.tagname=cursor.currentRow()['tagname'].data()
00331 leafnode.objectname=cursor.currentRow()['objectname'].data()
00332 leafnode.pfn=cursor.currentRow()['pfn'].data()
00333 leafnode.recordname=cursor.currentRow()['recordname'].data()
00334 leafnode.labelname=cursor.currentRow()['labelname'].data()
00335 result.append(leafnode)
00336 transaction.commit()
00337 del query
00338 return result
00339 except Exception, e:
00340 transaction.rollback()
00341 raise Exception, str(e)
00342 def getIDsByName( self, name ):
00343 """get tagids correspond to a given tag name
00344 """
00345 transaction=self.__session.transaction()
00346 ids=[]
00347 try:
00348 transaction.start(True)
00349 query = self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).newQuery()
00350 condition='tagname = :tagname'
00351 conditionBindData=coral.AttributeList()
00352 conditionBindData.extend('tagname','string')
00353 conditionBindData['tagname'].setData(name)
00354 query.addToOutputList(tagid)
00355 query.setCondition(condition,conditionBindData)
00356 cursor = query.execute()
00357 while ( cursor.next() ):
00358 tagid=cursor.currentRow()['tagid'].data()
00359 ids.append(tagid)
00360 transaction.commit()
00361 except Exception, e:
00362 transaction.rollback()
00363 raise Exception, str(e)
00364 return ids
00365 def deleteAllEntries( self ):
00366 """Delete all entries in the inventory
00367 """
00368 transaction=self.__session.transaction()
00369 try:
00370 transaction.start(False)
00371 schema = self.__session.nominalSchema()
00372 dbop=DBImpl.DBImpl(schema)
00373 inputData = coral.AttributeList()
00374 dbop.deleteRows(self.__tagInventoryTableName,
00375 '',
00376 inputData)
00377 transaction.commit()
00378 except Exception, e:
00379 transaction.rollback()
00380 raise Exception, str(e)
00381
00382 def deleteEntryByName( self, tagname ):
00383 """Delete entry with given tag name
00384 """
00385 try:
00386 transaction=self.__session.transaction()
00387 transaction.start(False)
00388 schema = self.__session.nominalSchema()
00389 dbop=DBImpl.DBImpl(schema)
00390 inputData = coral.AttributeList()
00391 inputData.extend( "tagname","string" )
00392 inputData[0].setData(tagname)
00393 dbop.deleteRows(self.__tagInventoryTableName,
00394 'tagname=:tagname',
00395 inputData)
00396 transaction.commit()
00397 except Exception, e:
00398 transaction.rollback()
00399 raise Exception, str(e)
00400
00401 def replaceTagLabel( self, tagname, label ):
00402 """Replace the run time label of the given tag
00403 """
00404 try:
00405 transaction=self.__session.transaction()
00406 transaction.start(False)
00407 schema = self.__session.nominalSchema()
00408 inputData = coral.AttributeList()
00409 inputData.extend( "labelname","string" )
00410 inputData.extend( "tagname", "string" )
00411 inputData[0].setData(label)
00412 inputData[1].setData(tagname)
00413 editor = schema.tableHandle(self.__tagInventoryTableName).dataEditor()
00414 editor.updateRows( "labelname=:labelname", "tagname=:tagname", inputData )
00415 transaction.commit()
00416 except Exception, e:
00417 transaction.rollback()
00418 raise Exception, str(e)
00419
00420 def bulkInsertEntries( self, entries ):
00421 """insert a chunk of entries.
00422 Input: entries [{tagid:unsigned long, tagname:string , pfn:string , recordname:string , objectname:string, labelname:string }]
00423 Output: {oldtagid:newtagid} of the inserted entries. If tag already exists, old tagid is returned
00424 """
00425 transaction=self.__session.transaction()
00426 results={}
00427 ihad=[]
00428 try:
00429 if self.existInventoryTable():
00430 ihad=self.getAllEntries()
00431 else:
00432 self.createInventoryTable()
00433
00434 for e in entries:
00435 for n in ihad:
00436 if n.tagname==e['tagname'] and n.pfn==e['pfn']:
00437 results[n.tagid]=n.tagid
00438 entries.remove(e)
00439 transaction.start(False)
00440 query=self.__session.nominalSchema().tableHandle(self.__tagInventoryIDName).newQuery()
00441 query.addToOutputList('nextID')
00442 query.setForUpdate()
00443 cursor = query.execute()
00444 nextid=0
00445 while cursor.next():
00446 nextid=cursor.currentRow()[0].data()
00447 idEditor = self.__session.nominalSchema().tableHandle(self.__tagInventoryIDName).dataEditor()
00448 inputData = coral.AttributeList()
00449 inputData.extend( 'delta', 'unsigned long' )
00450
00451 delta=len(entries)
00452 if nextid==0:
00453 nextid=1
00454 delta=1
00455
00456 inputData['delta'].setData(delta)
00457 idEditor.updateRows('nextID = nextID + :delta','',inputData)
00458
00459 dataEditor=self.__session.nominalSchema().tableHandle(self.__tagInventoryTableName).dataEditor()
00460 insertdata=coral.AttributeList()
00461 insertdata.extend('tagid','unsigned long')
00462 insertdata.extend('tagname','string')
00463 insertdata.extend('pfn','string')
00464 insertdata.extend('recordname','string')
00465 insertdata.extend('objectname','string')
00466 insertdata.extend('labelname','string')
00467 bulkOperation=dataEditor.bulkInsert(insertdata,delta)
00468 for entry in entries:
00469 insertdata['tagid'].setData(nextid)
00470 insertdata['tagname'].setData(entry['tagname'])
00471 insertdata['pfn'].setData(entry['pfn'])
00472 insertdata['recordname'].setData(entry['recordname'])
00473 insertdata['objectname'].setData(entry['objectname'])
00474 insertdata['labelname'].setData(entry['labelname'])
00475 bulkOperation.processNextIteration()
00476 results[entry['tagid']]=nextid
00477 nextid=nextid+1
00478 bulkOperation.flush()
00479 transaction.commit()
00480 del bulkOperation
00481 del query
00482 return results
00483 except Exception, e:
00484 transaction.rollback()
00485 raise Exception, str(e)
00486
00487 if __name__ == "__main__":
00488
00489
00490 svc = coral.ConnectionService()
00491 session = svc.connect( 'sqlite_file:testInventory.db',
00492 accessMode = coral.access_Update )
00493 try:
00494 inv=tagInventory(session)
00495 inv.createInventoryTable()
00496 tagentry=Node.LeafNode()
00497 tagentry.tagname='ecalpedestalsfromonline'
00498 tagentry.objectname='EcalPedestals'
00499 tagentry.pfn='oracle://devdb10/CMS_COND_ECAL'
00500 tagentry.recordname='EcalPedestalsRcd'
00501 tagentry.labelname=''
00502 inv.addEntry(tagentry)
00503 tagentry.tagname='crap'
00504 tagentry.objectname='MyPedestals'
00505 tagentry.pfn='oracle://devdb10/CMS_COND_ME'
00506 tagentry.recordname='MyPedestalsRcd'
00507 tagentry.labelname='mylabel'
00508 inv.addEntry(tagentry)
00509 result=inv.getAllEntries()
00510 print 'get all##\t',result
00511 result=inv.getEntryByName('ecalpedestalsfromonline','oracle://devdb10/CMS_COND_ECAL')
00512 print 'get ecalpedestalsfromonline##\t',result
00513 result=inv.getEntryByName('crap','oracle://devdb10/CMS_COND_ME')
00514 print 'get crap##\t',result
00515 result=inv.getEntryById(0)
00516 print 'get by id 0##\t',result
00517 inv.deleteEntryByName('ecalpedestalsfromonline')
00518 print 'TESTING getEntryByName'
00519 result=inv.getEntryByName('ecalpedestalsfromonline','oracle://devdb10/CMS_COND_ECAL')
00520 print 'get ecalpedestalsfromonline##\t',result
00521 result=inv.getEntryByName('crap','oracle://devdb10/CMS_COND_ME')
00522 print 'get crap##\t',result
00523 print 'TESTING cloneEntry'
00524 newid=inv.cloneEntry(result.tagid,'fontier://crap/crap')
00525 print 'newid ',newid
00526 print 'TESTING addEntriesReplaceService'
00527 newtagids=inv.addEntriesReplaceService('oracle://cms_orcoff_int')
00528 print 'new tag ids ',newtagids
00529 print 'TESTING modifyEntriesReplaceService'
00530 inv.modifyEntriesReplaceService('oracle://cms_orcoff_int9r')
00531 print 'TESTING bulkInsertEntries'
00532 entries=[]
00533 entries.append({'tagid':10,'tagname':'tag1','pfn':'dbdb','recordname':'myrcd','objectname':'bobo','labelname':''})
00534 entries.append({'tagid':11,'tagname':'tag2','pfn':'dbdb','recordname':'mdrcd','objectname':'bobo','labelname':''})
00535 entries.append({'tagid':12,'tagname':'tag3','pfn':'dbdb','recordname':'ndrcd','objectname':'bobo','labelname':''})
00536 entries.append({'tagid':13,'tagname':'tag4','pfn':'dbdb','recordname':'ndrcd','objectname':'bobo','labelname':''})
00537 a=inv.bulkInsertEntries(entries)
00538 print a
00539 del session
00540
00541 except Exception, e:
00542 print "Failed in unit test"
00543 print str(e)
00544 del session
00545