CMS 3D CMS Logo

edm::EventStreamHttpReader Class Reference

#include <EventFilter/StorageManager/src/EventStreamHttpReader.h>

Inheritance diagram for edm::EventStreamHttpReader:

edm::StreamerInputSource edm::InputSource edm::ProductRegistryHelper

List of all members.

Public Types

typedef std::vector< char > Buf

Public Member Functions

 EventStreamHttpReader (edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
virtual std::auto_ptr
< edm::EventPrincipal >
read ()
void readHeader ()
void registerWithEventServer ()
virtual ~EventStreamHttpReader ()

Private Types

enum  { DEFAULT_MAX_CONNECT_TRIES = 360, DEFAULT_CONNECT_TRY_SLEEP_TIME = 10 }

Private Member Functions

std::auto_ptr
< edm::EventPrincipal >
getOneEvent ()

Private Attributes

bool alreadySaidHalted_
Buf buf_
int connectTrySleepTime_
unsigned int consumerId_
std::string consumerName_
std::string consumerPriority_
std::string consumerPSetString_
bool endRunAlreadyNotified_
char eventurl_ [256]
int headerRetryInterval_
char headerurl_ [256]
int hltBitCount
int l1BitCount
struct timeval lastRequestTime_
int maxConnectTries_
double minEventRequestInterval_
bool runEnded_
std::string sourceurl_
char subscriptionurl_ [256]


Detailed Description

Definition at line 20 of file EventStreamHttpReader.h.


Member Typedef Documentation

typedef std::vector<char> edm::EventStreamHttpReader::Buf

Definition at line 23 of file EventStreamHttpReader.h.


Member Enumeration Documentation

anonymous enum [private]

Enumerator:
DEFAULT_MAX_CONNECT_TRIES 
DEFAULT_CONNECT_TRY_SLEEP_TIME 

Definition at line 53 of file EventStreamHttpReader.h.

00054     {
00055       DEFAULT_MAX_CONNECT_TRIES = 360,  // default for maxConnectTries_: connect attempts made by registerWithEventServer() before giving up
00056       DEFAULT_CONNECT_TRY_SLEEP_TIME = 10  // default for connectTrySleepTime_: value passed to sleep() between connect attempts
00057     };


Constructor & Destructor Documentation

edm::EventStreamHttpReader::EventStreamHttpReader ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)

Definition at line 45 of file EventStreamHttpReader.cc.

References connectTrySleepTime_, consumerId_, consumerName_, consumerPriority_, consumerPSetString_, DEFAULT_CONNECT_TRY_SLEEP_TIME, DEFAULT_MAX_CONNECT_TRIES, eventurl_, edm::ParameterSet::getUntrackedParameter(), header, headerRetryInterval_, headerurl_, i, edm::StreamerInputSource::inputFileTransitionsEachEvent_, lastRequestTime_, maxConnectTries_, L1TDQM_GREJuly_cfg::maxEventRequestRate, minEventRequestInterval_, readHeader(), registerWithEventServer(), sourceurl_, subscriptionurl_, and edm::ParameterSet::toString().

