00001 import coral
00002 import CommonUtils, TagTree, tagInventory
00003
class DBCopy(object):
    """Copy tag inventory and tag tree tables from one CORAL session to another.

    Rows are streamed from a query on the source session into a bulk insert
    on the destination session.  ``rowcachesize`` is used both as the row
    cache size of the source query and as the cache size of the bulk insert.
    """

    def __init__( self, sourcesession, destsession, rowcachesize=1024 ):
        """Remember the two sessions and the row cache size.

        sourcesession -- session the rows are read from
        destsession   -- session the rows are written to
        rowcachesize  -- row cache size for queries and bulk inserts
        """
        self.__sourcesession=sourcesession
        self.__destsession=destsession
        self.__rowcachesize=rowcachesize

    def resetrowcachesize( self, newrowcachesize ):
        """Change the row cache size used by subsequent copy operations."""
        self.__rowcachesize=newrowcachesize

    def _tableNameCondition( self, tablename ):
        """Build the bind-variable list for the 'tablename = :tablename' filter."""
        condition=coral.AttributeList()
        condition.extend('tablename','string')
        condition['tablename'].setData(tablename)
        return condition

    def _copyTable( self, tablename, condition='', conditionData=None ):
        """Bulk-copy the rows of one table from the source to the destination.

        tablename     -- name of the table; it must exist in both schemas
        condition     -- optional WHERE clause ('' selects every row)
        conditionData -- bind variables for ``condition``; it must be fully
                         populated before being passed in

        Both transactions must already be active.  Starting, committing and
        rolling back are left to the caller.
        """
        if conditionData is None:
            conditionData=coral.AttributeList()
        rowdata=coral.AttributeList()
        dest_editor=self.__destsession.nominalSchema().tableHandle(tablename).dataEditor()
        source_query=self.__sourcesession.nominalSchema().tableHandle(tablename).newQuery()
        source_query.setCondition(condition,conditionData)
        source_query.setRowCacheSize(self.__rowcachesize)
        # The same attribute list is the query output and the insert buffer,
        # so each cursor step fills the row that the bulk insert then consumes.
        dest_editor.rowBuffer(rowdata)
        source_query.defineOutput(rowdata)
        bulkOperation=dest_editor.bulkInsert(rowdata,self.__rowcachesize)
        cursor=source_query.execute()
        while cursor.next():
            bulkOperation.processNextIteration()
        bulkOperation.flush()
        del bulkOperation
        del source_query

    def copyInventory( self ):
        """Copy the entire tag inventory from the source to the destination.

        Copies the inventory table, the inventory id table and, when the
        source has a comment table, the comment rows selected by the filter
        below.  The source is accessed in a read-only transaction.

        NOTE(review): an earlier version of this docstring said the inventory
        "in the source db" is wiped.  Nothing here writes to the source;
        whether createInventoryTable() clears an existing destination
        inventory cannot be told from this file -- confirm in tagInventory.

        Raises Exception (carrying the original message) after rolling back
        both transactions if anything fails.
        """
        inv=tagInventory.tagInventory(self.__destsession)
        inv.createInventoryTable()
        dest_transaction=self.__destsession.transaction()
        source_transaction=self.__sourcesession.transaction()
        try:
            dest_transaction.start(False)
            source_transaction.start(True)

            self._copyTable(CommonUtils.inventoryTableName())
            self._copyTable(CommonUtils.inventoryIDTableName())

            if self.__sourcesession.nominalSchema().existsTable(CommonUtils.commentTableName()):
                # NOTE(review): the filter keeps comment rows whose
                # 'tablename' equals the comment table's own name.  This may
                # have been meant to be the inventory table name; behaviour
                # is kept as found -- confirm before changing.
                self._copyTable(CommonUtils.commentTableName(),
                                'tablename = :tablename',
                                self._tableNameCondition(CommonUtils.commentTableName()))

            source_transaction.commit()
            dest_transaction.commit()
        except Exception as e:
            source_transaction.rollback()
            dest_transaction.rollback()
            raise Exception(str(e))

    def copyTrees( self, treenames ):
        """Copy the named tag trees from the source to the destination.

        The inventory entries referenced by the leaves of the requested trees
        are inserted into the destination inventory first; the resulting
        tag id mapping is then applied to each copied tree through
        replaceLeafLinks().

        treenames -- iterable of tree names to copy

        Raises Exception (carrying the original message) after rolling back
        both transactions if copying a tree fails.
        """
        # Collect the distinct tag ids used by the leaves of all trees.
        wanted=set()
        for treename in treenames:
            sourcetree=TagTree.tagTree(self.__sourcesession,treename)
            for leaf in sourcetree.getAllLeaves():
                wanted.add(leaf.tagid)

        # One pass over the source inventory, keeping only referenced tags.
        sourceinv=tagInventory.tagInventory(self.__sourcesession)
        entries=[]
        for node in sourceinv.getAllEntries():
            if node.tagid in wanted:
                entries.append({'tagid':node.tagid,
                                'tagname':node.tagname,
                                'pfn':node.pfn,
                                'recordname':node.recordname,
                                'objectname':node.objectname,
                                'labelname':node.labelname})

        inv=tagInventory.tagInventory(self.__destsession)
        tagiddict=inv.bulkInsertEntries(entries)
        dest_transaction=self.__destsession.transaction()
        source_transaction=self.__sourcesession.transaction()

        try:
            for treename in treenames:
                desttree=TagTree.tagTree(self.__destsession,treename)
                desttree.createTagTreeTable()
                dest_transaction.start(False)
                source_transaction.start(True)

                self._copyTable(CommonUtils.treeTableName(treename))
                self._copyTable(CommonUtils.treeIDTableName(treename))

                if self.__sourcesession.nominalSchema().existsTable(CommonUtils.commentTableName()):
                    # Only the comments attached to this tree's table.  The
                    # bind variable is filled in before setCondition() sees it.
                    self._copyTable(CommonUtils.commentTableName(),
                                    'tablename = :tablename',
                                    self._tableNameCondition(CommonUtils.treeTableName(treename)))

                source_transaction.commit()
                dest_transaction.commit()

                # Applied after the commit, using the id mapping returned by
                # the inventory insert above.
                desttree.replaceLeafLinks(tagiddict)
        except Exception as e:
            source_transaction.rollback()
            dest_transaction.rollback()
            raise Exception(str(e))

    def copyDB( self ):
        """Copy all global-tag related tables from the source to the destination.

        Copies the inventory table, the inventory id table, the comment table
        when present, and every 'TAGTREE_TABLE_*' tree table together with
        its id table.  Trees whose id table is missing are reported and not
        copied, although their (empty) tree table is still created.

        NOTE(review): the original documentation states that the destination
        must be empty and is otherwise cleaned implicitly; no cleaning is
        visible in this method, so that would have to happen inside the
        create*Table() calls -- confirm.

        Raises Exception if the source lacks the inventory or inventory id
        table, or (carrying the original message, after rollback) if listing
        or copying tables fails.
        """
        dest_transaction=self.__destsession.transaction()
        source_transaction=self.__sourcesession.transaction()
        try:
            source_transaction.start(True)
            tablelist=list(self.__sourcesession.nominalSchema().listTables())
            source_transaction.commit()
        except Exception as e:
            source_transaction.rollback()
            raise Exception(str(e))

        # The two inventory tables are mandatory.
        if CommonUtils.inventoryTableName() not in tablelist:
            raise Exception('Error: '+CommonUtils.inventoryTableName()+' does not exist in the source')
        if CommonUtils.inventoryIDTableName() not in tablelist:
            raise Exception('Error: '+CommonUtils.inventoryIDTableName()+' does not exist')
        alltablelist=[CommonUtils.inventoryTableName(),
                      CommonUtils.inventoryIDTableName()]

        # The comment table is optional.
        if CommonUtils.commentTableName() in tablelist:
            alltablelist.append(CommonUtils.commentTableName())

        # Tree names are whatever follows the 'TAGTREE_TABLE_' marker.
        marker='TAGTREE_TABLE_'
        trees=[]
        for tablename in tablelist:
            posbeg=tablename.find(marker)
            if posbeg != -1:
                trees.append(tablename[posbeg+len(marker):])
        for tree in trees:
            if CommonUtils.treeIDTableName(tree) not in tablelist:
                print('non-existing id table for tree  %s' % tree)
                continue
            alltablelist.append(CommonUtils.treeTableName(tree))
            alltablelist.append(CommonUtils.treeIDTableName(tree))

        inv=tagInventory.tagInventory(self.__destsession)
        inv.createInventoryTable()
        for treename in trees:
            desttree=TagTree.tagTree(self.__destsession,treename)
            desttree.createTagTreeTable()

        try:
            # Each table is copied in its own pair of transactions.
            for mytable in alltablelist:
                dest_transaction.start(False)
                source_transaction.start(True)
                self._copyTable(mytable)
                source_transaction.commit()
                dest_transaction.commit()
        except Exception as e:
            source_transaction.rollback()
            dest_transaction.rollback()
            raise Exception(str(e))
