CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/CondCore/ORA/src/CArrayStreamer.cc

Go to the documentation of this file.
00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "CArrayStreamer.h"
00003 #include "ClassUtils.h"
00004 #include "MappingElement.h"
00005 #include "ContainerSchema.h"
00006 #include "RelationalBuffer.h"
00007 #include "MultiRecordInsertOperation.h"
00008 #include "MultiRecordSelectOperation.h"
00009 #include "RelationalStreamerFactory.h"
00010 #include "ArrayHandlerFactory.h"
00011 #include "IArrayHandler.h"
00012 // externals
00013 #include "CoralBase/Attribute.h"
00014 #include "RelationalAccess/IBulkOperation.h"
00015 #include "Reflex/Object.h"
00016 
00017 ora::CArrayWriter::CArrayWriter( const Reflex::Type& objectType,
00018                                  MappingElement& mapping,
00019                                  ContainerSchema& contSchema ):
00020   m_objectType( objectType ),
00021   m_mappingElement( mapping ),
00022   m_schema( contSchema ),
00023   m_recordId(),
00024   m_localElement( ),
00025   m_offset( 0 ),
00026   m_insertOperation( 0 ),
00027   m_arrayHandler(),
00028   m_dataWriter(){
00029 }
00030 
00031 ora::CArrayWriter::~CArrayWriter(){
00032 }
00033 
00034 bool ora::CArrayWriter::build( DataElement& offset,
00035                                IRelationalData&,
00036                                RelationalBuffer& operationBuffer ){
00037 
00038   m_localElement.clear();
00039   m_recordId.clear();
00040   // allocate for the index...
00041   m_recordId.push_back(0);
00042 
00043   // Check the array type
00044   Reflex::Type arrayType = m_objectType.ToType();
00045   Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00046   // Check the component type
00047   if ( ! arrayType || !arrayResolvedType ) {
00048     throwException( "Missing dictionary information for the element type of the array \"" +
00049                     m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00050                     "CArrayWriter::build" );
00051   }
00052   
00053   RelationalStreamerFactory streamerFactory( m_schema );
00054   
00055   // first open the insert on the extra table...
00056   m_insertOperation = &operationBuffer.newMultiRecordInsert( m_mappingElement.tableName() );
00057   const std::vector<std::string>& columns = m_mappingElement.columnNames();
00058   if( !columns.size() ){
00059     throwException( "Id columns not found in the mapping.",
00060                     "CArrayWriter::build");    
00061   }
00062   for( size_t i=0; i<columns.size(); i++ ){
00063     m_insertOperation->addId( columns[ i ] );
00064   }
00065 
00066   m_offset = &offset;
00067 
00068   m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00069   
00070   std::string arrayTypeName = arrayType.Name();
00071   // Retrieve the relevant mapping element
00072   MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00073   if ( iMe == m_mappingElement.end() ) {
00074     throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00075                     "CArrayWriter::build" );
00076   }
00077   
00078   m_dataWriter.reset( streamerFactory.newWriter( arrayResolvedType, iMe->second ));
00079   m_dataWriter->build( m_localElement, *m_insertOperation, operationBuffer );
00080   return true;
00081 }
00082 
00083 void ora::CArrayWriter::setRecordId( const std::vector<int>& identity ){
00084   m_recordId.clear();
00085   for(size_t i=0;i<identity.size();i++) {
00086     m_recordId.push_back( identity[i] );
00087   }
00088   m_recordId.push_back( 0 );
00089 }
00090       
00091 void ora::CArrayWriter::write( int oid,
00092                                const void* inputData ){
00093 
00094   if(!m_offset){
00095     throwException("The streamer has not been built.",
00096                    "CArrayWriter::write");
00097   }
00098   const std::vector<std::string>& columns = m_mappingElement.columnNames();
00099   if( columns.size() != m_recordId.size()+1){
00100     throwException( "Record id elements provided are not matching with the mapped id columns.",
00101                     "CArrayWriter::write");
00102   }
00103   
00104   void* data = m_offset->address( inputData );
00105   
00106   // Use the iterator to loop over the elements of the container.
00107   size_t containerSize = m_arrayHandler->size( data  );
00108   size_t persistentSize = m_arrayHandler->persistentSize( data  );
00109   
00110   if ( containerSize == 0 || containerSize < persistentSize ) return;
00111 
00112   size_t startElementIndex = m_arrayHandler->startElementIndex( data );
00113 
00114   std::auto_ptr<IArrayIteratorHandler> iteratorHandler( m_arrayHandler->iterate( data ) );
00115 
00116   InsertCache& bulkOperation = m_insertOperation->setUp( containerSize-startElementIndex+1 );
00117 
00118   for ( size_t iIndex = startElementIndex; iIndex < containerSize; ++iIndex ) {
00119 
00120     m_recordId[m_recordId.size()-1] = iIndex;
00121     coral::AttributeList& dataBuff = m_insertOperation->data();
00122 
00123     dataBuff[ columns[0] ].data<int>() = oid;
00124     for( size_t i = 1;i < columns.size(); i++ ){
00125       dataBuff[ columns[i] ].data<int>() = m_recordId[i-1];
00126     }
00127 
00128     void* objectReference = iteratorHandler->object();
00129 
00130     m_dataWriter->setRecordId( m_recordId );
00131     m_dataWriter->write( oid, objectReference );
00132     
00133     bulkOperation.processNextIteration();
00134     // Increment the iterator
00135     iteratorHandler->increment();
00136   }
00137 
00138   m_arrayHandler->finalize( const_cast<void*>( data ) );
00139   
00140 }
00141 
00142 ora::CArrayUpdater::CArrayUpdater(const Reflex::Type& objectType,
00143                                   MappingElement& mapping,
00144                                   ContainerSchema& contSchema ):
00145   m_deleter( mapping ),
00146   m_writer( objectType, mapping, contSchema ){
00147 }
00148 
00149 ora::CArrayUpdater::~CArrayUpdater(){
00150 }
00151 
00152 bool ora::CArrayUpdater::build( DataElement& offset,
00153                                 IRelationalData& relationalData,
00154                                 RelationalBuffer& operationBuffer){
00155   m_deleter.build( operationBuffer );
00156   m_writer.build( offset, relationalData, operationBuffer );
00157   return true;
00158 }
00159 
00160 void ora::CArrayUpdater::setRecordId( const std::vector<int>& identity ){
00161   m_writer.setRecordId( identity );
00162 }
00163 
00164 void ora::CArrayUpdater::update( int oid,
00165                                  const void* data ){
00166   m_deleter.erase( oid );
00167   m_writer.write( oid, data );
00168 }
00169 
00170 ora::CArrayReader::CArrayReader(const Reflex::Type& objectType,
00171                                 MappingElement& mapping,
00172                                 ContainerSchema& contSchema ):
00173   m_objectType( objectType ),
00174   m_mappingElement( mapping ),
00175   m_schema( contSchema ),
00176   m_recordId(),
00177   m_localElement( ),
00178   m_offset(0 ),
00179   m_query(),
00180   m_arrayHandler(),
00181   m_dataReader(){
00182 }
00183 
00184 ora::CArrayReader::~CArrayReader(){
00185 }
00186 
00187 bool ora::CArrayReader::build( DataElement& offset,
00188                                IRelationalData& relationalData ){
00189   
00190   m_localElement.clear();
00191 
00192   m_recordId.clear();
00193   // allocate for the index...
00194   m_recordId.push_back(0);
00195 
00196   // Check the array type
00197   Reflex::Type arrayType = m_objectType.ToType();
00198   Reflex::Type arrayResolvedType = ClassUtils::resolvedType(arrayType);
00199   // Check the component type
00200   if ( ! arrayType || !arrayResolvedType ) {
00201     throwException( "Missing dictionary information for the element type of the array \"" +
00202                     m_objectType.Name(Reflex::SCOPED|Reflex::FINAL) + "\"",
00203                     "CArrayReader::build" );
00204   }
00205 
00206   RelationalStreamerFactory streamerFactory( m_schema );
00207   
00208   // first open the insert on the extra table...
00209   m_query.reset(new MultiRecordSelectOperation( m_mappingElement.tableName(), m_schema.storageSchema() ));
00210   m_query->addWhereId( m_mappingElement.pkColumn() );
00211   std::vector<std::string> recIdCols = m_mappingElement.recordIdColumns();
00212   for( size_t i=0; i<recIdCols.size(); i++ ){
00213     m_query->addId( recIdCols[ i ] );
00214     m_query->addOrderId( recIdCols[ i ] );
00215   }
00216   
00217   m_offset = &offset;
00218 
00219   m_arrayHandler.reset( ArrayHandlerFactory::newArrayHandler( m_objectType ) );
00220 
00221   std::string arrayTypeName = arrayType.Name();
00222 
00223   // Retrieve the relevant mapping element
00224   MappingElement::iterator iMe = m_mappingElement.find( arrayTypeName );
00225   if ( iMe == m_mappingElement.end() ) {
00226     throwException( "Item for \"" + arrayTypeName + "\" not found in the mapping element",
00227                     "CArrayReader::build" );
00228   }
00229   
00230   m_dataReader.reset( streamerFactory.newReader( arrayResolvedType, iMe->second ) );
00231   m_dataReader->build( m_localElement, *m_query );
00232   return true;
00233 }
00234 
00235 void ora::CArrayReader::select( int oid ){
00236   if(!m_query.get()){
00237     throwException("The streamer has not been built.",
00238                    "CArrayReader::select");
00239   }
00240   coral::AttributeList& whereData = m_query->whereData();
00241   whereData[ m_mappingElement.pkColumn() ].data<int>() = oid;
00242   m_query->execute();
00243   m_dataReader->select( oid );
00244 }
00245 
00246 void ora::CArrayReader::setRecordId( const std::vector<int>& identity ){
00247   m_recordId.clear();
00248   for(size_t i=0;i<identity.size();i++) {
00249     m_recordId.push_back( identity[i] );
00250   }
00251   // allocate the element for the index...
00252   m_recordId.push_back( 0 );
00253 }
00254 
00255 void ora::CArrayReader::read( void* destinationData ) {
00256   if(!m_offset){
00257     throwException("The streamer has not been built.",
00258                    "CArrayReader::read");
00259   }
00260   void* address = m_offset->address( destinationData );
00261 
00262   Reflex::Type iteratorDereferenceReturnType = m_arrayHandler->iteratorReturnType();
00263   
00264   bool isElementFundamental = iteratorDereferenceReturnType.IsFundamental();
00265 
00266   std::string positionColumn = m_mappingElement.posColumn();
00267 
00268   size_t arraySize = m_objectType.ArrayLength();
00269   
00270   m_arrayHandler->clear( address );
00271 
00272   size_t cursorSize = m_query->selectionSize(m_recordId, m_recordId.size()-1);
00273   unsigned int i=0;
00274   while ( i< cursorSize ){
00275 
00276     m_recordId[m_recordId.size()-1] = (int)i;
00277     m_query->selectRow( m_recordId );
00278     coral::AttributeList& row = m_query->data();
00279 
00280     int arrayIndex = row[positionColumn].data< int >();
00281 
00282     // Create a new element for the array
00283     void* objectData = 0;
00284     
00285     if(arrayIndex >= (int)arraySize){
00286       throwException("Found more element then array size.",
00287                      "CArrayReader::read");
00288                      
00289     }
00290     
00291     // the memory has been allocated already!
00292     objectData = static_cast<char*>(address)+arrayIndex*iteratorDereferenceReturnType.SizeOf();
00293 
00294     if(!isElementFundamental){
00295       // in this case the initialization is required: use default constructor...
00296       iteratorDereferenceReturnType.Construct(Reflex::Type(0,0),std::vector< void* >(),objectData);
00297     }
00298 
00299     m_dataReader->setRecordId( m_recordId );
00300     m_dataReader->read( objectData );
00301     
00302     ++i;
00303   }
00304 
00305   m_arrayHandler->finalize( address );
00306 
00307 }
00308 
00309 void ora::CArrayReader::clear(){
00310   if(m_dataReader.get()) m_dataReader->clear();
00311 }
00312 
00313 ora::CArrayStreamer::CArrayStreamer( const Reflex::Type& objectType,
00314                                    MappingElement& mapping,
00315                                    ContainerSchema& contSchema ):
00316   m_objectType( objectType ),
00317   m_mapping( mapping ),
00318   m_schema( contSchema ){
00319 }
00320 
00321 ora::CArrayStreamer::~CArrayStreamer(){
00322 }
00323 
00324 ora::IRelationalWriter* ora::CArrayStreamer::newWriter(){
00325   return new CArrayWriter( m_objectType, m_mapping, m_schema );
00326 }
00327 
00328 ora::IRelationalUpdater* ora::CArrayStreamer::newUpdater(){
00329   return new CArrayUpdater( m_objectType, m_mapping, m_schema );
00330 }
00331 
00332 ora::IRelationalReader* ora::CArrayStreamer::newReader(){
00333   return new CArrayReader( m_objectType, m_mapping, m_schema );
00334 }