00046                                                                                      :
00047     edm::StreamerInputSource(ps, desc),
00048     sourceurl_(ps.getParameter<string>("sourceURL")),
00049     buf_(1000*1000*7),
00050     endRunAlreadyNotified_(true),
00051     runEnded_(false),
00052     alreadySaidHalted_(false),
00053     maxConnectTries_(DEFAULT_MAX_CONNECT_TRIES),
00054     connectTrySleepTime_(DEFAULT_CONNECT_TRY_SLEEP_TIME)
00055   {
00056     // Retry connection params (wb); these replace the defaults set above
00057     maxConnectTries_ = ps.getUntrackedParameter<int>("maxConnectTries",
00058                                                DEFAULT_MAX_CONNECT_TRIES);
00059     connectTrySleepTime_ = ps.getUntrackedParameter<int>("connectTrySleepTime",
00060                                                DEFAULT_CONNECT_TRY_SLEEP_TIME);
00061     inputFileTransitionsEachEvent_ =
00062       ps.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", true);
00063 
00064     // The three request URLs are stored in fixed-size char arrays, so reject
00065     // a sourceURL that would not fit (terminator included) instead of
00066     // writing past the end of the arrays.
00067     const std::string evturl = sourceurl_ + "/geteventdata";
00068     const std::string header = sourceurl_ + "/getregdata";
00069     const std::string regurl = sourceurl_ + "/registerConsumer";
00070     if (evturl.length() >= sizeof(eventurl_) ||
00071         header.length() >= sizeof(headerurl_) ||
00072         regurl.length() >= sizeof(subscriptionurl_)) {
00073       throw cms::Exception("EventStreamHttpReader","EventStreamHttpReader")
00074         << "sourceURL is too long: " << sourceurl_ << "\n";
00075     }
00076     eventurl_[evturl.copy(eventurl_, sizeof(eventurl_) - 1)] = '\0';
00077     headerurl_[header.copy(headerurl_, sizeof(headerurl_) - 1)] = '\0';
00078     subscriptionurl_[regurl.copy(subscriptionurl_, sizeof(subscriptionurl_) - 1)] = '\0';
00079     // 09-Aug-2006, KAB: new parameters
00080     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00081     consumerName_ = ps.getUntrackedParameter<string>("consumerName","Unknown");
00082     consumerPriority_ = ps.getUntrackedParameter<string>("consumerPriority","normal");
00083     headerRetryInterval_ = ps.getUntrackedParameter<int>("headerRetryInterval",5);
00084     double maxEventRequestRate = ps.getUntrackedParameter<double>("maxEventRequestRate",1.0);
00085     if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) {
00086       minEventRequestInterval_ = MAX_REQUEST_INTERVAL;  // cap the interval; also covers zero or negative rates
00087     }
00088     else {
00089       minEventRequestInterval_ = 1.0 / maxEventRequestRate;  // seconds
00090     }
00091     lastRequestTime_.tv_sec = 0;   // zero timestamp: the first getOneEvent()
00092     lastRequestTime_.tv_usec = 0;  // call sees a large elapsed time and does not sleep
00093 
00094     // 28-Aug-2006, KAB: save our parameter set in string format to
00095     // be sent to the event server to specify our "request" (that is, which
00096     // events we are interested in).
00097     consumerPSetString_ = ps.toString();
00098 
00099     // 16-Aug-2006, KAB: register this consumer with the event server
00100     consumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00101     registerWithEventServer();  // throws cms::Exception on failure
00102 
00103     readHeader();  // throws cms::Exception on failure
00104   }

edm::EventStreamHttpReader::~EventStreamHttpReader (  )  [virtual]

Definition at line 106 of file EventStreamHttpReader.cc.

00107   {  // empty body: no explicit teardown is performed here
00108   }


Member Function Documentation

std::auto_ptr< edm::EventPrincipal > edm::EventStreamHttpReader::getOneEvent (  )  [private]

Definition at line 132 of file EventStreamHttpReader.cc.

References alreadySaidHalted_, buf_, TestMuL1L2Filter_cff::cerr, HeaderView::code(), consumerId_, convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamerInputSource::deserializeEvent(), Header::DONE, lat::endl(), endRunAlreadyNotified_, Header::EVENT, Header::EVENT_REQUEST, eventurl_, Exception, FDEBUG, stor::func(), i, lastRequestTime_, len, minEventRequestInterval_, OtherMessageBuilder::msgBody(), NULL, runEnded_, edm::StreamerInputSource::setEndRun(), stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), and OtherMessageBuilder::startAddress().

Referenced by read().

