CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamsMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: StreamsMonitorCollection.cc,v 1.16 2011/04/14 12:52:48 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
10 
11 
12 namespace stor {
13 
15  (
16  const utils::Duration_t& updateInterval
17  ) :
18  MonitorCollection(updateInterval),
19  updateInterval_(updateInterval),
20  timeWindowForRecentResults_(boost::posix_time::seconds(30)),
21  allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
22  allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
23  allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
24  {}
25 
26 
29  {
30  boost::mutex::scoped_lock sl(streamRecordsMutex_);
31 
32  StreamRecordPtr streamRecord(
34  );
35  streamRecords_.push_back(streamRecord);
36  return streamRecord;
37  }
38 
39 
41  {
42  boost::mutex::scoped_lock sl(streamRecordsMutex_);
43 
44  list.clear();
45  list.reserve(streamRecords_.size());
46 
47  for (
48  StreamRecordList::const_iterator
49  it = streamRecords_.begin(), itEnd = streamRecords_.end();
50  it != itEnd;
51  ++it
52  )
53  {
54  list.push_back(*it);
55  }
56  }
57 
58 
60  {
61  boost::mutex::scoped_lock sl(streamRecordsMutex_);
62 
63  return ( ! streamRecords_.empty() );
64  }
65 
66 
68  (
69  const uint32_t lumiSection
70  )
71  {
72  fileCount.addSample(1);
73  parentCollection->allStreamsFileCount_.addSample(1);
74  ++fileCountPerLS[lumiSection];
75  }
76 
77 
79  {
80  size = size / (1024 * 1024);
81  volume.addSample(size);
83  }
84 
85 
87  (
88  const uint32_t& lumiSection,
89  std::string& str
90  )
91  {
92  std::ostringstream msg;
93  if (str.empty())
94  {
95  msg << "LS:" << lumiSection;
96  }
97 
98  unsigned int count = 0;
99  FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
100  if ( pos != fileCountPerLS.end() )
101  {
102  count = pos->second;
103  fileCountPerLS.erase(pos);
104  }
105  msg << "\t" << streamName << ":" << count;
106  str += msg.str();
107  }
108 
109 
111  {
112  boost::mutex::scoped_lock sl(streamRecordsMutex_);
113 
114  UnreportedLS unreportedLS;
115  getListOfAllUnreportedLS(unreportedLS);
116 
117  for (UnreportedLS::const_iterator it = unreportedLS.begin(),
118  itEnd = unreportedLS.end(); it != itEnd; ++it)
119  {
120  std::string lsEntry;
121  for (StreamRecordList::const_iterator
122  stream = streamRecords_.begin(),
123  streamEnd = streamRecords_.end();
124  stream != streamEnd;
125  ++stream)
126  {
127  (*stream)->reportLumiSectionInfo((*it), lsEntry);
128  }
129  dbFileHandler->write(lsEntry);
130  }
131  }
132 
133 
135  {
136  // Have to loop over all streams as not every stream
137  // might have got an event for a given lumi section
138  for (StreamRecordList::const_iterator
139  stream = streamRecords_.begin(),
140  streamEnd = streamRecords_.end();
141  stream != streamEnd;
142  ++stream)
143  {
144  for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
145  lscount = (*stream)->fileCountPerLS.begin(),
146  lscountEnd = (*stream)->fileCountPerLS.end();
147  lscount != lscountEnd; ++lscount)
148  {
149  unreportedLS.insert(lscount->first);
150  }
151  }
152  }
153 
154 
156  {
158 
162  bool samplingHasStarted = (stats.getSampleCount() > 0);
163  if (samplingHasStarted) {
165  }
167 
168 
169  boost::mutex::scoped_lock sl(streamRecordsMutex_);
170 
171  for (
172  StreamRecordList::const_iterator
173  it = streamRecords_.begin(), itEnd = streamRecords_.end();
174  it != itEnd;
175  ++it
176  )
177  {
178  (*it)->fileCount.calculateStatistics();
179  (*it)->volume.calculateStatistics();
180  (*it)->volume.getStats(stats);
181  if (samplingHasStarted) {
182  (*it)->bandwidth.addSample(stats.getLastValueRate());
183  }
184  (*it)->bandwidth.calculateStatistics();
185  }
186  }
187 
188 
190  (
191  InfoSpaceItems& infoSpaceItems
192  )
193  {
194  infoSpaceItems.push_back(std::make_pair("storedEvents", &storedEvents_));
195  infoSpaceItems.push_back(std::make_pair("storedVolume", &storedVolume_));
196  infoSpaceItems.push_back(std::make_pair("bandwidthToDisk", &bandwidthToDisk_));
197  infoSpaceItems.push_back(std::make_pair("streamNames", &streamNames_));
198  infoSpaceItems.push_back(std::make_pair("eventsPerStream", &eventsPerStream_));
199  infoSpaceItems.push_back(std::make_pair("ratePerStream", &ratePerStream_));
200  infoSpaceItems.push_back(std::make_pair("bandwidthPerStream", &bandwidthPerStream_));
201  }
202 
203 
205  {
209 
210  boost::mutex::scoped_lock sl(streamRecordsMutex_);
211  streamRecords_.clear();
212  }
213 
214 
216  {
217  MonitoredQuantity::Stats allStreamsVolumeStats;
218  allStreamsVolume_.getStats(allStreamsVolumeStats);
219 
220  storedEvents_ = static_cast<xdata::UnsignedInteger32>(
221  allStreamsVolumeStats.getSampleCount()
222  );
223  storedVolume_ = static_cast<xdata::Double>(
224  allStreamsVolumeStats.getValueSum()
225  );
226  bandwidthToDisk_ = static_cast<xdata::Double>(
227  allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
228  );
229 
230  boost::mutex::scoped_lock sl(streamRecordsMutex_);
231 
232  const size_t statsCount = streamRecords_.size();
233  const size_t infospaceCount = streamNames_.size();
234 
235  if ( statsCount != infospaceCount )
236  {
237  streamNames_.resize(statsCount);
238  eventsPerStream_.resize(statsCount);
239  ratePerStream_.resize(statsCount);
240  bandwidthPerStream_.resize(statsCount);
241  }
242 
243  for (size_t i=0; i < statsCount; ++i)
244  {
245  MonitoredQuantity::Stats streamVolumeStats;
246  streamRecords_.at(i)->volume.getStats(streamVolumeStats);
247  MonitoredQuantity::Stats streamBandwidthStats;
248  streamRecords_.at(i)->bandwidth.getStats(streamBandwidthStats);
249 
250  streamNames_.at(i) = static_cast<xdata::String>(streamRecords_.at(i)->streamName);
251  eventsPerStream_.at(i) = static_cast<xdata::UnsignedInteger32>(
252  streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
253  );
254  ratePerStream_.at(i) = static_cast<xdata::Double>(
255  streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
256  );
257  bandwidthPerStream_.at(i) = static_cast<xdata::Double>(
258  streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
259  );
260  }
261  }
262 
263 } // namespace stor
264 
double getValueRate(DataSetType t=FULL) const
int i
Definition: DBlmapReader.cc:9
double seconds()
void addSample(const double &value=1)
uint64_t getSampleCount(DataSetType t=FULL) const
const utils::Duration_t timeWindowForRecentResults_
xdata::Vector< xdata::UnsignedInteger32 > eventsPerStream_
void getStats(Stats &stats) const
xdata::Vector< xdata::String > streamNames_
void reportLumiSectionInfo(const uint32_t &lumiSection, std::string &str)
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
std::vector< StreamRecordPtr > StreamRecordList
xdata::UnsignedInteger32 storedEvents_
void reportAllLumiSectionInfos(DbFileHandlerPtr)
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
boost::shared_ptr< DbFileHandler > DbFileHandlerPtr
Definition: DbFileHandler.h:65
boost::shared_ptr< StreamRecord > StreamRecordPtr
double getSampleRate(DataSetType t=FULL) const
void incrementFileCount(const uint32_t lumiSection)
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
xdata::Vector< xdata::Double > ratePerStream_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void getStreamRecords(StreamRecordList &) const
StreamsMonitorCollection(const utils::Duration_t &updateInterval)
double getValueSum(DataSetType t=FULL) const
tuple size
Write out results.
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
xdata::Vector< xdata::Double > bandwidthPerStream_