00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CArrayStreamer.h"
00003 #include "ClassUtils.h"
00004 #include "MappingElement.h"
00005 #include "ContainerSchema.h"
00006 #include "RelationalBuffer.h"
00007 #include "MultiRecordInsertOperation.h"
00008 #include "MultiRecordSelectOperation.h"
00009 #include "RelationalStreamerFactory.h"
00010 #include "ArrayHandlerFactory.h"
00011 #include "IArrayHandler.h"
00012
00013 #include "CoralBase/Attribute.h"
00014 #include "RelationalAccess/IBulkOperation.h"
00015 #include "Reflex/Object.h"
00016
00017 ora::CArrayWriter::CArrayWriter( const Reflex::Type& objectType,
00018 MappingElement& mapping,
00019 ContainerSchema& contSchema ):
00020 m_objectType( objectType ),
00021 m_mappingElement( mapping ),
00022 m_schema( contSchema ),
00023 m_recordId(),
00024 m_localElement( ),
00025 m_offset( 0 ),
00026 m_insertOperation( 0 ),
00027 m_arrayHandler(),
00028 m_dataWriter(){
00029 }
00030
00031 ora::CArrayWriter::~CArrayWriter(){
00032 }
00033
00034 bool ora::CArrayWriter::build( DataElement& offset,
00035 IRelationalData&,
00036 RelationalBuffer& operationBuffer ){
00037
00038 m_localElement.clear();
00039 m_recordId.clear();
00040
00041 m_recordId.push_back(0);
00042
00043
00044 Reflex::Type arrayType = m_objectType.ToType();
00045 Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00046
00047 if ( ! arrayType || !arrayResolvedType ) {
00048 throwException( "Missing dictionary information for the element type of the array \"" +
00049 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00050 "CArrayWriter::build" );
00051 }
00052
00053 RelationalStreamerFactory streamerFactory( m_schema );
00054
00055
00056 m_insertOperation = &operationBuffer.newMultiRecordInsert( m_mappingElement.tableName() );
00057 const std::vector<std::string>& columns = m_mappingElement.columnNames();
00058 if( !columns.size() ){
00059 throwException( "Id columns not found in the mapping.",
00060 "CArrayWriter::build");
00061 }
00062 for( size_t i=0; i<columns.size(); i++ ){
00063 m_insertOperation->addId( columns[ i ] );
00064 }
00065
00066 m_offset = &offset;
00067
00068 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00069
00070 std::string arrayTypeName = arrayType.Name();
00071
00072 MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00073 if ( iMe == m_mappingElement.end() ) {
00074 throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00075 "CArrayWriter::build" );
00076 }
00077
00078 m_dataWriter.reset( streamerFactory.newWriter( arrayResolvedType, iMe->second ));
00079 m_dataWriter->build( m_localElement, *m_insertOperation, operationBuffer );
00080 return true;
00081 }
00082
00083 void ora::CArrayWriter::setRecordId( const std::vector<int>& identity ){
00084 m_recordId.clear();
00085 for(size_t i=0;i<identity.size();i++) {
00086 m_recordId.push_back( identity[i] );
00087 }
00088 m_recordId.push_back( 0 );
00089 }
00090
00091 void ora::CArrayWriter::write( int oid,
00092 const void* inputData ){
00093
00094 if(!m_offset){
00095 throwException("The streamer has not been built.",
00096 "CArrayWriter::write");
00097 }
00098 const std::vector<std::string>& columns = m_mappingElement.columnNames();
00099 if( columns.size() != m_recordId.size()+1){
00100 throwException( "Record id elements provided are not matching with the mapped id columns.",
00101 "CArrayWriter::write");
00102 }
00103
00104 void* data = m_offset->address( inputData );
00105
00106
00107 size_t containerSize = m_arrayHandler->size( data );
00108
00109 if ( containerSize == 0 ) return;
00110
00111 size_t startElementIndex = m_arrayHandler->startElementIndex( data );
00112
00113 std::auto_ptr<IArrayIteratorHandler> iteratorHandler( m_arrayHandler->iterate( data ) );
00114
00115 InsertCache& bulkOperation = m_insertOperation->setUp( containerSize-startElementIndex+1 );
00116
00117 for ( size_t iIndex = startElementIndex; iIndex < containerSize; ++iIndex ) {
00118
00119 m_recordId[m_recordId.size()-1] = iIndex;
00120 coral::AttributeList& dataBuff = m_insertOperation->data();
00121
00122 dataBuff[ columns[0] ].data<int>() = oid;
00123 for( size_t i = 1;i < columns.size(); i++ ){
00124 dataBuff[ columns[i] ].data<int>() = m_recordId[i-1];
00125 }
00126
00127 void* objectReference = iteratorHandler->object();
00128
00129 m_dataWriter->setRecordId( m_recordId );
00130 m_dataWriter->write( oid, objectReference );
00131
00132 bulkOperation.processNextIteration();
00133
00134 iteratorHandler->increment();
00135 }
00136
00137 m_arrayHandler->finalize( const_cast<void*>( data ) );
00138
00139 }
00140
00141 ora::CArrayUpdater::CArrayUpdater(const Reflex::Type& objectType,
00142 MappingElement& mapping,
00143 ContainerSchema& contSchema ):
00144 m_deleter( mapping ),
00145 m_writer( objectType, mapping, contSchema ){
00146 }
00147
00148 ora::CArrayUpdater::~CArrayUpdater(){
00149 }
00150
00151 bool ora::CArrayUpdater::build( DataElement& offset,
00152 IRelationalData& relationalData,
00153 RelationalBuffer& operationBuffer){
00154 m_deleter.build( operationBuffer );
00155 m_writer.build( offset, relationalData, operationBuffer );
00156 return true;
00157 }
00158
00159 void ora::CArrayUpdater::setRecordId( const std::vector<int>& identity ){
00160 m_writer.setRecordId( identity );
00161 }
00162
00163 void ora::CArrayUpdater::update( int oid,
00164 const void* data ){
00165 m_deleter.erase( oid );
00166 m_writer.write( oid, data );
00167 }
00168
00169 ora::CArrayReader::CArrayReader(const Reflex::Type& objectType,
00170 MappingElement& mapping,
00171 ContainerSchema& contSchema ):
00172 m_objectType( objectType ),
00173 m_mappingElement( mapping ),
00174 m_schema( contSchema ),
00175 m_recordId(),
00176 m_localElement( ),
00177 m_offset(0 ),
00178 m_query(),
00179 m_arrayHandler(),
00180 m_dataReader(){
00181 }
00182
00183 ora::CArrayReader::~CArrayReader(){
00184 }
00185
00186 bool ora::CArrayReader::build( DataElement& offset,
00187 IRelationalData& relationalData ){
00188
00189 m_localElement.clear();
00190
00191 m_recordId.clear();
00192
00193 m_recordId.push_back(0);
00194
00195
00196 Reflex::Type arrayType = m_objectType.ToType();
00197 Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00198
00199 if ( ! arrayType || !arrayResolvedType ) {
00200 throwException( "Missing dictionary information for the element type of the array \"" +
00201 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00202 "CArrayReader::build" );
00203 }
00204
00205 RelationalStreamerFactory streamerFactory( m_schema );
00206
00207
00208 m_query.reset(new MultiRecordSelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00209 m_query->addWhereId( m_mappingElement.pkColumn() );
00210 std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00211 for( size_t i=0; i<recIdCols.size(); i++ ){
00212 m_query->addId( recIdCols[ i ] );
00213 m_query->addOrderId( recIdCols[ i ] );
00214 }
00215
00216 m_offset = &offset;
00217
00218 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00219
00220 std::string arrayTypeName = arrayType.Name();
00221
00222
00223 MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00224 if ( iMe == m_mappingElement.end() ) {
00225 throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00226 "CArrayReader::build" );
00227 }
00228
00229 m_dataReader.reset( streamerFactory.newReader( arrayResolvedType, iMe->second ) );
00230 m_dataReader->build( m_localElement, *m_query );
00231 return true;
00232 }
00233
00234 void ora::CArrayReader::select( int oid ){
00235 if(!m_query.get()){
00236 throwException("The streamer has not been built.",
00237 "CArrayReader::select");
00238 }
00239 coral::AttributeList& whereData = m_query->whereData();
00240 whereData[ m_mappingElement.pkColumn() ].data<int>() = oid;
00241 m_query->execute();
00242 m_dataReader->select( oid );
00243 }
00244
00245 void ora::CArrayReader::setRecordId( const std::vector<int>& identity ){
00246 m_recordId.clear();
00247 for(size_t i=0;i<identity.size();i++) {
00248 m_recordId.push_back( identity[i] );
00249 }
00250
00251 m_recordId.push_back( 0 );
00252 }
00253
00254 void ora::CArrayReader::read( void* destinationData ) {
00255 if(!m_offset){
00256 throwException("The streamer has not been built.",
00257 "CArrayReader::read");
00258 }
00259 void* address = m_offset->address( destinationData );
00260
00261 Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00262
00263 bool isElementFundamental = iteratorDereferenceReturnType.IsFundamental();
00264
00265 std::string positionColumn = m_mappingElement.posColumn();
00266
00267 size_t arraySize = m_objectType.ArrayLength();
00268
00269 m_arrayHandler->clear( address );
00270
00271 size_t cursorSize = m_query->selectionSize(m_recordId, m_recordId.size()-1);
00272 unsigned int i=0;
00273 while ( i< cursorSize ){
00274
00275 m_recordId[m_recordId.size()-1] = (int)i;
00276 m_query->selectRow( m_recordId );
00277 coral::AttributeList& row = m_query->data();
00278
00279 int arrayIndex = row[positionColumn].data< int >();
00280
00281
00282 void* objectData = 0;
00283
00284 if(arrayIndex >= (int)arraySize){
00285 throwException("Found more element then array size.",
00286 "CArrayReader::read");
00287
00288 }
00289
00290
00291 objectData = static_cast<char*>(address)+arrayIndex*iteratorDereferenceReturnType.SizeOf();
00292
00293 if(!isElementFundamental){
00294
00295 iteratorDereferenceReturnType.Construct(Reflex::Type(0,0),std::vector< void* >(),objectData);
00296 }
00297
00298 m_dataReader->setRecordId( m_recordId );
00299 m_dataReader->read( objectData );
00300
00301 ++i;
00302 }
00303
00304 m_arrayHandler->finalize( address );
00305
00306 }
00307
00308 void ora::CArrayReader::clear(){
00309 if(m_dataReader.get()) m_dataReader->clear();
00310 }
00311
00312 ora::CArrayStreamer::CArrayStreamer( const Reflex::Type& objectType,
00313 MappingElement& mapping,
00314 ContainerSchema& contSchema ):
00315 m_objectType( objectType ),
00316 m_mapping( mapping ),
00317 m_schema( contSchema ){
00318 }
00319
00320 ora::CArrayStreamer::~CArrayStreamer(){
00321 }
00322
00323 ora::IRelationalWriter* ora::CArrayStreamer::newWriter(){
00324 return new CArrayWriter( m_objectType, m_mapping, m_schema );
00325 }
00326
00327 ora::IRelationalUpdater* ora::CArrayStreamer::newUpdater(){
00328 return new CArrayUpdater( m_objectType, m_mapping, m_schema );
00329 }
00330
00331 ora::IRelationalReader* ora::CArrayStreamer::newReader(){
00332 return new CArrayReader( m_objectType, m_mapping, m_schema );
00333 }