00133   {
00134     // Fetch one message from the Storage Manager event server: POST an event
00135     // request every minEventRequestInterval_ seconds until the reply is
00136     // non-empty.  Returns the deserialized event, or a null pointer on a
00137     // shutdown request or when a DONE message arrives.  The only ways to stop
00138     // are a maxEvents parameter or control-c.  A failed transfer (e.g. the
00139     // Storage Manager was killed) throws cms::Exception, which ends the job,
00140     // as we would otherwise be in an unknown state.
00141 
00142     // check if we need to sleep (to enforce the allowed request rate)
00143     struct timeval now;
00144     struct timezone dummyTZ;
00145     gettimeofday(&now, &dummyTZ);
00146     double timeDiff = (double) now.tv_sec;
00147     timeDiff -= (double) lastRequestTime_.tv_sec;
00148     timeDiff += ((double) now.tv_usec / 1000000.0);
00149     timeDiff -= ((double) lastRequestTime_.tv_usec / 1000000.0);
00150     if (timeDiff < minEventRequestInterval_)
00151     {
00152       double sleepTime = minEventRequestInterval_ - timeDiff;
00153       // trim off a little sleep time to account for the time taken by
00154       // calling gettimeofday again
00155       sleepTime -= 0.01;
00156       if (sleepTime < 0.0) {sleepTime = 0.0;}
00157       //cout << "sleeping for " << sleepTime << endl;
00158       usleep(static_cast<int>(1000000 * sleepTime));
00159       gettimeofday(&lastRequestTime_, &dummyTZ);
00160     }
00161     else
00162     {
00163       lastRequestTime_ = now;
00164     }
00165 
00166     stor::ReadData data;
00167     bool alreadySaidWaiting = false;
00168     do {
00169       CURL* han = curl_easy_init();
00170 
00171       if(han==0)
00172       {
00173         cerr << "could not create handle" << endl;
00174         // this will end cmsRun 
00175         //return std::auto_ptr<edm::EventPrincipal>();
00176         throw cms::Exception("getOneEvent","EventStreamHttpReader")
00177             << "Could not get event: problem with curl"
00178             << "\n";
00179       }
00180 
00181       stor::setopt(han,CURLOPT_URL,eventurl_);
00182       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00183       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00184 
00185       // 24-Aug-2006, KAB: send our consumer ID as part of the event request
00186       char msgBuff[100];
00187       OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00188                                          sizeof(char_uint32));
00189       uint8 *bodyPtr = requestMessage.msgBody();
00190       convert(consumerId_, bodyPtr);
00191       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00192       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00193       struct curl_slist *headers=NULL;
00194       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00195       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00196       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00197 
00198       // send the HTTP POST, read the reply, and cleanup before going on
00199       CURLcode messageStatus = curl_easy_perform(han);
00200       curl_slist_free_all(headers);
00201       curl_easy_cleanup(han);
00202 
00203       if(messageStatus!=0)
00204       {
00205         cerr << "curl perform failed for event, messageStatus = "
00206              << messageStatus << endl;
00207         // this will end cmsRun 
00208         //return std::auto_ptr<edm::EventPrincipal>();
00209         throw cms::Exception("getOneEvent","EventStreamHttpReader")
00210             << "Could not get event: probably XDAQ not running on Storage Manager "
00211             << "\n";
00212       }
00213       if(data.d_.length() == 0)
00214       {
00215         if(!alreadySaidWaiting) {
00216           std::cout << "...waiting for event from Storage Manager..." << std::endl;
00217           alreadySaidWaiting = true;
00218         }
00219         // sleep for the standard request interval
00220         usleep(static_cast<int>(1000000 * minEventRequestInterval_));
00221       }
00222     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00223     if (edm::shutdown_flag) {
00224         return std::auto_ptr<edm::EventPrincipal>();
00225     }
00226 
00227     int len = data.d_.length();
00228     FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl;
00229     buf_.resize(len);
00230     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00231 
00232     // first check if done message
00233     OtherMessageView msgView(&buf_[0]);
00234 
00235     if (msgView.code() == Header::DONE) {
00236       // no need to register again as the SM/EventServer is kept alive on a stopAction
00237       // *BUT* for a haltAction, we need a code to say when SM is halted as then we need 
00238       // register again else the consumerId is wrong and we may get wrong events!
00239       // We may even need to end the job if a new run has new triggers, etc.
00240       if(!alreadySaidHalted_) {
00241         alreadySaidHalted_ = true;
00242         std::cout << "Storage Manager has stopped - waiting for restart" << std::endl;
00243         std::cout << "Warning! If you are waiting forever at: "
00244                   << "...waiting for event from Storage Manager... " << std::endl
00245                   << "   it may be that the Storage Manager has been halted with a haltAction," << std::endl
00246                   << "   instead of a stopAction. In this case you should control-c to end " << std::endl
00247                   << "   this consumer and restart it. (This will be fixed in a future update)" << std::endl;
00248       }
00249       // decide if we need to notify that a run has ended
00250       if(!endRunAlreadyNotified_) {
00251         endRunAlreadyNotified_ = true;
00252         setEndRun();
00253         runEnded_ = true;
00254       }
00255       return std::auto_ptr<edm::EventPrincipal>();
00256     } else {
00257       // reset need-to-set-end-run flag when we get the first event (here any event)
00258       endRunAlreadyNotified_ = false;
00259       alreadySaidHalted_ = false;
00260 
00261       // 29-Jan-2008, KAB:  catch (and re-throw) any exceptions decoding
00262       // the event data so that we can display the returned HTML and
00263       // (hopefully) give the user a hint as to the cause of the problem.
00264       std::auto_ptr<edm::EventPrincipal> evtPtr;
00265       try {
00266         HeaderView hdrView(&buf_[0]);
00267         if (hdrView.code() != Header::EVENT) {
00268           throw cms::Exception("EventStreamHttpReader", "readOneEvent");
00269         }
00270         EventMsgView eventView(&buf_[0]);
00271         evtPtr = deserializeEvent(eventView);
00272       }
00273       catch (cms::Exception&) {  // by reference: no copy, no slicing
00274         const unsigned int MAX_DUMP_LENGTH = 2000;
00275         std::cout << "========================================" << std::endl;
00276         std::cout << "* Exception decoding the geteventdata response from the storage manager!" << std::endl;
00277         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00278           std::cout << "* Here is the raw text that was returned:" << std::endl;
00279           std::cout << data.d_ << std::endl;
00280         }
00281         else {
00282           std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00283             " characters of the raw text that was returned:" << std::endl;
00284           std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00285         }
00286         std::cout << "========================================" << std::endl;
00287         throw;  // rethrow the original object so a derived exception type is preserved
00288       }
00289       return evtPtr;
00290     }
00291   }

