CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamsMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: StreamsMonitorCollection.cc,v 1.22 2011/11/17 17:35:40 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
10 
11 
12 namespace stor {
13 
15  (
16  const utils::Duration_t& updateInterval
17  ) :
18  MonitorCollection(updateInterval),
19  updateInterval_(updateInterval),
20  timeWindowForRecentResults_(boost::posix_time::seconds(10)),
21  allStreamsFileCount_(updateInterval, timeWindowForRecentResults_),
22  allStreamsVolume_(updateInterval, timeWindowForRecentResults_),
23  allStreamsBandwidth_(updateInterval, timeWindowForRecentResults_)
24  {}
25 
26 
29  {
30  boost::mutex::scoped_lock sl(streamRecordsMutex_);
31 
32  StreamRecordPtr streamRecord(
34  );
35  streamRecords_.push_back(streamRecord);
36  return streamRecord;
37  }
38 
39 
41  {
42  boost::mutex::scoped_lock sl(streamRecordsMutex_);
43 
44  list.clear();
45  list.reserve(streamRecords_.size());
46 
47  for (
48  StreamRecordList::const_iterator
49  it = streamRecords_.begin(), itEnd = streamRecords_.end();
50  it != itEnd;
51  ++it
52  )
53  {
54  list.push_back(*it);
55  }
56  }
57 
58 
60  (
61  const std::string& label,
63  ) const
64  {
65  boost::mutex::scoped_lock sl(streamRecordsMutex_);
66 
67  list.clear();
68  list.reserve(streamRecords_.size());
69 
70  for (
71  StreamRecordList::const_iterator
72  it = streamRecords_.begin(), itEnd = streamRecords_.end();
73  it != itEnd;
74  ++it
75  )
76  {
77  if ( (*it)->outputModuleLabel == label )
78  list.push_back(*it);
79  }
80  return ( ! list.empty() );
81  }
82 
83 
85  {
86  boost::mutex::scoped_lock sl(streamRecordsMutex_);
87 
88  return ( ! streamRecords_.empty() );
89  }
90 
91 
93  (
94  const uint32_t lumiSection
95  )
96  {
97  fileCount.addSample(1);
98  parentCollection->allStreamsFileCount_.addSample(1);
99  ++fileCountPerLS[lumiSection];
100  }
101 
102 
104  {
105  size = size / (1024 * 1024);
106  volume.addSample(size);
108  }
109 
110 
112  (
113  const uint32_t& lumiSection,
114  std::string& str
115  )
116  {
117  std::ostringstream msg;
118  if (str.empty())
119  {
120  msg << "LS:" << lumiSection;
121  }
122 
123  unsigned int count = 0;
124  FileCountPerLumiSectionMap::iterator pos = fileCountPerLS.find(lumiSection);
125  if ( pos != fileCountPerLS.end() )
126  {
127  count = pos->second;
128  fileCountPerLS.erase(pos);
129  }
130  msg << "\t" << streamName << ":" << count;
131  str += msg.str();
132 
133  return (count>0);
134  }
135 
136 
138  (
139  DbFileHandlerPtr dbFileHandler,
140  EndOfRunReportPtr endOfRunReport
141  )
142  {
143  boost::mutex::scoped_lock sl(streamRecordsMutex_);
144 
145  UnreportedLS unreportedLS;
146  getListOfAllUnreportedLS(unreportedLS);
147 
148  for (UnreportedLS::const_iterator it = unreportedLS.begin(),
149  itEnd = unreportedLS.end(); it != itEnd; ++it)
150  {
151  std::string lsEntry;
152  bool filesWritten = false;
153 
154  for (StreamRecordList::const_iterator
155  stream = streamRecords_.begin(),
156  streamEnd = streamRecords_.end();
157  stream != streamEnd;
158  ++stream)
159  {
160  if ( (*stream)->reportLumiSectionInfo((*it), lsEntry) )
161  filesWritten = true;
162  }
163  lsEntry += "\tEoLS:0";
164  dbFileHandler->write(lsEntry);
165 
166  if (filesWritten) ++(endOfRunReport->lsCountWithFiles);
167  endOfRunReport->updateLatestWrittenLumiSection(*it);
168  }
169  }
170 
171 
173  {
174  // Have to loop over all streams as not every stream
175  // might have got an event for a given lumi section
176  for (StreamRecordList::const_iterator
177  stream = streamRecords_.begin(),
178  streamEnd = streamRecords_.end();
179  stream != streamEnd;
180  ++stream)
181  {
182  for (StreamRecord::FileCountPerLumiSectionMap::const_iterator
183  lscount = (*stream)->fileCountPerLS.begin(),
184  lscountEnd = (*stream)->fileCountPerLS.end();
185  lscount != lscountEnd; ++lscount)
186  {
187  unreportedLS.insert(lscount->first);
188  }
189  }
190  }
191 
192 
194  {
196 
200  bool samplingHasStarted = (stats.getSampleCount() > 0);
201  if (samplingHasStarted) {
203  }
205 
206 
207  boost::mutex::scoped_lock sl(streamRecordsMutex_);
208 
209  for (
210  StreamRecordList::const_iterator
211  it = streamRecords_.begin(), itEnd = streamRecords_.end();
212  it != itEnd;
213  ++it
214  )
215  {
216  (*it)->fileCount.calculateStatistics();
217  (*it)->volume.calculateStatistics();
218  (*it)->volume.getStats(stats);
219  if (samplingHasStarted) {
220  (*it)->bandwidth.addSample(stats.getLastValueRate());
221  }
222  (*it)->bandwidth.calculateStatistics();
223  }
224  }
225 
226 
228  (
229  InfoSpaceItems& infoSpaceItems
230  )
231  {
232  infoSpaceItems.push_back(std::make_pair("storedEvents", &storedEvents_));
233  infoSpaceItems.push_back(std::make_pair("storedVolume", &storedVolume_));
234  infoSpaceItems.push_back(std::make_pair("bandwidthToDisk", &bandwidthToDisk_));
235  infoSpaceItems.push_back(std::make_pair("streamNames", &streamNames_));
236  infoSpaceItems.push_back(std::make_pair("eventsPerStream", &eventsPerStream_));
237  infoSpaceItems.push_back(std::make_pair("ratePerStream", &ratePerStream_));
238  infoSpaceItems.push_back(std::make_pair("bandwidthPerStream", &bandwidthPerStream_));
239  }
240 
241 
243  {
247 
248  boost::mutex::scoped_lock sl(streamRecordsMutex_);
249  streamRecords_.clear();
250  }
251 
252 
254  {
255  MonitoredQuantity::Stats allStreamsVolumeStats;
256  allStreamsVolume_.getStats(allStreamsVolumeStats);
257 
258  storedEvents_ = static_cast<xdata::UnsignedInteger32>(
259  allStreamsVolumeStats.getSampleCount()
260  );
261  storedVolume_ = static_cast<xdata::Double>(
262  allStreamsVolumeStats.getValueSum()
263  );
264  bandwidthToDisk_ = static_cast<xdata::Double>(
265  allStreamsVolumeStats.getValueRate(MonitoredQuantity::RECENT)
266  );
267 
268  boost::mutex::scoped_lock sl(streamRecordsMutex_);
269 
270  streamNames_.clear();
271  eventsPerStream_.clear();
272  ratePerStream_.clear();
273  bandwidthPerStream_.clear();
274 
275  streamNames_.reserve(streamRecords_.size());
276  eventsPerStream_.reserve(streamRecords_.size());
277  ratePerStream_.reserve(streamRecords_.size());
278  bandwidthPerStream_.reserve(streamRecords_.size());
279 
280  for (
281  StreamRecordList::const_iterator
282  it = streamRecords_.begin(), itEnd = streamRecords_.end();
283  it != itEnd;
284  ++it
285  )
286  {
287  MonitoredQuantity::Stats streamVolumeStats;
288  (*it)->volume.getStats(streamVolumeStats);
289  MonitoredQuantity::Stats streamBandwidthStats;
290  (*it)->bandwidth.getStats(streamBandwidthStats);
291 
292  streamNames_.push_back(
293  static_cast<xdata::String>( (*it)->streamName )
294  );
295 
296  eventsPerStream_.push_back(
297  static_cast<xdata::UnsignedInteger32>(
298  streamVolumeStats.getSampleCount(MonitoredQuantity::FULL)
299  )
300  );
301 
302  ratePerStream_.push_back(
303  static_cast<xdata::Double>(
304  streamVolumeStats.getSampleRate(MonitoredQuantity::RECENT)
305  )
306  );
307 
308  bandwidthPerStream_.push_back(
309  static_cast<xdata::Double>(
310  streamBandwidthStats.getValueRate(MonitoredQuantity::RECENT)
311  )
312  );
313  }
314  }
315 
316 } // namespace stor
317 
double getValueRate(DataSetType t=FULL) const
double seconds()
void addSample(const double &value=1)
bool getStreamRecordsForOutputModuleLabel(const std::string &, StreamRecordList &) const
uint64_t getSampleCount(DataSetType t=FULL) const
const utils::Duration_t timeWindowForRecentResults_
xdata::Vector< xdata::UnsignedInteger32 > eventsPerStream_
void getStats(Stats &stats) const
xdata::Vector< xdata::String > streamNames_
boost::shared_ptr< EndOfRunReport > EndOfRunReportPtr
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
std::vector< StreamRecordPtr > StreamRecordList
xdata::UnsignedInteger32 storedEvents_
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
boost::shared_ptr< DbFileHandler > DbFileHandlerPtr
Definition: DbFileHandler.h:71
boost::shared_ptr< StreamRecord > StreamRecordPtr
double getSampleRate(DataSetType t=FULL) const
void incrementFileCount(const uint32_t lumiSection)
void reportAllLumiSectionInfos(DbFileHandlerPtr, EndOfRunReportPtr)
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
xdata::Vector< xdata::Double > ratePerStream_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void getStreamRecords(StreamRecordList &) const
bool reportLumiSectionInfo(const uint32_t &lumiSection, std::string &str)
StreamsMonitorCollection(const utils::Duration_t &updateInterval)
double getValueSum(DataSetType t=FULL) const
tuple size
Write out results.
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
xdata::Vector< xdata::Double > bandwidthPerStream_