00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CondCore/ORA/interface/Selection.h"
00003 #include "CondCore/ORA/interface/QueryableVector.h"
00004 #include "QueryableVectorStreamer.h"
00005 #include "MultiRecordInsertOperation.h"
00006 #include "RelationalStreamerFactory.h"
00007 #include "MappingElement.h"
00008 #include "ContainerSchema.h"
00009 #include "ArrayHandlerFactory.h"
00010 #include "ClassUtils.h"
00011 #include "IArrayHandler.h"
00012 #include "ArrayCommonImpl.h"
00013 #include "RelationalBuffer.h"
00014
00015 #include "RelationalAccess/IBulkOperation.h"
00016 #include "Reflex/Member.h"
00017 #include "Reflex/Base.h"
00018 #include "CoralBase/Attribute.h"
00019
00020 namespace ora {
00021
00022 class QVReader {
00023 public:
00024 QVReader( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema ):
00025 m_objectType( objectType ),
00026 m_localElement(),
00027 m_reader( ClassUtils::containerSubType(objectType,"store_base_type"), mapping, contSchema ){
00028 m_localElement.clear();
00029 InputRelationalData dummy;
00030 m_reader.build( m_localElement,dummy );
00031 }
00032
00033 void read( const std::vector<int>& fullId, void* destinationAddress){
00034 m_reader.select( fullId[0] );
00035 std::vector<int> recordId;
00036 for(size_t i=1;i<fullId.size();i++) {
00037 recordId.push_back( fullId[i] );
00038 }
00039 m_reader.setRecordId(recordId);
00040 m_reader.read( destinationAddress );
00041 m_reader.clear();
00042 }
00043
00044 private:
00045 Reflex::Type m_objectType;
00046 DataElement m_localElement;
00047 PVectorReader m_reader;
00048 };
00049
00050 class QVQueryMaker {
00051 public:
00052 QVQueryMaker( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema ):
00053 m_objectType( objectType ),
00054 m_mappingElement( mapping ),
00055 m_schema( contSchema ),
00056 m_recordId(),
00057 m_localElement(),
00058 m_query(),
00059 m_arrayHandler(),
00060 m_oid(-1){
00061 }
00062
00063 ~QVQueryMaker(){
00064 }
00065
00066 bool build(){
00067 m_localElement.clear();
00068 m_recordId.clear();
00069
00070 m_recordId.push_back(0);
00071
00072 RelationalStreamerFactory streamerFactory( m_schema );
00073
00074 m_query.reset( new SelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00075
00076 m_query->addWhereId( m_mappingElement.pkColumn() );
00077 std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00078 for( size_t i=0; i<recIdCols.size(); i++ ){
00079 m_query->addId( recIdCols[ i ] );
00080 m_query->addOrderId( recIdCols[ i ] );
00081 }
00082
00083 Reflex::Type storeBaseType = ClassUtils::containerSubType(m_objectType,"range_store_base_type");
00084 if( !storeBaseType ){
00085 throwException( "Missing dictionary information for the range store base type of the container \"" +
00086 m_objectType.Name(Reflex::SCOPED) + "\"",
00087 "QVQueryMaker::build" );
00088 }
00089
00090 m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( storeBaseType ) );
00091
00092 Reflex::Type valueType = ClassUtils::containerValueType(m_objectType);
00093 Reflex::Type valueResolvedType = ClassUtils::resolvedType(valueType);
00094
00095 if ( ! valueType ||!valueResolvedType ) {
00096 throwException( "Missing dictionary information for the content type of the container \"" +
00097 m_objectType.Name(Reflex::SCOPED) + "\"",
00098 "QVQueryMaker::build" );
00099 }
00100 std::string valueName = valueType.Name();
00101
00102 MappingElement::iterator iMe = m_mappingElement.find( valueName );
00103 if ( iMe == m_mappingElement.end() ) {
00104 throwException( "Item for \"" + valueName + "\" not found in the mapping element",
00105 "QVQueryMaker::build" );
00106 }
00107
00108 m_dataReader.reset( streamerFactory.newReader( valueResolvedType, iMe->second ) );
00109 m_dataReader->build( m_localElement, *m_query );
00110 return true;
00111 }
00112
00113 void setQueryCondition( IRelationalData& queryData, const Selection& selection, MappingElement& mappingElement ){
00114 coral::AttributeList& whereData = queryData.whereData();
00115
00116 const std::vector<std::pair<std::string,std::string> >& theItems = selection.items();
00117 std::stringstream cond;
00118 unsigned int i=0;
00119 for(std::vector<std::pair<std::string,std::string> >::const_iterator iItem = theItems.begin();
00120 iItem != theItems.end();
00121 ++iItem){
00122 cond << " AND ";
00123 std::string varName = Selection::variableNameFromUniqueString(iItem->first);
00124 std::stringstream selColumn;
00125 std::string colName("");
00126 if(varName == Selection::indexVariable()){
00127 colName = mappingElement.columnNames()[mappingElement.columnNames().size()-1];
00128 selColumn << colName<<"_"<<i;
00129 whereData.extend<int>(selColumn.str());
00130 whereData[selColumn.str()].data<int>() = selection.data()[iItem->first].data<int>();
00131 } else {
00132 MappingElement::iterator iElem = mappingElement.find("value_type");
00133 if ( iElem == mappingElement.end() ) {
00134 throwException( "Item for element \"value_type\" not found in the mapping element",
00135 "QVQueryMaker::setQueryCondition" );
00136 }
00137 MappingElement& valueTypeElement = iElem->second;
00138 if( valueTypeElement.elementType()==MappingElement::Primitive ){
00139 if(varName!="value_type"){
00140 throwException( "Item for element \"" + varName + "\" not found in the mapping element",
00141 "QVQueryMaker::setQueryCondition" );
00142 }
00143 colName = valueTypeElement.columnNames()[0];
00144 } else if( valueTypeElement.elementType()==MappingElement::Object ){
00145 MappingElement::iterator iInnerElem = valueTypeElement.find(varName);
00146 if ( iInnerElem == valueTypeElement.end() ) {
00147 throwException( "Item for element \"" + varName + "\" not found in the mapping element",
00148 "QVQueryMaker::setQueryCondition" );
00149 }
00150 colName = iInnerElem->second.columnNames()[0];
00151 } else {
00152 throwException( "Queries cannot be executed on types mapped on "+
00153 MappingElement::elementTypeAsString(valueTypeElement.elementType()),
00154 "QVQueryMaker::setQueryCondition" );
00155 }
00156 selColumn << colName<<"_"<<i;
00157 whereData.extend(selColumn.str(),selection.data()[iItem->first].specification().type());
00158 whereData[selColumn.str()].setValueFromAddress(selection.data()[iItem->first].addressOfData());
00159 }
00160 cond << colName << " " << iItem->second << " :"<<selColumn.str();
00161 i++;
00162 selColumn.str("");
00163 }
00164
00165
00166 queryData.whereClause()+=cond.str();
00167 }
00168
00169 void select( const std::vector<int>& fullId, const Selection& selection ){
00170 if(!m_query.get()){
00171 throwException("The reader has not been built.",
00172 "QVReader::select");
00173 }
00174
00175 m_oid = fullId[0];
00176 m_recordId.clear();
00177 for(size_t i=1;i<fullId.size();i++) {
00178 m_recordId.push_back( fullId[i] );
00179 }
00180
00181 m_recordId.push_back( 0 );
00182
00183 coral::AttributeList& whereData = m_query->whereData();
00184 whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00185
00186 setQueryCondition( *m_query, selection, m_mappingElement );
00187
00188 m_query->execute();
00189 }
00190
00191 size_t selectionCount( const std::vector<int>& fullId, const Selection& selection ){
00192 SelectOperation countQuery( m_mappingElement.tableName(), m_schema.storageSchema() );
00193 std::string countColumn("COUNT(*)");
00194 countQuery.addData( countColumn ,typeid(int) );
00195 countQuery.addWhereId( m_mappingElement.pkColumn() );
00196 std::vector<std::string> recIdColumns = m_mappingElement.recordIdColumns();
00197 for( size_t i=0;i<recIdColumns.size();i++){
00198 countQuery.addWhereId( recIdColumns[i] );
00199 }
00200
00201 coral::AttributeList& whereData = countQuery.whereData();
00202
00203 whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00204 for ( size_t i=0;i<fullId.size();i++ ){
00205 whereData[ recIdColumns[i] ].data<int>() = fullId[i+1];
00206 }
00207
00208 setQueryCondition( countQuery, selection, m_mappingElement );
00209 countQuery.execute();
00210
00211 size_t result = 0;
00212 if( countQuery.nextCursorRow() ){
00213 coral::AttributeList& row = countQuery.data();
00214 result = row[countColumn].data<int>();
00215 }
00216 return result;
00217 }
00218
00219 void executeAndLoad(void* address){
00220
00221 if(!m_query.get()){
00222 throwException("The reader has not been built.",
00223 "QVReader::read");
00224 }
00225 Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00226 Reflex::Member firstMember = iteratorDereferenceReturnType.MemberByName( "first" );
00227 if ( ! firstMember ) {
00228 throwException( "Could not retrieve the data member \"first\" of the class \"" +
00229 iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00230 "QVQueryMakerAndLoad::read" );
00231 }
00232 Reflex::Member secondMember = iteratorDereferenceReturnType.MemberByName( "second" );
00233 if ( ! secondMember ) {
00234 throwException( "Could not retrieve the data member \"second\" of the class \"" +
00235 iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00236 "QVQueryMakerAndLoad::read" );
00237 }
00238
00239 m_arrayHandler->clear( address );
00240
00241 unsigned int i=0;
00242 while ( m_query->nextCursorRow() ){
00243
00244
00245 void* objectData = iteratorDereferenceReturnType.Construct().Address();
00246 void* positionData = static_cast< char* >( objectData ) + firstMember.Offset();
00247 void* containerData = static_cast< char* >( objectData ) + secondMember.Offset();
00248
00249 m_recordId[m_recordId.size()-1] = (int)i;
00250 coral::AttributeList& row = m_query->data();
00251
00252 *(size_t*)positionData = (size_t)(row[m_mappingElement.posColumn()].data<int>());
00253
00254 m_dataReader->setRecordId( m_recordId );
00255 m_dataReader->select( m_oid );
00256 m_dataReader->read( containerData );
00257
00258 size_t prevSize = m_arrayHandler->size( address );
00259 m_arrayHandler->appendNewElement( address, objectData );
00260 bool inserted = m_arrayHandler->size( address )>prevSize;
00261
00262 iteratorDereferenceReturnType.Destruct( objectData );
00263 if ( !inserted ) {
00264 throwException( "Could not insert a new element in the array type \"" +
00265 m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00266 "QVQueryMakerAndLoad::executeAndLoad" );
00267 }
00268 ++i;
00269 }
00270
00271 m_arrayHandler->finalize( address );
00272 m_query->clear();
00273 }
00274
00275 private:
00276 Reflex::Type m_objectType;
00277 MappingElement& m_mappingElement;
00278 ContainerSchema& m_schema;
00279 std::vector<int> m_recordId;
00280 DataElement m_localElement;
00281 std::auto_ptr<SelectOperation> m_query;
00282 std::auto_ptr<IArrayHandler> m_arrayHandler;
00283 std::auto_ptr<IRelationalReader> m_dataReader;
00284 int m_oid;
00285 };
00286
00287 class QueryableVectorLoader: public IVectorLoader {
00288
00289 public:
00290
00291
00292 QueryableVectorLoader( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema,
00293 const std::vector<int>& fullId ):
00294 m_isValid(true),
00295 m_reader( objectType, mapping, contSchema ),
00296 m_queryMaker( objectType, mapping, contSchema ),
00297 m_identity(fullId){
00298 }
00299
00300
00301 virtual ~QueryableVectorLoader(){
00302 }
00303
00304 public:
00305
00306
00307 bool load(void* address) const {
00308 bool ret = false;
00309 if(m_isValid) {
00310 m_reader.read( m_identity, address );
00311 ret = true;
00312 }
00313 return ret;
00314 }
00315
00316 bool loadSelection(const Selection& selection, void* address) const {
00317 bool ret = false;
00318 if(m_isValid) {
00319 m_queryMaker.build();
00320 m_queryMaker.select( m_identity, selection );
00321 m_queryMaker.executeAndLoad( address );
00322 ret = true;
00323 }
00324 return ret;
00325 }
00326
00327 size_t getSelectionCount( const Selection& selection ) const {
00328 size_t ret = 0;
00329 if(m_isValid) {
00330 ret = m_queryMaker.selectionCount( m_identity, selection );
00331 }
00332 return ret;
00333 }
00334
00335 void invalidate(){
00336 m_isValid = false;
00337 }
00338
00339 bool isValid() const{
00340 return m_isValid;
00341 }
00342
00343 private:
00344 bool m_isValid;
00345 mutable QVReader m_reader;
00346 mutable QVQueryMaker m_queryMaker;
00347 std::vector<int> m_identity;
00348 };
00349
00350 }
00351
00352 ora::QueryableVectorWriter::QueryableVectorWriter( const Reflex::Type& objectType,
00353 MappingElement& mapping,
00354 ContainerSchema& contSchema ):
00355 m_objectType( objectType ),
00356 m_offset( 0 ),
00357 m_localElement(),
00358 m_writer(ClassUtils::containerSubType(objectType,"store_base_type"), mapping, contSchema ){
00359 }
00360
00361 ora::QueryableVectorWriter::~QueryableVectorWriter(){
00362 }
00363
00364 bool ora::QueryableVectorWriter::build( DataElement& offset,
00365 IRelationalData& data,
00366 RelationalBuffer& operationBuffer ){
00367 m_offset = &offset;
00368 m_localElement.clear();
00369 return m_writer.build( m_localElement, data, operationBuffer );
00370 }
00371
00372 void ora::QueryableVectorWriter::setRecordId( const std::vector<int>& identity ){
00373 m_writer.setRecordId( identity );
00374 }
00375
00376 void ora::QueryableVectorWriter::write( int oid,
00377 const void* inputData ){
00378 if(!m_offset){
00379 throwException("The streamer has not been built.",
00380 "QueryableVectorWriter::write");
00381 }
00382 void* vectorAddress = m_offset->address( inputData );
00383 Reflex::Object vectorObj( m_objectType,const_cast<void*>(vectorAddress));
00384 vectorObj.Invoke("load",0);
00385 void* storageAddress = 0;
00386 vectorObj.Invoke("storageAddress",storageAddress);
00387 m_writer.write( oid, storageAddress );
00388 }
00389
00390 ora::QueryableVectorUpdater::QueryableVectorUpdater(const Reflex::Type& objectType,
00391 MappingElement& mapping,
00392 ContainerSchema& contSchema ):
00393 m_objectType( objectType ),
00394 m_offset( 0 ),
00395 m_localElement(),
00396 m_updater( ClassUtils::containerSubType(objectType,"store_base_type"), mapping, contSchema ){
00397 }
00398
00399 ora::QueryableVectorUpdater::~QueryableVectorUpdater(){
00400 }
00401
00402 bool ora::QueryableVectorUpdater::build( DataElement& offset,
00403 IRelationalData& relationalData,
00404 RelationalBuffer& operationBuffer){
00405 m_offset = &offset;
00406 m_localElement.clear();
00407 return m_updater.build( m_localElement, relationalData, operationBuffer );
00408 }
00409
00410 void ora::QueryableVectorUpdater::setRecordId( const std::vector<int>& identity ){
00411 m_updater.setRecordId( identity );
00412 }
00413
00414 void ora::QueryableVectorUpdater::update( int oid,
00415 const void* data ){
00416 if(!m_offset){
00417 throwException("The streamer has not been built.",
00418 "QueryableVectorUpdater::update");
00419 }
00420 void* vectorAddress = m_offset->address( data );
00421 Reflex::Object vectorObj( m_objectType,const_cast<void*>(vectorAddress));
00422 vectorObj.Invoke("load",0);
00423 void* storageAddress = 0;
00424 vectorObj.Invoke("storageAddress",storageAddress);
00425 m_updater.update( oid, storageAddress );
00426 }
00427
00428 ora::QueryableVectorReader::QueryableVectorReader(const Reflex::Type& objectType,
00429 MappingElement& mapping,
00430 ContainerSchema& contSchema ):
00431 m_objectType(objectType),
00432 m_mapping( mapping ),
00433 m_schema( contSchema ),
00434 m_dataElement( 0 ),
00435 m_loaders(),
00436 m_tmpIds(){
00437 }
00438
00439 ora::QueryableVectorReader::~QueryableVectorReader(){
00440 for(std::vector<boost::shared_ptr<IVectorLoader> >::const_iterator iL = m_loaders.begin();
00441 iL != m_loaders.end(); ++iL ){
00442 (*iL)->invalidate();
00443 }
00444 }
00445
00446 bool ora::QueryableVectorReader::build( DataElement& dataElement,
00447 IRelationalData& ){
00448 m_dataElement = &dataElement;
00449 m_tmpIds.clear();
00450 m_tmpIds.push_back(0);
00451 return true;
00452 }
00453
00454 void ora::QueryableVectorReader::select( int oid ){
00455 m_tmpIds[0] = oid;
00456 }
00457
00458 void ora::QueryableVectorReader::setRecordId( const std::vector<int>& identity ){
00459 m_tmpIds.resize( 1+identity.size() );
00460 for( size_t i=0;i<identity.size();i++){
00461 m_tmpIds[1+i] = identity[i];
00462 }
00463 }
00464
00465 void ora::QueryableVectorReader::read( void* destinationData ) {
00466 if(!m_dataElement){
00467 throwException("The streamer has not been built.",
00468 "QueryableVectorReader::read");
00469 }
00470
00471
00472 Reflex::Member loaderMember = m_objectType.DataMemberByName("m_loader");
00473 if(!loaderMember){
00474 throwException("The loader member has not been found.",
00475 "QueryableVectorReader::read");
00476 }
00477 DataElement& loaderMemberElement = m_dataElement->addChild( loaderMember.Offset(), 0 );
00478 void* loaderAddress = loaderMemberElement.address( destinationData );
00479
00480
00481 boost::shared_ptr<IVectorLoader> loader( new QueryableVectorLoader( m_objectType, m_mapping, m_schema, m_tmpIds ) );
00482 m_loaders.push_back(loader);
00483
00484 *(static_cast<boost::shared_ptr<IVectorLoader>*>( loaderAddress )) = loader;
00485 }
00486
00487 void ora::QueryableVectorReader::clear(){
00488 }
00489
00490
00491 ora::QueryableVectorStreamer::QueryableVectorStreamer( const Reflex::Type& objectType,
00492 MappingElement& mapping,
00493 ContainerSchema& contSchema ):
00494 m_objectType( objectType ),
00495 m_mapping( mapping ),
00496 m_schema( contSchema ){
00497 }
00498
00499 ora::QueryableVectorStreamer::~QueryableVectorStreamer(){
00500 }
00501
00502 ora::IRelationalWriter* ora::QueryableVectorStreamer::newWriter(){
00503 return new QueryableVectorWriter( m_objectType, m_mapping, m_schema );
00504 }
00505
00506 ora::IRelationalUpdater* ora::QueryableVectorStreamer::newUpdater(){
00507 return new QueryableVectorUpdater( m_objectType, m_mapping, m_schema );
00508 }
00509
00510 ora::IRelationalReader* ora::QueryableVectorStreamer::newReader(){
00511 return new QueryableVectorReader( m_objectType, m_mapping, m_schema );
00512 }