00001 #ifndef UTILITIES_THREADS_TSQUEUE_H
00002 #define UTILITIES_THREADS_TSQUEUE_H
00003
00004
00005
00006 #include "Utilities/Threads/interface/Thread.h"
00007 #include "Utilities/General/interface/MutexUtils.h"
00008 #include <boost/thread/condition.hpp>
00009 #include "Utilities/General/interface/GeneralVerbosity.h"
00010
00011 #include "Utilities/General/interface/CMSexception.h"
00012 #include "Utilities/General/interface/UncatchedException.h"
00013 #include "Utilities/General/interface/ClassName.h"
00014 #include "boost/lexical_cast.hpp"
00015
00016 #include <queue>
00017 #include <deque>
00018
00019 struct TSqueue_termination : public Thread::Terminate {};
00020
00023 template<class X>
00024 class TSqueue {
00025
00026 public:
00027
00028 typedef TSqueue<X> self;
00029
00030 public:
00032 explicit TSqueue(size_t imax=1000, size_t imin=900) :
00033 maxSize(imax), minSize(imin),
00034 draining_(false), terminate_(false), abort_(false) {
00035 if (minSize<1) minSize=1;
00036 }
00037
00038 void reset() {
00039 draining_=terminate_=abort_=false;
00040 }
00041
00043 void push(const X& x) {
00044 SimpleLockMutex gl(genLock);
00045 if (abort_) throw TSqueue_termination();
00046 while(cont.size()>maxSize) {
00047 notFull.wait(gl());
00048 if (abort_) throw TSqueue_termination();
00049 }
00050 cont.push(x);
00051 if ( !(cont.size()<minSize )) { notEmpty.notify_all(); }
00052 }
00053
00055 X pop() {
00056 SimpleLockMutex gl(genLock);
00057 while(cont.empty()) {
00058 if (draining_) dodrain.notify_all();
00059 if (terminate_) throw TSqueue_termination();
00060 notEmpty.wait(gl());
00061 }
00062 X x = cont.front();
00063 cont.pop();
00064 if (cont.size()<minSize) notFull.notify_all();
00065 return x;
00066 }
00067
00069 void terminate() {
00070 SimpleLockMutex gl(genLock);
00071 terminate_ = true;
00072 notEmpty.notify_all();
00073 }
00074
00076 void abort() {
00077 SimpleLockMutex gl(genLock);
00078 abort_ = true;
00079 clear();
00080 notFull.notify_all();
00081 if (draining_) dodrain.notify_all();
00082 }
00083
00085 void drain() {
00086 SimpleLockMutex gl(genLock);
00087 if (cont.empty()) return;
00088 draining_=true;
00089 notEmpty.notify_all();
00090 while(!cont.empty()) {
00091 dodrain.wait(gl());
00092 if (abort_) throw TSqueue_termination();
00093 }
00094 }
00096 size_t size() const {
00097 SimpleLockMutex gl(genLock);
00098 return cont.size();
00099 }
00100
00101 void clear() {
00102 while(!cont.empty()) cont.pop();
00103 }
00104
00105
00106 private:
00107
00108
00109
00110 std::queue<X,std::deque<X> > cont;
00111
00112 size_t maxSize;
00113 size_t minSize;
00114
00115 mutable SimpleLockMutex::Mutex genLock;
00116 boost::condition notEmpty;
00117 boost::condition notFull;
00118 boost::condition dodrain;
00119
00120
00121 bool draining_;
00122
00123 bool terminate_;
00124 bool abort_;
00125
00126 };
00127
00130 template<class X, class C>
00131 class TSqueueConsumer : public Thread{
00132
00133 public:
00134
00135 typedef TSqueueConsumer<X,C> self;
00136
00137 public:
00138
00139 typedef TSqueue<X> Xqueue;
00140
00141 private:
00142
00143 void run() {
00144 if (!GeneralVerbosity::silent())
00145 GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00146 << " thread starts.." << thread_self_tid() << std::endl;
00147
00148 #ifdef __linux__
00149
00150
00151
00152 #endif // __linux__
00153
00154 if (!GeneralVerbosity::silent())
00155 GeneralUtilities::cout << thread_self_tid() << " queue size is "<< inqueue.size() << std::endl;
00156
00157 C consumer;
00158
00159 try{
00160 for(;;) {
00161 try {
00162 consumer(inqueue.pop());
00163 }
00164 catch ( Thread::Terminate ) {
00165 throw;
00166 }
00167 catch ( GenTerminate ) {
00168 if (!interactive_) throw;
00169 abort();
00170 }
00171 catch (...) {
00172 if (!interactive_) throw;
00173 if (!GeneralVerbosity::silent()) {
00174 GeneralUtilities::cout << "thread "
00175 << self_tid() << " caught exception " << std::endl;
00176 }
00177 abort();
00178 }
00179 }
00180 }
00181 catch ( Thread::Terminate ) {
00182 throw;
00183 }
00184 catch ( GenTerminate ) {
00185 throw;
00186 }
00187 catch ( cms::Exception & cexp) {
00188 if (!GeneralVerbosity::silent()) {
00189 GeneralUtilities::cout << "thread "
00190 << self_tid() << " caught CMS/SEAL exception ("
00191 << className(cexp)
00192 << ") :" << std::endl;
00193 Genexception * ce = dynamic_cast<Genexception *>(&cexp);
00194 if (ce) ce->dump(GeneralUtilities::cout, true);
00195 else GeneralUtilities::cout << cexp.what() << std::endl;
00196 GeneralUtilities::cout << " inform producer...." << std::endl;
00197 }
00198 UncatchedException uce(cexp);
00199 assert(&uce != 0);
00200 abort();
00201 }
00202 catch (...) {
00203 if (!GeneralVerbosity::silent()) {
00204 GeneralUtilities::cout << "thread "
00205 << self_tid() << " caught unknown exception " << std::endl;
00206 GeneralUtilities::cout << " inform producer...." << std::endl;
00207 }
00208 UncatchedException uce;
00209 assert(&uce != 0);
00210 abort();
00211 }
00212
00213 if (!GeneralVerbosity::silent())
00214 GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00215 << " thread ends.." << thread_self_tid() << std::endl;
00216 }
00217
00219 virtual void terminate(){
00220 if (!GeneralVerbosity::silent())
00221 GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00222 << " thread terminate.." << thread_self_tid() << std:: endl;
00223 }
00224
00226 virtual void abort() {
00227 if (!GeneralVerbosity::silent())
00228 GeneralUtilities::cout << ">>>>> consumer " << ClassName<C>::name()
00229 << " thread aborts.." << thread_self_tid() << std::endl;
00230 inqueue.abort();
00231 }
00232
00233 public:
00234 explicit TSqueueConsumer(size_t imax=1000, size_t imin=900):
00235 inqueue(imax,imin), interactive_(false) {
00236 initialize();
00237 }
00238
00239 ~TSqueueConsumer() {
00240 inqueue.terminate();
00241 finalize();
00242 }
00243
00244 void reset() { inqueue.reset();}
00245
00246 void interactive() { interactive_=true;}
00247
00248
00249 void wait() {
00250 if (!GeneralVerbosity::silent())
00251 GeneralUtilities::cout << "wait queue to drain" << std::endl;
00252 inqueue.drain();
00253 }
00254
00255 void push(const X& x) { inqueue.push(x); }
00256
00257 size_t size() const { return inqueue.size();}
00258
00259
00260
00261 protected:
00262
00263 Xqueue inqueue;
00264 bool interactive_;
00265
00266 };
00267
00268 #endif // UTILITIES_THREADS_TSQUEUE_H