CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/EventFilter/StorageManager/src/DataSenderMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: DataSenderMonitorCollection.cc,v 1.18 2011/05/24 10:14:58 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include <zlib.h>
00009 #include <boost/lexical_cast.hpp>
00010 
00011 #include "EventFilter/StorageManager/interface/Exception.h"
00012 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00013 
00014 
00015 namespace stor {
00016   
00017   DataSenderMonitorCollection::DataSenderMonitorCollection
00018   (
00019     const utils::Duration_t& updateInterval,
00020     AlarmHandlerPtr ah
00021   ) :
00022   MonitorCollection(updateInterval),
00023   connectedRBs_(0),
00024   connectedEPs_(0),
00025   activeEPs_(0),
00026   outstandingDataDiscards_(0),
00027   outstandingDQMDiscards_(0),
00028   faultyEvents_(0),
00029   ignoredDiscards_(0),
00030   updateInterval_(updateInterval),
00031   alarmHandler_(ah)
00032   {}
00033   
00034   
00035   void DataSenderMonitorCollection::addFragmentSample(I2OChain const& i2oChain)
00036   {
00037     // focus only on INIT and event fragments, for now
00038     if (i2oChain.messageCode() != Header::INIT &&
00039       i2oChain.messageCode() != Header::EVENT) {return;}
00040     if (i2oChain.fragmentCount() != 1) {return;}
00041     
00042     // fetch basic data from the I2OChain
00043     //double fragmentSize = static_cast<double>(i2oChain.totalDataSize());
00044     
00045     // look up the monitoring records that we need
00046     bool pointersAreValid;
00047     RBRecordPtr rbRecordPtr;
00048     FURecordPtr fuRecordPtr;
00049     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00050     {
00051       boost::mutex::scoped_lock sl(collectionsMutex_);
00052       pointersAreValid = getAllNeededPointers(
00053         i2oChain, rbRecordPtr, fuRecordPtr,
00054         topLevelOutModPtr, rbSpecificOutModPtr,
00055         fuSpecificOutModPtr);
00056     }
00057     
00058     // accumulate the data of interest
00059     //if (pointersAreValid)
00060     //{
00061     //topLevelOutModPtr->fragmentSize.addSample(fragmentSize);
00062     //rbSpecificOutModPtr->fragmentSize.addSample(fragmentSize);
00063     //fuSpecificOutModPtr->fragmentSize.addSample(fragmentSize);
00064     //}
00065   }
00066   
00067   
00068   void DataSenderMonitorCollection::addInitSample(I2OChain const& i2oChain)
00069   {
00070     // sanity checks
00071     if (i2oChain.messageCode() != Header::INIT) {return;}
00072     if (! i2oChain.complete()) {return;}
00073     
00074     // fetch basic data from the I2OChain
00075     std::string outModName = i2oChain.outputModuleLabel();
00076     uint32_t msgSize = i2oChain.totalDataSize();
00077     
00078     // look up the monitoring records that we need
00079     bool pointersAreValid;
00080     RBRecordPtr rbRecordPtr;
00081     FURecordPtr fuRecordPtr;
00082     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00083     {
00084       boost::mutex::scoped_lock sl(collectionsMutex_);
00085       pointersAreValid = getAllNeededPointers(
00086         i2oChain, rbRecordPtr, fuRecordPtr,
00087         topLevelOutModPtr, rbSpecificOutModPtr,
00088         fuSpecificOutModPtr);
00089     }
00090     
00091     // accumulate the data of interest
00092     if (pointersAreValid)
00093     {
00094       topLevelOutModPtr->name = outModName;
00095       topLevelOutModPtr->initMsgSize = msgSize;
00096       
00097       ++rbRecordPtr->initMsgCount;
00098       rbSpecificOutModPtr->name = outModName;
00099       rbSpecificOutModPtr->initMsgSize = msgSize;
00100       
00101       ++fuRecordPtr->initMsgCount;
00102     fuSpecificOutModPtr->name = outModName;
00103     fuSpecificOutModPtr->initMsgSize = msgSize;
00104     }
00105   }
00106   
00107   
00108   void DataSenderMonitorCollection::addEventSample(I2OChain const& i2oChain)
00109   {
00110     // sanity checks
00111     if (i2oChain.messageCode() != Header::EVENT) {return;}
00112     if (! i2oChain.complete()) {return;}
00113 
00114     // fetch basic data from the I2OChain
00115     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00116     uint32_t runNumber = i2oChain.runNumber();
00117     uint32_t eventNumber = i2oChain.eventNumber();
00118     
00119     // look up the monitoring records that we need
00120     bool pointersAreValid;
00121     RBRecordPtr rbRecordPtr;
00122     FURecordPtr fuRecordPtr;
00123     OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00124     {
00125       boost::mutex::scoped_lock sl(collectionsMutex_);
00126       pointersAreValid = getAllNeededPointers(
00127         i2oChain, rbRecordPtr, fuRecordPtr,
00128         topLevelOutModPtr, rbSpecificOutModPtr,
00129         fuSpecificOutModPtr);
00130     }
00131     
00132     // accumulate the data of interest
00133     if (pointersAreValid)
00134     {
00135       topLevelOutModPtr->eventSize.addSample(eventSize);
00136       
00137       rbRecordPtr->lastRunNumber = runNumber;
00138       rbRecordPtr->lastEventNumber = eventNumber;
00139       rbRecordPtr->eventSize.addSample(eventSize);
00140       rbSpecificOutModPtr->eventSize.addSample(eventSize);
00141       
00142       fuRecordPtr->lastRunNumber = runNumber;
00143       fuRecordPtr->lastEventNumber = eventNumber;
00144       fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
00145       fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
00146       fuSpecificOutModPtr->eventSize.addSample(eventSize);
00147     }
00148   }
00149   
00150   
00151   void DataSenderMonitorCollection::addDQMEventSample(I2OChain const& i2oChain)
00152   {
00153     // sanity checks
00154     if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
00155     if (! i2oChain.complete()) {return;}
00156     
00157     // fetch basic data from the I2OChain
00158     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00159     
00160     // look up the monitoring records that we need
00161     bool pointersAreValid;
00162     RBRecordPtr rbRecordPtr;
00163     FURecordPtr fuRecordPtr;
00164     {
00165       boost::mutex::scoped_lock sl(collectionsMutex_);
00166       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00167       if (pointersAreValid)
00168       {
00169         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00170       }
00171     }
00172     
00173     // accumulate the data of interest
00174     if (pointersAreValid)
00175     {
00176       rbRecordPtr->dqmEventSize.addSample(eventSize);
00177       fuRecordPtr->dqmEventSize.addSample(eventSize);
00178     }
00179   }
00180   
00181   
00182   void DataSenderMonitorCollection::addErrorEventSample(I2OChain const& i2oChain)
00183   {
00184     // sanity checks
00185     if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
00186     if (! i2oChain.complete()) {return;}
00187     
00188     // fetch basic data from the I2OChain
00189     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00190     
00191     // look up the monitoring records that we need
00192     bool pointersAreValid;
00193     RBRecordPtr rbRecordPtr;
00194     FURecordPtr fuRecordPtr;
00195     {
00196       boost::mutex::scoped_lock sl(collectionsMutex_);
00197       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00198       if (pointersAreValid)
00199       {
00200         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00201       }
00202     }
00203     
00204     // accumulate the data of interest
00205     if (pointersAreValid)
00206     {
00207       rbRecordPtr->errorEventSize.addSample(eventSize);
00208       fuRecordPtr->errorEventSize.addSample(eventSize);
00209     }
00210   }
00211   
00212   
00213   void DataSenderMonitorCollection::addFaultyEventSample(I2OChain const& i2oChain)
00214   {
00215     // fetch basic data from the I2OChain
00216     double eventSize = static_cast<double>(i2oChain.totalDataSize());
00217     
00218     // look up the monitoring records that we need
00219     bool pointersAreValid;
00220     RBRecordPtr rbRecordPtr;
00221     FURecordPtr fuRecordPtr;
00222     {
00223       boost::mutex::scoped_lock sl(collectionsMutex_);
00224       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00225       if (pointersAreValid)
00226       {
00227         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00228       }
00229     }
00230     
00231     // accumulate the data of interest
00232     if (pointersAreValid)
00233     {
00234       if (i2oChain.messageCode() == Header::DQM_EVENT)
00235       {
00236         rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
00237         fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
00238       }
00239       else
00240       {
00241         rbRecordPtr->faultyEventSize.addSample(eventSize);
00242         fuRecordPtr->faultyEventSize.addSample(eventSize);
00243       }
00244     }
00245   }
00246   
00247   
00248   void DataSenderMonitorCollection::incrementDataDiscardCount(I2OChain const& i2oChain)
00249   {
00250     // look up the monitoring records that we need
00251     bool pointersAreValid;
00252     RBRecordPtr rbRecordPtr;
00253     FURecordPtr fuRecordPtr;
00254     {
00255       boost::mutex::scoped_lock sl(collectionsMutex_);
00256       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00257       if (pointersAreValid)
00258       {
00259         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00260       }
00261     }
00262     
00263     // accumulate the data of interest
00264     if (pointersAreValid)
00265     {
00266       rbRecordPtr->dataDiscardCount.addSample(1);
00267       fuRecordPtr->dataDiscardCount.addSample(1);
00268     }
00269   }
00270   
00271   
00272   void DataSenderMonitorCollection::incrementDQMDiscardCount(I2OChain const& i2oChain)
00273   {
00274     // look up the monitoring records that we need
00275     bool pointersAreValid;
00276     RBRecordPtr rbRecordPtr;
00277     FURecordPtr fuRecordPtr;
00278     {
00279       boost::mutex::scoped_lock sl(collectionsMutex_);
00280       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00281       if (pointersAreValid)
00282       {
00283         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00284       }
00285     }
00286     
00287     // accumulate the data of interest
00288     if (pointersAreValid)
00289     {
00290       rbRecordPtr->dqmDiscardCount.addSample(1);
00291       fuRecordPtr->dqmDiscardCount.addSample(1);
00292     }
00293   }
00294   
00295   
00296   void DataSenderMonitorCollection::incrementSkippedDiscardCount(I2OChain const& i2oChain)
00297   {
00298     // look up the monitoring records that we need
00299     bool pointersAreValid;
00300     RBRecordPtr rbRecordPtr;
00301     FURecordPtr fuRecordPtr;
00302     {
00303       boost::mutex::scoped_lock sl(collectionsMutex_);
00304       pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00305       if (pointersAreValid)
00306       {
00307         pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00308       }
00309     }
00310     
00311     // accumulate the data of interest
00312     if (pointersAreValid)
00313     {
00314       rbRecordPtr->skippedDiscardCount.addSample(1);
00315       fuRecordPtr->skippedDiscardCount.addSample(1);
00316     }
00317   }
00318   
00319   
00320   DataSenderMonitorCollection::OutputModuleResultsList
00321   DataSenderMonitorCollection::getTopLevelOutputModuleResults() const
00322   {
00323     boost::mutex::scoped_lock sl(collectionsMutex_);
00324     
00325     return buildOutputModuleResults(outputModuleMap_);
00326   }
00327   
00328   
00329   DataSenderMonitorCollection::ResourceBrokerResultsList
00330   DataSenderMonitorCollection::getAllResourceBrokerResults() const
00331   {
00332     boost::mutex::scoped_lock sl(collectionsMutex_);
00333     ResourceBrokerResultsList resultsList;
00334     
00335     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00336     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00337       resourceBrokerMap_.end();
00338     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00339     {
00340       RBRecordPtr rbRecordPtr = rbMapIter->second;
00341       RBResultPtr result = buildResourceBrokerResult(rbRecordPtr);
00342       result->uniqueRBID = rbMapIter->first;
00343       resultsList.push_back(result);
00344     }
00345     
00346     return resultsList;
00347   }
00348   
00349   
00350   DataSenderMonitorCollection::RBResultPtr
00351   DataSenderMonitorCollection::getOneResourceBrokerResult(UniqueResourceBrokerID_t uniqueRBID) const
00352   {
00353     boost::mutex::scoped_lock sl(collectionsMutex_);
00354     RBResultPtr result;
00355     
00356     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00357     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00358     if (rbMapIter != resourceBrokerMap_.end())
00359     {
00360       RBRecordPtr rbRecordPtr = rbMapIter->second;
00361       result = buildResourceBrokerResult(rbRecordPtr);
00362       result->uniqueRBID = rbMapIter->first;
00363     }
00364     
00365     return result;
00366   }
00367   
00368   
00369   DataSenderMonitorCollection::OutputModuleResultsList
00370   DataSenderMonitorCollection::getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00371   {
00372     boost::mutex::scoped_lock sl(collectionsMutex_);
00373     OutputModuleResultsList resultsList;
00374     
00375     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00376     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00377     if (rbMapIter != resourceBrokerMap_.end())
00378     {
00379       RBRecordPtr rbRecordPtr = rbMapIter->second;
00380       resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
00381     }
00382     
00383     return resultsList;
00384   }
00385   
00386   
00387   DataSenderMonitorCollection::FilterUnitResultsList
00388   DataSenderMonitorCollection::getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00389   {
00390     boost::mutex::scoped_lock sl(collectionsMutex_);
00391     FilterUnitResultsList resultsList;
00392     
00393     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00394     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00395     if (rbMapIter != resourceBrokerMap_.end())
00396     {
00397       RBRecordPtr rbRecordPtr = rbMapIter->second;
00398       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00399       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00400         rbRecordPtr->filterUnitMap.end();        
00401       for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00402            fuMapIter != fuMapEnd; ++fuMapIter)
00403       {
00404         FURecordPtr fuRecordPtr = fuMapIter->second;
00405         FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
00406         result->initMsgCount = fuRecordPtr->initMsgCount;
00407         result->lastRunNumber = fuRecordPtr->lastRunNumber;
00408         result->lastEventNumber = fuRecordPtr->lastEventNumber;
00409         fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
00410         fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
00411         fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00412         fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
00413         fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00414         fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00415         fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00416         fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00417         fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00418         
00419         result->outstandingDataDiscardCount =
00420           result->initMsgCount +
00421           result->shortIntervalEventStats.getSampleCount() +
00422           result->errorEventStats.getSampleCount() +
00423           result->faultyEventStats.getSampleCount() -
00424           result->dataDiscardStats.getSampleCount();
00425         result->outstandingDQMDiscardCount =
00426           result->dqmEventStats.getSampleCount() +
00427           result->faultyDQMEventStats.getSampleCount() -
00428           result->dqmDiscardStats.getSampleCount();
00429         
00430         resultsList.push_back(result);
00431       }
00432     }
00433     
00434     return resultsList;
00435   }
00436   
00437   
00438   void DataSenderMonitorCollection::do_calculateStatistics()
00439   {
00440     boost::mutex::scoped_lock sl(collectionsMutex_);
00441     
00442     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00443     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00444       resourceBrokerMap_.end();
00445     for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
00446     {
00447       RBRecordPtr rbRecordPtr = rbMapIter->second;
00448       rbRecordPtr->eventSize.calculateStatistics();
00449       rbRecordPtr->dqmEventSize.calculateStatistics();
00450       rbRecordPtr->errorEventSize.calculateStatistics();
00451       rbRecordPtr->faultyEventSize.calculateStatistics();
00452       rbRecordPtr->faultyDQMEventSize.calculateStatistics();
00453       rbRecordPtr->dataDiscardCount.calculateStatistics();
00454       rbRecordPtr->dqmDiscardCount.calculateStatistics();
00455       rbRecordPtr->skippedDiscardCount.calculateStatistics();
00456       calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
00457       
00458       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00459       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00460         rbRecordPtr->filterUnitMap.end();        
00461       for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00462            fuMapIter != fuMapEnd; ++fuMapIter)
00463       {
00464         FURecordPtr fuRecordPtr = fuMapIter->second;
00465         fuRecordPtr->shortIntervalEventSize.calculateStatistics();
00466         fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
00467         fuRecordPtr->dqmEventSize.calculateStatistics();
00468         fuRecordPtr->errorEventSize.calculateStatistics();
00469         fuRecordPtr->faultyEventSize.calculateStatistics();
00470         fuRecordPtr->faultyDQMEventSize.calculateStatistics();
00471         fuRecordPtr->dataDiscardCount.calculateStatistics();
00472         fuRecordPtr->dqmDiscardCount.calculateStatistics();
00473         fuRecordPtr->skippedDiscardCount.calculateStatistics();
00474         calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
00475       }
00476     }
00477     
00478     calcStatsForOutputModules(outputModuleMap_);
00479   }
00480   
00481   
00482   void DataSenderMonitorCollection::do_reset()
00483   {
00484     boost::mutex::scoped_lock sl(collectionsMutex_);
00485     
00486     connectedRBs_ = 0;
00487     connectedEPs_ = 0;
00488     activeEPs_ = 0;
00489     outstandingDataDiscards_ = 0;
00490     outstandingDQMDiscards_ = 0;
00491     faultyEvents_ = 0;
00492     ignoredDiscards_ = 0;
00493     resourceBrokerMap_.clear();
00494     outputModuleMap_.clear();
00495   }
00496   
00497   
00498   void DataSenderMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00499   {
00500     infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
00501     infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
00502     infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
00503     infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
00504     infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
00505     infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
00506     infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
00507   }
00508   
00509   
00510   void DataSenderMonitorCollection::do_updateInfoSpaceItems()
00511   {
00512     boost::mutex::scoped_lock sl(collectionsMutex_);
00513     
00514     connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
00515     
00516     uint32_t localEPCount = 0;
00517     uint32_t localActiveEPCount = 0;
00518     int localMissingDataDiscardCount = 0;
00519     int localMissingDQMDiscardCount = 0;
00520     uint32_t localFaultyEventsCount = 0;
00521     uint32_t localIgnoredDiscardCount = 0;
00522     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00523     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00524       resourceBrokerMap_.end();
00525     for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00526     {
00527       RBRecordPtr rbRecordPtr = rbMapIter->second;
00528       localEPCount += rbRecordPtr->filterUnitMap.size();
00529       
00530       MonitoredQuantity::Stats skippedDiscardStats;
00531       rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
00532       localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
00533       
00534       MonitoredQuantity::Stats eventStats;
00535       MonitoredQuantity::Stats errorEventStats;
00536       MonitoredQuantity::Stats dataDiscardStats;
00537       rbRecordPtr->eventSize.getStats(eventStats);
00538       rbRecordPtr->errorEventSize.getStats(errorEventStats);
00539       rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
00540       localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
00541         errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
00542       localEPCount -= errorEventStats.getSampleCount();
00543       
00544       MonitoredQuantity::Stats dqmEventStats;
00545       MonitoredQuantity::Stats dqmDiscardStats;
00546       rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
00547       rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
00548       localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
00549         dqmDiscardStats.getSampleCount();
00550       
00551       MonitoredQuantity::Stats faultyEventStats;
00552       rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
00553       localFaultyEventsCount += faultyEventStats.getSampleCount();
00554       MonitoredQuantity::Stats faultyDQMEventStats;
00555       rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
00556       localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
00557       
00558       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00559       std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00560         rbRecordPtr->filterUnitMap.end();        
00561       for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
00562       {
00563         FURecordPtr fuRecordPtr = fuMapIter->second;
00564         MonitoredQuantity::Stats fuMediumIntervalEventStats;
00565         fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
00566         if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
00567           ++localActiveEPCount;
00568         }
00569       }
00570     }
00571     connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
00572     activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
00573     outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
00574     outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
00575     faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
00576     ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
00577     
00578     faultyEventsAlarm(localFaultyEventsCount);
00579     ignoredDiscardAlarm(localIgnoredDiscardCount);
00580   }
00581   
00582   
00583   void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
00584   {
00585     const std::string alarmName = "FaultyEvents";
00586     
00587     if (faultyEventsCount > 0)
00588     {
00589       std::ostringstream msg;
00590       msg << "Missing or faulty I2O fragments for " <<
00591         faultyEventsCount <<
00592         " events. These events are lost!";
00593       XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
00594       alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00595     }
00596     else
00597     {
00598       alarmHandler_->revokeAlarm(alarmName);
00599     }
00600   }
00601   
00602   
00603   void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
00604   {
00605     const std::string alarmName = "IgnoredDiscard";
00606     
00607     if ( ignoredDiscardCount > 0)
00608     {
00609       std::ostringstream msg;
00610       msg << ignoredDiscardCount <<
00611         " discard messages ignored. These events might be stuck in the resource broker.";
00612       XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
00613       alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00614     }
00615     else
00616     {
00617       alarmHandler_->revokeAlarm(alarmName);
00618     }
00619   }
00620 
00621   
00622   typedef DataSenderMonitorCollection DSMC;
00623   
00624   bool DSMC::getAllNeededPointers
00625   (
00626     I2OChain const& i2oChain,
00627     DSMC::RBRecordPtr& rbRecordPtr,
00628     DSMC::FURecordPtr& fuRecordPtr,
00629     DSMC::OutModRecordPtr& topLevelOutModPtr,
00630     DSMC::OutModRecordPtr& rbSpecificOutModPtr,
00631     DSMC::OutModRecordPtr& fuSpecificOutModPtr
00632   )
00633   {
00634     ResourceBrokerKey rbKey(i2oChain);
00635     if (! rbKey.isValid) {return false;}
00636     FilterUnitKey fuKey(i2oChain);
00637     if (! fuKey.isValid) {return false;}
00638     OutputModuleKey outModKey = i2oChain.outputModuleId();
00639     
00640     topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
00641     
00642     rbRecordPtr = getResourceBrokerRecord(rbKey);
00643     rbSpecificOutModPtr = getOutputModuleRecord(
00644       rbRecordPtr->outputModuleMap,
00645       outModKey);
00646     
00647     fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00648     fuSpecificOutModPtr = getOutputModuleRecord(
00649       fuRecordPtr->outputModuleMap,
00650       outModKey);
00651     
00652     return true;
00653   }
00654   
00655   
00656   bool DSMC::getRBRecordPointer
00657   (
00658     I2OChain const& i2oChain,
00659     DSMC::RBRecordPtr& rbRecordPtr
00660   )
00661   {
00662     ResourceBrokerKey rbKey(i2oChain);
00663     if (! rbKey.isValid) {return false;}
00664     
00665     rbRecordPtr = getResourceBrokerRecord(rbKey);
00666     return true;
00667   }
00668   
00669   
00670   bool DSMC::getFURecordPointer
00671   (
00672     I2OChain const& i2oChain,
00673     DSMC::RBRecordPtr& rbRecordPtr,
00674     DSMC::FURecordPtr& fuRecordPtr
00675   )
00676   {
00677     FilterUnitKey fuKey(i2oChain);
00678     if (! fuKey.isValid) {return false;}
00679     
00680     fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00681     return true;
00682   }
00683   
00684   
00685   DSMC::RBRecordPtr
00686   DSMC::getResourceBrokerRecord(DSMC::ResourceBrokerKey const& rbKey)
00687   {
00688     RBRecordPtr rbRecordPtr;
00689     UniqueResourceBrokerID_t uniqueRBID = getUniqueResourceBrokerID(rbKey);
00690     std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00691     rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00692     if (rbMapIter == resourceBrokerMap_.end())
00693     {
00694       rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
00695       resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
00696     }
00697     else
00698     {
00699       rbRecordPtr = rbMapIter->second;
00700     }
00701     return rbRecordPtr;
00702   }
00703   
00704   
00705   DSMC::UniqueResourceBrokerID_t
00706   DSMC::getUniqueResourceBrokerID(DSMC::ResourceBrokerKey const& rbKey)
00707   {
00708     UniqueResourceBrokerID_t uniqueID;
00709     std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
00710     rbMapIter = resourceBrokerIDs_.find(rbKey);
00711     if (rbMapIter == resourceBrokerIDs_.end())
00712     {
00713       std::string workString = rbKey.hltURL +
00714         boost::lexical_cast<std::string>(rbKey.hltTid) +
00715         boost::lexical_cast<std::string>(rbKey.hltInstance) +
00716         boost::lexical_cast<std::string>(rbKey.hltLocalId) +
00717         rbKey.hltClassName;
00718       uLong crc = crc32(0L, Z_NULL, 0);
00719       Bytef* crcbuf = (Bytef*) workString.data();
00720       crc = crc32(crc, crcbuf, workString.length());
00721       uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
00722       resourceBrokerIDs_[rbKey] = uniqueID;
00723     }
00724     else
00725     {
00726       uniqueID = rbMapIter->second;
00727     }
00728     return uniqueID;
00729   }
00730   
00731   
00732   DSMC::FURecordPtr
00733   DSMC::getFilterUnitRecord
00734   (
00735     DSMC::RBRecordPtr& rbRecordPtr,
00736     DSMC::FilterUnitKey const& fuKey
00737   )
00738   {
00739     FURecordPtr fuRecordPtr;
00740     std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00741     fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
00742     if (fuMapIter == rbRecordPtr->filterUnitMap.end())
00743     {
00744       fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
00745       rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
00746     }
00747     else
00748     {
00749       fuRecordPtr = fuMapIter->second;
00750     }
00751     return fuRecordPtr;
00752   }
00753   
00754   
00755   DSMC::OutModRecordPtr
00756   DSMC::getOutputModuleRecord
00757   (
00758     OutputModuleRecordMap& outModMap,
00759     DSMC::OutputModuleKey const& outModKey
00760   )
00761   {
00762     OutModRecordPtr outModRecordPtr;
00763     OutputModuleRecordMap::const_iterator omMapIter;
00764     omMapIter = outModMap.find(outModKey);
00765     if (omMapIter == outModMap.end())
00766     {
00767       outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
00768       
00769       outModRecordPtr->name = "Unknown";
00770       outModRecordPtr->id = outModKey;
00771       outModRecordPtr->initMsgSize = 0;
00772       
00773       outModMap[outModKey] = outModRecordPtr;
00774     }
00775     else
00776     {
00777       outModRecordPtr = omMapIter->second;
00778     }
00779     return outModRecordPtr;
00780   }
00781   
00782   
00783   DSMC::OutputModuleResultsList
00784   DSMC::buildOutputModuleResults(DSMC::OutputModuleRecordMap const& outputModuleMap) const
00785   {
00786     OutputModuleResultsList resultsList;
00787     
00788     OutputModuleRecordMap::const_iterator omMapIter;
00789     OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00790     for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00791     {
00792       OutModRecordPtr outModRecordPtr = omMapIter->second;
00793       boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
00794       result->name = outModRecordPtr->name;
00795       result->id = outModRecordPtr->id;
00796       result->initMsgSize = outModRecordPtr->initMsgSize;
00797       outModRecordPtr->eventSize.getStats(result->eventStats);
00798       resultsList.push_back(result);
00799     }
00800     
00801     return resultsList;
00802   }
00803   
00804   
00805   DSMC::RBResultPtr
00806   DSMC::buildResourceBrokerResult(DSMC::RBRecordPtr const& rbRecordPtr) const
00807   {
00808     RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
00809     
00810     result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
00811     result->initMsgCount = rbRecordPtr->initMsgCount;
00812     result->lastRunNumber = rbRecordPtr->lastRunNumber;
00813     result->lastEventNumber = rbRecordPtr->lastEventNumber;
00814     rbRecordPtr->eventSize.getStats(result->eventStats);
00815     rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00816     rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
00817     rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00818     rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00819     rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00820     rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00821     rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00822     
00823     result->outstandingDataDiscardCount =
00824       result->initMsgCount +
00825       result->eventStats.getSampleCount() +
00826       result->errorEventStats.getSampleCount() +
00827       result->faultyEventStats.getSampleCount() -
00828       result->dataDiscardStats.getSampleCount();
00829     result->outstandingDQMDiscardCount =
00830       result->dqmEventStats.getSampleCount() +
00831       result->faultyDQMEventStats.getSampleCount() -
00832       result->dqmDiscardStats.getSampleCount();
00833     
00834     return result;
00835   }
00836   
00837   
00838   void DSMC::calcStatsForOutputModules(DSMC::OutputModuleRecordMap& outputModuleMap)
00839   {
00840     OutputModuleRecordMap::const_iterator omMapIter;
00841     OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00842     for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00843     {
00844       OutModRecordPtr outModRecordPtr = omMapIter->second;
00845       
00846       //outModRecordPtr->fragmentSize.calculateStatistics();
00847       outModRecordPtr->eventSize.calculateStatistics();
00848     }
00849   }
00850   
00851   
00852   bool compareRBResultPtrValues
00853   (
00854     DSMC::RBResultPtr firstValue,
00855     DSMC::RBResultPtr secondValue
00856   )
00857   {
00858     return *firstValue < *secondValue;
00859   }
00860 
00861 } // namespace stor
00862 
00863