std::auto_ptr< edm::EventPrincipal > edm::EventStreamHttpReader::read (  )  [virtual]

Implements edm::StreamerInputSource.

Definition at line 110 of file EventStreamHttpReader.cc.

References getOneEvent(), NULL, HLT_VtxMuL3::result, runEnded_, and edm::shutdown_flag.

00111   {
00112     // Poll getOneEvent() until it hands back an event, the end of the run
00113     // has been flagged (runEnded_), or a shutdown has been requested.
00114     // The result is a null pointer in the latter two cases.
00115 
00116     std::auto_ptr<EventPrincipal> result(0);
00117 
00118     // result starts out null, so "no event yet" is simply a null result
00119     while (result.get() == NULL &&
00120            !runEnded_ &&
00121            !edm::shutdown_flag)
00122     {
00123        result = getOneEvent();
00124     }
00125 
00126     // clear the end-of-run marker unconditionally so that the null pointer
00127     // for an end of run is only returned once
00128     runEnded_ = false;
00129     return result;
00130   }

void edm::EventStreamHttpReader::readHeader (  ) 

Definition at line 293 of file EventStreamHttpReader.cc.

References TestMuL1L2Filter_cff::cerr, consumerId_, convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamerInputSource::deserializeAndMergeWithRegistry(), lat::endl(), Exception, FDEBUG, stor::func(), Header::HEADER_REQUEST, headerRetryInterval_, headerurl_, i, Header::INIT, len, OtherMessageBuilder::msgBody(), NULL, p, stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), and OtherMessageBuilder::startAddress().

