CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ThroughputMonitorCollection.h
Go to the documentation of this file.
1 // $Id: ThroughputMonitorCollection.h,v 1.19 2011/03/07 15:31:32 mommsen Exp $
3 
4 #ifndef EventFilter_StorageManager_ThroughputMonitorCollection_h
5 #define EventFilter_StorageManager_ThroughputMonitorCollection_h
6 
7 #include <boost/shared_ptr.hpp>
8 #include <boost/thread/mutex.hpp>
9 
10 #include "toolbox/mem/Pool.h"
11 #include "xdata/Double.h"
12 #include "xdata/UnsignedInteger32.h"
13 
19 
20 namespace stor {
21 
32  {
33  public:
34 
36  (
37  const utils::Duration_t& updateInterval,
38  const unsigned int& throuphputAveragingCycles
39  );
40 
// Accessor: returns the value of the binCount_ member.
// NOTE(review): binCount_ is declared as const unsigned int, so the value is
// implicitly converted to int on return.
41  int getBinCount() const {return binCount_;}
42 
47  void setMemoryPoolPointer(toolbox::mem::Pool*);
48 
// Stores the passed fragment queue pointer in the fragmentQueue_ member,
// overwriting whatever it held before. The argument is taken by value and
// copy-assigned.
49  void setFragmentQueue(FragmentQueuePtr fragmentQueue) {
50  fragmentQueue_ = fragmentQueue;
51  }
52 
54  return poolUsageMQ_;
55  }
57  return poolUsageMQ_;
58  }
59 
62  }
65  }
66 
69  }
72  }
73 
74  void addPoppedFragmentSample(double dataSize);
75 
77  return poppedFragmentSizeMQ_;
78  }
80  return poppedFragmentSizeMQ_;
81  }
82 
84 
87  }
90  }
91 
94  }
97  }
98 
101  }
104  }
105 
// Stores the passed stream queue pointer in the streamQueue_ member,
// overwriting whatever it held before. The argument is taken by value and
// copy-assigned.
106  void setStreamQueue(StreamQueuePtr streamQueue) {
107  streamQueue_ = streamQueue;
108  }
109 
112  }
115  }
116 
119  }
122  }
123 
124  void addPoppedEventSample(double dataSize);
125 
127  return poppedEventSizeMQ_;
128  }
130  return poppedEventSizeMQ_;
131  }
132 
134 
136  return diskWriterIdleTimeMQ_;
137  }
139  return diskWriterIdleTimeMQ_;
140  }
141 
142  void addDiskWriteSample(double dataSize);
143 
145  return diskWriteSizeMQ_;
146  }
148  return diskWriteSizeMQ_;
149  }
150 
// Stores the passed DQM event queue pointer in the dqmEventQueue_ member,
// overwriting whatever it held before. The argument is taken by value and
// copy-assigned.
151  void setDQMEventQueue(DQMEventQueuePtr dqmEventQueue) {
152  dqmEventQueue_ = dqmEventQueue;
153  }
154 
157  }
160  }
161 
164  }
167  }
168 
169  void addPoppedDQMEventSample(double dataSize);
170 
172  return poppedDQMEventSizeMQ_;
173  }
175  return poppedDQMEventSizeMQ_;
176  }
177 
179 
182  }
185  }
186 
190  inline void setFragmentStoreSize(unsigned int size) {
192  }
193 
// Scales memoryUsed by 1/(1024*1024) and stores the result in
// currentFragmentStoreMemoryUsedMB_ (presumably a bytes-to-MB conversion,
// going by the member name -- confirm with callers).
197  inline void setFragmentStoreMemoryUsed(size_t memoryUsed) {
// The cast to double happens before the division, so the quotient is not
// truncated by integer arithmetic.
198  currentFragmentStoreMemoryUsedMB_ = static_cast<double>(memoryUsed) / (1024*1024);
199  }
200 
// Stats: container for a vector of Snapshot entries (newest first) together
// with a reset() member.
// NOTE(review): the embedded listing numbers are not contiguous here, so some
// members of the original struct may be absent from this view.
201  struct Stats
202  {
203 
// Snapshot: one set of throughput values; the unit of each field is given in
// its trailing comment.
204  struct Snapshot
205  {
208  double poolUsage; //bytes
211  double fragmentQueueRate; //Hz
212  double fragmentQueueBandwidth; //MB/s
217  double streamQueueRate; //Hz
218  double streamQueueBandwidth; //MB/s
219  double writtenEventsRate; //Hz
220  double writtenEventsBandwidth; //MB/s
222  double memoryUsedInDQMQueue; //MB
223  double dqmQueueRate; //Hz
224  double dqmQueueBandwidth; //MB/s
225 
227  double diskWriterBusy; //%
229 
230  Snapshot();
// NOTE(review): the three operators below are declared to return Snapshot by
// value; the conventional signatures return Snapshot&. Check the out-of-line
// definitions before changing them.
231  Snapshot operator=(const Snapshot&);
232  Snapshot operator+=(const Snapshot&);
233  Snapshot operator/=(const double&);
234 
235  };
236 
237  typedef std::vector<Snapshot> Snapshots;
238  Snapshots snapshots; // time sorted with newest entry first
240 
// Declared here only; the definition is not part of this header listing.
241  void reset();
242  };
243 
247  void getStats(Stats&) const;
248 
252  void getStats(Stats&, const unsigned int sampleCount) const;
253 
254 
255  private:
256 
257  //Prevent copying of the ThroughputMonitorCollection
260 
261  virtual void do_calculateStatistics();
262  virtual void do_reset();
264  virtual void do_updateInfoSpaceItems();
265 
266  void do_getStats(Stats&, const unsigned int sampleCount) const;
267 
276 
278  (
279  std::vector<double>& idleTimes,
280  std::vector<utils::Duration_t>& durations,
281  int firstIndex, int lastIndex
282  ) const;
283 
285  (
287  const int& idx,
288  double& rate,
289  double& bandwidth
290  ) const;
291 
292  double calcBusyPercentage(
294  const int& idx
295  ) const;
296 
297  void calcPoolUsage();
298 
299  const unsigned int binCount_;
301 
309 
315 
320 
324 
328 
329  toolbox::mem::Pool* pool_;
330 
331  xdata::UnsignedInteger32 poolUsage_; //I2O message pool usage in bytes
332  xdata::UnsignedInteger32 entriesInFragmentQueue_; //Instantaneous number of fragments in fragment queue
333  xdata::Double memoryUsedInFragmentQueue_; //Instantaneous memory usage of events in fragment queue (MB)
334  xdata::Double fragmentQueueRate_; //Rate of fragments popped from fragment queue
335  xdata::Double fragmentQueueBandwidth_; //Bandwidth of fragments popped from fragment queue (MB/s)
336  xdata::UnsignedInteger32 fragmentStoreSize_; //Instantaneous number of fragments in fragment store
337  xdata::Double fragmentStoreMemoryUsed_; //Instantaneous memory usage of events in fragment store (MB)
338  xdata::UnsignedInteger32 entriesInStreamQueue_; //Instantaneous number of events in stream queue
339  xdata::Double memoryUsedInStreamQueue_; //Instantaneous memory usage of events in stream queue (MB)
340  xdata::Double streamQueueRate_; //Rate of events popped from stream queue
341  xdata::Double streamQueueBandwidth_; //Bandwidth of events popped from stream queue (MB/s)
342  xdata::Double writtenEventsRate_; //Rate of (non-unique) events written to disk
343  xdata::Double writtenEventsBandwidth_; //Bandwidth of (non-unique) events written to disk
344  xdata::UnsignedInteger32 entriesInDQMQueue_; //Instantaneous number of events in dqm event queue
345  xdata::Double memoryUsedInDQMQueue_; //Instantaneous memory usage of events in dqm event queue (MB)
346  xdata::Double dqmQueueRate_; //Rate of events popped from dqm event queue
347  xdata::Double dqmQueueBandwidth_; //Bandwidth of events popped from dqm event queue (MB/s)
348 
349  xdata::Double fragmentProcessorBusy_; //Fragment processor busy percentage
350  xdata::Double diskWriterBusy_; //Disk writer busy percentage
351  xdata::Double dqmEventProcessorBusy_; //DQM event processor busy percentage
352  xdata::Double averagingTime_; //Time in s over which above values are averaged
353  };
354 
355 } // namespace stor
356 
357 #endif // EventFilter_StorageManager_ThroughputMonitorCollection_h
358 
359 
const MonitoredQuantity & getFragmentStoreEntryCountMQ() const
const MonitoredQuantity & getFragmentProcessorIdleMQ() const
ThroughputMonitorCollection(const utils::Duration_t &updateInterval, const unsigned int &throuphputAveragingCycles)
const MonitoredQuantity & getPoppedDQMEventSizeMQ() const
static boost::mutex mutex
Definition: LHEProxy.cc:11
const MonitoredQuantity & getDiskWriteMQ() const
const MonitoredQuantity & getDQMEventProcessorIdleMQ() const
const MonitoredQuantity & getFragmentQueueMemoryUsedMQ() const
const MonitoredQuantity & getDQMEventQueueEntryCountMQ() const
const MonitoredQuantity & getPoppedFragmentSizeMQ() const
const MonitoredQuantity & getFragmentStoreMemoryUsedMQ() const
void setFragmentQueue(FragmentQueuePtr fragmentQueue)
boost::shared_ptr< DQMEventQueue > DQMEventQueuePtr
Definition: DQMEventQueue.h:23
int smoothIdleTimesHelper(std::vector< double > &idleTimes, std::vector< utils::Duration_t > &durations, int firstIndex, int lastIndex) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
boost::shared_ptr< FragmentQueue > FragmentQueuePtr
Definition: FragmentQueue.h:22
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
const MonitoredQuantity & getPoppedEventSizeMQ() const
double calcBusyPercentage(MonitoredQuantity::Stats &, const int &idx) const
void do_getStats(Stats &, const unsigned int sampleCount) const
void setMemoryPoolPointer(toolbox::mem::Pool *)
boost::shared_ptr< StreamQueue > StreamQueuePtr
Definition: StreamQueue.h:22
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
const MonitoredQuantity & getStreamQueueEntryCountMQ() const
void getRateAndBandwidth(MonitoredQuantity::Stats &stats, const int &idx, double &rate, double &bandwidth) const
const MonitoredQuantity & getFragmentQueueEntryCountMQ() const
void addDiskWriterIdleSample(utils::Duration_t idleTime)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
ThroughputMonitorCollection & operator=(ThroughputMonitorCollection const &)
const MonitoredQuantity & getDQMEventQueueMemoryUsedMQ() const
double rate(double x)
Definition: Constants.cc:3
void smoothIdleTimes(MonitoredQuantity::Stats &) const
void setDQMEventQueue(DQMEventQueuePtr dqmEventQueue)
const MonitoredQuantity & getDiskWriterIdleMQ() const
void setStreamQueue(StreamQueuePtr streamQueue)
void addFragmentProcessorIdleSample(utils::Duration_t idleTime)
const MonitoredQuantity & getPoolUsageMQ() const
const MonitoredQuantity & getStreamQueueMemoryUsedMQ() const
void addDQMEventProcessorIdleSample(utils::Duration_t idleTime)
tuple size
Write out results.