CMS 3D CMS Logo

EventStreamHttpReader.cc

Go to the documentation of this file.
00001 /*
00002   Input source for event consumers that will get events from the
00003   Storage Manager Event Server. It issues HTTP POST requests using the
00004   cURL library. The Storage Manager Event Server responds with
00005   a binary octet-stream.  The product registry is also obtained
00006   through an HTTP POST request.
00007       There is currently no test of the product registry against
00008   the consumer client product registry within the code. It would
00009   already be done if this were inheriting from the standard
00010   framework input source. Currently we inherit from StreamerInputSource.
00011 
00012   17 Mar 2006 - HWKC - initial version for testing
00013   30 Mar 2006 - HWKC - first proof of principle version that can
00014                 wait for the product registry and events from
00015                 the Storage Manager. Only way to stop the cmsRun
00016                 using this input source is to kill the Storage
00017                 Manager or specify a maximum number of events for
00018                 the client to read through a maxEvents parameter.
00019 
00020   $Id: EventStreamHttpReader.cc,v 1.31.22.1 2009/02/26 17:32:49 wdd Exp $
00021 */
00022 
00023 #include "EventFilter/StorageManager/src/EventStreamHttpReader.h"
00024 #include "EventFilter/StorageManager/interface/SMCurlInterface.h"
00025 #include "FWCore/Utilities/interface/DebugMacros.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "IOPool/Streamer/interface/ClassFiller.h"
00028 #include "IOPool/Streamer/interface/OtherMessage.h"
00029 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00030 #include "EventFilter/StorageManager/interface/ConsumerPipe.h"
00031 #include "FWCore/Framework/interface/EventPrincipal.h"
00032 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00033 
00034 #include <algorithm>
00035 #include <iterator>
00036 #include "curl/curl.h"
00037 
00038 #include <wait.h>
00039 
00040 using namespace std;
00041 using namespace edm;
00042 
00043 namespace edm
00044 {  
00045   EventStreamHttpReader::EventStreamHttpReader(edm::ParameterSet const& ps,
00046                                                edm::InputSourceDescription const& desc):
00047     edm::StreamerInputSource(ps, desc),
00048     sourceurl_(ps.getParameter<string>("sourceURL")),
00049     buf_(1000*1000*7), 
00050     endRunAlreadyNotified_(true),
00051     runEnded_(false),
00052     alreadySaidHalted_(false),
00053     maxConnectTries_(DEFAULT_MAX_CONNECT_TRIES),
00054     connectTrySleepTime_(DEFAULT_CONNECT_TRY_SLEEP_TIME)
00055   {
00056     // Retry connection params (wb)
00057     maxConnectTries_ = ps.getUntrackedParameter<int>("maxConnectTries",
00058                                                DEFAULT_MAX_CONNECT_TRIES);
00059     connectTrySleepTime_ = ps.getUntrackedParameter<int>("connectTrySleepTime",
00060                                                DEFAULT_CONNECT_TRY_SLEEP_TIME);
00061     inputFileTransitionsEachEvent_ =
00062       ps.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", true);
00063 
00064     std::string evturl = sourceurl_ + "/geteventdata";
00065     int stlen = evturl.length();
00066     for (int i=0; i<stlen; i++) eventurl_[i]=evturl[i];
00067     eventurl_[stlen] = '\0';
00068 
00069     std::string header = sourceurl_ + "/getregdata";
00070     stlen = header.length();
00071     for (int i=0; i<stlen; i++) headerurl_[i]=header[i];
00072     headerurl_[stlen] = '\0';
00073 
00074     std::string regurl = sourceurl_ + "/registerConsumer";
00075     stlen = regurl.length();
00076     for (int i=0; i<stlen; i++) subscriptionurl_[i]=regurl[i];
00077     subscriptionurl_[stlen] = '\0';
00078 
00079     // 09-Aug-2006, KAB: new parameters
00080     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00081     consumerName_ = ps.getUntrackedParameter<string>("consumerName","Unknown");
00082     consumerPriority_ = ps.getUntrackedParameter<string>("consumerPriority","normal");
00083     headerRetryInterval_ = ps.getUntrackedParameter<int>("headerRetryInterval",5);
00084     double maxEventRequestRate = ps.getUntrackedParameter<double>("maxEventRequestRate",1.0);
00085     if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) {
00086       minEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00087     }
00088     else {
00089       minEventRequestInterval_ = 1.0 / maxEventRequestRate;  // seconds
00090     }
00091     lastRequestTime_.tv_sec = 0;
00092     lastRequestTime_.tv_usec = 0;
00093 
00094     // 28-Aug-2006, KAB: save our parameter set in string format to
00095     // be sent to the event server to specify our "request" (that is, which
00096     // events we are interested in).
00097     consumerPSetString_ = ps.toString();
00098 
00099     // 16-Aug-2006, KAB: register this consumer with the event server
00100     consumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00101     registerWithEventServer();
00102 
00103     readHeader();
00104   }
00105 
00106   EventStreamHttpReader::~EventStreamHttpReader()
00107   {
00108   }
00109 
00110   std::auto_ptr<edm::EventPrincipal> EventStreamHttpReader::read()
00111   {
00112     // repeat a https get every N seconds until we get an event
00113     // wait for Storage Manager event server buffer to not be empty
00114     // only way to stop is specify a maxEvents parameter
00115     // or kill the Storage Manager so the https get fails.
00116 
00117     // try to get an event repeat until we get one, this allows
00118     // re-registration if the SM is halted or stopped
00119 
00120     bool gotEvent = false;
00121     std::auto_ptr<EventPrincipal> result(0);
00122     while ((!gotEvent) && (!runEnded_) && (!edm::shutdown_flag))
00123     {
00124        result = getOneEvent();
00125        if(result.get() != NULL) gotEvent = true;
00126     }
00127     // need next line so we only return a null pointer once for each end of run
00128     if(runEnded_) runEnded_ = false;
00129     return result;
00130   }
00131 
00132   std::auto_ptr<edm::EventPrincipal> EventStreamHttpReader::getOneEvent()
00133   {
00134     // repeat a https get every N seconds until we get an event
00135     // wait for Storage Manager event server buffer to not be empty
00136     // only way to stop is specify a maxEvents parameter or cntrol-c.
00137     // If the Storage Manager is killed so the https get fails, we
00138     // end the job as we would be in an unknown state (If SM is up
00139     // and we have a network problem we just try to get another event,
00140     // but if SM is killed/dead we want to register.)
00141 
00142     // check if we need to sleep (to enforce the allowed request rate)
00143     struct timeval now;
00144     struct timezone dummyTZ;
00145     gettimeofday(&now, &dummyTZ);
00146     double timeDiff = (double) now.tv_sec;
00147     timeDiff -= (double) lastRequestTime_.tv_sec;
00148     timeDiff += ((double) now.tv_usec / 1000000.0);
00149     timeDiff -= ((double) lastRequestTime_.tv_usec / 1000000.0);
00150     if (timeDiff < minEventRequestInterval_)
00151     {
00152       double sleepTime = minEventRequestInterval_ - timeDiff;
00153       // trim off a little sleep time to account for the time taken by
00154       // calling gettimeofday again
00155       sleepTime -= 0.01;
00156       if (sleepTime < 0.0) {sleepTime = 0.0;}
00157       //cout << "sleeping for " << sleepTime << endl;
00158       usleep(static_cast<int>(1000000 * sleepTime));
00159       gettimeofday(&lastRequestTime_, &dummyTZ);
00160     }
00161     else
00162     {
00163       lastRequestTime_ = now;
00164     }
00165 
00166     stor::ReadData data;
00167     bool alreadySaidWaiting = false;
00168     do {
00169       CURL* han = curl_easy_init();
00170 
00171       if(han==0)
00172       {
00173         cerr << "could not create handle" << endl;
00174         // this will end cmsRun 
00175         //return std::auto_ptr<edm::EventPrincipal>();
00176         throw cms::Exception("getOneEvent","EventStreamHttpReader")
00177             << "Could not get event: problem with curl"
00178             << "\n";
00179       }
00180 
00181       stor::setopt(han,CURLOPT_URL,eventurl_);
00182       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00183       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00184 
00185       // 24-Aug-2006, KAB: send our consumer ID as part of the event request
00186       char msgBuff[100];
00187       OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00188                                          sizeof(char_uint32));
00189       uint8 *bodyPtr = requestMessage.msgBody();
00190       convert(consumerId_, bodyPtr);
00191       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00192       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00193       struct curl_slist *headers=NULL;
00194       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00195       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00196       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00197 
00198       // send the HTTP POST, read the reply, and cleanup before going on
00199       CURLcode messageStatus = curl_easy_perform(han);
00200       curl_slist_free_all(headers);
00201       curl_easy_cleanup(han);
00202 
00203       if(messageStatus!=0)
00204       {
00205         cerr << "curl perform failed for event, messageStatus = "
00206              << messageStatus << endl;
00207         // this will end cmsRun 
00208         //return std::auto_ptr<edm::EventPrincipal>();
00209         throw cms::Exception("getOneEvent","EventStreamHttpReader")
00210             << "Could not get event: probably XDAQ not running on Storage Manager "
00211             << "\n";
00212       }
00213       if(data.d_.length() == 0)
00214       {
00215         if(!alreadySaidWaiting) {
00216           std::cout << "...waiting for event from Storage Manager..." << std::endl;
00217           alreadySaidWaiting = true;
00218         }
00219         // sleep for the standard request interval
00220         usleep(static_cast<int>(1000000 * minEventRequestInterval_));
00221       }
00222     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00223     if (edm::shutdown_flag) {
00224         return std::auto_ptr<edm::EventPrincipal>();
00225     }
00226 
00227     int len = data.d_.length();
00228     FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl;
00229     buf_.resize(len);
00230     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00231 
00232     // first check if done message
00233     OtherMessageView msgView(&buf_[0]);
00234 
00235     if (msgView.code() == Header::DONE) {
00236       // no need to register again as the SM/EventServer is kept alive on a stopAction
00237       // *BUT* for a haltAction, we need a code to say when SM is halted as then we need 
00238       // register again else the consumerId is wrong and we may get wrong events!
00239       // We may even need to end the job if a new run has new triggers, etc.
00240       if(!alreadySaidHalted_) {
00241         alreadySaidHalted_ = true;
00242         std::cout << "Storage Manager has stopped - waiting for restart" << std::endl;
00243         std::cout << "Warning! If you are waiting forever at: "
00244                   << "...waiting for event from Storage Manager... " << std::endl
00245                   << "   it may be that the Storage Manager has been halted with a haltAction," << std::endl
00246                   << "   instead of a stopAction. In this case you should control-c to end " << std::endl
00247                   << "   this consumer and restart it. (This will be fixed in a future update)" << std::endl;
00248       }
00249       // decide if we need to notify that a run has ended
00250       if(!endRunAlreadyNotified_) {
00251         endRunAlreadyNotified_ = true;
00252         setEndRun();
00253         runEnded_ = true;
00254       }
00255       return std::auto_ptr<edm::EventPrincipal>();
00256     } else {
00257       // reset need-to-set-end-run flag when we get the first event (here any event)
00258       endRunAlreadyNotified_ = false;
00259       alreadySaidHalted_ = false;
00260 
00261       // 29-Jan-2008, KAB:  catch (and re-throw) any exceptions decoding
00262       // the event data so that we can display the returned HTML and
00263       // (hopefully) give the user a hint as to the cause of the problem.
00264       std::auto_ptr<edm::EventPrincipal> evtPtr;
00265       try {
00266         HeaderView hdrView(&buf_[0]);
00267         if (hdrView.code() != Header::EVENT) {
00268           throw cms::Exception("EventStreamHttpReader", "readOneEvent");
00269         }
00270         EventMsgView eventView(&buf_[0]);
00271         evtPtr = deserializeEvent(eventView);
00272       }
00273       catch (cms::Exception excpt) {
00274         const unsigned int MAX_DUMP_LENGTH = 2000;
00275         std::cout << "========================================" << std::endl;
00276         std::cout << "* Exception decoding the geteventdata response from the storage manager!" << std::endl;
00277         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00278           std::cout << "* Here is the raw text that was returned:" << std::endl;
00279           std::cout << data.d_ << std::endl;
00280         }
00281         else {
00282           std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00283             " characters of the raw text that was returned:" << std::endl;
00284           std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00285         }
00286         std::cout << "========================================" << std::endl;
00287         throw excpt;
00288       }
00289       return evtPtr;
00290     }
00291   }
00292 
00293   void EventStreamHttpReader::readHeader()
00294   {
00295     // repeat a https get every 5 seconds until we get the registry
00296     // do it like this for pull mode
00297     bool alreadySaidWaiting = false;
00298     stor::ReadData data;
00299     do {
00300       CURL* han = curl_easy_init();
00301 
00302       if(han==0)
00303         {
00304           cerr << "could not create handle" << endl;
00305           //return 0; //or use this?
00306           throw cms::Exception("readHeader","EventStreamHttpReader")
00307             << "Could not get header: probably XDAQ not running on Storage Manager "
00308             << "\n";
00309         }
00310 
00311       stor::setopt(han,CURLOPT_URL,headerurl_);
00312       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00313       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00314 
00315       // 10-Aug-2006, KAB: send our consumer ID as part of the header request
00316       char msgBuff[100];
00317       OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00318                                          sizeof(char_uint32));
00319       uint8 *bodyPtr = requestMessage.msgBody();
00320       convert(consumerId_, bodyPtr);
00321       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00322       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00323       struct curl_slist *headers=NULL;
00324       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00325       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00326       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00327 
00328       // send the HTTP POST, read the reply, and cleanup before going on
00329       CURLcode messageStatus = curl_easy_perform(han);
00330       curl_slist_free_all(headers);
00331       curl_easy_cleanup(han);
00332 
00333       if(messageStatus!=0)
00334       {
00335         cerr << "curl perform failed for header" << endl;
00336         // do not retry curl here as we should return to registration instead if we
00337         // want an automatic recovery
00338         throw cms::Exception("readHeader","EventStreamHttpReader")
00339           << "Could not get header: probably XDAQ not running on Storage Manager "
00340           << "\n";
00341       }
00342       if(data.d_.length() == 0)
00343       {
00344         if(!alreadySaidWaiting) {
00345           std::cout << "...waiting for header from Storage Manager..." << std::endl;
00346           alreadySaidWaiting = true;
00347         }
00348         // sleep for desired amount of time
00349         sleep(headerRetryInterval_);
00350       }
00351     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00352     if (edm::shutdown_flag) {
00353       throw cms::Exception("readHeader","EventStreamHttpReader")
00354           << "The header read was aborted by a shutdown request.\n";
00355     }
00356 
00357     std::vector<char> regdata(1000*1000);
00358 
00359     // rely on https transfer string of correct length!
00360     int len = data.d_.length();
00361     FDEBUG(9) << "EventStreamHttpReader received registry len = " << len << std::endl;
00362     regdata.resize(len);
00363     for (int i=0; i<len ; i++) regdata[i] = data.d_[i];
00364     // 21-Jun-2006, KAB:  catch (and re-throw) any exceptions decoding
00365     // the job header so that we can display the returned HTML and
00366     // (hopefully) give the user a hint as to the cause of the problem.
00367     std::auto_ptr<SendJobHeader> p;
00368     try {
00369       HeaderView hdrView(&regdata[0]);
00370       if (hdrView.code() != Header::INIT) {
00371         throw cms::Exception("EventStreamHttpReader", "readHeader");
00372       }
00373       InitMsgView initView(&regdata[0]);
00374       deserializeAndMergeWithRegistry(initView);
00375     }
00376     catch (cms::Exception excpt) {
00377       const unsigned int MAX_DUMP_LENGTH = 1000;
00378       std::cout << "========================================" << std::endl;
00379       std::cout << "* Exception decoding the getregdata response from the storage manager!" << std::endl;
00380       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00381         std::cout << "* Here is the raw text that was returned:" << std::endl;
00382         std::cout << data.d_ << std::endl;
00383       }
00384       else {
00385         std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00386           " characters of the raw text that was returned:" << std::endl;
00387         std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00388       }
00389       std::cout << "========================================" << std::endl;
00390       throw excpt;
00391     }
00392   }
00393 
00394   void EventStreamHttpReader::registerWithEventServer()
00395   {
00396     stor::ReadData data;
00397     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00398     bool alreadySaidWaiting = false;
00399     do {
00400       data.d_.clear();
00401       CURL* han = curl_easy_init();
00402       if(han==0)
00403         {
00404           cerr << "could not create handle" << endl;
00405           throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00406             << "Unable to create curl handle\n";
00407         }
00408 
00409       // set the standard https request options
00410       stor::setopt(han,CURLOPT_URL,subscriptionurl_);
00411       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00412       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00413 
00414       // build the registration request message to send to the storage manager
00415       const int BUFFER_SIZE = 2000;
00416       char msgBuff[BUFFER_SIZE];
00417       ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00418                                        consumerPriority_, consumerPSetString_);
00419 
00420       // add the request message as a https post
00421       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00422       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00423       struct curl_slist *headers=NULL;
00424       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00425       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00426       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00427 
00428       // send the HTTP POST, read the reply, and cleanup before going on
00429       //CURLcode messageStatus = (CURLcode)-1;
00430       // set messageStatus to a non-zero (but still within CURLcode enum list)
00431       CURLcode messageStatus = CURLE_COULDNT_CONNECT;
00432       int tries = 0;
00433       while (messageStatus!=0 && !edm::shutdown_flag)
00434       {
00435         tries++;
00436         messageStatus = curl_easy_perform(han);
00437         if ( messageStatus != 0 )
00438         {
00439           if ( tries >= maxConnectTries_ )
00440           {
00441             std::cerr << "Giving up waiting for connection after " << tries 
00442                       << " tries"  << std::endl;
00443             curl_slist_free_all(headers);
00444             curl_easy_cleanup(han);
00445             cerr << "curl perform failed for registration" << endl;
00446             throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00447               << "Could not register: probably XDAQ not running on Storage Manager "
00448               << "\n";
00449           }
00450           else
00451           {
00452             std::cout << "Waiting for connection to StorageManager... " 
00453                       << tries << "/" << maxConnectTries_
00454                       << std::endl;
00455             sleep(connectTrySleepTime_);
00456           }
00457         }
00458       }
00459       if (edm::shutdown_flag) {
00460           continue;
00461       }
00462 
00463       curl_slist_free_all(headers);
00464       curl_easy_cleanup(han);
00465 
00466       registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00467       if(data.d_.length() > 0)
00468       {
00469         int len = data.d_.length();
00470         FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl;
00471         buf_.resize(len);
00472         for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00473 
00474         try {
00475           ConsRegResponseView respView(&buf_[0]);
00476           registrationStatus = respView.getStatus();
00477           consumerId_ = respView.getConsumerId();
00478         }
00479         catch (cms::Exception excpt) {
00480           const unsigned int MAX_DUMP_LENGTH = 1000;
00481           std::cout << "========================================" << std::endl;
00482           std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl;
00483           if (data.d_.length() <= MAX_DUMP_LENGTH) {
00484             std::cout << "* Here is the raw text that was returned:" << std::endl;
00485             std::cout << data.d_ << std::endl;
00486           }
00487           else {
00488             std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00489               " characters of the raw text that was returned:" << std::endl;
00490             std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00491           }
00492           std::cout << "========================================" << std::endl;
00493           throw excpt;
00494         }
00495       }
00496 
00497       if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY)
00498       {
00499         if(!alreadySaidWaiting) {
00500           std::cout << "...waiting for registration response from Storage Manager..." << std::endl;
00501           alreadySaidWaiting = true;
00502         }
00503         // sleep for desired amount of time
00504         sleep(headerRetryInterval_);
00505       }
00506     } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY &&
00507              !edm::shutdown_flag);
00508     if (edm::shutdown_flag) {
00509       throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00510           << "Registration was aborted by a shutdown request.\n";
00511     }
00512 
00513     FDEBUG(5) << "Consumer ID = " << consumerId_ << endl;
00514   }
00515 }

Generated on Tue Jun 9 17:34:56 2009 for CMSSW by  doxygen 1.5.4