Go to the documentation of this file.00001
#include "IOPool/Streamer/interface/EventBuffer.h"

#include <cstddef>
00003
00004 namespace edm
00005 {
00006
00007 EventBuffer::EventBuffer(int max_event_size, int max_queue_depth):
00008 max_event_size_(max_event_size),max_queue_depth_(max_queue_depth),
00009 pos_(max_queue_depth-1),mem_(max_event_size * max_queue_depth),
00010 queue_(max_queue_depth),
00011 fpos_(),bpos_()
00012 {
00013
00014
00015 for(char* i = &mem_[0]; i < &mem_[mem_.size()]; i += max_event_size)
00016 buffer_pool_.push_back(i);
00017
00018 }
00019
00020 EventBuffer::~EventBuffer() { }
00021
00022 EventBuffer::Buffer EventBuffer::getProducerBuffer()
00023 {
00024
00025 boost::mutex::scoped_lock sl(pool_lock_);
00026
00027 while(pos_ < 0) {
00028 pool_cond_.wait(sl);
00029 }
00030 void* v = buffer_pool_[pos_];
00031 --pos_;
00032 return Buffer(v,max_event_size_);
00033 }
00034
00035 void EventBuffer::releaseProducerBuffer(void* v)
00036 {
00037
00038 boost::mutex::scoped_lock sl(pool_lock_);
00039 ++pos_;
00040 buffer_pool_[pos_] = v;
00041 pool_cond_.notify_one();
00042
00043 }
00044
00045 void EventBuffer::commitProducerBuffer(void* v, int len)
00046 {
00047
00048 boost::mutex::scoped_lock sl(queue_lock_);
00049
00050 while((bpos_+max_queue_depth_)==fpos_) {
00051 push_cond_.wait(sl);
00052 }
00053
00054
00055 queue_[fpos_ % max_queue_depth_]=Buffer(v,len);
00056 ++fpos_;
00057
00058 pop_cond_.notify_one();
00059
00060 }
00061
00062 EventBuffer::Buffer EventBuffer::getConsumerBuffer()
00063 {
00064
00065 boost::mutex::scoped_lock sl(queue_lock_);
00066
00067 while(bpos_==fpos_) {
00068 pop_cond_.wait(sl);
00069 }
00070
00071 Buffer v = queue_[bpos_ % max_queue_depth_];
00072 ++bpos_;
00073
00074
00075 push_cond_.notify_one();
00076
00077 return v;
00078 }
00079
00080 void EventBuffer::releaseConsumerBuffer(void* v)
00081 {
00082
00083
00084
00085
00086
00087 releaseProducerBuffer(v);
00088 }
00089
00090 void EventBuffer::commitConsumerBuffer(void* v, int)
00091 {
00092 releaseProducerBuffer(v);
00093 }
00094 }