CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC2_patch1/src/FWCore/Utilities/src/SingleConsumerQ.cc

Go to the documentation of this file.
#include "FWCore/Utilities/interface/SingleConsumerQ.h"

#include <cstddef>
#include <stdexcept>
00002 
00003 namespace edm
00004 {
00005 
00006   SingleConsumerQ::SingleConsumerQ(int max_event_size, int max_queue_depth):
00007     max_event_size_(max_event_size),max_queue_depth_(max_queue_depth),
00008     pos_(max_queue_depth-1),mem_(max_event_size * max_queue_depth),
00009     buffer_pool_(),
00010     queue_(max_queue_depth),
00011     fpos_(),
00012     bpos_(),
00013     pool_lock_(),
00014     queue_lock_(),
00015     pool_cond_(),
00016     pop_cond_(),
00017     push_cond_()
00018   {
00019     // throw if event size 0 or queue depth 0
00020 
00021     for(char* i=&mem_[0];i<&mem_[mem_.size()];i+=max_event_size)
00022       buffer_pool_.push_back(i);
00023 
00024   }
00025 
00026   SingleConsumerQ::~SingleConsumerQ() { }
00027 
00028   SingleConsumerQ::Buffer SingleConsumerQ::getProducerBuffer()
00029   {
00030     // get lock
00031     boost::mutex::scoped_lock sl(pool_lock_);
00032     // wait for buffer to appear
00033     while(pos_ < 0)
00034       {
00035         pool_cond_.wait(sl);
00036       }
00037     void* v = buffer_pool_[pos_];
00038     --pos_;
00039     return Buffer(v,max_event_size_);
00040   }
00041 
00042   void SingleConsumerQ::releaseProducerBuffer(void* v)
00043   {
00044     // get lock
00045     boost::mutex::scoped_lock sl(pool_lock_);
00046     ++pos_;
00047     buffer_pool_[pos_] = v;
00048     pool_cond_.notify_all();
00049   }
00050 
00051   void SingleConsumerQ::commitProducerBuffer(void* v, int len)
00052   {
00053     // get lock
00054     boost::mutex::scoped_lock sl(queue_lock_);
00055     // if full, wait for item to be removed
00056     while((bpos_+max_queue_depth_)==fpos_)
00057       {
00058         push_cond_.wait(sl);
00059       }
00060 
00061     // put buffer into queue
00062     queue_[fpos_ % max_queue_depth_]=Buffer(v,len);
00063     ++fpos_;
00064     // signal consumer
00065     pop_cond_.notify_all();
00066   }
00067 
00068   SingleConsumerQ::Buffer SingleConsumerQ::getConsumerBuffer()
00069   {
00070     // get lock
00071     boost::mutex::scoped_lock sl(queue_lock_);
00072     // if empty, wait for item to appear
00073     while(bpos_==fpos_)
00074       {
00075         pop_cond_.wait(sl);
00076       }
00077     // get a buffer from the queue and return it
00078     Buffer v = queue_[bpos_ % max_queue_depth_];
00079     ++bpos_;
00080     // note that these operations cannot throw
00081     // signal producer
00082     push_cond_.notify_all();
00083     return v;
00084   }
00085 
00086   void SingleConsumerQ::releaseConsumerBuffer(void* v)
00087   {
00088     // should the buffer be placed back onto the queue and not released?
00089     // we got here because a commit did to occur in the consumer.
00090     // we will allow consumers to call or not call commit for now, meaning
00091     // that we cannot distinguish between exception conditions and normal
00092     // return.  The buffer will always be released
00093     releaseProducerBuffer(v);
00094   }
00095 
00096   void SingleConsumerQ::commitConsumerBuffer(void* v, int)
00097   {
00098     releaseProducerBuffer(v);
00099   }
00100 }