CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DataRetrieverMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: DataRetrieverMonitorCollection.cc,v 1.3 2011/05/09 11:03:34 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
8 #include <boost/pointer_cast.hpp>
9 
12 
13 
14 namespace smproxy {
15 
17  (
18  const stor::utils::Duration_t& updateInterval,
19  stor::AlarmHandlerPtr alarmHandler
20  ) :
21  MonitorCollection(updateInterval),
22  updateInterval_(updateInterval),
23  alarmHandler_(alarmHandler),
24  totals_(updateInterval),
25  eventTypeMqMap_(updateInterval)
26  {}
27 
28 
30  (
31  const stor::RegPtr regPtr
32  )
33  {
34  boost::mutex::scoped_lock sl(statsMutex_);
35  ++nextConnectionId_;
36 
37  DataRetrieverMQPtr dataRetrieverMQ( new DataRetrieverMQ(regPtr, updateInterval_) );
38  retrieverMqMap_.insert(
39  RetrieverMqMap::value_type(nextConnectionId_, dataRetrieverMQ)
40  );
41 
42  eventTypeMqMap_.insert(regPtr);
43 
44  connectionMqMap_.insert(ConnectionMqMap::value_type(
45  regPtr->sourceURL(),
46  EventMQPtr(new EventMQ(updateInterval_))
47  ));
48 
49  return nextConnectionId_;
50  }
51 
52 
54  (
55  const ConnectionID& connectionId,
// NOTE(review): source line 56 is absent from this listing; it presumably
// declares the `status` parameter assigned below. Confirm in the real file.
57  )
58  {
// Sets the connection status of the retriever registered under connectionId.
// Returns false if no retriever has that ID.
59  boost::mutex::scoped_lock sl(statsMutex_);
60  RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
61  if ( pos == retrieverMqMap_.end() ) return false;
62  pos->second->connectionStatus_ = status;
63  return true;
64  }
65 
66 
68  (
69  const ConnectionID& connectionId,
71  )
72  {
// Copies the registration, connection status and event statistics of the
// retriever registered under connectionId into `stats`.
// Returns false, leaving `stats` untouched, if no retriever has that ID.
// NOTE(review): the declaration of `stats` (source line 70) is missing from
// this listing.
73  boost::mutex::scoped_lock sl(statsMutex_);
74  RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
75 
76  if ( pos == retrieverMqMap_.end() ) return false;
77 
78  stats.regPtr = pos->second->regPtr_;
79  stats.connectionStatus = pos->second->connectionStatus_;
80  pos->second->eventMQ_->getStats(stats.eventStats);
81 
82  return true;
83  }
84 
85 
87  (
88  const ConnectionID& connectionId,
89  const unsigned int& size
90  )
91  {
92  boost::mutex::scoped_lock sl(statsMutex_);
93 
94  RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
95  if ( retrieverPos == retrieverMqMap_.end() ) return false;
96 
97  const double sizeKB = static_cast<double>(size) / 1024;
98  retrieverPos->second->eventMQ_->size_.addSample(sizeKB);
99 
100  const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
101 
102  eventTypeMqMap_.addSample(regPtr, sizeKB);
103 
104  const std::string sourceURL = regPtr->sourceURL();
105  ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
106  connectionPos->second->size_.addSample(sizeKB);
107 
108  totals_.size_.addSample(sizeKB);
109 
110  return true;
111  }
112 
113 
115  (
116  const ConnectionID& connectionId
117  )
118  {
119  boost::mutex::scoped_lock sl(statsMutex_);
120 
121  RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
122  if ( retrieverPos == retrieverMqMap_.end() ) return false;
123 
124  retrieverPos->second->eventMQ_->corruptedEvents_.addSample(1);
125 
126  const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
127 
128  eventTypeMqMap_.receivedCorruptedEvent(regPtr);
129 
130  const std::string sourceURL = regPtr->sourceURL();
131  ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
132  connectionPos->second->corruptedEvents_.addSample(1);
133 
134  totals_.corruptedEvents_.addSample(1);
135 
136  return true;
137  }
138 
139 
141  {
// Fills the summary under statsMutex_:
//   - registeredSMs counts retriever entries (one per registered connection);
//   - activeSMs counts those whose status is CONNECTED;
//   - totals receives the aggregate statistics.
// NOTE(review): registeredSMs counts connections, not distinct source URLs;
// confirm that this is the intended meaning.
142  boost::mutex::scoped_lock sl(statsMutex_);
143 
144  stats.registeredSMs = 0;
145  stats.activeSMs = 0;
146 
147  for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
148  itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
149  {
150  ++stats.registeredSMs;
151  if ( it->second->connectionStatus_ == CONNECTED )
152  ++stats.activeSMs;
153  }
154 
// NOTE(review): source line 155 is missing from this listing; its content is
// not visible here.
156 
157  totals_.getStats(stats.totals);
158  }
159 
160 
162  {
163  boost::mutex::scoped_lock sl(statsMutex_);
164  cs.clear();
165 
166  for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
167  itEnd = connectionMqMap_.end(); it != itEnd; ++it)
168  {
169  EventStats stats;
170  it->second->getStats(stats);
171  cs.insert(ConnectionStats::value_type(it->first, stats));
172  }
173  }
174 
175 
177  (
179  ) const
180  {
// Replaces etsl with one entry per registered retriever (registration,
// connection status and event statistics), then sorts it with the element
// type's operator<.
// NOTE(review): the parameter declaration (source line 178) and the local
// `stats` declaration (source line 188) are missing from this listing.
// NOTE(review): std::sort needs <algorithm>, which is not among the includes
// visible in this listing; it may come in through a project header.
181  boost::mutex::scoped_lock sl(statsMutex_);
182  etsl.clear();
183 
184  for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
185  itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
186  {
187  const DataRetrieverMQPtr mq = it->second;
189  stats.regPtr = mq->regPtr_;
190  stats.connectionStatus = mq->connectionStatus_;
191  mq->eventMQ_->getStats(stats.eventStats);
192  etsl.push_back(stats);
193  }
194  std::sort(etsl.begin(), etsl.end());
195  }
196 
197 
199  {
200  alarmParams_ = alarmParams;
201  }
202 
203 
205  {
// Runs the alarm checks unless alarm sending is disabled in alarmParams_.
// The statistics update calls this with statsMutex_ held.
// NOTE(review): the statement after the guard (source line 208) is missing
// from this listing.
206  if ( ! alarmParams_.sendAlarms_ ) return;
207 
209  }
210 
211 
213  {
// Raises the "CorruptedEvents" alarm at ERROR level when the total
// corrupted-event rate exceeds alarmParams_.corruptedEventRate_.
// Revokes the alarm only once the rate falls below 90% of that limit
// (hysteresis), so a rate hovering near the limit does not toggle it.
214  const std::string alarmName = "CorruptedEvents";
215 
216  EventStats eventStats;
217  totals_.getStats(eventStats);
// NOTE(review): the right-hand side of the initializer below (source line
// 219) is missing from this listing. The message below reports the rate in
// Hz, but the unit cannot be confirmed from here.
218  const double corruptedEventRate =
220  if ( corruptedEventRate > alarmParams_.corruptedEventRate_ )
221  {
222  std::ostringstream msg;
223  msg << "Received " << corruptedEventRate << " Hz of corrupted events from StorageManagers.";
224  XCEPT_DECLARE(exception::CorruptedEvents, ex, msg.str());
225  alarmHandler_->raiseAlarm(alarmName, stor::AlarmHandler::ERROR, ex);
226  }
227  else if ( corruptedEventRate < (alarmParams_.corruptedEventRate_ * 0.9) )
228  // avoid revoking the alarm if we're close to the limit
229  {
230  alarmHandler_->revokeAlarm(alarmName);
231  }
232  }
233 
234 
236  {
// Periodic update: recomputes the statistics of every retriever queue and
// every per-URL queue, then evaluates the alarms.
// statsMutex_ stays held for the whole function, including the call to
// sendAlarms(), so that call must not lock it again (boost::mutex is not
// recursive).
// NOTE(review): source lines 239 and 253 are missing from this listing;
// their content is not visible here.
237  boost::mutex::scoped_lock sl(statsMutex_);
238 
240 
241  for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
242  itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
243  {
244  it->second->eventMQ_->calculateStatistics();
245  }
246 
247  for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
248  itEnd = connectionMqMap_.end(); it != itEnd; ++it)
249  {
250  it->second->calculateStatistics();
251  }
252 
254 
255  sendAlarms();
256  }
257 
258 
260  {
// Under statsMutex_, resets the totals and drops every retriever and
// per-URL entry. After this, previously issued connection IDs are no longer
// found by the lookup methods.
// NOTE(review): source line 265 is missing from this listing.
261  boost::mutex::scoped_lock sl(statsMutex_);
262  totals_.reset();
263  retrieverMqMap_.clear();
264  connectionMqMap_.clear();
266  }
267 
268 
271  {
272  return (
273  insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
274  insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
275  );
276  }
277 
278 
280  addSample(const stor::RegPtr consumer, const double& sizeKB)
281  {
282  return (
283  addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
284  addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
285  );
286  }
287 
288 
291  {
292  return (
293  receivedCorruptedEvent(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
294  receivedCorruptedEvent(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
295  );
296  }
297 
298 
301  {
302  eventTypeStats.clear();
303  eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
304 
305  for (EventMap::const_iterator it = eventMap_.begin(),
306  itEnd = eventMap_.end(); it != itEnd; ++it)
307  {
308  EventStats eventStats;
309  it->second->size_.getStats(eventStats.sizeStats);
310  it->second->corruptedEvents_.getStats(eventStats.corruptedEventsStats);
311  eventTypeStats.push_back(
312  std::make_pair(it->first, eventStats));
313  }
314 
315  for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
316  itEnd = dqmEventMap_.end(); it != itEnd; ++it)
317  {
318  EventStats eventStats;
319  it->second->size_.getStats(eventStats.sizeStats);
320  it->second->corruptedEvents_.getStats(eventStats.corruptedEventsStats);
321  eventTypeStats.push_back(
322  std::make_pair(it->first, eventStats));
323  }
324  }
325 
326 
329  {
330  for (EventMap::iterator it = eventMap_.begin(),
331  itEnd = eventMap_.end(); it != itEnd; ++it)
332  {
333  it->second->size_.calculateStatistics();
334  it->second->corruptedEvents_.calculateStatistics();
335  }
336  for (DQMEventMap::iterator it = dqmEventMap_.begin(),
337  itEnd = dqmEventMap_.end(); it != itEnd; ++it)
338  {
339  it->second->size_.calculateStatistics();
340  it->second->corruptedEvents_.calculateStatistics();
341  }
342  }
343 
344 
347  {
348  eventMap_.clear();
349  dqmEventMap_.clear();
350  }
351 
352 
354  insert(const stor::EventConsRegPtr eventConsumer)
355  {
// Adds an entry for this event-consumer registration.
// Returns false for a null pointer, which is what the RegPtr overload passes
// when the registration is not an event consumer. Returns true otherwise,
// even if an equivalent key was already present (map insert does not
// overwrite).
// NOTE(review): the mapped-value line (source line 358) is missing from this
// listing.
356  if ( eventConsumer == 0 ) return false;
357  eventMap_.insert(EventMap::value_type(eventConsumer,
359  ));
360  return true;
361  }
362 
363 
365  insert(const stor::DQMEventConsRegPtr dqmEventConsumer)
366  {
// Adds an entry for this DQM event-consumer registration.
// Returns false for a null pointer, which is what the RegPtr overload passes
// when the registration is not a DQM event consumer. Returns true otherwise,
// even if an equivalent key was already present.
// NOTE(review): the mapped-value line (source line 369) is missing from this
// listing.
367  if ( dqmEventConsumer == 0 ) return false;
368  dqmEventMap_.insert(DQMEventMap::value_type(dqmEventConsumer,
370  ));
371  return true;
372  }
373 
374 
376  addSample(const stor::EventConsRegPtr eventConsumer, const double& sizeKB)
377  {
378  if ( eventConsumer == 0 ) return false;
379  EventMap::const_iterator pos = eventMap_.find(eventConsumer);
380  pos->second->size_.addSample(sizeKB);
381  return true;
382  }
383 
384 
386  addSample(const stor::DQMEventConsRegPtr dqmEventConsumer, const double& sizeKB)
387  {
388  if ( dqmEventConsumer == 0 ) return false;
389  DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
390  pos->second->size_.addSample(sizeKB);
391  return true;
392  }
393 
394 
397  {
398  if ( eventConsumer == 0 ) return false;
399  EventMap::const_iterator pos = eventMap_.find(eventConsumer);
400  pos->second->corruptedEvents_.addSample(1);
401  return true;
402  }
403 
404 
407  {
408  if ( dqmEventConsumer == 0 ) return false;
409  DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
410  pos->second->corruptedEvents_.addSample(1);
411  return true;
412  }
413 
414 
417  {
// Orders entries by source URL first. Entries with the same URL are ordered by
// their event-consumer or DQM-consumer registration when both sides are of
// the same kind. Mixed kinds, or other registration types, compare as
// equivalent (false both ways), so their relative order after std::sort is
// unspecified.
// NOTE(review): the declaration of `dcrp` (source line 428) is missing from
// this listing.
418  if ( regPtr->sourceURL() != other.regPtr->sourceURL() )
419  return ( regPtr->sourceURL() < other.regPtr->sourceURL() );
420 
421  stor::EventConsRegPtr ecrp =
422  boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
423  stor::EventConsRegPtr ecrpOther =
424  boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(other.regPtr);
425  if ( ecrp && ecrpOther )
426  return ( *ecrp < *ecrpOther);
427 
429  boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
430  stor::DQMEventConsRegPtr dcrpOther =
431  boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(other.regPtr);
432  if ( dcrp && dcrpOther )
433  return ( *dcrp < *dcrpOther);
434 
435  return false;
436  }
437 
438 
440  (
441  const stor::utils::Duration_t& updateInterval
442  ):
443  size_(updateInterval, boost::posix_time::seconds(60)),
444  corruptedEvents_(updateInterval, boost::posix_time::seconds(60))
445  {}
446 
447 
449  {
450  size_.getStats(stats.sizeStats);
451  corruptedEvents_.getStats(stats.corruptedEventsStats);
452  }
453 
454 
456  {
457  size_.calculateStatistics();
458  corruptedEvents_.calculateStatistics();
459  }
460 
461 
463  {
464  size_.reset();
465  corruptedEvents_.reset();
466  }
467 
468 
470  (
471  const stor::RegPtr regPtr,
472  const stor::utils::Duration_t& updateInterval
473  ):
474  regPtr_(regPtr),
475  connectionStatus_(UNKNOWN),
476  eventMQ_(new EventMQ(updateInterval))
477  {}
478 
479 } // namespace smproxy
480 
481 
// Writes a human-readable description of a connection status to os and
// returns os.
// NOTE(review): the parameter declaration (source line 485) and the case labels
// (source lines 490, 493, 496, 499) are missing from this listing.
482 std::ostream& smproxy::operator<<
483 (
484  std::ostream& os,
486 )
487 {
488  switch (status)
489  {
491  os << "Connected";
492  break;
494  os << "Could not connect. SM not running?";
495  break;
497  os << "Lost connection to SM. Did it fail?";
498  break;
500  os << "unknown";
501  break;
502  }
503 
504  return os;
505 }
506 
507 
double getValueRate(DataSetType t=FULL) const
std::map< std::string, EventStats > ConnectionStats
auto_ptr< ClusterSequence > cs
boost::shared_ptr< RegistrationInfoBase > RegPtr
double seconds()
void getStatsByEventTypesPerConnection(EventTypePerConnectionStatList &) const
bool setConnectionStatus(const ConnectionID &, const ConnectionStatus &)
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
bool addRetrievedSample(const ConnectionID &, const unsigned int &size)
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
std::vector< EventTypePerConnectionStats > EventTypePerConnectionStatList
boost::shared_ptr< DataRetrieverMQ > DataRetrieverMQPtr
Container::value_type value_type
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
Definition: AlarmHandler.h:116
DataRetrieverMQ(stor::RegPtr, const stor::utils::Duration_t &updateInterval)
EventMQ(const stor::utils::Duration_t &updateInterval)
size_(0)
Definition: OwnArray.h:181
tuple status
Definition: ntuplemaker.py:245
DataRetrieverMonitorCollection(const stor::utils::Duration_t &updateInterval, stor::AlarmHandlerPtr)
bool getEventTypeStatsForConnection(const ConnectionID &, EventTypePerConnectionStats &)
tuple size
Write out results.
boost::shared_ptr< stor::DQMEventConsumerRegistrationInfo > DQMEventConsRegPtr