00001 #include "CondCore/ORA/interface/Exception.h"
00002 #include "DatabaseContainer.h"
00003 #include "DatabaseSession.h"
00004 #include "IDatabaseSchema.h"
00005 #include "ContainerSchema.h"
00006 #include "IRelationalStreamer.h"
00007 #include "RelationalBuffer.h"
00008 #include "RelationalOperation.h"
00009 #include "DataElement.h"
00010 #include "RelationalDeleter.h"
00011 #include "RelationalStreamerFactory.h"
00012 #include "IDatabaseSchema.h"
00013 #include "ClassUtils.h"
00014 #include "MappingRules.h"
00015
00016 #include "CoralBase/Attribute.h"
00017
00018 namespace ora {
00019
00020 class WriteBuffer {
00021 public:
00022 explicit WriteBuffer( ContainerSchema& contSchema ):
00023 m_buffer(),
00024 m_contSchema( contSchema ){
00025 }
00026
00027 ~WriteBuffer(){
00028 }
00029
00030 void registerForWrite( int oid, const void* data ){
00031 m_buffer.push_back( std::make_pair(oid, data ) );
00032 }
00033
00034 size_t flush(){
00035 size_t nobj = 0;
00036 if( m_buffer.size() ){
00037 MappingElement& topLevelMapping = m_contSchema.mapping( true ).topElement();
00038 RelationalBuffer operationBuffer( m_contSchema.storageSchema() );
00039 InsertOperation* topLevelInsert = &operationBuffer.newInsert( topLevelMapping.tableName() );
00040 topLevelInsert->addId( topLevelMapping.columnNames()[ 0 ] );
00041 const Reflex::Type& type = m_contSchema.type();
00042 MappingElement::iterator iMap = topLevelMapping.find( type.Name(Reflex::SCOPED) );
00043
00044 if( iMap == topLevelMapping.end()){
00045 throwException("Could not find a mapping element for class \""+
00046 type.Name(Reflex::SCOPED)+"\"",
00047 "WriteBuffer::flush");
00048 }
00049 MappingElement& mapping = iMap->second;
00050 RelationalStreamerFactory streamerFactory( m_contSchema );
00051 DataElement topLevelElement;
00052 std::auto_ptr<IRelationalWriter> writer( streamerFactory.newWriter( type, mapping ) );
00053 writer->build( topLevelElement, *topLevelInsert, operationBuffer );
00054
00055 for( std::vector<std::pair<int, const void*> >::const_iterator iW = m_buffer.begin();
00056 iW != m_buffer.end(); ++iW ){
00057 int oid = iW->first;
00058 const void* data = iW->second;
00059 coral::AttributeList& dataBuff = topLevelInsert->data();
00060 dataBuff.begin()->data<int>() = oid;
00061 writer->write( oid, data );
00062 if( operationBuffer.flush() ) nobj++;
00063 }
00064 m_buffer.clear();
00065 }
00066 return nobj;
00067 }
00068
00069
00070 private:
00071 std::vector<std::pair<int, const void*> > m_buffer;
00072 ContainerSchema& m_contSchema;
00073
00074 };
00075
00076 class UpdateBuffer {
00077 public:
00078 explicit UpdateBuffer( ContainerSchema& contSchema ):
00079 m_buffer(),
00080 m_contSchema( contSchema ){
00081 }
00082
00083 ~UpdateBuffer(){
00084 }
00085
00086
00087 void registerForUpdate( int oid, const void* data ){
00088 m_buffer.push_back( std::make_pair( oid, data ));
00089 }
00090
00091
00092 size_t flush(){
00093 size_t nobj = 0;
00094 if( m_buffer.size() ){
00095 RelationalBuffer operationBuffer( m_contSchema.storageSchema() );
00096 std::vector<MappingElement> dependentMappings;
00097 m_contSchema.mappingForDependentClasses( dependentMappings );
00098 RelationalDeleter depDeleter( dependentMappings );
00099 depDeleter.build( operationBuffer );
00100 dependentMappings.clear();
00101
00102 MappingElement& topLevelMapping = m_contSchema.mapping( true ).topElement();
00103 UpdateOperation* topLevelUpdate = &operationBuffer.newUpdate( topLevelMapping.tableName(), true );
00104 topLevelUpdate->addId( topLevelMapping.columnNames()[ 0 ] );
00105 topLevelUpdate->addWhereId( topLevelMapping.columnNames()[ 0 ] );
00106 const Reflex::Type& type = m_contSchema.type();
00107 MappingElement::iterator iMap = topLevelMapping.find( type.Name(Reflex::SCOPED) );
00108
00109 if( iMap == topLevelMapping.end()){
00110 throwException("Could not find a mapping element for class \""+
00111 type.Name(Reflex::SCOPED)+"\"",
00112 "UpdateBuffer::flush");
00113 }
00114 MappingElement& mapping = iMap->second;
00115 RelationalStreamerFactory streamerFactory( m_contSchema );
00116 DataElement topLevelElement;
00117 std::auto_ptr<IRelationalUpdater> updater( streamerFactory.newUpdater( type, mapping ));
00118 updater->build( topLevelElement, *topLevelUpdate, operationBuffer );
00119 for( std::vector<std::pair<int, const void*> >::const_iterator iU = m_buffer.begin();
00120 iU != m_buffer.end(); ++iU ){
00121 int oid = iU->first;
00122 const void* data = iU->second;
00123
00124 depDeleter.erase( oid );
00125 coral::AttributeList& dataBuff = topLevelUpdate->data();
00126 dataBuff.begin()->data<int>() = oid;
00127 coral::AttributeList& whereDataBuff = topLevelUpdate->whereData();
00128 whereDataBuff.begin()->data<int>() = oid;
00129 updater->update( oid, data );
00130 if( operationBuffer.flush()) nobj++;
00131 }
00132 m_buffer.clear();
00133 }
00134 return nobj;
00135 }
00136
00137 private:
00138 std::vector<std::pair<int, const void*> > m_buffer;
00139 ContainerSchema& m_contSchema;
00140 };
00141
00142 class ReadBuffer {
00143 public:
00144 explicit ReadBuffer( ContainerSchema& contSchema ):
00145 m_topLevelElement(),
00146 m_type( contSchema.type() ),
00147 m_reader(),
00148 m_topLevelQuery( contSchema.mapping().topElement().tableName(), contSchema.storageSchema() ){
00149
00150 MappingElement& topLevelMapping = contSchema.mapping().topElement();
00151 m_topLevelQuery.addWhereId( topLevelMapping.columnNames()[ 0 ] );
00152 MappingElement::iterator iMap = topLevelMapping.find( m_type.Name(Reflex::SCOPED) );
00153
00154 if( iMap == topLevelMapping.end()){
00155 throwException("Could not find a mapping element for class \""+
00156 m_type.Name(Reflex::SCOPED)+"\"",
00157 "ReadBuffer::ReadBuffer");
00158 }
00159 MappingElement& mapping = iMap->second;
00160 RelationalStreamerFactory streamerFactory( contSchema );
00161 m_reader.reset( streamerFactory.newReader( m_type, mapping )) ;
00162 m_reader->build( m_topLevelElement , m_topLevelQuery );
00163 }
00164
00165 ~ReadBuffer(){
00166 }
00167
00168 void* read( int oid ){
00169 coral::AttributeList& dataBuff = m_topLevelQuery.whereData();
00170 dataBuff.begin()->data<int>() = oid;
00171 m_topLevelQuery.execute();
00172 m_reader->select( oid );
00173 void* destination = 0;
00174 if( m_topLevelQuery.nextCursorRow() ){
00175 destination = ClassUtils::constructObject( m_type );
00176 m_reader->read( destination );
00177 }
00178 m_reader->clear();
00179 m_topLevelQuery.clear();
00180 return destination;
00181 }
00182
00183 const Reflex::Type& type(){
00184 return m_type;
00185 }
00186
00187 private:
00188 DataElement m_topLevelElement;
00189 const Reflex::Type& m_type;
00190 std::auto_ptr<IRelationalReader> m_reader;
00191 SelectOperation m_topLevelQuery;
00192 };
00193
00194 class DeleteBuffer {
00195 public:
00196 explicit DeleteBuffer( ContainerSchema& contSchema ):
00197 m_buffer(),
00198 m_contSchema( contSchema ){
00199 }
00200
00201 ~DeleteBuffer(){
00202 }
00203
00204
00205 void registerForDelete( int oid ){
00206 m_buffer.push_back( oid );
00207 }
00208
00209 size_t flush(){
00210 size_t nobj = 0;
00211 if( m_buffer.size()) {
00212 RelationalBuffer operationBuffer( m_contSchema.storageSchema() );
00213 RelationalDeleter mainDeleter( m_contSchema.mapping().topElement() );
00214 mainDeleter.build( operationBuffer );
00215 std::vector<MappingElement> dependentMappings;
00216 m_contSchema.mappingForDependentClasses( dependentMappings );
00217 RelationalDeleter depDeleter( dependentMappings );
00218 depDeleter.build( operationBuffer );
00219 dependentMappings.clear();
00220
00221 for( std::vector<int>::const_iterator iD = m_buffer.begin();
00222 iD != m_buffer.end(); ++iD ){
00223 depDeleter.erase( *iD );
00224 mainDeleter.erase( *iD );
00225 if( operationBuffer.flush() ) nobj++;
00226 }
00227 m_buffer.clear();
00228 }
00229 return nobj;
00230 }
00231
00232 private:
00233 std::vector<int> m_buffer;
00234 ContainerSchema& m_contSchema;
00235 };
00236
00237 }
00238
00239 ora::IteratorBuffer::IteratorBuffer( ContainerSchema& schema,
00240 ReadBuffer& buffer ):
00241 m_query( schema.mapping().topElement().tableName(), schema.storageSchema() ),
00242 m_itemId( -1 ),
00243 m_readBuffer( buffer ){
00244 const std::string& idCol = schema.mapping().topElement().columnNames()[0];
00245 m_query.addId( idCol );
00246 m_query.addOrderId( idCol );
00247 }
00248
00249 ora::IteratorBuffer::~IteratorBuffer(){
00250 }
00251
00252 void ora::IteratorBuffer::reset(){
00253 m_query.execute();
00254 }
00255
00256 bool ora::IteratorBuffer::next(){
00257 bool prevValid = (m_itemId != -1);
00258 bool currValid = false;
00259 m_itemId = -1;
00260 if( m_query.nextCursorRow() ){
00261 coral::AttributeList& row = m_query.data();
00262 m_itemId = row.begin()->data<int>();
00263 currValid = true;
00264 }
00265
00266 if( !currValid && prevValid ) m_query.clear();
00267 return currValid;
00268 }
00269
00270 void* ora::IteratorBuffer::getItem(){
00271 void* ret = 0;
00272 if( m_itemId != -1 ){
00273 ret = m_readBuffer.read( m_itemId );
00274 }
00275 return ret;
00276 }
00277
00278 void* ora::IteratorBuffer::getItemAsType( const Reflex::Type& asType ){
00279 if( !ClassUtils::isType( type(), asType ) ){
00280 throwException("Provided output object type \""+asType.Name(Reflex::SCOPED)+"\" does not match with the container type \""+
00281 type().Name(Reflex::SCOPED)+"\"","IteratorBuffer::getItemAsType");
00282 }
00283 return getItem();
00284 }
00285
00286 int ora::IteratorBuffer::itemId(){
00287 return m_itemId;
00288 }
00289
00290 const Reflex::Type& ora::IteratorBuffer::type(){
00291 return m_readBuffer.type();
00292 }
00293
00294 ora::DatabaseContainer::DatabaseContainer( int contId,
00295 const std::string& containerName,
00296 const std::string& className,
00297 unsigned int containerSize,
00298 DatabaseSession& session ):
00299 m_dbSchema( session.schema() ),
00300 m_schema( new ContainerSchema(contId, containerName, className, session) ),
00301 m_writeBuffer(),
00302 m_updateBuffer(),
00303 m_readBuffer(),
00304 m_deleteBuffer(),
00305 m_iteratorBuffer(),
00306 m_size( containerSize ),
00307 m_containerUpdateTable( session.containerUpdateTable() ),
00308 m_lock( false ){
00309 }
00310
00311 ora::DatabaseContainer::DatabaseContainer( int contId,
00312 const std::string& containerName,
00313 const Reflex::Type& containerType,
00314 DatabaseSession& session ):
00315 m_dbSchema( session.schema() ),
00316 m_schema( new ContainerSchema(contId, containerName, containerType, session) ),
00317 m_writeBuffer(),
00318 m_updateBuffer(),
00319 m_readBuffer(),
00320 m_deleteBuffer(),
00321 m_iteratorBuffer(),
00322 m_size(0),
00323 m_containerUpdateTable( session.containerUpdateTable() ),
00324 m_lock( false ) {
00325 }
00326
00327 ora::DatabaseContainer::~DatabaseContainer(){
00328 m_iteratorBuffer.clear();
00329 }
00330
00331 int ora::DatabaseContainer::id(){
00332 return m_schema->containerId();
00333 }
00334
00335 const std::string& ora::DatabaseContainer::name(){
00336 return m_schema->containerName();
00337 }
00338
00339 const std::string& ora::DatabaseContainer::className(){
00340 return m_schema->className();
00341 }
00342
00343 const Reflex::Type& ora::DatabaseContainer::type(){
00344 return m_schema->type();
00345 }
00346
00347 const std::string& ora::DatabaseContainer::mappingVersion(){
00348 return m_schema->mappingVersion();
00349 }
00350
00351 size_t ora::DatabaseContainer::size(){
00352 return m_size;
00353 }
00354
00355 ora::Handle<ora::IteratorBuffer> ora::DatabaseContainer::iteratorBuffer(){
00356 if(!m_readBuffer.get()){
00357 m_readBuffer.reset( new ReadBuffer( *m_schema ) );
00358 }
00359 if( !m_iteratorBuffer ){
00360 m_iteratorBuffer.reset( new IteratorBuffer(*m_schema, *m_readBuffer ) );
00361 m_iteratorBuffer->reset();
00362 }
00363 return m_iteratorBuffer;
00364 }
00365
00366 bool ora::DatabaseContainer::lock(){
00367 if( !m_lock ){
00368 ContainerHeaderData headerData;
00369 m_lock = m_dbSchema.containerHeaderTable().lockContainer( m_schema->containerId(), headerData );
00370 if(!m_lock) throwException("Container \""+name()+"\" has been dropped.","DatabaseContainer::lock()");
00371
00372 m_size = headerData.numberOfObjects;
00373 }
00374 return m_lock;
00375 }
00376
00377 bool ora::DatabaseContainer::isLocked(){
00378 return m_lock;
00379 }
00380
00381 void ora::DatabaseContainer::create(){
00382 m_schema->create();
00383 }
00384
00385 void ora::DatabaseContainer::drop(){
00386 if(!m_schema->dbSession().testDropPermission()){
00387 throwException("Drop permission has been denied for the current user.",
00388 "DatabaseContainer::drop");
00389 }
00390 m_schema->drop();
00391 m_containerUpdateTable.remove( m_schema->containerId() );
00392 }
00393
00394 void ora::DatabaseContainer::extendSchema( const Reflex::Type& dependentType ){
00395 m_schema->extendIfRequired( dependentType );
00396 }
00397
00398 void ora::DatabaseContainer::setAccessPermission( const std::string& principal,
00399 bool forWrite ){
00400 m_schema->setAccessPermission( principal, forWrite );
00401 }
00402
00403 void* ora::DatabaseContainer::fetchItem(int itemId){
00404
00405 if(!m_readBuffer.get()){
00406 m_readBuffer.reset( new ReadBuffer( *m_schema ) );
00407 }
00408 return m_readBuffer->read( itemId );
00409 }
00410
00411 void* ora::DatabaseContainer::fetchItemAsType(int itemId,
00412 const Reflex::Type& asType){
00413 if(!m_readBuffer.get()){
00414 m_readBuffer.reset( new ReadBuffer( *m_schema ) );
00415 }
00416 if( !ClassUtils::isType( type(), asType ) ){
00417 throwException("Provided output object type \""+asType.Name(Reflex::SCOPED)+"\" does not match with the container type \""+
00418 type().Name(Reflex::SCOPED)+"\"","DatabaseContainer::fetchItemAsType");
00419 }
00420 return m_readBuffer->read( itemId );
00421 }
00422
00423 int ora::DatabaseContainer::insertItem( const void* data,
00424 const Reflex::Type& dataType ){
00425 if(!m_writeBuffer.get()){
00426 m_writeBuffer.reset( new WriteBuffer( *m_schema ) );
00427 }
00428 Reflex::Type inputResType = ClassUtils::resolvedType( dataType );
00429 Reflex::Type contType = ClassUtils::resolvedType(m_schema->type());
00430 if( inputResType.Name()!= contType.Name() && !inputResType.HasBase( contType ) ){
00431 throwException( "Provided input object type=\""+inputResType.Name()+
00432 "\" does not match with the container type=\""+contType.Name()+"\"",
00433 "DatabaseContainer::insertItem" );
00434 }
00435
00436 int newId = m_schema->containerSequences().getNextId( MappingRules::sequenceNameForContainer( m_schema->containerName()) );
00437 m_writeBuffer->registerForWrite( newId, data );
00438 return newId;
00439 }
00440
00441 void ora::DatabaseContainer::updateItem( int itemId,
00442 const void* data,
00443 const Reflex::Type& dataType ){
00444 if(!m_updateBuffer.get()){
00445 m_updateBuffer.reset( new UpdateBuffer( *m_schema ) );
00446 }
00447 Reflex::Type inputResType = ClassUtils::resolvedType( dataType );
00448 Reflex::Type contType = ClassUtils::resolvedType(m_schema->type());
00449 if( inputResType.Name()!= contType.Name() && !inputResType.HasBase( contType ) ){
00450 throwException( "Provided input object type=\""+inputResType.Name()+"\" does not match with the container type=\""+
00451 contType.Name()+"\".",
00452 "DatabaseContainer::updateItem" );
00453 }
00454
00455 m_updateBuffer->registerForUpdate( itemId, data );
00456 }
00457
00458 void ora::DatabaseContainer::erase( int itemId ){
00459 if(!m_deleteBuffer.get()){
00460 m_deleteBuffer.reset( new DeleteBuffer( *m_schema ) );
00461 }
00462 m_deleteBuffer->registerForDelete( itemId );
00463 }
00464
00465 void ora::DatabaseContainer::flush(){
00466 size_t prevSize = m_size;
00467 if(m_writeBuffer.get()) m_size += m_writeBuffer->flush();
00468 if(m_updateBuffer.get()) m_updateBuffer->flush();
00469 if(m_deleteBuffer.get()) m_size -= m_deleteBuffer->flush();
00470 m_schema->containerSequences().sinchronizeAll();
00471 if( prevSize != m_size ){
00472 m_containerUpdateTable.takeNote( id(), m_size );
00473 }
00474 }
00475
00476 void ora::DatabaseContainer::setItemName( const std::string& name,
00477 int itemId ){
00478 m_schema->dbSession().setObjectName( name, m_schema->containerId(), itemId );
00479 }
00480
00481 bool ora::DatabaseContainer::getNames( std::vector<std::string>& destination ){
00482 return m_schema->dbSession().getNamesForContainer( m_schema->containerId(), destination );
00483 }
00484
00485