00001 '''
00002 exporter(sourceSession,destSession[,rowCachesize])
00003 Input parameter sourceSession : session proxy for source schema providing logical service name & access mode
00004 Input parameter destSession : session proxy for destination schema providing logical service name & access mode
00005 Input parameter rowCachesize : the number of rows to be cached at the client side, default value =100
00006 Output paramter : the exporter object
00007 '''
00008
00009 import os
00010 import coral
00011 import time
00012 import math
00013 from multivaluedict import mseqdict
00014 from listobjects import listobjects,listschema,listtables,listtableset
00015
00016 class exporter:
00017 "exporter class for CoralTools"
00018 m_sourceSession = 0
00019 m_destSession = 0
00020 m_rowCachesize = 100
00021
00022 def __init__( self,sourceSession,destSession,rowCachesize=100 ):
00023 try:
00024
00025 self.m_sourceSession = sourceSession
00026 self.m_destSession = destSession
00027 self.m_rowCachesize = rowCachesize
00028
00029 self.m_sourceSession.transaction().start()
00030 self.m_destSession.transaction().start()
00031
00032 except Exception, e:
00033 raise Exception("Error in Initializer: " + str(e))
00034
00035
00036 def copyschema(self ):
00037 try:
00038
00039 listsourceTable=listschema( self.m_sourceSession.nominalSchema() )
00040 listdestTable=listobjects( self.m_destSession.nominalSchema() )
00041 self._checktable(listsourceTable,listdestTable)
00042
00043 for key,value in listsourceTable.items():
00044 iTable = key
00045 print iTable
00046 self._copytablelayout(iTable)
00047
00048 self.m_destSession.transaction().commit()
00049 self.m_sourceSession.transaction().commit()
00050 print "copyschema SUCCESS"
00051
00052 return True
00053
00054 except Exception, e:
00055 self.m_destSession.transaction().rollback()
00056 self.m_sourceSession.transaction().commit()
00057 raise Exception ("Error in copyschema method: " + str(e))
00058 return False
00059
00060
00061 def copydata(self,rowCount=-1 ):
00062 try:
00063
00064 self.m_sourceSession.transaction().start()
00065 self.m_destSession.transaction().start()
00066
00067 listsourceTable=listschema( self.m_sourceSession.nominalSchema() )
00068 listdestTable=listobjects( self.m_destSession.nominalSchema() )
00069 self._checktable(listsourceTable,listdestTable)
00070
00071 selectionclause=""
00072 selectionparameters=coral.AttributeList()
00073 for key,value in listsourceTable.items():
00074 iTable = key
00075 print iTable
00076 currentCount = 0
00077 self._copytablelayout(iTable)
00078 self._copydatalayout(iTable,selectionclause,selectionparameters,currentCount,rowCount)
00079
00080 self.m_destSession.transaction().commit()
00081 self.m_sourceSession.transaction().commit()
00082 print "copydata SUCCESS"
00083 return True
00084
00085 except Exception, e:
00086 self.m_destSession.transaction().rollback()
00087 self.m_sourceSession.transaction().commit()
00088 raise Exception ("Error in copydata method: " + str(e))
00089 return False
00090
00091
00092 def copytableschema(self,tablename ):
00093 try:
00094
00095 iTable=""
00096 listsourceTable=listtables( self.m_sourceSession.nominalSchema(),tablename )
00097 listdestTable=listtables(self.m_destSession.nominalSchema(),tablename )
00098 self._checktable(listsourceTable,listdestTable)
00099
00100 for key,value in listsourceTable.items():
00101 iTable = key
00102 print iTable
00103 self._copytablelayout(iTable)
00104
00105 self.m_destSession.transaction().commit()
00106 self.m_sourceSession.transaction().commit()
00107 print "copytableschema SUCCESS"
00108 return True
00109
00110 except Exception, e:
00111 self.m_destSession.transaction().rollback()
00112 self.m_sourceSession.transaction().commit()
00113 raise Exception ("Error in copytableschema method: " + str(e)+" : "+iTable)
00114 return False
00115
00116
00117 def copytabledata(self,tablename,selectionclause,selectionparameters,rowCount=-1 ):
00118 try:
00119
00120 iTable=""
00121 listsourceTable=listtables( self.m_sourceSession.nominalSchema(),tablename )
00122 listdestTable=listtables( self.m_destSession.nominalSchema(),tablename )
00123
00124 currentCount = 0
00125 for key,value in listsourceTable.items():
00126 iTable = key
00127 print iTable
00128 tableexists = self._checkdata(iTable,listdestTable)
00129
00130 if not tableexists:
00131 self._copytablelayout(iTable)
00132
00133 self._copydatalayout(iTable,selectionclause,selectionparameters,currentCount,rowCount)
00134
00135 self.m_destSession.transaction().commit()
00136 self.m_sourceSession.transaction().commit()
00137 print "copytabledata SUCCESS"
00138 return True
00139
00140 except Exception, e:
00141 self.m_destSession.transaction().rollback()
00142 self.m_sourceSession.transaction().commit()
00143 raise Exception ("Error in copytabledata method: " + str(e)+" : " + iTable)
00144 return False
00145
00146
00147 def copytablelistschema(self,tableset ):
00148 try:
00149
00150 iTable=""
00151 listsourceTable=listtableset( self.m_sourceSession.nominalSchema(),tableset )
00152 listdestTable=listtableset( self.m_destSession.nominalSchema(),tableset )
00153 self._checktable(listsourceTable,listdestTable)
00154
00155 for key,value in listsourceTable.items():
00156 iTable = key
00157 print iTable
00158 self._copytablelayout(iTable)
00159
00160 self.m_destSession.transaction().commit()
00161 self.m_sourceSession.transaction().commit()
00162 print "copytablelistschema SUCCESS"
00163 return True
00164
00165 except Exception, e:
00166 self.m_destSession.transaction().rollback()
00167 self.m_sourceSession.transaction().commit()
00168 raise Exception ("Error in copytablelistschema method: " + str(e)+" : "+iTable)
00169 return False
00170
00171
00172 def copytablelistdata(self,tablelist,rowCount=-1):
00173 try:
00174
00175 iTable=""
00176 tableset=[]
00177 for table in tablelist:
00178 i=0
00179 for parameter in table:
00180 if i==0:
00181 tableset.append(parameter)
00182 i=i+1
00183
00184 listsourceTable=listtableset( self.m_sourceSession.nominalSchema(),tableset )
00185 listdestTable=listtableset( self.m_destSession.nominalSchema(),tableset )
00186 for key,value in listsourceTable.items():
00187 iTable = key
00188 print iTable
00189 currentCount = 0
00190 selectionclause=""
00191 selectionparameters=coral.AttributeList()
00192 for table in tablelist:
00193 i=0
00194 for parameter in table:
00195 if i==0:
00196 table=parameter
00197 if table==iTable:
00198 if i==1:
00199 selectionclause = parameter
00200 if i==2:
00201 selectionparameters = parameter
00202 i=i+1
00203
00204 tableexists = self._checkdata(iTable,listdestTable)
00205
00206 if not tableexists:
00207 self._copytablelayout(iTable)
00208
00209 self._copydatalayout(iTable,selectionclause,selectionparameters,currentCount,rowCount)
00210
00211 self.m_destSession.transaction().commit()
00212 self.m_sourceSession.transaction().commit()
00213 print "copytablelistdata SUCCESS"
00214 return True
00215
00216 except Exception, e:
00217 self.m_destSession.transaction().rollback()
00218 self.m_sourceSession.transaction().commit()
00219 raise Exception ("Error in copytablelistdata method: " + str(e) + " : "+ iTable)
00220 return False
00221
00222
00223 def _copytablelayout(self,iTable ):
00224 try:
00225
00226 description = self.m_sourceSession.nominalSchema().tableHandle( iTable ).description()
00227 table = self.m_destSession.nominalSchema().createTable( description )
00228
00229 return True
00230
00231 except Exception, e:
00232 raise Exception (" " + str(e))
00233 return False
00234
00235
00236 def _copydatalayout(self,iTable,selectionclause,selectionparameters,currentCount,rowCount ):
00237 try:
00238 data=coral.AttributeList()
00239 sourceEditor = self.m_sourceSession.nominalSchema().tableHandle( iTable ).dataEditor()
00240 destEditor = self.m_destSession.nominalSchema().tableHandle( iTable ).dataEditor()
00241
00242 sourceEditor.rowBuffer(data)
00243 sourcequery = self.m_sourceSession.nominalSchema().tableHandle( iTable ).newQuery()
00244 sourcequery.setCondition(selectionclause,selectionparameters)
00245 sourcequery.setRowCacheSize(self.m_rowCachesize)
00246 sourcequery.defineOutput(data)
00247
00248 bulkOperation = destEditor.bulkInsert(data,self.m_rowCachesize )
00249
00250 cursor=sourcequery.execute()
00251
00252 for row in cursor:
00253 currentCount = currentCount+1
00254 bulkOperation.processNextIteration()
00255 if currentCount == rowCount:
00256 bulkOperation.flush()
00257 self.m_destSession.transaction().commit()
00258 self.m_destSession.transaction().start()
00259 currentCount = 0
00260 bulkOperation.flush()
00261 del bulkOperation
00262 del sourcequery
00263
00264 return True
00265
00266 except Exception, e:
00267 raise Exception (" " + str(e))
00268 return False
00269
00270
00271 def _checktable(self,listsourceTable,listdestTable ):
00272 try:
00273
00274 for key,value in listsourceTable.items():
00275 table=key
00276 for key,value in listdestTable.items():
00277 if key==table:
00278 raise Exception( "Table exists in Destination Schema : " )
00279
00280 return True
00281
00282 except Exception, e:
00283 raise Exception (" " + str(e) + table)
00284 return False
00285
00286
00287 def _checkdata(self,iTable,listdestTable ):
00288 try:
00289
00290 foundtable=False
00291 founddata=False
00292 for key,value in listdestTable.items():
00293 if key==iTable:
00294 counter=0
00295
00296 query = self.m_destSession.nominalSchema().tableHandle( iTable ).newQuery()
00297 cursor = query.execute()
00298 foundtable=True
00299 for row in cursor:
00300 counter=counter+1
00301 del query
00302 if (counter > 0):
00303 founddata=True
00304
00305 return foundtable
00306
00307 except Exception, e:
00308 raise Exception (" " + str(e) + iTable)
00309 return False
00310
00311 def __del__( self ):
00312 print ""