00001 #include "FWCore/Utilities/interface/SingleConsumerQ.h"
00002
00003 namespace edm
00004 {
00005
00006 SingleConsumerQ::SingleConsumerQ(int max_event_size, int max_queue_depth):
00007 max_event_size_(max_event_size),max_queue_depth_(max_queue_depth),
00008 pos_(max_queue_depth-1),mem_(max_event_size * max_queue_depth),
00009 buffer_pool_(),
00010 queue_(max_queue_depth),
00011 fpos_(),
00012 bpos_(),
00013 pool_lock_(),
00014 queue_lock_(),
00015 pool_cond_(),
00016 pop_cond_(),
00017 push_cond_()
00018 {
00019
00020
00021 for(char* i=&mem_[0];i<&mem_[mem_.size()];i+=max_event_size)
00022 buffer_pool_.push_back(i);
00023
00024 }
00025
00026 SingleConsumerQ::~SingleConsumerQ() { }
00027
00028 SingleConsumerQ::Buffer SingleConsumerQ::getProducerBuffer()
00029 {
00030
00031 boost::mutex::scoped_lock sl(pool_lock_);
00032
00033 while(pos_ < 0)
00034 {
00035 pool_cond_.wait(sl);
00036 }
00037 void* v = buffer_pool_[pos_];
00038 --pos_;
00039 return Buffer(v,max_event_size_);
00040 }
00041
00042 void SingleConsumerQ::releaseProducerBuffer(void* v)
00043 {
00044
00045 boost::mutex::scoped_lock sl(pool_lock_);
00046 ++pos_;
00047 buffer_pool_[pos_] = v;
00048 pool_cond_.notify_all();
00049 }
00050
00051 void SingleConsumerQ::commitProducerBuffer(void* v, int len)
00052 {
00053
00054 boost::mutex::scoped_lock sl(queue_lock_);
00055
00056 while((bpos_+max_queue_depth_)==fpos_)
00057 {
00058 push_cond_.wait(sl);
00059 }
00060
00061
00062 queue_[fpos_ % max_queue_depth_]=Buffer(v,len);
00063 ++fpos_;
00064
00065 pop_cond_.notify_all();
00066 }
00067
00068 SingleConsumerQ::Buffer SingleConsumerQ::getConsumerBuffer()
00069 {
00070
00071 boost::mutex::scoped_lock sl(queue_lock_);
00072
00073 while(bpos_==fpos_)
00074 {
00075 pop_cond_.wait(sl);
00076 }
00077
00078 Buffer v = queue_[bpos_ % max_queue_depth_];
00079 ++bpos_;
00080
00081
00082 push_cond_.notify_all();
00083 return v;
00084 }
00085
00086 void SingleConsumerQ::releaseConsumerBuffer(void* v)
00087 {
00088
00089
00090
00091
00092
00093 releaseProducerBuffer(v);
00094 }
00095
00096 void SingleConsumerQ::commitConsumerBuffer(void* v, int)
00097 {
00098 releaseProducerBuffer(v);
00099 }
00100 }