Referenced by EventStreamHttpReader().

00294   {
00295     // Fetch the INIT (registry) message: POST a header request every
00296     // headerRetryInterval_ seconds until the reply is non-empty, then merge it.
00297     bool alreadySaidWaiting = false;
00298     stor::ReadData data;
00299     do {
00300       CURL* han = curl_easy_init();
00301 
00302       if(han==0)
00303         {
00304           cerr << "could not create handle" << endl;
00305           // a missing handle is a local curl problem, so say so
00306           throw cms::Exception("readHeader","EventStreamHttpReader")
00307             << "Could not get header: problem with curl"
00308             << "\n";
00309         }
00310 
00311       stor::setopt(han,CURLOPT_URL,headerurl_);
00312       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00313       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00314 
00315       // 10-Aug-2006, KAB: send our consumer ID as part of the header request
00316       char msgBuff[100];
00317       OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00318                                          sizeof(char_uint32));
00319       uint8 *bodyPtr = requestMessage.msgBody();
00320       convert(consumerId_, bodyPtr);
00321       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00322       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00323       struct curl_slist *headers=NULL;
00324       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00325       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00326       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00327 
00328       // send the HTTP POST, read the reply, and cleanup before going on
00329       CURLcode messageStatus = curl_easy_perform(han);
00330       curl_slist_free_all(headers);
00331       curl_easy_cleanup(han);
00332 
00333       if(messageStatus!=0)
00334       {
00335         cerr << "curl perform failed for header" << endl;
00336         // do not retry curl here as we should return to registration instead if we
00337         // want an automatic recovery
00338         throw cms::Exception("readHeader","EventStreamHttpReader")
00339           << "Could not get header: probably XDAQ not running on Storage Manager "
00340           << "\n";
00341       }
00342       if(data.d_.length() == 0)
00343       {
00344         if(!alreadySaidWaiting) {
00345           std::cout << "...waiting for header from Storage Manager..." << std::endl;
00346           alreadySaidWaiting = true;
00347         }
00348         // sleep for desired amount of time
00349         sleep(headerRetryInterval_);
00350       }
00351     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00352     if (edm::shutdown_flag) {
00353       throw cms::Exception("readHeader","EventStreamHttpReader")
00354           << "The header read was aborted by a shutdown request.\n";
00355     }
00356 
00357     std::vector<char> regdata;  // sized below to exactly the reply length
00358 
00359     // rely on the HTTP transfer delivering a string of the correct length!
00360     int len = data.d_.length();
00361     FDEBUG(9) << "EventStreamHttpReader received registry len = " << len << std::endl;
00362     regdata.resize(len);
00363     for (int i=0; i<len ; i++) regdata[i] = data.d_[i];
00364     // 21-Jun-2006, KAB:  catch (and re-throw) any exceptions decoding
00365     // the job header so that we can display the returned HTML and
00366     // (hopefully) give the user a hint as to the cause of the problem.
00367     // The reply must carry the INIT code; anything else is reported as an error.
00368     try {
00369       HeaderView hdrView(&regdata[0]);
00370       if (hdrView.code() != Header::INIT) {
00371         throw cms::Exception("EventStreamHttpReader", "readHeader");
00372       }
00373       InitMsgView initView(&regdata[0]);
00374       deserializeAndMergeWithRegistry(initView);
00375     }
00376     catch (cms::Exception&) {  // by reference: no copy, no slicing
00377       const unsigned int MAX_DUMP_LENGTH = 1000;
00378       std::cout << "========================================" << std::endl;
00379       std::cout << "* Exception decoding the getregdata response from the storage manager!" << std::endl;
00380       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00381         std::cout << "* Here is the raw text that was returned:" << std::endl;
00382         std::cout << data.d_ << std::endl;
00383       }
00384       else {
00385         std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00386           " characters of the raw text that was returned:" << std::endl;
00387         std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00388       }
00389       std::cout << "========================================" << std::endl;
00390       throw;  // rethrow the original object so a derived exception type is preserved
00391     }
00392   }

