CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/StorageManager/src/DataSenderMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: DataSenderMonitorCollection.cc,v 1.17 2011/03/07 15:31:32 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include <zlib.h>
00009 #include <boost/lexical_cast.hpp>
00010 
00011 #include "EventFilter/StorageManager/interface/Exception.h"
00012 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00013 
00014 
00015 namespace stor {
00016   
00017   DataSenderMonitorCollection::DataSenderMonitorCollection
00018   (
00019     const utils::Duration_t& updateInterval,
00020     AlarmHandlerPtr ah
00021   ) :
00022   MonitorCollection(updateInterval),
00023   connectedRBs_(0),
00024   connectedEPs_(0),
00025   activeEPs_(0),
00026   outstandingDataDiscards_(0),
00027   outstandingDQMDiscards_(0),
00028   faultyEvents_(0),
00029   ignoredDiscards_(0),
00030   updateInterval_(updateInterval),
00031   alarmHandler_(ah)
00032   {}
00033   
00034   
00035   void DataSenderMonitorCollection::addFragmentSample(I2OChain const& i2oChain)
00036   {
00037     // focus only on INIT and event fragments, for now
00038     if (i2oChain.messageCode() != Header::INIT &&
00039       i2oChain.messageCode() != Header::EVENT) {return;}
00040     if (i2oChain.fragmentCount() != 1) {return;}
00041     
00042     // fetch basic data from the I2OChain
00043     //double fragmentSize = static_cast<double>(i2oChain.totalDataSize());
00044     
00045     // look up the monitoring records that we need
00046     bool pointersAreValid;
00047     RBRecordPtr rbRecordPtr;
00048     FURecordPtr fuRecordPtr;
00049     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00050     {
00051       boost::mutex::scoped_lock sl(collectionsMutex_);
00052       pointersAreValid = getAllNeededPointers(
00053         i2oChain, rbRecordPtr, fuRecordPtr,
00054         topLevelOutModPtr, rbSpecificOutModPtr,
00055         fuSpecificOutModPtr);
00056     }
00057     
00058     // accumulate the data of interest
00059     //if (pointersAreValid)
00060     //{
00061     //topLevelOutModPtr->fragmentSize.addSample(fragmentSize);
00062     //rbSpecificOutModPtr->fragmentSize.addSample(fragmentSize);
00063     //fuSpecificOutModPtr->fragmentSize.addSample(fragmentSize);
00064     //}
00065   }
00066   
00067   
00068   void DataSenderMonitorCollection::addInitSample(I2OChain const& i2oChain)
00069   {
00070     // sanity checks
00071     if (i2oChain.messageCode() != Header::INIT) {return;}
00072     if (! i2oChain.complete()) {return;}
00073     
00074     // fetch basic data from the I2OChain
00075     std::string outModName = i2oChain.outputModuleLabel();
00076     uint32_t msgSize = i2oChain.totalDataSize();
00077     
00078     // look up the monitoring records that we need
00079     bool pointersAreValid;
00080     RBRecordPtr rbRecordPtr;
00081     FURecordPtr fuRecordPtr;
00082     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00083     {
00084       boost::mutex::scoped_lock sl(collectionsMutex_);
00085       pointersAreValid = getAllNeededPointers(
00086         i2oChain, rbRecordPtr, fuRecordPtr,
00087         topLevelOutModPtr, rbSpecificOutModPtr,
00088         fuSpecificOutModPtr);
00089     }
00090     
00091     // accumulate the data of interest
00092     if (pointersAreValid)
00093     {
00094       topLevelOutModPtr->name = outModName;
00095       topLevelOutModPtr->initMsgSize = msgSize;
00096       
00097       ++rbRecordPtr->initMsgCount;
00098       rbSpecificOutModPtr->name = outModName;
00099       rbSpecificOutModPtr->initMsgSize = msgSize;
00100       
00101       ++fuRecordPtr->initMsgCount;
00102     fuSpecificOutModPtr->name = outModName;
00103     fuSpecificOutModPtr->initMsgSize = msgSize;
00104     }
00105   }
00106   
00107   
00108   void DataSenderMonitorCollection::addEventSample(I2OChain const& i2oChain)
00109   {
00110     // sanity checks
00111     if (i2oChain.messageCode() != Header::EVENT) {return;}
00112     if (! i2oChain.complete()) {return;}
00113 
00114     // fetch basic data from the I2OChain
00115     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00116     uint32_t runNumber = i2oChain.runNumber();
00117     uint32_t eventNumber = i2oChain.eventNumber();
00118     
00119     // look up the monitoring records that we need
00120     bool pointersAreValid;
00121     RBRecordPtr rbRecordPtr;
00122     FURecordPtr fuRecordPtr;
00123     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00124     {
00125       boost::mutex::scoped_lock sl(collectionsMutex_);
00126       pointersAreValid = getAllNeededPointers(
00127         i2oChain, rbRecordPtr, fuRecordPtr,
00128         topLevelOutModPtr, rbSpecificOutModPtr,
00129         fuSpecificOutModPtr);
00130     }
00131     
00132     // accumulate the data of interest
00133     if (pointersAreValid)
00134     {
00135       topLevelOutModPtr->eventSize.addSample(eventSize);
00136       
00137       rbRecordPtr->lastRunNumber = runNumber;
00138       rbRecordPtr->lastEventNumber = eventNumber;
00139       rbRecordPtr->eventSize.addSample(eventSize);
00140       rbSpecificOutModPtr->eventSize.addSample(eventSize);
00141       
00142       fuRecordPtr->lastRunNumber = runNumber;
00143       fuRecordPtr->lastEventNumber = eventNumber;
00144       fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
00145       fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
00146       fuSpecificOutModPtr->eventSize.addSample(eventSize);
00147     }
00148   }
00149   
00150   
00151   void DataSenderMonitorCollection::addDQMEventSample(I2OChain const& i2oChain)
00152   {
00153     // sanity checks
00154     if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
00155     if (! i2oChain.complete()) {return;}
00156     
00157     // fetch basic data from the I2OChain
00158     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00159     
00160     // look up the monitoring records that we need
00161     bool pointersAreValid;
00162     RBRecordPtr rbRecordPtr;
00163     FURecordPtr fuRecordPtr;
00164     {
00165       boost::mutex::scoped_lock sl(collectionsMutex_);
00166       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00167       if (pointersAreValid)
00168       {
00169         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00170       }
00171     }
00172     
00173     // accumulate the data of interest
00174     if (pointersAreValid)
00175     {
00176       rbRecordPtr->dqmEventSize.addSample(eventSize);
00177       fuRecordPtr->dqmEventSize.addSample(eventSize);
00178     }
00179   }
00180   
00181   
00182   void DataSenderMonitorCollection::addErrorEventSample(I2OChain const& i2oChain)
00183   {
00184     // sanity checks
00185     if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
00186     if (! i2oChain.complete()) {return;}
00187     
00188     // fetch basic data from the I2OChain
00189     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00190     
00191     // look up the monitoring records that we need
00192     bool pointersAreValid;
00193     RBRecordPtr rbRecordPtr;
00194     FURecordPtr fuRecordPtr;
00195     {
00196       boost::mutex::scoped_lock sl(collectionsMutex_);
00197       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00198       if (pointersAreValid)
00199       {
00200         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00201       }
00202     }
00203     
00204     // accumulate the data of interest
00205     if (pointersAreValid)
00206     {
00207       rbRecordPtr->errorEventSize.addSample(eventSize);
00208       fuRecordPtr->errorEventSize.addSample(eventSize);
00209     }
00210   }
00211   
00212   
00213   void DataSenderMonitorCollection::addFaultyEventSample(I2OChain const& i2oChain)
00214   {
00215     // fetch basic data from the I2OChain
00216     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00217     
00218     // look up the monitoring records that we need
00219     bool pointersAreValid;
00220     RBRecordPtr rbRecordPtr;
00221     FURecordPtr fuRecordPtr;
00222     {
00223       boost::mutex::scoped_lock sl(collectionsMutex_);
00224       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00225       if (pointersAreValid)
00226       {
00227         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00228       }
00229     }
00230     
00231     // accumulate the data of interest
00232     if (pointersAreValid)
00233     {
00234       if (i2oChain.messageCode() == Header::DQM_EVENT)
00235       {
00236         rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
00237         fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
00238       }
00239       else
00240       {
00241         rbRecordPtr->faultyEventSize.addSample(eventSize);
00242         fuRecordPtr->faultyEventSize.addSample(eventSize);
00243       }
00244     }
00245   }
00246   
00247   
00248   void DataSenderMonitorCollection::incrementDataDiscardCount(I2OChain const& i2oChain)
00249   {
00250     // look up the monitoring records that we need
00251     bool pointersAreValid;
00252     RBRecordPtr rbRecordPtr;
00253     FURecordPtr fuRecordPtr;
00254     {
00255       boost::mutex::scoped_lock sl(collectionsMutex_);
00256       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00257       if (pointersAreValid)
00258       {
00259         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00260       }
00261     }
00262     
00263     // accumulate the data of interest
00264     if (pointersAreValid)
00265     {
00266       rbRecordPtr->dataDiscardCount.addSample(1);
00267       fuRecordPtr->dataDiscardCount.addSample(1);
00268     }
00269   }
00270   
00271   
00272   void DataSenderMonitorCollection::incrementDQMDiscardCount(I2OChain const& i2oChain)
00273   {
00274     // look up the monitoring records that we need
00275     bool pointersAreValid;
00276     RBRecordPtr rbRecordPtr;
00277     FURecordPtr fuRecordPtr;
00278     {
00279       boost::mutex::scoped_lock sl(collectionsMutex_);
00280       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00281       if (pointersAreValid)
00282       {
00283         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00284       }
00285     }
00286     
00287     // accumulate the data of interest
00288     if (pointersAreValid)
00289     {
00290       rbRecordPtr->dqmDiscardCount.addSample(1);
00291       fuRecordPtr->dqmDiscardCount.addSample(1);
00292     }
00293   }
00294   
00295   
00296   void DataSenderMonitorCollection::incrementSkippedDiscardCount(I2OChain const& i2oChain)
00297   {
00298     // look up the monitoring records that we need
00299     bool pointersAreValid;
00300     RBRecordPtr rbRecordPtr;
00301     FURecordPtr fuRecordPtr;
00302     {
00303       boost::mutex::scoped_lock sl(collectionsMutex_);
00304       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00305       if (pointersAreValid)
00306       {
00307         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00308       }
00309     }
00310     
00311     // accumulate the data of interest
00312     if (pointersAreValid)
00313     {
00314       rbRecordPtr->skippedDiscardCount.addSample(1);
00315       fuRecordPtr->skippedDiscardCount.addSample(1);
00316     }
00317   }
00318   
00319   
00320   DataSenderMonitorCollection::OutputModuleResultsList
00321   DataSenderMonitorCollection::getTopLevelOutputModuleResults() const
00322   {
00323     boost::mutex::scoped_lock sl(collectionsMutex_);
00324     
00325     return buildOutputModuleResults(outputModuleMap_);
00326   }
00327   
00328   
00329   DataSenderMonitorCollection::ResourceBrokerResultsList
00330   DataSenderMonitorCollection::getAllResourceBrokerResults() const
00331   {
00332     boost::mutex::scoped_lock sl(collectionsMutex_);
00333     ResourceBrokerResultsList resultsList;
00334     
00335     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00336     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00337       resourceBrokerMap_.end();
00338     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00339     {
00340       RBRecordPtr rbRecordPtr = rbMapIter->second;
00341       RBResultPtr result = buildResourceBrokerResult(rbRecordPtr);
00342       result->uniqueRBID = rbMapIter->first;
00343       resultsList.push_back(result);
00344     }
00345     
00346     return resultsList;
00347   }
00348   
00349   
00350   DataSenderMonitorCollection::RBResultPtr
00351   DataSenderMonitorCollection::getOneResourceBrokerResult(UniqueResourceBrokerID_t uniqueRBID) const
00352   {
00353     boost::mutex::scoped_lock sl(collectionsMutex_);
00354     RBResultPtr result;
00355     
00356     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00357     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00358     if (rbMapIter != resourceBrokerMap_.end())
00359     {
00360       RBRecordPtr rbRecordPtr = rbMapIter->second;
00361       result = buildResourceBrokerResult(rbRecordPtr);
00362       result->uniqueRBID = rbMapIter->first;
00363     }
00364     
00365     return result;
00366   }
00367   
00368   
00369   DataSenderMonitorCollection::OutputModuleResultsList
00370   DataSenderMonitorCollection::getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00371   {
00372     boost::mutex::scoped_lock sl(collectionsMutex_);
00373     OutputModuleResultsList resultsList;
00374     
00375     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00376     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00377     if (rbMapIter != resourceBrokerMap_.end())
00378     {
00379       RBRecordPtr rbRecordPtr = rbMapIter->second;
00380       resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
00381     }
00382     
00383     return resultsList;
00384   }
00385   
00386   
00387   DataSenderMonitorCollection::FilterUnitResultsList
00388   DataSenderMonitorCollection::getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00389   {
00390     boost::mutex::scoped_lock sl(collectionsMutex_);
00391     FilterUnitResultsList resultsList;
00392     
00393     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00394     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00395     if (rbMapIter != resourceBrokerMap_.end())
00396     {
00397       RBRecordPtr rbRecordPtr = rbMapIter->second;
00398       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00399       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00400         rbRecordPtr->filterUnitMap.end();        
00401       for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00402            fuMapIter != fuMapEnd; ++fuMapIter)
00403       {
00404         FURecordPtr fuRecordPtr = fuMapIter->second;
00405         FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
00406         result->initMsgCount = fuRecordPtr->initMsgCount;
00407         result->lastRunNumber = fuRecordPtr->lastRunNumber;
00408         result->lastEventNumber = fuRecordPtr->lastEventNumber;
00409         fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
00410         fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
00411         fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00412         fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
00413         fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00414         fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00415         fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00416         fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00417         fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00418         
00419         result->outstandingDataDiscardCount =
00420           result->initMsgCount +
00421           result->shortIntervalEventStats.getSampleCount() +
00422           result->errorEventStats.getSampleCount() +
00423           result->faultyEventStats.getSampleCount() -
00424           result->dataDiscardStats.getSampleCount();
00425         result->outstandingDQMDiscardCount =
00426           result->dqmEventStats.getSampleCount() +
00427           result->faultyDQMEventStats.getSampleCount() -
00428           result->dqmDiscardStats.getSampleCount();
00429         
00430         resultsList.push_back(result);
00431       }
00432     }
00433     
00434     return resultsList;
00435   }
00436   
00437   
00438   void DataSenderMonitorCollection::do_calculateStatistics()
00439   {
00440     boost::mutex::scoped_lock sl(collectionsMutex_);
00441     
00442     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00443     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00444       resourceBrokerMap_.end();
00445     for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
00446     {
00447       RBRecordPtr rbRecordPtr = rbMapIter->second;
00448       rbRecordPtr->eventSize.calculateStatistics();
00449       rbRecordPtr->dqmEventSize.calculateStatistics();
00450       rbRecordPtr->errorEventSize.calculateStatistics();
00451       rbRecordPtr->faultyEventSize.calculateStatistics();
00452       rbRecordPtr->faultyDQMEventSize.calculateStatistics();
00453       rbRecordPtr->dataDiscardCount.calculateStatistics();
00454       rbRecordPtr->dqmDiscardCount.calculateStatistics();
00455       rbRecordPtr->skippedDiscardCount.calculateStatistics();
00456       calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
00457       
00458       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00459       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00460         rbRecordPtr->filterUnitMap.end();        
00461       for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00462            fuMapIter != fuMapEnd; ++fuMapIter)
00463       {
00464         FURecordPtr fuRecordPtr = fuMapIter->second;
00465         fuRecordPtr->shortIntervalEventSize.calculateStatistics();
00466         fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
00467         fuRecordPtr->dqmEventSize.calculateStatistics();
00468         fuRecordPtr->errorEventSize.calculateStatistics();
00469         fuRecordPtr->faultyEventSize.calculateStatistics();
00470         fuRecordPtr->faultyDQMEventSize.calculateStatistics();
00471         fuRecordPtr->dataDiscardCount.calculateStatistics();
00472         fuRecordPtr->dqmDiscardCount.calculateStatistics();
00473         fuRecordPtr->skippedDiscardCount.calculateStatistics();
00474         calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
00475       }
00476     }
00477     
00478     calcStatsForOutputModules(outputModuleMap_);
00479   }
00480   
00481   
00482   void DataSenderMonitorCollection::do_reset()
00483   {
00484     boost::mutex::scoped_lock sl(collectionsMutex_);
00485     
00486     connectedRBs_ = 0;
00487     connectedEPs_ = 0;
00488     activeEPs_ = 0;
00489     outstandingDataDiscards_ = 0;
00490     outstandingDQMDiscards_ = 0;
00491     faultyEvents_ = 0;
00492     ignoredDiscards_ = 0;
00493     resourceBrokerMap_.clear();
00494     outputModuleMap_.clear();
00495   }
00496   
00497   
00498   void DataSenderMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00499   {
00500     infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
00501     infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
00502     infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
00503     infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
00504     infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
00505     infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
00506     infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
00507   }
00508   
00509   
00510   void DataSenderMonitorCollection::do_updateInfoSpaceItems()
00511   {
00512     boost::mutex::scoped_lock sl(collectionsMutex_);
00513     
00514     connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
00515     
00516     uint32_t localEPCount = 0;
00517     uint32_t localActiveEPCount = 0;
00518     int localMissingDataDiscardCount = 0;
00519     int localMissingDQMDiscardCount = 0;
00520     uint32_t localFaultyEventsCount = 0;
00521     uint32_t localIgnoredDiscardCount = 0;
00522     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00523     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00524       resourceBrokerMap_.end();
00525     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00526     {
00527       RBRecordPtr rbRecordPtr = rbMapIter->second;
00528       localEPCount += rbRecordPtr->filterUnitMap.size();
00529       
00530       MonitoredQuantity::Stats skippedDiscardStats;
00531       rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
00532       localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
00533       
00534       MonitoredQuantity::Stats eventStats;
00535       MonitoredQuantity::Stats errorEventStats;
00536       MonitoredQuantity::Stats dataDiscardStats;
00537       rbRecordPtr->eventSize.getStats(eventStats);
00538       rbRecordPtr->errorEventSize.getStats(errorEventStats);
00539       rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
00540       localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
00541         errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
00542       
00543       MonitoredQuantity::Stats dqmEventStats;
00544       MonitoredQuantity::Stats dqmDiscardStats;
00545       rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
00546       rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
00547       localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
00548         dqmDiscardStats.getSampleCount();
00549       
00550       MonitoredQuantity::Stats faultyEventStats;
00551       rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
00552       localFaultyEventsCount += faultyEventStats.getSampleCount();
00553       MonitoredQuantity::Stats faultyDQMEventStats;
00554       rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
00555       localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
00556       
00557       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00558       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00559         rbRecordPtr->filterUnitMap.end();        
00560       for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
00561       {
00562         FURecordPtr fuRecordPtr = fuMapIter->second;
00563         MonitoredQuantity::Stats fuMediumIntervalEventStats;
00564         fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
00565         if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
00566           ++localActiveEPCount;
00567         }
00568       }
00569     }
00570     connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
00571     activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
00572     outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
00573     outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
00574     faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
00575     ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
00576     
00577     faultyEventsAlarm(localFaultyEventsCount);
00578     ignoredDiscardAlarm(localIgnoredDiscardCount);
00579   }
00580   
00581   
00582   void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
00583   {
00584     const std::string alarmName = "FaultyEvents";
00585     
00586     if (faultyEventsCount > 0)
00587     {
00588       std::ostringstream msg;
00589       msg << "Missing or faulty I2O fragments for " <<
00590         faultyEventsCount <<
00591         " events. These events are lost!";
00592       XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
00593       alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00594     }
00595     else
00596     {
00597       alarmHandler_->revokeAlarm(alarmName);
00598     }
00599   }
00600   
00601   
00602   void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
00603   {
00604     const std::string alarmName = "IgnoredDiscard";
00605     
00606     if ( ignoredDiscardCount > 0)
00607     {
00608       std::ostringstream msg;
00609       msg << ignoredDiscardCount <<
00610         " discard messages ignored. These events might be stuck in the resource broker.";
00611       XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
00612       alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00613     }
00614     else
00615     {
00616       alarmHandler_->revokeAlarm(alarmName);
00617     }
00618   }
00619 
00620   
00621   typedef DataSenderMonitorCollection DSMC;
00622   
00623   bool DSMC::getAllNeededPointers
00624   (
00625     I2OChain const& i2oChain,
00626     DSMC::RBRecordPtr& rbRecordPtr,
00627     DSMC::FURecordPtr& fuRecordPtr,
00628     DSMC::OutModRecordPtr& topLevelOutModPtr,
00629     DSMC::OutModRecordPtr& rbSpecificOutModPtr,
00630     DSMC::OutModRecordPtr& fuSpecificOutModPtr
00631   )
00632   {
00633     ResourceBrokerKey rbKey(i2oChain);
00634     if (! rbKey.isValid) {return false;}
00635     FilterUnitKey fuKey(i2oChain);
00636     if (! fuKey.isValid) {return false;}
00637     OutputModuleKey outModKey = i2oChain.outputModuleId();
00638     
00639     topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
00640     
00641     rbRecordPtr = getResourceBrokerRecord(rbKey);
00642     rbSpecificOutModPtr = getOutputModuleRecord(
00643       rbRecordPtr->outputModuleMap,
00644       outModKey);
00645     
00646     fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00647     fuSpecificOutModPtr = getOutputModuleRecord(
00648       fuRecordPtr->outputModuleMap,
00649       outModKey);
00650     
00651     return true;
00652   }
00653   
00654   
00655   bool DSMC::getRBRecordPointer
00656   (
00657     I2OChain const& i2oChain,
00658     DSMC::RBRecordPtr& rbRecordPtr
00659   )
00660   {
00661     ResourceBrokerKey rbKey(i2oChain);
00662     if (! rbKey.isValid) {return false;}
00663     
00664     rbRecordPtr = getResourceBrokerRecord(rbKey);
00665     return true;
00666   }
00667   
00668   
00669   bool DSMC::getFURecordPointer
00670   (
00671     I2OChain const& i2oChain,
00672     DSMC::RBRecordPtr& rbRecordPtr,
00673     DSMC::FURecordPtr& fuRecordPtr
00674   )
00675   {
00676     FilterUnitKey fuKey(i2oChain);
00677     if (! fuKey.isValid) {return false;}
00678     
00679     fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00680     return true;
00681   }
00682   
00683   
00684   DSMC::RBRecordPtr
00685   DSMC::getResourceBrokerRecord(DSMC::ResourceBrokerKey const& rbKey)
00686   {
00687     RBRecordPtr rbRecordPtr;
00688     UniqueResourceBrokerID_t uniqueRBID = getUniqueResourceBrokerID(rbKey);
00689     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00690     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00691     if (rbMapIter == resourceBrokerMap_.end())
00692     {
00693       rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
00694       resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
00695     }
00696     else
00697     {
00698       rbRecordPtr = rbMapIter->second;
00699     }
00700     return rbRecordPtr;
00701   }
00702   
00703   
00704   DSMC::UniqueResourceBrokerID_t
00705   DSMC::getUniqueResourceBrokerID(DSMC::ResourceBrokerKey const& rbKey)
00706   {
00707     UniqueResourceBrokerID_t uniqueID;
00708     std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
00709     rbMapIter = resourceBrokerIDs_.find(rbKey);
00710     if (rbMapIter == resourceBrokerIDs_.end())
00711     {
00712       std::string workString = rbKey.hltURL +
00713         boost::lexical_cast<std::string>(rbKey.hltTid) +
00714         boost::lexical_cast<std::string>(rbKey.hltInstance) +
00715         boost::lexical_cast<std::string>(rbKey.hltLocalId) +
00716         rbKey.hltClassName;
00717       uLong crc = crc32(0L, Z_NULL, 0);
00718       Bytef* crcbuf = (Bytef*) workString.data();
00719       crc = crc32(crc, crcbuf, workString.length());
00720       uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
00721       resourceBrokerIDs_[rbKey] = uniqueID;
00722     }
00723     else
00724     {
00725       uniqueID = rbMapIter->second;
00726     }
00727     return uniqueID;
00728   }
00729   
00730   
00731   DSMC::FURecordPtr
00732   DSMC::getFilterUnitRecord
00733   (
00734     DSMC::RBRecordPtr& rbRecordPtr,
00735     DSMC::FilterUnitKey const& fuKey
00736   )
00737   {
00738     FURecordPtr fuRecordPtr;
00739     std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00740     fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
00741     if (fuMapIter == rbRecordPtr->filterUnitMap.end())
00742     {
00743       fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
00744       rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
00745     }
00746     else
00747     {
00748       fuRecordPtr = fuMapIter->second;
00749     }
00750     return fuRecordPtr;
00751   }
00752   
00753   
00754   DSMC::OutModRecordPtr
00755   DSMC::getOutputModuleRecord
00756   (
00757     OutputModuleRecordMap& outModMap,
00758     DSMC::OutputModuleKey const& outModKey
00759   )
00760   {
00761     OutModRecordPtr outModRecordPtr;
00762     OutputModuleRecordMap::const_iterator omMapIter;
00763     omMapIter = outModMap.find(outModKey);
00764     if (omMapIter == outModMap.end())
00765     {
00766       outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
00767       
00768       outModRecordPtr->name = "Unknown";
00769       outModRecordPtr->id = outModKey;
00770       outModRecordPtr->initMsgSize = 0;
00771       
00772       outModMap[outModKey] = outModRecordPtr;
00773     }
00774     else
00775     {
00776       outModRecordPtr = omMapIter->second;
00777     }
00778     return outModRecordPtr;
00779   }
00780   
00781   
00782   DSMC::OutputModuleResultsList
00783   DSMC::buildOutputModuleResults(DSMC::OutputModuleRecordMap const& outputModuleMap) const
00784   {
00785     OutputModuleResultsList resultsList;
00786     
00787     OutputModuleRecordMap::const_iterator omMapIter;
00788     OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00789     for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00790     {
00791       OutModRecordPtr outModRecordPtr = omMapIter->second;
00792       boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
00793       result->name = outModRecordPtr->name;
00794       result->id = outModRecordPtr->id;
00795       result->initMsgSize = outModRecordPtr->initMsgSize;
00796       outModRecordPtr->eventSize.getStats(result->eventStats);
00797       resultsList.push_back(result);
00798     }
00799     
00800     return resultsList;
00801   }
00802   
00803   
00804   DSMC::RBResultPtr
00805   DSMC::buildResourceBrokerResult(DSMC::RBRecordPtr const& rbRecordPtr) const
00806   {
00807     RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
00808     
00809     result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
00810     result->initMsgCount = rbRecordPtr->initMsgCount;
00811     result->lastRunNumber = rbRecordPtr->lastRunNumber;
00812     result->lastEventNumber = rbRecordPtr->lastEventNumber;
00813     rbRecordPtr->eventSize.getStats(result->eventStats);
00814     rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00815     rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
00816     rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00817     rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00818     rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00819     rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00820     rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00821     
00822     result->outstandingDataDiscardCount =
00823       result->initMsgCount +
00824       result->eventStats.getSampleCount() +
00825       result->errorEventStats.getSampleCount() +
00826       result->faultyEventStats.getSampleCount() -
00827       result->dataDiscardStats.getSampleCount();
00828     result->outstandingDQMDiscardCount =
00829       result->dqmEventStats.getSampleCount() +
00830       result->faultyDQMEventStats.getSampleCount() -
00831       result->dqmDiscardStats.getSampleCount();
00832     
00833     return result;
00834   }
00835   
00836   
00837   void DSMC::calcStatsForOutputModules(DSMC::OutputModuleRecordMap& outputModuleMap)
00838   {
00839     OutputModuleRecordMap::const_iterator omMapIter;
00840     OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00841     for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00842     {
00843       OutModRecordPtr outModRecordPtr = omMapIter->second;
00844       
00845       //outModRecordPtr->fragmentSize.calculateStatistics();
00846       outModRecordPtr->eventSize.calculateStatistics();
00847     }
00848   }
00849   
00850   
00851   bool compareRBResultPtrValues
00852   (
00853     DSMC::RBResultPtr firstValue,
00854     DSMC::RBResultPtr secondValue
00855   )
00856   {
00857     return *firstValue < *secondValue;
00858   }
00859 
00860 } // namespace stor
00861 
00862