CMS 3D CMS Logo

DataProcessManager.cc

Go to the documentation of this file.
00001 // $Id: DataProcessManager.cc,v 1.15 2008/10/14 15:02:18 hcheung Exp $
00002 
00003 #include "EventFilter/SMProxyServer/interface/DataProcessManager.h"
00004 #include "EventFilter/StorageManager/interface/SMCurlInterface.h"
00005 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00006 #include "FWCore/Utilities/interface/DebugMacros.h"
00007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00008 #include "IOPool/Streamer/interface/BufferArea.h"
00009 #include "IOPool/Streamer/interface/OtherMessage.h"
00010 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00011 
00012 #include "boost/bind.hpp"
00013 
00014 #include "curl/curl.h"
00015 #include <wait.h>
00016 
00017 using namespace std;
00018 using namespace edm;
00019 
00020 using boost::thread;
00021 using boost::bind;
00022 
namespace
{
  // Size of a void pointer in bytes; passed as the first argument to
  // edm::getEventBuffer() when the command queue (cmd_q_) is created.
  const int voidptr_size = static_cast<int>(sizeof(void*));
}
00027 
00028 namespace stor
00029 {
00030 
00031   DataProcessManager::DataProcessManager():
00032     cmd_q_(edm::getEventBuffer(voidptr_size,50)),
00033     alreadyRegistered_(false),
00034     alreadyRegisteredDQM_(false),
00035     headerRefetchRequested_(false),
00036     buf_(2000),
00037     headerRetryInterval_(5),
00038     dqmServiceManager_(new stor::DQMServiceManager()),
00039     receivedEvents_(0),
00040     receivedDQMEvents_(0),
00041     samples_(100),
00042     period4samples_(5)
00043   {
00044     // for performance measurements
00045     pmeter_ = new stor::SMPerformanceMeter();
00046     init();
00047   } 
00048 
00049   DataProcessManager::~DataProcessManager()
00050   {
00051     delete pmeter_;
00052   }
00053 
00054   void DataProcessManager::init()
00055   {
00056     regpage_ =  "/registerConsumer";
00057     DQMregpage_ = "/registerDQMConsumer";
00058     eventpage_ = "/geteventdata";
00059     DQMeventpage_ = "/getDQMeventdata";
00060     headerpage_ = "/getregdata";
00061     consumerName_ = stor::PROXY_SERVER_NAME;
00062     //consumerPriority_ = "PushMode"; // this means push mode!
00063     consumerPriority_ = "Normal";
00064     DQMconsumerName_ = stor::PROXY_SERVER_NAME;
00065     //DQMconsumerPriority_ =  "PushMode"; // this means push mode!
00066     DQMconsumerPriority_ =  "Normal";
00067 
00068     this->setMaxEventRequestRate(10.0); // just a default until set in config action
00069     consumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00070 
00071     this->setMaxDQMEventRequestRate(0.2); // set in config later
00072     DQMconsumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00073 
00074     alreadyRegistered_ = false;
00075     alreadyRegisteredDQM_ = false;
00076     headerRefetchRequested_ = false;
00077 
00078     smList_.clear();
00079     smRegMap_.clear();
00080     smHeaderMap_.clear();
00081     DQMsmList_.clear();
00082     DQMsmRegMap_.clear();
00083 
00084     // TODO fixme: only request folders that connected consumers want?
00085     consumerTopFolderName_ = "*";
00086     //consumerTopFolderName_ = "C1";
00087     receivedEvents_ = 0;
00088     receivedDQMEvents_ = 0;
00089     pmeter_->init(samples_, period4samples_);
00090     stats_.fullReset();
00091 
00092     // initialize the counters that we use for statistics
00093     ltEventFetchTimeCounter_.reset(new ForeverCounter());
00094     stEventFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00095     ltDQMFetchTimeCounter_.reset(new ForeverCounter());
00096     stDQMFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00097   }
00098 
00099   void DataProcessManager::setSamples(unsigned long num_samples)
00100   { 
00101     samples_ = num_samples;
00102     pmeter_->setSamples(num_samples); 
00103   }
00104 
00105   void DataProcessManager::setPeriod4Samples(unsigned long period4samples)
00106   { 
00107     period4samples_ = period4samples;
00108     pmeter_->setPeriod4Samples(period4samples);
00109   }
00110 
00111 
00112   void DataProcessManager::setMaxEventRequestRate(double rate)
00113   {
00114     if(rate <= 0.0) return; // TODO make sure config is checked!
00115     maxEventRequestRate_ = rate;
00116     updateMinEventRequestInterval();
00117   }
00118 
00119   void DataProcessManager::setMaxDQMEventRequestRate(double rate)
00120   {
00121     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00122     if(rate <= 0.0) return; // TODO make sure config is checked!
00123     if (rate < (1.0 / MAX_REQUEST_INTERVAL)) {
00124       minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00125     }
00126     else {
00127       minDQMEventRequestInterval_ = 1.0 / rate;  // seconds
00128     }
00129   }
00130 
00131   void DataProcessManager::updateMinEventRequestInterval()
00132   {
00133     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00134     double rate = maxEventRequestRate_;
00135 
00136     if (rate < (1.0 / MAX_REQUEST_INTERVAL)) {
00137       minEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00138     }
00139     else {
00140       // base the interval on the number of storage managers
00141       // (so that the requested rate doesn't increase proportionally
00142       // with number of SMs)
00143       // We could simply use smCount/rate, but let's be a bit more
00144       // generous than that so that if one of the SMs isn't queueing
00145       // events for us, we still get enough rate
00146       if (smList_.size() <= 1) {
00147         minEventRequestInterval_ = 1.0 / rate;  // seconds
00148       }
00149       else {
00150         minEventRequestInterval_ = (smList_.size() - 1) / rate;
00151       }
00152     }
00153 
00154     // 16-Apr-2008, KAB: set maxEventRequestRate in the parameterSet that
00155     // we send to the storage manager so that the SM knows how many events
00156     // to queue for the SMPS.
00157     edm::ParameterSet ps = ParameterSet();
00158     Entry maxRateEntry("maxEventRequestRate",
00159                        (1.0 / minEventRequestInterval_),
00160                        false);
00161     ps.insert(true, "maxEventRequestRate", maxRateEntry);
00162     // TODO fixme: only request event types that are requested by connected consumers?
00163     consumerPSetString_ = ps.toString();
00164   }
00165 
00166   void DataProcessManager::run(DataProcessManager* t)
00167   {
00168     t->processCommands();
00169   }
00170 
00171   void DataProcessManager::start()
00172   {
00173     // called from a different thread to start things going
00174 
00175     me_.reset(new boost::thread(boost::bind(DataProcessManager::run,this)));
00176   }
00177 
00178   void DataProcessManager::stop()
00179   {
00180     // called from a different thread - trigger completion to the
00181     // data process manager loop
00182 
00183     edm::EventBuffer::ProducerBuffer cb(*cmd_q_);
00184     MsgCode mc(cb.buffer(),MsgCode::DONE);
00185     mc.setCode(MsgCode::DONE);
00186     cb.commit(mc.codeSize());
00187   }
00188 
00189   void DataProcessManager::join()
00190   {
00191     // invoked from a different thread - block until "me_" is done
00192     if(me_) me_->join();
00193   }
00194 
00195   void DataProcessManager::processCommands()
00196   {
00197     // called with this data process manager's own thread.
00198     // first register with the SM for each subfarm
00199     bool doneWithRegistration = false;
00200     // TODO fixme: improve method of hardcored fixed retries
00201     unsigned int count = 0; // keep of count of tries and quit after 255
00202     unsigned int maxcount = 255;
00203     bool doneWithDQMRegistration = false;
00204     unsigned int countDQM = 0; // keep of count of tries and quit after 255
00205     bool alreadysaid = false;
00206     bool alreadysaidDQM = false;
00207 
00208     //bool gotOneHeader = false;
00209     bool gotOneHeaderFromAll = false;
00210     unsigned int countINIT = 0; // keep of count of tries and quit after 255
00211     bool alreadysaidINIT = false;
00212 
00213     bool DoneWithJob = false;
00214     while(!DoneWithJob)
00215     {
00216       // work loop
00217       // if a header re-fetch has been requested, reset the header vars
00218       if (headerRefetchRequested_) {
00219         headerRefetchRequested_ = false;
00220         //gotOneHeader = false;
00221         gotOneHeaderFromAll = false;
00222         smHeaderMap_.clear();
00223         countINIT = 0;
00224       }
00225       // register as event consumer to all SM senders
00226       if(!alreadyRegistered_) {
00227         if(!doneWithRegistration)
00228         {
00229           waitBetweenRegTrys();
00230           bool success = registerWithAllSM();
00231           if(success) doneWithRegistration = true;
00232           ++count;
00233         }
00234         // TODO fixme: decide what to do after max tries
00235         if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM Servers"
00236            << " after " << maxcount << " tries";
00237         if(doneWithRegistration && !alreadysaid) {
00238           edm::LogInfo("processCommands") << "Registered with all SM Event Servers";
00239           alreadysaid = true;
00240         }
00241         if(doneWithRegistration) alreadyRegistered_ = true;
00242       }
00243       // now register as DQM consumers
00244       if(!alreadyRegisteredDQM_) {
00245         if(!doneWithDQMRegistration)
00246         {
00247           waitBetweenRegTrys();
00248           bool success = registerWithAllDQMSM();
00249           if(success) doneWithDQMRegistration = true;
00250           ++countDQM;
00251         }
00252         // TODO fixme: decide what to do after max tries
00253         if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM DQMEvent Servers"
00254           << " after " << maxcount << " tries";
00255         if(doneWithDQMRegistration && !alreadysaidDQM) {
00256           edm::LogInfo("processCommands") << "Registered with all SM DQMEvent Servers";
00257           alreadysaidDQM = true;
00258         }
00259         if(doneWithDQMRegistration) alreadyRegisteredDQM_ = true;
00260       }
00261       // now get one INIT header (product registry) and save it
00262       // as long as at least one SMsender registered with
00263       // TODO fixme: use the data member for got header to go across runs
00264       // With multiple SMs, we need to get a Header from each else that consumer
00265       // is counted as not initialized
00266       // TODO how to we get all INIT messages from each SM (and know it!)
00267       //if(!gotOneHeader)
00268       if(!gotOneHeaderFromAll)
00269       {
00270         waitBetweenRegTrys();
00271         //bool success = getAnyHeaderFromSM();
00272         bool success = getHeaderFromAllSM();
00273         //if(success) gotOneHeader = true;
00274         if(success) gotOneHeaderFromAll = true;
00275         ++countINIT;
00276       }
00277       if(countINIT >= maxcount) edm::LogInfo("processCommands") << "Could not get product registry!"
00278           << " after " << maxcount << " tries";
00279       //if(gotOneHeader && !alreadysaidINIT) {
00280       if(gotOneHeaderFromAll && !alreadysaidINIT) {
00281         edm::LogInfo("processCommands") << "Got the product registry";
00282         alreadysaidINIT = true;
00283       }
00284       //if(alreadyRegistered_ && gotOneHeader && haveHeader()) {
00285       if(alreadyRegistered_ && gotOneHeaderFromAll && haveHeader()) {
00286         getEventFromAllSM();
00287       }
00288       if(alreadyRegisteredDQM_) {
00289         getDQMEventFromAllSM();
00290       }
00291 
00292       // check for any commands - empty() does not block
00293       if(!cmd_q_->empty())
00294       {
00295         // the next line blocks until there is an entry in cmd_q
00296         edm::EventBuffer::ConsumerBuffer cb(*cmd_q_);
00297         MsgCode mc(cb.buffer(),cb.size());
00298 
00299         if(mc.getCode()==MsgCode::DONE) DoneWithJob = true;
00300         // right now we will ignore all messages other than DONE
00301       }
00302 
00303     } // done with process loop   
00304     edm::LogInfo("processCommands") << "Received done - stopping";
00305     if(dqmServiceManager_.get() != NULL) dqmServiceManager_->stop();
00306   }
00307 
00308   void DataProcessManager::addSM2Register(std::string smURL)
00309   {
00310     // This smURL is the URN of the StorageManager without the page extension
00311     // Check if already in the list
00312     bool alreadyInList = false;
00313     if(smList_.size() > 0) {
00314        for(unsigned int i = 0; i < smList_.size(); ++i) {
00315          if(smURL.compare(smList_[i]) == 0) {
00316             alreadyInList = true;
00317             break;
00318          }
00319        }
00320     }
00321     if(alreadyInList) return;
00322     smList_.push_back(smURL);
00323     smRegMap_.insert(std::make_pair(smURL,0));
00324     smHeaderMap_.insert(std::make_pair(smURL,false));
00325     struct timeval lastRequestTime;
00326     lastRequestTime.tv_sec = 0;
00327     lastRequestTime.tv_usec = 0;
00328     lastReqMap_.insert(std::make_pair(smURL,lastRequestTime));
00329     updateMinEventRequestInterval();
00330   }
00331 
00332   void DataProcessManager::addDQMSM2Register(std::string DQMsmURL)
00333   {
00334     // Check if already in the list
00335     bool alreadyInList = false;
00336     if(DQMsmList_.size() > 0) {
00337        for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00338          if(DQMsmURL.compare(DQMsmList_[i]) == 0) {
00339             alreadyInList = true;
00340             break;
00341          }
00342        }
00343     }
00344     if(alreadyInList) return;
00345     DQMsmList_.push_back(DQMsmURL);
00346     DQMsmRegMap_.insert(std::make_pair(DQMsmURL,0));
00347     struct timeval lastRequestTime;
00348     lastRequestTime.tv_sec = 0;
00349     lastRequestTime.tv_usec = 0;
00350     lastDQMReqMap_.insert(std::make_pair(DQMsmURL,lastRequestTime));
00351   }
00352 
00353   bool DataProcessManager::registerWithAllSM()
00354   {
00355     // One try at registering with the SM on each subfarm
00356     // return true if registered with all SM 
00357     // Only make one attempt and return so we can make this thread stop
00358     if(smList_.size() == 0) return false;
00359     bool allRegistered = true;
00360     for(unsigned int i = 0; i < smList_.size(); ++i) {
00361       if(smRegMap_[smList_[i] ] > 0) continue; // already registered
00362       int consumerid = registerWithSM(smList_[i]);
00363       if(consumerid > 0) smRegMap_[smList_[i] ] = consumerid;
00364       else allRegistered = false;
00365     }
00366     return allRegistered;
00367   }
00368 
00369   bool DataProcessManager::registerWithAllDQMSM()
00370   {
00371     // One try at registering with the SM on each subfarm
00372     // return true if registered with all SM 
00373     // Only make one attempt and return so we can make this thread stop
00374     if(DQMsmList_.size() == 0) return false;
00375     bool allRegistered = true;
00376     for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00377       if(DQMsmRegMap_[DQMsmList_[i] ] > 0) continue; // already registered
00378       int consumerid = registerWithDQMSM(DQMsmList_[i]);
00379       if(consumerid > 0) DQMsmRegMap_[DQMsmList_[i] ] = consumerid;
00380       else allRegistered = false;
00381     }
00382     return allRegistered;
00383   }
00384 
00385   int DataProcessManager::registerWithSM(std::string smURL)
00386   {
00387     // Use this same registration method for both event data and DQM data
00388     // return with consumerID or 0 for failure
00389     stor::ReadData data;
00390 
00391     data.d_.clear();
00392     CURL* han = curl_easy_init();
00393     if(han==0)
00394     {
00395       edm::LogError("registerWithSM") << "Could not create curl handle";
00396       // this is a fatal error isn't it? Are we catching this? TODO
00397       throw cms::Exception("registerWithSM","DataProcessManager")
00398           << "Unable to create curl handle\n";
00399     }
00400     // set the standard https request options
00401     std::string url2use = smURL + regpage_;
00402     setopt(han,CURLOPT_URL,url2use.c_str());
00403     setopt(han,CURLOPT_WRITEFUNCTION,func);
00404     setopt(han,CURLOPT_WRITEDATA,&data);
00405 
00406     // build the registration request message to send to the storage manager
00407     const int BUFFER_SIZE = 2000;
00408     char msgBuff[BUFFER_SIZE];
00409     ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00410                                          consumerPriority_, consumerPSetString_);
00411 
00412     // add the request message as a https post
00413     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00414     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00415     struct curl_slist *headers=NULL;
00416     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00417     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00418     setopt(han, CURLOPT_HTTPHEADER, headers);
00419 
00420     // send the HTTP POST, read the reply, and cleanup before going on
00421     CURLcode messageStatus = curl_easy_perform(han);
00422     curl_slist_free_all(headers);
00423     curl_easy_cleanup(han);
00424 
00425     if(messageStatus!=0)
00426     {
00427       cerr << "curl perform failed for registration" << endl;
00428       edm::LogError("registerWithSM") << "curl perform failed for registration. "
00429         << "Could not register: probably XDAQ not running on Storage Manager"
00430         << " at " << smURL;
00431       return 0;
00432     }
00433     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00434     int consumerId = 0;
00435     if(data.d_.length() > 0)
00436     {
00437       int len = data.d_.length();
00438       FDEBUG(9) << "registerWithSM received len = " << len << std::endl;
00439       buf_.resize(len);
00440       for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00441 
00442       try {
00443         ConsRegResponseView respView(&buf_[0]);
00444         registrationStatus = respView.getStatus();
00445         consumerId = respView.getConsumerId();
00446         if (eventServer_.get() != NULL) {
00447           eventServer_->setStreamSelectionTable(respView.getStreamSelectionTable());
00448         }
00449       }
00450       catch (cms::Exception excpt) {
00451         const unsigned int MAX_DUMP_LENGTH = 1000;
00452         edm::LogError("registerWithSM") << "========================================";
00453         edm::LogError("registerWithSM") << "Exception decoding the registerWithSM response!";
00454         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00455           edm::LogError("registerWithSM") << "Here is the raw text that was returned:";
00456           edm::LogError("registerWithSM") << data.d_;
00457         }
00458         else {
00459           edm::LogError("registerWithSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00460             " characters of the raw text that was returned:";
00461           edm::LogError("registerWithSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00462         }
00463         edm::LogError("registerWithSM") << "========================================";
00464         return 0;
00465       }
00466     }
00467     if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0;
00468     FDEBUG(5) << "Consumer ID = " << consumerId << endl;
00469     return consumerId;
00470   }
00471 
00472   int DataProcessManager::registerWithDQMSM(std::string smURL)
00473   {
00474     // Use this same registration method for both event data and DQM data
00475     // return with consumerID or 0 for failure
00476     stor::ReadData data;
00477 
00478     data.d_.clear();
00479     CURL* han = curl_easy_init();
00480     if(han==0)
00481     {
00482       edm::LogError("registerWithDQMSM") << "Could not create curl handle";
00483       // this is a fatal error isn't it? Are we catching this? TODO
00484       throw cms::Exception("registerWithDQMSM","DataProcessManager")
00485           << "Unable to create curl handle\n";
00486     }
00487     // set the standard https request options
00488     std::string url2use = smURL + DQMregpage_;
00489     setopt(han,CURLOPT_URL,url2use.c_str());
00490     setopt(han,CURLOPT_WRITEFUNCTION,func);
00491     setopt(han,CURLOPT_WRITEDATA,&data);
00492 
00493     // build the registration request message to send to the storage manager
00494     const int BUFFER_SIZE = 2000;
00495     char msgBuff[BUFFER_SIZE];
00496     ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_,
00497                                          DQMconsumerPriority_, consumerTopFolderName_);
00498 
00499     // add the request message as a https post
00500     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00501     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00502     struct curl_slist *headers=NULL;
00503     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00504     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00505     setopt(han, CURLOPT_HTTPHEADER, headers);
00506 
00507     // send the HTTP POST, read the reply, and cleanup before going on
00508     CURLcode messageStatus = curl_easy_perform(han);
00509     curl_slist_free_all(headers);
00510     curl_easy_cleanup(han);
00511 
00512     if(messageStatus!=0)
00513     {
00514       cerr << "curl perform failed for DQM registration" << endl;
00515       edm::LogError("registerWithDQMSM") << "curl perform failed for registration. "
00516         << "Could not register with DQM: probably XDAQ not running on Storage Manager"
00517         << " at " << smURL;
00518       return 0;
00519     }
00520     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00521     int consumerId = 0;
00522     if(data.d_.length() > 0)
00523     {
00524       int len = data.d_.length();
00525       FDEBUG(9) << "registerWithDQMSM received len = " << len << std::endl;
00526       buf_.resize(len);
00527       for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00528 
00529       try {
00530         ConsRegResponseView respView(&buf_[0]);
00531         registrationStatus = respView.getStatus();
00532         consumerId = respView.getConsumerId();
00533       }
00534       catch (cms::Exception excpt) {
00535         const unsigned int MAX_DUMP_LENGTH = 1000;
00536         edm::LogError("registerWithDQMSM") << "========================================";
00537         edm::LogError("registerWithDQMSM") << "Exception decoding the registerWithSM response!";
00538         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00539           edm::LogError("registerWithDQMSM") << "Here is the raw text that was returned:";
00540           edm::LogError("registerWithDQMSM") << data.d_;
00541         }
00542         else {
00543           edm::LogError("registerWithDQMSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00544             " characters of the raw text that was returned:";
00545           edm::LogError("registerWithDQMSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00546         }
00547         edm::LogError("registerWithDQMSM") << "========================================";
00548         return 0;
00549       }
00550     }
00551     if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0;
00552     FDEBUG(5) << "Consumer ID = " << consumerId << endl;
00553     return consumerId;
00554   }
00555 
00556   bool DataProcessManager::getAnyHeaderFromSM()
00557   {
00558     // Try the list of SM in order of registration to get one Header
00559     bool gotOneHeader = false;
00560     if(smList_.size() > 0) {
00561        for(unsigned int i = 0; i < smList_.size(); ++i) {
00562          if(smRegMap_[smList_[i] ] > 0) {
00563             bool success = getHeaderFromSM(smList_[i]);
00564             if(success) { // should cleam this up!
00565               gotOneHeader = true;
00566               return gotOneHeader;
00567             }
00568          }
00569        }
00570     } else {
00571       // this is a problem (but maybe not with non-blocking processing loop)
00572       return false;
00573     }
00574     return gotOneHeader;
00575   }
00576 
00577   bool DataProcessManager::getHeaderFromAllSM()
00578   {
00579     // Try the list of SM in order of registration to get one Header from each
00580     // TODO: how do we get multiple headers if there are more than one?
00581     bool gotAllHeaders = true;
00582     if(smList_.size() > 0) {
00583        for(unsigned int i = 0; i < smList_.size(); ++i) {
00584          if(smRegMap_[smList_[i] ] > 0) { // is registered
00585             if(smHeaderMap_[smList_[i] ]) continue; // already got header
00586             bool success = getHeaderFromSM(smList_[i]);
00587             if(success) {
00588               smHeaderMap_[smList_[i] ] = true;
00589             } else {
00590               gotAllHeaders = false;
00591             }
00592          } else {
00593            gotAllHeaders = false;
00594          }
00595        }
00596     } else {
00597       return false;
00598     }
00599     return gotAllHeaders;
00600   }
00601 
00602   bool DataProcessManager::getHeaderFromSM(std::string smURL)
00603   {
00604     // One single try to get a header from this SM URL
00605     stor::ReadData data;
00606 
00607     data.d_.clear();
00608     CURL* han = curl_easy_init();
00609     if(han==0)
00610     {
00611       edm::LogError("getHeaderFromSM") << "Could not create curl handle";
00612       // this is a fatal error isn't it? Are we catching this? TODO
00613       throw cms::Exception("getHeaderFromSM","DataProcessManager")
00614           << "Unable to create curl handle\n";
00615     }
00616     // set the standard https request options
00617     std::string url2use = smURL + headerpage_;
00618     setopt(han,CURLOPT_URL,url2use.c_str());
00619     setopt(han,CURLOPT_WRITEFUNCTION,func);
00620     setopt(han,CURLOPT_WRITEDATA,&data);
00621 
00622     // send our consumer ID as part of the header request
00623     char msgBuff[100];
00624     OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00625                                        sizeof(char_uint32));
00626     uint8 *bodyPtr = requestMessage.msgBody();
00627     convert(smRegMap_[smURL], bodyPtr);
00628     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00629     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00630     struct curl_slist *headers=NULL;
00631     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00632     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00633     setopt(han, CURLOPT_HTTPHEADER, headers);
00634 
00635     // send the HTTP POST, read the reply, and cleanup before going on
00636     CURLcode messageStatus = curl_easy_perform(han);
00637     curl_slist_free_all(headers);
00638     curl_easy_cleanup(han);
00639 
00640     if(messageStatus!=0)
00641     { 
00642       cerr << "curl perform failed for header" << endl;
00643       edm::LogError("getHeaderFromSM") << "curl perform failed for header. "
00644         << "Could not get header from an already registered Storage Manager"
00645         << " at " << smURL;
00646       return false;
00647     }
00648     if(data.d_.length() == 0)
00649     { 
00650       return false;
00651     }
00652 
00653     // rely on https transfer string of correct length!
00654     int len = data.d_.length();
00655     FDEBUG(9) << "getHeaderFromSM received registry len = " << len << std::endl;
00656 
00657     // check that we've received a valid INIT message
00658     // or a set of INIT messages.  Save everything that we receive.
00659     bool addedNewInitMsg = false;
00660     try
00661     {
00662       HeaderView hdrView(&data.d_[0]);
00663       if (hdrView.code() == Header::INIT)
00664       {
00665         InitMsgView initView(&data.d_[0]);
00666         if (initMsgCollection_->addIfUnique(initView))
00667         {
00668           addedNewInitMsg = true;
00669         }
00670       }
00671       else if (hdrView.code() == Header::INIT_SET)
00672       {
00673         OtherMessageView otherView(&data.d_[0]);
00674         bodyPtr = otherView.msgBody();
00675         uint32 fullSize = otherView.bodySize();
00676         while ((unsigned int) (bodyPtr-otherView.msgBody()) < fullSize)
00677         {
00678           InitMsgView initView(bodyPtr);
00679           if (initMsgCollection_->addIfUnique(initView))
00680           {
00681             addedNewInitMsg = true;
00682           }
00683           bodyPtr += initView.size();
00684         }
00685       }
00686       else
00687       {
00688         throw cms::Exception("getHeaderFromSM", "DataProcessManager");
00689       }
00690     }
00691     catch (cms::Exception excpt)
00692     {
00693       const unsigned int MAX_DUMP_LENGTH = 1000;
00694       edm::LogError("getHeaderFromSM") << "========================================";
00695       edm::LogError("getHeaderFromSM") << "Exception decoding the getRegistryData response!";
00696       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00697         edm::LogError("getHeaderFromSM") << "Here is the raw text that was returned:";
00698         edm::LogError("getHeaderFromSM") << data.d_;
00699       }
00700       else {
00701         edm::LogError("getHeaderFromSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00702           " characters of the raw text that was returned:";
00703         edm::LogError("getHeaderFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00704       }
00705       edm::LogError("getHeaderFromSM") << "========================================";
00706       throw excpt;
00707     }
00708 
00709     // check if any currently connected consumers did not specify
00710     // an HLT output module label and we now have multiple, different,
00711     // INIT messages.  If so, we need to complain because the
00712     // SelectHLTOutput parameter needs to be specified when there
00713     // is more than one HLT output module (and correspondingly, more
00714     // than one INIT message)
00715     if (addedNewInitMsg && eventServer_.get() != NULL &&
00716         initMsgCollection_->size() > 1)
00717     {
00718       std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 
00719         eventServer_->getConsumerTable();
00720       std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 
00721         consumerIter;
00722       for (consumerIter = consumerTable.begin();
00723            consumerIter != consumerTable.end();
00724            consumerIter++)
00725       {
00726         boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00727 
00728         if (consPtr->getHLTOutputSelection().empty())
00729         {
00730           // store a warning message in the consumer pipe to be
00731           // sent to the consumer at the next opportunity
00732           std::string errorString;
00733           errorString.append("ERROR: The configuration for this ");
00734           errorString.append("consumer does not specify an HLT output ");
00735           errorString.append("module.\nPlease specify one of the HLT ");
00736           errorString.append("output modules listed below as the ");
00737           errorString.append("SelectHLTOutput parameter ");
00738           errorString.append("in the InputSource configuration.\n");
00739           errorString.append(initMsgCollection_->getSelectionHelpString());
00740           errorString.append("\n");
00741           consPtr->setRegistryWarning(errorString);
00742         }
00743       }
00744     }
00745 
00746     return true;
00747   }
00748 
00749   void DataProcessManager::waitBetweenRegTrys()
00750   {
00751     // for now just a simple wait for a fixed time
00752     sleep(headerRetryInterval_);
00753     return;
00754   }
00755 
00756   bool DataProcessManager::haveRegWithEventServer()
00757   {
00758     // registered with any of the SM event servers
00759     if(smList_.size() > 0) {
00760       for(unsigned int i = 0; i < smList_.size(); ++i) {
00761         if(smRegMap_[smList_[i] ] > 0) return true;
00762       }
00763     }
00764     return false;
00765   }
00766 
00767   bool DataProcessManager::haveRegWithDQMServer()
00768   {
00769     // registered with any of the SM DQM servers
00770     if(DQMsmList_.size() > 0) {
00771       for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00772         if(DQMsmRegMap_[DQMsmList_[i] ] > 0) return true;
00773       }
00774     }
00775     return false;
00776   }
00777 
00778   bool DataProcessManager::haveHeader()
00779   {
00780     if(initMsgCollection_->size() > 0) return true;
00781     return false;
00782   }
00783 
00784   void DataProcessManager::getEventFromAllSM()
00785   {
00786     // Try the list of SM in order of registration to get one event
00787     // so long as we have the header from SM already
00788     if(smList_.size() > 0 && haveHeader()) {
00789       double time2wait = 0.0;
00790       double sleepTime = 300.0;
00791       bool gotOneEvent = false;
00792       bool gotOne = false;
00793       for(unsigned int i = 0; i < smList_.size(); ++i) {
00794         if(smRegMap_[smList_[i] ] > 0) {   // is registered
00795           gotOne = getOneEventFromSM(smList_[i], time2wait);
00796           if(gotOne) {
00797             gotOneEvent = true;
00798           } else {
00799             if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait;
00800           }
00801         }
00802       }
00803       // check if we need to sleep (to enforce the allowed request rate)
00804       // we don't want to ping the StorageManager app too often
00805       if(!gotOneEvent) {
00806         if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime));
00807       }
00808     }
00809   }
00810 
00811   double DataProcessManager::getTime2Wait(std::string smURL)
00812   {
00813     // calculate time since last ping of this SM in seconds
00814     struct timeval now;
00815     struct timezone dummyTZ;
00816     gettimeofday(&now, &dummyTZ);
00817     double timeDiff = (double) now.tv_sec;
00818     timeDiff -= (double) lastReqMap_[smURL].tv_sec;
00819     timeDiff += ((double) now.tv_usec / 1000000.0);
00820     timeDiff -= ((double) lastReqMap_[smURL].tv_usec / 1000000.0);
00821     if (timeDiff < minEventRequestInterval_)
00822     {
00823       return (minEventRequestInterval_ - timeDiff);
00824     }
00825     else
00826     {
00827       return 0.0;
00828     }
00829   }
00830 
00831   void DataProcessManager::setTime2Now(std::string smURL)
00832   {
00833     struct timeval now;
00834     struct timezone dummyTZ;
00835     gettimeofday(&now, &dummyTZ);
00836     lastReqMap_[smURL] = now;
00837   }
00838 
00839   bool DataProcessManager::getOneEventFromSM(std::string smURL, double& time2wait)
00840   {
00841     // See if we will exceed the request rate, if so just return false
00842     // Return values: 
00843     //    true = we have an event; false = no event (whatever reason)
00844     // time2wait values:
00845     //    0.0 = we pinged this SM this time; >0 = did not ping, wait this time
00846     // if every SM returns false we sleep some time
00847     time2wait = getTime2Wait(smURL);
00848     if(time2wait > 0.0) {
00849       return false;
00850     } else {
00851       setTime2Now(smURL);
00852     }
00853 
00854     // One single try to get a event from this SM URL
00855     stor::ReadData data;
00856 
00857     // start a measurement of how long the HTTP POST takes
00858     eventFetchTimer_.stop();
00859     eventFetchTimer_.reset();
00860     eventFetchTimer_.start();
00861 
00862     data.d_.clear();
00863     CURL* han = curl_easy_init();
00864     if(han==0)
00865     {
00866       edm::LogError("getOneEventFromSM") << "Could not create curl handle";
00867       // this is a fatal error isn't it? Are we catching this? TODO
00868       throw cms::Exception("getOneEventFromSM","DataProcessManager")
00869           << "Unable to create curl handle\n";
00870     }
00871     // set the standard https request options
00872     std::string url2use = smURL + eventpage_;
00873     setopt(han,CURLOPT_URL,url2use.c_str());
00874     setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00875     setopt(han,CURLOPT_WRITEDATA,&data);
00876 
00877     // send our consumer ID as part of the event request
00878     // The event request body consists of the consumerId and the
00879     // number of INIT messages in our collection.  The latter is used
00880     // to determine if we need to re-fetch the INIT message collection.
00881     char msgBuff[100];
00882     OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00883                                        2 * sizeof(char_uint32));
00884     uint8 *bodyPtr = requestMessage.msgBody();
00885     convert(smRegMap_[smURL], bodyPtr);
00886     bodyPtr += sizeof(char_uint32);
00887     convert((uint32) initMsgCollection_->size(), bodyPtr);
00888     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00889     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00890     struct curl_slist *headers=NULL;
00891     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00892     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00893     stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00894 
00895     // send the HTTP POST, read the reply, and cleanup before going on
00896     CURLcode messageStatus = curl_easy_perform(han);
00897     curl_slist_free_all(headers);
00898     curl_easy_cleanup(han);
00899 
00900     if(messageStatus!=0)
00901     { 
00902       cerr << "curl perform failed for event" << endl;
00903       edm::LogError("getOneEventFromSM") << "curl perform failed for event. "
00904         << "Could not get event from an already registered Storage Manager"
00905         << " at " << smURL;
00906 
00907       // keep statistics for all HTTP POSTS
00908       eventFetchTimer_.stop();
00909       ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00910       stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00911 
00912       return false;
00913     }
00914 
00915     // rely on https transfer string of correct length!
00916     int len = data.d_.length();
00917     FDEBUG(9) << "getOneEventFromSM received len = " << len << std::endl;
00918     if(data.d_.length() == 0)
00919     { 
00920       // keep statistics for all HTTP POSTS
00921       eventFetchTimer_.stop();
00922       ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00923       stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00924 
00925       return false;
00926     }
00927 
00928     buf_.resize(len);
00929     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00930 
00931     // keep statistics for all HTTP POSTS
00932     eventFetchTimer_.stop();
00933     ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00934     stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00935 
00936     // first check if done message
00937     OtherMessageView msgView(&buf_[0]);
00938 
00939     if (msgView.code() == Header::DONE) {
00940       // TODO fixme:just print message for now
00941       std::cout << " SM " << smURL << " has halted" << std::endl;
00942       return false;
00943     } else if (msgView.code() == Header::NEW_INIT_AVAILABLE) {
00944       std::cout << "Received NEW_INIT_AVAILABLE message" << std::endl;
00945       headerRefetchRequested_ = true;
00946       return false;
00947     } else {
00948       // 05-Feb-2008, KAB:  catch (and rethrow) any exceptions decoding
00949       // the event data so that we can display the returned HTML and
00950       // (hopefully) give the user a hint as to the cause of the problem.
00951       try {
00952         HeaderView hdrView(&buf_[0]);
00953         if (hdrView.code() != Header::EVENT) {
00954           throw cms::Exception("getOneEventFromSM", "DataProcessManager");
00955         }
00956         EventMsgView eventView(&buf_[0]);
00957         ++receivedEvents_;
00958         addMeasurement((unsigned long)data.d_.length());
00959         if(eventServer_.get() != NULL) {
00960           eventServer_->processEvent(eventView);
00961           return true;
00962         }
00963       }
00964       catch (cms::Exception excpt) {
00965         const unsigned int MAX_DUMP_LENGTH = 1000;
00966         edm::LogError("getOneEventFromSM") << "========================================";
00967         edm::LogError("getOneEventFromSM") << "Exception decoding the getEventData response!";
00968         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00969           edm::LogError("getOneEventFromSM") << "Here is the raw text that was returned:";
00970           edm::LogError("getOneEventFromSM") << data.d_;
00971         }
00972         else {
00973           edm::LogError("getOneEventFromSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00974             " characters of the raw text that was returned:";
00975           edm::LogError("getOneEventFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00976         }
00977         edm::LogError("getOneEventFromSM") << "========================================";
00978         throw excpt;
00979       }
00980     }
00981     return false;
00982   }
00983 
00984   void DataProcessManager::getDQMEventFromAllSM()
00985   {
00986     // Try the list of SM in order of registration to get one event
00987     // so long as we have the header from SM already
00988     if(smList_.size() > 0) {
00989       double time2wait = 0.0;
00990       double sleepTime = 300.0;
00991       bool gotOneEvent = false;
00992       bool gotOne = false;
00993       for(unsigned int i = 0; i < smList_.size(); ++i) {
00994         if(DQMsmRegMap_[smList_[i] ] > 0) {   // is registered
00995           gotOne = getOneDQMEventFromSM(smList_[i], time2wait);
00996           if(gotOne) {
00997             gotOneEvent = true;
00998           } else {
00999             if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait;
01000           }
01001         }
01002       }
01003       // check if we need to sleep (to enforce the allowed request rate)
01004       // we don't want to ping the StorageManager app too often
01005       // TODO fixme: Cannot sleep for DQM as this is a long time usually
01006       //             and we block the event request poll if we sleep!
01007       //             have to find out how to ensure the correct poll rate
01008       if(!gotOneEvent) {
01009         //if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime));
01010       }
01011     }
01012   }
01013 
01014   double DataProcessManager::getDQMTime2Wait(std::string smURL)
01015   {
01016     // calculate time since last ping of this SM in seconds
01017     struct timeval now;
01018     struct timezone dummyTZ;
01019     gettimeofday(&now, &dummyTZ);
01020     double timeDiff = (double) now.tv_sec;
01021     timeDiff -= (double) lastDQMReqMap_[smURL].tv_sec;
01022     timeDiff += ((double) now.tv_usec / 1000000.0);
01023     timeDiff -= ((double) lastDQMReqMap_[smURL].tv_usec / 1000000.0);
01024     if (timeDiff < minDQMEventRequestInterval_)
01025     {
01026       return (minDQMEventRequestInterval_ - timeDiff);
01027     }
01028     else
01029     {
01030       return 0.0;
01031     }
01032   }
01033 
01034   void DataProcessManager::setDQMTime2Now(std::string smURL)
01035   {
01036     struct timeval now;
01037     struct timezone dummyTZ;
01038     gettimeofday(&now, &dummyTZ);
01039     lastDQMReqMap_[smURL] = now;
01040   }
01041 
01042   bool DataProcessManager::getOneDQMEventFromSM(std::string smURL, double& time2wait)
01043   {
01044     // See if we will exceed the request rate, if so just return false
01045     // Return values: 
01046     //    true = we have an event; false = no event (whatever reason)
01047     // time2wait values:
01048     //    0.0 = we pinged this SM this time; >0 = did not ping, wait this time
01049     // if every SM returns false we sleep some time
01050     time2wait = getDQMTime2Wait(smURL);
01051     if(time2wait > 0.0) {
01052       return false;
01053     } else {
01054       setDQMTime2Now(smURL);
01055     }
01056 
01057     // One single try to get a event from this SM URL
01058     stor::ReadData data;
01059 
01060     // start a measurement of how long the HTTP POST takes
01061     dqmFetchTimer_.stop();
01062     dqmFetchTimer_.reset();
01063     dqmFetchTimer_.start();
01064 
01065     data.d_.clear();
01066     CURL* han = curl_easy_init();
01067     if(han==0)
01068     {
01069       edm::LogError("getOneDQMEventFromSM") << "Could not create curl handle";
01070       // this is a fatal error isn't it? Are we catching this? TODO
01071       throw cms::Exception("getOneDQMEventFromSM","DataProcessManager")
01072           << "Unable to create curl handle\n";
01073     }
01074     // set the standard https request options
01075     std::string url2use = smURL + DQMeventpage_;
01076     setopt(han,CURLOPT_URL,url2use.c_str());
01077     setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
01078     setopt(han,CURLOPT_WRITEDATA,&data);
01079 
01080     // send our consumer ID as part of the event request
01081     char msgBuff[100];
01082     OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST,
01083                                        sizeof(char_uint32));
01084     uint8 *bodyPtr = requestMessage.msgBody();
01085     convert(DQMsmRegMap_[smURL], bodyPtr);
01086     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
01087     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
01088     struct curl_slist *headers=NULL;
01089     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
01090     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
01091     stor::setopt(han, CURLOPT_HTTPHEADER, headers);
01092 
01093     // send the HTTP POST, read the reply, and cleanup before going on
01094     CURLcode messageStatus = curl_easy_perform(han);
01095     curl_slist_free_all(headers);
01096     curl_easy_cleanup(han);
01097 
01098     if(messageStatus!=0)
01099     { 
01100       cerr << "curl perform failed for DQM event" << endl;
01101       edm::LogError("getOneDQMEventFromSM") << "curl perform failed for DQM event. "
01102         << "Could not get DQMevent from an already registered Storage Manager"
01103         << " at " << smURL;
01104 
01105       // keep statistics for all HTTP POSTS
01106       dqmFetchTimer_.stop();
01107       ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01108       stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01109 
01110       return false;
01111     }
01112 
01113     // rely on https transfer string of correct length!
01114     int len = data.d_.length();
01115     FDEBUG(9) << "getOneDQMEventFromSM received len = " << len << std::endl;
01116     if(data.d_.length() == 0)
01117     { 
01118       // keep statistics for all HTTP POSTS
01119       dqmFetchTimer_.stop();
01120       ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01121       stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01122 
01123       return false;
01124     }
01125 
01126     buf_.resize(len);
01127     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
01128 
01129     // keep statistics for all HTTP POSTS
01130     dqmFetchTimer_.stop();
01131     ltDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01132     stDQMFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
01133 
01134     // first check if done message
01135     OtherMessageView msgView(&buf_[0]);
01136 
01137     if (msgView.code() == Header::DONE) {
01138       // TODO fixme:just print message for now
01139       std::cout << " SM " << smURL << " has halted" << std::endl;
01140       return false;
01141     } else {
01142       DQMEventMsgView dqmEventView(&buf_[0]);
01143       ++receivedDQMEvents_;
01144       addMeasurement((unsigned long)data.d_.length());
01145       if(dqmServiceManager_.get() != NULL) {
01146           dqmServiceManager_->manageDQMEventMsg(dqmEventView);
01147           return true;
01148       }
01149     }
01150     return false;
01151   }
01152 
01154   void DataProcessManager::addMeasurement(unsigned long size)
01155   {
01156     // for bandwidth performance measurements
01157     if(pmeter_->addSample(size))
01158     {
01159        stats_ = pmeter_->getStats();
01160     }
01161   }
01162 
01163   double DataProcessManager::getSampleCount(STATS_TIME_FRAME timeFrame,
01164                                             STATS_TIMING_TYPE timingType,
01165                                             double currentTime)
01166   {
01167     if (timeFrame == SHORT_TERM) {
01168       if (timingType == DQMEVENT_FETCH) {
01169         return stDQMFetchTimeCounter_->getSampleCount(currentTime);
01170       }
01171       else {
01172         return stEventFetchTimeCounter_->getSampleCount(currentTime);
01173       }
01174     }
01175     else {
01176       if (timingType == DQMEVENT_FETCH) {
01177         return ltDQMFetchTimeCounter_->getSampleCount();
01178       }
01179       else {
01180         return ltEventFetchTimeCounter_->getSampleCount();
01181       }
01182     }
01183   }
01184 
01185   double DataProcessManager::getAverageValue(STATS_TIME_FRAME timeFrame,
01186                                              STATS_TIMING_TYPE timingType,
01187                                              double currentTime)
01188   {
01189     if (timeFrame == SHORT_TERM) {
01190       if (timingType == DQMEVENT_FETCH) {
01191         return stDQMFetchTimeCounter_->getValueAverage(currentTime);
01192       }
01193       else {
01194         return stEventFetchTimeCounter_->getValueAverage(currentTime);
01195       }
01196     }
01197     else {
01198       if (timingType == DQMEVENT_FETCH) {
01199         return ltDQMFetchTimeCounter_->getValueAverage();
01200       }
01201       else {
01202         return ltEventFetchTimeCounter_->getValueAverage();
01203       }
01204     }
01205   }
01206 
01207   double DataProcessManager::getDuration(STATS_TIME_FRAME timeFrame,
01208                                          STATS_TIMING_TYPE timingType,
01209                                          double currentTime)
01210   {
01211     if (timeFrame == SHORT_TERM) {
01212       if (timingType == DQMEVENT_FETCH) {
01213         return stDQMFetchTimeCounter_->getDuration(currentTime);
01214       }
01215       else {
01216         return stEventFetchTimeCounter_->getDuration(currentTime);
01217       }
01218     }
01219     else {
01220       if (timingType == DQMEVENT_FETCH) {
01221         return ltDQMFetchTimeCounter_->getDuration(currentTime);
01222       }
01223       else {
01224         return ltEventFetchTimeCounter_->getDuration(currentTime);
01225       }
01226     }
01227   }
01228 }

Generated on Tue Jun 9 17:34:51 2009 for CMSSW by  doxygen 1.5.4