00001
00008 #include "EventFilter/StorageManager/src/DQMHttpSource.h"
00009 #include "EventFilter/StorageManager/interface/SMCurlInterface.h"
00010 #include "EventFilter/StorageManager/interface/DQMInstance.h"
00011 #include "FWCore/Utilities/interface/DebugMacros.h"
00012 #include "FWCore/Framework/interface/Event.h"
00013 #include "FWCore/ServiceRegistry/interface/Service.h"
00014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00015
00016 #include "IOPool/Streamer/interface/OtherMessage.h"
00017 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00018 #include "IOPool/Streamer/interface/DQMEventMessage.h"
00019 #include "IOPool/Streamer/interface/StreamDQMDeserializer.h"
00020
00021 #include "DQMServices/Core/interface/DQMStore.h"
00022 #include "DQMServices/Core/interface/MonitorElement.h"
00023
00024 #include "TClass.h"
00025
00026 #include <iostream>
00027 #include <sys/time.h>
00028 #include "curl/curl.h"
00029 #include <wait.h>
00030
00031 using namespace edm;
00032 using namespace std;
00033
00034 namespace edm
00035 {
00036 DQMHttpSource::DQMHttpSource(const ParameterSet& pset,
00037 const InputSourceDescription& desc) :
00038 edm::RawInputSource(pset,desc),
00039 updatesCounter_(0),
00040 sourceurl_(pset.getUntrackedParameter<string>("sourceURL")),
00041 buf_(1000*1000*7),
00042 events_read_(0),
00043 consumerTopFolderName_(pset.getUntrackedParameter<string>("topLevelFolderName")),
00044 alreadySaidHalted_(false)
00045 {
00046 std::string evturl = sourceurl_ + "/getDQMeventdata";
00047 int stlen = evturl.length();
00048 for (int i=0; i<stlen; i++) DQMeventurl_[i]=evturl[i];
00049 DQMeventurl_[stlen] = '\0';
00050
00051 std::string regurl = sourceurl_ + "/registerDQMConsumer";
00052 stlen = regurl.length();
00053 for (int i=0; i<stlen; i++) DQMsubscriptionurl_[i]=regurl[i];
00054 DQMsubscriptionurl_[stlen] = '\0';
00055
00056 const double MAX_REQUEST_INTERVAL = 300.0;
00057 DQMconsumerName_ = pset.getUntrackedParameter<string>("DQMconsumerName","Unknown");
00058 DQMconsumerPriority_ = pset.getUntrackedParameter<string>("DQMconsumerPriority","normal");
00059 headerRetryInterval_ = pset.getUntrackedParameter<int>("headerRetryInterval",5);
00060 double maxEventRequestRate = pset.getUntrackedParameter<double>("maxDQMEventRequestRate",1.0);
00061 if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) {
00062 minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00063 }
00064 else {
00065 minDQMEventRequestInterval_ = 1.0 / maxEventRequestRate;
00066 }
00067 lastDQMRequestTime_.tv_sec = 0;
00068 lastDQMRequestTime_.tv_usec = 0;
00069
00070
00071 DQMconsumerId_ = (time(0) & 0xffffff);
00072 registerWithDQMEventServer();
00073
00074 bei_ = NULL;
00075 }
00076
00077
00078 std::auto_ptr<Event> DQMHttpSource::readOneEvent()
00079 {
00080
00081
00082
00083
00084
00085
00086
00087 bool gotEvent = false;
00088 std::auto_ptr<Event> result(0);
00089 while ((!gotEvent) && (!edm::shutdown_flag))
00090 {
00091 result = getOneDQMEvent();
00092 if(result.get() != NULL) gotEvent = true;
00093 }
00094 return result;
00095 }
00096
00097 std::auto_ptr<Event> DQMHttpSource::getOneDQMEvent()
00098 {
00099
00100
00101
00102
00103
00104 struct timeval now;
00105 struct timezone dummyTZ;
00106 gettimeofday(&now, &dummyTZ);
00107 double timeDiff = (double) now.tv_sec;
00108 timeDiff -= (double) lastDQMRequestTime_.tv_sec;
00109 timeDiff += ((double) now.tv_usec / 1000000.0);
00110 timeDiff -= ((double) lastDQMRequestTime_.tv_usec / 1000000.0);
00111 if (timeDiff < minDQMEventRequestInterval_)
00112 {
00113 double sleepTime = minDQMEventRequestInterval_ - timeDiff;
00114
00115
00116 sleepTime -= 0.01;
00117 if (sleepTime < 0.0) {sleepTime = 0.0;}
00118 usleep(static_cast<int>(1000000 * sleepTime));
00119 gettimeofday(&lastDQMRequestTime_, &dummyTZ);
00120 }
00121 else
00122 {
00123 lastDQMRequestTime_ = now;
00124 }
00125
00126 stor::ReadData data;
00127 bool alreadySaidWaiting = false;
00128 do {
00129 CURL* han = curl_easy_init();
00130
00131 if(han==0)
00132 {
00133 cerr << "DQMHttpSOurce: could not create handle" << endl;
00134 throw cms::Exception("getOneEvent","DQMHttpSource")
00135 << "Unable to create curl handle\n";
00136
00137 }
00138
00139 stor::setopt(han,CURLOPT_URL,DQMeventurl_);
00140 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00141 stor::setopt(han,CURLOPT_WRITEDATA,&data);
00142
00143
00144 char msgBuff[100];
00145 OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST,
00146 sizeof(char_uint32));
00147 uint8 *bodyPtr = requestMessage.msgBody();
00148 char_uint32 convertedId;
00149 convert(DQMconsumerId_, convertedId);
00150 for (unsigned int idx = 0; idx < sizeof(char_uint32); idx++) {
00151 bodyPtr[idx] = convertedId[idx];
00152 }
00153 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00154 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00155 struct curl_slist *headers=NULL;
00156 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00157 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00158 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00159
00160
00161 CURLcode messageStatus = curl_easy_perform(han);
00162 curl_slist_free_all(headers);
00163 curl_easy_cleanup(han);
00164
00165 if(messageStatus!=0)
00166 {
00167 cerr << "curl perform failed for DQMevent, messageStatus = "
00168 << messageStatus << endl;
00169 throw cms::Exception("getOneDQMEvent","DQMHttpSource")
00170 << "Could not get event: probably XDAQ not running on Storage Manager "
00171 << "\n";
00172
00173 }
00174 if(data.d_.length() == 0)
00175 {
00176 if(!alreadySaidWaiting) {
00177 std::cout << "...waiting for DQMevent from Storage Manager..." << std::endl;
00178 alreadySaidWaiting = true;
00179 }
00180
00181 usleep(static_cast<int>(1000000 * minDQMEventRequestInterval_));
00182 }
00183 } while (data.d_.length() == 0 && !edm::shutdown_flag);
00184 if (edm::shutdown_flag) {
00185 return std::auto_ptr<edm::Event>();
00186 }
00187
00188 int len = data.d_.length();
00189 FDEBUG(9) << "DQMHttpSource received len = " << len << std::endl;
00190 buf_.resize(len);
00191 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00192
00193 OtherMessageView msgView(&buf_[0]);
00194
00195 RunNumber_t iRun = 0;
00196 LuminosityBlockNumber_t iLumi = 0;
00197 EventNumber_t iEvent = 0;
00198 TimeValue_t tStamp = 1;
00199 Timestamp timeStamp (tStamp);
00200
00201 if (msgView.code() == Header::DONE) {
00202
00203
00204 if(!alreadySaidHalted_) {
00205 alreadySaidHalted_ = true;
00206 std::cout << "Storage Manager has halted - waiting for restart" << std::endl;
00207 }
00208 return std::auto_ptr<edm::Event>();
00209 } else {
00210
00211 ++updatesCounter_;
00212 ++events_read_;
00213 DQMEventMsgView dqmEventView(&buf_[0]);
00214 iRun = dqmEventView.runNumber();
00215 iLumi = dqmEventView.lumiSection();
00216 iEvent = dqmEventView.eventNumberAtUpdate();
00217 timeStamp = dqmEventView.timeStamp();
00218
00219 FDEBUG(8) << " DQM Message data:" << std::endl;
00220 FDEBUG(8) << " protocol version = "
00221 << dqmEventView.protocolVersion() << std::endl;
00222 FDEBUG(8) << " header size = "
00223 << dqmEventView.headerSize() << std::endl;
00224 FDEBUG(8) << " run number = "
00225 << dqmEventView.runNumber() << std::endl;
00226 FDEBUG(8) << " event number = "
00227 << dqmEventView.eventNumberAtUpdate() << std::endl;
00228 FDEBUG(8) << " lumi section = "
00229 << dqmEventView.lumiSection() << std::endl;
00230 FDEBUG(8) << " update number = "
00231 << dqmEventView.updateNumber() << std::endl;
00232 FDEBUG(8) << " compression flag = "
00233 << dqmEventView.compressionFlag() << std::endl;
00234 FDEBUG(8) << " reserved word = "
00235 << dqmEventView.reserved() << std::endl;
00236 FDEBUG(8) << " release tag = "
00237 << dqmEventView.releaseTag() << std::endl;
00238 FDEBUG(8) << " top folder name = "
00239 << dqmEventView.topFolderName() << std::endl;
00240 FDEBUG(8) << " sub folder count = "
00241 << dqmEventView.subFolderCount() << std::endl;
00242
00243
00244
00245
00246 if (bei_ == NULL) {
00247 bei_ = edm::Service<DQMStore>().operator->();
00248 }
00249 if (bei_ == NULL) {
00250 throw cms::Exception("readOneEvent", "DQMHttpSource")
00251 << "Unable to lookup the DQMStore service!\n";
00252 }
00253
00254 edm::StreamDQMDeserializer deserializeWorker;
00255 std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00256 deserializeWorker.deserializeDQMEvent(dqmEventView);
00257
00258 unsigned int count = 0;
00259 DQMEvent::TObjectTable::const_iterator toIter;
00260 for (toIter = toTablePtr->begin();
00261 toIter != toTablePtr->end(); toIter++) {
00262 std::string subFolderName = toIter->first;
00263 std::vector<TObject *> toList = toIter->second;
00264 bei_->makeDirectory(subFolderName);
00265 bei_->setCurrentFolder(subFolderName);
00266 for (int tdx = 0; tdx < (int) toList.size(); tdx++) {
00267 TObject *toPtr = toList[tdx];
00268 std::string cls = toPtr->IsA()->GetName();
00269 std::string nm = stor::DQMInstance::getSafeMEName(toPtr);
00270 FDEBUG(8) << " TObject class = " << cls << ", name = " << nm << std::endl;
00271 if (bei_->extract(toPtr, bei_->pwd(), true))
00272 {
00273 std::string path;
00274 if (MonitorElement *me = bei_->findObject(subFolderName, nm, path))
00275 me->update();
00276 ++count;
00277 }
00278 }
00279 }
00280
00281
00282
00283
00284 DQMEvent::TObjectTable::iterator ti(toTablePtr->begin()), te(toTablePtr->end());
00285 for ( ; ti != te; ++ti) {
00286 std::string subFolderName = ti->first;
00287 std::vector<TObject *>::iterator vi(ti->second.begin()), ve(ti->second.end());
00288 for ( ; vi != ve; ++vi) {
00289 std::string histoName = stor::DQMInstance::getSafeMEName(*vi);
00290 std::string fullName = subFolderName + "/" + histoName;
00291 std::vector<std::string>::iterator entryFound;
00292 entryFound = std::find(firstHistoExtractDone_.begin(),
00293 firstHistoExtractDone_.end(),
00294 fullName);
00295
00296
00297
00298 if (entryFound == firstHistoExtractDone_.end()) {
00299 firstHistoExtractDone_.push_back(fullName);
00300 }
00301 else {
00302 delete *vi;
00303 }
00304 }
00305 }
00306 }
00307
00308 EventID eventId(iRun,iEvent);
00309
00310
00311
00312 std::auto_ptr<Event> e = makeEvent(iRun,iLumi,iEvent,timeStamp);
00313
00314 return e;
00315 }
00316
00317 void DQMHttpSource::registerWithDQMEventServer()
00318 {
00319 stor::ReadData data;
00320 uint32 registrationStatus;
00321 bool alreadySaidWaiting = false;
00322 do {
00323 data.d_.clear();
00324 CURL* han = curl_easy_init();
00325 if(han==0)
00326 {
00327 cerr << "could not create handle" << endl;
00328 throw cms::Exception("registerWithDQMEventServer","DQMHttpSource")
00329 << "Unable to create curl handle\n";
00330 }
00331
00332
00333 stor::setopt(han,CURLOPT_URL,DQMsubscriptionurl_);
00334 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00335 stor::setopt(han,CURLOPT_WRITEDATA,&data);
00336
00337
00338 const int BUFFER_SIZE = 2000;
00339 char msgBuff[BUFFER_SIZE];
00340 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_,
00341 DQMconsumerPriority_, consumerTopFolderName_);
00342
00343
00344 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00345 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00346 struct curl_slist *headers=NULL;
00347 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00348 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00349 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00350
00351
00352 CURLcode messageStatus = curl_easy_perform(han);
00353 curl_slist_free_all(headers);
00354 curl_easy_cleanup(han);
00355
00356 if(messageStatus!=0)
00357 {
00358 cerr << "curl perform failed for DQM registration" << endl;
00359 throw cms::Exception("registerWithDQMEventServer","DQMHttpSource")
00360 << "Could not register: probably XDAQ not running or no Storage Manager/SMProxyServer loaded"
00361 << "\n";
00362 }
00363 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00364 if(data.d_.length() > 0)
00365 {
00366 int len = data.d_.length();
00367 FDEBUG(9) << "DQMHttpSource received len = " << len << std::endl;
00368 buf_.resize(len);
00369 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00370
00371 try {
00372 ConsRegResponseView respView(&buf_[0]);
00373 registrationStatus = respView.getStatus();
00374 DQMconsumerId_ = respView.getConsumerId();
00375 }
00376 catch (cms::Exception excpt) {
00377 const unsigned int MAX_DUMP_LENGTH = 1000;
00378 std::cout << "========================================" << std::endl;
00379 std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl;
00380 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00381 std::cout << "* Here is the raw text that was returned:" << std::endl;
00382 std::cout << data.d_ << std::endl;
00383 }
00384 else {
00385 std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00386 " characters of the raw text that was returned:" << std::endl;
00387 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00388 }
00389 std::cout << "========================================" << std::endl;
00390 throw excpt;
00391 }
00392 }
00393
00394 if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY)
00395 {
00396 if(!alreadySaidWaiting) {
00397 std::cout << "...waiting for DQM registration response from StorageManager or SMProxyServer..."
00398 << std::endl;
00399 alreadySaidWaiting = true;
00400 }
00401
00402 sleep(headerRetryInterval_);
00403 }
00404 } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY &&
00405 !edm::shutdown_flag);
00406
00407 FDEBUG(9) << "Consumer ID = " << DQMconsumerId_ << endl;
00408 }
00409 }