CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/EventFilter/StorageManager/src/DataSenderMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: DataSenderMonitorCollection.cc,v 1.20 2012/04/20 10:48:02 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include <zlib.h>
00009 #include <boost/lexical_cast.hpp>
00010 
00011 #include "EventFilter/StorageManager/interface/Exception.h"
00012 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00013 
00014 
00015 namespace stor {
00016   
00017   DataSenderMonitorCollection::DataSenderMonitorCollection
00018   (
00019     const utils::Duration_t& updateInterval,
00020     AlarmHandlerPtr ah
00021   ) :
00022   MonitorCollection(updateInterval),
00023   connectedRBs_(0),
00024   connectedEPs_(0),
00025   activeEPs_(0),
00026   outstandingDataDiscards_(0),
00027   outstandingDQMDiscards_(0),
00028   faultyEvents_(0),
00029   ignoredDiscards_(0),
00030   updateInterval_(updateInterval),
00031   alarmHandler_(ah)
00032   {}
00033   
00034   
00035   void DataSenderMonitorCollection::addInitSample(I2OChain const& i2oChain)
00036   {
00037     // sanity checks
00038     if (i2oChain.messageCode() != Header::INIT) {return;}
00039     if (! i2oChain.complete()) {return;}
00040     
00041     // fetch basic data from the I2OChain
00042     std::string outModName = i2oChain.outputModuleLabel();
00043     uint32_t msgSize = i2oChain.totalDataSize();
00044     
00045     // look up the monitoring records that we need
00046     bool pointersAreValid;
00047     RBRecordPtr rbRecordPtr;
00048     FURecordPtr fuRecordPtr;
00049     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00050     {
00051       boost::mutex::scoped_lock sl(collectionsMutex_);
00052       pointersAreValid = getAllNeededPointers(
00053         i2oChain, rbRecordPtr, fuRecordPtr,
00054         topLevelOutModPtr, rbSpecificOutModPtr,
00055         fuSpecificOutModPtr);
00056     }
00057     
00058     // accumulate the data of interest
00059     if (pointersAreValid)
00060     {
00061       topLevelOutModPtr->name = outModName;
00062       topLevelOutModPtr->initMsgSize = msgSize;
00063       
00064       ++rbRecordPtr->initMsgCount;
00065       rbRecordPtr->nExpectedEPs += i2oChain.nExpectedEPs();
00066       rbSpecificOutModPtr->name = outModName;
00067       rbSpecificOutModPtr->initMsgSize = msgSize;
00068       
00069       ++fuRecordPtr->initMsgCount;
00070       fuSpecificOutModPtr->name = outModName;
00071       fuSpecificOutModPtr->initMsgSize = msgSize;
00072     }
00073   }
00074   
00075   
00076   void DataSenderMonitorCollection::addEventSample(I2OChain const& i2oChain)
00077   {
00078     // sanity checks
00079     if (i2oChain.messageCode() != Header::EVENT) {return;}
00080     if (! i2oChain.complete()) {return;}
00081 
00082     // fetch basic data from the I2OChain
00083     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00084     uint32_t runNumber = i2oChain.runNumber();
00085     uint32_t eventNumber = i2oChain.eventNumber();
00086     
00087     // look up the monitoring records that we need
00088     bool pointersAreValid;
00089     RBRecordPtr rbRecordPtr;
00090     FURecordPtr fuRecordPtr;
00091     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00092     {
00093       boost::mutex::scoped_lock sl(collectionsMutex_);
00094       pointersAreValid = getAllNeededPointers(
00095         i2oChain, rbRecordPtr, fuRecordPtr,
00096         topLevelOutModPtr, rbSpecificOutModPtr,
00097         fuSpecificOutModPtr);
00098     }
00099     
00100     // accumulate the data of interest
00101     if (pointersAreValid)
00102     {
00103       topLevelOutModPtr->eventSize.addSample(eventSize);
00104       
00105       rbRecordPtr->lastRunNumber = runNumber;
00106       rbRecordPtr->lastEventNumber = eventNumber;
00107       rbRecordPtr->eventSize.addSample(eventSize);
00108       rbSpecificOutModPtr->eventSize.addSample(eventSize);
00109       
00110       fuRecordPtr->lastRunNumber = runNumber;
00111       fuRecordPtr->lastEventNumber = eventNumber;
00112       fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
00113       fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
00114       fuSpecificOutModPtr->eventSize.addSample(eventSize);
00115     }
00116   }
00117   
00118   
00119   void DataSenderMonitorCollection::addDQMEventSample(I2OChain const& i2oChain)
00120   {
00121     // sanity checks
00122     if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
00123     if (! i2oChain.complete()) {return;}
00124     
00125     // fetch basic data from the I2OChain
00126     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00127     
00128     // look up the monitoring records that we need
00129     bool pointersAreValid;
00130     RBRecordPtr rbRecordPtr;
00131     FURecordPtr fuRecordPtr;
00132     {
00133       boost::mutex::scoped_lock sl(collectionsMutex_);
00134       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00135       if (pointersAreValid)
00136       {
00137         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00138       }
00139     }
00140     
00141     // accumulate the data of interest
00142     if (pointersAreValid)
00143     {
00144       rbRecordPtr->dqmEventSize.addSample(eventSize);
00145       fuRecordPtr->dqmEventSize.addSample(eventSize);
00146     }
00147   }
00148   
00149   
00150   void DataSenderMonitorCollection::addErrorEventSample(I2OChain const& i2oChain)
00151   {
00152     // sanity checks
00153     if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
00154     if (! i2oChain.complete()) {return;}
00155     
00156     // fetch basic data from the I2OChain
00157     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00158     
00159     // look up the monitoring records that we need
00160     bool pointersAreValid;
00161     RBRecordPtr rbRecordPtr;
00162     FURecordPtr fuRecordPtr;
00163     {
00164       boost::mutex::scoped_lock sl(collectionsMutex_);
00165       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00166       if (pointersAreValid)
00167       {
00168         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00169       }
00170     }
00171     
00172     // accumulate the data of interest
00173     if (pointersAreValid)
00174     {
00175       rbRecordPtr->errorEventSize.addSample(eventSize);
00176       fuRecordPtr->errorEventSize.addSample(eventSize);
00177     }
00178   }
00179   
00180   
00181   void DataSenderMonitorCollection::addFaultyEventSample(I2OChain const& i2oChain)
00182   {
00183     // fetch basic data from the I2OChain
00184     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00185     
00186     // look up the monitoring records that we need
00187     bool pointersAreValid;
00188     RBRecordPtr rbRecordPtr;
00189     FURecordPtr fuRecordPtr;
00190     {
00191       boost::mutex::scoped_lock sl(collectionsMutex_);
00192       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00193       if (pointersAreValid)
00194       {
00195         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00196       }
00197     }
00198     
00199     // accumulate the data of interest
00200     if (pointersAreValid)
00201     {
00202       if (i2oChain.messageCode() == Header::DQM_EVENT)
00203       {
00204         rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
00205         fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
00206       }
00207       else
00208       {
00209         rbRecordPtr->faultyEventSize.addSample(eventSize);
00210         fuRecordPtr->faultyEventSize.addSample(eventSize);
00211       }
00212     }
00213   }
00214   
00215   
00216   void DataSenderMonitorCollection::incrementDataDiscardCount(I2OChain const& i2oChain)
00217   {
00218     // look up the monitoring records that we need
00219     bool pointersAreValid;
00220     RBRecordPtr rbRecordPtr;
00221     FURecordPtr fuRecordPtr;
00222     {
00223       boost::mutex::scoped_lock sl(collectionsMutex_);
00224       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00225       if (pointersAreValid)
00226       {
00227         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00228       }
00229     }
00230     
00231     // accumulate the data of interest
00232     if (pointersAreValid)
00233     {
00234       rbRecordPtr->dataDiscardCount.addSample(1);
00235       fuRecordPtr->dataDiscardCount.addSample(1);
00236     }
00237   }
00238   
00239   
00240   void DataSenderMonitorCollection::incrementDQMDiscardCount(I2OChain const& i2oChain)
00241   {
00242     // look up the monitoring records that we need
00243     bool pointersAreValid;
00244     RBRecordPtr rbRecordPtr;
00245     FURecordPtr fuRecordPtr;
00246     {
00247       boost::mutex::scoped_lock sl(collectionsMutex_);
00248       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00249       if (pointersAreValid)
00250       {
00251         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00252       }
00253     }
00254     
00255     // accumulate the data of interest
00256     if (pointersAreValid)
00257     {
00258       rbRecordPtr->dqmDiscardCount.addSample(1);
00259       fuRecordPtr->dqmDiscardCount.addSample(1);
00260     }
00261   }
00262   
00263   
00264   void DataSenderMonitorCollection::incrementSkippedDiscardCount(I2OChain const& i2oChain)
00265   {
00266     // look up the monitoring records that we need
00267     bool pointersAreValid;
00268     RBRecordPtr rbRecordPtr;
00269     FURecordPtr fuRecordPtr;
00270     {
00271       boost::mutex::scoped_lock sl(collectionsMutex_);
00272       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00273       if (pointersAreValid)
00274       {
00275         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00276       }
00277     }
00278     
00279     // accumulate the data of interest
00280     if (pointersAreValid)
00281     {
00282       rbRecordPtr->skippedDiscardCount.addSample(1);
00283       fuRecordPtr->skippedDiscardCount.addSample(1);
00284     }
00285   }
00286   
00287   
00288   DataSenderMonitorCollection::OutputModuleResultsList
00289   DataSenderMonitorCollection::getTopLevelOutputModuleResults() const
00290   {
00291     boost::mutex::scoped_lock sl(collectionsMutex_);
00292     
00293     return buildOutputModuleResults(outputModuleMap_);
00294   }
00295   
00296   
00297   DataSenderMonitorCollection::ResourceBrokerResultsList
00298   DataSenderMonitorCollection::getAllResourceBrokerResults() const
00299   {
00300     boost::mutex::scoped_lock sl(collectionsMutex_);
00301     ResourceBrokerResultsList resultsList;
00302     
00303     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00304     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00305       resourceBrokerMap_.end();
00306     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00307     {
00308       RBRecordPtr rbRecordPtr = rbMapIter->second;
00309       RBResultPtr result = buildResourceBrokerResult(rbRecordPtr);
00310       result->uniqueRBID = rbMapIter->first;
00311       resultsList.push_back(result);
00312     }
00313     
00314     return resultsList;
00315   }
00316   
00317   
00318   DataSenderMonitorCollection::RBResultPtr
00319   DataSenderMonitorCollection::getOneResourceBrokerResult(UniqueResourceBrokerID_t uniqueRBID) const
00320   {
00321     boost::mutex::scoped_lock sl(collectionsMutex_);
00322     RBResultPtr result;
00323     
00324     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00325     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00326     if (rbMapIter != resourceBrokerMap_.end())
00327     {
00328       RBRecordPtr rbRecordPtr = rbMapIter->second;
00329       result = buildResourceBrokerResult(rbRecordPtr);
00330       result->uniqueRBID = rbMapIter->first;
00331     }
00332     
00333     return result;
00334   }
00335   
00336   
00337   DataSenderMonitorCollection::OutputModuleResultsList
00338   DataSenderMonitorCollection::getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00339   {
00340     boost::mutex::scoped_lock sl(collectionsMutex_);
00341     OutputModuleResultsList resultsList;
00342     
00343     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00344     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00345     if (rbMapIter != resourceBrokerMap_.end())
00346     {
00347       RBRecordPtr rbRecordPtr = rbMapIter->second;
00348       resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
00349     }
00350     
00351     return resultsList;
00352   }
00353   
00354   
00355   DataSenderMonitorCollection::FilterUnitResultsList
00356   DataSenderMonitorCollection::getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00357   {
00358     boost::mutex::scoped_lock sl(collectionsMutex_);
00359     FilterUnitResultsList resultsList;
00360     
00361     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00362     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00363     if (rbMapIter != resourceBrokerMap_.end())
00364     {
00365       RBRecordPtr rbRecordPtr = rbMapIter->second;
00366       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00367       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00368         rbRecordPtr->filterUnitMap.end();        
00369       for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00370            fuMapIter != fuMapEnd; ++fuMapIter)
00371       {
00372         FURecordPtr fuRecordPtr = fuMapIter->second;
00373         FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
00374         result->initMsgCount = fuRecordPtr->initMsgCount;
00375         result->lastRunNumber = fuRecordPtr->lastRunNumber;
00376         result->lastEventNumber = fuRecordPtr->lastEventNumber;
00377         fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
00378         fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
00379         fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00380         fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
00381         fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00382         fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00383         fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00384         fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00385         fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00386         
00387         result->outstandingDataDiscardCount =
00388           result->initMsgCount +
00389           result->shortIntervalEventStats.getSampleCount() +
00390           result->errorEventStats.getSampleCount() +
00391           result->faultyEventStats.getSampleCount() -
00392           result->dataDiscardStats.getSampleCount();
00393         result->outstandingDQMDiscardCount =
00394           result->dqmEventStats.getSampleCount() +
00395           result->faultyDQMEventStats.getSampleCount() -
00396           result->dqmDiscardStats.getSampleCount();
00397         
00398         resultsList.push_back(result);
00399       }
00400     }
00401     
00402     return resultsList;
00403   }
00404   
00405   
00406   void DataSenderMonitorCollection::do_calculateStatistics()
00407   {
00408     boost::mutex::scoped_lock sl(collectionsMutex_);
00409     
00410     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00411     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00412       resourceBrokerMap_.end();
00413     for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
00414     {
00415       RBRecordPtr rbRecordPtr = rbMapIter->second;
00416       rbRecordPtr->eventSize.calculateStatistics();
00417       rbRecordPtr->dqmEventSize.calculateStatistics();
00418       rbRecordPtr->errorEventSize.calculateStatistics();
00419       rbRecordPtr->faultyEventSize.calculateStatistics();
00420       rbRecordPtr->faultyDQMEventSize.calculateStatistics();
00421       rbRecordPtr->dataDiscardCount.calculateStatistics();
00422       rbRecordPtr->dqmDiscardCount.calculateStatistics();
00423       rbRecordPtr->skippedDiscardCount.calculateStatistics();
00424       calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
00425       
00426       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00427       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00428         rbRecordPtr->filterUnitMap.end();        
00429       for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00430            fuMapIter != fuMapEnd; ++fuMapIter)
00431       {
00432         FURecordPtr fuRecordPtr = fuMapIter->second;
00433         fuRecordPtr->shortIntervalEventSize.calculateStatistics();
00434         fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
00435         fuRecordPtr->dqmEventSize.calculateStatistics();
00436         fuRecordPtr->errorEventSize.calculateStatistics();
00437         fuRecordPtr->faultyEventSize.calculateStatistics();
00438         fuRecordPtr->faultyDQMEventSize.calculateStatistics();
00439         fuRecordPtr->dataDiscardCount.calculateStatistics();
00440         fuRecordPtr->dqmDiscardCount.calculateStatistics();
00441         fuRecordPtr->skippedDiscardCount.calculateStatistics();
00442         calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
00443       }
00444     }
00445     
00446     calcStatsForOutputModules(outputModuleMap_);
00447   }
00448   
00449   
00450   void DataSenderMonitorCollection::do_reset()
00451   {
00452     boost::mutex::scoped_lock sl(collectionsMutex_);
00453     
00454     connectedRBs_ = 0;
00455     connectedEPs_ = 0;
00456     activeEPs_ = 0;
00457     outstandingDataDiscards_ = 0;
00458     outstandingDQMDiscards_ = 0;
00459     faultyEvents_ = 0;
00460     ignoredDiscards_ = 0;
00461     resourceBrokerMap_.clear();
00462     outputModuleMap_.clear();
00463   }
00464   
00465   
00466   void DataSenderMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00467   {
00468     infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
00469     infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
00470     infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
00471     infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
00472     infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
00473     infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
00474     infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
00475   }
00476   
00477   
00478   void DataSenderMonitorCollection::do_updateInfoSpaceItems()
00479   {
00480     boost::mutex::scoped_lock sl(collectionsMutex_);
00481     
00482     connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
00483     
00484     uint32_t localEPCount = 0;
00485     uint32_t localActiveEPCount = 0;
00486     int localMissingDataDiscardCount = 0;
00487     int localMissingDQMDiscardCount = 0;
00488     uint32_t localFaultyEventsCount = 0;
00489     uint32_t localIgnoredDiscardCount = 0;
00490     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00491     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00492       resourceBrokerMap_.end();
00493     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00494     {
00495       RBRecordPtr rbRecordPtr = rbMapIter->second;
00496       if ( rbRecordPtr->initMsgCount > 0 )
00497         localEPCount += rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount;
00498       
00499       MonitoredQuantity::Stats skippedDiscardStats;
00500       rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
00501       localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
00502       
00503       MonitoredQuantity::Stats eventStats;
00504       MonitoredQuantity::Stats errorEventStats;
00505       MonitoredQuantity::Stats dataDiscardStats;
00506       rbRecordPtr->eventSize.getStats(eventStats);
00507       rbRecordPtr->errorEventSize.getStats(errorEventStats);
00508       rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
00509       localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
00510         errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
00511       
00512       MonitoredQuantity::Stats dqmEventStats;
00513       MonitoredQuantity::Stats dqmDiscardStats;
00514       rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
00515       rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
00516       localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
00517         dqmDiscardStats.getSampleCount();
00518       
00519       MonitoredQuantity::Stats faultyEventStats;
00520       rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
00521       localFaultyEventsCount += faultyEventStats.getSampleCount();
00522       MonitoredQuantity::Stats faultyDQMEventStats;
00523       rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
00524       localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
00525       
00526       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00527       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00528         rbRecordPtr->filterUnitMap.end();        
00529       for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
00530       {
00531         FURecordPtr fuRecordPtr = fuMapIter->second;
00532         MonitoredQuantity::Stats fuMediumIntervalEventStats;
00533         fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
00534         if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
00535           ++localActiveEPCount;
00536         }
00537       }
00538     }
00539     connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
00540     activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
00541     outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
00542     outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
00543     faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
00544     ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
00545     
00546     faultyEventsAlarm(localFaultyEventsCount);
00547     ignoredDiscardAlarm(localIgnoredDiscardCount);
00548   }
00549   
00550   
00551   void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
00552   {
00553     const std::string alarmName = "FaultyEvents";
00554     
00555     if (faultyEventsCount > 0)
00556     {
00557       std::ostringstream msg;
00558       msg << "Missing or faulty I2O fragments for " <<
00559         faultyEventsCount <<
00560         " events. These events are lost!";
00561       XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
00562       alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00563     }
00564     else
00565     {
00566       alarmHandler_->revokeAlarm(alarmName);
00567     }
00568   }
00569   
00570   
00571   void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
00572   {
00573     const std::string alarmName = "IgnoredDiscard";
00574     
00575     if ( ignoredDiscardCount > 0)
00576     {
00577       std::ostringstream msg;
00578       msg << ignoredDiscardCount <<
00579         " discard messages ignored. These events might be stuck in the resource broker.";
00580       XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
00581       alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00582     }
00583     else
00584     {
00585       alarmHandler_->revokeAlarm(alarmName);
00586     }
00587   }
00588 
00589   
00590   size_t DataSenderMonitorCollection::getConnectedEPs() const
00591   {
00592     size_t count = 0;
00593     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00594     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00595       resourceBrokerMap_.end();
00596     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00597     {
00598       if ( rbMapIter->second->initMsgCount > 0 )
00599         count += rbMapIter->second->nExpectedEPs / rbMapIter->second->initMsgCount;
00600     }
00601     return count;
00602   }
00603 
00604   
00605   typedef DataSenderMonitorCollection DSMC;
00606   
00607   bool DSMC::getAllNeededPointers
00608   (
00609     I2OChain const& i2oChain,
00610     DSMC::RBRecordPtr& rbRecordPtr,
00611     DSMC::FURecordPtr& fuRecordPtr,
00612     DSMC::OutModRecordPtr& topLevelOutModPtr,
00613     DSMC::OutModRecordPtr& rbSpecificOutModPtr,
00614     DSMC::OutModRecordPtr& fuSpecificOutModPtr
00615   )
00616   {
00617     ResourceBrokerKey rbKey(i2oChain);
00618     if (! rbKey.isValid) {return false;}
00619     FilterUnitKey fuKey(i2oChain);
00620     if (! fuKey.isValid) {return false;}
00621     OutputModuleKey outModKey = i2oChain.outputModuleId();
00622     
00623     topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
00624     
00625     rbRecordPtr = getResourceBrokerRecord(rbKey);
00626     rbSpecificOutModPtr = getOutputModuleRecord(
00627       rbRecordPtr->outputModuleMap,
00628       outModKey);
00629     
00630     fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00631     fuSpecificOutModPtr = getOutputModuleRecord(
00632       fuRecordPtr->outputModuleMap,
00633       outModKey);
00634     
00635     return true;
00636   }
00637   
00638   
00639   bool DSMC::getRBRecordPointer
00640   (
00641     I2OChain const& i2oChain,
00642     DSMC::RBRecordPtr& rbRecordPtr
00643   )
00644   {
00645     ResourceBrokerKey rbKey(i2oChain);
00646     if (! rbKey.isValid) {return false;}
00647     
00648     rbRecordPtr = getResourceBrokerRecord(rbKey);
00649     return true;
00650   }
00651   
00652   
00653   bool DSMC::getFURecordPointer
00654   (
00655     I2OChain const& i2oChain,
00656     DSMC::RBRecordPtr& rbRecordPtr,
00657     DSMC::FURecordPtr& fuRecordPtr
00658   )
00659   {
00660     FilterUnitKey fuKey(i2oChain);
00661     if (! fuKey.isValid) {return false;}
00662     
00663     fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00664     return true;
00665   }
00666   
00667   
00668   DSMC::RBRecordPtr
00669   DSMC::getResourceBrokerRecord(DSMC::ResourceBrokerKey const& rbKey)
00670   {
00671     RBRecordPtr rbRecordPtr;
00672     UniqueResourceBrokerID_t uniqueRBID = getUniqueResourceBrokerID(rbKey);
00673     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00674     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00675     if (rbMapIter == resourceBrokerMap_.end())
00676     {
00677       rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
00678       resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
00679     }
00680     else
00681     {
00682       rbRecordPtr = rbMapIter->second;
00683     }
00684     return rbRecordPtr;
00685   }
00686   
00687   
00688   DSMC::UniqueResourceBrokerID_t
00689   DSMC::getUniqueResourceBrokerID(DSMC::ResourceBrokerKey const& rbKey)
00690   {
00691     UniqueResourceBrokerID_t uniqueID;
00692     std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
00693     rbMapIter = resourceBrokerIDs_.find(rbKey);
00694     if (rbMapIter == resourceBrokerIDs_.end())
00695     {
00696       std::string workString = rbKey.hltURL +
00697         boost::lexical_cast<std::string>(rbKey.hltTid) +
00698         boost::lexical_cast<std::string>(rbKey.hltInstance) +
00699         boost::lexical_cast<std::string>(rbKey.hltLocalId) +
00700         rbKey.hltClassName;
00701       uLong crc = crc32(0L, Z_NULL, 0);
00702       Bytef* crcbuf = (Bytef*) workString.data();
00703       crc = crc32(crc, crcbuf, workString.length());
00704       uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
00705       resourceBrokerIDs_[rbKey] = uniqueID;
00706     }
00707     else
00708     {
00709       uniqueID = rbMapIter->second;
00710     }
00711     return uniqueID;
00712   }
00713   
00714   
00715   DSMC::FURecordPtr
00716   DSMC::getFilterUnitRecord
00717   (
00718     DSMC::RBRecordPtr& rbRecordPtr,
00719     DSMC::FilterUnitKey const& fuKey
00720   )
00721   {
00722     FURecordPtr fuRecordPtr;
00723     std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00724     fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
00725     if (fuMapIter == rbRecordPtr->filterUnitMap.end())
00726     {
00727       fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
00728       rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
00729     }
00730     else
00731     {
00732       fuRecordPtr = fuMapIter->second;
00733     }
00734     return fuRecordPtr;
00735   }
00736   
00737   
00738   DSMC::OutModRecordPtr
00739   DSMC::getOutputModuleRecord
00740   (
00741     OutputModuleRecordMap& outModMap,
00742     DSMC::OutputModuleKey const& outModKey
00743   )
00744   {
00745     OutModRecordPtr outModRecordPtr;
00746     OutputModuleRecordMap::const_iterator omMapIter;
00747     omMapIter = outModMap.find(outModKey);
00748     if (omMapIter == outModMap.end())
00749     {
00750       outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
00751       
00752       outModRecordPtr->name = "Unknown";
00753       outModRecordPtr->id = outModKey;
00754       outModRecordPtr->initMsgSize = 0;
00755       
00756       outModMap[outModKey] = outModRecordPtr;
00757     }
00758     else
00759     {
00760       outModRecordPtr = omMapIter->second;
00761     }
00762     return outModRecordPtr;
00763   }
00764   
00765   
00766   DSMC::OutputModuleResultsList
00767   DSMC::buildOutputModuleResults(DSMC::OutputModuleRecordMap const& outputModuleMap) const
00768   {
00769     OutputModuleResultsList resultsList;
00770     
00771     OutputModuleRecordMap::const_iterator omMapIter;
00772     OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00773     for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00774     {
00775       OutModRecordPtr outModRecordPtr = omMapIter->second;
00776       boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
00777       result->name = outModRecordPtr->name;
00778       result->id = outModRecordPtr->id;
00779       result->initMsgSize = outModRecordPtr->initMsgSize;
00780       outModRecordPtr->eventSize.getStats(result->eventStats);
00781       resultsList.push_back(result);
00782     }
00783     
00784     return resultsList;
00785   }
00786   
00787   
00788   DSMC::RBResultPtr
00789   DSMC::buildResourceBrokerResult(DSMC::RBRecordPtr const& rbRecordPtr) const
00790   {
00791     RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
00792     
00793     result->filterUnitCount = rbRecordPtr->initMsgCount>0 ? rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount : 0;
00794     result->initMsgCount = rbRecordPtr->initMsgCount;
00795     result->lastRunNumber = rbRecordPtr->lastRunNumber;
00796     result->lastEventNumber = rbRecordPtr->lastEventNumber;
00797     rbRecordPtr->eventSize.getStats(result->eventStats);
00798     rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00799     rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
00800     rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00801     rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00802     rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00803     rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00804     rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00805     
00806     result->outstandingDataDiscardCount =
00807       result->initMsgCount +
00808       result->eventStats.getSampleCount() +
00809       result->errorEventStats.getSampleCount() +
00810       result->faultyEventStats.getSampleCount() -
00811       result->dataDiscardStats.getSampleCount();
00812     result->outstandingDQMDiscardCount =
00813       result->dqmEventStats.getSampleCount() +
00814       result->faultyDQMEventStats.getSampleCount() -
00815       result->dqmDiscardStats.getSampleCount();
00816     
00817     return result;
00818   }
00819   
00820   
00821   void DSMC::calcStatsForOutputModules(DSMC::OutputModuleRecordMap& outputModuleMap)
00822   {
00823     OutputModuleRecordMap::const_iterator omMapIter;
00824     OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00825     for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00826     {
00827       OutModRecordPtr outModRecordPtr = omMapIter->second;
00828       
00829       outModRecordPtr->fragmentSize.calculateStatistics();
00830       outModRecordPtr->eventSize.calculateStatistics();
00831     }
00832   }
00833   
00834   
00835   bool compareRBResultPtrValues
00836   (
00837     DSMC::RBResultPtr firstValue,
00838     DSMC::RBResultPtr secondValue
00839   )
00840   {
00841     return *firstValue < *secondValue;
00842   }
00843 
00844 } // namespace stor
00845 
00846