CMS 3D CMS Logo

stor::DataProcessManager Class Reference

#include <EventFilter/SMProxyServer/interface/DataProcessManager.h>

List of all members.

Public Types

typedef std::vector< char > Buf
typedef std::map< std::string,
bool > 
HeaderConsumer_map
typedef std::map< std::string,
struct timeval > 
LastReqTime_map
typedef std::map< std::string,
unsigned int > 
RegConsumer_map
enum  STATS_TIME_FRAME { SHORT_TERM = 0, LONG_TERM = 1 }
enum  STATS_TIMING_TYPE { EVENT_FETCH = 10, DQMEVENT_FETCH = 11 }

Public Member Functions

void addDQMSM2Register (std::string DQMsmURL)
void addMeasurement (unsigned long size)
void addSM2Register (std::string smURL)
 DataProcessManager ()
double getAverageValue (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_TIMING_TYPE timingType=EVENT_FETCH, double currentTime=BaseCounter::getCurrentTime())
edm::EventBuffer & getCommandQueue ()
boost::shared_ptr
< DQMEventServer > & 
getDQMEventServer ()
boost::shared_ptr
< stor::DQMServiceManager > & 
getDQMServiceManager ()
double getDuration (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_TIMING_TYPE timingType=EVENT_FETCH, double currentTime=BaseCounter::getCurrentTime())
boost::shared_ptr< EventServer > & getEventServer ()
boost::shared_ptr
< InitMsgCollection > & 
getInitMsgCollection ()
double getSampleCount (STATS_TIME_FRAME timeFrame=SHORT_TERM, STATS_TIMING_TYPE timingType=EVENT_FETCH, double currentTime=BaseCounter::getCurrentTime())
stor::SMPerfStats getStats ()
bool haveHeader ()
bool haveRegWithDQMServer ()
bool haveRegWithEventServer ()
void join ()
unsigned long period4samples ()
unsigned long receivedDQMevents ()
unsigned long receivedevents ()
unsigned long samples ()
void setArchiveDQM (bool archiveDQM)
void setArchiveIntervalDQM (int archiveInterval)
void setCollateDQM (bool collateDQM)
void setCompressionLevelDQM (int compressionLevelDQM)
void setConsumerName (std::string s)
void setDQMConsumerName (std::string s)
void setDQMEventServer (boost::shared_ptr< DQMEventServer > &es)
void setEventServer (boost::shared_ptr< EventServer > &es)
void setFilePrefixDQM (std::string filePrefixDQM)
void setInitMsgCollection (boost::shared_ptr< InitMsgCollection > &imColl)
void setMaxDQMEventRequestRate (double rate)
void setMaxEventRequestRate (double rate)
void setPeriod4Samples (unsigned long period4samples)
void setPurgeTimeDQM (int purgeTimeDQM)
void setReadyTimeDQM (int readyTimeDQM)
void setSamples (unsigned long num_samples)
void setUseCompressionDQM (bool useCompressionDQM)
void start ()
void stop ()
double totalvolumemb ()
void updateMinEventRequestInterval ()
 ~DataProcessManager ()

Private Member Functions

bool getAnyHeaderFromSM ()
void getDQMEventFromAllSM ()
double getDQMTime2Wait (std::string smURL)
void getEventFromAllSM ()
bool getHeaderFromAllSM ()
bool getHeaderFromSM (std::string smURL)
bool getOneDQMEventFromSM (std::string smURL, double &time2wait)
bool getOneEventFromSM (std::string smURL, double &time2wait)
double getTime2Wait (std::string smURL)
void init ()
void processCommands ()
bool registerWithAllDQMSM ()
bool registerWithAllSM ()
int registerWithDQMSM (std::string smURL)
int registerWithSM (std::string smURL)
void setDQMTime2Now (std::string smURL)
void setTime2Now (std::string smURL)
void waitBetweenRegTrys ()

Static Private Member Functions

static void run (DataProcessManager *)

Private Attributes

bool alreadyRegistered_
bool alreadyRegisteredDQM_
std::vector< unsigned char > buf_
edm::EventBuffer * cmd_q_
unsigned int consumerId_
std::string consumerName_
std::string consumerPriority_
std::string consumerPSetString_
std::string consumerTopFolderName_
unsigned int DQMconsumerId_
std::string DQMconsumerName_
std::string DQMconsumerPriority_
std::string DQMeventpage_
boost::shared_ptr< DQMEventServer > DQMeventServer_
edm::CPUTimer dqmFetchTimer_
std::string DQMregpage_
boost::shared_ptr
< stor::DQMServiceManager > 
dqmServiceManager_
std::vector< std::string > DQMsmList_
RegConsumer_map DQMsmRegMap_
edm::CPUTimer eventFetchTimer_
std::string eventpage_
boost::shared_ptr< EventServer > eventServer_
std::string headerpage_
bool headerRefetchRequested_
int headerRetryInterval_
boost::shared_ptr
< InitMsgCollection > 
initMsgCollection_
LastReqTime_map lastDQMReqMap_
LastReqTime_map lastReqMap_
boost::shared_ptr< ForeverCounter > ltDQMFetchTimeCounter_
boost::shared_ptr< ForeverCounter > ltEventFetchTimeCounter_
double maxEventRequestRate_
boost::shared_ptr< boost::thread > me_
double minDQMEventRequestInterval_
double minEventRequestInterval_
xdata::UnsignedInteger32 period4samples_
stor::SMPerformanceMeter * pmeter_
unsigned long receivedDQMEvents_
unsigned long receivedEvents_
std::string regpage_
xdata::UnsignedInteger32 samples_
HeaderConsumer_map smHeaderMap_
std::vector< std::string > smList_
RegConsumer_map smRegMap_
stor::SMPerfStats stats_
boost::shared_ptr
< RollingIntervalCounter > 
stDQMFetchTimeCounter_
boost::shared_ptr
< RollingIntervalCounter > 
stEventFetchTimeCounter_
char subscriptionurl_ [2048]


Detailed Description

Definition at line 28 of file DataProcessManager.h.


Member Typedef Documentation

typedef std::vector<char> stor::DataProcessManager::Buf

Definition at line 34 of file DataProcessManager.h.

typedef std::map<std::string, bool> stor::DataProcessManager::HeaderConsumer_map

Definition at line 36 of file DataProcessManager.h.

typedef std::map<std::string, struct timeval> stor::DataProcessManager::LastReqTime_map

Definition at line 37 of file DataProcessManager.h.

typedef std::map<std::string, unsigned int> stor::DataProcessManager::RegConsumer_map

Definition at line 35 of file DataProcessManager.h.


Member Enumeration Documentation

enum stor::DataProcessManager::STATS_TIME_FRAME

Enumerator:
SHORT_TERM 
LONG_TERM 

Definition at line 31 of file DataProcessManager.h.

00031 { SHORT_TERM = 0, LONG_TERM = 1 };

enum stor::DataProcessManager::STATS_TIMING_TYPE

Enumerator:
EVENT_FETCH 
DQMEVENT_FETCH 

Definition at line 32 of file DataProcessManager.h.

00032 { EVENT_FETCH = 10, DQMEVENT_FETCH = 11 };


Constructor & Destructor Documentation

stor::DataProcessManager::DataProcessManager (  ) 

Definition at line 31 of file DataProcessManager.cc.

References init(), and pmeter_.

00031                                         :
          // Member initializers: the command queue comes from
          // edm::getEventBuffer, the registration and header-refetch flags start
          // false, the receive buffer starts with 2000 elements, and both
          // received-event counters start at zero.
00032     cmd_q_(edm::getEventBuffer(voidptr_size,50)),
00033     alreadyRegistered_(false),
00034     alreadyRegisteredDQM_(false),
00035     headerRefetchRequested_(false),
00036     buf_(2000),
00037     headerRetryInterval_(5),
00038     dqmServiceManager_(new stor::DQMServiceManager()),
00039     receivedEvents_(0),
00040     receivedDQMEvents_(0),
00041     samples_(100),
00042     period4samples_(5)
00043   {
00044     // for performance measurements
          // pmeter_ is a raw owning pointer: allocated here, deleted in the
          // destructor.
00045     pmeter_ = new stor::SMPerformanceMeter();
          // Remaining setup is delegated to init().
00046     init();
00047   } 

stor::DataProcessManager::~DataProcessManager (  ) 

Definition at line 49 of file DataProcessManager.cc.

References pmeter_.

00050   {
          // Releases the performance meter allocated with new in the constructor.
00051     delete pmeter_;
00052   }


Member Function Documentation

void stor::DataProcessManager::addDQMSM2Register ( std::string  DQMsmURL  ) 

Definition at line 332 of file DataProcessManager.cc.

References DQMsmList_, DQMsmRegMap_, i, and lastDQMReqMap_.

00333   {
          // Adds DQMsmURL to the list of DQM storage managers unless it is
          // already present. A new entry gets a registration-map value of 0 and
          // a zeroed last-request timestamp.
00334     // Check if already in the list
00335     bool alreadyInList = false;
00336     if(DQMsmList_.size() > 0) {
00337        for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00338          if(DQMsmURL.compare(DQMsmList_[i]) == 0) {
00339             alreadyInList = true;
00340             break;
00341          }
00342        }
00343     }
          // Duplicate URL: leave list and maps untouched.
00344     if(alreadyInList) return;
00345     DQMsmList_.push_back(DQMsmURL);
00346     DQMsmRegMap_.insert(std::make_pair(DQMsmURL,0));
          // A zero timestamp makes the first time-to-wait computation see a
          // very old "last request".
00347     struct timeval lastRequestTime;
00348     lastRequestTime.tv_sec = 0;
00349     lastRequestTime.tv_usec = 0;
00350     lastDQMReqMap_.insert(std::make_pair(DQMsmURL,lastRequestTime));
00351   }

void stor::DataProcessManager::addMeasurement ( unsigned long  size  ) 

Definition at line 1154 of file DataProcessManager.cc.

References stor::SMPerformanceMeter::addSample(), stor::SMPerformanceMeter::getStats(), pmeter_, and stats_.

Referenced by getOneDQMEventFromSM(), and getOneEventFromSM().

01155   {
          // Feeds one sample of the given size to the performance meter and
          // refreshes the cached stats_ only when addSample() returns true.
01156     // for bandwidth performance measurements
01157     if(pmeter_->addSample(size))
01158     {
01159        stats_ = pmeter_->getStats();
01160     }
01161   }

void stor::DataProcessManager::addSM2Register ( std::string  smURL  ) 

Definition at line 308 of file DataProcessManager.cc.

References i, lastReqMap_, smHeaderMap_, smList_, smRegMap_, and updateMinEventRequestInterval().

00309   {
          // Adds smURL to the list of storage managers unless it is already
          // present. A new entry gets a registration-map value of 0, a header
          // flag of false and a zeroed last-request timestamp; afterwards the
          // minimum event request interval is recomputed.
00310     // This smURL is the URN of the StorageManager without the page extension
00311     // Check if already in the list
00312     bool alreadyInList = false;
00313     if(smList_.size() > 0) {
00314        for(unsigned int i = 0; i < smList_.size(); ++i) {
00315          if(smURL.compare(smList_[i]) == 0) {
00316             alreadyInList = true;
00317             break;
00318          }
00319        }
00320     }
          // Duplicate URL: leave list and maps untouched.
00321     if(alreadyInList) return;
00322     smList_.push_back(smURL);
00323     smRegMap_.insert(std::make_pair(smURL,0));
00324     smHeaderMap_.insert(std::make_pair(smURL,false));
00325     struct timeval lastRequestTime;
00326     lastRequestTime.tv_sec = 0;
00327     lastRequestTime.tv_usec = 0;
00328     lastReqMap_.insert(std::make_pair(smURL,lastRequestTime));
00329     updateMinEventRequestInterval();
00330   }

