00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CArrayStreamer.h"
00003 #include "ClassUtils.h"
00004 #include "MappingElement.h"
00005 #include "ContainerSchema.h"
00006 #include "RelationalBuffer.h"
00007 #include "MultiRecordInsertOperation.h"
00008 #include "MultiRecordSelectOperation.h"
00009 #include "RelationalStreamerFactory.h"
00010 #include "ArrayHandlerFactory.h"
00011 #include "IArrayHandler.h"
00012
00013 #include "CoralBase/Attribute.h"
00014 #include "RelationalAccess/IBulkOperation.h"
00015 #include "Reflex/Object.h"
00016
00017 ora::CArrayWriter::CArrayWriter( const Reflex::Type& objectType,
00018 MappingElement& mapping,
00019 ContainerSchema& contSchema ):
00020 m_objectType( objectType ),
00021 m_mappingElement( mapping ),
00022 m_schema( contSchema ),
00023 m_recordId(),
00024 m_localElement( ),
00025 m_offset( 0 ),
00026 m_insertOperation( 0 ),
00027 m_arrayHandler(),
00028 m_dataWriter(){
00029 }
00030
00031 ora::CArrayWriter::~CArrayWriter(){
00032 }
00033
00034 bool ora::CArrayWriter::build( DataElement& offset,
00035 IRelationalData&,
00036 RelationalBuffer& operationBuffer ){
00037
00038 m_localElement.clear();
00039 m_recordId.clear();
00040
00041 m_recordId.push_back(0);
00042
00043
00044 Reflex::Type arrayType = m_objectType.ToType();
00045 Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00046
00047 if ( ! arrayType || !arrayResolvedType ) {
00048 throwException( "Missing dictionary information for the element type of the array \"" +
00049 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00050 "CArrayWriter::build" );
00051 }
00052
00053 RelationalStreamerFactory streamerFactory( m_schema );
00054
00055
00056 m_insertOperation = &operationBuffer.newMultiRecordInsert( m_mappingElement.tableName() );
00057 const std::vector<std::string>& columns = m_mappingElement.columnNames();
00058 if( !columns.size() ){
00059 throwException( "Id columns not found in the mapping.",
00060 "CArrayWriter::build");
00061 }
00062 for( size_t i=0; i<columns.size(); i++ ){
00063 m_insertOperation->addId( columns[ i ] );
00064 }
00065
00066 m_offset = &offset;
00067
00068 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00069
00070 std::string arrayTypeName = arrayType.Name();
00071
00072 MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00073 if ( iMe == m_mappingElement.end() ) {
00074 throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00075 "CArrayWriter::build" );
00076 }
00077
00078 m_dataWriter.reset( streamerFactory.newWriter( arrayResolvedType, iMe->second ));
00079 m_dataWriter->build( m_localElement, *m_insertOperation, operationBuffer );
00080 return true;
00081 }
00082
00083 void ora::CArrayWriter::setRecordId( const std::vector<int>& identity ){
00084 m_recordId.clear();
00085 for(size_t i=0;i<identity.size();i++) {
00086 m_recordId.push_back( identity[i] );
00087 }
00088 m_recordId.push_back( 0 );
00089 }
00090
00091 void ora::CArrayWriter::write( int oid,
00092 const void* inputData ){
00093
00094 if(!m_offset){
00095 throwException("The streamer has not been built.",
00096 "CArrayWriter::write");
00097 }
00098 const std::vector<std::string>& columns = m_mappingElement.columnNames();
00099 if( columns.size() != m_recordId.size()+1){
00100 throwException( "Record id elements provided are not matching with the mapped id columns.",
00101 "CArrayWriter::write");
00102 }
00103
00104 void* data = m_offset->address( inputData );
00105
00106
00107 size_t containerSize = m_arrayHandler->size( data );
00108 size_t persistentSize = m_arrayHandler->persistentSize( data );
00109
00110 if ( containerSize == 0 || containerSize < persistentSize ) return;
00111
00112 size_t startElementIndex = m_arrayHandler->startElementIndex( data );
00113
00114 std::auto_ptr<IArrayIteratorHandler> iteratorHandler( m_arrayHandler->iterate( data ) );
00115
00116 InsertCache& bulkOperation = m_insertOperation->setUp( containerSize-startElementIndex+1 );
00117
00118 for ( size_t iIndex = startElementIndex; iIndex < containerSize; ++iIndex ) {
00119
00120 m_recordId[m_recordId.size()-1] = iIndex;
00121 coral::AttributeList& dataBuff = m_insertOperation->data();
00122
00123 dataBuff[ columns[0] ].data<int>() = oid;
00124 for( size_t i = 1;i < columns.size(); i++ ){
00125 dataBuff[ columns[i] ].data<int>() = m_recordId[i-1];
00126 }
00127
00128 void* objectReference = iteratorHandler->object();
00129
00130 m_dataWriter->setRecordId( m_recordId );
00131 m_dataWriter->write( oid, objectReference );
00132
00133 bulkOperation.processNextIteration();
00134
00135 iteratorHandler->increment();
00136 }
00137
00138 m_arrayHandler->finalize( const_cast<void*>( data ) );
00139
00140 }
00141
00142 ora::CArrayUpdater::CArrayUpdater(const Reflex::Type& objectType,
00143 MappingElement& mapping,
00144 ContainerSchema& contSchema ):
00145 m_deleter( mapping ),
00146 m_writer( objectType, mapping, contSchema ){
00147 }
00148
00149 ora::CArrayUpdater::~CArrayUpdater(){
00150 }
00151
00152 bool ora::CArrayUpdater::build( DataElement& offset,
00153 IRelationalData& relationalData,
00154 RelationalBuffer& operationBuffer){
00155 m_deleter.build( operationBuffer );
00156 m_writer.build( offset, relationalData, operationBuffer );
00157 return true;
00158 }
00159
00160 void ora::CArrayUpdater::setRecordId( const std::vector<int>& identity ){
00161 m_writer.setRecordId( identity );
00162 }
00163
00164 void ora::CArrayUpdater::update( int oid,
00165 const void* data ){
00166 m_deleter.erase( oid );
00167 m_writer.write( oid, data );
00168 }
00169
00170 ora::CArrayReader::CArrayReader(const Reflex::Type& objectType,
00171 MappingElement& mapping,
00172 ContainerSchema& contSchema ):
00173 m_objectType( objectType ),
00174 m_mappingElement( mapping ),
00175 m_schema( contSchema ),
00176 m_recordId(),
00177 m_localElement( ),
00178 m_offset(0 ),
00179 m_query(),
00180 m_arrayHandler(),
00181 m_dataReader(){
00182 }
00183
00184 ora::CArrayReader::~CArrayReader(){
00185 }
00186
00187 bool ora::CArrayReader::build( DataElement& offset,
00188 IRelationalData& relationalData ){
00189
00190 m_localElement.clear();
00191
00192 m_recordId.clear();
00193
00194 m_recordId.push_back(0);
00195
00196
00197 Reflex::Type arrayType = m_objectType.ToType();
00198 Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00199
00200 if ( ! arrayType || !arrayResolvedType ) {
00201 throwException( "Missing dictionary information for the element type of the array \"" +
00202 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00203 "CArrayReader::build" );
00204 }
00205
00206 RelationalStreamerFactory streamerFactory( m_schema );
00207
00208
00209 m_query.reset(new MultiRecordSelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00210 m_query->addWhereId( m_mappingElement.pkColumn() );
00211 std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00212 for( size_t i=0; i<recIdCols.size(); i++ ){
00213 m_query->addId( recIdCols[ i ] );
00214 m_query->addOrderId( recIdCols[ i ] );
00215 }
00216
00217 m_offset = &offset;
00218
00219 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00220
00221 std::string arrayTypeName = arrayType.Name();
00222
00223
00224 MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00225 if ( iMe == m_mappingElement.end() ) {
00226 throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00227 "CArrayReader::build" );
00228 }
00229
00230 m_dataReader.reset( streamerFactory.newReader( arrayResolvedType, iMe->second ) );
00231 m_dataReader->build( m_localElement, *m_query );
00232 return true;
00233 }
00234
00235 void ora::CArrayReader::select( int oid ){
00236 if(!m_query.get()){
00237 throwException("The streamer has not been built.",
00238 "CArrayReader::select");
00239 }
00240 coral::AttributeList& whereData = m_query->whereData();
00241 whereData[ m_mappingElement.pkColumn() ].data<int>() = oid;
00242 m_query->execute();
00243 m_dataReader->select( oid );
00244 }
00245
00246 void ora::CArrayReader::setRecordId( const std::vector<int>& identity ){
00247 m_recordId.clear();
00248 for(size_t i=0;i<identity.size();i++) {
00249 m_recordId.push_back( identity[i] );
00250 }
00251
00252 m_recordId.push_back( 0 );
00253 }
00254
00255 void ora::CArrayReader::read( void* destinationData ) {
00256 if(!m_offset){
00257 throwException("The streamer has not been built.",
00258 "CArrayReader::read");
00259 }
00260 void* address = m_offset->address( destinationData );
00261
00262 Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00263
00264 bool isElementFundamental = iteratorDereferenceReturnType.IsFundamental();
00265
00266 std::string positionColumn = m_mappingElement.posColumn();
00267
00268 size_t arraySize = m_objectType.ArrayLength();
00269
00270 m_arrayHandler->clear( address );
00271
00272 size_t cursorSize = m_query->selectionSize(m_recordId, m_recordId.size()-1);
00273 unsigned int i=0;
00274 while ( i< cursorSize ){
00275
00276 m_recordId[m_recordId.size()-1] = (int)i;
00277 m_query->selectRow( m_recordId );
00278 coral::AttributeList& row = m_query->data();
00279
00280 int arrayIndex = row[positionColumn].data< int >();
00281
00282
00283 void* objectData = 0;
00284
00285 if(arrayIndex >= (int)arraySize){
00286 throwException("Found more element then array size.",
00287 "CArrayReader::read");
00288
00289 }
00290
00291
00292 objectData = static_cast<char*>(address)+arrayIndex*iteratorDereferenceReturnType.SizeOf();
00293
00294 if(!isElementFundamental){
00295
00296 iteratorDereferenceReturnType.Construct(Reflex::Type(0,0),std::vector< void* >(),objectData);
00297 }
00298
00299 m_dataReader->setRecordId( m_recordId );
00300 m_dataReader->read( objectData );
00301
00302 ++i;
00303 }
00304
00305 m_arrayHandler->finalize( address );
00306
00307 }
00308
00309 void ora::CArrayReader::clear(){
00310 if(m_dataReader.get()) m_dataReader->clear();
00311 }
00312
00313 ora::CArrayStreamer::CArrayStreamer( const Reflex::Type& objectType,
00314 MappingElement& mapping,
00315 ContainerSchema& contSchema ):
00316 m_objectType( objectType ),
00317 m_mapping( mapping ),
00318 m_schema( contSchema ){
00319 }
00320
00321 ora::CArrayStreamer::~CArrayStreamer(){
00322 }
00323
00324 ora::IRelationalWriter* ora::CArrayStreamer::newWriter(){
00325 return new CArrayWriter( m_objectType, m_mapping, m_schema );
00326 }
00327
00328 ora::IRelationalUpdater* ora::CArrayStreamer::newUpdater(){
00329 return new CArrayUpdater( m_objectType, m_mapping, m_schema );
00330 }
00331
00332 ora::IRelationalReader* ora::CArrayStreamer::newReader(){
00333 return new CArrayReader( m_objectType, m_mapping, m_schema );
00334 }