#include <EventServerProxy.h>
Retrieve events from the Storage Manager event server.
This does uses a HTTP get using the CURL library. The Storage Manager event server responses with a binary octet-stream. The init message is also obtained through a HTTP get.
Definition at line 36 of file EventServerProxy.h.
stor::EventServerProxy< RegInfo >::EventServerProxy | ( | edm::ParameterSet const & | ) |
virtual stor::EventServerProxy< RegInfo >::~EventServerProxy | ( | ) | [inline, virtual] |
Definition at line 42 of file EventServerProxy.h.
{};
void stor::EventServerProxy< RegInfo >::checkEvent | ( | CurlInterface::Content & | ) | [private] |
void stor::EventServerProxy< RegInfo >::checkInitMsg | ( | CurlInterface::Content & | ) | [private] |
void stor::EventServerProxy< EventConsumerRegistrationInfo >::checkInitMsg | ( | CurlInterface::Content & | data | ) | [inline, private] |
Definition at line 144 of file EventServerProxy.h.
References HeaderView::code(), hcal_timing_source_file_cfg::dump, Exception, and Header::INIT.
{ try { HeaderView hdrView(&data[0]); if (hdrView.code() != Header::INIT) { throw cms::Exception("EventServerProxy", "readHeader"); } } catch (cms::Exception excpt) { const unsigned int MAX_DUMP_LENGTH = 1000; std::ostringstream dump; dump << "========================================" << std::endl; dump << "* Exception decoding the getregdata response from the event server!" << std::endl; if (data.size() <= MAX_DUMP_LENGTH) { dump << "* Here is the raw text that was returned:" << std::endl; dump << std::string(&data[0]) << std::endl; } else { dump << "* Here are the first " << MAX_DUMP_LENGTH << " characters of the raw text that was returned:" << std::endl; dump << std::string(&data[0], MAX_DUMP_LENGTH) << std::endl; } dump << "========================================" << std::endl; edm::LogError("EventServerProxy") << dump.str(); throw excpt; } }
void stor::EventServerProxy< RegInfo >::connectToEventServer | ( | CurlInterface::Content & | ) | [private] |
bool stor::EventServerProxy< RegInfo >::extractConsumerId | ( | CurlInterface::Content & | ) | [private] |
bool stor::EventServerProxy< RegInfo >::getEventMaybe | ( | CurlInterface::Content & | data | ) |
Try to get one event from the event server. If succesful, returns true.
void stor::EventServerProxy< RegInfo >::getInitMsg | ( | CurlInterface::Content & | data | ) |
Get the init message from the the event server.
Referenced by edm::EventStreamHttpReader::readHeader().
void stor::EventServerProxy< EventConsumerRegistrationInfo >::getInitMsg | ( | CurlInterface::Content & | data | ) | [inline] |
Definition at line 177 of file EventServerProxy.h.
References Exception, and edm::shutdown_flag.
{ do { data.clear(); getInitMsgFromEventServer(data); } while ( !edm::shutdown_flag && data.empty() ); if (edm::shutdown_flag) { throw cms::Exception("readHeader","EventServerProxy") << "The header read was aborted by a shutdown request.\n"; } checkInitMsg(data); }
void stor::EventServerProxy< RegInfo >::getInitMsgFromEventServer | ( | CurlInterface::Content & | ) | [private] |
void stor::EventServerProxy< EventConsumerRegistrationInfo >::getInitMsgFromEventServer | ( | CurlInterface::Content & | data | ) | [inline, private] |
Definition at line 95 of file EventServerProxy.h.
References lhef::cc::convert(), AlCaHLTBitMon_QueryRunRegistry::data, stor::CurlInterface::getInterface(), Header::HEADER_REQUEST, OtherMessageBuilder::msgBody(), query::result, OtherMessageBuilder::size(), stor::utils::sleep(), and OtherMessageBuilder::startAddress().
{ // build the header request message to send to the event server char msgBuff[100]; OtherMessageBuilder requestMessage( &msgBuff[0], Header::HEADER_REQUEST, sizeof(char_uint32) ); uint8 *bodyPtr = requestMessage.msgBody(); convert(consumerId_, bodyPtr); // send the header request stor::CurlInterfacePtr curl = stor::CurlInterface::getInterface(); CURLcode result = curl->postBinaryMessage( regInfo_.sourceURL() + "/getregdata", requestMessage.startAddress(), requestMessage.size(), data ); if ( result != CURLE_OK ) { // connection failed: try to reconnect edm::LogError("EventServerProxy") << "curl perform failed for header:" << std::string(&data[0]) << std::endl << ". Trying to reconnect."; data.clear(); registerWithEventServer(); } if( data.empty() ) { if(!alreadySaidWaiting_) { edm::LogInfo("EventServerProxy") << "...waiting for header from event server..."; alreadySaidWaiting_ = true; } // sleep for desired amount of time ::sleep(regInfo_.headerRetryInterval()); } else { alreadySaidWaiting_ = false; } }
void stor::EventServerProxy< RegInfo >::getOneEvent | ( | CurlInterface::Content & | data | ) |
Get one event from the event server.
Referenced by edm::EventStreamHttpReader::read(), and edm::DQMHttpSource::readOneEvent().
void stor::EventServerProxy< RegInfo >::getOneEventFromEventServer | ( | CurlInterface::Content & | ) | [private] |
void stor::EventServerProxy< RegInfo >::reconnect | ( | ) |
Reconnect to the event server
void stor::EventServerProxy< RegInfo >::registerWithEventServer | ( | ) | [private] |
bool stor::EventServerProxy< RegInfo >::alreadySaidHalted_ [private] |
Definition at line 81 of file EventServerProxy.h.
bool stor::EventServerProxy< RegInfo >::alreadySaidWaiting_ [private] |
Definition at line 82 of file EventServerProxy.h.
unsigned int stor::EventServerProxy< RegInfo >::consumerId_ [private] |
Definition at line 77 of file EventServerProxy.h.
unsigned int stor::EventServerProxy< RegInfo >::failedAttemptsToGetData_ [private] |
Definition at line 83 of file EventServerProxy.h.
const stor::utils::Duration_t stor::EventServerProxy< RegInfo >::minEventRequestInterval_ [private] |
Definition at line 79 of file EventServerProxy.h.
stor::utils::TimePoint_t stor::EventServerProxy< RegInfo >::nextRequestTime_ [private] |
Definition at line 78 of file EventServerProxy.h.
const RegInfo stor::EventServerProxy< RegInfo >::regInfo_ [private] |
Definition at line 76 of file EventServerProxy.h.