CMS 3D CMS Logo

edm::OnlineHttpReader Class Reference

#include <EventFilter/StorageManager/src/OnlineHttpReader.h>

Inheritance diagram for edm::OnlineHttpReader:

edm::StreamerInputSource edm::InputSource edm::ProductRegistryHelper

List of all members.

Public Types

typedef std::vector< char > Buf

Public Member Functions

 OnlineHttpReader (edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
virtual std::auto_ptr
< edm::EventPrincipal
read ()
void readHeader ()
void registerWithEventServer ()
virtual ~OnlineHttpReader ()

Private Types

enum  { DEFAULT_MAX_CONNECT_TRIES = 360, DEFAULT_CONNECT_TRY_SLEEP_TIME = 10 }

Private Member Functions

std::auto_ptr
< edm::EventPrincipal
getOneEvent ()
virtual void setRun (RunNumber_t r)

Private Attributes

bool alreadyGotHeader_
bool alreadyRegistered_
bool alreadySaidHalted_
Buf buf_
int connectTrySleepTime_
unsigned int consumerId_
std::string consumerName_
std::string consumerPriority_
std::string consumerPSetString_
bool endRunAlreadyNotified_
char eventurl_ [256]
int headerRetryInterval_
char headerurl_ [256]
int hltBitCount
int l1BitCount
struct timeval lastRequestTime_
int maxConnectTries_
double minEventRequestInterval_
bool runEnded_
std::string sourceurl_
char subscriptionurl_ [256]


Detailed Description

Definition at line 24 of file OnlineHttpReader.h.


Member Typedef Documentation

typedef std::vector<char> edm::OnlineHttpReader::Buf

Definition at line 27 of file OnlineHttpReader.h.


Member Enumeration Documentation

anonymous enum [private]

Enumerator:
DEFAULT_MAX_CONNECT_TRIES 
DEFAULT_CONNECT_TRY_SLEEP_TIME 

Definition at line 61 of file OnlineHttpReader.h.

00062     {
00063       DEFAULT_MAX_CONNECT_TRIES = 360,
00064       DEFAULT_CONNECT_TRY_SLEEP_TIME = 10
00065     };


Constructor & Destructor Documentation

edm::OnlineHttpReader::OnlineHttpReader ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)

Definition at line 45 of file OnlineHttpReader.cc.

References alreadyGotHeader_, alreadyRegistered_, connectTrySleepTime_, consumerId_, consumerName_, consumerPriority_, consumerPSetString_, DEFAULT_CONNECT_TRY_SLEEP_TIME, DEFAULT_MAX_CONNECT_TRIES, eventurl_, edm::ParameterSet::getUntrackedParameter(), header, headerRetryInterval_, headerurl_, i, lastRequestTime_, maxConnectTries_, L1TDQM_GREJuly_cfg::maxEventRequestRate, minEventRequestInterval_, readHeader(), registerWithEventServer(), sourceurl_, subscriptionurl_, and edm::ParameterSet::toString().