bool stor::DataProcessManager::getAnyHeaderFromSM (  )  [private]

Definition at line 556 of file DataProcessManager.cc.

References getHeaderFromSM(), i, smList_, and smRegMap_.

00557   {
          // Returns true as soon as one registered storage manager (registration
          // value > 0) delivers a header; returns false if the list is empty or
          // no registered storage manager delivers one.
00558     // Try the list of SM in order of registration to get one Header
00559     bool gotOneHeader = false;
00560     if(smList_.size() > 0) {
00561        for(unsigned int i = 0; i < smList_.size(); ++i) {
00562          if(smRegMap_[smList_[i] ] > 0) {
00563             bool success = getHeaderFromSM(smList_[i]);
00564             if(success) { // should clean this up!
00565               gotOneHeader = true;
00566               return gotOneHeader;
00567             }
00568          }
00569        }
00570     } else {
00571       // this is a problem (but maybe not with non-blocking processing loop)
00572       return false;
00573     }
00574     return gotOneHeader;
00575   }

double stor::DataProcessManager::getAverageValue ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_TIMING_TYPE  timingType = EVENT_FETCH,
double  currentTime = BaseCounter::getCurrentTime() 
)

Definition at line 1185 of file DataProcessManager.cc.

References DQMEVENT_FETCH, ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, SHORT_TERM, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.

01188   {
          // Returns getValueAverage() from the counter selected by timeFrame
          // (short-term "st" vs. long-term "lt") and timingType. Any timingType
          // other than DQMEVENT_FETCH selects the event-fetch counter.
          // currentTime is forwarded only to the short-term counters.
01189     if (timeFrame == SHORT_TERM) {
01190       if (timingType == DQMEVENT_FETCH) {
01191         return stDQMFetchTimeCounter_->getValueAverage(currentTime);
01192       }
01193       else {
01194         return stEventFetchTimeCounter_->getValueAverage(currentTime);
01195       }
01196     }
01197     else {
01198       if (timingType == DQMEVENT_FETCH) {
01199         return ltDQMFetchTimeCounter_->getValueAverage();
01200       }
01201       else {
01202         return ltEventFetchTimeCounter_->getValueAverage();
01203       }
01204     }
01205   }

edm::EventBuffer& stor::DataProcessManager::getCommandQueue (  )  [inline]

Definition at line 97 of file DataProcessManager.h.

References cmd_q_.

00097 { return *cmd_q_; } // dereferences cmd_q_ and hands the buffer out by reference

void stor::DataProcessManager::getDQMEventFromAllSM (  )  [private]

Definition at line 984 of file DataProcessManager.cc.

References DQMsmRegMap_, getOneDQMEventFromSM(), i, and smList_.

Referenced by processCommands().

00985   {
          // Makes one DQM-event request attempt against every storage manager
          // whose DQM registration value is > 0. Unlike getEventFromAllSM(),
          // this function does not call haveHeader() and never sleeps.
          // NOTE(review): the loop walks smList_ but indexes DQMsmRegMap_, while
          // addDQMSM2Register() fills DQMsmList_. Confirm whether DQMsmList_ was
          // intended; as written, std::map::operator[] also default-inserts an
          // entry for any smList_ URL missing from DQMsmRegMap_.
00986     // Try the list of SM in order of registration to get one event
00987     // so long as we have the header from SM already
00988     if(smList_.size() > 0) {
00989       double time2wait = 0.0;
00990       double sleepTime = 300.0;
00991       bool gotOneEvent = false;
00992       bool gotOne = false;
00993       for(unsigned int i = 0; i < smList_.size(); ++i) {
00994         if(DQMsmRegMap_[smList_[i] ] > 0) {   // is registered
00995           gotOne = getOneDQMEventFromSM(smList_[i], time2wait);
00996           if(gotOne) {
00997             gotOneEvent = true;
00998           } else {
                  // track the smallest non-negative wait reported by any SM
00999             if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait;
01000           }
01001         }
01002       }
01003       // check if we need to sleep (to enforce the allowed request rate)
01004       // we don't want to ping the StorageManager app too often
01005       // TODO fixme: Cannot sleep for DQM as this is a long time usually
01006       //             and we block the event request poll if we sleep!
01007       //             have to find out how to ensure the correct poll rate
            // The sleep is disabled, so sleepTime is computed but has no effect.
01008       if(!gotOneEvent) {
01009         //if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime));
01010       }
01011     }
01012   }

boost::shared_ptr<DQMEventServer>& stor::DataProcessManager::getDQMEventServer (  )  [inline]

Definition at line 94 of file DataProcessManager.h.

References DQMeventServer_.

00094 { return DQMeventServer_; } // returns a reference to the member shared_ptr itself, not a copy

boost::shared_ptr<stor::DQMServiceManager>& stor::DataProcessManager::getDQMServiceManager (  )  [inline]

Definition at line 95 of file DataProcessManager.h.

References dqmServiceManager_.

00095 { return dqmServiceManager_; } // returns a reference to the member shared_ptr itself, not a copy

double stor::DataProcessManager::getDQMTime2Wait ( std::string  smURL  )  [private]

Definition at line 1014 of file DataProcessManager.cc.

References lastDQMReqMap_, and minDQMEventRequestInterval_.

Referenced by getOneDQMEventFromSM().

01015   {
          // Returns how many seconds remain until minDQMEventRequestInterval_ has
          // elapsed since the last recorded DQM request to smURL, or 0.0 if that
          // interval has already passed.
01016     // calculate time since last ping of this SM in seconds
01017     struct timeval now;
01018     struct timezone dummyTZ;
01019     gettimeofday(&now, &dummyTZ);
          // seconds and microseconds are combined into one double-valued
          // difference
01020     double timeDiff = (double) now.tv_sec;
01021     timeDiff -= (double) lastDQMReqMap_[smURL].tv_sec;
01022     timeDiff += ((double) now.tv_usec / 1000000.0);
01023     timeDiff -= ((double) lastDQMReqMap_[smURL].tv_usec / 1000000.0);
01024     if (timeDiff < minDQMEventRequestInterval_)
01025     {
01026       return (minDQMEventRequestInterval_ - timeDiff);
01027     }
01028     else
01029     {
01030       return 0.0;
01031     }
01032   }

double stor::DataProcessManager::getDuration ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_TIMING_TYPE  timingType = EVENT_FETCH,
double  currentTime = BaseCounter::getCurrentTime() 
)

Definition at line 1207 of file DataProcessManager.cc.

References DQMEVENT_FETCH, ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, SHORT_TERM, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.

01210   {
          // Returns getDuration(currentTime) from the counter selected by
          // timeFrame (short-term "st" vs. long-term "lt") and timingType. Any
          // timingType other than DQMEVENT_FETCH selects the event-fetch counter.
01211     if (timeFrame == SHORT_TERM) {
01212       if (timingType == DQMEVENT_FETCH) {
01213         return stDQMFetchTimeCounter_->getDuration(currentTime);
01214       }
01215       else {
01216         return stEventFetchTimeCounter_->getDuration(currentTime);
01217       }
01218     }
01219     else {
01220       if (timingType == DQMEVENT_FETCH) {
01221         return ltDQMFetchTimeCounter_->getDuration(currentTime);
01222       }
01223       else {
01224         return ltEventFetchTimeCounter_->getDuration(currentTime);
01225       }
01226     }
01227   }

void stor::DataProcessManager::getEventFromAllSM (  )  [private]

Definition at line 784 of file DataProcessManager.cc.

References getOneEventFromSM(), haveHeader(), i, smList_, and smRegMap_.

Referenced by processCommands().

00785   {
          // Makes one event request attempt against every registered storage
          // manager, provided haveHeader() is true. If none returned an event,
          // sleeps for the smallest non-negative wait reported by the attempts.
00786     // Try the list of SM in order of registration to get one event
00787     // so long as we have the header from SM already
00788     if(smList_.size() > 0 && haveHeader()) {
00789       double time2wait = 0.0;
            // NOTE(review): if no entry in smList_ is registered, sleepTime keeps
            // this initial value and the usleep below blocks for 300 s — confirm
            // that this is intended.
00790       double sleepTime = 300.0;
00791       bool gotOneEvent = false;
00792       bool gotOne = false;
00793       for(unsigned int i = 0; i < smList_.size(); ++i) {
00794         if(smRegMap_[smList_[i] ] > 0) {   // is registered
00795           gotOne = getOneEventFromSM(smList_[i], time2wait);
00796           if(gotOne) {
00797             gotOneEvent = true;
00798           } else {
                  // track the smallest non-negative wait reported by any SM
00799             if(time2wait < sleepTime && time2wait >= 0.0) sleepTime = time2wait;
00800           }
00801         }
00802       }
00803       // check if we need to sleep (to enforce the allowed request rate)
00804       // we don't want to ping the StorageManager app too often
00805       if(!gotOneEvent) {
              // sleepTime is in seconds; usleep takes microseconds
00806         if(sleepTime > 0.0) usleep(static_cast<int>(1000000 * sleepTime));
00807       }
00808     }
00809   }

boost::shared_ptr<EventServer>& stor::DataProcessManager::getEventServer (  )  [inline]

Definition at line 51 of file DataProcessManager.h.

References eventServer_.

00051 { return eventServer_; } // returns a reference to the member shared_ptr itself, not a copy

bool stor::DataProcessManager::getHeaderFromAllSM (  )  [private]

Definition at line 577 of file DataProcessManager.cc.

References getHeaderFromSM(), i, smHeaderMap_, smList_, and smRegMap_.

Referenced by processCommands().

00578   {
          // Requests a header from every registered storage manager that has
          // not delivered one yet and records each success in smHeaderMap_.
          // Returns true only if the list is non-empty, every entry is
          // registered, and every entry has delivered its header.
00579     // Try the list of SM in order of registration to get one Header from each
00580     // TODO: how do we get multiple headers if there are more than one?
00581     bool gotAllHeaders = true;
00582     if(smList_.size() > 0) {
00583        for(unsigned int i = 0; i < smList_.size(); ++i) {
00584          if(smRegMap_[smList_[i] ] > 0) { // is registered
00585             if(smHeaderMap_[smList_[i] ]) continue; // already got header
00586             bool success = getHeaderFromSM(smList_[i]);
00587             if(success) {
00588               smHeaderMap_[smList_[i] ] = true;
00589             } else {
00590               gotAllHeaders = false;
00591             }
00592          } else {
                 // an unregistered storage manager counts as a missing header
00593            gotAllHeaders = false;
00594          }
00595        }
00596     } else {
00597       return false;
00598     }
00599     return gotAllHeaders;
00600   }

bool stor::DataProcessManager::getHeaderFromSM ( std::string  smURL  )  [private]

Definition at line 602 of file DataProcessManager.cc.

