CMS 3D CMS Logo

TSqueue.h

Go to the documentation of this file.
00001 #ifndef UTILITIES_THREADS_TSQUEUE_H
00002 #define UTILITIES_THREADS_TSQUEUE_H
00003 //
00004 //  a thread safe queue
00005 //
00006 #include "Utilities/Threads/interface/Thread.h"
00007 #include "Utilities/General/interface/MutexUtils.h"
00008 #include <boost/thread/condition.hpp>
00009 #include "Utilities/General/interface/GeneralVerbosity.h"
00010 
00011 #include "Utilities/General/interface/CMSexception.h"
00012 #include "Utilities/General/interface/UncatchedException.h"
00013 #include "Utilities/General/interface/ClassName.h"
00014 #include "boost/lexical_cast.hpp"
00015 
00016 #include <queue>
00017 #include <deque>
00018 
00019 struct TSqueue_termination : public Thread::Terminate {};
00020 
00023 template<class X>
00024 class TSqueue {
00025   
00026 public: 
00027 
00028   typedef TSqueue<X> self;
00029 
00030 public:
00032   explicit TSqueue(size_t imax=1000, size_t imin=900) : 
00033     maxSize(imax), minSize(imin),
00034     draining_(false), terminate_(false), abort_(false) {
00035     if (minSize<1) minSize=1;
00036   }
00037 
00038   void reset() {
00039     draining_=terminate_=abort_=false;
00040   }
00041   
00043   void push(const X& x) {
00044     SimpleLockMutex gl(genLock);
00045     if (abort_) throw TSqueue_termination();
00046     while(cont.size()>maxSize) {
00047       notFull.wait(gl());
00048       if (abort_) throw TSqueue_termination();
00049     }
00050     cont.push(x);    
00051     if ( !(cont.size()<minSize )) { notEmpty.notify_all(); } 
00052   }
00053   
00055   X pop() {
00056     SimpleLockMutex gl(genLock);
00057     while(cont.empty()) {
00058       if (draining_) dodrain.notify_all();
00059       if (terminate_) throw TSqueue_termination();
00060       notEmpty.wait(gl());
00061     }
00062     X x = cont.front();
00063     cont.pop();
00064     if (cont.size()<minSize) notFull.notify_all(); 
00065     return x;
00066   }
00067   
00069   void terminate() {
00070     SimpleLockMutex gl(genLock);
00071     terminate_ = true;
00072     notEmpty.notify_all();
00073   }
00074   
00076   void abort() {
00077     SimpleLockMutex gl(genLock);
00078     abort_ = true;
00079     clear();
00080     notFull.notify_all(); 
00081     if (draining_) dodrain.notify_all();
00082   }
00083 
00085   void drain() {
00086     SimpleLockMutex gl(genLock);
00087     if (cont.empty()) return;
00088     draining_=true;
00089     notEmpty.notify_all();
00090     while(!cont.empty()) {
00091       dodrain.wait(gl());
00092       if (abort_) throw TSqueue_termination();
00093     }
00094   }
00096   size_t size() const {
00097     SimpleLockMutex gl(genLock);
00098     return cont.size();
00099   }
00100 
00101   void clear() {
00102     while(!cont.empty()) cont.pop();
00103   }
00104 
00105 
00106 private:
00107 
00108 
00109   
00110   std::queue<X,std::deque<X> > cont;
00111   
00112   size_t maxSize;
00113   size_t minSize;
00114   
00115   mutable SimpleLockMutex::Mutex genLock;
00116   boost::condition notEmpty;
00117   boost::condition notFull;
00118   boost::condition dodrain;
00119 
00120 
00121   bool draining_;
00122 
00123   bool terminate_;
00124   bool abort_;
00125   
00126 };
00127 
00130 template<class X, class C>
00131 class TSqueueConsumer : public Thread{
00132 
00133 public: 
00134   
00135   typedef TSqueueConsumer<X,C> self;
00136   
00137 public:
00138   
00139   typedef TSqueue<X>  Xqueue;
00140   
00141 private:
00142   
00143   void run() {
00144       if (!GeneralVerbosity::silent())
00145           GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00146                     << " thread starts.." << thread_self_tid() <<  std::endl;
00147     
00148 #ifdef __linux__
00149     // now outside...
00150     //    string mess("Thread "); mess += boost::lexical_cast<string>(thread_self_tid());
00151     //    LinuxElapsedTime totaltime(mess,cout); 
00152 #endif // __linux__
00153     
00154     if (!GeneralVerbosity::silent())
00155       GeneralUtilities::cout << thread_self_tid() << " queue size is "<< inqueue.size() <<  std::endl;
00156     
00157     C consumer;
00158     
00159     try{  // this try logic is here to avoid double throw from consumer destructor...
00160       for(;;) {
00161         try {
00162           consumer(inqueue.pop());
00163         }
00164         catch ( Thread::Terminate ) {
00165           throw;
00166         }
00167         catch ( GenTerminate ) {
00168           if (!interactive_) throw;
00169           abort();
00170         }
00171         catch (...) {
00172           if (!interactive_) throw;
00173           if (!GeneralVerbosity::silent()) {
00174             GeneralUtilities::cout << "thread " 
00175                           << self_tid() << " caught exception " << std::endl;
00176           } 
00177           abort();
00178         } 
00179       }
00180     }
00181     catch ( Thread::Terminate ) {
00182       throw;
00183     }
00184     catch ( GenTerminate ) {
00185       throw;
00186     }
00187     catch ( cms::Exception & cexp) {
00188       if (!GeneralVerbosity::silent()) {
00189         GeneralUtilities::cout << "thread " 
00190                       << self_tid() << " caught  CMS/SEAL exception (" 
00191                       << className(cexp) 
00192                       << ") :" <<  std::endl;
00193         Genexception * ce = dynamic_cast<Genexception *>(&cexp);
00194         if (ce) ce->dump(GeneralUtilities::cout, true);
00195         else  GeneralUtilities::cout << cexp.what() <<  std::endl;
00196         GeneralUtilities::cout << " inform producer...." << std::endl;
00197       }
00198       UncatchedException uce(cexp);
00199       assert(&uce != 0); // Avoids warning message for unused variable uce.
00200       abort();
00201     }
00202     catch (...) {
00203       if (!GeneralVerbosity::silent()) {
00204         GeneralUtilities::cout << "thread " 
00205                       << self_tid() << " caught unknown exception " <<  std::endl;    
00206         GeneralUtilities::cout << " inform producer...." <<  std::endl;
00207       }
00208       UncatchedException uce;
00209       assert(&uce != 0); // Avoids warning message for unused variable uce.
00210       abort();
00211     }
00212     
00213     if (!GeneralVerbosity::silent())
00214       GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00215                     << " thread ends.." << thread_self_tid() << std::endl;
00216   }
00217   
00219   virtual void terminate(){
00220     if (!GeneralVerbosity::silent())
00221       GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00222                     << " thread terminate.." << thread_self_tid() << std:: endl;
00223   }
00224   
00226   virtual void abort() { 
00227     if (!GeneralVerbosity::silent())
00228       GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00229                     << " thread aborts.." << thread_self_tid() <<  std::endl;
00230     inqueue.abort(); 
00231   }
00232   
00233 public:
00234   explicit TSqueueConsumer(size_t imax=1000, size_t imin=900):
00235     inqueue(imax,imin),  interactive_(false) {
00236     initialize();
00237   }
00238   
00239   ~TSqueueConsumer() {
00240     inqueue.terminate();
00241     finalize();
00242   }
00243   
00244   void reset() {  inqueue.reset();}
00245   
00246   void interactive() { interactive_=true;}
00247   
00248   // wait queue to drain
00249   void wait() { 
00250     if (!GeneralVerbosity::silent())
00251       GeneralUtilities::cout << "wait queue to drain" << std::endl;
00252     inqueue.drain();
00253   }
00254   
00255   void push(const X& x) { inqueue.push(x); }
00256   
00257   size_t size() const { return inqueue.size();}
00258 
00259     
00260   
00261 protected:
00262 
00263   Xqueue inqueue;
00264   bool interactive_;
00265   
00266 };
00267 
00268 #endif // UTILITIES_THREADS_TSQUEUE_H

Generated on Tue Jun 9 17:48:57 2009 for CMSSW by  doxygen 1.5.4