Go to the documentation of this file.00001
00003
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007
00008 #include "EventFilter/StorageManager/interface/Exception.h"
00009 #include "EventFilter/StorageManager/interface/StreamsMonitorCollection.h"
00010
00011
00012 namespace stor {
00013
00014 StreamsMonitorCollection::StreamsMonitorCollection
00015 (
00016 const utils::Duration_t& updateInterval
00017 ) :
00018 MonitorCollection(updateInterval),
00019 updateInterval_(updateInterval),
00020 timeWindowForRecentResults_(boost::posix_time::seconds(30)),
00021 allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
00022 allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
00023 allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
00024 {}
00025
00026
00027 StreamsMonitorCollection::StreamRecordPtr
00028 StreamsMonitorCollection::getNewStreamRecord()
00029 {
00030 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00031
00032 StreamRecordPtr streamRecord(
00033 new StreamRecord(this,updateInterval_,timeWindowForRecentResults_)
00034 );
00035 streamRecords_.push_back(streamRecord);
00036 return streamRecord;
00037 }
00038
00039
00040 void StreamsMonitorCollection::getStreamRecords(StreamRecordList& list) const
00041 {
00042 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00043
00044 list.clear();
00045 list.reserve(streamRecords_.size());
00046
00047 for (
00048 StreamRecordList::const_iterator
00049 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00050 it != itEnd;
00051 ++it
00052 )
00053 {
00054 list.push_back(*it);
00055 }
00056 }
00057
00058
00059 bool StreamsMonitorCollection::streamRecordsExist() const
00060 {
00061 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00062
00063 return ( ! streamRecords_.empty() );
00064 }
00065
00066
00067 void StreamsMonitorCollection::StreamRecord::incrementFileCount
00068 (
00069 const uint32_t lumiSection
00070 )
00071 {
00072 fileCount.addSample(1);
00073 parentCollection->allStreamsFileCount_.addSample(1);
00074 ++fileCountPerLS[lumiSection];
00075 }
00076
00077
00078 void StreamsMonitorCollection::StreamRecord::addSizeInBytes(double size)
00079 {
00080 size = size / (1024 * 1024);
00081 volume.addSample(size);
00082 parentCollection->allStreamsVolume_.addSample(size);
00083 }
00084
00085
00086 bool StreamsMonitorCollection::StreamRecord::reportLumiSectionInfo
00087 (
00088 const uint32_t& lumiSection,
00089 std::string& str
00090 )
00091 {
00092 std::ostringstream msg;
00093 if (str.empty())
00094 {
00095 msg << "LS:" << lumiSection;
00096 }
00097
00098 unsigned int count = 0;
00099 FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
00100 if ( pos != fileCountPerLS.end() )
00101 {
00102 count = pos->second;
00103 fileCountPerLS.erase(pos);
00104 }
00105 msg << "\t" << streamName << ":" << count;
00106 str += msg.str();
00107
00108 return (count>0);
00109 }
00110
00111
00112 void StreamsMonitorCollection::reportAllLumiSectionInfos
00113 (
00114 DbFileHandlerPtr dbFileHandler,
00115 EndOfRunReportPtr endOfRunReport
00116 )
00117 {
00118 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00119
00120 UnreportedLS unreportedLS;
00121 getListOfAllUnreportedLS(unreportedLS);
00122
00123 for (UnreportedLS::const_iterator it = unreportedLS.begin(),
00124 itEnd = unreportedLS.end(); it != itEnd; ++it)
00125 {
00126 std::string lsEntry;
00127 bool filesWritten = false;
00128
00129 for (StreamRecordList::const_iterator
00130 stream = streamRecords_.begin(),
00131 streamEnd = streamRecords_.end();
00132 stream != streamEnd;
00133 ++stream)
00134 {
00135 if ( (*stream)->reportLumiSectionInfo((*it), lsEntry) )
00136 filesWritten = true;
00137 }
00138 lsEntry += "\tEoLS:0";
00139 dbFileHandler->write(lsEntry);
00140
00141 if (filesWritten) ++(endOfRunReport->lsCountWithFiles);
00142 endOfRunReport->updateLatestWrittenLumiSection(*it);
00143 }
00144 }
00145
00146
00147 void StreamsMonitorCollection::getListOfAllUnreportedLS(UnreportedLS& unreportedLS)
00148 {
00149
00150
00151 for (StreamRecordList::const_iterator
00152 stream = streamRecords_.begin(),
00153 streamEnd = streamRecords_.end();
00154 stream != streamEnd;
00155 ++stream)
00156 {
00157 for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
00158 lscount = (*stream)->fileCountPerLS.begin(),
00159 lscountEnd = (*stream)->fileCountPerLS.end();
00160 lscount != lscountEnd; ++lscount)
00161 {
00162 unreportedLS.insert(lscount->first);
00163 }
00164 }
00165 }
00166
00167
00168 void StreamsMonitorCollection::do_calculateStatistics()
00169 {
00170 MonitoredQuantity::Stats stats;
00171
00172 allStreamsFileCount_.calculateStatistics();
00173 allStreamsVolume_.calculateStatistics();
00174 allStreamsVolume_.getStats(stats);
00175 bool samplingHasStarted = (stats.getSampleCount() > 0);
00176 if (samplingHasStarted) {
00177 allStreamsBandwidth_.addSample(stats.getLastValueRate());
00178 }
00179 allStreamsBandwidth_.calculateStatistics();
00180
00181
00182 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00183
00184 for (
00185 StreamRecordList::const_iterator
00186 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00187 it != itEnd;
00188 ++it
00189 )
00190 {
00191 (*it)->fileCount.calculateStatistics();
00192 (*it)->volume.calculateStatistics();
00193 (*it)->volume.getStats(stats);
00194 if (samplingHasStarted) {
00195 (*it)->bandwidth.addSample(stats.getLastValueRate());
00196 }
00197 (*it)->bandwidth.calculateStatistics();
00198 }
00199 }
00200
00201
00202 void StreamsMonitorCollection::do_appendInfoSpaceItems
00203 (
00204 InfoSpaceItems& infoSpaceItems
00205 )
00206 {
00207 infoSpaceItems.push_back(std::make_pair("storedEvents", &storedEvents_));
00208 infoSpaceItems.push_back(std::make_pair("storedVolume", &storedVolume_));
00209 infoSpaceItems.push_back(std::make_pair("bandwidthToDisk", &bandwidthToDisk_));
00210 infoSpaceItems.push_back(std::make_pair("streamNames", &streamNames_));
00211 infoSpaceItems.push_back(std::make_pair("eventsPerStream", &eventsPerStream_));
00212 infoSpaceItems.push_back(std::make_pair("ratePerStream", &ratePerStream_));
00213 infoSpaceItems.push_back(std::make_pair("bandwidthPerStream", &bandwidthPerStream_));
00214 }
00215
00216
00217 void StreamsMonitorCollection::do_reset()
00218 {
00219 allStreamsFileCount_.reset();
00220 allStreamsVolume_.reset();
00221 allStreamsBandwidth_.reset();
00222
00223 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00224 streamRecords_.clear();
00225 }
00226
00227
00228 void StreamsMonitorCollection::do_updateInfoSpaceItems()
00229 {
00230 MonitoredQuantity::Stats allStreamsVolumeStats;
00231 allStreamsVolume_.getStats(allStreamsVolumeStats);
00232
00233 storedEvents_ = static_cast<xdata::UnsignedInteger32>(
00234 allStreamsVolumeStats.getSampleCount()
00235 );
00236 storedVolume_ = static_cast<xdata::Double>(
00237 allStreamsVolumeStats.getValueSum()
00238 );
00239 bandwidthToDisk_ = static_cast<xdata::Double>(
00240 allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
00241 );
00242
00243 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00244
00245 streamNames_.clear();
00246 eventsPerStream_.clear();
00247 ratePerStream_.clear();
00248 bandwidthPerStream_.clear();
00249
00250 streamNames_.reserve(streamRecords_.size());
00251 eventsPerStream_.reserve(streamRecords_.size());
00252 ratePerStream_.reserve(streamRecords_.size());
00253 bandwidthPerStream_.reserve(streamRecords_.size());
00254
00255 for (
00256 StreamRecordList::const_iterator
00257 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00258 it != itEnd;
00259 ++it
00260 )
00261 {
00262 MonitoredQuantity::Stats streamVolumeStats;
00263 (*it)->volume.getStats(streamVolumeStats);
00264 MonitoredQuantity::Stats streamBandwidthStats;
00265 (*it)->bandwidth.getStats(streamBandwidthStats);
00266
00267 streamNames_.push_back(
00268 static_cast<xdata::String>( (*it)->streamName )
00269 );
00270
00271 eventsPerStream_.push_back(
00272 static_cast<xdata::UnsignedInteger32>(
00273 streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
00274 )
00275 );
00276
00277 ratePerStream_.push_back(
00278 static_cast<xdata::Double>(
00279 streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
00280 )
00281 );
00282
00283 bandwidthPerStream_.push_back(
00284 static_cast<xdata::Double>(
00285 streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
00286 )
00287 );
00288 }
00289 }
00290
00291 }
00292