Go to the documentation of this file.00001
00003
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007
00008 #include "EventFilter/StorageManager/interface/Exception.h"
00009 #include "EventFilter/StorageManager/interface/StreamsMonitorCollection.h"
00010
00011
00012 namespace stor {
00013
00014 StreamsMonitorCollection::StreamsMonitorCollection
00015 (
00016 const utils::Duration_t& updateInterval
00017 ) :
00018 MonitorCollection(updateInterval),
00019 updateInterval_(updateInterval),
00020 timeWindowForRecentResults_(boost::posix_time::seconds(10)),
00021 allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
00022 allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
00023 allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
00024 {}
00025
00026
00027 StreamsMonitorCollection::StreamRecordPtr
00028 StreamsMonitorCollection::getNewStreamRecord()
00029 {
00030 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00031
00032 StreamRecordPtr streamRecord(
00033 new StreamRecord(this,updateInterval_,timeWindowForRecentResults_)
00034 );
00035 streamRecords_.push_back(streamRecord);
00036 return streamRecord;
00037 }
00038
00039
00040 void StreamsMonitorCollection::getStreamRecords(StreamRecordList& list) const
00041 {
00042 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00043
00044 list.clear();
00045 list.reserve(streamRecords_.size());
00046
00047 for (
00048 StreamRecordList::const_iterator
00049 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00050 it != itEnd;
00051 ++it
00052 )
00053 {
00054 list.push_back(*it);
00055 }
00056 }
00057
00058
00059 bool StreamsMonitorCollection::getStreamRecordsForOutputModuleLabel
00060 (
00061 const std::string& label,
00062 StreamRecordList& list
00063 ) const
00064 {
00065 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00066
00067 list.clear();
00068 list.reserve(streamRecords_.size());
00069
00070 for (
00071 StreamRecordList::const_iterator
00072 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00073 it != itEnd;
00074 ++it
00075 )
00076 {
00077 if ( (*it)->outputModuleLabel == label )
00078 list.push_back(*it);
00079 }
00080 return ( ! list.empty() );
00081 }
00082
00083
00084 bool StreamsMonitorCollection::streamRecordsExist() const
00085 {
00086 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00087
00088 return ( ! streamRecords_.empty() );
00089 }
00090
00091
00092 void StreamsMonitorCollection::StreamRecord::incrementFileCount
00093 (
00094 const uint32_t lumiSection
00095 )
00096 {
00097 fileCount.addSample(1);
00098 parentCollection->allStreamsFileCount_.addSample(1);
00099 ++fileCountPerLS[lumiSection];
00100 }
00101
00102
00103 void StreamsMonitorCollection::StreamRecord::addSizeInBytes(double size)
00104 {
00105 size = size / (1024 * 1024);
00106 volume.addSample(size);
00107 parentCollection->allStreamsVolume_.addSample(size);
00108 }
00109
00110
00111 bool StreamsMonitorCollection::StreamRecord::reportLumiSectionInfo
00112 (
00113 const uint32_t& lumiSection,
00114 std::string& str
00115 )
00116 {
00117 std::ostringstream msg;
00118 if (str.empty())
00119 {
00120 msg << "LS:" << lumiSection;
00121 }
00122
00123 unsigned int count = 0;
00124 FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
00125 if ( pos != fileCountPerLS.end() )
00126 {
00127 count = pos->second;
00128 fileCountPerLS.erase(pos);
00129 }
00130 msg << "\t" << streamName << ":" << count;
00131 str += msg.str();
00132
00133 return (count>0);
00134 }
00135
00136
00137 void StreamsMonitorCollection::reportAllLumiSectionInfos
00138 (
00139 DbFileHandlerPtr dbFileHandler,
00140 EndOfRunReportPtr endOfRunReport
00141 )
00142 {
00143 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00144
00145 UnreportedLS unreportedLS;
00146 getListOfAllUnreportedLS(unreportedLS);
00147
00148 for (UnreportedLS::const_iterator it = unreportedLS.begin(),
00149 itEnd = unreportedLS.end(); it != itEnd; ++it)
00150 {
00151 std::string lsEntry;
00152 bool filesWritten = false;
00153
00154 for (StreamRecordList::const_iterator
00155 stream = streamRecords_.begin(),
00156 streamEnd = streamRecords_.end();
00157 stream != streamEnd;
00158 ++stream)
00159 {
00160 if ( (*stream)->reportLumiSectionInfo((*it), lsEntry) )
00161 filesWritten = true;
00162 }
00163 lsEntry += "\tEoLS:0";
00164 dbFileHandler->write(lsEntry);
00165
00166 if (filesWritten) ++(endOfRunReport->lsCountWithFiles);
00167 endOfRunReport->updateLatestWrittenLumiSection(*it);
00168 }
00169 }
00170
00171
00172 void StreamsMonitorCollection::getListOfAllUnreportedLS(UnreportedLS& unreportedLS)
00173 {
00174
00175
00176 for (StreamRecordList::const_iterator
00177 stream = streamRecords_.begin(),
00178 streamEnd = streamRecords_.end();
00179 stream != streamEnd;
00180 ++stream)
00181 {
00182 for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
00183 lscount = (*stream)->fileCountPerLS.begin(),
00184 lscountEnd = (*stream)->fileCountPerLS.end();
00185 lscount != lscountEnd; ++lscount)
00186 {
00187 unreportedLS.insert(lscount->first);
00188 }
00189 }
00190 }
00191
00192
00193 void StreamsMonitorCollection::do_calculateStatistics()
00194 {
00195 MonitoredQuantity::Stats stats;
00196
00197 allStreamsFileCount_.calculateStatistics();
00198 allStreamsVolume_.calculateStatistics();
00199 allStreamsVolume_.getStats(stats);
00200 bool samplingHasStarted = (stats.getSampleCount() > 0);
00201 if (samplingHasStarted) {
00202 allStreamsBandwidth_.addSample(stats.getLastValueRate());
00203 }
00204 allStreamsBandwidth_.calculateStatistics();
00205
00206
00207 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00208
00209 for (
00210 StreamRecordList::const_iterator
00211 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00212 it != itEnd;
00213 ++it
00214 )
00215 {
00216 (*it)->fileCount.calculateStatistics();
00217 (*it)->volume.calculateStatistics();
00218 (*it)->volume.getStats(stats);
00219 if (samplingHasStarted) {
00220 (*it)->bandwidth.addSample(stats.getLastValueRate());
00221 }
00222 (*it)->bandwidth.calculateStatistics();
00223 }
00224 }
00225
00226
00227 void StreamsMonitorCollection::do_appendInfoSpaceItems
00228 (
00229 InfoSpaceItems& infoSpaceItems
00230 )
00231 {
00232 infoSpaceItems.push_back(std::make_pair("storedEvents", &storedEvents_));
00233 infoSpaceItems.push_back(std::make_pair("storedVolume", &storedVolume_));
00234 infoSpaceItems.push_back(std::make_pair("bandwidthToDisk", &bandwidthToDisk_));
00235 infoSpaceItems.push_back(std::make_pair("streamNames", &streamNames_));
00236 infoSpaceItems.push_back(std::make_pair("eventsPerStream", &eventsPerStream_));
00237 infoSpaceItems.push_back(std::make_pair("ratePerStream", &ratePerStream_));
00238 infoSpaceItems.push_back(std::make_pair("bandwidthPerStream", &bandwidthPerStream_));
00239 }
00240
00241
00242 void StreamsMonitorCollection::do_reset()
00243 {
00244 allStreamsFileCount_.reset();
00245 allStreamsVolume_.reset();
00246 allStreamsBandwidth_.reset();
00247
00248 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00249 streamRecords_.clear();
00250 }
00251
00252
00253 void StreamsMonitorCollection::do_updateInfoSpaceItems()
00254 {
00255 MonitoredQuantity::Stats allStreamsVolumeStats;
00256 allStreamsVolume_.getStats(allStreamsVolumeStats);
00257
00258 storedEvents_ = static_cast<xdata::UnsignedInteger32>(
00259 allStreamsVolumeStats.getSampleCount()
00260 );
00261 storedVolume_ = static_cast<xdata::Double>(
00262 allStreamsVolumeStats.getValueSum()
00263 );
00264 bandwidthToDisk_ = static_cast<xdata::Double>(
00265 allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
00266 );
00267
00268 boost::mutex::scoped_lock sl(streamRecordsMutex_);
00269
00270 streamNames_.clear();
00271 eventsPerStream_.clear();
00272 ratePerStream_.clear();
00273 bandwidthPerStream_.clear();
00274
00275 streamNames_.reserve(streamRecords_.size());
00276 eventsPerStream_.reserve(streamRecords_.size());
00277 ratePerStream_.reserve(streamRecords_.size());
00278 bandwidthPerStream_.reserve(streamRecords_.size());
00279
00280 for (
00281 StreamRecordList::const_iterator
00282 it = streamRecords_.begin(), itEnd = streamRecords_.end();
00283 it != itEnd;
00284 ++it
00285 )
00286 {
00287 MonitoredQuantity::Stats streamVolumeStats;
00288 (*it)->volume.getStats(streamVolumeStats);
00289 MonitoredQuantity::Stats streamBandwidthStats;
00290 (*it)->bandwidth.getStats(streamBandwidthStats);
00291
00292 streamNames_.push_back(
00293 static_cast<xdata::String>( (*it)->streamName )
00294 );
00295
00296 eventsPerStream_.push_back(
00297 static_cast<xdata::UnsignedInteger32>(
00298 streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
00299 )
00300 );
00301
00302 ratePerStream_.push_back(
00303 static_cast<xdata::Double>(
00304 streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
00305 )
00306 );
00307
00308 bandwidthPerStream_.push_back(
00309 static_cast<xdata::Double>(
00310 streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
00311 )
00312 );
00313 }
00314 }
00315
00316 }
00317