CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DataRetrieverMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: DataRetrieverMonitorCollection.cc,v 1.1.2.8 2011/03/01 08:32:15 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
8 #include <boost/pointer_cast.hpp>
9 
12 
13 
14 namespace smproxy {
15 
// Constructor; the listing index gives its signature as
// DataRetrieverMonitorCollection(const stor::utils::Duration_t &updateInterval).
// NOTE(review): the line carrying the qualified constructor name is absent from this
// listing (the embedded numbering skips 16).
// updateInterval is forwarded to MonitorCollection (presumably the base class), to
// totalSize_ together with a 60-second boost::posix_time duration, to updateInterval_
// and to eventTypeMqMap_.
17  (
18  const stor::utils::Duration_t& updateInterval
19  ) :
20  MonitorCollection(updateInterval),
21  totalSize_(updateInterval, boost::posix_time::seconds(60)),
22  updateInterval_(updateInterval),
23  eventTypeMqMap_(updateInterval)
24  {}
25 
26 
// Assigns the next connection ID to regPtr, creates the monitoring entries for it and
// returns that ID.
// NOTE(review): the line naming this function and one line inside the final
// connectionMqMap_ insert are absent from this listing (the embedded numbering skips 27
// and 43), which is why the parentheses near the end look unbalanced.
28  (
29  const stor::RegPtr regPtr
30  )
31  {
32  boost::mutex::scoped_lock sl(statsMutex_);
    // The ID counter is pre-incremented while statsMutex_ is held.
33  ++nextConnectionId_;
34 
    // Per-connection record, stored in retrieverMqMap_ under the new ID.
35  DataRetrieverMQPtr dataRetrieverMQ( new DataRetrieverMQ(regPtr, updateInterval_) );
36  retrieverMqMap_.insert(
37  RetrieverMqMap::value_type(nextConnectionId_, dataRetrieverMQ)
38  );
39 
    // Make the registration known to the per-event-type bookkeeping.
40  eventTypeMqMap_.insert(regPtr);
41 
    // Per-source-URL quantity, keyed by regPtr->sourceURL().
42  connectionMqMap_.insert(ConnectionMqMap::value_type(regPtr->sourceURL(),
44  new stor::MonitoredQuantity(updateInterval_, boost::posix_time::seconds(60))
45  )
46  ));
47 
48  return nextConnectionId_;
49  }
50 
51 
// Stores a new status for an existing connection; the listing index gives the signature
// as bool setConnectionStatus(const ConnectionID &, const ConnectionStatus &).
// Returns false if connectionId is not present in retrieverMqMap_, true after the
// entry's connectionStatus_ has been overwritten.
// NOTE(review): the function-name line and the declaration of the `status` parameter are
// absent from this listing (the embedded numbering skips 52 and 55).
53  (
54  const ConnectionID& connectionId,
56  )
57  {
58  boost::mutex::scoped_lock sl(statsMutex_);
59  RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
60  if ( pos == retrieverMqMap_.end() ) return false;
61  pos->second->connectionStatus_ = status;
62  return true;
63  }
64 
65 
// Copies the monitoring data of one connection into stats; the listing index gives the
// signature as bool getEventTypeStatsForConnection(const ConnectionID &, EventTypeStats &).
// Returns false and leaves stats untouched if connectionId is not in retrieverMqMap_.
// Otherwise fills stats.regPtr, stats.connectionStatus and stats.sizeStats and returns
// true. The lookup and the copy happen while statsMutex_ is held.
// NOTE(review): the function-name line is absent from this listing.
67  (
68  const ConnectionID& connectionId,
69  EventTypeStats& stats
70  )
71  {
72  boost::mutex::scoped_lock sl(statsMutex_);
73  RetrieverMqMap::const_iterator pos = retrieverMqMap_.find(connectionId);
74 
75  if ( pos == retrieverMqMap_.end() ) return false;
76 
77  stats.regPtr = pos->second->regPtr_;
78  stats.connectionStatus = pos->second->connectionStatus_;
79  pos->second->size_.getStats(stats.sizeStats);
80 
81  return true;
82  }
83 
84 
// Records one retrieved sample of `size` for a connection; the listing index gives the
// signature as bool addRetrievedSample(const ConnectionID &, const unsigned int &size).
// size is divided by 1024 and the resulting value (sizeKB) is added to
//   - the connection's own size_ quantity,
//   - eventTypeMqMap_ for the connection's registration,
//   - the connectionMqMap_ entry for the registration's source URL, if one exists,
//   - totalSize_.
// Returns false, recording nothing, if connectionId is not in retrieverMqMap_; true
// otherwise. All work is done while statsMutex_ is held.
// NOTE(review): the function-name line is absent from this listing.
86  (
87  const ConnectionID& connectionId,
88  const unsigned int& size
89  )
90  {
91  boost::mutex::scoped_lock sl(statsMutex_);
92 
93  RetrieverMqMap::const_iterator retrieverPos = retrieverMqMap_.find(connectionId);
94  if ( retrieverPos == retrieverMqMap_.end() ) return false;
95 
96  const double sizeKB = static_cast<double>(size) / 1024;
97  retrieverPos->second->size_.addSample(sizeKB);
98 
99  const stor::RegPtr regPtr = retrieverPos->second->regPtr_;
100 
101  eventTypeMqMap_.addSample(regPtr, sizeKB);
102 
103  const std::string sourceURL = regPtr->sourceURL();
104  ConnectionMqMap::const_iterator connectionPos = connectionMqMap_.find(sourceURL);
     // find() yields end() for an unknown source URL and dereferencing end() is undefined
     // behaviour, so the per-URL sample is recorded only when an entry exists. The other
     // quantities are updated either way.
105  if ( connectionPos != connectionMqMap_.end() ) connectionPos->second->addSample(sizeKB);
106 
107  totalSize_.addSample(sizeKB);
108 
109  return true;
110  }
111 
112 
// Fills the summary counters of stats while holding statsMutex_:
// stats.registeredSMs becomes the number of entries in retrieverMqMap_ and
// stats.activeSMs the number of those whose connectionStatus_ equals CONNECTED.
// NOTE(review): the declaration line and two lines after the loop are absent from this
// listing (the embedded numbering skips 113, 128 and 130), so any further fields written
// to stats cannot be described from here.
114  {
115  boost::mutex::scoped_lock sl(statsMutex_);
116 
117  stats.registeredSMs = 0;
118  stats.activeSMs = 0;
119 
120  for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
121  itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
122  {
123  ++stats.registeredSMs;
124  if ( it->second->connectionStatus_ == CONNECTED )
125  ++stats.activeSMs;
126  }
127 
129 
131  }
132 
133 
// Rebuilds cs from connectionMqMap_ while holding statsMutex_: cs is cleared, then for
// every entry the quantity's getStats() result is inserted under the entry's key.
// NOTE(review): the declaration line and the declaration of the local `stats` are absent
// from this listing (the embedded numbering skips 134 and 142).
135  {
136  boost::mutex::scoped_lock sl(statsMutex_);
137  cs.clear();
138 
139  for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
140  itEnd = connectionMqMap_.end(); it != itEnd; ++it)
141  {
143  it->second->getStats(stats);
144  cs.insert(ConnectionStats::value_type(it->first, stats));
145  }
146  }
147 
148 
// Rebuilds etsl from retrieverMqMap_ while holding statsMutex_: etsl is cleared, one
// EventTypeStats (regPtr, connectionStatus, sizeStats) is appended per entry, and the
// result is sorted with std::sort using the element type's default ordering.
// NOTE(review): the declaration line is absent from this listing (the embedded numbering
// skips 149).
150  {
151  boost::mutex::scoped_lock sl(statsMutex_);
152  etsl.clear();
153 
154  for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
155  itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
156  {
157  const DataRetrieverMQPtr mq = it->second;
158  EventTypeStats stats;
159  stats.regPtr = mq->regPtr_;
160  stats.connectionStatus = mq->connectionStatus_;
161  mq->size_.getStats(stats.sizeStats);
162  etsl.push_back(stats);
163  }
164  std::sort(etsl.begin(), etsl.end());
165  }
166 
167 
// Calls calculateStatistics() on the size_ quantity of every retrieverMqMap_ entry and on
// every quantity in connectionMqMap_, all while holding statsMutex_.
// NOTE(review): the declaration line and one line before and one after the loops are
// absent from this listing (the embedded numbering skips 168, 172 and 186); whatever they
// update is not visible here.
169  {
170  boost::mutex::scoped_lock sl(statsMutex_);
171 
173 
174  for (RetrieverMqMap::const_iterator it = retrieverMqMap_.begin(),
175  itEnd = retrieverMqMap_.end(); it != itEnd; ++it)
176  {
177  it->second->size_.calculateStatistics();
178  }
179 
180  for (ConnectionMqMap::const_iterator it = connectionMqMap_.begin(),
181  itEnd = connectionMqMap_.end(); it != itEnd; ++it)
182  {
183  it->second->calculateStatistics();
184  }
185 
187  }
188 
189 
// Resets totalSize_ and empties retrieverMqMap_ and connectionMqMap_ while holding
// statsMutex_.
// NOTE(review): the declaration line and the last statement are absent from this listing
// (the embedded numbering skips 190 and 196). nextConnectionId_ is not touched by the
// lines visible here.
191  {
192  boost::mutex::scoped_lock sl(statsMutex_);
193  totalSize_.reset();
194  retrieverMqMap_.clear();
195  connectionMqMap_.clear();
197  }
198 
199 
// Dispatches a generic `consumer` pointer to the typed insert() overloads.
// The pointer is dynamic-cast to EventConsumerRegistrationInfo first; only if that
// overload returns false is the DQMEventConsumerRegistrationInfo cast tried. The result
// is true as soon as one typed overload returns true.
// NOTE(review): the declaration lines are absent from this listing (the embedded
// numbering skips 200-201).
202  {
203  return (
204  insert(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer)) ||
205  insert(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer))
206  );
207  }
208 
209 
// Adds sizeKB for a consumer given as a generic stor::RegPtr.
// The pointer is dynamic-cast to EventConsumerRegistrationInfo first; only if that
// overload returns false is the DQMEventConsumerRegistrationInfo cast tried. The result
// is true as soon as one typed overload returns true.
// NOTE(review): the line carrying the return type and class qualifier is absent from this
// listing (the embedded numbering skips 210).
211  addSample(const stor::RegPtr consumer, const double& sizeKB)
212  {
213  return (
214  addSample(boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(consumer), sizeKB) ||
215  addSample(boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(consumer), sizeKB)
216  );
217  }
218 
219 
// Rebuilds eventTypeStats from both maps: it is cleared, capacity is reserved for
// eventMap_.size() + dqmEventMap_.size() elements, and one (key, stats) pair is appended
// per entry, all eventMap_ entries first and then all dqmEventMap_ entries.
// NOTE(review): the declaration lines and the declarations of the local `etStats` in both
// loops are absent from this listing (the embedded numbering skips 220-221, 229 and 238).
222  {
223  eventTypeStats.clear();
224  eventTypeStats.reserve(eventMap_.size()+dqmEventMap_.size());
225 
226  for (EventMap::const_iterator it = eventMap_.begin(),
227  itEnd = eventMap_.end(); it != itEnd; ++it)
228  {
230  it->second->getStats(etStats);
231  eventTypeStats.push_back(
232  std::make_pair(it->first, etStats));
233  }
234 
235  for (DQMEventMap::const_iterator it = dqmEventMap_.begin(),
236  itEnd = dqmEventMap_.end(); it != itEnd; ++it)
237  {
239  it->second->getStats(etStats);
240  eventTypeStats.push_back(
241  std::make_pair(it->first, etStats));
242  }
243  }
244 
245 
// Calls calculateStatistics() on every mapped quantity, first in eventMap_ and then in
// dqmEventMap_. No lock is taken in the lines visible here.
// NOTE(review): the declaration lines are absent from this listing (the embedded
// numbering skips 246-247).
248  {
249  for (EventMap::iterator it = eventMap_.begin(),
250  itEnd = eventMap_.end(); it != itEnd; ++it)
251  {
252  it->second->calculateStatistics();
253  }
254  for (DQMEventMap::iterator it = dqmEventMap_.begin(),
255  itEnd = dqmEventMap_.end(); it != itEnd; ++it)
256  {
257  it->second->calculateStatistics();
258  }
259  }
260 
261 
// Removes all entries from eventMap_ and dqmEventMap_.
// NOTE(review): the declaration lines are absent from this listing (the embedded
// numbering skips 262-263).
264  {
265  eventMap_.clear();
266  dqmEventMap_.clear();
267  }
268 
269 
// Adds an entry for eventConsumer to eventMap_.
// Returns false without inserting when eventConsumer compares equal to 0 (a null
// pointer), true otherwise.
// NOTE(review): the line with the return type and class qualifier, and the lines that
// construct the mapped value, are absent from this listing (the embedded numbering skips
// 270 and 275-276).
271  insert(const stor::EventConsRegPtr eventConsumer)
272  {
273  if ( eventConsumer == 0 ) return false;
274  eventMap_.insert(EventMap::value_type(eventConsumer,
277  )));
278  return true;
279  }
280 
281 
// Adds an entry for dqmEventConsumer to dqmEventMap_.
// Returns false without inserting when dqmEventConsumer compares equal to 0 (a null
// pointer), true otherwise.
// NOTE(review): the line with the return type and class qualifier, and the lines that
// construct the mapped value, are absent from this listing (the embedded numbering skips
// 282 and 287-288).
283  insert(const stor::DQMEventConsRegPtr dqmEventConsumer)
284  {
285  if ( dqmEventConsumer == 0 ) return false;
286  dqmEventMap_.insert(DQMEventMap::value_type(dqmEventConsumer,
289  )));
290  return true;
291  }
292 
293 
// Adds sizeKB to the quantity stored in eventMap_ for eventConsumer.
// Returns false, recording nothing, when eventConsumer compares equal to 0 (a null
// pointer) or has no entry in eventMap_; returns true after the sample has been added.
// NOTE(review): the line carrying the return type and class qualifier is absent from this
// listing (the embedded numbering skips 294).
295  addSample(const stor::EventConsRegPtr eventConsumer, const double& sizeKB)
296  {
297  if ( eventConsumer == 0 ) return false;
298  EventMap::const_iterator pos = eventMap_.find(eventConsumer);
     // find() yields end() for a consumer with no entry; dereferencing end() is
     // undefined behaviour, so report the miss to the caller instead.
     if ( pos == eventMap_.end() ) return false;
299  pos->second->addSample(sizeKB);
300  return true;
301  }
302 
303 
// Adds sizeKB to the quantity stored in dqmEventMap_ for dqmEventConsumer.
// Returns false, recording nothing, when dqmEventConsumer compares equal to 0 (a null
// pointer) or has no entry in dqmEventMap_; returns true after the sample has been added.
// NOTE(review): the line carrying the return type and class qualifier is absent from this
// listing (the embedded numbering skips 304).
305  addSample(const stor::DQMEventConsRegPtr dqmEventConsumer, const double& sizeKB)
306  {
307  if ( dqmEventConsumer == 0 ) return false;
308  DQMEventMap::const_iterator pos = dqmEventMap_.find(dqmEventConsumer);
     // find() yields end() for a consumer with no entry; dereferencing end() is
     // undefined behaviour, so report the miss to the caller instead.
     if ( pos == dqmEventMap_.end() ) return false;
309  pos->second->addSample(sizeKB);
310  return true;
311  }
312 
313 
// Ordering predicate comparing this object's regPtr with other.regPtr:
//   1. Different source URLs: ordered by sourceURL().
//   2. Same URL and both pointers cast to EventConsumerRegistrationInfo: ordered by that
//      type's operator<.
//   3. Same URL and both pointers cast to DQMEventConsumerRegistrationInfo: ordered by
//      that type's operator<.
//   4. Anything else (mixed or unrecognised types): returns false.
// NOTE(review): the declaration line and the line that declares `dcrp` are absent from
// this listing (the embedded numbering skips 314 and 326).
315  {
316  if ( regPtr->sourceURL() != other.regPtr->sourceURL() )
317  return ( regPtr->sourceURL() < other.regPtr->sourceURL() );
318 
319  stor::EventConsRegPtr ecrp =
320  boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
321  stor::EventConsRegPtr ecrpOther =
322  boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(other.regPtr);
323  if ( ecrp && ecrpOther )
324  return ( *ecrp < *ecrpOther);
325 
327  boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
328  stor::DQMEventConsRegPtr dcrpOther =
329  boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(other.regPtr);
330  if ( dcrp && dcrpOther )
331  return ( *dcrp < *dcrpOther);
332 
333  return false;
334  }
335 
336 
// Constructor; the listing index gives its signature as
// DataRetrieverMQ(stor::RegPtr, const stor::utils::Duration_t &updateInterval).
// Stores regPtr in regPtr_, starts connectionStatus_ at UNKNOWN and constructs size_ from
// updateInterval together with a 60-second boost::posix_time duration.
// NOTE(review): the line carrying the qualified constructor name is absent from this
// listing (the embedded numbering skips 337).
338  (
339  const stor::RegPtr regPtr,
340  const stor::utils::Duration_t& updateInterval
341  ):
342  regPtr_(regPtr),
343  connectionStatus_(UNKNOWN),
344  size_(updateInterval, boost::posix_time::seconds(60))
345  {}
346 
347 } // namespace smproxy
348 
349 
// Writes a human-readable description of `status` to os and returns os.
// NOTE(review): the declaration of the `status` parameter and every case label of the
// switch are absent from this listing (the embedded numbering skips 353, 358, 361, 364
// and 367), so which status value maps to which text cannot be confirmed from here.
350 std::ostream& smproxy::operator<<
351 (
352  std::ostream& os,
354 )
355 {
356  switch (status)
357  {
359  os << "Connected";
360  break;
362  os << "Could not connect. SM not running?";
363  break;
365  os << "Lost connection to SM. Did it fail?";
366  break;
368  os << "unknown";
369  break;
370  }
371 
372  return os;
373 }
374 
375 
boost::shared_ptr< RegistrationInfoBase > RegPtr
double seconds()
boost::shared_ptr< MonitoredQuantity > MonitoredQuantityPtr
bool setConnectionStatus(const ConnectionID &, const ConnectionStatus &)
bool getEventTypeStatsForConnection(const ConnectionID &, EventTypeStats &)
DataRetrieverMonitorCollection(const stor::utils::Duration_t &updateInterval)
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
Container::value_type value_type
void getStats(Stats &stats) const
bool addRetrievedSample(const ConnectionID &, const unsigned int &size)
void calculateStatistics(const utils::TimePoint_t &currentTime=utils::getCurrentTime())
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
boost::shared_ptr< DataRetrieverMQ > DataRetrieverMQPtr
std::map< std::string, stor::MonitoredQuantity::Stats > ConnectionStats
DataRetrieverMQ(stor::RegPtr, const stor::utils::Duration_t &updateInterval)
tuple status
Definition: ntuplemaker.py:245
tuple size
Write out results.
boost::shared_ptr< stor::DQMEventConsumerRegistrationInfo > DQMEventConsRegPtr