CMS 3D CMS Logo

DQMHttpSource.cc

Go to the documentation of this file.
00001 
00008 #include "EventFilter/StorageManager/src/DQMHttpSource.h"
00009 #include "EventFilter/StorageManager/interface/SMCurlInterface.h"
00010 #include "EventFilter/StorageManager/interface/DQMInstance.h"
00011 #include "FWCore/Utilities/interface/DebugMacros.h"
00012 #include "FWCore/Framework/interface/Event.h"
00013 #include "FWCore/ServiceRegistry/interface/Service.h"
00014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00015 
00016 #include "IOPool/Streamer/interface/OtherMessage.h"
00017 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00018 #include "IOPool/Streamer/interface/DQMEventMessage.h"
00019 #include "IOPool/Streamer/interface/StreamDQMDeserializer.h"
00020 
00021 #include "DQMServices/Core/interface/DQMStore.h"
00022 #include "DQMServices/Core/interface/MonitorElement.h"
00023 
00024 #include "TClass.h"
00025 
00026 #include <iostream>
00027 #include <sys/time.h>
00028 #include "curl/curl.h"
00029 #include <wait.h>
00030 
00031 using namespace edm;
00032 using namespace std;
00033 
00034 namespace edm
00035 {  
00036   DQMHttpSource::DQMHttpSource(const ParameterSet& pset, 
00037                                          const InputSourceDescription& desc) :
00038     edm::RawInputSource(pset,desc), 
00039     updatesCounter_(0),
00040     sourceurl_(pset.getUntrackedParameter<string>("sourceURL")),
00041     buf_(1000*1000*7), 
00042     events_read_(0),
00043     consumerTopFolderName_(pset.getUntrackedParameter<string>("topLevelFolderName")),
00044     alreadySaidHalted_(false)
00045   {
00046     std::string evturl = sourceurl_ + "/getDQMeventdata";
00047     int stlen = evturl.length();
00048     for (int i=0; i<stlen; i++) DQMeventurl_[i]=evturl[i];
00049     DQMeventurl_[stlen] = '\0';
00050 
00051     std::string regurl = sourceurl_ + "/registerDQMConsumer";
00052     stlen = regurl.length();
00053     for (int i=0; i<stlen; i++) DQMsubscriptionurl_[i]=regurl[i];
00054     DQMsubscriptionurl_[stlen] = '\0';
00055 
00056     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00057     DQMconsumerName_ = pset.getUntrackedParameter<string>("DQMconsumerName","Unknown");
00058     DQMconsumerPriority_ = pset.getUntrackedParameter<string>("DQMconsumerPriority","normal");
00059     headerRetryInterval_ = pset.getUntrackedParameter<int>("headerRetryInterval",5);
00060     double maxEventRequestRate = pset.getUntrackedParameter<double>("maxDQMEventRequestRate",1.0);
00061     if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) {
00062       minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00063     }
00064     else {
00065       minDQMEventRequestInterval_ = 1.0 / maxEventRequestRate;  // seconds
00066     }
00067     lastDQMRequestTime_.tv_sec = 0;
00068     lastDQMRequestTime_.tv_usec = 0;
00069 
00070     // register this DQM consumer with the DQMevent server of the Storage Manager
00071     DQMconsumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00072     registerWithDQMEventServer();
00073     // when running Async it seems bei_ is not NULL at the start after default ctor
00074     bei_ = NULL;
00075   }
00076 
00077 
00078   std::auto_ptr<Event> DQMHttpSource::readOneEvent()
00079   {
00080     // repeat a https get every X seconds until we get a DQMevent
00081     // only way to stop is specify a maxEvents parameter
00082     // or kill the Storage Manager XDAQ application so the https get fails.
00083 
00084     // try to get an event repeat until we get one, this allows
00085     // re-registration is the SM is halted or stopped
00086 
00087     bool gotEvent = false;
00088     std::auto_ptr<Event> result(0);
00089     while ((!gotEvent) && (!edm::shutdown_flag))
00090     { 
00091        result = getOneDQMEvent();
00092        if(result.get() != NULL) gotEvent = true;
00093     } 
00094     return result;
00095   }
00096 
00097   std::auto_ptr<Event> DQMHttpSource::getOneDQMEvent()
00098   {
00099     // repeat a https get every X seconds until we get a DQMevent
00100     // only way to stop is specify a maxEvents parameter
00101     // or kill the Storage Manager XDAQ application so the https get fails.
00102 
00103     // check if we need to sleep (to enforce the allowed request rate)
00104     struct timeval now;
00105     struct timezone dummyTZ;
00106     gettimeofday(&now, &dummyTZ);
00107     double timeDiff = (double) now.tv_sec;
00108     timeDiff -= (double) lastDQMRequestTime_.tv_sec;
00109     timeDiff += ((double) now.tv_usec / 1000000.0);
00110     timeDiff -= ((double) lastDQMRequestTime_.tv_usec / 1000000.0);
00111     if (timeDiff < minDQMEventRequestInterval_)
00112     {
00113       double sleepTime = minDQMEventRequestInterval_ - timeDiff;
00114       // trim off a little sleep time to account for the time taken by
00115       // calling gettimeofday again
00116       sleepTime -= 0.01;
00117       if (sleepTime < 0.0) {sleepTime = 0.0;}
00118       usleep(static_cast<int>(1000000 * sleepTime));
00119       gettimeofday(&lastDQMRequestTime_, &dummyTZ);
00120     }
00121     else
00122     {
00123       lastDQMRequestTime_ = now;
00124     }
00125 
00126     stor::ReadData data;
00127     bool alreadySaidWaiting = false;
00128     do {
00129       CURL* han = curl_easy_init();
00130 
00131       if(han==0)
00132       {
00133         cerr << "DQMHttpSOurce: could not create handle" << endl;
00134         throw cms::Exception("getOneEvent","DQMHttpSource")
00135             << "Unable to create curl handle\n";
00136         // this will end cmsRun
00137       }
00138 
00139       stor::setopt(han,CURLOPT_URL,DQMeventurl_);
00140       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00141       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00142 
00143       // send our consumer ID as part of the event request
00144       char msgBuff[100];
00145       OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST,
00146                                          sizeof(char_uint32));
00147       uint8 *bodyPtr = requestMessage.msgBody();
00148       char_uint32 convertedId;
00149       convert(DQMconsumerId_, convertedId);
00150       for (unsigned int idx = 0; idx < sizeof(char_uint32); idx++) {
00151         bodyPtr[idx] = convertedId[idx];
00152       }
00153       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00154       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00155       struct curl_slist *headers=NULL;
00156       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00157       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00158       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00159 
00160       // send the HTTP POST, read the reply, and cleanup before going on
00161       CURLcode messageStatus = curl_easy_perform(han);
00162       curl_slist_free_all(headers);
00163       curl_easy_cleanup(han);
00164 
00165       if(messageStatus!=0)
00166       {
00167         cerr << "curl perform failed for DQMevent, messageStatus = "
00168              << messageStatus << endl;
00169         throw cms::Exception("getOneDQMEvent","DQMHttpSource")
00170             << "Could not get event: probably XDAQ not running on Storage Manager "
00171             << "\n";
00172         // this will end cmsRun
00173       }
00174       if(data.d_.length() == 0)
00175       {
00176         if(!alreadySaidWaiting) {
00177           std::cout << "...waiting for DQMevent from Storage Manager..." << std::endl;
00178           alreadySaidWaiting = true;
00179         }
00180         // sleep for the standard request interval
00181         usleep(static_cast<int>(1000000 * minDQMEventRequestInterval_));
00182       }
00183     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00184     if (edm::shutdown_flag) {
00185       return std::auto_ptr<edm::Event>();
00186     }
00187 
00188     int len = data.d_.length();
00189     FDEBUG(9) << "DQMHttpSource received len = " << len << std::endl;
00190     buf_.resize(len);
00191     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00192 
00193     OtherMessageView msgView(&buf_[0]);
00194 
00195     RunNumber_t iRun = 0;
00196     LuminosityBlockNumber_t iLumi = 0;
00197     EventNumber_t iEvent = 0;
00198     TimeValue_t tStamp = 1;
00199     Timestamp timeStamp (tStamp);
00200 
00201     if (msgView.code() == Header::DONE) {
00202       // Continue past run boundaries (SM halt)
00203       // no need to register again as the SM/EventServer is kept alive on a stopAction
00204      if(!alreadySaidHalted_) {
00205        alreadySaidHalted_ = true;
00206        std::cout << "Storage Manager has halted - waiting for restart" << std::endl;
00207      }
00208      return std::auto_ptr<edm::Event>();
00209     } else {
00210       // counting the updates
00211       ++updatesCounter_;
00212       ++events_read_;
00213       DQMEventMsgView dqmEventView(&buf_[0]);
00214       iRun = dqmEventView.runNumber();
00215       iLumi = dqmEventView.lumiSection();
00216       iEvent = dqmEventView.eventNumberAtUpdate();
00217       timeStamp = dqmEventView.timeStamp();
00218 
00219       FDEBUG(8) << "  DQM Message data:" << std::endl;
00220       FDEBUG(8) << "    protocol version = "
00221                 << dqmEventView.protocolVersion() << std::endl;
00222       FDEBUG(8) << "    header size = "
00223                 << dqmEventView.headerSize() << std::endl;
00224       FDEBUG(8) << "    run number = "
00225                 << dqmEventView.runNumber() << std::endl;
00226       FDEBUG(8) << "    event number = "
00227                 << dqmEventView.eventNumberAtUpdate() << std::endl;
00228       FDEBUG(8) << "    lumi section = "
00229                 << dqmEventView.lumiSection() << std::endl;
00230       FDEBUG(8) << "    update number = "
00231                 << dqmEventView.updateNumber() << std::endl;
00232       FDEBUG(8) << "    compression flag = "
00233                 << dqmEventView.compressionFlag() << std::endl;
00234       FDEBUG(8) << "    reserved word = "
00235                 << dqmEventView.reserved() << std::endl;
00236       FDEBUG(8) << "    release tag = "
00237                 << dqmEventView.releaseTag() << std::endl;
00238       FDEBUG(8) << "    top folder name = "
00239                 << dqmEventView.topFolderName() << std::endl;
00240       FDEBUG(8) << "    sub folder count = "
00241                 << dqmEventView.subFolderCount() << std::endl;
00242 
00243       // deserialize and stick into DQM backend
00244       // need both types of interfaces as the extractObject I use is
00245       // only in DQMStore
00246       if (bei_ == NULL) {
00247         bei_ = edm::Service<DQMStore>().operator->();
00248       }
00249       if (bei_ == NULL) {
00250         throw cms::Exception("readOneEvent", "DQMHttpSource")
00251           << "Unable to lookup the DQMStore service!\n";
00252       }
00253 
00254       edm::StreamDQMDeserializer deserializeWorker;
00255       std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00256           deserializeWorker.deserializeDQMEvent(dqmEventView);
00257 
00258       unsigned int count = 0;
00259       DQMEvent::TObjectTable::const_iterator toIter;
00260       for (toIter = toTablePtr->begin();
00261            toIter != toTablePtr->end(); toIter++) {
00262         std::string subFolderName = toIter->first;
00263         std::vector<TObject *> toList = toIter->second;
00264         bei_->makeDirectory(subFolderName);  // fetch or create
00265         bei_->setCurrentFolder(subFolderName);
00266         for (int tdx = 0; tdx < (int) toList.size(); tdx++) {
00267           TObject *toPtr = toList[tdx];
00268           std::string cls = toPtr->IsA()->GetName();
00269           std::string nm = stor::DQMInstance::getSafeMEName(toPtr);
00270           FDEBUG(8) << "    TObject class = " << cls << ", name = " << nm << std::endl;
00271           if (bei_->extract(toPtr, bei_->pwd(), true))
00272           {
00273             std::string path;
00274             if (MonitorElement *me = bei_->findObject(subFolderName, nm, path))
00275               me->update();
00276             ++count;
00277           }
00278         }
00279       }
00280 
00281       // clean up memory by spinning through the DQMEvent::TObjectTable map and
00282       // deleting each TObject in the std::vector<TObject *> later we will
00283       // change map to use std::vector< boost::shared_ptr<TObject> >
00284       DQMEvent::TObjectTable::iterator ti(toTablePtr->begin()), te(toTablePtr->end());
00285       for ( ; ti != te; ++ti) {
00286         std::string subFolderName = ti->first;
00287         std::vector<TObject *>::iterator vi(ti->second.begin()), ve(ti->second.end());
00288         for ( ; vi != ve; ++vi) {
00289           std::string histoName = stor::DQMInstance::getSafeMEName(*vi);
00290           std::string fullName = subFolderName + "/" + histoName;
00291           std::vector<std::string>::iterator entryFound;
00292           entryFound = std::find(firstHistoExtractDone_.begin(),
00293                                  firstHistoExtractDone_.end(),
00294                                  fullName);
00295           // 30-May-2008, KAB - skip over deleting the memory of the first
00296           // ME passed to bei_->extract() until we check into having that
00297           // code copy the ME instead of using it directly.
00298           if (entryFound == firstHistoExtractDone_.end()) {
00299             firstHistoExtractDone_.push_back(fullName);
00300           }
00301           else {
00302             delete *vi;
00303           }
00304         }
00305       }
00306     }
00307 
00308     EventID eventId(iRun,iEvent);
00309 
00310     // make a fake event containing no data but the evId and runId from DQMEvent
00311     // and the time stamp from the event at update
00312     std::auto_ptr<Event> e = makeEvent(iRun,iLumi,iEvent,timeStamp);
00313 
00314     return e;
00315   }
00316 
00317   void DQMHttpSource::registerWithDQMEventServer()
00318   {
00319     stor::ReadData data;
00320     uint32 registrationStatus;
00321     bool alreadySaidWaiting = false;
00322     do {
00323       data.d_.clear();
00324       CURL* han = curl_easy_init();
00325       if(han==0)
00326         {
00327           cerr << "could not create handle" << endl;
00328           throw cms::Exception("registerWithDQMEventServer","DQMHttpSource")
00329             << "Unable to create curl handle\n";
00330         }
00331 
00332       // set the standard https request options
00333       stor::setopt(han,CURLOPT_URL,DQMsubscriptionurl_);
00334       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00335       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00336 
00337       // build the registration request message to send to the storage manager
00338       const int BUFFER_SIZE = 2000;
00339       char msgBuff[BUFFER_SIZE];
00340       ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_,
00341                                        DQMconsumerPriority_, consumerTopFolderName_);
00342 
00343       // add the request message as a https post
00344       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00345       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00346       struct curl_slist *headers=NULL;
00347       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00348       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00349       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00350 
00351       // send the HTTP POST, read the reply, and cleanup before going on
00352       CURLcode messageStatus = curl_easy_perform(han);
00353       curl_slist_free_all(headers);
00354       curl_easy_cleanup(han);
00355 
00356       if(messageStatus!=0)
00357       {
00358         cerr << "curl perform failed for DQM registration" << endl;
00359         throw cms::Exception("registerWithDQMEventServer","DQMHttpSource")
00360           << "Could not register: probably XDAQ not running or no Storage Manager/SMProxyServer loaded"
00361           << "\n";
00362       }
00363       registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00364       if(data.d_.length() > 0)
00365       {
00366         int len = data.d_.length();
00367         FDEBUG(9) << "DQMHttpSource received len = " << len << std::endl;
00368         buf_.resize(len);
00369         for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00370 
00371         try {
00372           ConsRegResponseView respView(&buf_[0]);
00373           registrationStatus = respView.getStatus();
00374           DQMconsumerId_ = respView.getConsumerId();
00375         }
00376         catch (cms::Exception excpt) {
00377           const unsigned int MAX_DUMP_LENGTH = 1000;
00378           std::cout << "========================================" << std::endl;
00379           std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl;
00380           if (data.d_.length() <= MAX_DUMP_LENGTH) {
00381             std::cout << "* Here is the raw text that was returned:" << std::endl;
00382             std::cout << data.d_ << std::endl;
00383           }
00384           else {
00385             std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00386               " characters of the raw text that was returned:" << std::endl;
00387             std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00388           }
00389           std::cout << "========================================" << std::endl;
00390           throw excpt;
00391         }
00392       }
00393 
00394       if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY)
00395       {
00396         if(!alreadySaidWaiting) {
00397           std::cout << "...waiting for DQM registration response from StorageManager or SMProxyServer..." 
00398                     << std::endl;
00399           alreadySaidWaiting = true;
00400         }
00401         // sleep for desired amount of time
00402         sleep(headerRetryInterval_);
00403       }
00404     } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY &&
00405              !edm::shutdown_flag);
00406 
00407     FDEBUG(9) << "Consumer ID = " << DQMconsumerId_ << endl;
00408   }
00409 }

Generated on Tue Jun 9 17:34:55 2009 for CMSSW by  doxygen 1.5.4