CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_9/src/CondCore/ORA/src/QueryableVectorStreamer.cc

Go to the documentation of this file.
00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CondCore/ORA/interface/Selection.h"
00003 #include "CondCore/ORA/interface/QueryableVector.h"
00004 #include "QueryableVectorStreamer.h"
00005 #include "MultiRecordInsertOperation.h"
00006 #include "RelationalStreamerFactory.h"
00007 #include "MappingElement.h"
00008 #include "ContainerSchema.h"
00009 #include "ArrayHandlerFactory.h"
00010 #include "ClassUtils.h"
00011 #include "IArrayHandler.h"
00012 #include "ArrayCommonImpl.h"
00013 #include "RelationalBuffer.h"
00014 // externals
00015 #include "RelationalAccess/IBulkOperation.h"
00016 #include "Reflex/Member.h"
00017 #include "Reflex/Base.h"
00018 #include "CoralBase/Attribute.h"
00019 
00020 namespace ora {
00021 
00022   class QVReader {
00023     public:
00024       QVReader( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema ):
00025         m_objectType( objectType ),
00026         m_localElement(),
00027         m_reader( ClassUtils::containerSubType(objectType,"store_base_type"), mapping, contSchema ){
00028         m_localElement.clear();
00029         InputRelationalData dummy;
00030         m_reader.build( m_localElement,dummy ); 
00031       }
00032 
00033     void read( const std::vector<int>& fullId, void* destinationAddress){
00034       m_reader.select( fullId[0] );
00035       std::vector<int> recordId;
00036       for(size_t i=1;i<fullId.size();i++) {
00037          recordId.push_back( fullId[i] );
00038       }
00039       m_reader.setRecordId(recordId);
00040       m_reader.read( destinationAddress );
00041       m_reader.clear();
00042     } 
00043 
00044     private:
00045       Reflex::Type m_objectType;
00046       DataElement m_localElement;
00047       PVectorReader m_reader;
00048   };
00049 
00050   class QVQueryMaker {
00051     public:
00052       QVQueryMaker( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema ):
00053         m_objectType( objectType ),
00054         m_mappingElement( mapping ),
00055         m_schema( contSchema ),
00056         m_recordId(),
00057         m_localElement(),
00058         m_query(),
00059         m_arrayHandler(),
00060         m_oid(-1){
00061       }
00062 
00063       ~QVQueryMaker(){
00064       }
00065       
00066       bool build(){
00067         m_localElement.clear();
00068         m_recordId.clear();
00069         // allocate for the index...
00070         m_recordId.push_back(0);
00071 
00072         RelationalStreamerFactory streamerFactory( m_schema );
00073         // first open the insert on the extra table...
00074         m_query.reset( new SelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00075 
00076         m_query->addWhereId( m_mappingElement.pkColumn() );
00077         std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00078         for( size_t i=0; i<recIdCols.size(); i++ ){
00079           m_query->addId( recIdCols[ i ] );
00080           m_query->addOrderId( recIdCols[ i ] );
00081         }
00082 
00083         Reflex::Type storeBaseType = ClassUtils::containerSubType(m_objectType,"range_store_base_type");
00084         if( !storeBaseType ){
00085           throwException( "Missing dictionary information for the range store base type of the container \"" +
00086                           m_objectType.Name(Reflex::SCOPED) + "\"",
00087                           "QVQueryMaker::build" );          
00088         }
00089         
00090         m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( storeBaseType ) );
00091 
00092         Reflex::Type valueType = ClassUtils::containerValueType(m_objectType);
00093         Reflex::Type valueResolvedType = ClassUtils::resolvedType(valueType);
00094         // Check the component type
00095         if ( ! valueType ||!valueResolvedType ) {
00096           throwException( "Missing dictionary information for the content type of the container \"" +
00097                           m_objectType.Name(Reflex::SCOPED) + "\"",
00098                           "QVQueryMaker::build" );
00099         }
00100         std::string valueName = valueType.Name();
00101         // Retrieve the relevant mapping element
00102         MappingElement::iterator iMe = m_mappingElement.find( valueName );
00103         if ( iMe == m_mappingElement.end() ) {
00104           throwException( "Item for \"" + valueName + "\" not found in the mapping element",
00105                           "QVQueryMaker::build" );
00106         }
00107 
00108         m_dataReader.reset( streamerFactory.newReader( valueResolvedType, iMe->second ) );
00109         m_dataReader->build( m_localElement, *m_query );
00110         return true;
00111       }
00112 
00113       void setQueryCondition( IRelationalData& queryData, const Selection& selection, MappingElement& mappingElement ){
00114         coral::AttributeList& whereData = queryData.whereData();
00115         // adding the selection conditions
00116         const std::vector<std::pair<std::string,std::string> >& theItems =  selection.items();
00117         std::stringstream cond;
00118         unsigned int i=0;
00119         for(std::vector<std::pair<std::string,std::string> >::const_iterator iItem = theItems.begin();
00120             iItem != theItems.end();
00121             ++iItem){
00122           cond << " AND ";
00123           std::string varName = Selection::variableNameFromUniqueString(iItem->first);
00124           std::stringstream selColumn;
00125           std::string colName("");
00126           if(varName == Selection::indexVariable()){
00127             colName = mappingElement.columnNames()[mappingElement.columnNames().size()-1]; // the position column is the last
00128             selColumn << colName<<"_"<<i;
00129             whereData.extend<int>(selColumn.str());
00130             whereData[selColumn.str()].data<int>() = selection.data()[iItem->first].data<int>();
00131           } else {
00132             MappingElement::iterator iElem = mappingElement.find("value_type");
00133             if ( iElem == mappingElement.end() ) {
00134               throwException( "Item for element \"value_type\" not found in the mapping element",
00135                               "QVQueryMaker::setQueryCondition" );
00136             }
00137             MappingElement& valueTypeElement = iElem->second;
00138             if( valueTypeElement.elementType()==MappingElement::Primitive ){
00139               if(varName!="value_type"){
00140                 throwException( "Item for element \"" + varName + "\" not found in the mapping element",
00141                                 "QVQueryMaker::setQueryCondition" );
00142               }
00143               colName = valueTypeElement.columnNames()[0];
00144             } else if( valueTypeElement.elementType()==MappingElement::Object ){
00145               MappingElement::iterator iInnerElem = valueTypeElement.find(varName);
00146               if ( iInnerElem == valueTypeElement.end() ) {
00147                 throwException( "Item for element \"" + varName + "\" not found in the mapping element",
00148                                 "QVQueryMaker::setQueryCondition" );
00149               }
00150               colName = iInnerElem->second.columnNames()[0];
00151             } else {
00152               throwException( "Queries cannot be executed on types mapped on "+
00153                               MappingElement::elementTypeAsString(valueTypeElement.elementType()),
00154                               "QVQueryMaker::setQueryCondition" );
00155             }
00156             selColumn << colName<<"_"<<i;
00157             whereData.extend(selColumn.str(),selection.data()[iItem->first].specification().type());
00158             whereData[selColumn.str()].setValueFromAddress(selection.data()[iItem->first].addressOfData());
00159           }
00160           cond << colName << " " << iItem->second << " :"<<selColumn.str();
00161           i++;
00162           selColumn.str("");
00163         }
00164 
00165         // add the resulting condition clause
00166         queryData.whereClause()+=cond.str();
00167       }
00168       
00169       void select( const std::vector<int>& fullId, const Selection& selection ){
00170         if(!m_query.get()){
00171           throwException("The reader has not been built.",
00172                          "QVReader::select");
00173         }
00174         
00175         m_oid = fullId[0];
00176         m_recordId.clear();
00177         for(size_t i=1;i<fullId.size();i++) {
00178           m_recordId.push_back( fullId[i] );
00179         }
00180         // allocate the element for the index...
00181         m_recordId.push_back( 0 );
00182 
00183         coral::AttributeList& whereData = m_query->whereData();
00184         whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00185 
00186         setQueryCondition( *m_query, selection, m_mappingElement );
00187         
00188         m_query->execute();
00189       }
00190 
00191       size_t selectionCount( const std::vector<int>& fullId, const Selection& selection ){
00192         SelectOperation countQuery( m_mappingElement.tableName(), m_schema.storageSchema() );
00193         std::string countColumn("COUNT(*)");
00194         countQuery.addData( countColumn ,typeid(int) );
00195         countQuery.addWhereId( m_mappingElement.pkColumn() );
00196         std::vector<std::string> recIdColumns = m_mappingElement.recordIdColumns();
00197         for( size_t i=0;i<recIdColumns.size();i++){
00198           countQuery.addWhereId( recIdColumns[i] );
00199         }
00200 
00201         coral::AttributeList& whereData = countQuery.whereData();
00202         // Fill-in the identities.
00203         whereData[ m_mappingElement.pkColumn() ].data<int>() = fullId[0];
00204         for ( size_t i=0;i<fullId.size();i++ ){
00205           whereData[ recIdColumns[i] ].data<int>() = fullId[i+1];
00206         }
00207         
00208         setQueryCondition( countQuery, selection, m_mappingElement );
00209         countQuery.execute();
00210 
00211         size_t result = 0;
00212         if( countQuery.nextCursorRow() ){
00213           coral::AttributeList& row = countQuery.data();
00214           result = row[countColumn].data<int>();
00215         }
00216         return result;
00217       }
00218       
00219       void executeAndLoad(void* address){
00220 
00221         if(!m_query.get()){
00222           throwException("The reader has not been built.",
00223                          "QVReader::read");
00224         }
00225         Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00226         Reflex::Member firstMember = iteratorDereferenceReturnType.MemberByName( "first" );
00227         if ( ! firstMember ) {
00228           throwException( "Could not retrieve the data member \"first\" of the class \"" +
00229                           iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00230                           "QVQueryMakerAndLoad::read" );
00231         }
00232         Reflex::Member secondMember = iteratorDereferenceReturnType.MemberByName( "second" );
00233         if ( ! secondMember ) {
00234           throwException( "Could not retrieve the data member \"second\" of the class \"" +
00235                           iteratorDereferenceReturnType.Name(Reflex::SCOPED) + "\"",
00236                           "QVQueryMakerAndLoad::read" );
00237         }
00238 
00239         m_arrayHandler->clear( address );
00240 
00241         unsigned int i=0;
00242         while ( m_query->nextCursorRow() ){
00243 
00244           // Create a new element for the array
00245           void* objectData = iteratorDereferenceReturnType.Construct().Address();
00246           void* positionData = static_cast< char* >( objectData ) + firstMember.Offset();
00247           void* containerData = static_cast< char* >( objectData ) + secondMember.Offset();
00248 
00249           m_recordId[m_recordId.size()-1] = (int)i;
00250           coral::AttributeList& row = m_query->data();
00251 
00252           *(size_t*)positionData = (size_t)(row[m_mappingElement.posColumn()].data<int>());
00253     
00254           m_dataReader->setRecordId( m_recordId );
00255           m_dataReader->select( m_oid );
00256           m_dataReader->read( containerData );
00257 
00258           size_t prevSize = m_arrayHandler->size( address );
00259           m_arrayHandler->appendNewElement( address, objectData );
00260           bool inserted = m_arrayHandler->size( address )>prevSize;
00261           
00262           iteratorDereferenceReturnType.Destruct( objectData );
00263           if ( !inserted ) {
00264             throwException( "Could not insert a new element in the array type \"" +
00265                             m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00266                             "QVQueryMakerAndLoad::executeAndLoad" );
00267           }
00268           ++i;
00269         }
00270 
00271         m_arrayHandler->finalize( address );
00272         m_query->clear();
00273       }
00274 
00275     private:
00276       Reflex::Type m_objectType;
00277       MappingElement& m_mappingElement;
00278       ContainerSchema& m_schema;
00279       std::vector<int> m_recordId;
00280       DataElement m_localElement;
00281       std::auto_ptr<SelectOperation> m_query;
00282       std::auto_ptr<IArrayHandler> m_arrayHandler;
00283       std::auto_ptr<IRelationalReader> m_dataReader;
00284       int m_oid;
00285   };  
00286 
00287   class QueryableVectorLoader: public IVectorLoader {
00288 
00289       public:
00290 
00291         // constructor
00292       QueryableVectorLoader( const Reflex::Type& objectType, MappingElement& mapping, ContainerSchema& contSchema,
00293                               const std::vector<int>& fullId ):
00294         m_isValid(true),
00295         m_reader( objectType, mapping, contSchema ),
00296         m_queryMaker( objectType, mapping, contSchema ),
00297         m_identity(fullId){
00298       }
00299 
00300       // destructor
00301       virtual ~QueryableVectorLoader(){
00302       }
00303 
00304       public:
00305 
00306       // triggers the data loading
00307       bool load(void* address) const {
00308         bool ret = false;
00309         if(m_isValid) {
00310           m_reader.read( m_identity, address );
00311           ret = true;
00312         }
00313         return ret;
00314       }
00315 
00316       bool loadSelection(const Selection& selection, void* address) const {
00317         bool ret = false;
00318         if(m_isValid) {
00319           m_queryMaker.build();
00320           m_queryMaker.select( m_identity, selection );
00321           m_queryMaker.executeAndLoad( address );
00322           ret = true;
00323         }
00324         return ret;
00325       }
00326 
00327       size_t getSelectionCount( const Selection& selection ) const {
00328         size_t ret = 0;
00329         if(m_isValid) {
00330           ret = m_queryMaker.selectionCount( m_identity, selection );
00331         }
00332         return ret;
00333       }
00334 
00335       void invalidate(){
00336         m_isValid = false;
00337       }
00338 
00339       bool isValid() const{
00340         return m_isValid;
00341       }
00342         
00343       private:
00344         bool m_isValid;
00345         mutable QVReader m_reader;
00346         mutable QVQueryMaker m_queryMaker;
00347         std::vector<int> m_identity;
00348     };
00349   
00350 }
00351 
00352 ora::QueryableVectorWriter::QueryableVectorWriter( const Reflex::Type& objectType,
00353                                                    MappingElement& mapping,
00354                                                    ContainerSchema& contSchema ):
00355   m_objectType( objectType ),
00356   m_offset( 0 ),
00357   m_localElement(),
00358   m_writer(ClassUtils::containerSubType(objectType,"store_base_type"), mapping, contSchema ){
00359 }
00360 
00361 ora::QueryableVectorWriter::~QueryableVectorWriter(){
00362 }
00363 
00364 bool ora::QueryableVectorWriter::build( DataElement& offset,
00365                                         IRelationalData& data,
00366                                         RelationalBuffer& operationBuffer ){
00367   m_offset = &offset;
00368   m_localElement.clear();
00369   return m_writer.build( m_localElement, data, operationBuffer );
00370 }
00371 
00372 void ora::QueryableVectorWriter::setRecordId( const std::vector<int>& identity ){
00373   m_writer.setRecordId( identity );
00374 }
00375       
00376 void ora::QueryableVectorWriter::write( int oid,
00377                                         const void* inputData ){
00378   if(!m_offset){
00379     throwException("The streamer has not been built.",
00380                    "QueryableVectorWriter::write");
00381   }
00382   void* vectorAddress = m_offset->address( inputData );
00383   Reflex::Object vectorObj( m_objectType,const_cast<void*>(vectorAddress));
00384   vectorObj.Invoke("load",0);
00385   void* storageAddress = 0;
00386   vectorObj.Invoke("storageAddress",storageAddress);
00387   m_writer.write( oid, storageAddress );
00388 }
00389 
00390 ora::QueryableVectorUpdater::QueryableVectorUpdater(const Reflex::Type& objectType,
00391                                                     MappingElement& mapping,
00392                                                     ContainerSchema& contSchema ):
00393   m_objectType( objectType ),
00394   m_offset( 0 ),
00395   m_localElement(),
00396   m_updater( ClassUtils::containerSubType(objectType,"store_base_type"), mapping, contSchema ){
00397 }
00398 
00399 ora::QueryableVectorUpdater::~QueryableVectorUpdater(){
00400 }
00401 
00402 bool ora::QueryableVectorUpdater::build( DataElement& offset,
00403                                          IRelationalData& relationalData,
00404                                          RelationalBuffer& operationBuffer){
00405   m_offset = &offset;
00406   m_localElement.clear();
00407   return m_updater.build( m_localElement, relationalData, operationBuffer );
00408 }
00409 
00410 void ora::QueryableVectorUpdater::setRecordId( const std::vector<int>& identity ){
00411   m_updater.setRecordId( identity );
00412 }
00413 
00414 void ora::QueryableVectorUpdater::update( int oid,
00415                                           const void* data ){
00416   if(!m_offset){
00417     throwException("The streamer has not been built.",
00418                    "QueryableVectorUpdater::update");
00419   }
00420   void* vectorAddress = m_offset->address( data );
00421   Reflex::Object vectorObj( m_objectType,const_cast<void*>(vectorAddress));
00422   vectorObj.Invoke("load",0);
00423   void* storageAddress = 0;
00424   vectorObj.Invoke("storageAddress",storageAddress);
00425   m_updater.update( oid, storageAddress );
00426 }  
00427   
00428 ora::QueryableVectorReader::QueryableVectorReader(const Reflex::Type& objectType,
00429                                                   MappingElement& mapping,
00430                                                   ContainerSchema& contSchema ):
00431   m_objectType(objectType),
00432   m_mapping( mapping ),
00433   m_schema( contSchema ),
00434   m_dataElement( 0 ),
00435   m_loaders(),
00436   m_tmpIds(){
00437 }
00438 
00439 ora::QueryableVectorReader::~QueryableVectorReader(){
00440   for(std::vector<boost::shared_ptr<IVectorLoader> >::const_iterator iL = m_loaders.begin();
00441       iL != m_loaders.end(); ++iL ){
00442     (*iL)->invalidate();
00443   }
00444 }
00445 
00446 bool ora::QueryableVectorReader::build( DataElement& dataElement,
00447                                         IRelationalData& ){
00448   m_dataElement = &dataElement;
00449   m_tmpIds.clear();
00450   m_tmpIds.push_back(0);
00451   return true;
00452 }
00453 
00454 void ora::QueryableVectorReader::select( int oid ){
00455   m_tmpIds[0] = oid;
00456 }
00457 
00458 void ora::QueryableVectorReader::setRecordId( const std::vector<int>& identity ){
00459   m_tmpIds.resize( 1+identity.size() );
00460   for( size_t i=0;i<identity.size();i++){
00461     m_tmpIds[1+i] = identity[i];
00462   }
00463 }
00464 
00465 void ora::QueryableVectorReader::read( void* destinationData ) {
00466   if(!m_dataElement){
00467     throwException("The streamer has not been built.",
00468                    "QueryableVectorReader::read");
00469   }
00470 
00471   // finding the address (on the instance) where to install the loader
00472   Reflex::Member loaderMember = m_objectType.DataMemberByName("m_loader");
00473   if(!loaderMember){
00474     throwException("The loader member has not been found.",
00475                    "QueryableVectorReader::read");     
00476   }
00477   DataElement& loaderMemberElement = m_dataElement->addChild( loaderMember.Offset(), 0 );
00478   void* loaderAddress = loaderMemberElement.address( destinationData );
00479 
00480   // creating and registering the new loader to assign
00481   boost::shared_ptr<IVectorLoader> loader( new QueryableVectorLoader( m_objectType, m_mapping, m_schema, m_tmpIds ) );
00482   m_loaders.push_back(loader);
00483   // installing the loader
00484   *(static_cast<boost::shared_ptr<IVectorLoader>*>( loaderAddress )) = loader;
00485 }
00486 
00487 void ora::QueryableVectorReader::clear(){
00488 }
00489 
00490 
00491 ora::QueryableVectorStreamer::QueryableVectorStreamer( const Reflex::Type& objectType,
00492                                                        MappingElement& mapping,
00493                                                        ContainerSchema& contSchema ):
00494   m_objectType( objectType ),
00495   m_mapping( mapping ),
00496   m_schema( contSchema ){
00497 }
00498 
00499 ora::QueryableVectorStreamer::~QueryableVectorStreamer(){
00500 }
00501 
00502 ora::IRelationalWriter* ora::QueryableVectorStreamer::newWriter(){
00503   return new QueryableVectorWriter( m_objectType, m_mapping, m_schema );
00504 }
00505 
00506 ora::IRelationalUpdater* ora::QueryableVectorStreamer::newUpdater(){
00507   return new QueryableVectorUpdater( m_objectType, m_mapping, m_schema );
00508 }
00509 
00510 ora::IRelationalReader* ora::QueryableVectorStreamer::newReader(){
00511   return new QueryableVectorReader( m_objectType, m_mapping, m_schema );
00512 }