00001
00002
00003 #include "EventFilter/SMProxyServer/interface/DataProcessManager.h"
00004 #include "EventFilter/StorageManager/interface/SMCurlInterface.h"
00005 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00006 #include "FWCore/Utilities/interface/DebugMacros.h"
00007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00008 #include "IOPool/Streamer/interface/BufferArea.h"
00009 #include "IOPool/Streamer/interface/OtherMessage.h"
00010 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00011
00012 #include "boost/bind.hpp"
00013
00014 #include "curl/curl.h"
00015 #include <wait.h>
00016
00017 using namespace std;
00018 using namespace edm;
00019
00020 using boost::thread;
00021 using boost::bind;
00022
00023 namespace
00024 {
00025 const int voidptr_size = sizeof(void*);
00026 }
00027
00028 namespace stor
00029 {
00030
00031 DataProcessManager::DataProcessManager():
00032 cmd_q_(edm::getEventBuffer(voidptr_size,50)),
00033 alreadyRegistered_(false),
00034 alreadyRegisteredDQM_(false),
00035 headerRefetchRequested_(false),
00036 buf_(2000),
00037 headerRetryInterval_(5),
00038 dqmServiceManager_(new stor::DQMServiceManager()),
00039 receivedEvents_(0),
00040 receivedDQMEvents_(0),
00041 samples_(100),
00042 period4samples_(5)
00043 {
00044
00045 pmeter_ = new stor::SMPerformanceMeter();
00046 init();
00047 }
00048
00049 DataProcessManager::~DataProcessManager()
00050 {
00051 delete pmeter_;
00052 }
00053
00054 void DataProcessManager::init()
00055 {
00056 regpage_ = "/registerConsumer";
00057 DQMregpage_ = "/registerDQMConsumer";
00058 eventpage_ = "/geteventdata";
00059 DQMeventpage_ = "/getDQMeventdata";
00060 headerpage_ = "/getregdata";
00061 consumerName_ = stor::PROXY_SERVER_NAME;
00062
00063 consumerPriority_ = "Normal";
00064 DQMconsumerName_ = stor::PROXY_SERVER_NAME;
00065
00066 DQMconsumerPriority_ = "Normal";
00067
00068 this->setMaxEventRequestRate(10.0);
00069 consumerId_ = (time(0) & 0xffffff);
00070
00071 this->setMaxDQMEventRequestRate(0.2);
00072 DQMconsumerId_ = (time(0) & 0xffffff);
00073
00074 alreadyRegistered_ = false;
00075 alreadyRegisteredDQM_ = false;
00076 headerRefetchRequested_ = false;
00077
00078 smList_.clear();
00079 smRegMap_.clear();
00080 smHeaderMap_.clear();
00081 DQMsmList_.clear();
00082 DQMsmRegMap_.clear();
00083
00084
00085 consumerTopFolderName_ = "*";
00086
00087 receivedEvents_ = 0;
00088 receivedDQMEvents_ = 0;
00089 pmeter_->init(samples_, period4samples_);
00090 stats_.fullReset();
00091
00092
00093 ltEventFetchTimeCounter_.reset(new ForeverCounter());
00094 stEventFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00095 ltDQMFetchTimeCounter_.reset(new ForeverCounter());
00096 stDQMFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00097 }
00098
00099 void DataProcessManager::setSamples(unsigned long num_samples)
00100 {
00101 samples_ = num_samples;
00102 pmeter_->setSamples(num_samples);
00103 }
00104
00105 void DataProcessManager::setPeriod4Samples(unsigned long period4samples)
00106 {
00107 period4samples_ = period4samples;
00108 pmeter_->setPeriod4Samples(period4samples);
00109 }
00110
00111
00112 void DataProcessManager::setMaxEventRequestRate(double rate)
00113 {
00114 if(rate <= 0.0) return;
00115 maxEventRequestRate_ = rate;
00116 updateMinEventRequestInterval();
00117 }
00118
00119 void DataProcessManager::setMaxDQMEventRequestRate(double rate)
00120 {
00121 const double MAX_REQUEST_INTERVAL = 300.0;
00122 if(rate <= 0.0) return;
00123 if (rate < (1.0 / MAX_REQUEST_INTERVAL)) {
00124 minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00125 }
00126 else {
00127 minDQMEventRequestInterval_ = 1.0 / rate;
00128 }
00129 }
00130
00131 void DataProcessManager::updateMinEventRequestInterval()
00132 {
00133 const double MAX_REQUEST_INTERVAL = 300.0;
00134 double rate = maxEventRequestRate_;
00135
00136 if (rate < (1.0 / MAX_REQUEST_INTERVAL)) {
00137 minEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00138 }
00139 else {
00140
00141
00142
00143
00144
00145
00146 if (smList_.size() <= 1) {
00147 minEventRequestInterval_ = 1.0 / rate;
00148 }
00149 else {
00150 minEventRequestInterval_ = (smList_.size() - 1) / rate;
00151 }
00152 }
00153
00154
00155
00156
00157 edm::ParameterSet ps = ParameterSet();
00158 Entry maxRateEntry("maxEventRequestRate",
00159 (1.0 / minEventRequestInterval_),
00160 false);
00161 ps.insert(true, "maxEventRequestRate", maxRateEntry);
00162
00163 consumerPSetString_ = ps.toString();
00164 }
00165
00166 void DataProcessManager::run(DataProcessManager* t)
00167 {
00168 t->processCommands();
00169 }
00170
00171 void DataProcessManager::start()
00172 {
00173
00174
00175 me_.reset(new boost::thread(boost::bind(DataProcessManager::run,this)));
00176 }
00177
00178 void DataProcessManager::stop()
00179 {
00180
00181
00182
00183 edm::EventBuffer::ProducerBuffer cb(*cmd_q_);
00184 MsgCode mc(cb.buffer(),MsgCode::DONE);
00185 mc.setCode(MsgCode::DONE);
00186 cb.commit(mc.codeSize());
00187 }
00188
00189 void DataProcessManager::join()
00190 {
00191
00192 if(me_) me_->join();
00193 }
00194
00195 void DataProcessManager::processCommands()
00196 {
00197
00198
00199 bool doneWithRegistration = false;
00200
00201 unsigned int count = 0;
00202 unsigned int maxcount = 255;
00203 bool doneWithDQMRegistration = false;
00204 unsigned int countDQM = 0;
00205 bool alreadysaid = false;
00206 bool alreadysaidDQM = false;
00207
00208
00209 bool gotOneHeaderFromAll = false;
00210 unsigned int countINIT = 0;
00211 bool alreadysaidINIT = false;
00212
00213 bool DoneWithJob = false;
00214 while(!DoneWithJob)
00215 {
00216
00217
00218 if (headerRefetchRequested_) {
00219 headerRefetchRequested_ = false;
00220
00221 gotOneHeaderFromAll = false;
00222 smHeaderMap_.clear();
00223 countINIT = 0;
00224 }
00225
00226 if(!alreadyRegistered_) {
00227 if(!doneWithRegistration)
00228 {
00229 waitBetweenRegTrys();
00230 bool success = registerWithAllSM();
00231 if(success) doneWithRegistration = true;
00232 ++count;
00233 }
00234
00235 if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM Servers"
00236 << " after " << maxcount << " tries";
00237 if(doneWithRegistration && !alreadysaid) {
00238 edm::LogInfo("processCommands") << "Registered with all SM Event Servers";
00239 alreadysaid = true;
00240 }
00241 if(doneWithRegistration) alreadyRegistered_ = true;
00242 }
00243
00244 if(!alreadyRegisteredDQM_) {
00245 if(!doneWithDQMRegistration)
00246 {
00247 waitBetweenRegTrys();
00248 bool success = registerWithAllDQMSM();
00249 if(success) doneWithDQMRegistration = true;
00250 ++countDQM;
00251 }
00252
00253 if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM DQMEvent Servers"
00254 << " after " << maxcount << " tries";
00255 if(doneWithDQMRegistration && !alreadysaidDQM) {
00256 edm::LogInfo("processCommands") << "Registered with all SM DQMEvent Servers";
00257 alreadysaidDQM = true;
00258 }
00259 if(doneWithDQMRegistration) alreadyRegisteredDQM_ = true;
00260 }
00261
00262
00263
00264
00265
00266
00267
00268 if(!gotOneHeaderFromAll)
00269 {
00270 waitBetweenRegTrys();
00271
00272 bool success = getHeaderFromAllSM();
00273
00274 if(success) gotOneHeaderFromAll = true;
00275 ++countINIT;
00276 }
00277 if(countINIT >= maxcount) edm::LogInfo("processCommands") << "Could not get product registry!"
00278 << " after " << maxcount << " tries";
00279
00280 if(gotOneHeaderFromAll && !alreadysaidINIT) {
00281 edm::LogInfo("processCommands") << "Got the product registry";
00282 alreadysaidINIT = true;
00283 }
00284
00285 if(alreadyRegistered_ && gotOneHeaderFromAll && haveHeader()) {
00286 getEventFromAllSM();
00287 }
00288 if(alreadyRegisteredDQM_) {
00289 getDQMEventFromAllSM();
00290 }
00291
00292
00293 if(!cmd_q_->empty())
00294 {
00295
00296 edm::EventBuffer::ConsumerBuffer cb(*cmd_q_);
00297 MsgCode mc(cb.buffer(),cb.size());
00298
00299 if(mc.getCode()==MsgCode::DONE) DoneWithJob = true;
00300
00301 }
00302
00303 }
00304 edm::LogInfo("processCommands") << "Received done - stopping";
00305 if(dqmServiceManager_.get() != NULL) dqmServiceManager_->stop();
00306 }
00307
00308 void DataProcessManager::addSM2Register(std::string smURL)
00309 {
00310
00311
00312 bool alreadyInList = false;
00313 if(smList_.size() > 0) {
00314 for(unsigned int i = 0; i < smList_.size(); ++i) {
00315 if(smURL.compare(smList_[i]) == 0) {
00316 alreadyInList = true;
00317 break;
00318 }
00319 }
00320 }
00321 if(alreadyInList) return;
00322 smList_.push_back(smURL);
00323 smRegMap_.insert(std::make_pair(smURL,0));
00324 smHeaderMap_.insert(std::make_pair(smURL,false));
00325 struct timeval lastRequestTime;
00326 lastRequestTime.tv_sec = 0;
00327 lastRequestTime.tv_usec = 0;
00328 lastReqMap_.insert(std::make_pair(smURL,lastRequestTime));
00329 updateMinEventRequestInterval();
00330 }
00331
00332 void DataProcessManager::addDQMSM2Register(std::string DQMsmURL)
00333 {
00334
00335 bool alreadyInList = false;
00336 if(DQMsmList_.size() > 0) {
00337 for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00338 if(DQMsmURL.compare(DQMsmList_[i]) == 0) {
00339 alreadyInList = true;
00340 break;
00341 }
00342 }
00343 }
00344 if(alreadyInList) return;
00345 DQMsmList_.push_back(DQMsmURL);
00346 DQMsmRegMap_.insert(std::make_pair(DQMsmURL,0));
00347 struct timeval lastRequestTime;
00348 lastRequestTime.tv_sec = 0;
00349 lastRequestTime.tv_usec = 0;
00350 lastDQMReqMap_.insert(std::make_pair(DQMsmURL,lastRequestTime));
00351 }
00352
00353 bool DataProcessManager::registerWithAllSM()
00354 {
00355
00356
00357
00358 if(smList_.size() == 0) return false;
00359 bool allRegistered = true;
00360 for(unsigned int i = 0; i < smList_.size(); ++i) {
00361 if(smRegMap_[smList_[i] ] > 0) continue;
00362 int consumerid = registerWithSM(smList_[i]);
00363 if(consumerid > 0) smRegMap_[smList_[i] ] = consumerid;
00364 else allRegistered = false;
00365 }
00366 return allRegistered;
00367 }
00368
00369 bool DataProcessManager::registerWithAllDQMSM()
00370 {
00371
00372
00373
00374 if(DQMsmList_.size() == 0) return false;
00375 bool allRegistered = true;
00376 for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00377 if(DQMsmRegMap_[DQMsmList_[i] ] > 0) continue;
00378 int consumerid = registerWithDQMSM(DQMsmList_[i]);
00379 if(consumerid > 0) DQMsmRegMap_[DQMsmList_[i] ] = consumerid;
00380 else allRegistered = false;
00381 }
00382 return allRegistered;
00383 }
00384
00385 int DataProcessManager::registerWithSM(std::string smURL)
00386 {
00387
00388
00389 stor::ReadData data;
00390
00391 data.d_.clear();
00392 CURL* han = curl_easy_init();
00393 if(han==0)
00394 {
00395 edm::LogError("registerWithSM") << "Could not create curl handle";
00396
00397 throw cms::Exception("registerWithSM","DataProcessManager")
00398 << "Unable to create curl handle\n";
00399 }
00400
00401 std::string url2use = smURL + regpage_;
00402 setopt(han,CURLOPT_URL,url2use.c_str());
00403 setopt(han,CURLOPT_WRITEFUNCTION,func);
00404 setopt(han,CURLOPT_WRITEDATA,&data);
00405
00406
00407 const int BUFFER_SIZE = 2000;
00408 char msgBuff[BUFFER_SIZE];
00409 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00410 consumerPriority_, consumerPSetString_);
00411
00412
00413 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00414 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00415 struct curl_slist *headers=NULL;
00416 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00417 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00418 setopt(han, CURLOPT_HTTPHEADER, headers);
00419
00420
00421 CURLcode messageStatus = curl_easy_perform(han);
00422 curl_slist_free_all(headers);
00423 curl_easy_cleanup(han);
00424
00425 if(messageStatus!=0)
00426 {
00427 cerr << "curl perform failed for registration" << endl;
00428 edm::LogError("registerWithSM") << "curl perform failed for registration. "
00429 << "Could not register: probably XDAQ not running on Storage Manager"
00430 << " at " << smURL;
00431 return 0;
00432 }
00433 uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00434 int consumerId = 0;
00435 if(data.d_.length() > 0)
00436 {
00437 int len = data.d_.length();
00438 FDEBUG(9) << "registerWithSM received len = " << len << std::endl;
00439 buf_.resize(len);
00440 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00441
00442 try {
00443 ConsRegResponseView respView(&buf_[0]);
00444 registrationStatus = respView.getStatus();
00445 consumerId = respView.getConsumerId();
00446 if (eventServer_.get() != NULL) {
00447 eventServer_->setStreamSelectionTable(respView.getStreamSelectionTable());
00448 }
00449 }
00450 catch (cms::Exception excpt) {
00451 const unsigned int MAX_DUMP_LENGTH = 1000;
00452 edm::LogError("registerWithSM") << "========================================";
00453 edm::LogError("registerWithSM") << "Exception decoding the registerWithSM response!";
00454 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00455 edm::LogError("registerWithSM") << "Here is the raw text that was returned:";
00456 edm::LogError("registerWithSM") << data.d_;
00457 }
00458 else {
00459 edm::LogError("registerWithSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00460 " characters of the raw text that was returned:";
00461 edm::LogError("registerWithSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00462 }
00463 edm::LogError("registerWithSM") << "========================================";
00464 return 0;
00465 }
00466 }
00467 if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0;
00468 FDEBUG(5) << "Consumer ID = " << consumerId << endl;
00469 return consumerId;
00470 }
00471
00472 int DataProcessManager::registerWithDQMSM(std::string smURL)
00473 {
00474
00475
00476 stor::ReadData data;
00477
00478 data.d_.clear();
00479 CURL* han = curl_easy_init();
00480 if(han==0)
00481 {
00482 edm::LogError("registerWithDQMSM") << "Could not create curl handle";
00483
00484 throw cms::Exception("registerWithDQMSM","DataProcessManager")
00485 << "Unable to create curl handle\n";
00486 }
00487
00488 std::string url2use = smURL + DQMregpage_;
00489 setopt(han,CURLOPT_URL,url2use.c_str());
00490 setopt(han,CURLOPT_WRITEFUNCTION,func);
00491 setopt(han,CURLOPT_WRITEDATA,&data);
00492
00493
00494 const int BUFFER_SIZE = 2000;
00495 char msgBuff[BUFFER_SIZE];
00496 ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_,
00497 DQMconsumerPriority_, consumerTopFolderName_);
00498
00499
00500 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00501 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00502 struct curl_slist *headers=NULL;
00503 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00504 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00505 setopt(han, CURLOPT_HTTPHEADER, headers);
00506
00507
00508 CURLcode messageStatus = curl_easy_perform(han);
00509 curl_slist_free_all(headers);
00510 curl_easy_cleanup(han);
00511
00512 if(messageStatus!=0)
00513 {
00514 cerr << "curl perform failed for DQM registration" << endl;
00515 edm::LogError("registerWithDQMSM") << "curl perform failed for registration. "
00516 << "Could not register with DQM: probably XDAQ not running on Storage Manager"
00517 << " at " << smURL;
00518 return 0;
00519 }
00520 uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00521 int consumerId = 0;
00522 if(data.d_.length() > 0)
00523 {
00524 int len = data.d_.length();
00525 FDEBUG(9) << "registerWithDQMSM received len = " << len << std::endl;
00526 buf_.resize(len);
00527 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00528
00529 try {
00530 ConsRegResponseView respView(&buf_[0]);
00531 registrationStatus = respView.getStatus();
00532 consumerId = respView.getConsumerId();
00533 }
00534 catch (cms::Exception excpt) {
00535 const unsigned int MAX_DUMP_LENGTH = 1000;
00536 edm::LogError("registerWithDQMSM") << "========================================";
00537 edm::LogError("registerWithDQMSM") << "Exception decoding the registerWithSM response!";
00538 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00539 edm::LogError("registerWithDQMSM") << "Here is the raw text that was returned:";
00540 edm::LogError("registerWithDQMSM") << data.d_;
00541 }
00542 else {
00543 edm::LogError("registerWithDQMSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00544 " characters of the raw text that was returned:";
00545 edm::LogError("registerWithDQMSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00546 }
00547 edm::LogError("registerWithDQMSM") << "========================================";
00548 return 0;
00549 }
00550 }
00551 if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0;
00552 FDEBUG(5) << "Consumer ID = " << consumerId << endl;
00553 return consumerId;
00554 }
00555
00556 bool DataProcessManager::getAnyHeaderFromSM()
00557 {
00558
00559 bool gotOneHeader = false;
00560 if(smList_.size() > 0) {
00561 for(unsigned int i = 0; i < smList_.size(); ++i) {
00562 if(smRegMap_[smList_[i] ] > 0) {
00563 bool success = getHeaderFromSM(smList_[i]);
00564 if(success) {
00565 gotOneHeader = true;
00566 return gotOneHeader;
00567 }
00568 }
00569 }
00570 } else {
00571
00572 return false;
00573 }
00574 return gotOneHeader;
00575 }
00576
00577 bool DataProcessManager::getHeaderFromAllSM()
00578 {
00579
00580
00581 bool gotAllHeaders = true;
00582 if(smList_.size() > 0) {
00583 for(unsigned int i = 0; i < smList_.size(); ++i) {
00584 if(smRegMap_[smList_[i] ] > 0) {
00585 if(smHeaderMap_[smList_[i] ]) continue;
00586 bool success = getHeaderFromSM(smList_[i]);
00587 if(success) {
00588 smHeaderMap_[smList_[i] ] = true;
00589 } else {
00590 gotAllHeaders = false;
00591 }
00592 } else {
00593 gotAllHeaders = false;
00594 }
00595 }
00596 } else {
00597 return false;
00598 }
00599 return gotAllHeaders;
00600 }
00601
00602 bool DataProcessManager::getHeaderFromSM(std::string smURL)
00603 {
00604
00605 stor::ReadData data;
00606
00607 data.d_.clear();
00608 CURL* han = curl_easy_init();
00609 if(han==0)
00610 {
00611 edm::LogError("getHeaderFromSM") << "Could not create curl handle";
00612
00613 throw cms::Exception("getHeaderFromSM","DataProcessManager")
00614 << "Unable to create curl handle\n";
00615 }
00616
00617 std::string url2use = smURL + headerpage_;
00618 setopt(han,CURLOPT_URL,url2use.c_str());
00619 setopt(han,CURLOPT_WRITEFUNCTION,func);
00620 setopt(han,CURLOPT_WRITEDATA,&data);
00621
00622
00623 char msgBuff[100];
00624 OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00625 sizeof(char_uint32));
00626 uint8 *bodyPtr = requestMessage.msgBody();
00627 convert(smRegMap_[smURL], bodyPtr);
00628 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00629 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00630 struct curl_slist *headers=NULL;
00631 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00632 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00633 setopt(han, CURLOPT_HTTPHEADER, headers);
00634
00635
00636 CURLcode messageStatus = curl_easy_perform(han);
00637 curl_slist_free_all(headers);
00638 curl_easy_cleanup(han);
00639
00640 if(messageStatus!=0)
00641 {
00642 cerr << "curl perform failed for header" << endl;
00643 edm::LogError("getHeaderFromSM") << "curl perform failed for header. "
00644 << "Could not get header from an already registered Storage Manager"
00645 << " at " << smURL;
00646 return false;
00647 }
00648 if(data.d_.length() == 0)
00649 {
00650 return false;
00651 }
00652
00653
00654 int len = data.d_.length();
00655 FDEBUG(9) << "getHeaderFromSM received registry len = " << len << std::endl;
00656
00657
00658
00659 bool addedNewInitMsg = false;
00660 try
00661 {
00662 HeaderView hdrView(&data.d_[0]);
00663 if (hdrView.code() == Header::INIT)
00664 {
00665 InitMsgView initView(&data.d_[0]);
00666 if (initMsgCollection_->addIfUnique(initView))
00667 {
00668 addedNewInitMsg = true;
00669 }
00670 }
00671 else if (hdrView.code() == Header::INIT_SET)
00672 {
00673 OtherMessageView otherView(&data.d_[0]);
00674 bodyPtr = otherView.msgBody();
00675 uint32 fullSize = otherView.bodySize();
00676 while ((unsigned int) (bodyPtr-otherView.msgBody()) < fullSize)
00677 {
00678 InitMsgView initView(bodyPtr);
00679 if (initMsgCollection_->addIfUnique(initView))
00680 {
00681 addedNewInitMsg = true;
00682 }
00683 bodyPtr += initView.size();
00684 }
00685 }
00686 else
00687 {
00688 throw cms::Exception("getHeaderFromSM", "DataProcessManager");
00689 }
00690 }
00691 catch (cms::Exception excpt)
00692 {
00693 const unsigned int MAX_DUMP_LENGTH = 1000;
00694 edm::LogError("getHeaderFromSM") << "========================================";
00695 edm::LogError("getHeaderFromSM") << "Exception decoding the getRegistryData response!";
00696 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00697 edm::LogError("getHeaderFromSM") << "Here is the raw text that was returned:";
00698 edm::LogError("getHeaderFromSM") << data.d_;
00699 }
00700 else {
00701 edm::LogError("getHeaderFromSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00702 " characters of the raw text that was returned:";
00703 edm::LogError("getHeaderFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00704 }
00705 edm::LogError("getHeaderFromSM") << "========================================";
00706 throw excpt;
00707 }
00708
00709
00710
00711
00712
00713
00714
00715 if (addedNewInitMsg && eventServer_.get() != NULL &&
00716 initMsgCollection_->size() > 1)
00717 {
00718 std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable =
00719 eventServer_->getConsumerTable();
00720 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator
00721 consumerIter;
00722 for (consumerIter = consumerTable.begin();
00723 consumerIter != consumerTable.end();
00724 consumerIter++)
00725 {
00726 boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00727
00728 if (consPtr->getHLTOutputSelection().empty())
00729 {
00730
00731
00732 std::string errorString;
00733 errorString.append("ERROR: The configuration for this ");
00734 errorString.append("consumer does not specify an HLT output ");
00735 errorString.append("module.\nPlease specify one of the HLT ");
00736 errorString.append("output modules listed below as the ");
00737 errorString.append("SelectHLTOutput parameter ");
00738 errorString.append("in the InputSource configuration.\n");
00739 errorString.append(initMsgCollection_->getSelectionHelpString());
00740 errorString.append("\n");
00741 consPtr->setRegistryWarning(errorString);
00742 }
00743 }
00744 }
00745
00746 return true;
00747 }
00748
00749 void DataProcessManager::waitBetweenRegTrys()
00750 {
00751
00752 sleep(headerRetryInterval_);
00753 return;
00754 }
00755
00756 bool DataProcessManager::haveRegWithEventServer()
00757 {
00758
00759 if(smList_.size() > 0) {
00760 for(unsigned int i = 0; i < smList_.size(); ++i) {
00761 if(smRegMap_[smList_[i] ] > 0) return true;
00762 }
00763 }
00764 return false;
00765 }
00766
00767 bool DataProcessManager::haveRegWithDQMServer()
00768 {
00769
00770 if(DQMsmList_.size() > 0) {
00771 for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00772 if(DQMsmRegMap_[DQMsmList_[i] ] > 0) return true;
00773 }
00774 }
00775 return false;
00776 }
00777
00778 bool DataProcessManager::haveHeader()
00779 {
00780 if(initMsgCollection_->size() > 0) return true;
00781 return false;
00782 }
00783
00784 void DataProcessManager::getEventFromAllSM()
00785 {
00786
00787
00788 if(smList_.size() > 0 && haveHeader()) {
00789 double time2wait = 0.0;
00790 double sleepTime = 300.0;
00791 bool gotOneEvent = false;
00792 bool gotOne = false;
00793 for(unsigned int i = 0; i < smList_.size(); ++i) {
00794 if(smRegMap_[smList_[i] ] > 0) {
00795 gotOne = getOneEventFromSM(smList_[i], time2wait);
00796 if(gotOne) {
00797 gotOneEvent = true;
00798 } else {
00799 if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait;
00800 }
00801 }
00802 }
00803
00804
00805 if(!gotOneEvent) {
00806 if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime));
00807 }
00808 }
00809 }
00810
00811 double DataProcessManager::getTime2Wait(std::string smURL)
00812 {
00813
00814 struct timeval now;
00815 struct timezone dummyTZ;
00816 gettimeofday(&now, &dummyTZ);
00817 double timeDiff = (double) now.tv_sec;
00818 timeDiff -= (double) lastReqMap_[smURL].tv_sec;
00819 timeDiff += ((double) now.tv_usec / 1000000.0);
00820 timeDiff -= ((double) lastReqMap_[smURL].tv_usec / 1000000.0);
00821 if (timeDiff < minEventRequestInterval_)
00822 {
00823 return (minEventRequestInterval_ - timeDiff);
00824 }
00825 else
00826 {
00827 return 0.0;
00828 }
00829 }
00830
00831 void DataProcessManager::setTime2Now(std::string smURL)
00832 {
00833 struct timeval now;
00834 struct timezone dummyTZ;
00835 gettimeofday(&now, &dummyTZ);
00836 lastReqMap_[smURL] = now;
00837 }
00838
00839 bool DataProcessManager::getOneEventFromSM(std::string smURL, double& time2wait)
00840 {
00841
00842
00843
00844
00845
00846
00847 time2wait = getTime2Wait(smURL);
00848 if(time2wait > 0.0) {
00849 return false;
00850 } else {
00851 setTime2Now(smURL);
00852 }
00853
00854
00855 stor::ReadData data;
00856
00857
00858 eventFetchTimer_.stop();
00859 eventFetchTimer_.reset();
00860 eventFetchTimer_.start();
00861
00862 data.d_.clear();
00863 CURL* han = curl_easy_init();
00864 if(han==0)
00865 {
00866 edm::LogError("getOneEventFromSM") << "Could not create curl handle";
00867
00868 throw cms::Exception("getOneEventFromSM","DataProcessManager")
00869 << "Unable to create curl handle\n";
00870 }
00871
00872 std::string url2use = smURL + eventpage_;
00873 setopt(han,CURLOPT_URL,url2use.c_str());
00874 setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00875 setopt(han,CURLOPT_WRITEDATA,&data);
00876
00877
00878
00879
00880
00881 char msgBuff[100];
00882 OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00883 2 * sizeof(char_uint32));
00884 uint8 *bodyPtr = requestMessage.msgBody();
00885 convert(smRegMap_[smURL], bodyPtr);
00886 bodyPtr += sizeof(char_uint32);
00887 convert((uint32) initMsgCollection_->size(), bodyPtr);
00888 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00889 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00890 struct curl_slist *headers=NULL;
00891 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00892 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00893 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00894
00895
00896 CURLcode messageStatus = curl_easy_perform(han);
00897 curl_slist_free_all(headers);
00898 curl_easy_cleanup(han);
00899
00900 if(messageStatus!=0)
00901 {
00902 cerr << "curl perform failed for event" << endl;
00903 edm::LogError("getOneEventFromSM") << "curl perform failed for event. "
00904 << "Could not get event from an already registered Storage Manager"
00905 << " at " << smURL;
00906
00907
00908 eventFetchTimer_.stop();
00909 ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00910 stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00911
00912 return false;
00913 }
00914
00915
00916 int len = data.d_.length();
00917 FDEBUG(9) << "getOneEventFromSM received len = " << len << std::endl;
00918 if(data.d_.length() == 0)
00919 {
00920
00921 eventFetchTimer_.stop();
00922 ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00923 stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00924
00925 return false;
00926 }
00927
00928 buf_.resize(len);
00929 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00930
00931
00932 eventFetchTimer_.stop();
00933 ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00934 stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00935
00936
00937 OtherMessageView msgView(&buf_[0]);
00938
00939 if (msgView.code() == Header::DONE) {
00940
00941 std::cout << " SM " << smURL << " has halted" << std::endl;
00942 return false;
00943 } else if (msgView.code() == Header::NEW_INIT_AVAILABLE) {
00944 std::cout << "Received NEW_INIT_AVAILABLE message" << std::endl;
00945 headerRefetchRequested_ = true;
00946 return false;
00947 } else {
00948
00949
00950
00951 try {
00952 HeaderView hdrView(&buf_[0]);
00953 if (hdrView.code() != Header::EVENT) {
00954 throw cms::Exception("getOneEventFromSM", "DataProcessManager");
00955 }
00956 EventMsgView eventView(&buf_[0]);
00957 ++receivedEvents_;
00958 addMeasurement((unsigned long)data.d_.length());
00959 if(eventServer_.get() != NULL) {
00960 eventServer_->processEvent(eventView);
00961 return true;
00962 }
00963 }
00964 catch (cms::Exception excpt) {
00965 const unsigned int MAX_DUMP_LENGTH = 1000;
00966 edm::LogError("getOneEventFromSM") << "========================================";
00967 edm::LogError("getOneEventFromSM") << "Exception decoding the getEventData response!";
00968 if (data.d_.length() <= MAX_DUMP_LENGTH) {
00969 edm::LogError("getOneEventFromSM") << "Here is the raw text that was returned:";
00970 edm::LogError("getOneEventFromSM") << data.d_;
00971 }
00972 else {
00973 edm::LogError("getOneEventFromSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00974 " characters of the raw text that was returned:";
00975 edm::LogError("getOneEventFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00976 }
00977 edm::LogError("getOneEventFromSM") << "========================================";
00978 throw excpt;
00979 }
00980 }
00981 return false;
00982 }
00983
00984 void DataProcessManager::getDQMEventFromAllSM()
00985 {
00986
00987
00988 if(smList_.size() > 0) {
00989 double time2wait = 0.0;
00990 double sleepTime = 300.0;
00991 bool gotOneEvent = false;
00992 bool gotOne = false;
00993 for(unsigned int i = 0; i < smList_.size(); ++i) {
00994 if(DQMsmRegMap_[smList_[i] ] > 0) {
00995 gotOne = getOneDQMEventFromSM(smList_[i], time2wait);
00996 if(gotOne) {
00997 gotOneEvent = true;
00998 } else {
00999 if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait;
01000 }
01001 }
01002 }
01003
01004
01005
01006
01007
01008 if(!gotOneEvent) {
01009
01010 }
01011 }
01012 }
01013
01014 double DataProcessManager::getDQMTime2Wait(std::string smURL)
01015 {
01016
01017 struct timeval now;
01018 struct timezone dummyTZ;
01019 gettimeofday(&now, &dummyTZ);
01020 double timeDiff = (double) now.tv_sec;
01021 timeDiff -= (double) lastDQMReqMap_[smURL].tv_sec;
01022 timeDiff += ((double) now.tv_usec / 1000000.0);
01023 timeDiff -= ((double) lastDQMReqMap_[smURL].tv_usec / 1000000.0);
01024 if (timeDiff < minDQMEventRequestInterval_)
01025 {
01026 return (minDQMEventRequestInterval_ - timeDiff);
01027 }
01028 else
01029 {
01030 return 0.0;
01031 }
01032 }
01033
01034 void DataProcessManager::setDQMTime2Now(std::string smURL)
01035 {
01036 struct timeval now;
01037 struct timezone dummyTZ;
01038 gettimeofday(&now, &dummyTZ);
01039 lastDQMReqMap_[smURL] = now;
01040 }
01041
01042 bool DataProcessManager::getOneDQMEventFromSM(std::string smURL, double& time2wait)
01043 {
01044
01045
01046
01047
01048
01049
01050 time2wait = getDQMTime2Wait(smURL);
01051 if(time2wait > 0.0) {
01052 return false;
01053 } else {
01054 setDQMTime2Now(smURL);
01055 }
01056
01057
01058 stor::ReadData data;
01059
01060
01061 dqmFetchTimer_.stop();
01062 dqmFetchTimer_.reset();
01063 dqmFetchTimer_.start();
01064
01065 data.d_.clear();
01066 CURL* han = curl_easy_init();
01067 if(han==0)
01068 {
01069 edm::LogError("getOneDQMEventFromSM") << "Could not create curl handle";
01070
01071 throw cms::Exception("getOneDQMEventFromSM","DataProcessManager")
01072 << "Unable to create curl handle\n";
01073 }
01074
01075 std::string url2use = smURL + DQMeventpage_;
01076 setopt(han,CURLOPT_URL,url2use.c_str());
01077 setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
01078 setopt(han,CURLOPT_WRITEDATA,&data);
01079
01080
01081 char msgBuff[100];
01082 OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST,
01083 sizeof(char_uint32));
01084 uint8 *bodyPtr = requestMessage.msgBody();
01085 convert(DQMsmRegMap_[smURL], bodyPtr);
01086 setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
01087 setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
01088 struct curl_slist *headers=NULL;
01089 headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
01090 headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
01091 stor::setopt(han, CURLOPT_HTTPHEADER, headers);
01092
01093
01094 CURLcode messageStatus = curl_easy_perform(han);
01095 curl_slist_free_all(headers);
01096 curl_easy_cleanup(han);
01097
01098 if(messageStatus!=0)
01099 {
01100 cerr << "curl perform failed for DQM event" << endl;
01101 edm::LogError("getOneDQMEventFromSM") << "curl perform failed for DQM event. "
01102 << "Could not get DQMevent from an already registered Storage Manager"
01103 << " at " << smURL;
01104
01105
01106 dqmFetchTimer_.stop();
01107 ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01108 stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01109
01110 return false;
01111 }
01112
01113
01114 int len = data.d_.length();
01115 FDEBUG(9) << "getOneDQMEventFromSM received len = " << len << std::endl;
01116 if(data.d_.length() == 0)
01117 {
01118
01119 dqmFetchTimer_.stop();
01120 ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01121 stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01122
01123 return false;
01124 }
01125
01126 buf_.resize(len);
01127 for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
01128
01129
01130 dqmFetchTimer_.stop();
01131 ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01132 stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01133
01134
01135 OtherMessageView msgView(&buf_[0]);
01136
01137 if (msgView.code() == Header::DONE) {
01138
01139 std::cout << " SM " << smURL << " has halted" << std::endl;
01140 return false;
01141 } else {
01142 DQMEventMsgView dqmEventView(&buf_[0]);
01143 ++receivedDQMEvents_;
01144 addMeasurement((unsigned long)data.d_.length());
01145 if(dqmServiceManager_.get() != NULL) {
01146 dqmServiceManager_->manageDQMEventMsg(dqmEventView);
01147 return true;
01148 }
01149 }
01150 return false;
01151 }
01152
01154 void DataProcessManager::addMeasurement(unsigned long size)
01155 {
01156
01157 if(pmeter_->addSample(size))
01158 {
01159 stats_ = pmeter_->getStats();
01160 }
01161 }
01162
01163 double DataProcessManager::getSampleCount(STATS_TIME_FRAME timeFrame,
01164 STATS_TIMING_TYPE timingType,
01165 double currentTime)
01166 {
01167 if (timeFrame == SHORT_TERM) {
01168 if (timingType == DQMEVENT_FETCH) {
01169 return stDQMFetchTimeCounter_->getSampleCount(currentTime);
01170 }
01171 else {
01172 return stEventFetchTimeCounter_->getSampleCount(currentTime);
01173 }
01174 }
01175 else {
01176 if (timingType == DQMEVENT_FETCH) {
01177 return ltDQMFetchTimeCounter_->getSampleCount();
01178 }
01179 else {
01180 return ltEventFetchTimeCounter_->getSampleCount();
01181 }
01182 }
01183 }
01184
01185 double DataProcessManager::getAverageValue(STATS_TIME_FRAME timeFrame,
01186 STATS_TIMING_TYPE timingType,
01187 double currentTime)
01188 {
01189 if (timeFrame == SHORT_TERM) {
01190 if (timingType == DQMEVENT_FETCH) {
01191 return stDQMFetchTimeCounter_->getValueAverage(currentTime);
01192 }
01193 else {
01194 return stEventFetchTimeCounter_->getValueAverage(currentTime);
01195 }
01196 }
01197 else {
01198 if (timingType == DQMEVENT_FETCH) {
01199 return ltDQMFetchTimeCounter_->getValueAverage();
01200 }
01201 else {
01202 return ltEventFetchTimeCounter_->getValueAverage();
01203 }
01204 }
01205 }
01206
01207 double DataProcessManager::getDuration(STATS_TIME_FRAME timeFrame,
01208 STATS_TIMING_TYPE timingType,
01209 double currentTime)
01210 {
01211 if (timeFrame == SHORT_TERM) {
01212 if (timingType == DQMEVENT_FETCH) {
01213 return stDQMFetchTimeCounter_->getDuration(currentTime);
01214 }
01215 else {
01216 return stEventFetchTimeCounter_->getDuration(currentTime);
01217 }
01218 }
01219 else {
01220 if (timingType == DQMEVENT_FETCH) {
01221 return ltDQMFetchTimeCounter_->getDuration(currentTime);
01222 }
01223 else {
01224 return ltEventFetchTimeCounter_->getDuration(currentTime);
01225 }
01226 }
01227 }
01228 }