CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/StorageManager/src/StreamsMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: StreamsMonitorCollection.cc,v 1.21 2011/06/20 16:38:51 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include "EventFilter/StorageManager/interface/Exception.h"
00009 #include "EventFilter/StorageManager/interface/StreamsMonitorCollection.h"
00010 
00011 
00012 namespace stor {
00013   
00014   StreamsMonitorCollection::StreamsMonitorCollection
00015   (
00016     const utils::Duration_t& updateInterval
00017   ) :
00018   MonitorCollection(updateInterval),
00019   updateInterval_(updateInterval),
00020   timeWindowForRecentResults_(boost::posix_time::seconds(30)),
00021   allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
00022   allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
00023   allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
00024   {}
00025   
00026   
00027   StreamsMonitorCollection::StreamRecordPtr
00028   StreamsMonitorCollection::getNewStreamRecord()
00029   {
00030     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00031     
00032     StreamRecordPtr streamRecord(
00033       new StreamRecord(this,updateInterval_,timeWindowForRecentResults_)
00034     );
00035     streamRecords_.push_back(streamRecord);
00036     return streamRecord;
00037   }
00038   
00039   
00040   void StreamsMonitorCollection::getStreamRecords(StreamRecordList& list) const
00041   {
00042     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00043 
00044     list.clear();
00045     list.reserve(streamRecords_.size());
00046     
00047     for (
00048       StreamRecordList::const_iterator 
00049         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00050       it != itEnd;
00051       ++it
00052     )
00053     {
00054       list.push_back(*it);
00055     }
00056   }
00057   
00058 
00059   bool StreamsMonitorCollection::streamRecordsExist() const
00060   {
00061     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00062 
00063     return ( ! streamRecords_.empty() );
00064   }
00065 
00066   
00067   void StreamsMonitorCollection::StreamRecord::incrementFileCount
00068   (
00069     const uint32_t lumiSection
00070   )
00071   {
00072     fileCount.addSample(1);
00073     parentCollection->allStreamsFileCount_.addSample(1);
00074     ++fileCountPerLS[lumiSection];
00075   }
00076   
00077   
00078   void StreamsMonitorCollection::StreamRecord::addSizeInBytes(double size)
00079   {
00080     size = size / (1024 * 1024);
00081     volume.addSample(size);
00082     parentCollection->allStreamsVolume_.addSample(size);
00083   }
00084   
00085   
00086   bool StreamsMonitorCollection::StreamRecord::reportLumiSectionInfo
00087   (
00088     const uint32_t& lumiSection,
00089     std::string& str
00090   )
00091   {
00092     std::ostringstream msg;
00093     if (str.empty())
00094     {
00095       msg << "LS:" << lumiSection;
00096     }
00097     
00098     unsigned int count = 0;
00099     FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
00100     if ( pos != fileCountPerLS.end() )
00101     {
00102       count = pos->second;
00103       fileCountPerLS.erase(pos);
00104     }
00105     msg << "\t" << streamName << ":" << count;
00106     str += msg.str();
00107 
00108     return (count>0);
00109   }
00110   
00111   
00112   void StreamsMonitorCollection::reportAllLumiSectionInfos
00113   (
00114     DbFileHandlerPtr dbFileHandler,
00115     EndOfRunReportPtr endOfRunReport
00116   )
00117   {
00118     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00119 
00120     UnreportedLS unreportedLS;
00121     getListOfAllUnreportedLS(unreportedLS);
00122     
00123     for (UnreportedLS::const_iterator it = unreportedLS.begin(),
00124            itEnd = unreportedLS.end(); it != itEnd; ++it)
00125     {
00126       std::string lsEntry;
00127       bool filesWritten = false;
00128 
00129       for (StreamRecordList::const_iterator 
00130              stream = streamRecords_.begin(),
00131              streamEnd = streamRecords_.end();
00132            stream != streamEnd;
00133            ++stream)
00134       {
00135         if ( (*stream)->reportLumiSectionInfo((*it), lsEntry) )
00136           filesWritten = true;
00137       }
00138       lsEntry += "\tEoLS:0";
00139       dbFileHandler->write(lsEntry);
00140 
00141       if (filesWritten) ++(endOfRunReport->lsCountWithFiles);
00142       endOfRunReport->updateLatestWrittenLumiSection(*it);
00143     }
00144   }
00145   
00146   
00147   void StreamsMonitorCollection::getListOfAllUnreportedLS(UnreportedLS& unreportedLS)
00148   {
00149     // Have to loop over all streams as not every stream
00150     // might have got an event for a given lumi section
00151     for (StreamRecordList::const_iterator 
00152            stream = streamRecords_.begin(),
00153            streamEnd = streamRecords_.end();
00154          stream != streamEnd;
00155          ++stream)
00156     {
00157       for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
00158              lscount = (*stream)->fileCountPerLS.begin(),
00159              lscountEnd = (*stream)->fileCountPerLS.end();
00160            lscount != lscountEnd; ++lscount)
00161       {
00162         unreportedLS.insert(lscount->first);
00163       }
00164     }
00165   }
00166   
00167   
00168   void StreamsMonitorCollection::do_calculateStatistics()
00169   {
00170     MonitoredQuantity::Stats stats;
00171     
00172     allStreamsFileCount_.calculateStatistics();
00173     allStreamsVolume_.calculateStatistics();
00174     allStreamsVolume_.getStats(stats);
00175     bool samplingHasStarted = (stats.getSampleCount() > 0);
00176     if (samplingHasStarted) {
00177       allStreamsBandwidth_.addSample(stats.getLastValueRate());
00178     }
00179     allStreamsBandwidth_.calculateStatistics();
00180     
00181     
00182     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00183     
00184     for (
00185       StreamRecordList::const_iterator 
00186         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00187       it != itEnd;
00188       ++it
00189     ) 
00190     {
00191       (*it)->fileCount.calculateStatistics();
00192       (*it)->volume.calculateStatistics();
00193       (*it)->volume.getStats(stats);
00194       if (samplingHasStarted) {
00195         (*it)->bandwidth.addSample(stats.getLastValueRate());
00196       }
00197       (*it)->bandwidth.calculateStatistics();
00198     }
00199   }
00200   
00201   
00202   void StreamsMonitorCollection::do_appendInfoSpaceItems
00203   (
00204     InfoSpaceItems& infoSpaceItems
00205   )
00206   {
00207     infoSpaceItems.push_back(std::make_pair("storedEvents",  &storedEvents_));
00208     infoSpaceItems.push_back(std::make_pair("storedVolume",  &storedVolume_));
00209     infoSpaceItems.push_back(std::make_pair("bandwidthToDisk",  &bandwidthToDisk_));
00210     infoSpaceItems.push_back(std::make_pair("streamNames",  &streamNames_));
00211     infoSpaceItems.push_back(std::make_pair("eventsPerStream",  &eventsPerStream_));
00212     infoSpaceItems.push_back(std::make_pair("ratePerStream",  &ratePerStream_));
00213     infoSpaceItems.push_back(std::make_pair("bandwidthPerStream",  &bandwidthPerStream_));
00214   }
00215   
00216   
00217   void StreamsMonitorCollection::do_reset()
00218   {
00219     allStreamsFileCount_.reset();
00220     allStreamsVolume_.reset();
00221     allStreamsBandwidth_.reset();
00222     
00223     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00224     streamRecords_.clear();
00225   }
00226   
00227   
00228   void StreamsMonitorCollection::do_updateInfoSpaceItems()
00229   {
00230     MonitoredQuantity::Stats allStreamsVolumeStats;
00231     allStreamsVolume_.getStats(allStreamsVolumeStats);
00232     
00233     storedEvents_ = static_cast<xdata::UnsignedInteger32>(
00234       allStreamsVolumeStats.getSampleCount()
00235     );
00236     storedVolume_ = static_cast<xdata::Double>(
00237       allStreamsVolumeStats.getValueSum()
00238     );
00239     bandwidthToDisk_ = static_cast<xdata::Double>(
00240       allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
00241     );
00242     
00243     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00244     
00245     streamNames_.clear();
00246     eventsPerStream_.clear();
00247     ratePerStream_.clear();
00248     bandwidthPerStream_.clear();
00249     
00250     streamNames_.reserve(streamRecords_.size());
00251     eventsPerStream_.reserve(streamRecords_.size());
00252     ratePerStream_.reserve(streamRecords_.size());
00253     bandwidthPerStream_.reserve(streamRecords_.size());
00254     
00255     for (
00256       StreamRecordList::const_iterator
00257         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00258       it != itEnd;
00259       ++it
00260     )
00261     {
00262       MonitoredQuantity::Stats streamVolumeStats;
00263       (*it)->volume.getStats(streamVolumeStats);
00264       MonitoredQuantity::Stats streamBandwidthStats;
00265       (*it)->bandwidth.getStats(streamBandwidthStats);
00266       
00267       streamNames_.push_back(
00268         static_cast<xdata::String>( (*it)->streamName )
00269       );
00270       
00271       eventsPerStream_.push_back(
00272         static_cast<xdata::UnsignedInteger32>(
00273           streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
00274         )
00275       );
00276       
00277       ratePerStream_.push_back(
00278         static_cast<xdata::Double>(
00279           streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
00280         )
00281       );
00282       
00283       bandwidthPerStream_.push_back(
00284         static_cast<xdata::Double>(
00285           streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
00286         )
00287       );
00288     }
00289   }
00290   
00291 } // namespace stor
00292