00046                                                                                      :
00047     edm::StreamerInputSource(ps, desc),
00048     sourceurl_(ps.getParameter<string>("sourceURL")),
00049     buf_(1000*1000*7), 
00050     endRunAlreadyNotified_(true),
00051     runEnded_(false),
00052     alreadySaidHalted_(false),
00053     maxConnectTries_(DEFAULT_MAX_CONNECT_TRIES),
00054     connectTrySleepTime_(DEFAULT_CONNECT_TRY_SLEEP_TIME)
00055   {
00056     // Retry connection params (wb)
00057     maxConnectTries_ = ps.getUntrackedParameter<int>("maxConnectTries",
00058                                                DEFAULT_MAX_CONNECT_TRIES);
00059     connectTrySleepTime_ = ps.getUntrackedParameter<int>("connectTrySleepTime",
00060                                                DEFAULT_CONNECT_TRY_SLEEP_TIME);
00061 
00062     std::string evturl = sourceurl_ + "/geteventdata";
00063     int stlen = evturl.length();
00064     for (int i=0; i<stlen; i++) eventurl_[i]=evturl[i];
00065     eventurl_[stlen] = '\0';
00066 
00067     std::string header = sourceurl_ + "/getregdata";
00068     stlen = header.length();
00069     for (int i=0; i<stlen; i++) headerurl_[i]=header[i];
00070     headerurl_[stlen] = '\0';
00071 
00072     std::string regurl = sourceurl_ + "/registerConsumer";
00073     stlen = regurl.length();
00074     for (int i=0; i<stlen; i++) subscriptionurl_[i]=regurl[i];
00075     subscriptionurl_[stlen] = '\0';
00076 
00077     // 09-Aug-2006, KAB: new parameters
00078     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00079     consumerName_ = ps.getUntrackedParameter<string>("consumerName","Unknown");
00080     consumerPriority_ = ps.getUntrackedParameter<string>("consumerPriority","normal");
00081     headerRetryInterval_ = ps.getUntrackedParameter<int>("headerRetryInterval",5);
00082     double maxEventRequestRate = ps.getUntrackedParameter<double>("maxEventRequestRate",1.0);
00083     if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) {
00084       minEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00085     }
00086     else {
00087       minEventRequestInterval_ = 1.0 / maxEventRequestRate;  // seconds
00088     }
00089     lastRequestTime_.tv_sec = 0;
00090     lastRequestTime_.tv_usec = 0;
00091 
00092     // 28-Aug-2006, KAB: save our parameter set in string format to
00093     // be sent to the event server to specify our "request" (that is, which
00094     // events we are interested in).
00095     consumerPSetString_ = ps.toString();
00096 
00097     // 16-Aug-2006, KAB: register this consumer with the event server
00098     consumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00099     registerWithEventServer();
00100     alreadyRegistered_ = true;
00101 
00102     readHeader();
00103     alreadyGotHeader_ = true;
00104   }

edm::OnlineHttpReader::~OnlineHttpReader (  )  [virtual]

Definition at line 106 of file OnlineHttpReader.cc.

00107   {
00108   }


Member Function Documentation

std::auto_ptr< edm::EventPrincipal > edm::OnlineHttpReader::getOneEvent (  )  [private]

Definition at line 152 of file OnlineHttpReader.cc.

References alreadyRegistered_, alreadySaidHalted_, buf_, TestMuL1L2Filter_cff::cerr, HeaderView::code(), consumerId_, convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamerInputSource::deserializeEvent(), Header::DONE, lat::endl(), endRunAlreadyNotified_, Header::EVENT, Header::EVENT_REQUEST, eventurl_, Exception, FDEBUG, stor::func(), i, lastRequestTime_, len, minEventRequestInterval_, OtherMessageBuilder::msgBody(), NULL, runEnded_, edm::StreamerInputSource::setEndRun(), stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), and OtherMessageBuilder::startAddress().

Referenced by read().

