![]() |
![]() |
#include <EventFilter/StorageManager/src/DQMHttpSource.h>
Public Types | |
typedef std::vector< char > | Buf |
Public Member Functions | |
DQMHttpSource (const edm::ParameterSet &pset, const edm::InputSourceDescription &desc) | |
virtual | ~DQMHttpSource () |
Protected Attributes | |
DQMStore * | bei_ |
Private Member Functions | |
std::auto_ptr< edm::Event > | getOneDQMEvent () |
virtual std::auto_ptr< edm::Event > | readOneEvent () |
virtual void | registerWithDQMEventServer () |
Private Attributes | |
bool | alreadySaidHalted_ |
Buf | buf_ |
std::string | consumerTopFolderName_ |
unsigned int | DQMconsumerId_ |
std::string | DQMconsumerName_ |
std::string | DQMconsumerPriority_ |
char | DQMeventurl_ [256] |
char | DQMsubscriptionurl_ [256] |
unsigned int | events_read_ |
Strings | firstHistoExtractDone_ |
int | headerRetryInterval_ |
struct timeval | lastDQMRequestTime_ |
double | minDQMEventRequestInterval_ |
std::string | sourceurl_ |
unsigned int | updatesCounter_ |
Definition at line 25 of file DQMHttpSource.h.
typedef std::vector<char> edm::DQMHttpSource::Buf |
Definition at line 28 of file DQMHttpSource.h.
edm::DQMHttpSource::DQMHttpSource | ( | const edm::ParameterSet & | pset, | |
const edm::InputSourceDescription & | desc | |||
) | [explicit] |
Definition at line 36 of file DQMHttpSource.cc.
References bei_, DQMconsumerId_, DQMconsumerName_, DQMconsumerPriority_, DQMeventurl_, DQMsubscriptionurl_, edm::ParameterSet::getUntrackedParameter(), headerRetryInterval_, i, lastDQMRequestTime_, L1TDQM_GREJuly_cfg::maxEventRequestRate, minDQMEventRequestInterval_, NULL, registerWithDQMEventServer(), and sourceurl_.
00037 : 00038 edm::RawInputSource(pset,desc), 00039 updatesCounter_(0), 00040 sourceurl_(pset.getUntrackedParameter<string>("sourceURL")), 00041 buf_(1000*1000*7), 00042 events_read_(0), 00043 consumerTopFolderName_(pset.getUntrackedParameter<string>("topLevelFolderName")), 00044 alreadySaidHalted_(false) 00045 { 00046 std::string evturl = sourceurl_ + "/getDQMeventdata"; 00047 int stlen = evturl.length(); 00048 for (int i=0; i<stlen; i++) DQMeventurl_[i]=evturl[i]; 00049 DQMeventurl_[stlen] = '\0'; 00050 00051 std::string regurl = sourceurl_ + "/registerDQMConsumer"; 00052 stlen = regurl.length(); 00053 for (int i=0; i<stlen; i++) DQMsubscriptionurl_[i]=regurl[i]; 00054 DQMsubscriptionurl_[stlen] = '\0'; 00055 00056 const double MAX_REQUEST_INTERVAL = 300.0; // seconds 00057 DQMconsumerName_ = pset.getUntrackedParameter<string>("DQMconsumerName","Unknown"); 00058 DQMconsumerPriority_ = pset.getUntrackedParameter<string>("DQMconsumerPriority","normal"); 00059 headerRetryInterval_ = pset.getUntrackedParameter<int>("headerRetryInterval",5); 00060 double maxEventRequestRate = pset.getUntrackedParameter<double>("maxDQMEventRequestRate",1.0); 00061 if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) { 00062 minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL; 00063 } 00064 else { 00065 minDQMEventRequestInterval_ = 1.0 / maxEventRequestRate; // seconds 00066 } 00067 lastDQMRequestTime_.tv_sec = 0; 00068 lastDQMRequestTime_.tv_usec = 0; 00069 00070 // register this DQM consumer with the DQMevent server of the Storage Manager 00071 DQMconsumerId_ = (time(0) & 0xffffff); // temporary - will get from ES later 00072 registerWithDQMEventServer(); 00073 // when running Async it seems bei_ is not NULL at the start after default ctor 00074 bei_ = NULL; 00075 }
virtual edm::DQMHttpSource::~DQMHttpSource | ( | ) | [inline, virtual] |
std::auto_ptr< Event > edm::DQMHttpSource::getOneDQMEvent | ( | ) | [private] |
Definition at line 97 of file DQMHttpSource.cc.
References alreadySaidHalted_, bei_, buf_, TestMuL1L2Filter_cff::cerr, DQMEventMsgView::compressionFlag(), convert(), count, GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamDQMDeserializer::deserializeDQMEvent(), Header::DONE, DQMconsumerId_, Header::DQMEVENT_REQUEST, DQMeventurl_, e, lat::endl(), DQMEventMsgView::eventNumberAtUpdate(), events_read_, Exception, DQMStore::extract(), FDEBUG, find(), DQMStore::findObject(), firstHistoExtractDone_, stor::func(), stor::DQMInstance::getSafeMEName(), DQMEventMsgView::headerSize(), i, iEvent, int, lastDQMRequestTime_, len, DQMEventMsgView::lumiSection(), DQMStore::makeDirectory(), edm::RawInputSource::makeEvent(), me, minDQMEventRequestInterval_, OtherMessageBuilder::msgBody(), NULL, path(), DQMEventMsgView::protocolVersion(), DQMStore::pwd(), DQMEventMsgView::releaseTag(), DQMEventMsgView::reserved(), DQMEventMsgView::runNumber(), DQMStore::setCurrentFolder(), stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), OtherMessageBuilder::startAddress(), DQMEventMsgView::subFolderCount(), te, ti, DQMEventMsgView::timeStamp(), DQMEventMsgView::topFolderName(), DQMEventMsgView::updateNumber(), and updatesCounter_.
Referenced by readOneEvent().
00098 { 00099 // repeat a https get every X seconds until we get a DQMevent 00100 // only way to stop is specify a maxEvents parameter 00101 // or kill the Storage Manager XDAQ application so the https get fails. 00102 00103 // check if we need to sleep (to enforce the allowed request rate) 00104 struct timeval now; 00105 struct timezone dummyTZ; 00106 gettimeofday(&now, &dummyTZ); 00107 double timeDiff = (double) now.tv_sec; 00108 timeDiff -= (double) lastDQMRequestTime_.tv_sec; 00109 timeDiff += ((double) now.tv_usec / 1000000.0); 00110 timeDiff -= ((double) lastDQMRequestTime_.tv_usec / 1000000.0); 00111 if (timeDiff < minDQMEventRequestInterval_) 00112 { 00113 double sleepTime = minDQMEventRequestInterval_ - timeDiff; 00114 // trim off a little sleep time to account for the time taken by 00115 // calling gettimeofday again 00116 sleepTime -= 0.01; 00117 if (sleepTime < 0.0) {sleepTime = 0.0;} 00118 usleep(static_cast<int>(1000000 * sleepTime)); 00119 gettimeofday(&lastDQMRequestTime_, &dummyTZ); 00120 } 00121 else 00122 { 00123 lastDQMRequestTime_ = now; 00124 } 00125 00126 stor::ReadData data; 00127 bool alreadySaidWaiting = false; 00128 do { 00129 CURL* han = curl_easy_init(); 00130 00131 if(han==0) 00132 { 00133 cerr << "DQMHttpSOurce: could not create handle" << endl; 00134 throw cms::Exception("getOneEvent","DQMHttpSource") 00135 << "Unable to create curl handle\n"; 00136 // this will end cmsRun 00137 } 00138 00139 stor::setopt(han,CURLOPT_URL,DQMeventurl_); 00140 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 00141 stor::setopt(han,CURLOPT_WRITEDATA,&data); 00142 00143 // send our consumer ID as part of the event request 00144 char msgBuff[100]; 00145 OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST, 00146 sizeof(char_uint32)); 00147 uint8 *bodyPtr = requestMessage.msgBody(); 00148 char_uint32 convertedId; 00149 convert(DQMconsumerId_, convertedId); 00150 for (unsigned int idx = 0; idx < sizeof(char_uint32); idx++) { 00151 bodyPtr[idx] = convertedId[idx]; 00152 } 00153 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00154 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00155 struct curl_slist *headers=NULL; 00156 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00157 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00158 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 00159 00160 // send the HTTP POST, read the reply, and cleanup before going on 00161 CURLcode messageStatus = curl_easy_perform(han); 00162 curl_slist_free_all(headers); 00163 curl_easy_cleanup(han); 00164 00165 if(messageStatus!=0) 00166 { 00167 cerr << "curl perform failed for DQMevent, messageStatus = " 00168 << messageStatus << endl; 00169 throw cms::Exception("getOneDQMEvent","DQMHttpSource") 00170 << "Could not get event: probably XDAQ not running on Storage Manager " 00171 << "\n"; 00172 // this will end cmsRun 00173 } 00174 if(data.d_.length() == 0) 00175 { 00176 if(!alreadySaidWaiting) { 00177 std::cout << "...waiting for DQMevent from Storage Manager..." << std::endl; 00178 alreadySaidWaiting = true; 00179 } 00180 // sleep for the standard request interval 00181 usleep(static_cast<int>(1000000 * minDQMEventRequestInterval_)); 00182 } 00183 } while (data.d_.length() == 0 && !edm::shutdown_flag); 00184 if (edm::shutdown_flag) { 00185 return std::auto_ptr<edm::Event>(); 00186 } 00187 00188 int len = data.d_.length(); 00189 FDEBUG(9) << "DQMHttpSource received len = " << len << std::endl; 00190 buf_.resize(len); 00191 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00192 00193 OtherMessageView msgView(&buf_[0]); 00194 00195 RunNumber_t iRun = 0; 00196 LuminosityBlockNumber_t iLumi = 0; 00197 EventNumber_t iEvent = 0; 00198 TimeValue_t tStamp = 1; 00199 Timestamp timeStamp (tStamp); 00200 00201 if (msgView.code() == Header::DONE) { 00202 // Continue past run boundaries (SM halt) 00203 // no need to register again as the SM/EventServer is kept alive on a stopAction 00204 if(!alreadySaidHalted_) { 00205 alreadySaidHalted_ = true; 00206 std::cout << "Storage Manager has halted - waiting for restart" << std::endl; 00207 } 00208 return std::auto_ptr<edm::Event>(); 00209 } else { 00210 // counting the updates 00211 ++updatesCounter_; 00212 ++events_read_; 00213 DQMEventMsgView dqmEventView(&buf_[0]); 00214 iRun = dqmEventView.runNumber(); 00215 iLumi = dqmEventView.lumiSection(); 00216 iEvent = dqmEventView.eventNumberAtUpdate(); 00217 timeStamp = dqmEventView.timeStamp(); 00218 00219 FDEBUG(8) << " DQM Message data:" << std::endl; 00220 FDEBUG(8) << " protocol version = " 00221 << dqmEventView.protocolVersion() << std::endl; 00222 FDEBUG(8) << " header size = " 00223 << dqmEventView.headerSize() << std::endl; 00224 FDEBUG(8) << " run number = " 00225 << dqmEventView.runNumber() << std::endl; 00226 FDEBUG(8) << " event number = " 00227 << dqmEventView.eventNumberAtUpdate() << std::endl; 00228 FDEBUG(8) << " lumi section = " 00229 << dqmEventView.lumiSection() << std::endl; 00230 FDEBUG(8) << " update number = " 00231 << dqmEventView.updateNumber() << std::endl; 00232 FDEBUG(8) << " compression flag = " 00233 << dqmEventView.compressionFlag() << std::endl; 00234 FDEBUG(8) << " reserved word = " 00235 << dqmEventView.reserved() << std::endl; 00236 FDEBUG(8) << " release tag = " 00237 << dqmEventView.releaseTag() << std::endl; 00238 FDEBUG(8) << " top folder name = " 00239 << dqmEventView.topFolderName() << std::endl; 00240 FDEBUG(8) << " sub folder count = " 00241 << dqmEventView.subFolderCount() << std::endl; 00242 00243 // deserialize and stick into DQM backend 00244 // need both types of interfaces as the extractObject I use is 00245 // only in DQMStore 00246 if (bei_ == NULL) { 00247 bei_ = edm::Service<DQMStore>().operator->(); 00248 } 00249 if (bei_ == NULL) { 00250 throw cms::Exception("readOneEvent", "DQMHttpSource") 00251 << "Unable to lookup the DQMStore service!\n"; 00252 } 00253 00254 edm::StreamDQMDeserializer deserializeWorker; 00255 std::auto_ptr<DQMEvent::TObjectTable> toTablePtr = 00256 deserializeWorker.deserializeDQMEvent(dqmEventView); 00257 00258 unsigned int count = 0; 00259 DQMEvent::TObjectTable::const_iterator toIter; 00260 for (toIter = toTablePtr->begin(); 00261 toIter != toTablePtr->end(); toIter++) { 00262 std::string subFolderName = toIter->first; 00263 std::vector<TObject *> toList = toIter->second; 00264 bei_->makeDirectory(subFolderName); // fetch or create 00265 bei_->setCurrentFolder(subFolderName); 00266 for (int tdx = 0; tdx < (int) toList.size(); tdx++) { 00267 TObject *toPtr = toList[tdx]; 00268 std::string cls = toPtr->IsA()->GetName(); 00269 std::string nm = stor::DQMInstance::getSafeMEName(toPtr); 00270 FDEBUG(8) << " TObject class = " << cls << ", name = " << nm << std::endl; 00271 if (bei_->extract(toPtr, bei_->pwd(), true)) 00272 { 00273 std::string path; 00274 if (MonitorElement *me = bei_->findObject(subFolderName, nm, path)) 00275 me->update(); 00276 ++count; 00277 } 00278 } 00279 } 00280 00281 // clean up memory by spinning through the DQMEvent::TObjectTable map and 00282 // deleting each TObject in the std::vector<TObject *> later we will 00283 // change map to use std::vector< boost::shared_ptr<TObject> > 00284 DQMEvent::TObjectTable::iterator ti(toTablePtr->begin()), te(toTablePtr->end()); 00285 for ( ; ti != te; ++ti) { 00286 std::string subFolderName = ti->first; 00287 std::vector<TObject *>::iterator vi(ti->second.begin()), ve(ti->second.end()); 00288 for ( ; vi != ve; ++vi) { 00289 std::string histoName = stor::DQMInstance::getSafeMEName(*vi); 00290 std::string fullName = subFolderName + "/" + histoName; 00291 std::vector<std::string>::iterator entryFound; 00292 entryFound = std::find(firstHistoExtractDone_.begin(), 00293 firstHistoExtractDone_.end(), 00294 fullName); 00295 // 30-May-2008, KAB - skip over deleting the memory of the first 00296 // ME passed to bei_->extract() until we check into having that 00297 // code copy the ME instead of using it directly. 00298 if (entryFound == firstHistoExtractDone_.end()) { 00299 firstHistoExtractDone_.push_back(fullName); 00300 } 00301 else { 00302 delete *vi; 00303 } 00304 } 00305 } 00306 } 00307 00308 EventID eventId(iRun,iEvent); 00309 00310 // make a fake event containing no data but the evId and runId from DQMEvent 00311 // and the time stamp from the event at update 00312 std::auto_ptr<Event> e = makeEvent(iRun,iLumi,iEvent,timeStamp); 00313 00314 return e; 00315 }
std::auto_ptr< Event > edm::DQMHttpSource::readOneEvent | ( | ) | [private, virtual] |
Implements edm::RawInputSource.
Definition at line 78 of file DQMHttpSource.cc.
References getOneDQMEvent(), NULL, HLT_VtxMuL3::result, and edm::shutdown_flag.
00079 { 00080 // repeat a https get every X seconds until we get a DQMevent 00081 // only way to stop is specify a maxEvents parameter 00082 // or kill the Storage Manager XDAQ application so the https get fails. 00083 00084 // try to get an event repeat until we get one, this allows 00085 // re-registration is the SM is halted or stopped 00086 00087 bool gotEvent = false; 00088 std::auto_ptr<Event> result(0); 00089 while ((!gotEvent) && (!edm::shutdown_flag)) 00090 { 00091 result = getOneDQMEvent(); 00092 if(result.get() != NULL) gotEvent = true; 00093 } 00094 return result; 00095 }
void edm::DQMHttpSource::registerWithDQMEventServer | ( | ) | [private, virtual] |
Definition at line 317 of file DQMHttpSource.cc.
References buf_, TestMuL1L2Filter_cff::cerr, consumerTopFolderName_, GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, DQMconsumerId_, DQMconsumerName_, DQMconsumerPriority_, DQMsubscriptionurl_, lat::endl(), ConsRegResponseBuilder::ES_NOT_READY, Exception, FDEBUG, stor::func(), headerRetryInterval_, i, len, NULL, stor::setopt(), ConsRegRequestBuilder::size(), and ConsRegRequestBuilder::startAddress().
Referenced by DQMHttpSource().
00318 { 00319 stor::ReadData data; 00320 uint32 registrationStatus; 00321 bool alreadySaidWaiting = false; 00322 do { 00323 data.d_.clear(); 00324 CURL* han = curl_easy_init(); 00325 if(han==0) 00326 { 00327 cerr << "could not create handle" << endl; 00328 throw cms::Exception("registerWithDQMEventServer","DQMHttpSource") 00329 << "Unable to create curl handle\n"; 00330 } 00331 00332 // set the standard https request options 00333 stor::setopt(han,CURLOPT_URL,DQMsubscriptionurl_); 00334 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 00335 stor::setopt(han,CURLOPT_WRITEDATA,&data); 00336 00337 // build the registration request message to send to the storage manager 00338 const int BUFFER_SIZE = 2000; 00339 char msgBuff[BUFFER_SIZE]; 00340 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_, 00341 DQMconsumerPriority_, consumerTopFolderName_); 00342 00343 // add the request message as a https post 00344 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00345 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00346 struct curl_slist *headers=NULL; 00347 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00348 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00349 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 00350 00351 // send the HTTP POST, read the reply, and cleanup before going on 00352 CURLcode messageStatus = curl_easy_perform(han); 00353 curl_slist_free_all(headers); 00354 curl_easy_cleanup(han); 00355 00356 if(messageStatus!=0) 00357 { 00358 cerr << "curl perform failed for DQM registration" << endl; 00359 throw cms::Exception("registerWithDQMEventServer","DQMHttpSource") 00360 << "Could not register: probably XDAQ not running or no Storage Manager/SMProxyServer loaded" 00361 << "\n"; 00362 } 00363 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY; 00364 if(data.d_.length() > 0) 00365 { 00366 int len = data.d_.length(); 00367 FDEBUG(9) << "DQMHttpSource received len = " << len << std::endl; 00368 buf_.resize(len); 00369 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00370 00371 try { 00372 ConsRegResponseView respView(&buf_[0]); 00373 registrationStatus = respView.getStatus(); 00374 DQMconsumerId_ = respView.getConsumerId(); 00375 } 00376 catch (cms::Exception excpt) { 00377 const unsigned int MAX_DUMP_LENGTH = 1000; 00378 std::cout << "========================================" << std::endl; 00379 std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl; 00380 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00381 std::cout << "* Here is the raw text that was returned:" << std::endl; 00382 std::cout << data.d_ << std::endl; 00383 } 00384 else { 00385 std::cout << "* Here are the first " << MAX_DUMP_LENGTH << 00386 " characters of the raw text that was returned:" << std::endl; 00387 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl; 00388 } 00389 std::cout << "========================================" << std::endl; 00390 throw excpt; 00391 } 00392 } 00393 00394 if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) 00395 { 00396 if(!alreadySaidWaiting) { 00397 std::cout << "...waiting for DQM registration response from StorageManager or SMProxyServer..." 00398 << std::endl; 00399 alreadySaidWaiting = true; 00400 } 00401 // sleep for desired amount of time 00402 sleep(headerRetryInterval_); 00403 } 00404 } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY && 00405 !edm::shutdown_flag); 00406 00407 FDEBUG(9) << "Consumer ID = " << DQMconsumerId_ << endl; 00408 }
bool edm::DQMHttpSource::alreadySaidHalted_ [private] |
DQMStore* edm::DQMHttpSource::bei_ [protected] |
Buf edm::DQMHttpSource::buf_ [private] |
Definition at line 46 of file DQMHttpSource.h.
Referenced by getOneDQMEvent(), and registerWithDQMEventServer().
std::string edm::DQMHttpSource::consumerTopFolderName_ [private] |
unsigned int edm::DQMHttpSource::DQMconsumerId_ [private] |
Definition at line 53 of file DQMHttpSource.h.
Referenced by DQMHttpSource(), getOneDQMEvent(), and registerWithDQMEventServer().
std::string edm::DQMHttpSource::DQMconsumerName_ [private] |
Definition at line 48 of file DQMHttpSource.h.
Referenced by DQMHttpSource(), and registerWithDQMEventServer().
std::string edm::DQMHttpSource::DQMconsumerPriority_ [private] |
Definition at line 49 of file DQMHttpSource.h.
Referenced by DQMHttpSource(), and registerWithDQMEventServer().
char edm::DQMHttpSource::DQMeventurl_[256] [private] |
char edm::DQMHttpSource::DQMsubscriptionurl_[256] [private] |
Definition at line 45 of file DQMHttpSource.h.
Referenced by DQMHttpSource(), and registerWithDQMEventServer().
unsigned int edm::DQMHttpSource::events_read_ [private] |
int edm::DQMHttpSource::headerRetryInterval_ [private] |
Definition at line 51 of file DQMHttpSource.h.
Referenced by DQMHttpSource(), and registerWithDQMEventServer().
struct timeval edm::DQMHttpSource::lastDQMRequestTime_ [read, private] |
double edm::DQMHttpSource::minDQMEventRequestInterval_ [private] |
std::string edm::DQMHttpSource::sourceurl_ [private] |
unsigned int edm::DQMHttpSource::updatesCounter_ [private] |