CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/CondCore/ORA/src/CArrayStreamer.cc

Go to the documentation of this file.
00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CArrayStreamer.h"
00003 #include "ClassUtils.h"
00004 #include "MappingElement.h"
00005 #include "ContainerSchema.h"
00006 #include "RelationalBuffer.h"
00007 #include "MultiRecordInsertOperation.h"
00008 #include "MultiRecordSelectOperation.h"
00009 #include "RelationalStreamerFactory.h"
00010 #include "ArrayHandlerFactory.h"
00011 #include "IArrayHandler.h"
00012 // externals
00013 #include "CoralBase/Attribute.h"
00014 #include "RelationalAccess/IBulkOperation.h"
00015 #include "Reflex/Object.h"
00016 
00017 ora::CArrayWriter::CArrayWriter( const Reflex::Type& objectType,
00018                                  MappingElement& mapping,
00019                                  ContainerSchema& contSchema ):
00020   m_objectType( objectType ),
00021   m_mappingElement( mapping ),
00022   m_schema( contSchema ),
00023   m_recordId(),
00024   m_localElement( ),
00025   m_offset( 0 ),
00026   m_insertOperation( 0 ),
00027   m_arrayHandler(),
00028   m_dataWriter(){
00029 }
00030 
00031 ora::CArrayWriter::~CArrayWriter(){
00032 }
00033 
00034 bool ora::CArrayWriter::build( DataElement& offset,
00035                                IRelationalData&,
00036                                RelationalBuffer& operationBuffer ){
00037 
00038   m_localElement.clear();
00039   m_recordId.clear();
00040   // allocate for the index...
00041   m_recordId.push_back(0);
00042 
00043   // Check the array type
00044   Reflex::Type arrayType = m_objectType.ToType();
00045   Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00046   // Check the component type
00047   if ( ! arrayType || !arrayResolvedType ) {
00048     throwException( "Missing dictionary information for the element type of the array \"" +
00049                     m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00050                     "CArrayWriter::build" );
00051   }
00052   
00053   RelationalStreamerFactory streamerFactory( m_schema );
00054   
00055   // first open the insert on the extra table...
00056   m_insertOperation = &operationBuffer.newMultiRecordInsert( m_mappingElement.tableName() );
00057   const std::vector<std::string>& columns = m_mappingElement.columnNames();
00058   if( !columns.size() ){
00059     throwException( "Id columns not found in the mapping.",
00060                     "CArrayWriter::build");    
00061   }
00062   for( size_t i=0; i<columns.size(); i++ ){
00063     m_insertOperation->addId( columns[ i ] );
00064   }
00065 
00066   m_offset = &offset;
00067 
00068   m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00069   
00070   std::string arrayTypeName = arrayType.Name();
00071   // Retrieve the relevant mapping element
00072   MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00073   if ( iMe == m_mappingElement.end() ) {
00074     throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00075                     "CArrayWriter::build" );
00076   }
00077   
00078   m_dataWriter.reset( streamerFactory.newWriter( arrayResolvedType, iMe->second ));
00079   m_dataWriter->build( m_localElement, *m_insertOperation, operationBuffer );
00080   return true;
00081 }
00082 
00083 void ora::CArrayWriter::setRecordId( const std::vector<int>& identity ){
00084   m_recordId.clear();
00085   for(size_t i=0;i<identity.size();i++) {
00086     m_recordId.push_back( identity[i] );
00087   }
00088   m_recordId.push_back( 0 );
00089 }
00090       
00091 void ora::CArrayWriter::write( int oid,
00092                                const void* inputData ){
00093 
00094   if(!m_offset){
00095     throwException("The streamer has not been built.",
00096                    "CArrayWriter::write");
00097   }
00098   const std::vector<std::string>& columns = m_mappingElement.columnNames();
00099   if( columns.size() != m_recordId.size()+1){
00100     throwException( "Record id elements provided are not matching with the mapped id columns.",
00101                     "CArrayWriter::write");
00102   }
00103   
00104   void* data = m_offset->address( inputData );
00105   
00106   // Use the iterator to loop over the elements of the container.
00107   size_t containerSize = m_arrayHandler->size( data  );
00108   
00109   if ( containerSize == 0  ) return;
00110 
00111   size_t startElementIndex = m_arrayHandler->startElementIndex( data );
00112 
00113   std::auto_ptr<IArrayIteratorHandler> iteratorHandler( m_arrayHandler->iterate( data ) );
00114 
00115   InsertCache& bulkOperation = m_insertOperation->setUp( containerSize-startElementIndex+1 );
00116 
00117   for ( size_t iIndex = startElementIndex; iIndex < containerSize; ++iIndex ) {
00118 
00119     m_recordId[m_recordId.size()-1] = iIndex;
00120     coral::AttributeList& dataBuff = m_insertOperation->data();
00121 
00122     dataBuff[ columns[0] ].data<int>() = oid;
00123     for( size_t i = 1;i < columns.size(); i++ ){
00124       dataBuff[ columns[i] ].data<int>() = m_recordId[i-1];
00125     }
00126 
00127     void* objectReference = iteratorHandler->object();
00128 
00129     m_dataWriter->setRecordId( m_recordId );
00130     m_dataWriter->write( oid, objectReference );
00131     
00132     bulkOperation.processNextIteration();
00133     // Increment the iterator
00134     iteratorHandler->increment();
00135   }
00136 
00137   m_arrayHandler->finalize( const_cast<void*>( data ) );
00138   
00139 }
00140 
00141 ora::CArrayUpdater::CArrayUpdater(const Reflex::Type& objectType,
00142                                   MappingElement& mapping,
00143                                   ContainerSchema& contSchema ):
00144   m_deleter( mapping ),
00145   m_writer( objectType, mapping, contSchema ){
00146 }
00147 
00148 ora::CArrayUpdater::~CArrayUpdater(){
00149 }
00150 
00151 bool ora::CArrayUpdater::build( DataElement& offset,
00152                                 IRelationalData& relationalData,
00153                                 RelationalBuffer& operationBuffer){
00154   m_deleter.build( operationBuffer );
00155   m_writer.build( offset, relationalData, operationBuffer );
00156   return true;
00157 }
00158 
00159 void ora::CArrayUpdater::setRecordId( const std::vector<int>& identity ){
00160   m_writer.setRecordId( identity );
00161 }
00162 
00163 void ora::CArrayUpdater::update( int oid,
00164                                  const void* data ){
00165   m_deleter.erase( oid );
00166   m_writer.write( oid, data );
00167 }
00168 
00169 ora::CArrayReader::CArrayReader(const Reflex::Type& objectType,
00170                                 MappingElement& mapping,
00171                                 ContainerSchema& contSchema ):
00172   m_objectType( objectType ),
00173   m_mappingElement( mapping ),
00174   m_schema( contSchema ),
00175   m_recordId(),
00176   m_localElement( ),
00177   m_offset(0 ),
00178   m_query(),
00179   m_arrayHandler(),
00180   m_dataReader(){
00181 }
00182 
00183 ora::CArrayReader::~CArrayReader(){
00184 }
00185 
00186 bool ora::CArrayReader::build( DataElement& offset,
00187                                IRelationalData& relationalData ){
00188   
00189   m_localElement.clear();
00190 
00191   m_recordId.clear();
00192   // allocate for the index...
00193   m_recordId.push_back(0);
00194 
00195   // Check the array type
00196   Reflex::Type arrayType = m_objectType.ToType();
00197   Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00198   // Check the component type
00199   if ( ! arrayType || !arrayResolvedType ) {
00200     throwException( "Missing dictionary information for the element type of the array \"" +
00201                     m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00202                     "CArrayReader::build" );
00203   }
00204 
00205   RelationalStreamerFactory streamerFactory( m_schema );
00206   
00207   // first open the insert on the extra table...
00208   m_query.reset(new MultiRecordSelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00209   m_query->addWhereId( m_mappingElement.pkColumn() );
00210   std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00211   for( size_t i=0; i<recIdCols.size(); i++ ){
00212     m_query->addId( recIdCols[ i ] );
00213     m_query->addOrderId( recIdCols[ i ] );
00214   }
00215   
00216   m_offset = &offset;
00217 
00218   m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00219 
00220   std::string arrayTypeName = arrayType.Name();
00221 
00222   // Retrieve the relevant mapping element
00223   MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00224   if ( iMe == m_mappingElement.end() ) {
00225     throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00226                     "CArrayReader::build" );
00227   }
00228   
00229   m_dataReader.reset( streamerFactory.newReader( arrayResolvedType, iMe->second ) );
00230   m_dataReader->build( m_localElement, *m_query );
00231   return true;
00232 }
00233 
00234 void ora::CArrayReader::select( int oid ){
00235   if(!m_query.get()){
00236     throwException("The streamer has not been built.",
00237                    "CArrayReader::select");
00238   }
00239   coral::AttributeList& whereData = m_query->whereData();
00240   whereData[ m_mappingElement.pkColumn() ].data<int>() = oid;
00241   m_query->execute();
00242   m_dataReader->select( oid );
00243 }
00244 
00245 void ora::CArrayReader::setRecordId( const std::vector<int>& identity ){
00246   m_recordId.clear();
00247   for(size_t i=0;i<identity.size();i++) {
00248     m_recordId.push_back( identity[i] );
00249   }
00250   // allocate the element for the index...
00251   m_recordId.push_back( 0 );
00252 }
00253 
00254 void ora::CArrayReader::read( void* destinationData ) {
00255   if(!m_offset){
00256     throwException("The streamer has not been built.",
00257                    "CArrayReader::read");
00258   }
00259   void* address = m_offset->address( destinationData );
00260 
00261   Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00262   
00263   bool isElementFundamental = iteratorDereferenceReturnType.IsFundamental();
00264 
00265   std::string positionColumn = m_mappingElement.posColumn();
00266 
00267   size_t arraySize = m_objectType.ArrayLength();
00268   
00269   m_arrayHandler->clear( address );
00270 
00271   size_t cursorSize = m_query->selectionSize(m_recordId, m_recordId.size()-1);
00272   unsigned int i=0;
00273   while ( i< cursorSize ){
00274 
00275     m_recordId[m_recordId.size()-1] = (int)i;
00276     m_query->selectRow( m_recordId );
00277     coral::AttributeList& row = m_query->data();
00278 
00279     int arrayIndex = row[positionColumn].data< int >();
00280 
00281     // Create a new element for the array
00282     void* objectData = 0;
00283     
00284     if(arrayIndex >= (int)arraySize){
00285       throwException("Found more element then array size.",
00286                      "CArrayReader::read");
00287                      
00288     }
00289     
00290     // the memory has been allocated already!
00291     objectData = static_cast<char*>(address)+arrayIndex*iteratorDereferenceReturnType.SizeOf();
00292 
00293     if(!isElementFundamental){
00294       // in this case the initialization is required: use default constructor...
00295       iteratorDereferenceReturnType.Construct(Reflex::Type(0,0),std::vector< void* >(),objectData);
00296     }
00297 
00298     m_dataReader->setRecordId( m_recordId );
00299     m_dataReader->read( objectData );
00300     
00301     ++i;
00302   }
00303 
00304   m_arrayHandler->finalize( address );
00305 
00306 }
00307 
00308 void ora::CArrayReader::clear(){
00309   if(m_dataReader.get()) m_dataReader->clear();
00310 }
00311 
00312 ora::CArrayStreamer::CArrayStreamer( const Reflex::Type& objectType,
00313                                    MappingElement& mapping,
00314                                    ContainerSchema& contSchema ):
00315   m_objectType( objectType ),
00316   m_mapping( mapping ),
00317   m_schema( contSchema ){
00318 }
00319 
00320 ora::CArrayStreamer::~CArrayStreamer(){
00321 }
00322 
00323 ora::IRelationalWriter* ora::CArrayStreamer::newWriter(){
00324   return new CArrayWriter( m_objectType, m_mapping, m_schema );
00325 }
00326 
00327 ora::IRelationalUpdater* ora::CArrayStreamer::newUpdater(){
00328   return new CArrayUpdater( m_objectType, m_mapping, m_schema );
00329 }
00330 
00331 ora::IRelationalReader* ora::CArrayStreamer::newReader(){
00332   return new CArrayReader( m_objectType, m_mapping, m_schema );
00333 }