00153   {
00154     // repeat a https get every N seconds until we get an event
00155     // wait for Storage Manager event server buffer to not be empty
00156     // only way to stop is specify a maxEvents parameter or cntrol-c.
00157     // If the Storage Manager is killed so the https get fails, we
00158     // end the job as we would be in an unknown state (If SM is up
00159     // and we have a network problem we just try to get another event,
00160     // but if SM is killed/dead we want to register.)
00161 
00162     // check if we need to sleep (to enforce the allowed request rate)
00163     struct timeval now;
00164     struct timezone dummyTZ;
00165     gettimeofday(&now, &dummyTZ);
00166     double timeDiff = (double) now.tv_sec;
00167     timeDiff -= (double) lastRequestTime_.tv_sec;
00168     timeDiff += ((double) now.tv_usec / 1000000.0);
00169     timeDiff -= ((double) lastRequestTime_.tv_usec / 1000000.0);
00170     if (timeDiff < minEventRequestInterval_)
00171     {
00172       double sleepTime = minEventRequestInterval_ - timeDiff;
00173       // trim off a little sleep time to account for the time taken by
00174       // calling gettimeofday again
00175       sleepTime -= 0.01;
00176       if (sleepTime < 0.0) {sleepTime = 0.0;}
00177       //cout << "sleeping for " << sleepTime << endl;
00178       usleep(static_cast<int>(1000000 * sleepTime));
00179       gettimeofday(&lastRequestTime_, &dummyTZ);
00180     }
00181     else
00182     {
00183       lastRequestTime_ = now;
00184     }
00185 
00186     stor::ReadData data;
00187     bool alreadySaidWaiting = false;
00188     do {
00189       CURL* han = curl_easy_init();
00190 
00191       if(han==0)
00192       {
00193         cerr << "could not create handle" << endl;
00194         // this will end cmsRun 
00195         //return std::auto_ptr<edm::EventPrincipal>();
00196         throw cms::Exception("getOneEvent","OnlineHttpReader")
00197             << "Could not get event: problem with curl"
00198             << "\n";
00199       }
00200 
00201       stor::setopt(han,CURLOPT_URL,eventurl_);
00202       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00203       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00204 
00205       // 24-Aug-2006, KAB: send our consumer ID as part of the event request
00206       char msgBuff[100];
00207       OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00208                                          sizeof(char_uint32));
00209       uint8 *bodyPtr = requestMessage.msgBody();
00210       convert(consumerId_, bodyPtr);
00211       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00212       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00213       struct curl_slist *headers=NULL;
00214       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00215       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00216       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00217 
00218       // send the HTTP POST, read the reply, and cleanup before going on
00219       CURLcode messageStatus = curl_easy_perform(han);
00220       curl_slist_free_all(headers);
00221       curl_easy_cleanup(han);
00222 
00223       if(messageStatus!=0)
00224       {
00225         cerr << "curl perform failed for event, messageStatus = "
00226              << messageStatus << endl;
00227         // this will end cmsRun 
00228         //return std::auto_ptr<edm::EventPrincipal>();
00229         throw cms::Exception("getOneEvent","OnlineHttpReader")
00230             << "Could not get event: probably XDAQ not running on Storage Manager "
00231             << "\n";
00232       }
00233       if(data.d_.length() == 0)
00234       {
00235         if(!alreadySaidWaiting) {
00236           std::cout << "...waiting for event from Storage Manager..." << std::endl;
00237           alreadySaidWaiting = true;
00238         }
00239         // sleep for the standard request interval
00240         usleep(static_cast<int>(1000000 * minEventRequestInterval_));
00241       }
00242     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00243     if (edm::shutdown_flag) {
00244         return std::auto_ptr<edm::EventPrincipal>();
00245     }
00246 
00247     int len = data.d_.length();
00248     FDEBUG(9) << "OnlineHttpReader received len = " << len << std::endl;
00249     buf_.resize(len);
00250     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00251 
00252     // first check if done message
00253     OtherMessageView msgView(&buf_[0]);
00254 
00255     if (msgView.code() == Header::DONE) {
00256       // no need to register again as the SM/EventServer is kept alive on a stopAction
00257       // *BUT* for a haltAction, we need a code to say when SM is halted as then we need 
00258       // register again else the consumerId is wrong and we may get wrong events!
00259       // We may even need to end the job if a new run has new triggers, etc.
00260       if(!alreadySaidHalted_) {
00261         alreadySaidHalted_ = true;
00262         std::cout << "Storage Manager has stopped - waiting for restart" << std::endl;
00263         std::cout << "Warning! If you are waiting forever at: "
00264                   << "...waiting for event from Storage Manager... " << std::endl
00265                   << "   it may be that the Storage Manager has been halted with a haltAction," << std::endl
00266                   << "   instead of a stopAction. In this case you should control-c to end " << std::endl
00267                   << "   this consumer and restart it. (This will be fixed in a future update)" << std::endl;
00268       }
00269       // decide if we need to notify that a run has ended
00270       if(!endRunAlreadyNotified_) {
00271         endRunAlreadyNotified_ = true;
00272         setEndRun();
00273         runEnded_ = true;
00274         alreadyRegistered_ = false;
00275       }
00276       return std::auto_ptr<edm::EventPrincipal>();
00277     } else {
00278       // reset need-to-set-end-run flag when we get the first event (here any event)
00279       endRunAlreadyNotified_ = false;
00280       alreadySaidHalted_ = false;
00281 
00282       // 29-Jan-2008, KAB:  catch (and re-throw) any exceptions decoding
00283       // the event data so that we can display the returned HTML and
00284       // (hopefully) give the user a hint as to the cause of the problem.
00285       std::auto_ptr<edm::EventPrincipal> evtPtr;
00286       try {
00287         HeaderView hdrView(&buf_[0]);
00288         if (hdrView.code() != Header::EVENT) {
00289           throw cms::Exception("OnlineHttpReader", "readOneEvent");
00290         }
00291         EventMsgView eventView(&buf_[0]);
00292         evtPtr = deserializeEvent(eventView);
00293       }
00294       catch (cms::Exception excpt) {
00295         const unsigned int MAX_DUMP_LENGTH = 2000;
00296         std::cout << "========================================" << std::endl;
00297         std::cout << "* Exception decoding the geteventdata response from the storage manager!" << std::endl;
00298         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00299           std::cout << "* Here is the raw text that was returned:" << std::endl;
00300           std::cout << data.d_ << std::endl;
00301         }
00302         else {
00303           std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00304             " characters of the raw text that was returned:" << std::endl;
00305           std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00306         }
00307         std::cout << "========================================" << std::endl;
00308         throw excpt;
00309       }
00310       return evtPtr;
00311     }
00312   }