00262
if __name__ == "__main__":
    # Ad-hoc smoke test: copies from 'source.db' into two SQLite files in the
    # current directory.  Failures are printed rather than propagated.
    svc = coral.ConnectionService()

    # Part 1: inventory plus one named tree into dest.db.
    sourcesession = svc.connect( 'sqlite_file:source.db',
                                 accessMode = coral.access_Update )
    destsession = svc.connect( 'sqlite_file:dest.db',
                               accessMode = coral.access_Update )
    dbcp = None
    try:
        dbcp=DBCopy(sourcesession,destsession,1024)
        print("TEST copyInventory")
        dbcp.copyInventory()
        print("TEST copytrees")
        treenames=['CRUZET3_V2H']
        dbcp.copyTrees(treenames)
    except Exception as e:
        print("Failed in unit test")
        print(str(e))
    finally:
        # DBCopy holds references to both sessions, so it has to be dropped
        # too for the sessions to be released before reconnecting.
        del dbcp
        del sourcesession
        del destsession

    # Part 2: full database copy into dest2.db.
    sourcesession = svc.connect( 'sqlite_file:source.db',
                                 accessMode = coral.access_Update )
    destsession = svc.connect( 'sqlite_file:dest2.db',
                               accessMode = coral.access_Update )
    dbcp = None
    try:
        dbcp=DBCopy(sourcesession,destsession,1024)
        print("TEST full dbCopy")
        dbcp.copyDB()
    except Exception as e:
        print("Failed in unit test")
        print(str(e))
    finally:
        del dbcp
        del sourcesession
        del destsession