CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/StorageManager/src/StreamsMonitorCollection.cc

Go to the documentation of this file.
00001 // $Id: StreamsMonitorCollection.cc,v 1.22 2011/11/17 17:35:40 mommsen Exp $
00003 
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007 
00008 #include "EventFilter/StorageManager/interface/Exception.h"
00009 #include "EventFilter/StorageManager/interface/StreamsMonitorCollection.h"
00010 
00011 
00012 namespace stor {
00013   
00014   StreamsMonitorCollection::StreamsMonitorCollection
00015   (
00016     const utils::Duration_t& updateInterval
00017   ) :
00018   MonitorCollection(updateInterval),
00019   updateInterval_(updateInterval),
00020   timeWindowForRecentResults_(boost::posix_time::seconds(10)),
00021   allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
00022   allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
00023   allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
00024   {}
00025   
00026   
00027   StreamsMonitorCollection::StreamRecordPtr
00028   StreamsMonitorCollection::getNewStreamRecord()
00029   {
00030     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00031     
00032     StreamRecordPtr streamRecord(
00033       new StreamRecord(this,updateInterval_,timeWindowForRecentResults_)
00034     );
00035     streamRecords_.push_back(streamRecord);
00036     return streamRecord;
00037   }
00038   
00039   
00040   void StreamsMonitorCollection::getStreamRecords(StreamRecordList& list) const
00041   {
00042     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00043 
00044     list.clear();
00045     list.reserve(streamRecords_.size());
00046     
00047     for (
00048       StreamRecordList::const_iterator 
00049         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00050       it != itEnd;
00051       ++it
00052     )
00053     {
00054       list.push_back(*it);
00055     }
00056   }
00057   
00058   
00059   bool StreamsMonitorCollection::getStreamRecordsForOutputModuleLabel
00060   (
00061     const std::string& label,
00062     StreamRecordList& list
00063   ) const
00064   {
00065     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00066 
00067     list.clear();
00068     list.reserve(streamRecords_.size());
00069     
00070     for (
00071       StreamRecordList::const_iterator 
00072         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00073       it != itEnd;
00074       ++it
00075     )
00076     {
00077       if ( (*it)->outputModuleLabel == label )
00078         list.push_back(*it);
00079     }
00080     return ( ! list.empty() );
00081   }
00082   
00083 
00084   bool StreamsMonitorCollection::streamRecordsExist() const
00085   {
00086     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00087 
00088     return ( ! streamRecords_.empty() );
00089   }
00090 
00091   
00092   void StreamsMonitorCollection::StreamRecord::incrementFileCount
00093   (
00094     const uint32_t lumiSection
00095   )
00096   {
00097     fileCount.addSample(1);
00098     parentCollection->allStreamsFileCount_.addSample(1);
00099     ++fileCountPerLS[lumiSection];
00100   }
00101   
00102   
00103   void StreamsMonitorCollection::StreamRecord::addSizeInBytes(double size)
00104   {
00105     size = size / (1024 * 1024);
00106     volume.addSample(size);
00107     parentCollection->allStreamsVolume_.addSample(size);
00108   }
00109   
00110   
00111   bool StreamsMonitorCollection::StreamRecord::reportLumiSectionInfo
00112   (
00113     const uint32_t& lumiSection,
00114     std::string& str
00115   )
00116   {
00117     std::ostringstream msg;
00118     if (str.empty())
00119     {
00120       msg << "LS:" << lumiSection;
00121     }
00122     
00123     unsigned int count = 0;
00124     FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
00125     if ( pos != fileCountPerLS.end() )
00126     {
00127       count = pos->second;
00128       fileCountPerLS.erase(pos);
00129     }
00130     msg << "\t" << streamName << ":" << count;
00131     str += msg.str();
00132 
00133     return (count>0);
00134   }
00135   
00136   
00137   void StreamsMonitorCollection::reportAllLumiSectionInfos
00138   (
00139     DbFileHandlerPtr dbFileHandler,
00140     EndOfRunReportPtr endOfRunReport
00141   )
00142   {
00143     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00144 
00145     UnreportedLS unreportedLS;
00146     getListOfAllUnreportedLS(unreportedLS);
00147     
00148     for (UnreportedLS::const_iterator it = unreportedLS.begin(),
00149            itEnd = unreportedLS.end(); it != itEnd; ++it)
00150     {
00151       std::string lsEntry;
00152       bool filesWritten = false;
00153 
00154       for (StreamRecordList::const_iterator 
00155              stream = streamRecords_.begin(),
00156              streamEnd = streamRecords_.end();
00157            stream != streamEnd;
00158            ++stream)
00159       {
00160         if ( (*stream)->reportLumiSectionInfo((*it), lsEntry) )
00161           filesWritten = true;
00162       }
00163       lsEntry += "\tEoLS:0";
00164       dbFileHandler->write(lsEntry);
00165 
00166       if (filesWritten) ++(endOfRunReport->lsCountWithFiles);
00167       endOfRunReport->updateLatestWrittenLumiSection(*it);
00168     }
00169   }
00170   
00171   
00172   void StreamsMonitorCollection::getListOfAllUnreportedLS(UnreportedLS& unreportedLS)
00173   {
00174     // Have to loop over all streams as not every stream
00175     // might have got an event for a given lumi section
00176     for (StreamRecordList::const_iterator 
00177            stream = streamRecords_.begin(),
00178            streamEnd = streamRecords_.end();
00179          stream != streamEnd;
00180          ++stream)
00181     {
00182       for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
00183              lscount = (*stream)->fileCountPerLS.begin(),
00184              lscountEnd = (*stream)->fileCountPerLS.end();
00185            lscount != lscountEnd; ++lscount)
00186       {
00187         unreportedLS.insert(lscount->first);
00188       }
00189     }
00190   }
00191   
00192   
00193   void StreamsMonitorCollection::do_calculateStatistics()
00194   {
00195     MonitoredQuantity::Stats stats;
00196     
00197     allStreamsFileCount_.calculateStatistics();
00198     allStreamsVolume_.calculateStatistics();
00199     allStreamsVolume_.getStats(stats);
00200     bool samplingHasStarted = (stats.getSampleCount() > 0);
00201     if (samplingHasStarted) {
00202       allStreamsBandwidth_.addSample(stats.getLastValueRate());
00203     }
00204     allStreamsBandwidth_.calculateStatistics();
00205     
00206     
00207     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00208     
00209     for (
00210       StreamRecordList::const_iterator 
00211         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00212       it != itEnd;
00213       ++it
00214     ) 
00215     {
00216       (*it)->fileCount.calculateStatistics();
00217       (*it)->volume.calculateStatistics();
00218       (*it)->volume.getStats(stats);
00219       if (samplingHasStarted) {
00220         (*it)->bandwidth.addSample(stats.getLastValueRate());
00221       }
00222       (*it)->bandwidth.calculateStatistics();
00223     }
00224   }
00225   
00226   
00227   void StreamsMonitorCollection::do_appendInfoSpaceItems
00228   (
00229     InfoSpaceItems& infoSpaceItems
00230   )
00231   {
00232     infoSpaceItems.push_back(std::make_pair("storedEvents",  &storedEvents_));
00233     infoSpaceItems.push_back(std::make_pair("storedVolume",  &storedVolume_));
00234     infoSpaceItems.push_back(std::make_pair("bandwidthToDisk",  &bandwidthToDisk_));
00235     infoSpaceItems.push_back(std::make_pair("streamNames",  &streamNames_));
00236     infoSpaceItems.push_back(std::make_pair("eventsPerStream",  &eventsPerStream_));
00237     infoSpaceItems.push_back(std::make_pair("ratePerStream",  &ratePerStream_));
00238     infoSpaceItems.push_back(std::make_pair("bandwidthPerStream",  &bandwidthPerStream_));
00239   }
00240   
00241   
00242   void StreamsMonitorCollection::do_reset()
00243   {
00244     allStreamsFileCount_.reset();
00245     allStreamsVolume_.reset();
00246     allStreamsBandwidth_.reset();
00247     
00248     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00249     streamRecords_.clear();
00250   }
00251   
00252   
00253   void StreamsMonitorCollection::do_updateInfoSpaceItems()
00254   {
00255     MonitoredQuantity::Stats allStreamsVolumeStats;
00256     allStreamsVolume_.getStats(allStreamsVolumeStats);
00257     
00258     storedEvents_ = static_cast<xdata::UnsignedInteger32>(
00259       allStreamsVolumeStats.getSampleCount()
00260     );
00261     storedVolume_ = static_cast<xdata::Double>(
00262       allStreamsVolumeStats.getValueSum()
00263     );
00264     bandwidthToDisk_ = static_cast<xdata::Double>(
00265       allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
00266     );
00267     
00268     boost::mutex::scoped_lock sl(streamRecordsMutex_);
00269     
00270     streamNames_.clear();
00271     eventsPerStream_.clear();
00272     ratePerStream_.clear();
00273     bandwidthPerStream_.clear();
00274     
00275     streamNames_.reserve(streamRecords_.size());
00276     eventsPerStream_.reserve(streamRecords_.size());
00277     ratePerStream_.reserve(streamRecords_.size());
00278     bandwidthPerStream_.reserve(streamRecords_.size());
00279     
00280     for (
00281       StreamRecordList::const_iterator
00282         it = streamRecords_.begin(), itEnd = streamRecords_.end();
00283       it != itEnd;
00284       ++it
00285     )
00286     {
00287       MonitoredQuantity::Stats streamVolumeStats;
00288       (*it)->volume.getStats(streamVolumeStats);
00289       MonitoredQuantity::Stats streamBandwidthStats;
00290       (*it)->bandwidth.getStats(streamBandwidthStats);
00291       
00292       streamNames_.push_back(
00293         static_cast<xdata::String>( (*it)->streamName )
00294       );
00295       
00296       eventsPerStream_.push_back(
00297         static_cast<xdata::UnsignedInteger32>(
00298           streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
00299         )
00300       );
00301       
00302       ratePerStream_.push_back(
00303         static_cast<xdata::Double>(
00304           streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
00305         )
00306       );
00307       
00308       bandwidthPerStream_.push_back(
00309         static_cast<xdata::Double>(
00310           streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
00311         )
00312       );
00313     }
00314   }
00315   
00316 } // namespace stor
00317