CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DataSenderMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: DataSenderMonitorCollection.cc,v 1.20 2012/04/20 10:48:02 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
8 #include <zlib.h>
9 #include <boost/lexical_cast.hpp>
10 
13 
14 
15 namespace stor {
16 
18  (
    // Constructor parameter list and member-initializer list.
    // NOTE(review): this listing has lost the line carrying the function name and
    // listing line 20 (presumably the parameter `ah` used below) - confirm against the real file.
19  const utils::Duration_t& updateInterval,
21  ) :
    // updateInterval is handed to the MonitorCollection base and also kept in
    // updateInterval_; all count members start at 0.
22  MonitorCollection(updateInterval),
23  connectedRBs_(0),
24  connectedEPs_(0),
25  activeEPs_(0),
26  outstandingDataDiscards_(0),
27  outstandingDQMDiscards_(0),
28  faultyEvents_(0),
29  ignoredDiscards_(0),
30  updateInterval_(updateInterval),
31  alarmHandler_(ah)
32  {}
33 
34 
36  {
    // Records a complete INIT message from i2oChain in the top-level, per-RB and
    // per-FU monitoring records.
    // NOTE(review): the signature line is missing from this listing; the name
    // addInitSample is an assumption.
37  // sanity checks
38  if (i2oChain.messageCode() != Header::INIT) {return;}
39  if (! i2oChain.complete()) {return;}
40 
41  // fetch basic data from the I2OChain
42  std::string outModName = i2oChain.outputModuleLabel();
    // NOTE(review): totalDataSize() is listed as returning unsigned long; it is
    // narrowed to uint32_t here.
43  uint32_t msgSize = i2oChain.totalDataSize();
44 
45  // look up the monitoring records that we need
46  bool pointersAreValid;
47  RBRecordPtr rbRecordPtr;
48  FURecordPtr fuRecordPtr;
49  OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
50  {
    // collectionsMutex_ is held only for the lookup; the record updates below
    // happen after this scope ends.
51  boost::mutex::scoped_lock sl(collectionsMutex_);
52  pointersAreValid = getAllNeededPointers(
53  i2oChain, rbRecordPtr, fuRecordPtr,
54  topLevelOutModPtr, rbSpecificOutModPtr,
55  fuSpecificOutModPtr);
56  }
57 
58  // accumulate the data of interest
59  if (pointersAreValid)
60  {
61  topLevelOutModPtr->name = outModName;
62  topLevelOutModPtr->initMsgSize = msgSize;
63 
64  ++rbRecordPtr->initMsgCount;
    // nExpectedEPs is summed over INIT messages; readers in this file divide it
    // by initMsgCount.
65  rbRecordPtr->nExpectedEPs += i2oChain.nExpectedEPs();
66  rbSpecificOutModPtr->name = outModName;
67  rbSpecificOutModPtr->initMsgSize = msgSize;
68 
69  ++fuRecordPtr->initMsgCount;
70  fuSpecificOutModPtr->name = outModName;
71  fuSpecificOutModPtr->initMsgSize = msgSize;
72  }
73  }
74 
75 
77  {
    // Records the size, run number and event number of a complete EVENT message
    // in the top-level, per-RB and per-FU monitoring records.
    // NOTE(review): the signature line is missing from this listing; the name
    // addEventSample is an assumption.
78  // sanity checks
79  if (i2oChain.messageCode() != Header::EVENT) {return;}
80  if (! i2oChain.complete()) {return;}
81 
82  // fetch basic data from the I2OChain
83  double eventSize = static_cast<double>(i2oChain.totalDataSize());
84  uint32_t runNumber = i2oChain.runNumber();
85  uint32_t eventNumber = i2oChain.eventNumber();
86 
87  // look up the monitoring records that we need
88  bool pointersAreValid;
89  RBRecordPtr rbRecordPtr;
90  FURecordPtr fuRecordPtr;
91  OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
92  {
    // collectionsMutex_ is held only for the lookup, not for the updates below.
93  boost::mutex::scoped_lock sl(collectionsMutex_);
94  pointersAreValid = getAllNeededPointers(
95  i2oChain, rbRecordPtr, fuRecordPtr,
96  topLevelOutModPtr, rbSpecificOutModPtr,
97  fuSpecificOutModPtr);
98  }
99 
100  // accumulate the data of interest
101  if (pointersAreValid)
102  {
103  topLevelOutModPtr->eventSize.addSample(eventSize);
104 
105  rbRecordPtr->lastRunNumber = runNumber;
106  rbRecordPtr->lastEventNumber = eventNumber;
107  rbRecordPtr->eventSize.addSample(eventSize);
108  rbSpecificOutModPtr->eventSize.addSample(eventSize);
109 
110  fuRecordPtr->lastRunNumber = runNumber;
111  fuRecordPtr->lastEventNumber = eventNumber;
    // The FU record keeps two event-size quantities (short and medium interval);
    // both receive the same sample.
112  fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
113  fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
114  fuSpecificOutModPtr->eventSize.addSample(eventSize);
115  }
116  }
117 
118 
120  {
    // Records the size of a complete DQM_EVENT message in the RB and FU records.
    // NOTE(review): the signature line is missing from this listing; the name
    // addDQMEventSample is an assumption.
121  // sanity checks
122  if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
123  if (! i2oChain.complete()) {return;}
124 
125  // fetch basic data from the I2OChain
126  double eventSize = static_cast<double>(i2oChain.totalDataSize());
127 
128  // look up the monitoring records that we need
129  bool pointersAreValid;
130  RBRecordPtr rbRecordPtr;
131  FURecordPtr fuRecordPtr;
132  {
    // collectionsMutex_ covers only the two lookups; the FU lookup is attempted
    // only if the RB lookup succeeded.
133  boost::mutex::scoped_lock sl(collectionsMutex_);
134  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
135  if (pointersAreValid)
136  {
137  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
138  }
139  }
140 
141  // accumulate the data of interest
142  if (pointersAreValid)
143  {
144  rbRecordPtr->dqmEventSize.addSample(eventSize);
145  fuRecordPtr->dqmEventSize.addSample(eventSize);
146  }
147  }
148 
149 
151  {
    // Records the size of a complete ERROR_EVENT message in the RB and FU records.
    // NOTE(review): the signature line is missing from this listing; the name
    // addErrorEventSample is an assumption.
152  // sanity checks
153  if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
154  if (! i2oChain.complete()) {return;}
155 
156  // fetch basic data from the I2OChain
157  double eventSize = static_cast<double>(i2oChain.totalDataSize());
158 
159  // look up the monitoring records that we need
160  bool pointersAreValid;
161  RBRecordPtr rbRecordPtr;
162  FURecordPtr fuRecordPtr;
163  {
    // collectionsMutex_ covers only the two lookups; the FU lookup is attempted
    // only if the RB lookup succeeded.
164  boost::mutex::scoped_lock sl(collectionsMutex_);
165  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
166  if (pointersAreValid)
167  {
168  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
169  }
170  }
171 
172  // accumulate the data of interest
173  if (pointersAreValid)
174  {
175  rbRecordPtr->errorEventSize.addSample(eventSize);
176  fuRecordPtr->errorEventSize.addSample(eventSize);
177  }
178  }
179 
180 
182  {
    // Records the size of a faulty message in the RB and FU records.
    // There is no messageCode()/complete() guard in this body: any chain whose
    // keys are valid is counted.
    // NOTE(review): the signature line is missing from this listing; the name
    // addFaultyEventSample is an assumption.
183  // fetch basic data from the I2OChain
184  double eventSize = static_cast<double>(i2oChain.totalDataSize());
185 
186  // look up the monitoring records that we need
187  bool pointersAreValid;
188  RBRecordPtr rbRecordPtr;
189  FURecordPtr fuRecordPtr;
190  {
    // collectionsMutex_ covers only the two lookups.
191  boost::mutex::scoped_lock sl(collectionsMutex_);
192  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
193  if (pointersAreValid)
194  {
195  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
196  }
197  }
198 
199  // accumulate the data of interest
200  if (pointersAreValid)
201  {
    // DQM_EVENT chains go to faultyDQMEventSize; every other message code goes
    // to faultyEventSize.
202  if (i2oChain.messageCode() == Header::DQM_EVENT)
203  {
204  rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
205  fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
206  }
207  else
208  {
209  rbRecordPtr->faultyEventSize.addSample(eventSize);
210  fuRecordPtr->faultyEventSize.addSample(eventSize);
211  }
212  }
213  }
214 
215 
217  {
    // Adds one sample of value 1 to dataDiscardCount on the RB and FU records
    // that match i2oChain.
    // NOTE(review): the signature line is missing from this listing; the name
    // incrementDataDiscardCount is an assumption.
218  // look up the monitoring records that we need
219  bool pointersAreValid;
220  RBRecordPtr rbRecordPtr;
221  FURecordPtr fuRecordPtr;
222  {
    // collectionsMutex_ covers only the two lookups.
223  boost::mutex::scoped_lock sl(collectionsMutex_);
224  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
225  if (pointersAreValid)
226  {
227  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
228  }
229  }
230 
231  // accumulate the data of interest
232  if (pointersAreValid)
233  {
234  rbRecordPtr->dataDiscardCount.addSample(1);
235  fuRecordPtr->dataDiscardCount.addSample(1);
236  }
237  }
238 
239 
241  {
    // Adds one sample of value 1 to dqmDiscardCount on the RB and FU records
    // that match i2oChain.
    // NOTE(review): the signature line is missing from this listing; the name
    // incrementDQMDiscardCount is an assumption.
242  // look up the monitoring records that we need
243  bool pointersAreValid;
244  RBRecordPtr rbRecordPtr;
245  FURecordPtr fuRecordPtr;
246  {
    // collectionsMutex_ covers only the two lookups.
247  boost::mutex::scoped_lock sl(collectionsMutex_);
248  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
249  if (pointersAreValid)
250  {
251  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
252  }
253  }
254 
255  // accumulate the data of interest
256  if (pointersAreValid)
257  {
258  rbRecordPtr->dqmDiscardCount.addSample(1);
259  fuRecordPtr->dqmDiscardCount.addSample(1);
260  }
261  }
262 
263 
265  {
    // Adds one sample of value 1 to skippedDiscardCount on the RB and FU records
    // that match i2oChain.
    // NOTE(review): the signature line is missing from this listing; the name
    // incrementSkippedDiscardCount is an assumption.
266  // look up the monitoring records that we need
267  bool pointersAreValid;
268  RBRecordPtr rbRecordPtr;
269  FURecordPtr fuRecordPtr;
270  {
    // collectionsMutex_ covers only the two lookups.
271  boost::mutex::scoped_lock sl(collectionsMutex_);
272  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
273  if (pointersAreValid)
274  {
275  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
276  }
277  }
278 
279  // accumulate the data of interest
280  if (pointersAreValid)
281  {
282  rbRecordPtr->skippedDiscardCount.addSample(1);
283  fuRecordPtr->skippedDiscardCount.addSample(1);
284  }
285  }
286 
287 
290  {
    // Takes collectionsMutex_ for the whole body.
    // NOTE(review): the signature line and listing line 293 (presumably the
    // return statement) are missing from this listing; this is probably
    // getTopLevelOutputModuleResults() - confirm against the real file.
291  boost::mutex::scoped_lock sl(collectionsMutex_);
292 
294  }
295 
296 
299  {
    // Returns one result entry per entry of resourceBrokerMap_, each tagged with
    // its map key as uniqueRBID. collectionsMutex_ is held for the whole body.
    // NOTE(review): the signature line is missing from this listing; the name is
    // inferred from the ResourceBrokerResultsList return value.
300  boost::mutex::scoped_lock sl(collectionsMutex_);
301  ResourceBrokerResultsList resultsList;
302 
303  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
304  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
305  resourceBrokerMap_.end();
306  for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
307  {
308  RBRecordPtr rbRecordPtr = rbMapIter->second;
    // NOTE(review): listing line 309, which declares `result` (presumably from
    // buildResourceBrokerResult(rbRecordPtr)), is missing here.
310  result->uniqueRBID = rbMapIter->first;
311  resultsList.push_back(result);
312  }
313 
314  return resultsList;
315  }
316 
317 
320  {
    // Looks up uniqueRBID in resourceBrokerMap_ and, if present, builds the
    // result for that record and tags it with the ID. collectionsMutex_ is held
    // for the whole body.
    // NOTE(review): the signature line and listing line 322 (presumably the
    // declaration of `result`) are missing; what is returned when the ID is
    // not found depends on that missing declaration.
321  boost::mutex::scoped_lock sl(collectionsMutex_);
323 
324  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
325  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
326  if (rbMapIter != resourceBrokerMap_.end())
327  {
328  RBRecordPtr rbRecordPtr = rbMapIter->second;
329  result = buildResourceBrokerResult(rbRecordPtr);
330  result->uniqueRBID = rbMapIter->first;
331  }
332 
333  return result;
334  }
335 
336 
339  {
    // Returns the output-module results of the resource broker identified by
    // uniqueRBID; the list stays empty when the ID is not in resourceBrokerMap_.
    // collectionsMutex_ is held for the whole body.
    // NOTE(review): the signature line is missing from this listing; the name is
    // inferred from the uniqueRBID parameter and the return type.
340  boost::mutex::scoped_lock sl(collectionsMutex_);
341  OutputModuleResultsList resultsList;
342 
343  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
344  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
345  if (rbMapIter != resourceBrokerMap_.end())
346  {
347  RBRecordPtr rbRecordPtr = rbMapIter->second;
348  resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
349  }
350 
351  return resultsList;
352  }
353 
354 
357  {
    // Returns one FilterUnitResult per entry in the filterUnitMap of the resource
    // broker identified by uniqueRBID; the list stays empty when the ID is not
    // in resourceBrokerMap_. collectionsMutex_ is held for the whole body.
    // NOTE(review): the signature line is missing from this listing; the name is
    // inferred from the uniqueRBID parameter and the return type.
358  boost::mutex::scoped_lock sl(collectionsMutex_);
359  FilterUnitResultsList resultsList;
360 
361  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
362  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
363  if (rbMapIter != resourceBrokerMap_.end())
364  {
365  RBRecordPtr rbRecordPtr = rbMapIter->second;
366  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
367  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
368  rbRecordPtr->filterUnitMap.end();
369  for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
370  fuMapIter != fuMapEnd; ++fuMapIter)
371  {
372  FURecordPtr fuRecordPtr = fuMapIter->second;
    // Copy the plain counters, then snapshot each monitored quantity into the
    // matching Stats member of the result.
373  FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
374  result->initMsgCount = fuRecordPtr->initMsgCount;
375  result->lastRunNumber = fuRecordPtr->lastRunNumber;
376  result->lastEventNumber = fuRecordPtr->lastEventNumber;
377  fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
378  fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
379  fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
380  fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
381  fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
382  fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
383  fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
384  fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
385  fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
386 
    // Outstanding data discards = INIT + event + error + faulty samples received
    // minus data discards counted; the event count is taken from the
    // short-interval quantity.
387  result->outstandingDataDiscardCount =
388  result->initMsgCount +
389  result->shortIntervalEventStats.getSampleCount() +
390  result->errorEventStats.getSampleCount() +
391  result->faultyEventStats.getSampleCount() -
392  result->dataDiscardStats.getSampleCount();
    // Outstanding DQM discards = DQM + faulty DQM samples minus DQM discards.
393  result->outstandingDQMDiscardCount =
394  result->dqmEventStats.getSampleCount() +
395  result->faultyDQMEventStats.getSampleCount() -
396  result->dqmDiscardStats.getSampleCount();
397 
398  resultsList.push_back(result);
399  }
400  }
401 
402  return resultsList;
403  }
404 
405 
407  {
    // Calls calculateStatistics() on every monitored quantity of every RB record,
    // of every FU record below it, and on their output-module maps.
    // collectionsMutex_ is held for the whole body.
    // NOTE(review): the signature line and listing line 446 are missing from this
    // listing; the name do_calculateStatistics is an assumption.
408  boost::mutex::scoped_lock sl(collectionsMutex_);
409 
410  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
411  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
412  resourceBrokerMap_.end();
413  for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
414  {
415  RBRecordPtr rbRecordPtr = rbMapIter->second;
416  rbRecordPtr->eventSize.calculateStatistics();
417  rbRecordPtr->dqmEventSize.calculateStatistics();
418  rbRecordPtr->errorEventSize.calculateStatistics();
419  rbRecordPtr->faultyEventSize.calculateStatistics();
420  rbRecordPtr->faultyDQMEventSize.calculateStatistics();
421  rbRecordPtr->dataDiscardCount.calculateStatistics();
422  rbRecordPtr->dqmDiscardCount.calculateStatistics();
423  rbRecordPtr->skippedDiscardCount.calculateStatistics();
424  calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
425 
426  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
427  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
428  rbRecordPtr->filterUnitMap.end();
429  for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
430  fuMapIter != fuMapEnd; ++fuMapIter)
431  {
432  FURecordPtr fuRecordPtr = fuMapIter->second;
433  fuRecordPtr->shortIntervalEventSize.calculateStatistics();
434  fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
435  fuRecordPtr->dqmEventSize.calculateStatistics();
436  fuRecordPtr->errorEventSize.calculateStatistics();
437  fuRecordPtr->faultyEventSize.calculateStatistics();
438  fuRecordPtr->faultyDQMEventSize.calculateStatistics();
439  fuRecordPtr->dataDiscardCount.calculateStatistics();
440  fuRecordPtr->dqmDiscardCount.calculateStatistics();
441  fuRecordPtr->skippedDiscardCount.calculateStatistics();
442  calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
443  }
444  }
445 
    // NOTE(review): listing line 446 is missing here; presumably it handles the
    // top-level outputModuleMap_, which the loop above never touches - confirm.