void edm::EventStreamHttpReader::registerWithEventServer (  ) 

Definition at line 394 of file EventStreamHttpReader.cc.

References buf_, TestMuL1L2Filter_cff::cerr, connectTrySleepTime_, consumerId_, consumerName_, consumerPriority_, consumerPSetString_, GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, lat::endl(), ConsRegResponseBuilder::ES_NOT_READY, Exception, FDEBUG, stor::func(), headerRetryInterval_, i, len, maxConnectTries_, NULL, stor::setopt(), edm::shutdown_flag, ConsRegRequestBuilder::size(), ConsRegRequestBuilder::startAddress(), and subscriptionurl_.

Referenced by EventStreamHttpReader().

00395   {  // Register this consumer: POST a request to subscriptionurl_, repeating while the reply status is ES_NOT_READY.
00396     stor::ReadData data;
00397     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00398     bool alreadySaidWaiting = false;
00399     do {
00400       data.d_.clear();
00401       CURL* han = curl_easy_init();
00402       if(han==0)
00403         {
00404           cerr << "could not create handle" << endl;
00405           throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00406             << "Unable to create curl handle\n";
00407         }
00408 
00409       // set the standard HTTP request options
00410       stor::setopt(han,CURLOPT_URL,subscriptionurl_);
00411       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00412       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00413 
00414       // build the registration request message to send to the storage manager
00415       const int BUFFER_SIZE = 2000;
00416       char msgBuff[BUFFER_SIZE];
00417       ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00418                                        consumerPriority_, consumerPSetString_);
00419 
00420       // add the request message as an HTTP POST body
00421       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00422       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00423       struct curl_slist *headers=NULL;
00424       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00425       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00426       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00427 
00428       // send the HTTP POST, read the reply, and cleanup before going on
00429       //CURLcode messageStatus = (CURLcode)-1;
00430       // set messageStatus to a non-zero (but still within CURLcode enum list)
00431       CURLcode messageStatus = CURLE_COULDNT_CONNECT;
00432       int tries = 0;
00433       while (messageStatus!=0 && !edm::shutdown_flag)
00434       {
00435         tries++;
00436         messageStatus = curl_easy_perform(han);
00437         if ( messageStatus != 0 )
00438         {
00439           if ( tries >= maxConnectTries_ )
00440           {
00441             std::cerr << "Giving up waiting for connection after " << tries 
00442                       << " tries"  << std::endl;
00443             curl_slist_free_all(headers);
00444             curl_easy_cleanup(han);
00445             cerr << "curl perform failed for registration" << endl;
00446             throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00447               << "Could not register: probably XDAQ not running on Storage Manager "
00448               << "\n";
00449           }
00450           else
00451           {
00452             std::cout << "Waiting for connection to StorageManager... " 
00453                       << tries << "/" << maxConnectTries_
00454                       << std::endl;
00455             sleep(connectTrySleepTime_);
00456           }
00457         }
00458       }
00459       // release the curl resources before the shutdown check below, so that
00460       // leaving the loop on a shutdown request does not leak them
00461       curl_slist_free_all(headers);
00462       curl_easy_cleanup(han);
00463 
00464       if (edm::shutdown_flag) continue;  // falls to the loop condition, which then exits
00465 
00466       registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00467       if(data.d_.length() > 0)
00468       {
00469         int len = data.d_.length();
00470         FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl;
00471         buf_.resize(len);
00472         for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00473 
00474         try {
00475           ConsRegResponseView respView(&buf_[0]);
00476           registrationStatus = respView.getStatus();
00477           consumerId_ = respView.getConsumerId();
00478         }
00479         catch (cms::Exception&) {  // by reference: no copy, no slicing
00480           const unsigned int MAX_DUMP_LENGTH = 1000;
00481           std::cout << "========================================" << std::endl;
00482           std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl;
00483           if (data.d_.length() <= MAX_DUMP_LENGTH) {
00484             std::cout << "* Here is the raw text that was returned:" << std::endl;
00485             std::cout << data.d_ << std::endl;
00486           }
00487           else {
00488             std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00489               " characters of the raw text that was returned:" << std::endl;
00490             std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00491           }
00492           std::cout << "========================================" << std::endl;
00493           throw;  // rethrow the original object so a derived exception type is preserved
00494         }
00495       }
00496 
00497       if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY)
00498       {
00499         if(!alreadySaidWaiting) {
00500           std::cout << "...waiting for registration response from Storage Manager..." << std::endl;
00501           alreadySaidWaiting = true;
00502         }
00503         // sleep for desired amount of time
00504         sleep(headerRetryInterval_);
00505       }
00506     } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY &&
00507              !edm::shutdown_flag);
00508     if (edm::shutdown_flag) {
00509       throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00510           << "Registration was aborted by a shutdown request.\n";
00511     }
00512 
00513     FDEBUG(5) << "Consumer ID = " << consumerId_ << endl;
00514   }