std::auto_ptr< edm::EventPrincipal > edm::OnlineHttpReader::read (  )  [virtual]

Implements edm::StreamerInputSource.

Definition at line 120 of file OnlineHttpReader.cc.

References alreadyRegistered_, consumerId_, getOneEvent(), NULL, readHeader(), registerWithEventServer(), HLT_VtxMuL3::result, runEnded_, and edm::shutdown_flag.

00121   {
00122     // on a stop and enable reregister but don't get the header as
00123     // we cannot change the product registry
00124     if(!alreadyRegistered_) {
00125       consumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00126       registerWithEventServer();
00127       alreadyRegistered_ = true;
00128       // we need to fake getting the Header again, but we don't need it
00129       // TODO: change the requirements in the SM and SMProxy to get an event
00130       readHeader();
00131     }
00132     // repeat a https get every N seconds until we get an event
00133     // wait for Storage Manager event server buffer to not be empty
00134     // only way to stop is specify a maxEvents parameter
00135     // or kill the Storage Manager so the https get fails.
00136 
00137     // try to get an event repeat until we get one, this allows
00138     // re-registration if the SM is halted or stopped
00139 
00140     bool gotEvent = false;
00141     std::auto_ptr<EventPrincipal> result(0);
00142     while ((!gotEvent) && (!runEnded_) && (!edm::shutdown_flag))
00143     {
00144        result = getOneEvent();
00145        if(result.get() != NULL) gotEvent = true;
00146     }
00147     // need next line so we only return a null pointer once for each end of run
00148     if(runEnded_) runEnded_ = false;
00149     return result;
00150   }

void edm::OnlineHttpReader::readHeader (  ) 

Definition at line 314 of file OnlineHttpReader.cc.

References TestMuL1L2Filter_cff::cerr, consumerId_, convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, edm::StreamerInputSource::deserializeAndMergeWithRegistry(), lat::endl(), Exception, FDEBUG, stor::func(), Header::HEADER_REQUEST, headerRetryInterval_, headerurl_, i, Header::INIT, len, OtherMessageBuilder::msgBody(), NULL, p, stor::setopt(), edm::shutdown_flag, OtherMessageBuilder::size(), and OtherMessageBuilder::startAddress().

Referenced by OnlineHttpReader(), and read().