447  }
448 
449 
451  {
    // Zeroes the info-space counters and clears resourceBrokerMap_ and
    // outputModuleMap_ while holding collectionsMutex_.
    // resourceBrokerIDs_ is not cleared in the visible lines.
    // NOTE(review): the signature line and listing lines 457-458 are missing;
    // presumably those reset outstandingDataDiscards_ and
    // outstandingDQMDiscards_ - confirm against the real file.
452  boost::mutex::scoped_lock sl(collectionsMutex_);
453 
454  connectedRBs_ = 0;
455  connectedEPs_ = 0;
456  activeEPs_ = 0;
459  faultyEvents_ = 0;
460  ignoredDiscards_ = 0;
461  resourceBrokerMap_.clear();
462  outputModuleMap_.clear();
463  }
464 
465 
467  {
    // Appends (name, pointer-to-member) pairs for the seven info-space counters
    // to infoSpaceItems. No lock is taken here.
    // NOTE(review): the signature line is missing from this listing; the
    // cross-reference list gives do_appendInfoSpaceItems(InfoSpaceItems&).
468  infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
469  infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
470  infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
471  infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
472  infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
473  infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
474  infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
475  }
476 
477 
479  {
    // Recomputes all info-space counters from resourceBrokerMap_ and then raises
    // or revokes the faulty-event and ignored-discard alarms.
    // collectionsMutex_ is held for the whole body, including the alarm calls.
    // NOTE(review): the signature line is missing from this listing; the name
    // do_updateInfoSpaceItems is an assumption.
480  boost::mutex::scoped_lock sl(collectionsMutex_);
481 
482  connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
483 
    // NOTE(review): getSampleCount() is listed as returning uint64_t; the sums
    // below are accumulated in uint32_t / int, so large counts would be narrowed.
484  uint32_t localEPCount = 0;
485  uint32_t localActiveEPCount = 0;
486  int localMissingDataDiscardCount = 0;
487  int localMissingDQMDiscardCount = 0;
488  uint32_t localFaultyEventsCount = 0;
489  uint32_t localIgnoredDiscardCount = 0;
490  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
491  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
492  resourceBrokerMap_.end();
493  for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
494  {
495  RBRecordPtr rbRecordPtr = rbMapIter->second;
    // nExpectedEPs is a sum over INIT messages, so dividing by initMsgCount gives
    // the per-message value; the guard avoids dividing by zero.
496  if ( rbRecordPtr->initMsgCount > 0 )
497  localEPCount += rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount;
498 
499  MonitoredQuantity::Stats skippedDiscardStats;
500  rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
501  localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
502 
503  MonitoredQuantity::Stats eventStats;
504  MonitoredQuantity::Stats errorEventStats;
505  MonitoredQuantity::Stats dataDiscardStats;
506  rbRecordPtr->eventSize.getStats(eventStats);
507  rbRecordPtr->errorEventSize.getStats(errorEventStats);
508  rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
    // NOTE(review): the per-RB result built at listing lines 806-815 also adds
    // the faulty event (and faulty DQM event) sample counts to its outstanding
    // discard counts; the sums here do not - confirm which is intended.
509  localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
510  errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
511 
512  MonitoredQuantity::Stats dqmEventStats;
513  MonitoredQuantity::Stats dqmDiscardStats;
514  rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
515  rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
516  localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
517  dqmDiscardStats.getSampleCount();
518 
    // Faulty events and faulty DQM events are reported as one combined count.
519  MonitoredQuantity::Stats faultyEventStats;
520  rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
521  localFaultyEventsCount += faultyEventStats.getSampleCount();
522  MonitoredQuantity::Stats faultyDQMEventStats;
523  rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
524  localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
525 
526  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
527  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
528  rbRecordPtr->filterUnitMap.end();
529  for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
530  {
531  FURecordPtr fuRecordPtr = fuMapIter->second;
    // An FU counts as active when its medium-interval event quantity has at
    // least one sample in the RECENT data set.
532  MonitoredQuantity::Stats fuMediumIntervalEventStats;
533  fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
534  if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
535  ++localActiveEPCount;
536  }
537  }
538  }
539  connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
540  activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
541  outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
542  outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
543  faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
544  ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
545 
546  faultyEventsAlarm(localFaultyEventsCount);
547  ignoredDiscardAlarm(localIgnoredDiscardCount);
548  }
549 
550 
551  void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
552  {
    // Raises the "FaultyEvents" alarm at ERROR level when faultyEventsCount is
    // non-zero, and revokes it when the count is zero.
    // The exception object is only declared to carry the message to the alarm
    // handler; nothing in this body throws it.
553  const std::string alarmName = "FaultyEvents";
554 
555  if (faultyEventsCount > 0)
556  {
557  std::ostringstream msg;
558  msg << "Missing or faulty I2O fragments for " <<
559  faultyEventsCount <<
560  " events. These events are lost!";
561  XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
562  alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
563  }
564  else
565  {
566  alarmHandler_->revokeAlarm(alarmName);
567  }
568  }
569 
570 
571  void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
572  {
    // Raises the "IgnoredDiscard" alarm at ERROR level when ignoredDiscardCount
    // is non-zero, and revokes it when the count is zero.
    // The exception object is only declared to carry the message to the alarm
    // handler; nothing in this body throws it.
573  const std::string alarmName = "IgnoredDiscard";
574 
575  if ( ignoredDiscardCount > 0)
576  {
577  std::ostringstream msg;
578  msg << ignoredDiscardCount <<
579  " discard messages ignored. These events might be stuck in the resource broker.";
580  XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
581  alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
582  }
583  else
584  {
585  alarmHandler_->revokeAlarm(alarmName);
586  }
587  }
588 
589 
591  {
    // Sums nExpectedEPs / initMsgCount over every RB record that has received
    // at least one INIT message, and returns the total.
    // No lock is taken in this body.
    // NOTE(review): whether callers are expected to hold collectionsMutex_ cannot
    // be told from here; the signature line is missing and the name
    // getConnectedEPs is an assumption.
592  size_t count = 0;
593  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
594  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
595  resourceBrokerMap_.end();
596  for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
597  {
598  if ( rbMapIter->second->initMsgCount > 0 )
599  count += rbMapIter->second->nExpectedEPs / rbMapIter->second->initMsgCount;
600  }
601  return count;
602  }
603 
604 
606 
608  (
    // Fills in the RB record, FU record and the three output-module records
    // (top-level, RB-specific, FU-specific) that belong to i2oChain.
    // Returns false, leaving the output pointers untouched, when either key built
    // from the chain is invalid; returns true otherwise.
    // Takes no lock itself; the callers visible in this file hold
    // collectionsMutex_ around the call.
    // NOTE(review): the line with the return type and function name is missing
    // from this listing; the cross-reference list gives
    // bool getAllNeededPointers(...).
609  I2OChain const& i2oChain,
610  DSMC::RBRecordPtr& rbRecordPtr,
611  DSMC::FURecordPtr& fuRecordPtr,
612  DSMC::OutModRecordPtr& topLevelOutModPtr,
613  DSMC::OutModRecordPtr& rbSpecificOutModPtr,
614  DSMC::OutModRecordPtr& fuSpecificOutModPtr
615  )
616  {
617  ResourceBrokerKey rbKey(i2oChain);
618  if (! rbKey.isValid) {return false;}
619  FilterUnitKey fuKey(i2oChain);
620  if (! fuKey.isValid) {return false;}
621  OutputModuleKey outModKey = i2oChain.outputModuleId();
622 
    // Each get...Record call below creates the record if it does not exist yet.
623  topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
624 
625  rbRecordPtr = getResourceBrokerRecord(rbKey);
626  rbSpecificOutModPtr = getOutputModuleRecord(
627  rbRecordPtr->outputModuleMap,
628  outModKey);
629 
630  fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
631  fuSpecificOutModPtr = getOutputModuleRecord(
632  fuRecordPtr->outputModuleMap,
633  outModKey);
634 
635  return true;
636  }
637 
638 
640  (
    // Sets rbRecordPtr to the RB record for i2oChain, creating it if needed.
    // Returns false without touching rbRecordPtr when the key built from the
    // chain is invalid.
    // NOTE(review): the line with the return type and function name is missing
    // from this listing; the cross-reference list gives
    // bool getRBRecordPointer(...).
641  I2OChain const& i2oChain,
642  DSMC::RBRecordPtr& rbRecordPtr
643  )
644  {
645  ResourceBrokerKey rbKey(i2oChain);
646  if (! rbKey.isValid) {return false;}
647 
648  rbRecordPtr = getResourceBrokerRecord(rbKey);
649  return true;
650  }
651 
652 
654  (
    // Sets fuRecordPtr to the FU record for i2oChain under the given RB record,
    // creating it if needed. Returns false without touching fuRecordPtr when the
    // key built from the chain is invalid.
    // rbRecordPtr is dereferenced by getFilterUnitRecord, so it must already
    // point at a record.
    // NOTE(review): the line with the return type and function name is missing
    // from this listing; the cross-reference list gives
    // bool getFURecordPointer(...).
655  I2OChain const& i2oChain,
656  DSMC::RBRecordPtr& rbRecordPtr,
657  DSMC::FURecordPtr& fuRecordPtr
658  )
659  {
660  FilterUnitKey fuKey(i2oChain);
661  if (! fuKey.isValid) {return false;}
662 
663  fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
664  return true;
665  }
666 
667 
670  {
    // Returns the record stored in resourceBrokerMap_ under uniqueRBID, creating
    // and inserting a new ResourceBrokerRecord when there is none yet.
    // NOTE(review): the signature line and listing line 672 are missing from this
    // listing; line 672 presumably derives uniqueRBID from rbKey via
    // getUniqueResourceBrokerID - confirm against the real file.
671  RBRecordPtr rbRecordPtr;
673  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
674  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
675  if (rbMapIter == resourceBrokerMap_.end())
676  {
677  rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
678  resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
679  }
680  else
681  {
682  rbRecordPtr = rbMapIter->second;
683  }
684  return rbRecordPtr;
685  }
686 
687 
690  {
    // Returns the ID cached in resourceBrokerIDs_ for rbKey; on first sight of a
    // key the ID is computed as the zlib CRC-32 of the key's fields concatenated
    // into one string, then cached.
    // NOTE(review): the signature line is missing from this listing; the
    // cross-reference list gives
    // getUniqueResourceBrokerID(ResourceBrokerKey const&).
691  UniqueResourceBrokerID_t uniqueID;
692  std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
693  rbMapIter = resourceBrokerIDs_.find(rbKey);
694  if (rbMapIter == resourceBrokerIDs_.end())
695  {
    // NOTE(review): the fields are joined without separators and nothing here
    // checks for CRC collisions, so two distinct keys could in principle map to
    // the same ID.
696  std::string workString = rbKey.hltURL +
697  boost::lexical_cast<std::string>(rbKey.hltTid) +
698  boost::lexical_cast<std::string>(rbKey.hltInstance) +
699  boost::lexical_cast<std::string>(rbKey.hltLocalId) +
700  rbKey.hltClassName;
    // The first crc32 call with Z_NULL only obtains the initial CRC value.
701  uLong crc = crc32(0L, Z_NULL, 0);
702  Bytef* crcbuf = (Bytef*) workString.data();
703  crc = crc32(crc, crcbuf, workString.length());
704  uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
705  resourceBrokerIDs_[rbKey] = uniqueID;
706  }
707  else
708  {
709  uniqueID = rbMapIter->second;
710  }
711  return uniqueID;
712  }
713 
714 
717  (
    // Returns the FU record stored under fuKey in rbRecordPtr->filterUnitMap,
    // creating and inserting a new FilterUnitRecord when there is none yet.
    // rbRecordPtr is dereferenced unconditionally.
    // NOTE(review): the line with the return type and function name is missing
    // from this listing; the cross-reference list gives
    // FURecordPtr getFilterUnitRecord(...).
718  DSMC::RBRecordPtr& rbRecordPtr,
719  DSMC::FilterUnitKey const& fuKey
720  )
721  {
722  FURecordPtr fuRecordPtr;
723  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
724  fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
725  if (fuMapIter == rbRecordPtr->filterUnitMap.end())
726  {
727  fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
728  rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
729  }
730  else
731  {
732  fuRecordPtr = fuMapIter->second;
733  }
734  return fuRecordPtr;
735  }
736 
737 
740  (
    // Returns the record stored under outModKey in outModMap, creating and
    // inserting a new OutputModuleRecord when there is none yet.
    // NOTE(review): the line with the return type and function name is missing
    // from this listing; the cross-reference list gives
    // OutModRecordPtr getOutputModuleRecord(...).
741  OutputModuleRecordMap& outModMap,
742  DSMC::OutputModuleKey const& outModKey
743  )
744  {
745  OutModRecordPtr outModRecordPtr;
746  OutputModuleRecordMap::const_iterator omMapIter;
747  omMapIter = outModMap.find(outModKey);
748  if (omMapIter == outModMap.end())
749  {
750  outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
751 
    // Placeholder values for a new record; the INIT-message handler in this file
    // later overwrites name and initMsgSize.
752  outModRecordPtr->name = "Unknown";
753  outModRecordPtr->id = outModKey;
754  outModRecordPtr->initMsgSize = 0;
755 
756  outModMap[outModKey] = outModRecordPtr;
757  }
758  else
759  {
760  outModRecordPtr = omMapIter->second;
761  }
762  return outModRecordPtr;
763  }
764 
765 
768  {
    // Builds one OutputModuleResult per entry of outputModuleMap, copying name,
    // id and initMsgSize and snapshotting the eventSize statistics.
    // NOTE(review): the signature line is missing from this listing; the
    // cross-reference list gives
    // buildOutputModuleResults(OutputModuleRecordMap const&) const.
769  OutputModuleResultsList resultsList;
770 
771  OutputModuleRecordMap::const_iterator omMapIter;
772  OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
773  for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
774  {
775  OutModRecordPtr outModRecordPtr = omMapIter->second;
776  boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
777  result->name = outModRecordPtr->name;
778  result->id = outModRecordPtr->id;
779  result->initMsgSize = outModRecordPtr->initMsgSize;
780  outModRecordPtr->eventSize.getStats(result->eventStats);
781  resultsList.push_back(result);
782  }
783 
784  return resultsList;
785  }
786 
787 
790  {
    // Builds a ResourceBrokerResult snapshot from rbRecordPtr: plain counters are
    // copied, monitored quantities are read with getStats(), and the outstanding
    // discard counts are derived from those statistics.
    // uniqueRBID is not set here; the callers in this file assign it afterwards.
    // NOTE(review): the signature line is missing from this listing; the
    // cross-reference list gives
    // buildResourceBrokerResult(RBRecordPtr const&) const.
791  RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
792 
    // nExpectedEPs is a sum over INIT messages; the guard avoids dividing by zero
    // before any INIT message has arrived.
793  result->filterUnitCount = rbRecordPtr->initMsgCount>0 ? rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount : 0;
794  result->initMsgCount = rbRecordPtr->initMsgCount;
795  result->lastRunNumber = rbRecordPtr->lastRunNumber;
796  result->lastEventNumber = rbRecordPtr->lastEventNumber;
797  rbRecordPtr->eventSize.getStats(result->eventStats);
798  rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
799  rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
800  rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
801  rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
802  rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
803  rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
804  rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
805 
    // Outstanding data discards = INIT + event + error + faulty samples received
    // minus data discards counted.
806  result->outstandingDataDiscardCount =
807  result->initMsgCount +
808  result->eventStats.getSampleCount() +
809  result->errorEventStats.getSampleCount() +
810  result->faultyEventStats.getSampleCount() -
811  result->dataDiscardStats.getSampleCount();
    // Outstanding DQM discards = DQM + faulty DQM samples minus DQM discards.
812  result->outstandingDQMDiscardCount =
813  result->dqmEventStats.getSampleCount() +
814  result->faultyDQMEventStats.getSampleCount() -
815  result->dqmDiscardStats.getSampleCount();
816 
817  return result;
818  }
819 
820 
822  {
    // Calls calculateStatistics() on the fragmentSize and eventSize quantities of
    // every record in outputModuleMap.
    // NOTE(review): the signature line is missing from this listing; the
    // cross-reference list gives
    // void calcStatsForOutputModules(OutputModuleRecordMap&).
823  OutputModuleRecordMap::const_iterator omMapIter;
824  OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
825  for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
826  {
827  OutModRecordPtr outModRecordPtr = omMapIter->second;
828 
    // fragmentSize is not written to anywhere else in the visible part of this
    // file.
829  outModRecordPtr->fragmentSize.calculateStatistics();
830  outModRecordPtr->eventSize.calculateStatistics();
831  }
832  }
833 
834 
836  (
    // Compares the pointed-to results (not the pointers) with operator<.
    // Both pointers are dereferenced unconditionally.
    // NOTE(review): the function-name line and listing line 837 (the firstValue
    // parameter) are missing from this listing; the cross-reference list gives
    // bool compareRBResultPtrValues(RBResultPtr firstValue,
    // RBResultPtr secondValue).
838  DSMC::RBResultPtr secondValue
839  )
840  {
841  return *firstValue < *secondValue;
842  }
843 
844 } // namespace stor
845 
846 
uint32_t runNumber() const
Definition: I2OChain.cc:585
RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const &)
DataSenderMonitorCollection(const utils::Duration_t &updateInterval, AlarmHandlerPtr)
std::map< UniqueResourceBrokerID_t, RBRecordPtr > resourceBrokerMap_
OutputModuleResultsList getTopLevelOutputModuleResults() const
boost::shared_ptr< FilterUnitRecord > FURecordPtr
void calcStatsForOutputModules(OutputModuleRecordMap &outputModuleMap)
RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const
std::map< OutputModuleKey, OutModRecordPtr > OutputModuleRecordMap
bool complete() const
Definition: I2OChain.cc:118
uint64_t getSampleCount(DataSetType t=FULL) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
boost::shared_ptr< ResourceBrokerRecord > RBRecordPtr
std::map< ResourceBrokerKey, UniqueResourceBrokerID_t > resourceBrokerIDs_
UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const &)
unsigned int messageCode() const
Definition: I2OChain.cc:218
FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
void ignoredDiscardAlarm(const uint32_t &) const
uint32_t nExpectedEPs() const
Definition: I2OChain.cc:515
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
bool getAllNeededPointers(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr, OutModRecordPtr &topLevelOutModPtr, OutModRecordPtr &rbSpecificOutModPtr, OutModRecordPtr &fuSpecificOutModPtr)
bool getFURecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr)
tuple result
Definition: query.py:137
std::string outputModuleLabel() const
Definition: I2OChain.cc:495
bool getRBRecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr)
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
Definition: AlarmHandler.h:116
boost::shared_ptr< OutputModuleRecord > OutModRecordPtr
OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap &, OutputModuleKey const &)
std::vector< boost::shared_ptr< OutputModuleResult > > OutputModuleResultsList
OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const &) const
boost::shared_ptr< FilterUnitResult > FUResultPtr
std::vector< RBResultPtr > ResourceBrokerResultsList
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void faultyEventsAlarm(const uint32_t &) const
unsigned long totalDataSize() const
Definition: I2OChain.cc:432
boost::shared_ptr< ResourceBrokerResult > RBResultPtr
bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue, DataSenderMonitorCollection::RBResultPtr secondValue)
ResourceBrokerResultsList getAllResourceBrokerResults() const
uint32_t eventNumber() const
Definition: I2OChain.cc:605
OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
RBResultPtr buildResourceBrokerResult(RBRecordPtr const &) const
DataSenderMonitorCollection DSMC
std::vector< FUResultPtr > FilterUnitResultsList
FURecordPtr getFilterUnitRecord(RBRecordPtr &, FilterUnitKey const &)
uint32_t outputModuleId() const
Definition: I2OChain.cc:505