00001 // $Id: TRootXTReq.cc,v 1.3 2011/01/24 17:44:21 amraktad Exp $ 00002 00003 #include "Fireworks/Core/interface/TRootXTReq.h" 00004 00005 #include "TCondition.h" 00006 #include "TThread.h" 00007 00008 #include <TSysEvtHandler.h> 00009 #include <TSystem.h> 00010 #include <TTimer.h> 00011 00012 // Bloody root threads do not provide signal delivery. 00013 #include <csignal> 00014 00015 // TRootXTReq 00016 00017 //______________________________________________________________________________ 00018 // 00019 // Abstract base-class for delivering cross-thread requests into ROOT 00020 // main thread. 00021 // Sub-classes must implement the Act() method. 00022 // Two methods are available to request execution: 00023 // 00024 // - ShootRequest() 00025 // Request execution and return immediately. 00026 // The request object is deleted in the main thread. 00027 // 00028 // - ShootRequestAndWait() 00029 // Request execution and wait until execution is finished. 00030 // This way results can be returned to the caller in data-members. 00031 // It is callers responsibility to delete the request object. 00032 // It can also be reused. 00033 // 00034 // 00035 // Global queue and locks are implemented via static members of this 00036 // class. 00037 // 00038 // Must be initialized from the main thread like this: 00039 // TRootXTReq::Bootstrap(TThread::SelfId()); 00040 00041 00042 TRootXTReq::lpXTReq_t TRootXTReq::sQueue; 00043 pthread_t TRootXTReq::sRootThread = 0; 00044 TMutex *TRootXTReq::sQueueMutex = 0; 00045 TSignalHandler *TRootXTReq::sSigHandler = 0; 00046 bool TRootXTReq::sSheduled = false; 00047 00048 00049 //============================================================================== 00050 00051 TRootXTReq::TRootXTReq(const char* n) : 00052 m_return_condition(0), 00053 mName(n) 00054 {} 00055 00056 TRootXTReq::~TRootXTReq() 00057 { 00058 delete m_return_condition; 00059 } 00060 00061 //------------------------------------------------------------------------------ 00062 00063 void TRootXTReq::post_request() 00064 { 00065 TLockGuard _lck(sQueueMutex); 00066 00067 sQueue.push_back(this); 00068 00069 if ( ! sSheduled) 00070 { 00071 sSheduled = true; 00072 pthread_kill(sRootThread, SIGUSR1); 00073 } 00074 } 00075 00076 void TRootXTReq::ShootRequest() 00077 { 00078 // Places request into the queue and requests execution in Rint thread. 00079 // It returns immediately after that, without waiting for execution. 00080 // The request is deleted after execution. 00081 00082 if (m_return_condition) 00083 { 00084 delete m_return_condition; 00085 m_return_condition = 0; 00086 } 00087 00088 post_request(); 00089 } 00090 00091 void TRootXTReq::ShootRequestAndWait() 00092 { 00093 // Places request into the queue, requests execution in Rint thread and 00094 // waits for the execution to be completed. 00095 // The request is not deleted after execution as it might carry return 00096 // value. 00097 // The same request can be reused several times. 00098 00099 if (!m_return_condition) 00100 m_return_condition = new TCondition; 00101 00102 m_return_condition->GetMutex()->Lock(); 00103 00104 post_request(); 00105 00106 m_return_condition->Wait(); 00107 m_return_condition->GetMutex()->UnLock(); 00108 } 00109 00110 00111 //============================================================================== 00112 00113 class RootSig2XTReqHandler : public TSignalHandler 00114 { 00115 private: 00116 class XTReqTimer : public TTimer 00117 { 00118 public: 00119 XTReqTimer() : TTimer() {} 00120 virtual ~XTReqTimer() {} 00121 00122 void FireAway() 00123 { 00124 Reset(); 00125 gSystem->AddTimer(this); 00126 } 00127 00128 virtual Bool_t Notify() 00129 { 00130 gSystem->RemoveTimer(this); 00131 TRootXTReq::ProcessQueue(); 00132 return kTRUE; 00133 } 00134 }; 00135 00136 XTReqTimer mTimer; 00137 00138 public: 00139 RootSig2XTReqHandler() : TSignalHandler(kSigUser1), mTimer() { Add(); } 00140 virtual ~RootSig2XTReqHandler() {} 00141 00142 virtual Bool_t Notify() 00143 { 00144 printf("Usr1 Woof Woof in Root thread! Starting Timer.\n"); 00145 mTimer.FireAway(); 00146 return kTRUE; 00147 } 00148 }; 00149 00150 //------------------------------------------------------------------------------ 00151 00152 void TRootXTReq::Bootstrap(pthread_t root_thread) 00153 { 00154 static const TString _eh("TRootXTReq::Bootstrap "); 00155 00156 if (sRootThread != 0) 00157 throw _eh + "Already initialized."; 00158 00159 sRootThread = root_thread; 00160 sQueueMutex = new TMutex(kTRUE); 00161 sSigHandler = new RootSig2XTReqHandler; 00162 } 00163 00164 void TRootXTReq::Shutdown() 00165 { 00166 static const TString _eh("TRootXTReq::Shutdown "); 00167 00168 if (sRootThread == 0) 00169 throw _eh + "Have not beem initialized."; 00170 00171 // Should lock and drain queue ... or sth. 00172 00173 sRootThread = 0; 00174 delete sSigHandler; sSigHandler = 0; 00175 delete sQueueMutex; sQueueMutex = 0; 00176 } 00177 00178 void TRootXTReq::ProcessQueue() 00179 { 00180 printf("Timer fired, processing queue.\n"); 00181 00182 while (true) 00183 { 00184 TRootXTReq *req = 0; 00185 { 00186 TLockGuard _lck(sQueueMutex); 00187 00188 if ( ! sQueue.empty()) 00189 { 00190 req = sQueue.front(); 00191 sQueue.pop_front(); 00192 } 00193 else 00194 { 00195 sSheduled = false; 00196 break; 00197 } 00198 } 00199 00200 req->Act(); 00201 00202 if (req->m_return_condition) 00203 { 00204 req->m_return_condition->GetMutex()->Lock(); 00205 req->m_return_condition->Signal(); 00206 req->m_return_condition->GetMutex()->UnLock(); 00207 } 00208 else 00209 { 00210 delete req; 00211 } 00212 } 00213 }