00315   {
00316     // repeat a https get every 5 seconds until we get the registry
00317     // do it like this for pull mode
00318     bool alreadySaidWaiting = false;
00319     stor::ReadData data;
00320     do {
00321       CURL* han = curl_easy_init();
00322 
00323       if(han==0)
00324         {
00325           cerr << "could not create handle" << endl;
00326           //return 0; //or use this?
00327           throw cms::Exception("readHeader","OnlineHttpReader")
00328             << "Could not get header: probably XDAQ not running on Storage Manager "
00329             << "\n";
00330         }
00331 
00332       stor::setopt(han,CURLOPT_URL,headerurl_);
00333       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00334       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00335 
00336       // 10-Aug-2006, KAB: send our consumer ID as part of the header request
00337       char msgBuff[100];
00338       OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00339                                          sizeof(char_uint32));
00340       uint8 *bodyPtr = requestMessage.msgBody();
00341       convert(consumerId_, bodyPtr);
00342       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00343       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00344       struct curl_slist *headers=NULL;
00345       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00346       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00347       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00348 
00349       // send the HTTP POST, read the reply, and cleanup before going on
00350       CURLcode messageStatus = curl_easy_perform(han);
00351       curl_slist_free_all(headers);
00352       curl_easy_cleanup(han);
00353 
00354       if(messageStatus!=0)
00355       {
00356         cerr << "curl perform failed for header" << endl;
00357         // do not retry curl here as we should return to registration instead if we
00358         // want an automatic recovery
00359         throw cms::Exception("readHeader","OnlineHttpReader")
00360           << "Could not get header: probably XDAQ not running on Storage Manager "
00361           << "\n";
00362       }
00363       if(data.d_.length() == 0)
00364       {
00365         if(!alreadySaidWaiting) {
00366           std::cout << "...waiting for header from Storage Manager..." << std::endl;
00367           alreadySaidWaiting = true;
00368         }
00369         // sleep for desired amount of time
00370         sleep(headerRetryInterval_);
00371       }
00372     } while (data.d_.length() == 0 && !edm::shutdown_flag);
00373     if (edm::shutdown_flag) {
00374       throw cms::Exception("readHeader","OnlineHttpReader")
00375           << "The header read was aborted by a shutdown request.\n";
00376     }
00377 
00378     std::vector<char> regdata(1000*1000);
00379 
00380     // rely on https transfer string of correct length!
00381     int len = data.d_.length();
00382     FDEBUG(9) << "OnlineHttpReader received registry len = " << len << std::endl;
00383     regdata.resize(len);
00384     for (int i=0; i<len ; i++) regdata[i] = data.d_[i];
00385     // 21-Jun-2006, KAB:  catch (and re-throw) any exceptions decoding
00386     // the job header so that we can display the returned HTML and
00387     // (hopefully) give the user a hint as to the cause of the problem.
00388     std::auto_ptr<SendJobHeader> p;
00389     try {
00390       HeaderView hdrView(&regdata[0]);
00391       if (hdrView.code() != Header::INIT) {
00392         throw cms::Exception("OnlineHttpReader", "readHeader");
00393       }
00394       InitMsgView initView(&regdata[0]);
00395       deserializeAndMergeWithRegistry(initView);
00396     }
00397     catch (cms::Exception excpt) {
00398       const unsigned int MAX_DUMP_LENGTH = 1000;
00399       std::cout << "========================================" << std::endl;
00400       std::cout << "* Exception decoding the getregdata response from the storage manager!" << std::endl;
00401       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00402         std::cout << "* Here is the raw text that was returned:" << std::endl;
00403         std::cout << data.d_ << std::endl;
00404       }
00405       else {
00406         std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00407           " characters of the raw text that was returned:" << std::endl;
00408         std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00409       }
00410       std::cout << "========================================" << std::endl;
00411       throw excpt;
00412     }
00413   }

void edm::OnlineHttpReader::registerWithEventServer (  ) 

Definition at line 415 of file OnlineHttpReader.cc.

References buf_, TestMuL1L2Filter_cff::cerr, connectTrySleepTime_, consumerId_, consumerName_, consumerPriority_, consumerPSetString_, GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, lat::endl(), ConsRegResponseBuilder::ES_NOT_READY, Exception, FDEBUG, stor::func(), headerRetryInterval_, i, len, maxConnectTries_, NULL, stor::setopt(), edm::shutdown_flag, ConsRegRequestBuilder::size(), ConsRegRequestBuilder::startAddress(), and subscriptionurl_.

Referenced by OnlineHttpReader(), and read().

