#include <EventFilter/StorageManager/src/EventStreamHttpReader.h>
Public Types | |
typedef std::vector< char > | Buf |
Public Member Functions | |
EventStreamHttpReader (edm::ParameterSet const &pset, edm::InputSourceDescription const &desc) | |
virtual std::auto_ptr < edm::EventPrincipal > | read () |
void | readHeader () |
void | registerWithEventServer () |
virtual | ~EventStreamHttpReader () |
Private Types | |
enum | { DEFAULT_MAX_CONNECT_TRIES = 360, DEFAULT_CONNECT_TRY_SLEEP_TIME = 10 } |
Private Member Functions | |
std::auto_ptr < edm::EventPrincipal > | getOneEvent () |
Private Attributes | |
bool | alreadySaidHalted_ |
Buf | buf_ |
int | connectTrySleepTime_ |
unsigned int | consumerId_ |
std::string | consumerName_ |
std::string | consumerPriority_ |
std::string | consumerPSetString_ |
bool | endRunAlreadyNotified_ |
char | eventurl_ [256] |
int | headerRetryInterval_ |
char | headerurl_ [256] |
int | hltBitCount |
int | l1BitCount |
struct timeval | lastRequestTime_ |
int | maxConnectTries_ |
double | minEventRequestInterval_ |
bool | runEnded_ |
std::string | sourceurl_ |
char | subscriptionurl_ [256] |
Definition at line 20 of file EventStreamHttpReader.h.
typedef std::vector<char> edm::EventStreamHttpReader::Buf |
Definition at line 23 of file EventStreamHttpReader.h.
anonymous enum [private] |
Definition at line 53 of file EventStreamHttpReader.h.
00054 { 00055 DEFAULT_MAX_CONNECT_TRIES = 360, 00056 DEFAULT_CONNECT_TRY_SLEEP_TIME = 10 00057 };
edm::EventStreamHttpReader::EventStreamHttpReader | ( | edm::ParameterSet const & | pset, | |
edm::InputSourceDescription const & | desc | |||
) |
Definition at line 45 of file EventStreamHttpReader.cc.
References connectTrySleepTime_, consumerId_, consumerName_, consumerPriority_, consumerPSetString_, DEFAULT_CONNECT_TRY_SLEEP_TIME, DEFAULT_MAX_CONNECT_TRIES, eventurl_, edm::ParameterSet::getUntrackedParameter(), header, headerRetryInterval_, headerurl_, i, edm::StreamerInputSource::inputFileTransitionsEachEvent_, lastRequestTime_, maxConnectTries_, L1TDQM_GREJuly_cfg::maxEventRequestRate, minEventRequestInterval_, readHeader(), registerWithEventServer(), sourceurl_, subscriptionurl_, and edm::ParameterSet::toString().
00046 : 00047 edm::StreamerInputSource(ps, desc), 00048 sourceurl_(ps.getParameter<string>("sourceURL")), 00049 buf_(1000*1000*7), 00050 endRunAlreadyNotified_(true), 00051 runEnded_(false), 00052 alreadySaidHalted_(false), 00053 maxConnectTries_(DEFAULT_MAX_CONNECT_TRIES), 00054 connectTrySleepTime_(DEFAULT_CONNECT_TRY_SLEEP_TIME) 00055 { 00056 // Retry connection params (wb) 00057 maxConnectTries_ = ps.getUntrackedParameter<int>("maxConnectTries", 00058 DEFAULT_MAX_CONNECT_TRIES); 00059 connectTrySleepTime_ = ps.getUntrackedParameter<int>("connectTrySleepTime", 00060 DEFAULT_CONNECT_TRY_SLEEP_TIME); 00061 inputFileTransitionsEachEvent_ = 00062 ps.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", true); 00063 00064 std::string evturl = sourceurl_ + "/geteventdata"; 00065 int stlen = evturl.length(); 00066 for (int i=0; i<stlen; i++) eventurl_[i]=evturl[i]; 00067 eventurl_[stlen] = '\0'; 00068 00069 std::string header = sourceurl_ + "/getregdata"; 00070 stlen = header.length(); 00071 for (int i=0; i<stlen; i++) headerurl_[i]=header[i]; 00072 headerurl_[stlen] = '\0'; 00073 00074 std::string regurl = sourceurl_ + "/registerConsumer"; 00075 stlen = regurl.length(); 00076 for (int i=0; i<stlen; i++) subscriptionurl_[i]=regurl[i]; 00077 subscriptionurl_[stlen] = '\0'; 00078 00079 // 09-Aug-2006, KAB: new parameters 00080 const double MAX_REQUEST_INTERVAL = 300.0; // seconds 00081 consumerName_ = ps.getUntrackedParameter<string>("consumerName","Unknown"); 00082 consumerPriority_ = ps.getUntrackedParameter<string>("consumerPriority","normal"); 00083 headerRetryInterval_ = ps.getUntrackedParameter<int>("headerRetryInterval",5); 00084 double maxEventRequestRate = ps.getUntrackedParameter<double>("maxEventRequestRate",1.0); 00085 if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) { 00086 minEventRequestInterval_ = MAX_REQUEST_INTERVAL; 00087 } 00088 else { 00089 minEventRequestInterval_ = 1.0 / maxEventRequestRate; // seconds 00090 } 00091 
lastRequestTime_.tv_sec = 0; 00092 lastRequestTime_.tv_usec = 0; 00093 00094 // 28-Aug-2006, KAB: save our parameter set in string format to 00095 // be sent to the event server to specify our "request" (that is, which 00096 // events we are interested in). 00097 consumerPSetString_ = ps.toString(); 00098 00099 // 16-Aug-2006, KAB: register this consumer with the event server 00100 consumerId_ = (time(0) & 0xffffff); // temporary - will get from ES later 00101 registerWithEventServer(); 00102 00103 readHeader(); 00104 }
edm::EventStreamHttpReader::~EventStreamHttpReader | ( | ) | [virtual] |
std::auto_ptr< edm::EventPrincipal > edm::EventStreamHttpReader::getOneEvent | ( | ) | [private] |
Definition at line 132 of file EventStreamHttpReader.cc.
References alreadySaidHalted_, buf_, TestMuL1L2Filter_cff::cerr, HeaderView::code(), consumerId_, convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamerInputSource::deserializeEvent(), Header::DONE, lat::endl(), endRunAlreadyNotified_, Header::EVENT, Header::EVENT_REQUEST, eventurl_, Exception, FDEBUG, stor::func(), i, lastRequestTime_, len, minEventRequestInterval_, OtherMessageBuilder::msgBody(), NULL, runEnded_, edm::StreamerInputSource::setEndRun(), stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), and OtherMessageBuilder::startAddress().
Referenced by read().
00133 { 00134 // repeat a https get every N seconds until we get an event 00135 // wait for Storage Manager event server buffer to not be empty 00136 // only way to stop is specify a maxEvents parameter or cntrol-c. 00137 // If the Storage Manager is killed so the https get fails, we 00138 // end the job as we would be in an unknown state (If SM is up 00139 // and we have a network problem we just try to get another event, 00140 // but if SM is killed/dead we want to register.) 00141 00142 // check if we need to sleep (to enforce the allowed request rate) 00143 struct timeval now; 00144 struct timezone dummyTZ; 00145 gettimeofday(&now, &dummyTZ); 00146 double timeDiff = (double) now.tv_sec; 00147 timeDiff -= (double) lastRequestTime_.tv_sec; 00148 timeDiff += ((double) now.tv_usec / 1000000.0); 00149 timeDiff -= ((double) lastRequestTime_.tv_usec / 1000000.0); 00150 if (timeDiff < minEventRequestInterval_) 00151 { 00152 double sleepTime = minEventRequestInterval_ - timeDiff; 00153 // trim off a little sleep time to account for the time taken by 00154 // calling gettimeofday again 00155 sleepTime -= 0.01; 00156 if (sleepTime < 0.0) {sleepTime = 0.0;} 00157 //cout << "sleeping for " << sleepTime << endl; 00158 usleep(static_cast<int>(1000000 * sleepTime)); 00159 gettimeofday(&lastRequestTime_, &dummyTZ); 00160 } 00161 else 00162 { 00163 lastRequestTime_ = now; 00164 } 00165 00166 stor::ReadData data; 00167 bool alreadySaidWaiting = false; 00168 do { 00169 CURL* han = curl_easy_init(); 00170 00171 if(han==0) 00172 { 00173 cerr << "could not create handle" << endl; 00174 // this will end cmsRun 00175 //return std::auto_ptr<edm::EventPrincipal>(); 00176 throw cms::Exception("getOneEvent","EventStreamHttpReader") 00177 << "Could not get event: problem with curl" 00178 << "\n"; 00179 } 00180 00181 stor::setopt(han,CURLOPT_URL,eventurl_); 00182 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 00183 stor::setopt(han,CURLOPT_WRITEDATA,&data); 00184 00185 // 24-Aug-2006, 
KAB: send our consumer ID as part of the event request 00186 char msgBuff[100]; 00187 OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST, 00188 sizeof(char_uint32)); 00189 uint8 *bodyPtr = requestMessage.msgBody(); 00190 convert(consumerId_, bodyPtr); 00191 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00192 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00193 struct curl_slist *headers=NULL; 00194 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00195 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00196 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 00197 00198 // send the HTTP POST, read the reply, and cleanup before going on 00199 CURLcode messageStatus = curl_easy_perform(han); 00200 curl_slist_free_all(headers); 00201 curl_easy_cleanup(han); 00202 00203 if(messageStatus!=0) 00204 { 00205 cerr << "curl perform failed for event, messageStatus = " 00206 << messageStatus << endl; 00207 // this will end cmsRun 00208 //return std::auto_ptr<edm::EventPrincipal>(); 00209 throw cms::Exception("getOneEvent","EventStreamHttpReader") 00210 << "Could not get event: probably XDAQ not running on Storage Manager " 00211 << "\n"; 00212 } 00213 if(data.d_.length() == 0) 00214 { 00215 if(!alreadySaidWaiting) { 00216 std::cout << "...waiting for event from Storage Manager..." 
<< std::endl; 00217 alreadySaidWaiting = true; 00218 } 00219 // sleep for the standard request interval 00220 usleep(static_cast<int>(1000000 * minEventRequestInterval_)); 00221 } 00222 } while (data.d_.length() == 0 && !edm::shutdown_flag); 00223 if (edm::shutdown_flag) { 00224 return std::auto_ptr<edm::EventPrincipal>(); 00225 } 00226 00227 int len = data.d_.length(); 00228 FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl; 00229 buf_.resize(len); 00230 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00231 00232 // first check if done message 00233 OtherMessageView msgView(&buf_[0]); 00234 00235 if (msgView.code() == Header::DONE) { 00236 // no need to register again as the SM/EventServer is kept alive on a stopAction 00237 // *BUT* for a haltAction, we need a code to say when SM is halted as then we need 00238 // register again else the consumerId is wrong and we may get wrong events! 00239 // We may even need to end the job if a new run has new triggers, etc. 00240 if(!alreadySaidHalted_) { 00241 alreadySaidHalted_ = true; 00242 std::cout << "Storage Manager has stopped - waiting for restart" << std::endl; 00243 std::cout << "Warning! If you are waiting forever at: " 00244 << "...waiting for event from Storage Manager... " << std::endl 00245 << " it may be that the Storage Manager has been halted with a haltAction," << std::endl 00246 << " instead of a stopAction. In this case you should control-c to end " << std::endl 00247 << " this consumer and restart it. 
(This will be fixed in a future update)" << std::endl; 00248 } 00249 // decide if we need to notify that a run has ended 00250 if(!endRunAlreadyNotified_) { 00251 endRunAlreadyNotified_ = true; 00252 setEndRun(); 00253 runEnded_ = true; 00254 } 00255 return std::auto_ptr<edm::EventPrincipal>(); 00256 } else { 00257 // reset need-to-set-end-run flag when we get the first event (here any event) 00258 endRunAlreadyNotified_ = false; 00259 alreadySaidHalted_ = false; 00260 00261 // 29-Jan-2008, KAB: catch (and re-throw) any exceptions decoding 00262 // the event data so that we can display the returned HTML and 00263 // (hopefully) give the user a hint as to the cause of the problem. 00264 std::auto_ptr<edm::EventPrincipal> evtPtr; 00265 try { 00266 HeaderView hdrView(&buf_[0]); 00267 if (hdrView.code() != Header::EVENT) { 00268 throw cms::Exception("EventStreamHttpReader", "readOneEvent"); 00269 } 00270 EventMsgView eventView(&buf_[0]); 00271 evtPtr = deserializeEvent(eventView); 00272 } 00273 catch (cms::Exception excpt) { 00274 const unsigned int MAX_DUMP_LENGTH = 2000; 00275 std::cout << "========================================" << std::endl; 00276 std::cout << "* Exception decoding the geteventdata response from the storage manager!" << std::endl; 00277 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00278 std::cout << "* Here is the raw text that was returned:" << std::endl; 00279 std::cout << data.d_ << std::endl; 00280 } 00281 else { 00282 std::cout << "* Here are the first " << MAX_DUMP_LENGTH << 00283 " characters of the raw text that was returned:" << std::endl; 00284 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl; 00285 } 00286 std::cout << "========================================" << std::endl; 00287 throw excpt; 00288 } 00289 return evtPtr; 00290 } 00291 }
std::auto_ptr< edm::EventPrincipal > edm::EventStreamHttpReader::read | ( | ) | [virtual] |
Implements edm::StreamerInputSource.
Definition at line 110 of file EventStreamHttpReader.cc.
References getOneEvent(), NULL, HLT_VtxMuL3::result, runEnded_, and edm::shutdown_flag.
00111 { 00112 // repeat a https get every N seconds until we get an event 00113 // wait for Storage Manager event server buffer to not be empty 00114 // only way to stop is specify a maxEvents parameter 00115 // or kill the Storage Manager so the https get fails. 00116 00117 // try to get an event repeat until we get one, this allows 00118 // re-registration if the SM is halted or stopped 00119 00120 bool gotEvent = false; 00121 std::auto_ptr<EventPrincipal> result(0); 00122 while ((!gotEvent) && (!runEnded_) && (!edm::shutdown_flag)) 00123 { 00124 result = getOneEvent(); 00125 if(result.get() != NULL) gotEvent = true; 00126 } 00127 // need next line so we only return a null pointer once for each end of run 00128 if(runEnded_) runEnded_ = false; 00129 return result; 00130 }
void edm::EventStreamHttpReader::readHeader | ( | ) |
Definition at line 293 of file EventStreamHttpReader.cc.
References TestMuL1L2Filter_cff::cerr, consumerId_, convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamerInputSource::deserializeAndMergeWithRegistry(), lat::endl(), Exception, FDEBUG, stor::func(), Header::HEADER_REQUEST, headerRetryInterval_, headerurl_, i, Header::INIT, len, OtherMessageBuilder::msgBody(), NULL, p, stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), and OtherMessageBuilder::startAddress().
Referenced by EventStreamHttpReader().
00294 { 00295 // repeat a https get every 5 seconds until we get the registry 00296 // do it like this for pull mode 00297 bool alreadySaidWaiting = false; 00298 stor::ReadData data; 00299 do { 00300 CURL* han = curl_easy_init(); 00301 00302 if(han==0) 00303 { 00304 cerr << "could not create handle" << endl; 00305 //return 0; //or use this? 00306 throw cms::Exception("readHeader","EventStreamHttpReader") 00307 << "Could not get header: probably XDAQ not running on Storage Manager " 00308 << "\n"; 00309 } 00310 00311 stor::setopt(han,CURLOPT_URL,headerurl_); 00312 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 00313 stor::setopt(han,CURLOPT_WRITEDATA,&data); 00314 00315 // 10-Aug-2006, KAB: send our consumer ID as part of the header request 00316 char msgBuff[100]; 00317 OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST, 00318 sizeof(char_uint32)); 00319 uint8 *bodyPtr = requestMessage.msgBody(); 00320 convert(consumerId_, bodyPtr); 00321 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00322 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00323 struct curl_slist *headers=NULL; 00324 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00325 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00326 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 00327 00328 // send the HTTP POST, read the reply, and cleanup before going on 00329 CURLcode messageStatus = curl_easy_perform(han); 00330 curl_slist_free_all(headers); 00331 curl_easy_cleanup(han); 00332 00333 if(messageStatus!=0) 00334 { 00335 cerr << "curl perform failed for header" << endl; 00336 // do not retry curl here as we should return to registration instead if we 00337 // want an automatic recovery 00338 throw cms::Exception("readHeader","EventStreamHttpReader") 00339 << "Could not get header: probably XDAQ not running on Storage Manager " 00340 << "\n"; 00341 } 00342 if(data.d_.length() == 0) 00343 
{ 00344 if(!alreadySaidWaiting) { 00345 std::cout << "...waiting for header from Storage Manager..." << std::endl; 00346 alreadySaidWaiting = true; 00347 } 00348 // sleep for desired amount of time 00349 sleep(headerRetryInterval_); 00350 } 00351 } while (data.d_.length() == 0 && !edm::shutdown_flag); 00352 if (edm::shutdown_flag) { 00353 throw cms::Exception("readHeader","EventStreamHttpReader") 00354 << "The header read was aborted by a shutdown request.\n"; 00355 } 00356 00357 std::vector<char> regdata(1000*1000); 00358 00359 // rely on https transfer string of correct length! 00360 int len = data.d_.length(); 00361 FDEBUG(9) << "EventStreamHttpReader received registry len = " << len << std::endl; 00362 regdata.resize(len); 00363 for (int i=0; i<len ; i++) regdata[i] = data.d_[i]; 00364 // 21-Jun-2006, KAB: catch (and re-throw) any exceptions decoding 00365 // the job header so that we can display the returned HTML and 00366 // (hopefully) give the user a hint as to the cause of the problem. 00367 std::auto_ptr<SendJobHeader> p; 00368 try { 00369 HeaderView hdrView(&regdata[0]); 00370 if (hdrView.code() != Header::INIT) { 00371 throw cms::Exception("EventStreamHttpReader", "readHeader"); 00372 } 00373 InitMsgView initView(&regdata[0]); 00374 deserializeAndMergeWithRegistry(initView); 00375 } 00376 catch (cms::Exception excpt) { 00377 const unsigned int MAX_DUMP_LENGTH = 1000; 00378 std::cout << "========================================" << std::endl; 00379 std::cout << "* Exception decoding the getregdata response from the storage manager!" 
<< std::endl; 00380 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00381 std::cout << "* Here is the raw text that was returned:" << std::endl; 00382 std::cout << data.d_ << std::endl; 00383 } 00384 else { 00385 std::cout << "* Here are the first " << MAX_DUMP_LENGTH << 00386 " characters of the raw text that was returned:" << std::endl; 00387 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl; 00388 } 00389 std::cout << "========================================" << std::endl; 00390 throw excpt; 00391 } 00392 }
void edm::EventStreamHttpReader::registerWithEventServer | ( | ) |
Definition at line 394 of file EventStreamHttpReader.cc.
References buf_, TestMuL1L2Filter_cff::cerr, connectTrySleepTime_, consumerId_, consumerName_, consumerPriority_, consumerPSetString_, GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, lat::endl(), ConsRegResponseBuilder::ES_NOT_READY, Exception, FDEBUG, stor::func(), headerRetryInterval_, i, len, maxConnectTries_, NULL, stor::setopt(), edm::shutdown_flag, ConsRegRequestBuilder::size(), ConsRegRequestBuilder::startAddress(), and subscriptionurl_.
Referenced by EventStreamHttpReader().
00395 { 00396 stor::ReadData data; 00397 uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY; 00398 bool alreadySaidWaiting = false; 00399 do { 00400 data.d_.clear(); 00401 CURL* han = curl_easy_init(); 00402 if(han==0) 00403 { 00404 cerr << "could not create handle" << endl; 00405 throw cms::Exception("registerWithEventServer","EventStreamHttpReader") 00406 << "Unable to create curl handle\n"; 00407 } 00408 00409 // set the standard https request options 00410 stor::setopt(han,CURLOPT_URL,subscriptionurl_); 00411 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func); 00412 stor::setopt(han,CURLOPT_WRITEDATA,&data); 00413 00414 // build the registration request message to send to the storage manager 00415 const int BUFFER_SIZE = 2000; 00416 char msgBuff[BUFFER_SIZE]; 00417 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_, 00418 consumerPriority_, consumerPSetString_); 00419 00420 // add the request message as a https post 00421 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress()); 00422 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size()); 00423 struct curl_slist *headers=NULL; 00424 headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); 00425 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary"); 00426 stor::setopt(han, CURLOPT_HTTPHEADER, headers); 00427 00428 // send the HTTP POST, read the reply, and cleanup before going on 00429 //CURLcode messageStatus = (CURLcode)-1; 00430 // set messageStatus to a non-zero (but still within CURLcode enum list) 00431 CURLcode messageStatus = CURLE_COULDNT_CONNECT; 00432 int tries = 0; 00433 while (messageStatus!=0 && !edm::shutdown_flag) 00434 { 00435 tries++; 00436 messageStatus = curl_easy_perform(han); 00437 if ( messageStatus != 0 ) 00438 { 00439 if ( tries >= maxConnectTries_ ) 00440 { 00441 std::cerr << "Giving up waiting for connection after " << tries 00442 << " tries" << std::endl; 00443 
curl_slist_free_all(headers); 00444 curl_easy_cleanup(han); 00445 cerr << "curl perform failed for registration" << endl; 00446 throw cms::Exception("registerWithEventServer","EventStreamHttpReader") 00447 << "Could not register: probably XDAQ not running on Storage Manager " 00448 << "\n"; 00449 } 00450 else 00451 { 00452 std::cout << "Waiting for connection to StorageManager... " 00453 << tries << "/" << maxConnectTries_ 00454 << std::endl; 00455 sleep(connectTrySleepTime_); 00456 } 00457 } 00458 } 00459 if (edm::shutdown_flag) { 00460 continue; 00461 } 00462 00463 curl_slist_free_all(headers); 00464 curl_easy_cleanup(han); 00465 00466 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY; 00467 if(data.d_.length() > 0) 00468 { 00469 int len = data.d_.length(); 00470 FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl; 00471 buf_.resize(len); 00472 for (int i=0; i<len ; i++) buf_[i] = data.d_[i]; 00473 00474 try { 00475 ConsRegResponseView respView(&buf_[0]); 00476 registrationStatus = respView.getStatus(); 00477 consumerId_ = respView.getConsumerId(); 00478 } 00479 catch (cms::Exception excpt) { 00480 const unsigned int MAX_DUMP_LENGTH = 1000; 00481 std::cout << "========================================" << std::endl; 00482 std::cout << "* Exception decoding the registerWithEventServer response!" 
<< std::endl; 00483 if (data.d_.length() <= MAX_DUMP_LENGTH) { 00484 std::cout << "* Here is the raw text that was returned:" << std::endl; 00485 std::cout << data.d_ << std::endl; 00486 } 00487 else { 00488 std::cout << "* Here are the first " << MAX_DUMP_LENGTH << 00489 " characters of the raw text that was returned:" << std::endl; 00490 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl; 00491 } 00492 std::cout << "========================================" << std::endl; 00493 throw excpt; 00494 } 00495 } 00496 00497 if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) 00498 { 00499 if(!alreadySaidWaiting) { 00500 std::cout << "...waiting for registration response from Storage Manager..." << std::endl; 00501 alreadySaidWaiting = true; 00502 } 00503 // sleep for desired amount of time 00504 sleep(headerRetryInterval_); 00505 } 00506 } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY && 00507 !edm::shutdown_flag); 00508 if (edm::shutdown_flag) { 00509 throw cms::Exception("registerWithEventServer","EventStreamHttpReader") 00510 << "Registration was aborted by a shutdown request.\n"; 00511 } 00512 00513 FDEBUG(5) << "Consumer ID = " << consumerId_ << endl; 00514 }
Buf edm::EventStreamHttpReader::buf_ [private] |
Definition at line 40 of file EventStreamHttpReader.h.
Referenced by getOneEvent(), and registerWithEventServer().
int edm::EventStreamHttpReader::connectTrySleepTime_ [private] |
Definition at line 59 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and registerWithEventServer().
unsigned int edm::EventStreamHttpReader::consumerId_ [private] |
Definition at line 48 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), getOneEvent(), readHeader(), and registerWithEventServer().
std::string edm::EventStreamHttpReader::consumerName_ [private] |
Definition at line 43 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and registerWithEventServer().
std::string edm::EventStreamHttpReader::consumerPriority_ [private] |
Definition at line 44 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and registerWithEventServer().
std::string edm::EventStreamHttpReader::consumerPSetString_ [private] |
Definition at line 45 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and registerWithEventServer().
char edm::EventStreamHttpReader::eventurl_[256] [private] |
Definition at line 37 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and getOneEvent().
int edm::EventStreamHttpReader::headerRetryInterval_ [private] |
Definition at line 46 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), readHeader(), and registerWithEventServer().
char edm::EventStreamHttpReader::headerurl_[256] [private] |
Definition at line 38 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and readHeader().
int edm::EventStreamHttpReader::hltBitCount [private] |
Definition at line 41 of file EventStreamHttpReader.h.
int edm::EventStreamHttpReader::l1BitCount [private] |
Definition at line 42 of file EventStreamHttpReader.h.
struct timeval edm::EventStreamHttpReader::lastRequestTime_ [read, private] |
Definition at line 49 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and getOneEvent().
int edm::EventStreamHttpReader::maxConnectTries_ [private] |
Definition at line 58 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and registerWithEventServer().
double edm::EventStreamHttpReader::minEventRequestInterval_ [private] |
Definition at line 47 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and getOneEvent().
bool edm::EventStreamHttpReader::runEnded_ [private] |
std::string edm::EventStreamHttpReader::sourceurl_ [private] |
char edm::EventStreamHttpReader::subscriptionurl_[256] [private] |
Definition at line 39 of file EventStreamHttpReader.h.
Referenced by EventStreamHttpReader(), and registerWithEventServer().