Go to the documentation of this file.00001
00003
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007
00008 #include "EventFilter/StorageManager/interface/Exception.h"
00009 #include "EventFilter/StorageManager/interface/StreamsMonitorCollection.h"
00010
00011
00012 namespace stor {
00013
00014 StreamsMonitorCollection::StreamsMonitorCollection
00015 (
00016 const utils::Duration_t& updateInterval
00017 ) :
00018 MonitorCollection(updateInterval),
00019 updateInterval_(updateInterval),
00020 timeWindowForRecentResults_(boost::posix_time::seconds(30)),
00021 allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
00022 allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
00023 allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
00024 {}
00025
00026
00027 StreamsMonitorCollection::StreamRecordPtr
00028 StreamsMonitorCollection::getNewStreamRecord()
00029 {
00030 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00031
00032 StreamRecordPtr streamRecord(
00033 new StreamRecord(this,updateInterval_,timeWindowForRecentResults_)
00034 );
00035 streamRecords_.push_back(streamRecord);
00036 return streamRecord;
00037 }
00038
00039
00040 void StreamsMonitorCollection::getStreamRecords(StreamRecordList& list) const
00041 {
00042 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00043
00044 list.clear();
00045 list.reserve(streamRecords_.size());
00046
00047 for (
00048 StreamRecordList::const_iterator
00049 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00050 it != itEnd;
00051 ++it
00052 )
00053 {
00054 list.push_back(*it);
00055 }
00056 }
00057
00058
00059 bool StreamsMonitorCollection::streamRecordsExist() const
00060 {
00061 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00062
00063 return ( ! streamRecords_.empty() );
00064 }
00065
00066
00067 void StreamsMonitorCollection::StreamRecord::incrementFileCount
00068 (
00069 const uint32_t lumiSection
00070 )
00071 {
00072 fileCount.addSample(1);
00073 parentCollection->allStreamsFileCount_.addSample(1);
00074 ++fileCountPerLS[lumiSection];
00075 }
00076
00077
00078 void StreamsMonitorCollection::StreamRecord::addSizeInBytes(double size)
00079 {
00080 size = size / (1024 * 1024);
00081 volume.addSample(size);
00082 parentCollection->allStreamsVolume_.addSample(size);
00083 }
00084
00085
00086 void StreamsMonitorCollection::StreamRecord::reportLumiSectionInfo
00087 (
00088 const uint32_t& lumiSection,
00089 std::string& str
00090 )
00091 {
00092 std::ostringstream msg;
00093 if (str.empty())
00094 {
00095 msg << "LS:" << lumiSection;
00096 }
00097
00098 unsigned int count = 0;
00099 FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
00100 if ( pos != fileCountPerLS.end() )
00101 {
00102 count = pos->second;
00103 fileCountPerLS.erase(pos);
00104 }
00105 msg << "\t" << streamName << ":" << count;
00106 str += msg.str();
00107 }
00108
00109
00110 void StreamsMonitorCollection::reportAllLumiSectionInfos(DbFileHandlerPtr dbFileHandler)
00111 {
00112 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00113
00114 UnreportedLS unreportedLS;
00115 getListOfAllUnreportedLS(unreportedLS);
00116
00117 for (UnreportedLS::const_iterator it = unreportedLS.begin(),
00118 itEnd = unreportedLS.end(); it != itEnd; ++it)
00119 {
00120 std::string lsEntry;
00121 for (StreamRecordList::const_iterator
00122 stream = streamRecords_.begin(),
00123 streamEnd = streamRecords_.end();
00124 stream != streamEnd;
00125 ++stream)
00126 {
00127 (*stream)->reportLumiSectionInfo((*it), lsEntry);
00128 }
00129 dbFileHandler->write(lsEntry);
00130 }
00131 }
00132
00133
00134 void StreamsMonitorCollection::getListOfAllUnreportedLS(UnreportedLS& unreportedLS)
00135 {
00136
00137
00138 for (StreamRecordList::const_iterator
00139 stream = streamRecords_.begin(),
00140 streamEnd = streamRecords_.end();
00141 stream != streamEnd;
00142 ++stream)
00143 {
00144 for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
00145 lscount = (*stream)->fileCountPerLS.begin(),
00146 lscountEnd = (*stream)->fileCountPerLS.end();
00147 lscount != lscountEnd; ++lscount)
00148 {
00149 unreportedLS.insert(lscount->first);
00150 }
00151 }
00152 }
00153
00154
00155 void StreamsMonitorCollection::do_calculateStatistics()
00156 {
00157 MonitoredQuantity::Stats stats;
00158
00159 allStreamsFileCount_.calculateStatistics();
00160 allStreamsVolume_.calculateStatistics();
00161 allStreamsVolume_.getStats(stats);
00162 bool samplingHasStarted = (stats.getSampleCount() > 0);
00163 if (samplingHasStarted) {
00164 allStreamsBandwidth_.addSample(stats.getLastValueRate());
00165 }
00166 allStreamsBandwidth_.calculateStatistics();
00167
00168
00169 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00170
00171 for (
00172 StreamRecordList::const_iterator
00173 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00174 it != itEnd;
00175 ++it
00176 )
00177 {
00178 (*it)->fileCount.calculateStatistics();
00179 (*it)->volume.calculateStatistics();
00180 (*it)->volume.getStats(stats);
00181 if (samplingHasStarted) {
00182 (*it)->bandwidth.addSample(stats.getLastValueRate());
00183 }
00184 (*it)->bandwidth.calculateStatistics();
00185 }
00186 }
00187
00188
00189 void StreamsMonitorCollection::do_appendInfoSpaceItems
00190 (
00191 InfoSpaceItems& infoSpaceItems
00192 )
00193 {
00194 infoSpaceItems.push_back(std::make_pair("storedEvents", &storedEvents_));
00195 infoSpaceItems.push_back(std::make_pair("storedVolume", &storedVolume_));
00196 infoSpaceItems.push_back(std::make_pair("bandwidthToDisk", &bandwidthToDisk_));
00197 infoSpaceItems.push_back(std::make_pair("streamNames", &streamNames_));
00198 infoSpaceItems.push_back(std::make_pair("eventsPerStream", &eventsPerStream_));
00199 infoSpaceItems.push_back(std::make_pair("ratePerStream", &ratePerStream_));
00200 infoSpaceItems.push_back(std::make_pair("bandwidthPerStream", &bandwidthPerStream_));
00201 }
00202
00203
00204 void StreamsMonitorCollection::do_reset()
00205 {
00206 allStreamsFileCount_.reset();
00207 allStreamsVolume_.reset();
00208 allStreamsBandwidth_.reset();
00209
00210 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00211 streamRecords_.clear();
00212 }
00213
00214
00215 void StreamsMonitorCollection::do_updateInfoSpaceItems()
00216 {
00217 MonitoredQuantity::Stats allStreamsVolumeStats;
00218 allStreamsVolume_.getStats(allStreamsVolumeStats);
00219
00220 storedEvents_ = static_cast<xdata::UnsignedInteger32>(
00221 allStreamsVolumeStats.getSampleCount()
00222 );
00223 storedVolume_ = static_cast<xdata::Double>(
00224 allStreamsVolumeStats.getValueSum()
00225 );
00226 bandwidthToDisk_ = static_cast<xdata::Double>(
00227 allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
00228 );
00229
00230 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00231
00232 const size_t statsCount = streamRecords_.size();
00233 const size_t infospaceCount = streamNames_.size();
00234
00235 if ( statsCount != infospaceCount )
00236 {
00237 streamNames_.resize(statsCount);
00238 eventsPerStream_.resize(statsCount);
00239 ratePerStream_.resize(statsCount);
00240 bandwidthPerStream_.resize(statsCount);
00241 }
00242
00243 for (size_t i=0; i < statsCount; ++i)
00244 {
00245 MonitoredQuantity::Stats streamVolumeStats;
00246 streamRecords_.at(i)->volume.getStats(streamVolumeStats);
00247 MonitoredQuantity::Stats streamBandwidthStats;
00248 streamRecords_.at(i)->bandwidth.getStats(streamBandwidthStats);
00249
00250 streamNames_.at(i) = static_cast<xdata::String>(streamRecords_.at(i)->streamName);
00251 eventsPerStream_.at(i) = static_cast<xdata::UnsignedInteger32>(
00252 streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
00253 );
00254 ratePerStream_.at(i) = static_cast<xdata::Double>(
00255 streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
00256 );
00257 bandwidthPerStream_.at(i) = static_cast<xdata::Double>(
00258 streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
00259 );
00260 }
00261 }
00262
00263 }
00264