00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CondCore/ORA/interface/Selection.h"
00003 #include "CondCore/ORA/interface/QueryableVectorData.h"
00004 #include "QueryableVectorStreamer.h"
00005 #include "MultiRecordInsertOperation.h"
00006 #include "RelationalStreamerFactory.h"
00007 #include "MappingElement.h"
00008 #include "ContainerSchema.h"
00009 #include "ArrayHandlerFactory.h"
00010 #include "ClassUtils.h"
00011 #include "IArrayHandler.h"
00012 #include "ArrayCommonImpl.h"
00013 #include "RelationalBuffer.h"
00014
00015 #include "RelationalAccess/IBulkOperation.h"
00016 #include "Reflex/Member.h"
00017
00018 namespace ora {
00019
00020 class QVReader {
00021 public:
00022 QVReader( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema ):
00023 m_objectType( objectType ),
00024 m_mappingElement( mapping ),
00025 m_schema( contSchema ),
00026 m_recordId(),
00027 m_localElement(),
00028 m_query(),
00029 m_arrayHandler(),
00030 m_dataReader(),
00031 m_oid(-1){
00032 }
00033
00034 ~QVReader(){
00035 }
00036
00037 bool build(){
00038 m_localElement.clear();
00039 m_recordId.clear();
00040
00041 m_recordId.push_back(0);
00042
00043 RelationalStreamerFactory streamerFactory( m_schema );
00044
00045 m_query.reset( new SelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00046
00047 m_query->addWhereId( m_mappingElement.pkColumn() );
00048 std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00049 for( size_t i=0; i<recIdCols.size(); i++ ){
00050 m_query->addId( recIdCols[ i ] );
00051 m_query->addOrderId( recIdCols[ i ] );
00052 }
00053
00054 Reflex::Type storeBaseType = ClassUtils::containerSubType(m_objectType,"store_base_type");
00055 if( !storeBaseType ){
00056 throwException( "Missing dictionary information for the store base type of the container \"" +
00057 m_objectType.Name(Reflex::SCOPED) + "\"",
00058 "QueryableVectorReadBuffer::build" );
00059 }
00060
00061 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( storeBaseType ) );
00062
00063 Reflex::Type valueType = ClassUtils::containerValueType(m_objectType);
00064 Reflex::Type valueResolvedType = ClassUtils::resolvedType(valueType);
00065
00066 if ( ! valueType ||!valueResolvedType ) {
00067 throwException( "Missing dictionary information for the content type of the container \"" +
00068 m_objectType.Name(Reflex::SCOPED) + "\"",
00069 "QueryableVectorReadBuffer::build" );
00070 }
00071 std::string valueName = valueType.Name();
00072
00073 MappingElement::iterator iMe = m_mappingElement.find( valueName );
00074 if ( iMe == m_mappingElement.end() ) {
00075 throwException( "Item for \"" + valueName + "\" not found in the mapping element",
00076 "QueryableVectorReadBuffer::build" );
00077 }
00078
00079 m_dataReader.reset( streamerFactory.newReader( valueResolvedType, iMe->second ) );
00080 m_dataReader->build( m_localElement, *m_query );
00081 return true;
00082 }
00083
00084 void setQueryCondition( IRelationalData& queryData, const Selection& selection, MappingElement& mappingElement ){
00085 coral::AttributeList& whereData = queryData.whereData();
00086
00087 const std::vector<std::pair<std::string,std::string> >& theItems = selection.items();
00088 std::stringstream cond;
00089 unsigned int i=0;
00090 for(std::vector<std::pair<std::string,std::string> >::const_iterator iItem = theItems.begin();
00091 iItem != theItems.end();
00092 ++iItem){
00093 cond << " AND ";
00094 std::string varName = Selection::variableNameFromUniqueString(iItem->first);
00095 std::stringstream selColumn;
00096 std::string colName("");
00097 if(varName == Selection::indexVariable()){
00098 colName = mappingElement.columnNames()[mappingElement.columnNames().size()-1];
00099 selColumn << colName<<"_"<<i;
00100 whereData.extend<int>(selColumn.str());
00101 whereData[selColumn.str()].data<int>() = selection.data()[iItem->first].data<int>();
00102 } else {
00103 MappingElement::iterator iElem = mappingElement.find("value_type");
00104 if ( iElem == mappingElement.end() ) {
00105 throwException( "Item for element \"value_type\" not found in the mapping element",
00106 "QueryableVectorReadBuffer::setQueryCondition" );
00107 }
00108 MappingElement& valueTypeElement = iElem->second;
00109 if( valueTypeElement.elementType()==MappingElement::Primitive ){
00110 if(varName!="value_type"){
00111 throwException( "Item for element \"" + varName + "\" not found in the mapping element",
00112 "QueryableVectorReadBuffer::setQueryCondition" );
00113 }
00114 colName = valueTypeElement.columnNames()[0];
00115 } else if( valueTypeElement.elementType()==MappingElement::Object ){
00116 MappingElement::iterator iInnerElem = valueTypeElement.find(varName);
00117 if ( iInnerElem == valueTypeElement.end() ) {
00118 throwException( "Item for element \"" + varName + "\" not found in the mapping element",
00119 "QueryableVectorReadBuffer::setQueryCondition" );
00120 }
00121 colName = iInnerElem->second.columnNames()[0];
00122 } else {
00123 throwException( "Queries cannot be executed on types mapped on "+
00124 MappingElement::elementTypeAsString(valueTypeElement.elementType()),
00125 "QueryableVectorReadBuffer::setQueryCondition" );
00126 }
00127 selColumn << colName<<"_"<<i;
00128 whereData.extend(selColumn.str(),selection.data()[iItem->first].specification().type());
00129 whereData[selColumn.str()].setValueFromAddress(selection.data()[iItem->first].addressOfData());
00130 }
00131 cond << colName << " " << iItem->second << " :"<<selColumn.str();
00132 i++;
00133 selColumn.str("");
00134 }
00135
00136
00137 queryData.whereClause()+=cond.str();
00138 }
00139
00140
00141 void select( const std::vector<int>& fullId ){
00142 if(!m_query.get()){
00143 throwException("The reader has not been built.",
00144 "QVReader::select");
00145 }
00146
00147 m_oid = fullId[0];
00148 m_recordId.clear();
00149 for(size_t i=1;i<fullId.size();i++) {
00150 m_recordId.push_back( fullId[i] );
00151 }
00152
00153 m_recordId.push_back( 0 );
00154
00155 coral::AttributeList& whereData = m_query->whereData();
00156 whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00157 m_query->execute();
00158 }
00159
00160 void select( const std::vector<int>& fullId, const Selection& selection ){
00161 if(!m_query.get()){
00162 throwException("The reader has not been built.",
00163 "QVReader::select");
00164 }
00165
00166 m_oid = fullId[0];
00167 m_recordId.clear();
00168 for(size_t i=1;i<fullId.size();i++) {
00169 m_recordId.push_back( fullId[i] );
00170 }
00171
00172 m_recordId.push_back( 0 );
00173
00174 coral::AttributeList& whereData = m_query->whereData();
00175 whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00176
00177 setQueryCondition( *m_query, selection, m_mappingElement );
00178
00179 m_query->execute();
00180 }
00181
00182 size_t selectionCount( const std::vector<int>& fullId, const Selection& selection ){
00183 SelectOperation countQuery( m_mappingElement.tableName(), m_schema.storageSchema() );
00184 std::string countColumn("COUNT(*)");
00185 countQuery.addData( countColumn ,typeid(int) );
00186 countQuery.addWhereId( m_mappingElement.pkColumn() );
00187 std::vector<std::string> recIdColumns = m_mappingElement.recordIdColumns();
00188 for( size_t i=0;i<recIdColumns.size();i++){
00189 countQuery.addWhereId( recIdColumns[i] );
00190 }
00191
00192 coral::AttributeList& whereData = countQuery.whereData();
00193
00194 whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00195 for ( size_t i=0;i<fullId.size();i++ ){
00196 whereData[ recIdColumns[i] ].data<int>() = fullId[i+1];
00197 }
00198
00199 setQueryCondition( countQuery, selection, m_mappingElement );
00200 countQuery.execute();
00201
00202 size_t result = 0;
00203 if( countQuery.nextCursorRow() ){
00204 coral::AttributeList& row = countQuery.data();
00205 result = row[countColumn].data<int>();
00206 }
00207 return result;
00208 }
00209
00210
00211 void read(void* address){
00212
00213 if(!m_query.get()){
00214 throwException("The reader has not been built.",
00215 "QVReader::read");
00216 }
00217 Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00218 Reflex::Member firstMember = iteratorDereferenceReturnType.MemberByName( "first" );
00219 if ( ! firstMember ) {
00220 throwException( "Could not retrieve the data member \"first\" of the class \"" +
00221 iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00222 "QueryableVectorReadBuffer::read" );
00223 }
00224 Reflex::Member secondMember = iteratorDereferenceReturnType.MemberByName( "second" );
00225 if ( ! secondMember ) {
00226 throwException( "Could not retrieve the data member \"second\" of the class \"" +
00227 iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00228 "QueryableVectorReadBuffer::read" );
00229 }
00230
00231 m_arrayHandler->clear( address );
00232
00233 unsigned int i=0;
00234 while ( m_query->nextCursorRow() ){
00235
00236
00237 void* objectData = iteratorDereferenceReturnType.Construct().Address();
00238 void* positionData = static_cast< char* >( objectData ) + firstMember.Offset();
00239 void* containerData = static_cast< char* >( objectData ) + secondMember.Offset();
00240
00241 m_recordId[m_recordId.size()-1] = (int)i;
00242 coral::AttributeList& row = m_query->data();
00243
00244 *(size_t*)positionData = (size_t)(row[m_mappingElement.posColumn()].data<int>());
00245
00246 m_dataReader->setRecordId( m_recordId );
00247 m_dataReader->select( m_oid );
00248 m_dataReader->read( containerData );
00249
00250 size_t prevSize = m_arrayHandler->size( address );
00251 m_arrayHandler->appendNewElement( address, objectData );
00252 bool inserted = m_arrayHandler->size( address )>prevSize;
00253
00254 iteratorDereferenceReturnType.Destruct( objectData );
00255 if ( !inserted ) {
00256 throwException( "Could not insert a new element in the array type \"" +
00257 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00258 "QueryableVectorReadBuffer::read" );
00259 }
00260 ++i;
00261 }
00262
00263 m_arrayHandler->finalize( address );
00264 }
00265
00266 private:
00267 Reflex::Type m_objectType;
00268 MappingElement& m_mappingElement;
00269 ContainerSchema& m_schema;
00270 std::vector<int> m_recordId;
00271 DataElement m_localElement;
00272 std::auto_ptr<SelectOperation> m_query;
00273 std::auto_ptr<IArrayHandler> m_arrayHandler;
00274 std::auto_ptr<IRelationalReader> m_dataReader;
00275 int m_oid;
00276 };
00277
00278 class RelationalVectorLoader: public IVectorLoader {
00279
00280 public:
00281
00282
00283 RelationalVectorLoader( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema,
00284 const std::vector<int>& fullId ):
00285 m_isValid(true),
00286 m_reader( objectType, mapping, contSchema ),
00287 m_identity(fullId){
00288 }
00289
00290
00291 virtual ~RelationalVectorLoader(){
00292 }
00293
00294 public:
00295
00296
00297 bool load(void* address) const {
00298 bool ret = false;
00299 if(m_isValid) {
00300 m_reader.build();
00301 m_reader.select( m_identity );
00302 m_reader.read( address );
00303 ret = true;
00304 }
00305 return ret;
00306 }
00307
00308 bool loadSelection(const Selection& selection, void* address) const {
00309 bool ret = false;
00310 if(m_isValid) {
00311 m_reader.build();
00312 m_reader.select( m_identity, selection );
00313 m_reader.read( address );
00314 ret = true;
00315 }
00316 return ret;
00317 }
00318
00319 size_t getSelectionCount( const Selection& selection ) const {
00320 size_t ret = 0;
00321 if(m_isValid) {
00322 ret = m_reader.selectionCount( m_identity, selection );
00323 }
00324 return ret;
00325 }
00326
00327 void invalidate(){
00328 m_isValid = false;
00329 }
00330
00331 bool isValid() const{
00332 return m_isValid;
00333 }
00334
00335 private:
00336 bool m_isValid;
00337 mutable QVReader m_reader;
00338 std::vector<int> m_identity;
00339 };
00340
00341 }
00342
00343 ora::QueryableVectorWriter::QueryableVectorWriter( const Reflex::Type& objectType,
00344 MappingElement& mapping,
00345 ContainerSchema& contSchema ):
00346 m_objectType( objectType ),
00347 m_mappingElement( mapping ),
00348 m_schema( contSchema ),
00349 m_recordId(),
00350 m_localElement(),
00351 m_offset(0),
00352 m_insertOperation( 0 ),
00353 m_arrayHandler(){
00354 }
00355
00356 ora::QueryableVectorWriter::~QueryableVectorWriter(){
00357 }
00358
00359 bool ora::QueryableVectorWriter::build( DataElement& offset,
00360 IRelationalData& data,
00361 RelationalBuffer& operationBuffer ){
00362 m_localElement.clear();
00363 m_recordId.clear();
00364
00365 m_recordId.push_back(0);
00366
00367 RelationalStreamerFactory streamerFactory( m_schema );
00368
00369
00370 m_insertOperation = &operationBuffer.newMultiRecordInsert( m_mappingElement.tableName() );
00371 const std::vector<std::string>& columns = m_mappingElement.columnNames();
00372 if( !columns.size() ){
00373 throwException( "Id columns not found in the mapping.",
00374 "QueryableVectorWriter::build");
00375 }
00376 for( size_t i=0; i<columns.size(); i++ ){
00377 m_insertOperation->addId( columns[ i ] );
00378 }
00379
00380 m_offset = &offset;
00381
00382 Reflex::Type storeBaseType = ClassUtils::containerSubType(m_objectType,"store_base_type");
00383 if( !storeBaseType ){
00384 throwException( "Missing dictionary information for the store base type of the container \"" +
00385 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00386 "QueryableVectorWriter::build" );
00387 }
00388
00389 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( storeBaseType ) );
00390
00391 Reflex::Type valueType = ClassUtils::containerValueType(m_objectType);
00392 Reflex::Type valueResolvedType = ClassUtils::resolvedType(valueType);
00393
00394 if ( ! valueType || !valueResolvedType ) {
00395 throwException( "Missing dictionary information for the content type of the container \"" +
00396 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00397 "QueryableVectorWriter::build" );
00398 }
00399
00400 std::string valueName = valueType.Name();
00401
00402 MappingElement::iterator iMe = m_mappingElement.find( valueName );
00403 if ( iMe == m_mappingElement.end() ) {
00404 throwException( "Item for \"" + valueName + "\" not found in the mapping element",
00405 "QueryableVectorWriter::build" );
00406 }
00407
00408 m_dataWriter.reset( streamerFactory.newWriter( valueResolvedType, iMe->second ) );
00409 m_dataWriter->build( m_localElement, *m_insertOperation, operationBuffer );
00410 return true;
00411 }
00412
00413 void ora::QueryableVectorWriter::setRecordId( const std::vector<int>& identity ){
00414 m_recordId.clear();
00415 for(size_t i=0;i<identity.size();i++) {
00416 m_recordId.push_back( identity[i] );
00417 }
00418 m_recordId.push_back( 0 );
00419 }
00420
00421 void ora::QueryableVectorWriter::write( int oid,
00422 const void* inputData ){
00423
00424 if(!m_offset){
00425 throwException("The streamer has not been built.",
00426 "QueryableVectorWriter::write");
00427 }
00428
00429 const std::vector<std::string>& columns = m_mappingElement.columnNames();
00430 if( columns.size() != m_recordId.size()+1){
00431 throwException( "Object id elements provided are not matching with the mapped id columns.",
00432 "QueryableVectorWriter::write");
00433 }
00434
00435 void* vectorAddress = m_offset->address( inputData );
00436 Reflex::Object vectorObj( m_objectType,const_cast<void*>(vectorAddress));
00437 vectorObj.Invoke("load",0);
00438 void* storageAddress = 0;
00439 vectorObj.Invoke("storageAddress",storageAddress);
00440
00441
00442 size_t containerSize = m_arrayHandler->size( storageAddress );
00443 size_t persistentSize = m_arrayHandler->persistentSize( storageAddress );
00444
00445 if ( containerSize == 0 || containerSize < persistentSize ) return;
00446
00447 size_t startElementIndex = m_arrayHandler->startElementIndex( storageAddress );
00448
00449 std::auto_ptr<IArrayIteratorHandler> iteratorHandler( m_arrayHandler->iterate( storageAddress ) );
00450 const Reflex::Type& iteratorDereferenceReturnType = iteratorHandler->returnType();
00451 Reflex::Member secondMember = iteratorDereferenceReturnType.MemberByName( "second" );
00452 if ( ! secondMember ) {
00453 throwException( "Could not retrieve the data member \"second\" for the class \"" +
00454 iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00455 "QueryableVectorWriter::write" );
00456 }
00457
00458 InsertCache& bulkInsert = m_insertOperation->setUp( containerSize-startElementIndex+1 );
00459
00460 for ( size_t iIndex = startElementIndex; iIndex < containerSize; ++iIndex ) {
00461
00462 m_recordId[m_recordId.size()-1] = iIndex;
00463 coral::AttributeList& dataBuff = m_insertOperation->data();
00464
00465 dataBuff[ columns[0] ].data<int>() = oid;
00466 for( size_t i = 1;i < columns.size(); i++ ){
00467 dataBuff[ columns[i] ].data<int>() = m_recordId[i-1];
00468 }
00469
00470
00471 void* objectReference = iteratorHandler->object();
00472 void* componentData = static_cast< char* >( objectReference ) + secondMember.Offset();
00473
00474 m_dataWriter->setRecordId( m_recordId );
00475 m_dataWriter->write( oid, componentData );
00476
00477 bulkInsert.processNextIteration();
00478
00479
00480 iteratorHandler->increment();
00481 }
00482
00483
00484 m_arrayHandler->finalize( const_cast<void*>( storageAddress ) );
00485 }
00486
00487 ora::QueryableVectorUpdater::QueryableVectorUpdater(const Reflex::Type& objectType,
00488 MappingElement& mapping,
00489 ContainerSchema& contSchema ):
00490 m_buffer( 0 ),
00491 m_writer( objectType, mapping, contSchema ){
00492 }
00493
00494 ora::QueryableVectorUpdater::~QueryableVectorUpdater(){
00495 }
00496
00497 bool ora::QueryableVectorUpdater::build( DataElement& offset,
00498 IRelationalData& relationalData,
00499 RelationalBuffer& operationBuffer){
00500 m_buffer = &operationBuffer;
00501 return m_writer.build( offset, relationalData, operationBuffer );
00502 }
00503
00504 void ora::QueryableVectorUpdater::setRecordId( const std::vector<int>& identity ){
00505 m_writer.setRecordId( identity );
00506 }
00507
00508 void ora::QueryableVectorUpdater::update( int oid,
00509 const void* data ){
00510 if( !m_writer.dataElement() ){
00511 throwException("The streamer has not been built.",
00512 "QueryableVectorUpdater::update");
00513 }
00514
00515 void* vectorAddress = m_writer.dataElement()->address( data );
00516 Reflex::Object vectorObj( m_writer.objectType(),const_cast<void*>(vectorAddress));
00517 vectorObj.Invoke("load",0);
00518 void* storageAddress = 0;
00519 vectorObj.Invoke("storageAddress",storageAddress);
00520
00521 IArrayHandler& arrayHandler = *m_writer.arrayHandler();
00522
00523 size_t arraySize = arrayHandler.size(storageAddress);
00524 size_t persistentSize = arrayHandler.persistentSize(storageAddress);
00525 if(persistentSize>arraySize){
00526 deleteArrayElements( m_writer.mapping(), oid, arraySize, *m_buffer );
00527 }
00528 m_writer.write( oid, data );
00529 }
00530
00531
00532
00533
00534 ora::QueryableVectorReader::QueryableVectorReader(const Reflex::Type& objectType,
00535 MappingElement& mapping,
00536 ContainerSchema& contSchema ):
00537 m_objectType(objectType),
00538 m_mapping( mapping ),
00539 m_schema( contSchema ),
00540 m_dataElement( 0 ),
00541 m_loaders(),
00542 m_tmpIds(){
00543 }
00544
00545 ora::QueryableVectorReader::~QueryableVectorReader(){
00546 for(std::vector<boost::shared_ptr<IVectorLoader> >::const_iterator iL = m_loaders.begin();
00547 iL != m_loaders.end(); ++iL ){
00548 (*iL)->invalidate();
00549 }
00550 }
00551
00552 bool ora::QueryableVectorReader::build( DataElement& dataElement,
00553 IRelationalData& ){
00554 m_dataElement = &dataElement;
00555 m_tmpIds.clear();
00556 m_tmpIds.push_back(0);
00557 return true;
00558 }
00559
00560 void ora::QueryableVectorReader::select( int oid ){
00561 m_tmpIds[0] = oid;
00562 }
00563
00564 void ora::QueryableVectorReader::setRecordId( const std::vector<int>& identity ){
00565 m_tmpIds.resize( 1+identity.size() );
00566 for( size_t i=0;i<identity.size();i++){
00567 m_tmpIds[1+i] = identity[i];
00568 }
00569 }
00570
00571 void ora::QueryableVectorReader::read( void* destinationData ) {
00572 if(!m_dataElement){
00573 throwException("The streamer has not been built.",
00574 "QueryableVectorReader::read");
00575 }
00576
00577 void* arrayAddress = m_dataElement->address( destinationData );
00578
00579 boost::shared_ptr<IVectorLoader> loader( new RelationalVectorLoader( m_objectType, m_mapping, m_schema, m_tmpIds ) );
00580 m_loaders.push_back(loader);
00581
00582 LoaderClient* client = static_cast<LoaderClient*>( arrayAddress );
00583 client->install( loader );
00584 }
00585
00586 void ora::QueryableVectorReader::clear(){
00587 }
00588
00589
00590 ora::QueryableVectorStreamer::QueryableVectorStreamer( const Reflex::Type& objectType,
00591 MappingElement& mapping,
00592 ContainerSchema& contSchema ):
00593 m_objectType( objectType ),
00594 m_mapping( mapping ),
00595 m_schema( contSchema ){
00596 }
00597
00598 ora::QueryableVectorStreamer::~QueryableVectorStreamer(){
00599 }
00600
00601 ora::IRelationalWriter* ora::QueryableVectorStreamer::newWriter(){
00602 return new QueryableVectorWriter( m_objectType, m_mapping, m_schema );
00603 }
00604
00605 ora::IRelationalUpdater* ora::QueryableVectorStreamer::newUpdater(){
00606 return new QueryableVectorUpdater( m_objectType, m_mapping, m_schema );
00607 }
00608
00609 ora::IRelationalReader* ora::QueryableVectorStreamer::newReader(){
00610 return new QueryableVectorReader( m_objectType, m_mapping, m_schema );
00611 }