CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/IOPool/Streamer/src/EventBuffer.cc

Go to the documentation of this file.
00001 
00002 #include "IOPool/Streamer/interface/EventBuffer.h"
00003 
00004 namespace edm
00005 {
00006 
00007   EventBuffer::EventBuffer(int max_event_size, int max_queue_depth):
00008     max_event_size_(max_event_size),max_queue_depth_(max_queue_depth),
00009     pos_(max_queue_depth-1),mem_(max_event_size * max_queue_depth),
00010     queue_(max_queue_depth),
00011     fpos_(),bpos_()
00012   {
00013     // throw if event size 0 or queue depth 0
00014 
00015     for(char* i = &mem_[0]; i < &mem_[mem_.size()]; i += max_event_size)
00016       buffer_pool_.push_back(i);
00017 
00018   }
00019 
00020   EventBuffer::~EventBuffer() { }
00021 
00022   EventBuffer::Buffer EventBuffer::getProducerBuffer()
00023   {
00024     // get lock
00025     boost::mutex::scoped_lock sl(pool_lock_);
00026     // wait for buffer to appear
00027     while(pos_ < 0) {
00028         pool_cond_.wait(sl);
00029     }
00030     void* v = buffer_pool_[pos_];
00031     --pos_;
00032     return Buffer(v,max_event_size_);
00033   }
00034 
00035   void EventBuffer::releaseProducerBuffer(void* v)
00036   {
00037     // get lock
00038     boost::mutex::scoped_lock sl(pool_lock_);
00039     ++pos_;
00040     buffer_pool_[pos_] = v;
00041     pool_cond_.notify_one();
00042     // pool_cond_.notify_all();
00043   }
00044 
00045   void EventBuffer::commitProducerBuffer(void* v, int len)
00046   {
00047     // get lock
00048     boost::mutex::scoped_lock sl(queue_lock_);
00049     // if full, wait for item to be removed
00050     while((bpos_+max_queue_depth_)==fpos_) {
00051         push_cond_.wait(sl);
00052     }
00053     
00054     // put buffer into queue
00055     queue_[fpos_ % max_queue_depth_]=Buffer(v,len);
00056     ++fpos_;
00057     // signal consumer
00058     pop_cond_.notify_one();
00059     // pop_cond_.notify_all();
00060   }
00061   
00062   EventBuffer::Buffer EventBuffer::getConsumerBuffer()
00063   {
00064     // get lock
00065     boost::mutex::scoped_lock sl(queue_lock_);
00066     // if empty, wait for item to appear
00067     while(bpos_==fpos_) {
00068         pop_cond_.wait(sl);
00069     }
00070     // get a buffer from the queue and return it
00071     Buffer v = queue_[bpos_ % max_queue_depth_];
00072     ++bpos_;
00073     // note that these operations cannot throw
00074     // signal producer
00075     push_cond_.notify_one();
00076     // push_cond_.notify_all();
00077     return v;
00078   }
00079 
00080   void EventBuffer::releaseConsumerBuffer(void* v)
00081   {
00082     // should the buffer be placed back onto the queue and not released?
00083     // we got here because a commit did to occur in the consumer.
00084     // we will allow consumers to call or not call commit for now, meaning
00085     // that we cannot distinguish between exception conditions and normal
00086     // return.  The buffer will always be released
00087     releaseProducerBuffer(v);
00088   }
00089 
00090   void EventBuffer::commitConsumerBuffer(void* v, int)
00091   {
00092     releaseProducerBuffer(v);
00093   }
00094 }