CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch12/src/EventFilter/StorageManager/src/StreamsMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: StreamsMonitorCollection.cc,v 1.17 2011/04/18 15:18:57 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include "EventFilter/StorageManager/interface/Exception.h"
00009 #include "EventFilter/StorageManager/interface/StreamsMonitorCollection.h"
00010 
00011 
00012 namespace stor {
00013   
00014   StreamsMonitorCollection::StreamsMonitorCollection
00015   (
00016     const utils::Duration_t& updateInterval
00017   ) :
00018   MonitorCollection(updateInterval),
00019   updateInterval_(updateInterval),
00020   timeWindowForRecentResults_(boost::posix_time::seconds(30)),
00021   allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
00022   allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
00023   allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
00024   {}
00025   
00026   
00027   StreamsMonitorCollection::StreamRecordPtr
00028   StreamsMonitorCollection::getNewStreamRecord()
00029   {
00030     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00031     
00032     StreamRecordPtr streamRecord(
00033       new StreamRecord(this,updateInterval_,timeWindowForRecentResults_)
00034     );
00035     streamRecords_.push_back(streamRecord);
00036     return streamRecord;
00037   }
00038   
00039   
00040   void StreamsMonitorCollection::getStreamRecords(StreamRecordList& list) const
00041   {
00042     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00043 
00044     list.clear();
00045     list.reserve(streamRecords_.size());
00046     
00047     for (
00048       StreamRecordList::const_iterator 
00049         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00050       it != itEnd;
00051       ++it
00052     )
00053     {
00054       list.push_back(*it);
00055     }
00056   }
00057   
00058 
00059   bool StreamsMonitorCollection::streamRecordsExist() const
00060   {
00061     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00062 
00063     return ( ! streamRecords_.empty() );
00064   }
00065 
00066   
00067   void StreamsMonitorCollection::StreamRecord::incrementFileCount
00068   (
00069     const uint32_t lumiSection
00070   )
00071   {
00072     fileCount.addSample(1);
00073     parentCollection->allStreamsFileCount_.addSample(1);
00074     ++fileCountPerLS[lumiSection];
00075   }
00076   
00077   
00078   void StreamsMonitorCollection::StreamRecord::addSizeInBytes(double size)
00079   {
00080     size = size / (1024 * 1024);
00081     volume.addSample(size);
00082     parentCollection->allStreamsVolume_.addSample(size);
00083   }
00084   
00085   
00086   void StreamsMonitorCollection::StreamRecord::reportLumiSectionInfo
00087   (
00088     const uint32_t& lumiSection,
00089     std::string& str
00090   )
00091   {
00092     std::ostringstream msg;
00093     if (str.empty())
00094     {
00095       msg << "LS:" << lumiSection;
00096     }
00097     
00098     unsigned int count = 0;
00099     FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
00100     if ( pos != fileCountPerLS.end() )
00101     {
00102       count = pos->second;
00103       fileCountPerLS.erase(pos);
00104     }
00105     msg << "\t" << streamName << ":" << count;
00106     str += msg.str();
00107   }
00108   
00109   
00110   void StreamsMonitorCollection::reportAllLumiSectionInfos(DbFileHandlerPtr dbFileHandler)
00111   {
00112     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00113     
00114     UnreportedLS unreportedLS;
00115     getListOfAllUnreportedLS(unreportedLS);
00116     
00117     for (UnreportedLS::const_iterator it = unreportedLS.begin(),
00118            itEnd = unreportedLS.end(); it != itEnd; ++it)
00119     {
00120       std::string lsEntry;
00121       for (StreamRecordList::const_iterator 
00122              stream = streamRecords_.begin(),
00123              streamEnd = streamRecords_.end();
00124            stream != streamEnd;
00125            ++stream)
00126       {
00127         (*stream)->reportLumiSectionInfo((*it), lsEntry);
00128       }
00129       dbFileHandler->write(lsEntry);
00130     }
00131   }
00132   
00133   
00134   void StreamsMonitorCollection::getListOfAllUnreportedLS(UnreportedLS& unreportedLS)
00135   {
00136     // Have to loop over all streams as not every stream
00137     // might have got an event for a given lumi section
00138     for (StreamRecordList::const_iterator 
00139            stream = streamRecords_.begin(),
00140            streamEnd = streamRecords_.end();
00141          stream != streamEnd;
00142          ++stream)
00143     {
00144       for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
00145              lscount = (*stream)->fileCountPerLS.begin(),
00146              lscountEnd = (*stream)->fileCountPerLS.end();
00147            lscount != lscountEnd; ++lscount)
00148       {
00149         unreportedLS.insert(lscount->first);
00150       }
00151     }
00152   }
00153   
00154   
00155   void StreamsMonitorCollection::do_calculateStatistics()
00156   {
00157     MonitoredQuantity::Stats stats;
00158     
00159     allStreamsFileCount_.calculateStatistics();
00160     allStreamsVolume_.calculateStatistics();
00161     allStreamsVolume_.getStats(stats);
00162     bool samplingHasStarted = (stats.getSampleCount() > 0);
00163     if (samplingHasStarted) {
00164       allStreamsBandwidth_.addSample(stats.getLastValueRate());
00165     }
00166     allStreamsBandwidth_.calculateStatistics();
00167     
00168     
00169     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00170     
00171     for (
00172       StreamRecordList::const_iterator 
00173         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00174       it != itEnd;
00175       ++it
00176     ) 
00177     {
00178       (*it)->fileCount.calculateStatistics();
00179       (*it)->volume.calculateStatistics();
00180       (*it)->volume.getStats(stats);
00181       if (samplingHasStarted) {
00182         (*it)->bandwidth.addSample(stats.getLastValueRate());
00183       }
00184       (*it)->bandwidth.calculateStatistics();
00185     }
00186   }
00187   
00188   
00189   void StreamsMonitorCollection::do_appendInfoSpaceItems
00190   (
00191     InfoSpaceItems& infoSpaceItems
00192   )
00193   {
00194     infoSpaceItems.push_back(std::make_pair("storedEvents",  &storedEvents_));
00195     infoSpaceItems.push_back(std::make_pair("storedVolume",  &storedVolume_));
00196     infoSpaceItems.push_back(std::make_pair("bandwidthToDisk",  &bandwidthToDisk_));
00197     infoSpaceItems.push_back(std::make_pair("streamNames",  &streamNames_));
00198     infoSpaceItems.push_back(std::make_pair("eventsPerStream",  &eventsPerStream_));
00199     infoSpaceItems.push_back(std::make_pair("ratePerStream",  &ratePerStream_));
00200     infoSpaceItems.push_back(std::make_pair("bandwidthPerStream",  &bandwidthPerStream_));
00201   }
00202   
00203   
00204   void StreamsMonitorCollection::do_reset()
00205   {
00206     allStreamsFileCount_.reset();
00207     allStreamsVolume_.reset();
00208     allStreamsBandwidth_.reset();
00209     
00210     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00211     streamRecords_.clear();
00212   }
00213   
00214   
00215   void StreamsMonitorCollection::do_updateInfoSpaceItems()
00216   {
00217     MonitoredQuantity::Stats allStreamsVolumeStats;
00218     allStreamsVolume_.getStats(allStreamsVolumeStats);
00219     
00220     storedEvents_ = static_cast<xdata::UnsignedInteger32>(
00221       allStreamsVolumeStats.getSampleCount()
00222     );
00223     storedVolume_ = static_cast<xdata::Double>(
00224       allStreamsVolumeStats.getValueSum()
00225     );
00226     bandwidthToDisk_ = static_cast<xdata::Double>(
00227       allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
00228     );
00229     
00230     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00231     
00232     const size_t statsCount = streamRecords_.size();
00233     const size_t infospaceCount = streamNames_.size();
00234 
00235     if ( statsCount != infospaceCount )
00236     {
00237       streamNames_.resize(statsCount);
00238       eventsPerStream_.resize(statsCount);
00239       ratePerStream_.resize(statsCount);
00240       bandwidthPerStream_.resize(statsCount);
00241     }
00242 
00243     for (size_t i=0; i < statsCount; ++i)
00244     {
00245       MonitoredQuantity::Stats streamVolumeStats;
00246       streamRecords_.at(i)->volume.getStats(streamVolumeStats);
00247       MonitoredQuantity::Stats streamBandwidthStats;
00248       streamRecords_.at(i)->bandwidth.getStats(streamBandwidthStats);
00249 
00250       streamNames_.at(i) = static_cast<xdata::String>(streamRecords_.at(i)->streamName);
00251       eventsPerStream_.at(i) = static_cast<xdata::UnsignedInteger32>(
00252         streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
00253       );
00254       ratePerStream_.at(i) = static_cast<xdata::Double>(
00255         streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
00256       );
00257       bandwidthPerStream_.at(i) = static_cast<xdata::Double>(
00258         streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
00259       );
00260     }
00261   }
00262   
00263 } // namespace stor
00264