References OtherMessageView::bodySize(), TestMuL1L2Filter_cff::cerr, HeaderView::code(), convert(), stor::ReadData::d_, data, lat::endl(), eventServer_, Exception, FDEBUG, stor::func(), Header::HEADER_REQUEST, headerpage_, Header::INIT, Header::INIT_SET, initMsgCollection_, len, OtherMessageBuilder::msgBody(), OtherMessageView::msgBody(), NULL, stor::setopt(), OtherMessageBuilder::size(), InitMsgView::size(), smRegMap_, and OtherMessageBuilder::startAddress().

Referenced by getAnyHeaderFromSM(), and getHeaderFromAllSM().

00603   {
          // Makes a single HTTP POST to smURL + headerpage_ asking for the INIT
          // message(s), stores every new INIT message in initMsgCollection_, and
          // returns true if the reply could be decoded.
          // Returns false if the curl transfer fails or the reply is empty.
          // Throws cms::Exception if no curl handle can be created or the reply
          // is neither an INIT nor an INIT_SET message; decoding exceptions are
          // logged and then rethrown unchanged.
00604     // One single try to get a header from this SM URL
00605     stor::ReadData data;
00606 
00607     data.d_.clear();
00608     CURL* han = curl_easy_init();
00609     if(han==0)
00610     {
00611       edm::LogError("getHeaderFromSM") << "Could not create curl handle";
00612       // this is a fatal error isn't it? Are we catching this? TODO
00613       throw cms::Exception("getHeaderFromSM","DataProcessManager")
00614           << "Unable to create curl handle\n";
00615     }
00616     // set the standard https request options
00617     std::string url2use = smURL + headerpage_;
00618     setopt(han,CURLOPT_URL,url2use.c_str());
00619     setopt(han,CURLOPT_WRITEFUNCTION,func);
00620     setopt(han,CURLOPT_WRITEDATA,&data);
00621 
00622     // send our consumer ID as part of the header request
00623     char msgBuff[100];
00624     OtherMessageBuilder requestMessage(&msgBuff[0], Header::HEADER_REQUEST,
00625                                        sizeof(char_uint32));
00626     uint8 *bodyPtr = requestMessage.msgBody();
00627     convert(smRegMap_[smURL], bodyPtr);
00628     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00629     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00630     struct curl_slist *headers=NULL;
00631     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00632     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00633     setopt(han, CURLOPT_HTTPHEADER, headers);
00634 
00635     // send the HTTP POST, read the reply, and cleanup before going on
00636     CURLcode messageStatus = curl_easy_perform(han);
00637     curl_slist_free_all(headers);
00638     curl_easy_cleanup(han);
00639 
00640     if(messageStatus!=0)
00641     { 
00642       cerr << "curl perform failed for header" << endl;
00643       edm::LogError("getHeaderFromSM") << "curl perform failed for header. "
00644         << "Could not get header from an already registered Storage Manager"
00645         << " at " << smURL;
00646       return false;
00647     }
00648     if(data.d_.length() == 0)
00649     { 
00650       return false;
00651     }
00652 
00653     // rely on https transfer string of correct length!
00654     int len = data.d_.length();
00655     FDEBUG(9) << "getHeaderFromSM received registry len = " << len << std::endl;
00656 
00657     // check that we've received a valid INIT message
00658     // or a set of INIT messages.  Save everything that we receive.
00659     bool addedNewInitMsg = false;
00660     try
00661     {
00662       HeaderView hdrView(&data.d_[0]);
00663       if (hdrView.code() == Header::INIT)
00664       {
00665         InitMsgView initView(&data.d_[0]);
00666         if (initMsgCollection_->addIfUnique(initView))
00667         {
00668           addedNewInitMsg = true;
00669         }
00670       }
00671       else if (hdrView.code() == Header::INIT_SET)
00672       {
              // an INIT_SET body is a back-to-back sequence of INIT messages;
              // advance by each message's own size until the body is consumed
00673         OtherMessageView otherView(&data.d_[0]);
00674         bodyPtr = otherView.msgBody();
00675         uint32 fullSize = otherView.bodySize();
00676         while ((unsigned int) (bodyPtr-otherView.msgBody()) < fullSize)
00677         {
00678           InitMsgView initView(bodyPtr);
00679           if (initMsgCollection_->addIfUnique(initView))
00680           {
00681             addedNewInitMsg = true;
00682           }
00683           bodyPtr += initView.size();
00684         }
00685       }
00686       else
00687       {
00688         throw cms::Exception("getHeaderFromSM", "DataProcessManager");
00689       }
00690     }
          // Catch by reference and rethrow with a bare throw so that the original
          // exception object (including any derived type) propagates unsliced.
00691     catch (cms::Exception&)
00692     {
00693       const unsigned int MAX_DUMP_LENGTH = 1000;
00694       edm::LogError("getHeaderFromSM") << "========================================";
00695       edm::LogError("getHeaderFromSM") << "Exception decoding the getRegistryData response!";
00696       if (data.d_.length() <= MAX_DUMP_LENGTH) {
00697         edm::LogError("getHeaderFromSM") << "Here is the raw text that was returned:";
00698         edm::LogError("getHeaderFromSM") << data.d_;
00699       }
00700       else {
00701         edm::LogError("getHeaderFromSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00702           " characters of the raw text that was returned:";
00703         edm::LogError("getHeaderFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00704       }
00705       edm::LogError("getHeaderFromSM") << "========================================";
00706       throw;
00707     }
00708 
00709     // check if any currently connected consumers did not specify
00710     // an HLT output module label and we now have multiple, different,
00711     // INIT messages.  If so, we need to complain because the
00712     // SelectHLTOutput parameter needs to be specified when there
00713     // is more than one HLT output module (and correspondingly, more
00714     // than one INIT message)
00715     if (addedNewInitMsg && eventServer_.get() != NULL &&
00716         initMsgCollection_->size() > 1)
00717     {
00718       std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 
00719         eventServer_->getConsumerTable();
00720       std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 
00721         consumerIter;
00722       for (consumerIter = consumerTable.begin();
00723            consumerIter != consumerTable.end();
00724            consumerIter++)
00725       {
00726         boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00727 
00728         if (consPtr->getHLTOutputSelection().empty())
00729         {
00730           // store a warning message in the consumer pipe to be
00731           // sent to the consumer at the next opportunity
00732           std::string errorString;
00733           errorString.append("ERROR: The configuration for this ");
00734           errorString.append("consumer does not specify an HLT output ");
00735           errorString.append("module.\nPlease specify one of the HLT ");
00736           errorString.append("output modules listed below as the ");
00737           errorString.append("SelectHLTOutput parameter ");
00738           errorString.append("in the InputSource configuration.\n");
00739           errorString.append(initMsgCollection_->getSelectionHelpString());
00740           errorString.append("\n");
00741           consPtr->setRegistryWarning(errorString);
00742         }
00743       }
00744     }
00745 
00746     return true;
00747   }

boost::shared_ptr<InitMsgCollection>& stor::DataProcessManager::getInitMsgCollection (  )  [inline]

Definition at line 57 of file DataProcessManager.h.

References initMsgCollection_.

00057 { return initMsgCollection_; } // returns a reference to the member shared_ptr itself, not a copy

bool stor::DataProcessManager::getOneDQMEventFromSM ( std::string  smURL,
double &  time2wait 
) [private]

Definition at line 1042 of file DataProcessManager.cc.

References addMeasurement(), buf_, TestMuL1L2Filter_cff::cerr, OtherMessageView::code(), convert(), GenMuonPlsPt100GeV_cfg::cout, stor::ReadData::d_, data, Header::DONE, Header::DQMEVENT_REQUEST, DQMeventpage_, dqmFetchTimer_, dqmServiceManager_, DQMsmRegMap_, lat::endl(), eventFetchTimer_, Exception, FDEBUG, stor::func(), getDQMTime2Wait(), i, len, ltDQMFetchTimeCounter_, OtherMessageBuilder::msgBody(), NULL, edm::CPUTimer::realTime(), receivedDQMEvents_, edm::CPUTimer::reset(), setDQMTime2Now(), stor::setopt(), OtherMessageBuilder::size(), edm::CPUTimer::start(), OtherMessageBuilder::startAddress(), stDQMFetchTimeCounter_, and edm::CPUTimer::stop().

Referenced by getDQMEventFromAllSM().

01043   {
          // Makes a single HTTP POST to smURL + DQMeventpage_ asking for one DQM
          // event, unless the minimum request interval for this storage manager
          // has not elapsed yet. A received DQM event is counted, measured, and
          // handed to dqmServiceManager_.
          // Every performed POST, successful or not, contributes one sample to
          // the long-term and short-term DQM fetch-time counters, taken from
          // dqmFetchTimer_ (the timer this function starts and stops).
          // Throws cms::Exception if no curl handle can be created.
01044     // See if we will exceed the request rate, if so just return false
01045     // Return values: 
01046     //    true = we have an event; false = no event (whatever reason)
01047     // time2wait values:
01048     //    0.0 = we pinged this SM this time; >0 = did not ping, wait this time
01049     // if every SM returns false we sleep some time
01050     time2wait = getDQMTime2Wait(smURL);
01051     if(time2wait > 0.0) {
01052       return false;
01053     } else {
01054       setDQMTime2Now(smURL);
01055     }
01056 
01057     // One single try to get a event from this SM URL
01058     stor::ReadData data;
01059 
01060     // start a measurement of how long the HTTP POST takes
01061     dqmFetchTimer_.stop();
01062     dqmFetchTimer_.reset();
01063     dqmFetchTimer_.start();
01064 
01065     data.d_.clear();
01066     CURL* han = curl_easy_init();
01067     if(han==0)
01068     {
01069       edm::LogError("getOneDQMEventFromSM") << "Could not create curl handle";
01070       // this is a fatal error isn't it? Are we catching this? TODO
01071       throw cms::Exception("getOneDQMEventFromSM","DataProcessManager")
01072           << "Unable to create curl handle\n";
01073     }
01074     // set the standard https request options
01075     std::string url2use = smURL + DQMeventpage_;
01076     setopt(han,CURLOPT_URL,url2use.c_str());
01077     setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
01078     setopt(han,CURLOPT_WRITEDATA,&data);
01079 
01080     // send our consumer ID as part of the event request
01081     char msgBuff[100];
01082     OtherMessageBuilder requestMessage(&msgBuff[0], Header::DQMEVENT_REQUEST,
01083                                        sizeof(char_uint32));
01084     uint8 *bodyPtr = requestMessage.msgBody();
01085     convert(DQMsmRegMap_[smURL], bodyPtr);
01086     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
01087     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
01088     struct curl_slist *headers=NULL;
01089     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
01090     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
01091     stor::setopt(han, CURLOPT_HTTPHEADER, headers);
01092 
01093     // send the HTTP POST, read the reply, and cleanup before going on
01094     CURLcode messageStatus = curl_easy_perform(han);
01095     curl_slist_free_all(headers);
01096     curl_easy_cleanup(han);
01097 
01098     if(messageStatus!=0)
01099     { 
01100       cerr << "curl perform failed for DQM event" << endl;
01101       edm::LogError("getOneDQMEventFromSM") << "curl perform failed for DQM event. "
01102         << "Could not get DQMevent from an already registered Storage Manager"
01103         << " at " << smURL;
01104 
01105       // keep statistics for all HTTP POSTS
01106       dqmFetchTimer_.stop();
01107       ltDQMFetchTimeCounter_->addSample(dqmFetchTimer_.realTime());
01108       stDQMFetchTimeCounter_->addSample(dqmFetchTimer_.realTime());
01109 
01110       return false;
01111     }
01112 
01113     // rely on https transfer string of correct length!
01114     int len = data.d_.length();
01115     FDEBUG(9) << "getOneDQMEventFromSM received len = " << len << std::endl;
01116     if(data.d_.length() == 0)
01117     { 
01118       // keep statistics for all HTTP POSTS
01119       dqmFetchTimer_.stop();
01120       ltDQMFetchTimeCounter_->addSample(dqmFetchTimer_.realTime());
01121       stDQMFetchTimeCounter_->addSample(dqmFetchTimer_.realTime());
01122 
01123       return false;
01124     }
01125 
          // copy the reply into the member buffer that the message views read
01126     buf_.resize(len);
01127     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
01128 
01129     // keep statistics for all HTTP POSTS
01130     dqmFetchTimer_.stop();
01131     ltDQMFetchTimeCounter_->addSample(dqmFetchTimer_.realTime());
01132     stDQMFetchTimeCounter_->addSample(dqmFetchTimer_.realTime());
01133 
01134     // first check if done message
01135     OtherMessageView msgView(&buf_[0]);
01136 
01137     if (msgView.code() == Header::DONE) {
01138       // TODO fixme:just print message for now
01139       std::cout << " SM " << smURL << " has halted" << std::endl;
01140       return false;
01141     } else {
01142       DQMEventMsgView dqmEventView(&buf_[0]);
01143       ++receivedDQMEvents_;
01144       addMeasurement((unsigned long)data.d_.length());
01145       if(dqmServiceManager_.get() != NULL) {
01146           dqmServiceManager_->manageDQMEventMsg(dqmEventView);
01147           return true;
01148       }
01149     }
          // reached only when an event arrived but no DQM service manager is set
01150     return false;
01151   }

bool stor::DataProcessManager::getOneEventFromSM ( std::string  smURL,
double &  time2wait 
) [private]

Definition at line 839 of file DataProcessManager.cc.

References addMeasurement(), buf_, std::cerr, HeaderView::code(), OtherMessageView::code(), convert(), std::cout, stor::ReadData::d_, data, Header::DONE, std::endl(), Header::EVENT, Header::EVENT_REQUEST, eventFetchTimer_, eventpage_, eventServer_, Exception, FDEBUG, stor::func(), getTime2Wait(), headerRefetchRequested_, i, initMsgCollection_, len, ltEventFetchTimeCounter_, OtherMessageBuilder::msgBody(), Header::NEW_INIT_AVAILABLE, NULL, edm::CPUTimer::realTime(), receivedEvents_, edm::CPUTimer::reset(), stor::setopt(), setTime2Now(), OtherMessageBuilder::size(), smRegMap_, edm::CPUTimer::start(), OtherMessageBuilder::startAddress(), stEventFetchTimeCounter_, and edm::CPUTimer::stop().

Referenced by getEventFromAllSM().

00840   {
          // Makes a single attempt to fetch one event from the Storage Manager
          // at smURL by sending an EVENT_REQUEST message as an HTTP POST to
          // smURL + eventpage_.
          //
          //   smURL     - base URL of the Storage Manager to query
          //   time2wait - output: the value returned by getTime2Wait(smURL);
          //               when it is > 0 no request is sent
          //
          // Returns true only when an EVENT message was received and handed to
          // eventServer_; every other outcome returns false.
          // Throws cms::Exception if no curl handle can be created, or (after
          // logging the raw response) if decoding the response as an EVENT
          // message fails.
          //
00841     // See if we will exceed the request rate, if so just return false
00842     // Return values: 
00843     //    true = we have an event; false = no event (whatever reason)
00844     // time2wait values:
00845     //    0.0 = we pinged this SM this time; >0 = did not ping, wait this time
00846     // if every SM returns false we sleep some time
00847     time2wait = getTime2Wait(smURL);
00848     if(time2wait > 0.0) {
00849       return false;
00850     } else {
00851       setTime2Now(smURL);
00852     }
00853 
00854     // One single try to get an event from this SM URL
00855     stor::ReadData data;
00856 
00857     // start a measurement of how long the HTTP POST takes
00858     eventFetchTimer_.stop();
00859     eventFetchTimer_.reset();
00860     eventFetchTimer_.start();
00861 
00862     data.d_.clear();
00863     CURL* han = curl_easy_init();
00864     if(han==0)
00865     {
00866       edm::LogError("getOneEventFromSM") << "Could not create curl handle";
00867       // this is a fatal error isn't it? Are we catching this? TODO
00868       throw cms::Exception("getOneEventFromSM","DataProcessManager")
00869           << "Unable to create curl handle\n";
00870     }
00871     // set the standard https request options
00872     std::string url2use = smURL + eventpage_;
00873     setopt(han,CURLOPT_URL,url2use.c_str());
00874     setopt(han,CURLOPT_WRITEFUNCTION,stor::func);
00875     setopt(han,CURLOPT_WRITEDATA,&data);
00876 
00877     // send our consumer ID as part of the event request
00878     // The event request body consists of the consumerId and the
00879     // number of INIT messages in our collection.  The latter is used
00880     // to determine if we need to re-fetch the INIT message collection.
00881     char msgBuff[100];
00882     OtherMessageBuilder requestMessage(&msgBuff[0], Header::EVENT_REQUEST,
00883                                        2 * sizeof(char_uint32));
00884     uint8 *bodyPtr = requestMessage.msgBody();
00885     convert(smRegMap_[smURL], bodyPtr);
00886     bodyPtr += sizeof(char_uint32);
00887     convert((uint32) initMsgCollection_->size(), bodyPtr);
00888     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00889     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00890     struct curl_slist *headers=NULL;
00891     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00892     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00893     stor::setopt(han, CURLOPT_HTTPHEADER, headers);
00894 
00895     // send the HTTP POST, read the reply, and cleanup before going on
00896     CURLcode messageStatus = curl_easy_perform(han);
00897     curl_slist_free_all(headers);
00898     curl_easy_cleanup(han);
00899 
00900     if(messageStatus!=0)
00901     { 
00902       cerr << "curl perform failed for event" << endl;
00903       edm::LogError("getOneEventFromSM") << "curl perform failed for event. "
00904         << "Could not get event from an already registered Storage Manager"
00905         << " at " << smURL;
00906 
00907       // keep statistics for all HTTP POSTS
00908       eventFetchTimer_.stop();
00909       ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00910       stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00911 
00912       return false;
00913     }
00914 
00915     // rely on https transfer string of correct length!
00916     int len = data.d_.length();
00917     FDEBUG(9) << "getOneEventFromSM received len = " << len << std::endl;
00918     if(data.d_.length() == 0)
00919     { 
00920       // keep statistics for all HTTP POSTS
00921       eventFetchTimer_.stop();
00922       ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00923       stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00924 
00925       return false;
00926     }
00927 
          // copy the reply into the member buffer that the message views read from
00928     buf_.resize(len);
00929     for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00930 
00931     // keep statistics for all HTTP POSTS
00932     eventFetchTimer_.stop();
00933     ltEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00934     stEventFetchTimeCounter_->addSample(eventFetchTimer_.realTime());
00935 
00936     // first check if done message
00937     OtherMessageView msgView(&buf_[0]);
00938 
00939     if (msgView.code() == Header::DONE) {
00940       // TODO fixme:just print message for now
00941       std::cout << " SM " << smURL << " has halted" << std::endl;
00942       return false;
00943     } else if (msgView.code() == Header::NEW_INIT_AVAILABLE) {
00944       std::cout << "Received NEW_INIT_AVAILABLE message" << std::endl;
00945       headerRefetchRequested_ = true;
00946       return false;
00947     } else {
00948       // 05-Feb-2008, KAB:  catch (and rethrow) any exceptions decoding
00949       // the event data so that we can display the returned HTML and
00950       // (hopefully) give the user a hint as to the cause of the problem.
00951       try {
00952         HeaderView hdrView(&buf_[0]);
00953         if (hdrView.code() != Header::EVENT) {
00954           throw cms::Exception("getOneEventFromSM", "DataProcessManager");
00955         }
00956         EventMsgView eventView(&buf_[0]);
00957         ++receivedEvents_;
00958         addMeasurement((unsigned long)data.d_.length());
                // the event is counted even when no event server is set; in that
                // case control falls through to the final "return false"
00959         if(eventServer_.get() != NULL) {
00960           eventServer_->processEvent(eventView);
00961           return true;
00962         }
00963       }
              // catch by reference and rethrow with a bare "throw;" so the
              // original exception object propagates unchanged (no copy, no slicing)
00964       catch (cms::Exception&) {
00965         const unsigned int MAX_DUMP_LENGTH = 1000;
00966         edm::LogError("getOneEventFromSM") << "========================================";
00967         edm::LogError("getOneEventFromSM") << "Exception decoding the getEventData response!";
00968         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00969           edm::LogError("getOneEventFromSM") << "Here is the raw text that was returned:";
00970           edm::LogError("getOneEventFromSM") << data.d_;
00971         }
00972         else {
00973           edm::LogError("getOneEventFromSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00974             " characters of the raw text that was returned:";
00975           edm::LogError("getOneEventFromSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00976         }
00977         edm::LogError("getOneEventFromSM") << "========================================";
00978         throw;
00979       }
00980     }
00981     return false;
00982   }

double stor::DataProcessManager::getSampleCount ( STATS_TIME_FRAME  timeFrame = SHORT_TERM,
STATS_TIMING_TYPE  timingType = EVENT_FETCH,
double  currentTime = BaseCounter::getCurrentTime() 
)

Definition at line 1163 of file DataProcessManager.cc.

References DQMEVENT_FETCH, ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, SHORT_TERM, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.

01166   {
          // Returns the sample count of one of the four fetch-time counters.
          // timeFrame == SHORT_TERM selects the st* counters, anything else the
          // lt* counters; timingType == DQMEVENT_FETCH selects the DQM counters,
          // anything else the event counters.
01167     if (timeFrame == SHORT_TERM) {
01168       if (timingType == DQMEVENT_FETCH) {
01169         return stDQMFetchTimeCounter_->getSampleCount(currentTime);
01170       }
01171       else {
01172         return stEventFetchTimeCounter_->getSampleCount(currentTime);
01173       }
01174     }
01175     else {
            // the long-term counters are queried without currentTime
01176       if (timingType == DQMEVENT_FETCH) {
01177         return ltDQMFetchTimeCounter_->getSampleCount();
01178       }
01179       else {
01180         return ltEventFetchTimeCounter_->getSampleCount();
01181       }
01182     }
01183   }

stor::SMPerfStats stor::DataProcessManager::getStats (  )  [inline]

Definition at line 111 of file DataProcessManager.h.

References stats_.

00111 { return stats_; } // returns the stats_ member by value

double stor::DataProcessManager::getTime2Wait ( std::string  smURL  )  [private]

Definition at line 811 of file DataProcessManager.cc.

References lastReqMap_, and minEventRequestInterval_.

Referenced by getOneEventFromSM().

00812   {
          // Returns how many seconds remain before smURL may be requested again:
          // minEventRequestInterval_ minus the time elapsed since the timestamp
          // stored in lastReqMap_[smURL], or 0.0 once that interval has passed.
          // NOTE(review): assumes lastReqMap_ is a std::map, in which case an
          // unknown smURL yields a zeroed timeval and therefore 0.0 - confirm.
00813     // calculate time since last ping of this SM in seconds
00814     struct timeval now;
00815     struct timezone dummyTZ;
00816     gettimeofday(&now, &dummyTZ);
          // seconds and microseconds are folded separately into one double
00817     double timeDiff = (double) now.tv_sec;
00818     timeDiff -= (double) lastReqMap_[smURL].tv_sec;
00819     timeDiff += ((double) now.tv_usec / 1000000.0);
00820     timeDiff -= ((double) lastReqMap_[smURL].tv_usec / 1000000.0);
00821     if (timeDiff < minEventRequestInterval_)
00822     {
00823       return (minEventRequestInterval_ - timeDiff);
00824     }
00825     else
00826     {
00827       return 0.0;
00828     }
00829   }

bool stor::DataProcessManager::haveHeader (  ) 

Definition at line 778 of file DataProcessManager.cc.

References initMsgCollection_.

Referenced by getEventFromAllSM(), and processCommands().

00779   {
          // true when initMsgCollection_ holds at least one entry
00780     if(initMsgCollection_->size() > 0) return true;
00781     return false;
00782   }

bool stor::DataProcessManager::haveRegWithDQMServer (  ) 

Definition at line 767 of file DataProcessManager.cc.

References DQMsmList_, DQMsmRegMap_, and i.

00768   {
          // true when DQMsmRegMap_ holds a value > 0 for at least one URL in
          // DQMsmList_; false otherwise (including an empty list)
00769     // registered with any of the SM DQM servers
00770     if(DQMsmList_.size() > 0) {
00771       for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00772         if(DQMsmRegMap_[DQMsmList_[i] ] > 0) return true;
00773       }
00774     }
00775     return false;
00776   }

bool stor::DataProcessManager::haveRegWithEventServer (  ) 

Definition at line 756 of file DataProcessManager.cc.

References i, smList_, and smRegMap_.

00757   {
          // true when smRegMap_ holds a value > 0 for at least one URL in
          // smList_; false otherwise (including an empty list)
00758     // registered with any of the SM event servers
00759     if(smList_.size() > 0) {
00760       for(unsigned int i = 0; i < smList_.size(); ++i) {
00761         if(smRegMap_[smList_[i] ] > 0) return true;
00762       }
00763     }
00764     return false;
00765   }

void stor::DataProcessManager::init ( void   )  [private]

Definition at line 54 of file DataProcessManager.cc.

References alreadyRegistered_, alreadyRegisteredDQM_, consumerId_, consumerName_, consumerPriority_, consumerTopFolderName_, DQMconsumerId_, DQMconsumerName_, DQMconsumerPriority_, DQMeventpage_, DQMregpage_, DQMsmList_, DQMsmRegMap_, eventpage_, stor::SMPerfStats::fullReset(), headerpage_, headerRefetchRequested_, stor::SMPerformanceMeter::init(), ltDQMFetchTimeCounter_, ltEventFetchTimeCounter_, period4samples_, pmeter_, stor::PROXY_SERVER_NAME(), receivedDQMEvents_, receivedEvents_, regpage_, samples_, setMaxDQMEventRequestRate(), setMaxEventRequestRate(), smHeaderMap_, smList_, smRegMap_, stats_, stDQMFetchTimeCounter_, and stEventFetchTimeCounter_.

Referenced by DataProcessManager().

00055   {
          // Puts the manager into its default state: sets the page names that
          // are appended to Storage Manager URLs, the default consumer names and
          // priorities, default request rates, clears the SM lists and maps, and
          // resets counters and statistics.
00056     regpage_ =  "/registerConsumer";
00057     DQMregpage_ = "/registerDQMConsumer";
00058     eventpage_ = "/geteventdata";
00059     DQMeventpage_ = "/getDQMeventdata";
00060     headerpage_ = "/getregdata";
00061     consumerName_ = stor::PROXY_SERVER_NAME;
00062     //consumerPriority_ = "PushMode"; // this means push mode!
00063     consumerPriority_ = "Normal";
00064     DQMconsumerName_ = stor::PROXY_SERVER_NAME;
00065     //DQMconsumerPriority_ =  "PushMode"; // this means push mode!
00066     DQMconsumerPriority_ =  "Normal";
00067 
00068     this->setMaxEventRequestRate(10.0); // just a default until set in config action
          // placeholder id: the low 24 bits of the current time(0) value
00069     consumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00070 
00071     this->setMaxDQMEventRequestRate(0.2); // set in config later
00072     DQMconsumerId_ = (time(0) & 0xffffff);  // temporary - will get from ES later
00073 
00074     alreadyRegistered_ = false;
00075     alreadyRegisteredDQM_ = false;
00076     headerRefetchRequested_ = false;
00077 
00078     smList_.clear();
00079     smRegMap_.clear();
00080     smHeaderMap_.clear();
00081     DQMsmList_.clear();
00082     DQMsmRegMap_.clear();
00083 
00084     // TODO fixme: only request folders that connected consumers want?
00085     consumerTopFolderName_ = "*";
00086     //consumerTopFolderName_ = "C1";
00087     receivedEvents_ = 0;
00088     receivedDQMEvents_ = 0;
          // NOTE(review): samples_ and period4samples_ are read here but not
          // assigned in this function - confirm they are set before init() runs
00089     pmeter_->init(samples_, period4samples_);
00090     stats_.fullReset();
00091 
00092     // initialize the counters that we use for statistics
00093     ltEventFetchTimeCounter_.reset(new ForeverCounter());
00094     stEventFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00095     ltDQMFetchTimeCounter_.reset(new ForeverCounter());
00096     stDQMFetchTimeCounter_.reset(new RollingIntervalCounter(180,5,20));
00097   }

void stor::DataProcessManager::join (  ) 

Definition at line 189 of file DataProcessManager.cc.

References me_.

00190   {
          // does nothing when me_ is not set
00191     // invoked from a different thread - block until "me_" is done
00192     if(me_) me_->join();
00193   }

unsigned long stor::DataProcessManager::period4samples (  )  [inline]

Definition at line 116 of file DataProcessManager.h.

References stor::SMPerformanceMeter::getPeriod4Samples(), and pmeter_.

00116 { return pmeter_->getPeriod4Samples(); } // forwards to the pmeter_ performance meter

void stor::DataProcessManager::processCommands (  )  [private]

Definition at line 195 of file DataProcessManager.cc.

References alreadyRegistered_, alreadyRegisteredDQM_, edm::EventBuffer::OperateBuffer< T >::buffer(), cmd_q_, count, dqmServiceManager_, edm::EventBuffer::empty(), getDQMEventFromAllSM(), getEventFromAllSM(), getHeaderFromAllSM(), haveHeader(), headerRefetchRequested_, NULL, registerWithAllDQMSM(), registerWithAllSM(), edm::EventBuffer::OperateBuffer< T >::size(), smHeaderMap_, and waitBetweenRegTrys().

Referenced by run().

00196   {
          // Work loop of this manager, entered through run(). Each iteration:
          //   1. retries registration with the SM event servers and with the SM
          //      DQM event servers until each has succeeded once,
          //   2. retries getHeaderFromAllSM() until it succeeds (restarted when
          //      headerRefetchRequested_ is set),
          //   3. fetches events / DQM events once the prerequisites are met,
          //   4. polls the command queue and leaves the loop on MsgCode::DONE.
          // After the loop dqmServiceManager_ is stopped if it is set.
00197     // called with this data process manager's own thread.
00198     // first register with the SM for each subfarm
00199     bool doneWithRegistration = false;
00200     // TODO fixme: improve method of hardcoded fixed retries
00201     unsigned int count = 0; // keep count of tries and quit after 255
00202     unsigned int maxcount = 255;
00203     bool doneWithDQMRegistration = false;
00204     unsigned int countDQM = 0; // keep count of tries and quit after 255
00205     bool alreadysaid = false;
00206     bool alreadysaidDQM = false;
00207 
00208     //bool gotOneHeader = false;
00209     bool gotOneHeaderFromAll = false;
00210     unsigned int countINIT = 0; // keep count of tries and quit after 255
00211     bool alreadysaidINIT = false;
00212 
00213     bool DoneWithJob = false;
00214     while(!DoneWithJob)
00215     {
00216       // work loop
00217       // if a header re-fetch has been requested, reset the header vars
00218       if (headerRefetchRequested_) {
00219         headerRefetchRequested_ = false;
00220         //gotOneHeader = false;
00221         gotOneHeaderFromAll = false;
00222         smHeaderMap_.clear();
00223         countINIT = 0;
00224       }
00225       // register as event consumer to all SM senders
00226       if(!alreadyRegistered_) {
00227         if(!doneWithRegistration)
00228         {
00229           waitBetweenRegTrys();
00230           bool success = registerWithAllSM();
00231           if(success) doneWithRegistration = true;
00232           ++count;
00233         }
00234         // TODO fixme: decide what to do after max tries
00235         if(count >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM Servers"
00236            << " after " << maxcount << " tries";
00237         if(doneWithRegistration && !alreadysaid) {
00238           edm::LogInfo("processCommands") << "Registered with all SM Event Servers";
00239           alreadysaid = true;
00240         }
00241         if(doneWithRegistration) alreadyRegistered_ = true;
00242       }
00243       // now register as DQM consumers
00244       if(!alreadyRegisteredDQM_) {
00245         if(!doneWithDQMRegistration)
00246         {
00247           waitBetweenRegTrys();
00248           bool success = registerWithAllDQMSM();
00249           if(success) doneWithDQMRegistration = true;
00250           ++countDQM;
00251         }
00252         // TODO fixme: decide what to do after max tries
              // the DQM attempts are tracked by countDQM, not by the event
              // registration counter
00253         if(countDQM >= maxcount) edm::LogInfo("processCommands") << "Could not register with all SM DQMEvent Servers"
00254           << " after " << maxcount << " tries";
00255         if(doneWithDQMRegistration && !alreadysaidDQM) {
00256           edm::LogInfo("processCommands") << "Registered with all SM DQMEvent Servers";
00257           alreadysaidDQM = true;
00258         }
00259         if(doneWithDQMRegistration) alreadyRegisteredDQM_ = true;
00260       }
00261       // now get one INIT header (product registry) and save it
00262       // as long as at least one SMsender registered with
00263       // TODO fixme: use the data member for got header to go across runs
00264       // With multiple SMs, we need to get a Header from each else that consumer
00265       // is counted as not initialized
00266       // TODO how do we get all INIT messages from each SM (and know it!)
00267       //if(!gotOneHeader)
00268       if(!gotOneHeaderFromAll)
00269       {
00270         waitBetweenRegTrys();
00271         //bool success = getAnyHeaderFromSM();
00272         bool success = getHeaderFromAllSM();
00273         //if(success) gotOneHeader = true;
00274         if(success) gotOneHeaderFromAll = true;
00275         ++countINIT;
00276       }
00277       if(countINIT >= maxcount) edm::LogInfo("processCommands") << "Could not get product registry!"
00278           << " after " << maxcount << " tries";
00279       //if(gotOneHeader && !alreadysaidINIT) {
00280       if(gotOneHeaderFromAll && !alreadysaidINIT) {
00281         edm::LogInfo("processCommands") << "Got the product registry";
00282         alreadysaidINIT = true;
00283       }
00284       //if(alreadyRegistered_ && gotOneHeader && haveHeader()) {
00285       if(alreadyRegistered_ && gotOneHeaderFromAll && haveHeader()) {
00286         getEventFromAllSM();
00287       }
00288       if(alreadyRegisteredDQM_) {
00289         getDQMEventFromAllSM();
00290       }
00291 
00292       // check for any commands - empty() does not block
00293       if(!cmd_q_->empty())
00294       {
00295         // the next line blocks until there is an entry in cmd_q
00296         edm::EventBuffer::ConsumerBuffer cb(*cmd_q_);
00297         MsgCode mc(cb.buffer(),cb.size());
00298 
00299         if(mc.getCode()==MsgCode::DONE) DoneWithJob = true;
00300         // right now we will ignore all messages other than DONE
00301       }
00302 
00303     } // done with process loop   
00304     edm::LogInfo("processCommands") << "Received done - stopping";
00305     if(dqmServiceManager_.get() != NULL) dqmServiceManager_->stop();
00306   }

unsigned long stor::DataProcessManager::receivedDQMevents (  )  [inline]

Definition at line 110 of file DataProcessManager.h.

References receivedDQMEvents_.

00110 { return receivedDQMEvents_; } // current value of the receivedDQMEvents_ counter

unsigned long stor::DataProcessManager::receivedevents (  )  [inline]

Definition at line 109 of file DataProcessManager.h.

References receivedEvents_.

00109 { return receivedEvents_; } // current value of the receivedEvents_ counter

bool stor::DataProcessManager::registerWithAllDQMSM (  )  [private]

Definition at line 369 of file DataProcessManager.cc.

References DQMsmList_, DQMsmRegMap_, i, and registerWithDQMSM().

Referenced by processCommands().

00370   {
          // Returns false when DQMsmList_ is empty or when any URL in it is
          // still unregistered after this pass; true otherwise.
00371     // One try at registering with the SM on each subfarm
00372     // return true if registered with all SM 
00373     // Only make one attempt and return so we can make this thread stop
00374     if(DQMsmList_.size() == 0) return false;
00375     bool allRegistered = true;
00376     for(unsigned int i = 0; i < DQMsmList_.size(); ++i) {
00377       if(DQMsmRegMap_[DQMsmList_[i] ] > 0) continue; // already registered
            // registerWithDQMSM() returns 0 on failure; only positive ids are stored
00378       int consumerid = registerWithDQMSM(DQMsmList_[i]);
00379       if(consumerid > 0) DQMsmRegMap_[DQMsmList_[i] ] = consumerid;
00380       else allRegistered = false;
00381     }
00382     return allRegistered;
00383   }

bool stor::DataProcessManager::registerWithAllSM (  )  [private]

Definition at line 353 of file DataProcessManager.cc.

References i, registerWithSM(), smList_, and smRegMap_.

Referenced by processCommands().

00354   {
          // Returns false when smList_ is empty or when any URL in it is still
          // unregistered after this pass; true otherwise.
00355     // One try at registering with the SM on each subfarm
00356     // return true if registered with all SM 
00357     // Only make one attempt and return so we can make this thread stop
00358     if(smList_.size() == 0) return false;
00359     bool allRegistered = true;
00360     for(unsigned int i = 0; i < smList_.size(); ++i) {
00361       if(smRegMap_[smList_[i] ] > 0) continue; // already registered
            // registerWithSM() returns 0 on failure; only positive ids are stored
00362       int consumerid = registerWithSM(smList_[i]);
00363       if(consumerid > 0) smRegMap_[smList_[i] ] = consumerid;
00364       else allRegistered = false;
00365     }
00366     return allRegistered;
00367   }

int stor::DataProcessManager::registerWithDQMSM ( std::string  smURL  )  [private]

Definition at line 472 of file DataProcessManager.cc.

References buf_, std::cerr, consumerTopFolderName_, stor::ReadData::d_, data, DQMconsumerName_, DQMconsumerPriority_, DQMregpage_, std::endl(), ConsRegResponseBuilder::ES_NOT_READY, Exception, FDEBUG, stor::func(), i, len, NULL, stor::setopt(), ConsRegRequestBuilder::size(), and ConsRegRequestBuilder::startAddress().

Referenced by registerWithAllDQMSM().

00473   {
00474     // Registers this process as a DQM event consumer with the Storage
00475     // Manager at smURL; returns the consumerID, or 0 for failure
          //
          // The request is an HTTP POST to smURL + DQMregpage_ built from
          // DQMconsumerName_, DQMconsumerPriority_ and consumerTopFolderName_.
          // 0 is returned when the POST fails, the reply is empty, the reply
          // cannot be decoded, or the reported status is ES_NOT_READY.
          // Throws cms::Exception if no curl handle can be created.
00476     stor::ReadData data;
00477 
00478     data.d_.clear();
00479     CURL* han = curl_easy_init();
00480     if(han==0)
00481     {
00482       edm::LogError("registerWithDQMSM") << "Could not create curl handle";
00483       // this is a fatal error isn't it? Are we catching this? TODO
00484       throw cms::Exception("registerWithDQMSM","DataProcessManager")
00485           << "Unable to create curl handle\n";
00486     }
00487     // set the standard https request options
00488     std::string url2use = smURL + DQMregpage_;
00489     setopt(han,CURLOPT_URL,url2use.c_str());
00490     setopt(han,CURLOPT_WRITEFUNCTION,func);
00491     setopt(han,CURLOPT_WRITEDATA,&data);
00492 
00493     // build the registration request message to send to the storage manager
00494     const int BUFFER_SIZE = 2000;
00495     char msgBuff[BUFFER_SIZE];
00496     ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, DQMconsumerName_,
00497                                          DQMconsumerPriority_, consumerTopFolderName_);
00498 
00499     // add the request message as a https post
00500     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00501     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00502     struct curl_slist *headers=NULL;
00503     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00504     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00505     setopt(han, CURLOPT_HTTPHEADER, headers);
00506 
00507     // send the HTTP POST, read the reply, and cleanup before going on
00508     CURLcode messageStatus = curl_easy_perform(han);
00509     curl_slist_free_all(headers);
00510     curl_easy_cleanup(han);
00511 
00512     if(messageStatus!=0)
00513     {
00514       cerr << "curl perform failed for DQM registration" << endl;
00515       edm::LogError("registerWithDQMSM") << "curl perform failed for registration. "
00516         << "Could not register with DQM: probably XDAQ not running on Storage Manager"
00517         << " at " << smURL;
00518       return 0;
00519     }
          // stays ES_NOT_READY (and so yields 0 below) when the reply is empty
00520     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00521     int consumerId = 0;
00522     if(data.d_.length() > 0)
00523     {
00524       int len = data.d_.length();
00525       FDEBUG(9) << "registerWithDQMSM received len = " << len << std::endl;
00526       buf_.resize(len);
00527       for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00528 
00529       try {
00530         ConsRegResponseView respView(&buf_[0]);
00531         registrationStatus = respView.getStatus();
00532         consumerId = respView.getConsumerId();
00533       }
            // caught by const reference: the exception is only a trigger for
            // dumping the raw reply, so no copy of it is needed
00534       catch (const cms::Exception&) {
00535         const unsigned int MAX_DUMP_LENGTH = 1000;
00536         edm::LogError("registerWithDQMSM") << "========================================";
00537         edm::LogError("registerWithDQMSM") << "Exception decoding the registerWithDQMSM response!";
00538         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00539           edm::LogError("registerWithDQMSM") << "Here is the raw text that was returned:";
00540           edm::LogError("registerWithDQMSM") << data.d_;
00541         }
00542         else {
00543           edm::LogError("registerWithDQMSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00544             " characters of the raw text that was returned:";
00545           edm::LogError("registerWithDQMSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00546         }
00547         edm::LogError("registerWithDQMSM") << "========================================";
00548         return 0;
00549       }
00550     }
00551     if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0;
00552     FDEBUG(5) << "Consumer ID = " << consumerId << endl;
00553     return consumerId;
00554   }

int stor::DataProcessManager::registerWithSM ( std::string  smURL  )  [private]

Definition at line 385 of file DataProcessManager.cc.

References buf_, std::cerr, consumerName_, consumerPriority_, consumerPSetString_, stor::ReadData::d_, data, std::endl(), ConsRegResponseBuilder::ES_NOT_READY, eventServer_, Exception, FDEBUG, stor::func(), i, len, NULL, regpage_, stor::setopt(), ConsRegRequestBuilder::size(), and ConsRegRequestBuilder::startAddress().

Referenced by registerWithAllSM().

00386   {
00387     // Registers this process as an event consumer with the Storage
00388     // Manager at smURL; returns the consumerID, or 0 for failure
          //
          // The request is an HTTP POST to smURL + regpage_ built from
          // consumerName_, consumerPriority_ and consumerPSetString_. On a
          // decodable reply the stream selection table is passed to
          // eventServer_ when one is set.
          // 0 is returned when the POST fails, the reply is empty, the reply
          // cannot be decoded, or the reported status is ES_NOT_READY.
          // Throws cms::Exception if no curl handle can be created.
00389     stor::ReadData data;
00390 
00391     data.d_.clear();
00392     CURL* han = curl_easy_init();
00393     if(han==0)
00394     {
00395       edm::LogError("registerWithSM") << "Could not create curl handle";
00396       // this is a fatal error isn't it? Are we catching this? TODO
00397       throw cms::Exception("registerWithSM","DataProcessManager")
00398           << "Unable to create curl handle\n";
00399     }
00400     // set the standard https request options
00401     std::string url2use = smURL + regpage_;
00402     setopt(han,CURLOPT_URL,url2use.c_str());
00403     setopt(han,CURLOPT_WRITEFUNCTION,func);
00404     setopt(han,CURLOPT_WRITEDATA,&data);
00405 
00406     // build the registration request message to send to the storage manager
00407     const int BUFFER_SIZE = 2000;
00408     char msgBuff[BUFFER_SIZE];
00409     ConsRegRequestBuilder requestMessage(msgBuff, BUFFER_SIZE, consumerName_,
00410                                          consumerPriority_, consumerPSetString_);
00411 
00412     // add the request message as a https post
00413     setopt(han, CURLOPT_POSTFIELDS, requestMessage.startAddress());
00414     setopt(han, CURLOPT_POSTFIELDSIZE, requestMessage.size());
00415     struct curl_slist *headers=NULL;
00416     headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
00417     headers = curl_slist_append(headers, "Content-Transfer-Encoding: binary");
00418     setopt(han, CURLOPT_HTTPHEADER, headers);
00419 
00420     // send the HTTP POST, read the reply, and cleanup before going on
00421     CURLcode messageStatus = curl_easy_perform(han);
00422     curl_slist_free_all(headers);
00423     curl_easy_cleanup(han);
00424 
00425     if(messageStatus!=0)
00426     {
00427       cerr << "curl perform failed for registration" << endl;
00428       edm::LogError("registerWithSM") << "curl perform failed for registration. "
00429         << "Could not register: probably XDAQ not running on Storage Manager"
00430         << " at " << smURL;
00431       return 0;
00432     }
          // stays ES_NOT_READY (and so yields 0 below) when the reply is empty
00433     uint32 registrationStatus = ConsRegResponseBuilder::ES_NOT_READY;
00434     int consumerId = 0;
00435     if(data.d_.length() > 0)
00436     {
00437       int len = data.d_.length();
00438       FDEBUG(9) << "registerWithSM received len = " << len << std::endl;
00439       buf_.resize(len);
00440       for (int i=0; i<len ; i++) buf_[i] = data.d_[i];
00441 
00442       try {
00443         ConsRegResponseView respView(&buf_[0]);
00444         registrationStatus = respView.getStatus();
00445         consumerId = respView.getConsumerId();
00446         if (eventServer_.get() != NULL) {
00447           eventServer_->setStreamSelectionTable(respView.getStreamSelectionTable());
00448         }
00449       }
            // caught by const reference: the exception is only a trigger for
            // dumping the raw reply, so no copy of it is needed
00450       catch (const cms::Exception&) {
00451         const unsigned int MAX_DUMP_LENGTH = 1000;
00452         edm::LogError("registerWithSM") << "========================================";
00453         edm::LogError("registerWithSM") << "Exception decoding the registerWithSM response!";
00454         if (data.d_.length() <= MAX_DUMP_LENGTH) {
00455           edm::LogError("registerWithSM") << "Here is the raw text that was returned:";
00456           edm::LogError("registerWithSM") << data.d_;
00457         }
00458         else {
00459           edm::LogError("registerWithSM") << "Here are the first " << MAX_DUMP_LENGTH <<
00460             " characters of the raw text that was returned:";
00461           edm::LogError("registerWithSM") << (data.d_.substr(0, MAX_DUMP_LENGTH));
00462         }
00463         edm::LogError("registerWithSM") << "========================================";
00464         return 0;
00465       }
00466     }
00467     if(registrationStatus == ConsRegResponseBuilder::ES_NOT_READY) return 0;
00468     FDEBUG(5) << "Consumer ID = " << consumerId << endl;
00469     return consumerId;
00470   }

void stor::DataProcessManager::run ( DataProcessManager *  t  )  [static, private]

Definition at line 166 of file DataProcessManager.cc.

References processCommands().

Referenced by start().

00167   {
          // static entry point: runs the work loop of the manager passed as t
00168     t->processCommands();
00169   }

unsigned long stor::DataProcessManager::samples (  )  [inline]

Definition at line 115 of file DataProcessManager.h.

References stor::SMPerformanceMeter::getSetSamples(), and pmeter_.

00115 { return pmeter_->getSetSamples(); } // forwards to the pmeter_ performance meter

void stor::DataProcessManager::setArchiveDQM ( bool  archiveDQM  )  [inline]

Definition at line 67 of file DataProcessManager.h.

References dqmServiceManager_.

00068     { dqmServiceManager_->setArchiveDQM(archiveDQM); }
          // Forwards archiveDQM to dqmServiceManager_.
          // NOTE(review): dqmServiceManager_ is dereferenced without the NULL
          // check that setDQMEventServer() performs - confirm it is always set.

void stor::DataProcessManager::setArchiveIntervalDQM ( int  archiveInterval  )  [inline]

Definition at line 70 of file DataProcessManager.h.

References dqmServiceManager_.

00071     { dqmServiceManager_->setArchiveInterval(archiveInterval); }
          // Forwards archiveInterval to dqmServiceManager_.
          // NOTE(review): dqmServiceManager_ is dereferenced without the NULL
          // check that setDQMEventServer() performs - confirm it is always set.

void stor::DataProcessManager::setCollateDQM ( bool  collateDQM  )  [inline]

Definition at line 64 of file DataProcessManager.h.

References dqmServiceManager_.

00065     { dqmServiceManager_->setCollateDQM(collateDQM); }
          // Forwards collateDQM to dqmServiceManager_.
          // NOTE(review): dqmServiceManager_ is dereferenced without the NULL
          // check that setDQMEventServer() performs - confirm it is always set.

void stor::DataProcessManager::setCompressionLevelDQM ( int  compressionLevelDQM  )  [inline]

Definition at line 85 of file DataProcessManager.h.

References dqmServiceManager_.

00086     { dqmServiceManager_->setCompressionLevel(compressionLevelDQM);}
          // Forwards compressionLevelDQM to dqmServiceManager_.
          // NOTE(review): dqmServiceManager_ is dereferenced without the NULL
          // check that setDQMEventServer() performs - confirm it is always set.

void stor::DataProcessManager::setConsumerName ( std::string  s  )  [inline]

Definition at line 99 of file DataProcessManager.h.

References consumerName_.

00099 { consumerName_ = s; }

void stor::DataProcessManager::setDQMConsumerName ( std::string  s  )  [inline]

Definition at line 100 of file DataProcessManager.h.

References DQMconsumerName_.

00100 { DQMconsumerName_ = s; }

void stor::DataProcessManager::setDQMEventServer ( boost::shared_ptr< DQMEventServer > &  es  )  [inline]

Definition at line 88 of file DataProcessManager.h.

References DQMeventServer_, dqmServiceManager_, and NULL.

00089     {
00090       // The shared_ptr still owns the memory after this get()
00091       if (dqmServiceManager_.get() != NULL) dqmServiceManager_->setDQMEventServer(es);
00092       DQMeventServer_ = es;
00093     }

void stor::DataProcessManager::setDQMTime2Now ( std::string  smURL  )  [private]

Definition at line 1034 of file DataProcessManager.cc.

References lastDQMReqMap_.

Referenced by getOneDQMEventFromSM().

01035   {
01036     struct timeval now;
01037     struct timezone dummyTZ;
01038     gettimeofday(&now, &dummyTZ);
01039     lastDQMReqMap_[smURL] = now;
01040   }

void stor::DataProcessManager::setEventServer ( boost::shared_ptr< EventServer > &  es  )  [inline]

Definition at line 47 of file DataProcessManager.h.

References eventServer_.

00048     {
00049       eventServer_ = es;
00050     }

void stor::DataProcessManager::setFilePrefixDQM ( std::string  filePrefixDQM  )  [inline]

Definition at line 79 of file DataProcessManager.h.

References dqmServiceManager_.

00080     { dqmServiceManager_->setFilePrefix(filePrefixDQM);}

void stor::DataProcessManager::setInitMsgCollection ( boost::shared_ptr< InitMsgCollection > &  imColl  )  [inline]

Definition at line 53 of file DataProcessManager.h.

References initMsgCollection_.

00054     {
00055       initMsgCollection_ = imColl;
00056     }

void stor::DataProcessManager::setMaxDQMEventRequestRate ( double  rate  ) 

Definition at line 119 of file DataProcessManager.cc.

References minDQMEventRequestInterval_.

Referenced by init().

00120   {
00121     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00122     if(rate <= 0.0) return; // TODO make sure config is checked!
00123     if (rate < (1.0 / MAX_REQUEST_INTERVAL)) {
00124       minDQMEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00125     }
00126     else {
00127       minDQMEventRequestInterval_ = 1.0 / rate;  // seconds
00128     }
00129   }

void stor::DataProcessManager::setMaxEventRequestRate ( double  rate  ) 

Definition at line 112 of file DataProcessManager.cc.

References maxEventRequestRate_, and updateMinEventRequestInterval().

Referenced by init().

00113   {
00114     if(rate <= 0.0) return; // TODO make sure config is checked!
00115     maxEventRequestRate_ = rate;
00116     updateMinEventRequestInterval();
00117   }

void stor::DataProcessManager::setPeriod4Samples ( unsigned long  period4samples  ) 

Definition at line 105 of file DataProcessManager.cc.

References period4samples_, pmeter_, and stor::SMPerformanceMeter::setPeriod4Samples().

void stor::DataProcessManager::setPurgeTimeDQM ( int  purgeTimeDQM  )  [inline]

Definition at line 73 of file DataProcessManager.h.

References dqmServiceManager_.

00074     { dqmServiceManager_->setPurgeTime(purgeTimeDQM);}

void stor::DataProcessManager::setReadyTimeDQM ( int  readyTimeDQM  )  [inline]

Definition at line 76 of file DataProcessManager.h.

References dqmServiceManager_.

00077     { dqmServiceManager_->setReadyTime(readyTimeDQM);}

void stor::DataProcessManager::setSamples ( unsigned long  num_samples  ) 

Definition at line 99 of file DataProcessManager.cc.

References pmeter_, samples_, and stor::SMPerformanceMeter::setSamples().

00100   { 
00101     samples_ = num_samples;
00102     pmeter_->setSamples(num_samples); 
00103   }

void stor::DataProcessManager::setTime2Now ( std::string  smURL  )  [private]

Definition at line 831 of file DataProcessManager.cc.

References lastReqMap_.

Referenced by getOneEventFromSM().

00832   {
00833     struct timeval now;
00834     struct timezone dummyTZ;
00835     gettimeofday(&now, &dummyTZ);
00836     lastReqMap_[smURL] = now;
00837   }

void stor::DataProcessManager::setUseCompressionDQM ( bool  useCompressionDQM  )  [inline]

Definition at line 82 of file DataProcessManager.h.

References dqmServiceManager_.

00083     { dqmServiceManager_->setUseCompression(useCompressionDQM);}

void stor::DataProcessManager::start ( void   ) 

Definition at line 171 of file DataProcessManager.cc.

References me_, run(), and thread.

00172   {
00173     // called from a different thread to start things going
00174 
00175     me_.reset(new boost::thread(boost::bind(DataProcessManager::run,this)));
00176   }

void stor::DataProcessManager::stop (  ) 

Definition at line 178 of file DataProcessManager.cc.

References edm::EventBuffer::OperateBuffer< T >::buffer(), cmd_q_, and edm::EventBuffer::OperateBuffer< T >::commit().

00179   {
00180     // called from a different thread - trigger completion to the
00181     // data process manager loop
00182 
00183     edm::EventBuffer::ProducerBuffer cb(*cmd_q_);
00184     MsgCode mc(cb.buffer(),MsgCode::DONE);
00185     mc.setCode(MsgCode::DONE);
00186     cb.commit(mc.codeSize());
00187   }

double stor::DataProcessManager::totalvolumemb (  )  [inline]

Definition at line 117 of file DataProcessManager.h.

References pmeter_, and stor::SMPerformanceMeter::totalvolumemb().

00117 { return pmeter_->totalvolumemb(); }

void stor::DataProcessManager::updateMinEventRequestInterval (  ) 

Definition at line 131 of file DataProcessManager.cc.

References consumerPSetString_, edm::ParameterSet::insert(), maxEventRequestRate_, minEventRequestInterval_, smList_, and edm::ParameterSet::toString().

Referenced by addSM2Register(), and setMaxEventRequestRate().

00132   {
00133     const double MAX_REQUEST_INTERVAL = 300.0;  // seconds
00134     double rate = maxEventRequestRate_;
00135 
00136     if (rate < (1.0 / MAX_REQUEST_INTERVAL)) {
00137       minEventRequestInterval_ = MAX_REQUEST_INTERVAL;
00138     }
00139     else {
00140       // base the interval on the number of storage managers
00141       // (so that the requested rate doesn't increase proportionally
00142       // with number of SMs)
00143       // We could simply use smCount/rate, but let's be a bit more
00144       // generous than that so that if one of the SMs isn't queueing
00145       // events for us, we still get enough rate
00146       if (smList_.size() <= 1) {
00147         minEventRequestInterval_ = 1.0 / rate;  // seconds
00148       }
00149       else {
00150         minEventRequestInterval_ = (smList_.size() - 1) / rate;
00151       }
00152     }
00153 
00154     // 16-Apr-2008, KAB: set maxEventRequestRate in the parameterSet that
00155     // we send to the storage manager so that the SM knows how many events
00156     // to queue for the SMPS.
00157     edm::ParameterSet ps = ParameterSet();
00158     Entry maxRateEntry("maxEventRequestRate",
00159                        (1.0 / minEventRequestInterval_),
00160                        false);
00161     ps.insert(true, "maxEventRequestRate", maxRateEntry);
00162     // TODO fixme: only request event types that are requested by connected consumers?
00163     consumerPSetString_ = ps.toString();
00164   }

void stor::DataProcessManager::waitBetweenRegTrys (  )  [private]

Definition at line 749 of file DataProcessManager.cc.

References headerRetryInterval_.

Referenced by processCommands().

00750   {
00751     // for now just a simple wait for a fixed time
00752     sleep(headerRetryInterval_);
00753     return;
00754   }


Member Data Documentation

bool stor::DataProcessManager::alreadyRegistered_ [private]

Definition at line 153 of file DataProcessManager.h.

Referenced by init(), and processCommands().

bool stor::DataProcessManager::alreadyRegisteredDQM_ [private]

Definition at line 154 of file DataProcessManager.h.

Referenced by init(), and processCommands().

std::vector<unsigned char> stor::DataProcessManager::buf_ [private]

Definition at line 156 of file DataProcessManager.h.

Referenced by getOneDQMEventFromSM(), getOneEventFromSM(), registerWithDQMSM(), and registerWithSM().

edm::EventBuffer* stor::DataProcessManager::cmd_q_ [private]

Definition at line 151 of file DataProcessManager.h.

Referenced by getCommandQueue(), processCommands(), and stop().

unsigned int stor::DataProcessManager::consumerId_ [private]

Definition at line 176 of file DataProcessManager.h.

Referenced by init().

std::string stor::DataProcessManager::consumerName_ [private]

Definition at line 170 of file DataProcessManager.h.

Referenced by init(), registerWithSM(), and setConsumerName().

std::string stor::DataProcessManager::consumerPriority_ [private]

Definition at line 171 of file DataProcessManager.h.

Referenced by init(), and registerWithSM().

std::string stor::DataProcessManager::consumerPSetString_ [private]

Definition at line 172 of file DataProcessManager.h.

Referenced by registerWithSM(), and updateMinEventRequestInterval().

std::string stor::DataProcessManager::consumerTopFolderName_ [private]

Definition at line 183 of file DataProcessManager.h.

Referenced by init(), and registerWithDQMSM().

unsigned int stor::DataProcessManager::DQMconsumerId_ [private]

Definition at line 179 of file DataProcessManager.h.

Referenced by init().

std::string stor::DataProcessManager::DQMconsumerName_ [private]

Definition at line 181 of file DataProcessManager.h.

Referenced by init(), registerWithDQMSM(), and setDQMConsumerName().

std::string stor::DataProcessManager::DQMconsumerPriority_ [private]

Definition at line 182 of file DataProcessManager.h.

Referenced by init(), and registerWithDQMSM().

std::string stor::DataProcessManager::DQMeventpage_ [private]

Definition at line 165 of file DataProcessManager.h.

Referenced by getOneDQMEventFromSM(), and init().

boost::shared_ptr<DQMEventServer> stor::DataProcessManager::DQMeventServer_ [private]

Definition at line 189 of file DataProcessManager.h.

Referenced by getDQMEventServer(), and setDQMEventServer().

edm::CPUTimer stor::DataProcessManager::dqmFetchTimer_ [private]

Definition at line 205 of file DataProcessManager.h.

Referenced by getOneDQMEventFromSM().

std::string stor::DataProcessManager::DQMregpage_ [private]

Definition at line 166 of file DataProcessManager.h.

Referenced by init(), and registerWithDQMSM().

boost::shared_ptr<stor::DQMServiceManager> stor::DataProcessManager::dqmServiceManager_ [private]

Definition at line 186 of file DataProcessManager.h.

Referenced by getDQMServiceManager(), getOneDQMEventFromSM(), processCommands(), setArchiveDQM(), setArchiveIntervalDQM(), setCollateDQM(), setCompressionLevelDQM(), setDQMEventServer(), setFilePrefixDQM(), setPurgeTimeDQM(), setReadyTimeDQM(), and setUseCompressionDQM().

std::vector<std::string> stor::DataProcessManager::DQMsmList_ [private]

Definition at line 161 of file DataProcessManager.h.

Referenced by addDQMSM2Register(), haveRegWithDQMServer(), init(), and registerWithAllDQMSM().

RegConsumer_map stor::DataProcessManager::DQMsmRegMap_ [private]

Definition at line 162 of file DataProcessManager.h.

Referenced by addDQMSM2Register(), getDQMEventFromAllSM(), getOneDQMEventFromSM(), haveRegWithDQMServer(), init(), and registerWithAllDQMSM().

edm::CPUTimer stor::DataProcessManager::eventFetchTimer_ [private]

Definition at line 204 of file DataProcessManager.h.

Referenced by getOneDQMEventFromSM(), and getOneEventFromSM().

std::string stor::DataProcessManager::eventpage_ [private]

Definition at line 163 of file DataProcessManager.h.

Referenced by getOneEventFromSM(), and init().

boost::shared_ptr<EventServer> stor::DataProcessManager::eventServer_ [private]

Definition at line 188 of file DataProcessManager.h.

Referenced by getEventServer(), getHeaderFromSM(), getOneEventFromSM(), registerWithSM(), and setEventServer().

std::string stor::DataProcessManager::headerpage_ [private]

Definition at line 167 of file DataProcessManager.h.

Referenced by getHeaderFromSM(), and init().

bool stor::DataProcessManager::headerRefetchRequested_ [private]

Definition at line 155 of file DataProcessManager.h.

Referenced by getOneEventFromSM(), init(), and processCommands().

int stor::DataProcessManager::headerRetryInterval_ [private]

Definition at line 173 of file DataProcessManager.h.

Referenced by waitBetweenRegTrys().

boost::shared_ptr<InitMsgCollection> stor::DataProcessManager::initMsgCollection_ [private]

Definition at line 190 of file DataProcessManager.h.

Referenced by getHeaderFromSM(), getInitMsgCollection(), getOneEventFromSM(), haveHeader(), and setInitMsgCollection().

LastReqTime_map stor::DataProcessManager::lastDQMReqMap_ [private]

Definition at line 180 of file DataProcessManager.h.

Referenced by addDQMSM2Register(), getDQMTime2Wait(), and setDQMTime2Now().

LastReqTime_map stor::DataProcessManager::lastReqMap_ [private]

Definition at line 177 of file DataProcessManager.h.

Referenced by addSM2Register(), getTime2Wait(), and setTime2Now().

boost::shared_ptr<ForeverCounter> stor::DataProcessManager::ltDQMFetchTimeCounter_ [private]

Definition at line 208 of file DataProcessManager.h.

Referenced by getAverageValue(), getDuration(), getOneDQMEventFromSM(), getSampleCount(), and init().

boost::shared_ptr<ForeverCounter> stor::DataProcessManager::ltEventFetchTimeCounter_ [private]

Definition at line 206 of file DataProcessManager.h.

Referenced by getAverageValue(), getDuration(), getOneEventFromSM(), getSampleCount(), and init().

double stor::DataProcessManager::maxEventRequestRate_ [private]

Definition at line 174 of file DataProcessManager.h.

Referenced by setMaxEventRequestRate(), and updateMinEventRequestInterval().

boost::shared_ptr<boost::thread> stor::DataProcessManager::me_ [private]

Definition at line 192 of file DataProcessManager.h.

Referenced by join(), and start().

double stor::DataProcessManager::minDQMEventRequestInterval_ [private]

Definition at line 178 of file DataProcessManager.h.

Referenced by getDQMTime2Wait(), and setMaxDQMEventRequestRate().

double stor::DataProcessManager::minEventRequestInterval_ [private]

Definition at line 175 of file DataProcessManager.h.

Referenced by getTime2Wait(), and updateMinEventRequestInterval().

xdata::UnsignedInteger32 stor::DataProcessManager::period4samples_ [private]

Definition at line 200 of file DataProcessManager.h.

Referenced by init(), and setPeriod4Samples().

stor::SMPerformanceMeter* stor::DataProcessManager::pmeter_ [private]

Definition at line 197 of file DataProcessManager.h.

Referenced by addMeasurement(), DataProcessManager(), init(), period4samples(), samples(), setPeriod4Samples(), setSamples(), totalvolumemb(), and ~DataProcessManager().

unsigned long stor::DataProcessManager::receivedDQMEvents_ [private]

Definition at line 196 of file DataProcessManager.h.

Referenced by getOneDQMEventFromSM(), init(), and receivedDQMevents().

unsigned long stor::DataProcessManager::receivedEvents_ [private]

Definition at line 195 of file DataProcessManager.h.

Referenced by getOneEventFromSM(), init(), and receivedevents().

std::string stor::DataProcessManager::regpage_ [private]

Definition at line 164 of file DataProcessManager.h.

Referenced by init(), and registerWithSM().

xdata::UnsignedInteger32 stor::DataProcessManager::samples_ [private]

Definition at line 199 of file DataProcessManager.h.

Referenced by init(), and setSamples().

HeaderConsumer_map stor::DataProcessManager::smHeaderMap_ [private]

Definition at line 160 of file DataProcessManager.h.

Referenced by addSM2Register(), getHeaderFromAllSM(), init(), and processCommands().

std::vector<std::string> stor::DataProcessManager::smList_ [private]

Definition at line 158 of file DataProcessManager.h.

Referenced by addSM2Register(), getAnyHeaderFromSM(), getDQMEventFromAllSM(), getEventFromAllSM(), getHeaderFromAllSM(), haveRegWithEventServer(), init(), registerWithAllSM(), and updateMinEventRequestInterval().

RegConsumer_map stor::DataProcessManager::smRegMap_ [private]

Definition at line 159 of file DataProcessManager.h.

Referenced by addSM2Register(), getAnyHeaderFromSM(), getEventFromAllSM(), getHeaderFromAllSM(), getHeaderFromSM(), getOneEventFromSM(), haveRegWithEventServer(), init(), and registerWithAllSM().

stor::SMPerfStats stor::DataProcessManager::stats_ [private]

Definition at line 201 of file DataProcessManager.h.

Referenced by addMeasurement(), getStats(), and init().

boost::shared_ptr<RollingIntervalCounter> stor::DataProcessManager::stDQMFetchTimeCounter_ [private]

Definition at line 209 of file DataProcessManager.h.

Referenced by getAverageValue(), getDuration(), getOneDQMEventFromSM(), getSampleCount(), and init().

boost::shared_ptr<RollingIntervalCounter> stor::DataProcessManager::stEventFetchTimeCounter_ [private]

Definition at line 207 of file DataProcessManager.h.

Referenced by getAverageValue(), getDuration(), getOneEventFromSM(), getSampleCount(), and init().

char stor::DataProcessManager::subscriptionurl_[2048] [private]

Definition at line 168 of file DataProcessManager.h.


The documentation for this class was generated from the following files:
  * DataProcessManager.h
  * DataProcessManager.cc
Generated on Tue Jun 9 18:52:50 2009 for CMSSW by  doxygen 1.5.4