Member Data Documentation

bool edm::EventStreamHttpReader::alreadySaidHalted_ [private]

Definition at line 52 of file EventStreamHttpReader.h.

Referenced by getOneEvent().

Buf edm::EventStreamHttpReader::buf_ [private]

Definition at line 40 of file EventStreamHttpReader.h.

Referenced by getOneEvent(), and registerWithEventServer().

int edm::EventStreamHttpReader::connectTrySleepTime_ [private]

Definition at line 59 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and registerWithEventServer().

unsigned int edm::EventStreamHttpReader::consumerId_ [private]

Definition at line 48 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), getOneEvent(), readHeader(), and registerWithEventServer().

std::string edm::EventStreamHttpReader::consumerName_ [private]

Definition at line 43 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and registerWithEventServer().

std::string edm::EventStreamHttpReader::consumerPriority_ [private]

Definition at line 44 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and registerWithEventServer().

std::string edm::EventStreamHttpReader::consumerPSetString_ [private]

Definition at line 45 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and registerWithEventServer().

bool edm::EventStreamHttpReader::endRunAlreadyNotified_ [private]

Definition at line 50 of file EventStreamHttpReader.h.

Referenced by getOneEvent().

char edm::EventStreamHttpReader::eventurl_[256] [private]

Definition at line 37 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and getOneEvent().

int edm::EventStreamHttpReader::headerRetryInterval_ [private]

Definition at line 46 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), readHeader(), and registerWithEventServer().

char edm::EventStreamHttpReader::headerurl_[256] [private]

Definition at line 38 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and readHeader().

int edm::EventStreamHttpReader::hltBitCount [private]

Definition at line 41 of file EventStreamHttpReader.h.

int edm::EventStreamHttpReader::l1BitCount [private]

Definition at line 42 of file EventStreamHttpReader.h.

struct timeval edm::EventStreamHttpReader::lastRequestTime_ [read, private]

Definition at line 49 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and getOneEvent().

int edm::EventStreamHttpReader::maxConnectTries_ [private]

Definition at line 58 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and registerWithEventServer().

double edm::EventStreamHttpReader::minEventRequestInterval_ [private]

Definition at line 47 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and getOneEvent().

bool edm::EventStreamHttpReader::runEnded_ [private]

Definition at line 51 of file EventStreamHttpReader.h.

Referenced by getOneEvent(), and read().

std::string edm::EventStreamHttpReader::sourceurl_ [private]

Definition at line 36 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader().

char edm::EventStreamHttpReader::subscriptionurl_[256] [private]

Definition at line 39 of file EventStreamHttpReader.h.

Referenced by EventStreamHttpReader(), and registerWithEventServer().


The documentation for this class was generated from the following files:
Generated on Tue Jun 9 18:41:00 2009 for CMSSW by  doxygen 1.5.4