00416   {
00417     stor::ReadData data;
00418     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00419     bool alreadySaidWaiting = false;
00420     do {
00421       data.d_.clear();
00422       CURL* han = curl_easy_init();
00423       if(han==0)
00424         {
00425           cerr << "could not create handle" << endl;
00426           throw cms::Exception("registerWithEventServer","OnlineHttpReader")
00427             << "Unable to create curl handle\n";
00428         }
00429 
00430       // set the standard https request options
00431       stor::setopt(han,CURLOPT_URL,subscriptionurl_);
00432       stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00433       stor::setopt(han,CURLOPT_WRITEDATA,&data);
00434 
00435       // build the registration request message to send to the storage manager
00436       const int BUFFER_SIZE = 2000;
00437       char msgBuff[BUFFER_SIZE];
00438       ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00439                                        consumerPriority_, consumerPSetString_);
00440 
00441       // add the request message as a https post
00442       stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00443       stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00444       struct curl_slist *headers=NULL;
00445       headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00446       headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00447       stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00448 
00449       // send the HTTP POST, read the reply, and cleanup before going on
00450       //CURLcode messageStatus = (CURLcode)-1;
00451       // set messageStatus to a non-zero (but still within CURLcode enum list)
00452       CURLcode messageStatus = CURLE_COULDNT_CONNECT;
00453       int tries = 0;
00454       while (messageStatus!=0 && !edm::shutdown_flag)
00455       {
00456         tries++;
00457         messageStatus = curl_easy_perform(han);
00458         if ( messageStatus != 0 )
00459         {
00460           if ( tries >= maxConnectTries_ )
00461           {
00462             std::cerr << "Giving up waiting for connection after " << tries 
00463                       << " tries"  << std::endl;
00464             curl_slist_free_all(headers);
00465             curl_easy_cleanup(han);
00466             cerr << "curl perform failed for registration" << endl;
00467             throw cms::Exception("registerWithEventServer","OnlineHttpReader")
00468               << "Could not register: probably XDAQ not running on Storage Manager "
00469               << "\n";
00470           }
00471           else
00472           {
00473             std::cout << "Waiting for connection to StorageManager... " 
00474                       << tries << "/" << maxConnectTries_
00475                       << std::endl;
00476             sleep(connectTrySleepTime_);
00477           }
00478         }
00479       }
00480       if (edm::shutdown_flag) {
00481           continue;
00482       }
00483 
00484       curl_slist_free_all(headers);
00485       curl_easy_cleanup(han);
00486 
00487       registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00488       if(data.d_.length() > 0)
00489       {
00490         int len = data.d_.length();
00491         FDEBUG(9) << "OnlineHttpReader received len = " << len << std::endl;
00492         buf_.resize(len);
00493         for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00494 
00495         try {
00496           ConsRegResponseView respView(&buf_[0]);
00497           registrationStatus = respView.getStatus();
00498           consumerId_ = respView.getConsumerId();
00499         }
00500         catch (cms::Exception excpt) {
00501           const unsigned int MAX_DUMP_LENGTH = 1000;
00502           std::cout << "========================================" << std::endl;
00503           std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl;
00504           if (data.d_.length() <= MAX_DUMP_LENGTH) {
00505             std::cout << "* Here is the raw text that was returned:" << std::endl;
00506             std::cout << data.d_ << std::endl;
00507           }
00508           else {
00509             std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00510               " characters of the raw text that was returned:" << std::endl;
00511             std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00512           }
00513           std::cout << "========================================" << std::endl;
00514           throw excpt;
00515         }
00516       }
00517 
00518       if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY)
00519       {
00520         if(!alreadySaidWaiting) {
00521           std::cout << "...waiting for registration response from Storage Manager..." << std::endl;
00522           alreadySaidWaiting = true;
00523         }
00524         // sleep for desired amount of time
00525         sleep(headerRetryInterval_);
00526       }
00527     } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY &&
00528              !edm::shutdown_flag);
00529     if (edm::shutdown_flag) {
00530       throw cms::Exception("registerWithEventServer","OnlineHttpReader")
00531           << "Registration was aborted by a shutdown request.\n";
00532     }
00533 
00534     FDEBUG(5) << "Consumer ID = " << consumerId_ << endl;
00535   }

