CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamsMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: StreamsMonitorCollection.cc,v 1.20 2011/06/20 15:55:53 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
10 
11 
12 namespace stor {
13 
15  (
16  const utils::Duration_t& updateInterval
17  ) :
18  MonitorCollection(updateInterval),
19  updateInterval_(updateInterval),
20  timeWindowForRecentResults_(boost::posix_time::seconds(30)),
21  allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
22  allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
23  allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
24  {}
25 
26 
29  {
30  boost::mutex::scoped_lock sl(streamRecordsMutex_);
31 
32  StreamRecordPtr streamRecord(
34  );
35  streamRecords_.push_back(streamRecord);
36  return streamRecord;
37  }
38 
39 
41  {
42  boost::mutex::scoped_lock sl(streamRecordsMutex_);
43 
44  list.clear();
45  list.reserve(streamRecords_.size());
46 
47  for (
48  StreamRecordList::const_iterator
49  it = streamRecords_.begin(), itEnd = streamRecords_.end();
50  it != itEnd;
51  ++it
52  )
53  {
54  list.push_back(*it);
55  }
56  }
57 
58 
60  {
61  boost::mutex::scoped_lock sl(streamRecordsMutex_);
62 
63  return ( ! streamRecords_.empty() );
64  }
65 
66 
68  (
69  const uint32_t lumiSection
70  )
71  {
72  fileCount.addSample(1);
73  parentCollection->allStreamsFileCount_.addSample(1);
74  ++fileCountPerLS[lumiSection];
75  }
76 
77 
79  {
80  size = size / (1024 * 1024);
81  volume.addSample(size);
83  }
84 
85 
87  (
88  const uint32_t& lumiSection,
89  std::string& str
90  )
91  {
92  std::ostringstream msg;
93  if (str.empty())
94  {
95  msg << "LS:" << lumiSection;
96  }
97 
98  unsigned int count = 0;
99  FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
100  if ( pos != fileCountPerLS.end() )
101  {
102  count = pos->second;
103  fileCountPerLS.erase(pos);
104  }
105  msg << "\t" << streamName << ":" << count;
106  str += msg.str();
107 
108  return (count>0);
109  }
110 
111 
113  (
114  DbFileHandlerPtr dbFileHandler,
115  EndOfRunReportPtr endOfRunReport
116  )
117  {
118  boost::mutex::scoped_lock sl(streamRecordsMutex_);
119 
120  UnreportedLS unreportedLS;
121  getListOfAllUnreportedLS(unreportedLS);
122 
123  for (UnreportedLS::const_iterator it = unreportedLS.begin(),
124  itEnd = unreportedLS.end(); it != itEnd; ++it)
125  {
126  std::string lsEntry;
127  bool filesWritten = false;
128 
129  for (StreamRecordList::const_iterator
130  stream = streamRecords_.begin(),
131  streamEnd = streamRecords_.end();
132  stream != streamEnd;
133  ++stream)
134  {
135  if ( (*stream)->reportLumiSectionInfo((*it), lsEntry) )
136  filesWritten = true;
137  }
138  lsEntry += "\tEoLS:0";
139  dbFileHandler->write(lsEntry);
140 
141  if (filesWritten) ++(endOfRunReport->lsCountWithFiles);
142  endOfRunReport->updateLatestWrittenLumiSection(*it);
143  }
144  }
145 
146 
148  {
149  // Have to loop over all streams as not every stream
150  // might have got an event for a given lumi section
151  for (StreamRecordList::const_iterator
152  stream = streamRecords_.begin(),
153  streamEnd = streamRecords_.end();
154  stream != streamEnd;
155  ++stream)
156  {
157  for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
158  lscount = (*stream)->fileCountPerLS.begin(),
159  lscountEnd = (*stream)->fileCountPerLS.end();
160  lscount != lscountEnd; ++lscount)
161  {
162  unreportedLS.insert(lscount->first);
163  }
164  }
165  }
166 
167 
169  {
171 
175  bool samplingHasStarted = (stats.getSampleCount() > 0);
176  if (samplingHasStarted) {
178  }
180 
181 
182  boost::mutex::scoped_lock sl(streamRecordsMutex_);
183 
184  for (
185  StreamRecordList::const_iterator
186  it = streamRecords_.begin(), itEnd = streamRecords_.end();
187  it != itEnd;
188  ++it
189  )
190  {
191  (*it)->fileCount.calculateStatistics();
192  (*it)->volume.calculateStatistics();
193  (*it)->volume.getStats(stats);
194  if (samplingHasStarted) {
195  (*it)->bandwidth.addSample(stats.getLastValueRate());
196  }
197  (*it)->bandwidth.calculateStatistics();
198  }
199  }
200 
201 
203  (
204  InfoSpaceItems& infoSpaceItems
205  )
206  {
207  infoSpaceItems.push_back(std::make_pair("storedEvents", &storedEvents_));
208  infoSpaceItems.push_back(std::make_pair("storedVolume", &storedVolume_));
209  infoSpaceItems.push_back(std::make_pair("bandwidthToDisk", &bandwidthToDisk_));
210  infoSpaceItems.push_back(std::make_pair("streamNames", &streamNames_));
211  infoSpaceItems.push_back(std::make_pair("eventsPerStream", &eventsPerStream_));
212  infoSpaceItems.push_back(std::make_pair("ratePerStream", &ratePerStream_));
213  infoSpaceItems.push_back(std::make_pair("bandwidthPerStream", &bandwidthPerStream_));
214  }
215 
216 
218  {
222 
223  boost::mutex::scoped_lock sl(streamRecordsMutex_);
224  streamRecords_.clear();
225  }
226 
227 
229  {
230  MonitoredQuantity::Stats allStreamsVolumeStats;
231  allStreamsVolume_.getStats(allStreamsVolumeStats);
232 
233  storedEvents_ = static_cast<xdata::UnsignedInteger32>(
234  allStreamsVolumeStats.getSampleCount()
235  );
236  storedVolume_ = static_cast<xdata::Double>(
237  allStreamsVolumeStats.getValueSum()
238  );
239  bandwidthToDisk_ = static_cast<xdata::Double>(
240  allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
241  );
242 
243  boost::mutex::scoped_lock sl(streamRecordsMutex_);
244 
245  streamNames_.clear();
246  eventsPerStream_.clear();
247  ratePerStream_.clear();
248  bandwidthPerStream_.clear();
249 
250  streamNames_.reserve(streamRecords_.size());
251  eventsPerStream_.reserve(streamRecords_.size());
252  ratePerStream_.reserve(streamRecords_.size());
253  bandwidthPerStream_.reserve(streamRecords_.size());
254 
255  for (
256  StreamRecordList::const_iterator
257  it = streamRecords_.begin(), itEnd = streamRecords_.end();
258  it != itEnd;
259  ++it
260  )
261  {
262  MonitoredQuantity::Stats streamVolumeStats;
263  (*it)->volume.getStats(streamVolumeStats);
264  MonitoredQuantity::Stats streamBandwidthStats;
265  (*it)->bandwidth.getStats(streamBandwidthStats);
266 
267  streamNames_.push_back(
268  static_cast<xdata::String>( (*it)->streamName )
269  );
270 
271  eventsPerStream_.push_back(
272  static_cast<xdata::UnsignedInteger32>(
273  streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
274  )
275  );
276 
277  ratePerStream_.push_back(
278  static_cast<xdata::Double>(
279  streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
280  )
281  );
282 
283  bandwidthPerStream_.push_back(
284  static_cast<xdata::Double>(
285  streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
286  )
287  );
288  }
289  }
290 
291 } // namespace stor
292 
double getValueRate(DataSetType t=FULL) const
double seconds()
void addSample(const double &value=1)
uint64_t getSampleCount(DataSetType t=FULL) const
const utils::Duration_t timeWindowForRecentResults_
xdata::Vector< xdata::UnsignedInteger32 > eventsPerStream_
void getStats(Stats &stats) const
xdata::Vector< xdata::String > streamNames_
boost::shared_ptr< EndOfRunReport > EndOfRunReportPtr
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
std::vector< StreamRecordPtr > StreamRecordList
xdata::UnsignedInteger32 storedEvents_
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
boost::shared_ptr< DbFileHandler > DbFileHandlerPtr
Definition: DbFileHandler.h:65
boost::shared_ptr< StreamRecord > StreamRecordPtr
double getSampleRate(DataSetType t=FULL) const
void incrementFileCount(const uint32_t lumiSection)
void reportAllLumiSectionInfos(DbFileHandlerPtr, EndOfRunReportPtr)
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
xdata::Vector< xdata::Double > ratePerStream_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void getStreamRecords(StreamRecordList &) const
bool reportLumiSectionInfo(const uint32_t &lumiSection, std::string &str)
StreamsMonitorCollection(const utils::Duration_t &updateInterval)
double getValueSum(DataSetType t=FULL) const
tuple size
Write out results.
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
xdata::Vector< xdata::Double > bandwidthPerStream_