Go to the documentation of this file.
00001
00003
00004 #ifndef EventFilter_StorageManager_StreamsMonitorCollection_h
00005 #define EventFilter_StorageManager_StreamsMonitorCollection_h
00006
00007 #include <sstream>
00008 #include <iomanip>
00009 #include <vector>
00010 #include <set>
00011
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014
00015 #include "xdata/Double.h"
00016 #include "xdata/String.h"
00017 #include "xdata/UnsignedInteger32.h"
00018 #include "xdata/Vector.h"
00019
00020 #include "EventFilter/StorageManager/interface/DbFileHandler.h"
00021 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00022 #include "EventFilter/StorageManager/interface/Utils.h"
00023
00024
00025 namespace stor {
00026
00035 class StreamsMonitorCollection : public MonitorCollection
00036 {
00037 public:
00038
00039 struct StreamRecord
00040 {
00041 StreamRecord
00042 (
00043 StreamsMonitorCollection* coll,
00044 const utils::Duration_t& updateInterval,
00045 const utils::Duration_t& timeWindowForRecentResults
00046 ) :
00047 streamName(""),
00048 outputModuleLabel(""),
00049 fractionToDisk(1),
00050 fileCount(updateInterval,timeWindowForRecentResults),
00051 volume(updateInterval,timeWindowForRecentResults),
00052 bandwidth(updateInterval,timeWindowForRecentResults),
00053 parentCollection(coll) {}
00054
00055 ~StreamRecord()
00056 { fileCountPerLS.clear(); }
00057
00058 void incrementFileCount(const uint32_t lumiSection);
00059 void addSizeInBytes(double);
00060 bool reportLumiSectionInfo
00061 (
00062 const uint32_t& lumiSection,
00063 std::string& str
00064 );
00065
00066 std::string streamName;
00067 std::string outputModuleLabel;
00068 double fractionToDisk;
00069 MonitoredQuantity fileCount;
00070 MonitoredQuantity volume;
00071 MonitoredQuantity bandwidth;
00072
00073 StreamsMonitorCollection* parentCollection;
00074
00075 typedef std::map<uint32_t, unsigned int> FileCountPerLumiSectionMap;
00076 FileCountPerLumiSectionMap fileCountPerLS;
00077 };
00078
00079
00080
00081 typedef boost::shared_ptr<StreamRecord> StreamRecordPtr;
00082 typedef std::vector<StreamRecordPtr> StreamRecordList;
00083
00084
00085 struct EndOfRunReport
00086 {
00087 EndOfRunReport() { reset(); }
00088
00089 void reset()
00090 { latestLumiSectionWritten = eolsCount = lsCountWithFiles = 0; }
00091
00092 void updateLatestWrittenLumiSection(uint32_t ls)
00093 {
00094 if (ls > latestLumiSectionWritten) latestLumiSectionWritten = ls;
00095 }
00096
00097 uint32_t latestLumiSectionWritten;
00098 unsigned int eolsCount;
00099 unsigned int lsCountWithFiles;
00100 };
00101 typedef boost::shared_ptr<EndOfRunReport> EndOfRunReportPtr;
00102
00103
00104 explicit StreamsMonitorCollection(const utils::Duration_t& updateInterval);
00105
00106 StreamRecordPtr getNewStreamRecord();
00107
00108 void getStreamRecords(StreamRecordList&) const;
00109
00110 bool getStreamRecordsForOutputModuleLabel(const std::string&, StreamRecordList&) const;
00111
00112 bool streamRecordsExist() const;
00113
00114 const MonitoredQuantity& getAllStreamsFileCountMQ() const {
00115 return allStreamsFileCount_;
00116 }
00117 MonitoredQuantity& getAllStreamsFileCountMQ() {
00118 return allStreamsFileCount_;
00119 }
00120
00121 const MonitoredQuantity& getAllStreamsVolumeMQ() const {
00122 return allStreamsVolume_;
00123 }
00124 MonitoredQuantity& getAllStreamsVolumeMQ() {
00125 return allStreamsVolume_;
00126 }
00127
00128 const MonitoredQuantity& getAllStreamsBandwidthMQ() const {
00129 return allStreamsBandwidth_;
00130 }
00131 MonitoredQuantity& getAllStreamsBandwidthMQ() {
00132 return allStreamsBandwidth_;
00133 }
00134
00135 void reportAllLumiSectionInfos(DbFileHandlerPtr, EndOfRunReportPtr);
00136
00137
00138 private:
00139
00140
00141 StreamsMonitorCollection(StreamsMonitorCollection const&);
00142 StreamsMonitorCollection& operator=(StreamsMonitorCollection const&);
00143
00144 typedef std::set<uint32_t> UnreportedLS;
00145 void getListOfAllUnreportedLS(UnreportedLS&);
00146
00147 virtual void do_calculateStatistics();
00148 virtual void do_reset();
00149 virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00150 virtual void do_updateInfoSpaceItems();
00151
00152 StreamRecordList streamRecords_;
00153 mutable boost::mutex streamRecordsMutex_;
00154
00155 const utils::Duration_t updateInterval_;
00156 const utils::Duration_t timeWindowForRecentResults_;
00157
00158 MonitoredQuantity allStreamsFileCount_;
00159 MonitoredQuantity allStreamsVolume_;
00160 MonitoredQuantity allStreamsBandwidth_;
00161
00162 xdata::UnsignedInteger32 storedEvents_;
00163 xdata::Double storedVolume_;
00164 xdata::Double bandwidthToDisk_;
00165 xdata::Vector<xdata::String> streamNames_;
00166 xdata::Vector<xdata::UnsignedInteger32> eventsPerStream_;
00167 xdata::Vector<xdata::Double> ratePerStream_;
00168 xdata::Vector<xdata::Double> bandwidthPerStream_;
00169 };
00170
00171 }
00172
00173 #endif // EventFilter_StorageManager_StreamsMonitorCollection_h
00174
00175