00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "EventFilter/StorageManager/src/EventStreamHttpReader.h"
00024 #include "EventFilter/StorageManager/interface/SMCurlInterface.h"
00025 #include "FWCore/Utilities/interface/DebugMacros.h"
00026 #include "FWCore/Utilities/interface/Exception.h"
00027 #include "IOPool/Streamer/interface/ClassFiller.h"
00028 #include "IOPool/Streamer/interface/OtherMessage.h"
00029 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00030 #include "EventFilter/StorageManager/interface/ConsumerPipe.h"
00031 #include "FWCore/Framework/interface/EventPrincipal.h"
00032 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00033
00034 #include <algorithm>
00035 #include <iterator>
00036 #include "curl/curl.h"
00037
00038 #include <wait.h>
00039
00040 using namespace std;
00041 using namespace edm;
00042
00043 namespace edm
00044 {
00045 EventStreamHttpReader::EventStreamHttpReader(edm::ParameterSet const& ps,
00046 edm::InputSourceDescription const& desc):
00047 edm::StreamerInputSource(ps, desc),
00048 sourceurl_(ps.getParameter<string>("sourceURL")),
00049 buf_(1000*1000*7),
00050 endRunAlreadyNotified_(true),
00051 runEnded_(false),
00052 alreadySaidHalted_(false),
00053 maxConnectTries_(DEFAULT_MAX_CONNECT_TRIES),
00054 connectTrySleepTime_(DEFAULT_CONNECT_TRY_SLEEP_TIME)
00055 {
00056
00057 maxConnectTries_ = ps.getUntrackedParameter<int>("maxConnectTries",
00058 DEFAULT_MAX_CONNECT_TRIES);
00059 connectTrySleepTime_ = ps.getUntrackedParameter<int>("connectTrySleepTime",
00060 DEFAULT_CONNECT_TRY_SLEEP_TIME);
00061 inputFileTransitionsEachEvent_ =
00062 ps.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", true);
00063
00064 std::string evturl = sourceurl_ + "/geteventdata";
00065 int stlen = evturl.length();
00066 for (int i=0; i<stlen; i++) eventurl_[i]=evturl[i];
00067 eventurl_[stlen] = '\0';
00068
00069 std::string header = sourceurl_ + "/getregdata";
00070 stlen = header.length();
00071 for (int i=0; i<stlen; i++) headerurl_[i]=header[i];
00072 headerurl_[stlen] = '\0';
00073
00074 std::string regurl = sourceurl_ + "/registerConsumer";
00075 stlen = regurl.length();
00076 for (int i=0; i<stlen; i++) subscriptionurl_[i]=regurl[i];
00077 subscriptionurl_[stlen] = '\0';
00078
00079
00080 const double MAX_REQUEST_INTERVAL = 300.0;
00081 consumerName_ = ps.getUntrackedParameter<string>("consumerName","Unknown");
00082 consumerPriority_ = ps.getUntrackedParameter<string>("consumerPriority","normal");
00083 headerRetryInterval_ = ps.getUntrackedParameter<int>("headerRetryInterval",5);
00084 double maxEventRequestRate = ps.getUntrackedParameter<double>("maxEventRequestRate",1.0);
00085 if (maxEventRequestRate < (1.0 / MAX_REQUEST_INTERVAL)) {
00086 minEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00087 }
00088 else {
00089 minEventRequestInterval_ = 1.0 / maxEventRequestRate;
00090 }
00091 lastRequestTime_.tv_sec = 0;
00092 lastRequestTime_.tv_usec = 0;
00093
00094
00095
00096
00097 consumerPSetString_ = ps.toString();
00098
00099
00100 consumerId_ = (time(0) & 0xffffff);
00101 registerWithEventServer();
00102
00103 readHeader();
00104 }
00105
00106 EventStreamHttpReader::~EventStreamHttpReader()
00107 {
00108 }
00109
00110 std::auto_ptr<edm::EventPrincipal> EventStreamHttpReader::read()
00111 {
00112
00113
00114
00115
00116
00117
00118
00119
00120 bool gotEvent = false;
00121 std::auto_ptr<EventPrincipal> result(0);
00122 while ((!gotEvent) && (!runEnded_) && (!edm::shutdown_flag))
00123 {
00124 result = getOneEvent();
00125 if(result.get() != NULL) gotEvent = true;
00126 }
00127
00128 if(runEnded_) runEnded_ = false;
00129 return result;
00130 }
00131
00132 std::auto_ptr<edm::EventPrincipal> EventStreamHttpReader::getOneEvent()
00133 {
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143 struct timeval now;
00144 struct timezone dummyTZ;
00145 gettimeofday(&now, &dummyTZ);
00146 double timeDiff = (double) now.tv_sec;
00147 timeDiff -= (double) lastRequestTime_.tv_sec;
00148 timeDiff += ((double) now.tv_usec / 1000000.0);
00149 timeDiff -= ((double) lastRequestTime_.tv_usec / 1000000.0);
00150 if (timeDiff < minEventRequestInterval_)
00151 {
00152 double sleepTime = minEventRequestInterval_ - timeDiff;
00153
00154
00155 sleepTime -= 0.01;
00156 if (sleepTime < 0.0) {sleepTime = 0.0;}
00157
00158 usleep(static_cast<int>(1000000 * sleepTime));
00159 gettimeofday(&lastRequestTime_, &dummyTZ);
00160 }
00161 else
00162 {
00163 lastRequestTime_ = now;
00164 }
00165
00166 stor::ReadData data;
00167 bool alreadySaidWaiting = false;
00168 do {
00169 CURL* han = curl_easy_init();
00170
00171 if(han==0)
00172 {
00173 cerr << "could not create handle" << endl;
00174
00175
00176 throw cms::Exception("getOneEvent","EventStreamHttpReader")
00177 << "Could not get event: problem with curl"
00178 << "\n";
00179 }
00180
00181 stor::setopt(han,CURLOPT_URL,eventurl_);
00182 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00183 stor::setopt(han,CURLOPT_WRITEDATA,&data);
00184
00185
00186 char msgBuff[100];
00187 OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00188 sizeof(char_uint32));
00189 uint8 *bodyPtr = requestMessage.msgBody();
00190 convert(consumerId_, bodyPtr);
00191 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00192 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00193 struct curl_slist *headers=NULL;
00194 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00195 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00196 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00197
00198
00199 CURLcode messageStatus = curl_easy_perform(han);
00200 curl_slist_free_all(headers);
00201 curl_easy_cleanup(han);
00202
00203 if(messageStatus!=0)
00204 {
00205 cerr << "curl perform failed for event, messageStatus = "
00206 << messageStatus << endl;
00207
00208
00209 throw cms::Exception("getOneEvent","EventStreamHttpReader")
00210 << "Could not get event: probably XDAQ not running on Storage Manager "
00211 << "\n";
00212 }
00213 if(data.d_.length() == 0)
00214 {
00215 if(!alreadySaidWaiting) {
00216 std::cout << "...waiting for event from Storage Manager..." << std::endl;
00217 alreadySaidWaiting = true;
00218 }
00219
00220 usleep(static_cast<int>(1000000 * minEventRequestInterval_));
00221 }
00222 } while (data.d_.length() == 0 && !edm::shutdown_flag);
00223 if (edm::shutdown_flag) {
00224 return std::auto_ptr<edm::EventPrincipal>();
00225 }
00226
00227 int len = data.d_.length();
00228 FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl;
00229 buf_.resize(len);
00230 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00231
00232
00233 OtherMessageView msgView(&buf_[0]);
00234
00235 if (msgView.code() == Header::DONE) {
00236
00237
00238
00239
00240 if(!alreadySaidHalted_) {
00241 alreadySaidHalted_ = true;
00242 std::cout << "Storage Manager has stopped - waiting for restart" << std::endl;
00243 std::cout << "Warning! If you are waiting forever at: "
00244 << "...waiting for event from Storage Manager... " << std::endl
00245 << " it may be that the Storage Manager has been halted with a haltAction," << std::endl
00246 << " instead of a stopAction. In this case you should control-c to end " << std::endl
00247 << " this consumer and restart it. (This will be fixed in a future update)" << std::endl;
00248 }
00249
00250 if(!endRunAlreadyNotified_) {
00251 endRunAlreadyNotified_ = true;
00252 setEndRun();
00253 runEnded_ = true;
00254 }
00255 return std::auto_ptr<edm::EventPrincipal>();
00256 } else {
00257
00258 endRunAlreadyNotified_ = false;
00259 alreadySaidHalted_ = false;
00260
00261
00262
00263
00264 std::auto_ptr<edm::EventPrincipal> evtPtr;
00265 try {
00266 HeaderView hdrView(&buf_[0]);
00267 if (hdrView.code() != Header::EVENT) {
00268 throw cms::Exception("EventStreamHttpReader", "readOneEvent");
00269 }
00270 EventMsgView eventView(&buf_[0]);
00271 evtPtr = deserializeEvent(eventView);
00272 }
00273 catch (cms::Exception excpt) {
00274 const unsigned int MAX_DUMP_LENGTH = 2000;
00275 std::cout << "========================================" << std::endl;
00276 std::cout << "* Exception decoding the geteventdata response from the storage manager!" << std::endl;
00277 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00278 std::cout << "* Here is the raw text that was returned:" << std::endl;
00279 std::cout << data.d_ << std::endl;
00280 }
00281 else {
00282 std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00283 " characters of the raw text that was returned:" << std::endl;
00284 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00285 }
00286 std::cout << "========================================" << std::endl;
00287 throw excpt;
00288 }
00289 return evtPtr;
00290 }
00291 }
00292
00293 void EventStreamHttpReader::readHeader()
00294 {
00295
00296
00297 bool alreadySaidWaiting = false;
00298 stor::ReadData data;
00299 do {
00300 CURL* han = curl_easy_init();
00301
00302 if(han==0)
00303 {
00304 cerr << "could not create handle" << endl;
00305
00306 throw cms::Exception("readHeader","EventStreamHttpReader")
00307 << "Could not get header: probably XDAQ not running on Storage Manager "
00308 << "\n";
00309 }
00310
00311 stor::setopt(han,CURLOPT_URL,headerurl_);
00312 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00313 stor::setopt(han,CURLOPT_WRITEDATA,&data);
00314
00315
00316 char msgBuff[100];
00317 OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00318 sizeof(char_uint32));
00319 uint8 *bodyPtr = requestMessage.msgBody();
00320 convert(consumerId_, bodyPtr);
00321 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00322 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00323 struct curl_slist *headers=NULL;
00324 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00325 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00326 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00327
00328
00329 CURLcode messageStatus = curl_easy_perform(han);
00330 curl_slist_free_all(headers);
00331 curl_easy_cleanup(han);
00332
00333 if(messageStatus!=0)
00334 {
00335 cerr << "curl perform failed for header" << endl;
00336
00337
00338 throw cms::Exception("readHeader","EventStreamHttpReader")
00339 << "Could not get header: probably XDAQ not running on Storage Manager "
00340 << "\n";
00341 }
00342 if(data.d_.length() == 0)
00343 {
00344 if(!alreadySaidWaiting) {
00345 std::cout << "...waiting for header from Storage Manager..." << std::endl;
00346 alreadySaidWaiting = true;
00347 }
00348
00349 sleep(headerRetryInterval_);
00350 }
00351 } while (data.d_.length() == 0 && !edm::shutdown_flag);
00352 if (edm::shutdown_flag) {
00353 throw cms::Exception("readHeader","EventStreamHttpReader")
00354 << "The header read was aborted by a shutdown request.\n";
00355 }
00356
00357 std::vector<char> regdata(1000*1000);
00358
00359
00360 int len = data.d_.length();
00361 FDEBUG(9) << "EventStreamHttpReader received registry len = " << len << std::endl;
00362 regdata.resize(len);
00363 for (int i=0; i<len ; i++) regdata[i] = data.d_[i];
00364
00365
00366
00367 std::auto_ptr<SendJobHeader> p;
00368 try {
00369 HeaderView hdrView(®data[0]);
00370 if (hdrView.code() != Header::INIT) {
00371 throw cms::Exception("EventStreamHttpReader", "readHeader");
00372 }
00373 InitMsgView initView(®data[0]);
00374 deserializeAndMergeWithRegistry(initView);
00375 }
00376 catch (cms::Exception excpt) {
00377 const unsigned int MAX_DUMP_LENGTH = 1000;
00378 std::cout << "========================================" << std::endl;
00379 std::cout << "* Exception decoding the getregdata response from the storage manager!" << std::endl;
00380 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00381 std::cout << "* Here is the raw text that was returned:" << std::endl;
00382 std::cout << data.d_ << std::endl;
00383 }
00384 else {
00385 std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00386 " characters of the raw text that was returned:" << std::endl;
00387 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00388 }
00389 std::cout << "========================================" << std::endl;
00390 throw excpt;
00391 }
00392 }
00393
00394 void EventStreamHttpReader::registerWithEventServer()
00395 {
00396 stor::ReadData data;
00397 uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00398 bool alreadySaidWaiting = false;
00399 do {
00400 data.d_.clear();
00401 CURL* han = curl_easy_init();
00402 if(han==0)
00403 {
00404 cerr << "could not create handle" << endl;
00405 throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00406 << "Unable to create curl handle\n";
00407 }
00408
00409
00410 stor::setopt(han,CURLOPT_URL,subscriptionurl_);
00411 stor::setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00412 stor::setopt(han,CURLOPT_WRITEDATA,&data);
00413
00414
00415 const int BUFFER_SIZE = 2000;
00416 char msgBuff[BUFFER_SIZE];
00417 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00418 consumerPriority_, consumerPSetString_);
00419
00420
00421 stor::setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00422 stor::setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00423 struct curl_slist *headers=NULL;
00424 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00425 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00426 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00427
00428
00429
00430
00431 CURLcode messageStatus = CURLE_COULDNT_CONNECT;
00432 int tries = 0;
00433 while (messageStatus!=0 && !edm::shutdown_flag)
00434 {
00435 tries++;
00436 messageStatus = curl_easy_perform(han);
00437 if ( messageStatus != 0 )
00438 {
00439 if ( tries >= maxConnectTries_ )
00440 {
00441 std::cerr << "Giving up waiting for connection after " << tries
00442 << " tries" << std::endl;
00443 curl_slist_free_all(headers);
00444 curl_easy_cleanup(han);
00445 cerr << "curl perform failed for registration" << endl;
00446 throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00447 << "Could not register: probably XDAQ not running on Storage Manager "
00448 << "\n";
00449 }
00450 else
00451 {
00452 std::cout << "Waiting for connection to StorageManager... "
00453 << tries << "/" << maxConnectTries_
00454 << std::endl;
00455 sleep(connectTrySleepTime_);
00456 }
00457 }
00458 }
00459 if (edm::shutdown_flag) {
00460 continue;
00461 }
00462
00463 curl_slist_free_all(headers);
00464 curl_easy_cleanup(han);
00465
00466 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00467 if(data.d_.length() > 0)
00468 {
00469 int len = data.d_.length();
00470 FDEBUG(9) << "EventStreamHttpReader received len = " << len << std::endl;
00471 buf_.resize(len);
00472 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00473
00474 try {
00475 ConsRegResponseView respView(&buf_[0]);
00476 registrationStatus = respView.getStatus();
00477 consumerId_ = respView.getConsumerId();
00478 }
00479 catch (cms::Exception excpt) {
00480 const unsigned int MAX_DUMP_LENGTH = 1000;
00481 std::cout << "========================================" << std::endl;
00482 std::cout << "* Exception decoding the registerWithEventServer response!" << std::endl;
00483 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00484 std::cout << "* Here is the raw text that was returned:" << std::endl;
00485 std::cout << data.d_ << std::endl;
00486 }
00487 else {
00488 std::cout << "* Here are the first " << MAX_DUMP_LENGTH <<
00489 " characters of the raw text that was returned:" << std::endl;
00490 std::cout << (data.d_.substr(0, MAX_DUMP_LENGTH)) << std::endl;
00491 }
00492 std::cout << "========================================" << std::endl;
00493 throw excpt;
00494 }
00495 }
00496
00497 if (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY)
00498 {
00499 if(!alreadySaidWaiting) {
00500 std::cout << "...waiting for registration response from Storage Manager..." << std::endl;
00501 alreadySaidWaiting = true;
00502 }
00503
00504 sleep(headerRetryInterval_);
00505 }
00506 } while (registrationStatus == ConsRegResponseBuilder::ES_NOT_READY &&
00507 !edm::shutdown_flag);
00508 if (edm::shutdown_flag) {
00509 throw cms::Exception("registerWithEventServer","EventStreamHttpReader")
00510 << "Registration was aborted by a shutdown request.\n";
00511 }
00512
00513 FDEBUG(5) << "Consumer ID = " << consumerId_ << endl;
00514 }
00515 }