void edm::OnlineHttpReader::setRun ( RunNumber_t  r  )  [private, virtual]

Reimplemented from edm::StreamerInputSource.

Definition at line 110 of file OnlineHttpReader.cc.

References edm::StreamerInputSource::resetAfterEndRun(), edm::InputSource::resetLuminosityBlockPrincipal(), and edm::InputSource::resetRunPrincipal().

00110                                              {
00111      // We do not actually set the run number, just use this hook
00112      // to do the needed resetting of flags in the EP and this source
00113      resetAfterEndRun();
00114      // I think we need the 2 of the 3 lines below as in DAQ source
00115      //newRun_ = newLumi_ = true;
00116      resetLuminosityBlockPrincipal();
00117      resetRunPrincipal();
00118    }


Member Data Documentation

bool edm::OnlineHttpReader::alreadyGotHeader_ [private]

Definition at line 60 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader().

bool edm::OnlineHttpReader::alreadyRegistered_ [private]

Definition at line 59 of file OnlineHttpReader.h.

Referenced by getOneEvent(), OnlineHttpReader(), and read().

bool edm::OnlineHttpReader::alreadySaidHalted_ [private]

Definition at line 58 of file OnlineHttpReader.h.

Referenced by getOneEvent().

Buf edm::OnlineHttpReader::buf_ [private]

Definition at line 46 of file OnlineHttpReader.h.

Referenced by getOneEvent(), and registerWithEventServer().

int edm::OnlineHttpReader::connectTrySleepTime_ [private]

Definition at line 67 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and registerWithEventServer().

unsigned int edm::OnlineHttpReader::consumerId_ [private]

Definition at line 54 of file OnlineHttpReader.h.

Referenced by getOneEvent(), OnlineHttpReader(), read(), readHeader(), and registerWithEventServer().

std::string edm::OnlineHttpReader::consumerName_ [private]

Definition at line 49 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and registerWithEventServer().

std::string edm::OnlineHttpReader::consumerPriority_ [private]

Definition at line 50 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and registerWithEventServer().

std::string edm::OnlineHttpReader::consumerPSetString_ [private]

Definition at line 51 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and registerWithEventServer().

bool edm::OnlineHttpReader::endRunAlreadyNotified_ [private]

Definition at line 56 of file OnlineHttpReader.h.

Referenced by getOneEvent().

char edm::OnlineHttpReader::eventurl_[256] [private]

Definition at line 43 of file OnlineHttpReader.h.

Referenced by getOneEvent(), and OnlineHttpReader().

int edm::OnlineHttpReader::headerRetryInterval_ [private]

Definition at line 52 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), readHeader(), and registerWithEventServer().

char edm::OnlineHttpReader::headerurl_[256] [private]

Definition at line 44 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and readHeader().

int edm::OnlineHttpReader::hltBitCount [private]

Definition at line 47 of file OnlineHttpReader.h.

int edm::OnlineHttpReader::l1BitCount [private]

Definition at line 48 of file OnlineHttpReader.h.

struct timeval edm::OnlineHttpReader::lastRequestTime_ [read, private]

Definition at line 55 of file OnlineHttpReader.h.

Referenced by getOneEvent(), and OnlineHttpReader().

int edm::OnlineHttpReader::maxConnectTries_ [private]

Definition at line 66 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and registerWithEventServer().

double edm::OnlineHttpReader::minEventRequestInterval_ [private]

Definition at line 53 of file OnlineHttpReader.h.

Referenced by getOneEvent(), and OnlineHttpReader().

bool edm::OnlineHttpReader::runEnded_ [private]

Definition at line 57 of file OnlineHttpReader.h.

Referenced by getOneEvent(), and read().

std::string edm::OnlineHttpReader::sourceurl_ [private]

Definition at line 42 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader().

char edm::OnlineHttpReader::subscriptionurl_[256] [private]

Definition at line 45 of file OnlineHttpReader.h.

Referenced by OnlineHttpReader(), and registerWithEventServer().


The documentation for this class was generated from the following files:
Generated on Tue Jun 9 18:42:01 2009 for CMSSW by  doxygen 1.5.4