00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "DatabaseContainer.h"
00003 #include "DatabaseSession.h"
00004 #include "IDatabaseSchema.h"
00005 #include "ContainerSchema.h"
00006 #include "IRelationalStreamer.h"
00007 #include "RelationalBuffer.h"
00008 #include "RelationalOperation.h"
00009 #include "DataElement.h"
00010 #include "RelationalDeleter.h"
00011 #include "RelationalStreamerFactory.h"
00012 #include "IDatabaseSchema.h"
00013 #include "ClassUtils.h"
00014 #include "MappingRules.h"
00015
00016 #include "CoralBase/Attribute.h"
00017
00018 namespace ora {
00019
00020 class WriteBuffer {
00021 public:
00022 explicit WriteBuffer( ContainerSchema& contSchema ):
00023 m_buffer(),
00024 m_operationBuffer( contSchema.storageSchema() ),
00025 m_topLevelElement(),
00026 m_writer(),
00027 m_topLevelInsert( 0 ){
00028
00029 MappingElement& topLevelMapping = contSchema.mapping( true ).topElement();
00030 m_topLevelInsert = &m_operationBuffer.newInsert( topLevelMapping.tableName() );
00031 m_topLevelInsert->addId( topLevelMapping.columnNames()[ 0 ] );
00032 const Reflex::Type& type = contSchema.type();
00033 MappingElement::iterator iMap = topLevelMapping.find( type.Name(Reflex::SCOPED) );
00034
00035 if( iMap == topLevelMapping.end()){
00036 throwException("Could not find a mapping element for class \""+
00037 type.Name(Reflex::SCOPED)+"\"",
00038 "WriteBuffer::WriteBuffer");
00039 }
00040 MappingElement& mapping = iMap->second;
00041 RelationalStreamerFactory streamerFactory( contSchema );
00042 m_writer.reset( streamerFactory.newWriter( type, mapping ) );
00043 m_writer->build( m_topLevelElement, *m_topLevelInsert, m_operationBuffer );
00044 }
00045
00046
00047 ~WriteBuffer(){
00048 }
00049
00050 void registerForWrite( int oid, const void* data ){
00051 m_buffer.push_back( std::make_pair(oid, data ) );
00052 }
00053
00054 size_t flush(){
00055 size_t nobj = 0;
00056 for( std::vector<std::pair<int, const void*> >::const_iterator iW = m_buffer.begin();
00057 iW != m_buffer.end(); ++iW ){
00058 write( iW->first, iW->second );
00059 if( m_operationBuffer.flush() ) nobj++;
00060 }
00061 m_buffer.clear();
00062 return nobj;
00063 }
00064
00065 private:
00066 void write( int oid, const void* data ){
00067 coral::AttributeList& dataBuff = m_topLevelInsert->data();
00068 dataBuff.begin()->data<int>() = oid;
00069 m_writer->write( oid, data );
00070 }
00071
00072 private:
00073 std::vector<std::pair<int, const void*> > m_buffer;
00074 RelationalBuffer m_operationBuffer;
00075 DataElement m_topLevelElement;
00076 std::auto_ptr<IRelationalWriter> m_writer;
00077 InsertOperation* m_topLevelInsert;
00078 };
00079
00080 class UpdateBuffer {
00081 public:
00082 explicit UpdateBuffer( ContainerSchema& contSchema ):
00083 m_buffer(),
00084 m_operationBuffer( contSchema.storageSchema() ),
00085 m_topLevelElement(),
00086 m_depDeleter(),
00087 m_updater(),
00088 m_topLevelUpdate( 0 ){
00089
00090 std::vector<MappingElement> dependentMappings;
00091 contSchema.mappingForDependentClasses( dependentMappings );
00092 m_depDeleter.reset( new RelationalDeleter( dependentMappings ) );
00093 m_depDeleter->build( m_operationBuffer );
00094 dependentMappings.clear();
00095
00096 MappingElement& topLevelMapping = contSchema.mapping( true ).topElement();
00097 m_topLevelUpdate = &m_operationBuffer.newUpdate( topLevelMapping.tableName(), true );
00098 m_topLevelUpdate->addId( topLevelMapping.columnNames()[ 0 ] );
00099 m_topLevelUpdate->addWhereId( topLevelMapping.columnNames()[ 0 ] );
00100 const Reflex::Type& type = contSchema.type();
00101 MappingElement::iterator iMap = topLevelMapping.find( type.Name(Reflex::SCOPED) );
00102
00103 if( iMap == topLevelMapping.end()){
00104 throwException("Could not find a mapping element for class \""+
00105 type.Name(Reflex::SCOPED)+"\"",
00106 "UpdateBuffer::UpdateBuffer");
00107 }
00108 MappingElement& mapping = iMap->second;
00109 RelationalStreamerFactory streamerFactory( contSchema );
00110 m_updater.reset( streamerFactory.newUpdater( type, mapping ));
00111 m_updater->build( m_topLevelElement, *m_topLevelUpdate, m_operationBuffer );
00112 }
00113
00114 ~UpdateBuffer(){
00115 }
00116
00117
00118 void registerForUpdate( int oid, const void* data ){
00119 m_buffer.push_back( std::make_pair( oid, data ));
00120 }
00121
00122
00123 size_t flush(){
00124 size_t nobj = 0;
00125 for( std::vector<std::pair<int, const void*> >::const_iterator iU = m_buffer.begin();
00126 iU != m_buffer.end(); ++iU ){
00127 update( iU->first, iU->second );
00128 if( m_operationBuffer.flush()) nobj++;
00129 }
00130 m_buffer.clear();
00131 return nobj;
00132 }
00133
00134
00135 private:
00136 void update( int oid, const void* data ){
00137
00138 m_depDeleter->erase( oid );
00139 coral::AttributeList& dataBuff = m_topLevelUpdate->data();
00140 dataBuff.begin()->data<int>() = oid;
00141 coral::AttributeList& whereDataBuff = m_topLevelUpdate->whereData();
00142 whereDataBuff.begin()->data<int>() = oid;
00143 m_updater->update( oid, data );
00144 }
00145
00146
00147 private:
00148 std::vector<std::pair<int, const void*> > m_buffer;
00149 RelationalBuffer m_operationBuffer;
00150 DataElement m_topLevelElement;
00151 std::auto_ptr<RelationalDeleter> m_depDeleter;
00152 std::auto_ptr<IRelationalUpdater> m_updater;
00153 UpdateOperation* m_topLevelUpdate;
00154 };
00155
00156 class ReadBuffer {
00157 public:
00158 explicit ReadBuffer( ContainerSchema& contSchema ):
00159 m_topLevelElement(),
00160 m_type( contSchema.type() ),
00161 m_reader(),
00162 m_topLevelQuery( contSchema.mapping().topElement().tableName(), contSchema.storageSchema() ){
00163
00164 MappingElement& topLevelMapping = contSchema.mapping().topElement();
00165 m_topLevelQuery.addWhereId( topLevelMapping.columnNames()[ 0 ] );
00166 MappingElement::iterator iMap = topLevelMapping.find( m_type.Name(Reflex::SCOPED) );
00167
00168 if( iMap == topLevelMapping.end()){
00169 throwException("Could not find a mapping element for class \""+
00170 m_type.Name(Reflex::SCOPED)+"\"",
00171 "ReadBuffer::ReadBuffer");
00172 }
00173 MappingElement& mapping = iMap->second;
00174 RelationalStreamerFactory streamerFactory( contSchema );
00175 m_reader.reset( streamerFactory.newReader( m_type, mapping )) ;
00176 m_reader->build( m_topLevelElement , m_topLevelQuery );
00177 }
00178
00179 ~ReadBuffer(){
00180 }
00181
00182 void* read( int oid ){
00183 coral::AttributeList& dataBuff = m_topLevelQuery.whereData();
00184 dataBuff.begin()->data<int>() = oid;
00185 m_topLevelQuery.execute();
00186 m_reader->select( oid );
00187 void* destination = 0;
00188 if( m_topLevelQuery.nextCursorRow() ){
00189 destination = ClassUtils::constructObject( m_type );
00190 m_reader->read( destination );
00191 }
00192 m_reader->clear();
00193 m_topLevelQuery.clear();
00194 return destination;
00195 }
00196
00197 const Reflex::Type& type(){
00198 return m_type;
00199 }
00200
00201 private:
00202 DataElement m_topLevelElement;
00203 const Reflex::Type& m_type;
00204 std::auto_ptr<IRelationalReader> m_reader;
00205 SelectOperation m_topLevelQuery;
00206 };
00207
00208 class DeleteBuffer {
00209 public:
00210 explicit DeleteBuffer( ContainerSchema& contSchema ):
00211 m_buffer(),
00212 m_operationBuffer( contSchema.storageSchema() ),
00213 m_mainDeleter(),
00214 m_depDeleter(){
00215 m_mainDeleter.reset( new RelationalDeleter( contSchema.mapping().topElement() ));
00216 m_mainDeleter->build( m_operationBuffer );
00217
00218 std::vector<MappingElement> dependentMappings;
00219 contSchema.mappingForDependentClasses( dependentMappings );
00220 m_depDeleter.reset( new RelationalDeleter( dependentMappings ) );
00221 m_depDeleter->build( m_operationBuffer );
00222 dependentMappings.clear();
00223 }
00224
00225 ~DeleteBuffer(){
00226 }
00227
00228
00229 void registerForDelete( int oid ){
00230 m_buffer.push_back( oid );
00231 }
00232
00233 size_t flush(){
00234 size_t nobj = 0;
00235 for( std::vector<int>::const_iterator iD = m_buffer.begin();
00236 iD != m_buffer.end(); ++iD ){
00237 m_depDeleter->erase( *iD );
00238 m_mainDeleter->erase( *iD );
00239 if( m_operationBuffer.flush() ) nobj++;
00240 }
00241 m_buffer.clear();
00242 return nobj;
00243 }
00244
00245 private:
00246 std::vector<int> m_buffer;
00247 RelationalBuffer m_operationBuffer;
00248 std::auto_ptr<RelationalDeleter> m_mainDeleter;
00249 std::auto_ptr<RelationalDeleter> m_depDeleter;
00250 };
00251
00252 }
00253
00254 ora::IteratorBuffer::IteratorBuffer( ContainerSchema& schema,
00255 ReadBuffer& buffer ):
00256 m_query( schema.mapping().topElement().tableName(), schema.storageSchema() ),
00257 m_itemId( -1 ),
00258 m_readBuffer( buffer ){
00259 const std::string& idCol = schema.mapping().topElement().columnNames()[0];
00260 m_query.addId( idCol );
00261 m_query.addOrderId( idCol );
00262 }
00263
00264 ora::IteratorBuffer::~IteratorBuffer(){
00265 }
00266
00267 void ora::IteratorBuffer::reset(){
00268 m_query.execute();
00269 }
00270
00271 bool ora::IteratorBuffer::next(){
00272 bool prevValid = (m_itemId != -1);
00273 bool currValid = false;
00274 m_itemId = -1;
00275 if( m_query.nextCursorRow() ){
00276 coral::AttributeList& row = m_query.data();
00277 m_itemId = row.begin()->data<int>();
00278 currValid = true;
00279 }
00280
00281 if( !currValid && prevValid ) m_query.clear();
00282 return currValid;
00283 }
00284
00285 void* ora::IteratorBuffer::getItem(){
00286 void* ret = 0;
00287 if( m_itemId != -1 ){
00288 ret = m_readBuffer.read( m_itemId );
00289 }
00290 return ret;
00291 }
00292
00293 void* ora::IteratorBuffer::getItemAsType( const Reflex::Type& asType ){
00294 if( !ClassUtils::isType( type(), asType ) ){
00295 throwException("Provided output object type \""+asType.Name(Reflex::SCOPED)+"\" does not match with the container type \""+
00296 type().Name(Reflex::SCOPED)+"\"","IteratorBuffer::getItemAsType");
00297 }
00298 return getItem();
00299 }
00300
00301 int ora::IteratorBuffer::itemId(){
00302 return m_itemId;
00303 }
00304
00305 const Reflex::Type& ora::IteratorBuffer::type(){
00306 return m_readBuffer.type();
00307 }
00308
00309 ora::DatabaseContainer::DatabaseContainer( int contId,
00310 const std::string& containerName,
00311 const std::string& className,
00312 unsigned int containerSize,
00313 DatabaseSession& session ):
00314 m_dbSchema( session.schema() ),
00315 m_schema( new ContainerSchema(contId, containerName, className, session) ),
00316 m_writeBuffer(),
00317 m_updateBuffer(),
00318 m_readBuffer(),
00319 m_deleteBuffer(),
00320 m_iteratorBuffer(),
00321 m_size( containerSize ),
00322 m_containerUpdateTable( session.containerUpdateTable() ),
00323 m_lock( false ){
00324 }
00325
00326 ora::DatabaseContainer::DatabaseContainer( int contId,
00327 const std::string& containerName,
00328 const Reflex::Type& containerType,
00329 DatabaseSession& session ):
00330 m_dbSchema( session.schema() ),
00331 m_schema( new ContainerSchema(contId, containerName, containerType, session) ),
00332 m_writeBuffer(),
00333 m_updateBuffer(),
00334 m_readBuffer(),
00335 m_deleteBuffer(),
00336 m_iteratorBuffer(),
00337 m_size(0),
00338 m_containerUpdateTable( session.containerUpdateTable() ),
00339 m_lock( false ) {
00340 }
00341
00342 ora::DatabaseContainer::~DatabaseContainer(){
00343 m_iteratorBuffer.clear();
00344 }
00345
00346 int ora::DatabaseContainer::id(){
00347 return m_schema->containerId();
00348 }
00349
00350 const std::string& ora::DatabaseContainer::name(){
00351 return m_schema->containerName();
00352 }
00353
00354 const std::string& ora::DatabaseContainer::className(){
00355 return m_schema->className();
00356 }
00357
00358 const Reflex::Type& ora::DatabaseContainer::type(){
00359 return m_schema->type();
00360 }
00361
00362 const std::string& ora::DatabaseContainer::mappingVersion(){
00363 return m_schema->mappingVersion();
00364 }
00365
00366 size_t ora::DatabaseContainer::size(){
00367 return m_size;
00368 }
00369
00370 ora::Handle<ora::IteratorBuffer> ora::DatabaseContainer::iteratorBuffer(){
00371 if(!m_readBuffer.get()){
00372 m_readBuffer.reset( new ReadBuffer( *m_schema ) );
00373 }
00374 if( !m_iteratorBuffer ){
00375 m_iteratorBuffer.reset( new IteratorBuffer(*m_schema, *m_readBuffer ) );
00376 m_iteratorBuffer->reset();
00377 }
00378 return m_iteratorBuffer;
00379 }
00380
00381 bool ora::DatabaseContainer::lock(){
00382 if( !m_lock ){
00383 ContainerHeaderData headerData;
00384 m_lock = m_dbSchema.containerHeaderTable().lockContainer( m_schema->containerId(), headerData );
00385 if(!m_lock) throwException("Container \""+name()+"\" has been dropped.","DatabaseContainer::lock()");
00386
00387 m_size = headerData.numberOfObjects;
00388 }
00389 return m_lock;
00390 }
00391
00392 bool ora::DatabaseContainer::isLocked(){
00393 return m_lock;
00394 }
00395
00396 void ora::DatabaseContainer::create(){
00397 m_schema->create();
00398 }
00399
00400 void ora::DatabaseContainer::drop(){
00401 if(!m_schema->dbSession().testDropPermission()){
00402 throwException("Drop permission has been denied for the current user.",
00403 "DatabaseContainer::drop");
00404 }
00405 m_schema->drop();
00406 m_containerUpdateTable.remove( m_schema->containerId() );
00407 }
00408
00409 void ora::DatabaseContainer::extendSchema( const Reflex::Type& dependentType ){
00410 m_schema->extendIfRequired( dependentType );
00411 }
00412
00413 void ora::DatabaseContainer::setAccessPermission( const std::string& principal,
00414 bool forWrite ){
00415 m_schema->setAccessPermission( principal, forWrite );
00416 }
00417
00418 void* ora::DatabaseContainer::fetchItem(int itemId){
00419
00420 if(!m_readBuffer.get()){
00421 m_readBuffer.reset( new ReadBuffer( *m_schema ) );
00422 }
00423 return m_readBuffer->read( itemId );
00424 }
00425
00426 void* ora::DatabaseContainer::fetchItemAsType(int itemId,
00427 const Reflex::Type& asType){
00428 if(!m_readBuffer.get()){
00429 m_readBuffer.reset( new ReadBuffer( *m_schema ) );
00430 }
00431 if( !ClassUtils::isType( type(), asType ) ){
00432 throwException("Provided output object type \""+asType.Name(Reflex::SCOPED)+"\" does not match with the container type \""+
00433 type().Name(Reflex::SCOPED)+"\"","DatabaseContainer::fetchItemAsType");
00434 }
00435 return m_readBuffer->read( itemId );
00436 }
00437
00438 int ora::DatabaseContainer::insertItem( const void* data,
00439 const Reflex::Type& dataType ){
00440 if(!m_writeBuffer.get()){
00441 m_writeBuffer.reset( new WriteBuffer( *m_schema ) );
00442 }
00443 Reflex::Type inputResType = ClassUtils::resolvedType( dataType );
00444 Reflex::Type contType = ClassUtils::resolvedType(m_schema->type());
00445 if( inputResType.Name()!= contType.Name() && !inputResType.HasBase( contType ) ){
00446 throwException( "Provided input object type=\""+inputResType.Name()+
00447 "\" does not match with the container type=\""+contType.Name()+"\"",
00448 "DatabaseContainer::insertItem" );
00449 }
00450
00451 int newId = m_schema->containerSequences().getNextId( MappingRules::sequenceNameForContainer( m_schema->containerName()) );
00452 m_writeBuffer->registerForWrite( newId, data );
00453 return newId;
00454 }
00455
00456 void ora::DatabaseContainer::updateItem( int itemId,
00457 const void* data,
00458 const Reflex::Type& dataType ){
00459 if(!m_updateBuffer.get()){
00460 m_updateBuffer.reset( new UpdateBuffer( *m_schema ) );
00461 }
00462 Reflex::Type inputResType = ClassUtils::resolvedType( dataType );
00463 Reflex::Type contType = ClassUtils::resolvedType(m_schema->type());
00464 if( inputResType.Name()!= contType.Name() && !inputResType.HasBase( contType ) ){
00465 throwException( "Provided input object type=\""+inputResType.Name()+"\" does not match with the container type=\""+
00466 contType.Name()+"\".",
00467 "DatabaseContainer::updateItem" );
00468 }
00469
00470 m_updateBuffer->registerForUpdate( itemId, data );
00471 }
00472
00473 void ora::DatabaseContainer::erase( int itemId ){
00474 if(!m_deleteBuffer.get()){
00475 m_deleteBuffer.reset( new DeleteBuffer( *m_schema ) );
00476 }
00477 m_deleteBuffer->registerForDelete( itemId );
00478 }
00479
00480 void ora::DatabaseContainer::flush(){
00481 size_t prevSize = m_size;
00482 if(m_writeBuffer.get()) m_size += m_writeBuffer->flush();
00483 if(m_updateBuffer.get()) m_updateBuffer->flush();
00484 if(m_deleteBuffer.get()) m_size -= m_deleteBuffer->flush();
00485 m_schema->containerSequences().sinchronizeAll();
00486 if( prevSize != m_size ){
00487 m_containerUpdateTable.takeNote( id(), m_size );
00488 }
00489 }
00490
00491 void ora::DatabaseContainer::setItemName( const std::string& name,
00492 int itemId ){
00493 m_schema->dbSession().setObjectName( name, m_schema->containerId(), itemId );
00494 }
00495
00496 bool ora::DatabaseContainer::getNames( std::vector<std::string>& destination ){
00497 return m_schema->dbSession().getNamesForContainer( m_schema->containerId(), destination );
00498 }
00499
00500