CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_3/src/EventFilter/StorageManager/interface/StreamsMonitorCollection.h

Go to the documentation of this file.
00001 // $Id: StreamsMonitorCollection.h,v 1.17 2011/11/17 17:35:41 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_StreamsMonitorCollection_h
00005 #define EventFilter_StorageManager_StreamsMonitorCollection_h
00006 
00007 #include <sstream>
00008 #include <iomanip>
00009 #include <vector>
00010 #include <set>
00011 
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014 
00015 #include "xdata/Double.h"
00016 #include "xdata/String.h"
00017 #include "xdata/UnsignedInteger32.h"
00018 #include "xdata/Vector.h"
00019 
00020 #include "EventFilter/StorageManager/interface/DbFileHandler.h"
00021 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00022 #include "EventFilter/StorageManager/interface/Utils.h"
00023 
00024 
00025 namespace stor {
00026 
00035   class StreamsMonitorCollection : public MonitorCollection
00036   {
00037   public:
00038 
00039     struct StreamRecord
00040     {
00041       StreamRecord
00042       (
00043         StreamsMonitorCollection* coll,
00044         const utils::Duration_t& updateInterval,
00045         const utils::Duration_t& timeWindowForRecentResults
00046       ) :
00047       streamName(""),
00048       outputModuleLabel(""),
00049       fractionToDisk(1),
00050       fileCount(updateInterval,timeWindowForRecentResults),
00051       volume(updateInterval,timeWindowForRecentResults),
00052       bandwidth(updateInterval,timeWindowForRecentResults),
00053       parentCollection(coll) {}
00054 
00055       ~StreamRecord()
00056       { fileCountPerLS.clear(); }
00057 
00058       void incrementFileCount(const uint32_t lumiSection);
00059       void addSizeInBytes(double);
00060       bool reportLumiSectionInfo
00061       (
00062         const uint32_t& lumiSection,
00063         std::string& str
00064       );
00065       
00066       std::string streamName;       // name of the stream
00067       std::string outputModuleLabel;// label of the associated output module
00068       double fractionToDisk;        // fraction of events written to disk
00069       MonitoredQuantity fileCount;  // number of files written for this stream
00070       MonitoredQuantity volume;     // data in MBytes stored in this stream
00071       MonitoredQuantity bandwidth;  // bandwidth in MBytes for this stream
00072 
00073       StreamsMonitorCollection* parentCollection;
00074 
00075       typedef std::map<uint32_t, unsigned int> FileCountPerLumiSectionMap;
00076       FileCountPerLumiSectionMap fileCountPerLS;
00077     };
00078 
00079     // We do not know how many streams there will be.
00080     // Thus, we need a vector of them.
00081     typedef boost::shared_ptr<StreamRecord> StreamRecordPtr;
00082     typedef std::vector<StreamRecordPtr> StreamRecordList;
00083 
00084 
00085     struct EndOfRunReport
00086     {
00087       EndOfRunReport() { reset(); }
00088 
00089       void reset()
00090       { latestLumiSectionWritten = eolsCount = lsCountWithFiles = 0; }
00091 
00092       void updateLatestWrittenLumiSection(uint32_t ls)
00093       {
00094         if (ls > latestLumiSectionWritten) latestLumiSectionWritten = ls;
00095       }
00096 
00097       uint32_t latestLumiSectionWritten;
00098       unsigned int eolsCount;
00099       unsigned int lsCountWithFiles;
00100     };
00101     typedef boost::shared_ptr<EndOfRunReport> EndOfRunReportPtr;
00102 
00103 
00104     explicit StreamsMonitorCollection(const utils::Duration_t& updateInterval);
00105 
00106     StreamRecordPtr getNewStreamRecord();
00107 
00108     void getStreamRecords(StreamRecordList&) const;
00109 
00110     bool getStreamRecordsForOutputModuleLabel(const std::string&, StreamRecordList&) const;
00111 
00112     bool streamRecordsExist() const;
00113 
00114     const MonitoredQuantity& getAllStreamsFileCountMQ() const {
00115       return allStreamsFileCount_;
00116     }
00117     MonitoredQuantity& getAllStreamsFileCountMQ() {
00118       return allStreamsFileCount_;
00119     }
00120 
00121     const MonitoredQuantity& getAllStreamsVolumeMQ() const {
00122       return allStreamsVolume_;
00123     }
00124     MonitoredQuantity& getAllStreamsVolumeMQ() {
00125       return allStreamsVolume_;
00126     }
00127 
00128     const MonitoredQuantity& getAllStreamsBandwidthMQ() const {
00129       return allStreamsBandwidth_;
00130     }
00131     MonitoredQuantity& getAllStreamsBandwidthMQ() {
00132       return allStreamsBandwidth_;
00133     }
00134 
00135     void reportAllLumiSectionInfos(DbFileHandlerPtr, EndOfRunReportPtr);
00136 
00137 
00138   private:
00139 
00140     //Prevent copying of the StreamsMonitorCollection
00141     StreamsMonitorCollection(StreamsMonitorCollection const&);
00142     StreamsMonitorCollection& operator=(StreamsMonitorCollection const&);
00143 
00144     typedef std::set<uint32_t> UnreportedLS;
00145     void getListOfAllUnreportedLS(UnreportedLS&);
00146 
00147     virtual void do_calculateStatistics();
00148     virtual void do_reset();
00149     virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00150     virtual void do_updateInfoSpaceItems();
00151 
00152     StreamRecordList streamRecords_;
00153     mutable boost::mutex streamRecordsMutex_;
00154 
00155     const utils::Duration_t updateInterval_;
00156     const utils::Duration_t timeWindowForRecentResults_;
00157 
00158     MonitoredQuantity allStreamsFileCount_;
00159     MonitoredQuantity allStreamsVolume_;
00160     MonitoredQuantity allStreamsBandwidth_;
00161 
00162     xdata::UnsignedInteger32 storedEvents_;   // number of events stored in all streams
00163     xdata::Double storedVolume_;              // total volume in MB stored on disk
00164     xdata::Double bandwidthToDisk_;           // recent bandwidth in MB/s written to disk
00165     xdata::Vector<xdata::String> streamNames_; // names of all streams written
00166     xdata::Vector<xdata::UnsignedInteger32> eventsPerStream_; // total number of events stored per stream
00167     xdata::Vector<xdata::Double> ratePerStream_; // recent event rate (Hz) per stream
00168     xdata::Vector<xdata::Double> bandwidthPerStream_; // recent bandwidth (MB/s) per stream
00169   };
00170   
00171 } // namespace stor
00172 
00173 #endif // EventFilter_StorageManager_StreamsMonitorCollection_h 
00174 
00175