CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DataSenderMonitorCollection.cc
Go to the documentation of this file.
1 // $Id: DataSenderMonitorCollection.cc,v 1.17 2011/03/07 15:31:32 mommsen Exp $
3 
4 #include <string>
5 #include <sstream>
6 #include <iomanip>
7 
8 #include <zlib.h>
9 #include <boost/lexical_cast.hpp>
10 
13 
14 
15 namespace stor {
16 
18  (
19  const utils::Duration_t& updateInterval,
21  ) :
22  MonitorCollection(updateInterval),
23  connectedRBs_(0),
24  connectedEPs_(0),
25  activeEPs_(0),
26  outstandingDataDiscards_(0),
27  outstandingDQMDiscards_(0),
28  faultyEvents_(0),
29  ignoredDiscards_(0),
30  updateInterval_(updateInterval),
31  alarmHandler_(ah)
32  {}
33 
34 
36  {
37  // focus only on INIT and event fragments, for now
38  if (i2oChain.messageCode() != Header::INIT &&
39  i2oChain.messageCode() != Header::EVENT) {return;}
40  if (i2oChain.fragmentCount() != 1) {return;}
41 
42  // fetch basic data from the I2OChain
43  //double fragmentSize = static_cast<double>(i2oChain.totalDataSize());
44 
45  // look up the monitoring records that we need
46  bool pointersAreValid;
47  RBRecordPtr rbRecordPtr;
48  FURecordPtr fuRecordPtr;
49  OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
50  {
51  boost::mutex::scoped_lock sl(collectionsMutex_);
52  pointersAreValid = getAllNeededPointers(
53  i2oChain, rbRecordPtr, fuRecordPtr,
54  topLevelOutModPtr, rbSpecificOutModPtr,
55  fuSpecificOutModPtr);
56  }
57 
58  // accumulate the data of interest
59  //if (pointersAreValid)
60  //{
61  //topLevelOutModPtr->fragmentSize.addSample(fragmentSize);
62  //rbSpecificOutModPtr->fragmentSize.addSample(fragmentSize);
63  //fuSpecificOutModPtr->fragmentSize.addSample(fragmentSize);
64  //}
65  }
66 
67 
69  {
70  // sanity checks
71  if (i2oChain.messageCode() != Header::INIT) {return;}
72  if (! i2oChain.complete()) {return;}
73 
74  // fetch basic data from the I2OChain
75  std::string outModName = i2oChain.outputModuleLabel();
76  uint32_t msgSize = i2oChain.totalDataSize();
77 
78  // look up the monitoring records that we need
79  bool pointersAreValid;
80  RBRecordPtr rbRecordPtr;
81  FURecordPtr fuRecordPtr;
82  OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
83  {
84  boost::mutex::scoped_lock sl(collectionsMutex_);
85  pointersAreValid = getAllNeededPointers(
86  i2oChain, rbRecordPtr, fuRecordPtr,
87  topLevelOutModPtr, rbSpecificOutModPtr,
88  fuSpecificOutModPtr);
89  }
90 
91  // accumulate the data of interest
92  if (pointersAreValid)
93  {
94  topLevelOutModPtr->name = outModName;
95  topLevelOutModPtr->initMsgSize = msgSize;
96 
97  ++rbRecordPtr->initMsgCount;
98  rbSpecificOutModPtr->name = outModName;
99  rbSpecificOutModPtr->initMsgSize = msgSize;
100 
101  ++fuRecordPtr->initMsgCount;
102  fuSpecificOutModPtr->name = outModName;
103  fuSpecificOutModPtr->initMsgSize = msgSize;
104  }
105  }
106 
107 
109  {
110  // sanity checks
111  if (i2oChain.messageCode() != Header::EVENT) {return;}
112  if (! i2oChain.complete()) {return;}
113 
114  // fetch basic data from the I2OChain
115  double eventSize = static_cast<double>(i2oChain.totalDataSize());
116  uint32_t runNumber = i2oChain.runNumber();
117  uint32_t eventNumber = i2oChain.eventNumber();
118 
119  // look up the monitoring records that we need
120  bool pointersAreValid;
121  RBRecordPtr rbRecordPtr;
122  FURecordPtr fuRecordPtr;
123  OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
124  {
125  boost::mutex::scoped_lock sl(collectionsMutex_);
126  pointersAreValid = getAllNeededPointers(
127  i2oChain, rbRecordPtr, fuRecordPtr,
128  topLevelOutModPtr, rbSpecificOutModPtr,
129  fuSpecificOutModPtr);
130  }
131 
132  // accumulate the data of interest
133  if (pointersAreValid)
134  {
135  topLevelOutModPtr->eventSize.addSample(eventSize);
136 
137  rbRecordPtr->lastRunNumber = runNumber;
138  rbRecordPtr->lastEventNumber = eventNumber;
139  rbRecordPtr->eventSize.addSample(eventSize);
140  rbSpecificOutModPtr->eventSize.addSample(eventSize);
141 
142  fuRecordPtr->lastRunNumber = runNumber;
143  fuRecordPtr->lastEventNumber = eventNumber;
144  fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
145  fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
146  fuSpecificOutModPtr->eventSize.addSample(eventSize);
147  }
148  }
149 
150 
152  {
153  // sanity checks
154  if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
155  if (! i2oChain.complete()) {return;}
156 
157  // fetch basic data from the I2OChain
158  double eventSize = static_cast<double>(i2oChain.totalDataSize());
159 
160  // look up the monitoring records that we need
161  bool pointersAreValid;
162  RBRecordPtr rbRecordPtr;
163  FURecordPtr fuRecordPtr;
164  {
165  boost::mutex::scoped_lock sl(collectionsMutex_);
166  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
167  if (pointersAreValid)
168  {
169  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
170  }
171  }
172 
173  // accumulate the data of interest
174  if (pointersAreValid)
175  {
176  rbRecordPtr->dqmEventSize.addSample(eventSize);
177  fuRecordPtr->dqmEventSize.addSample(eventSize);
178  }
179  }
180 
181 
183  {
184  // sanity checks
185  if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
186  if (! i2oChain.complete()) {return;}
187 
188  // fetch basic data from the I2OChain
189  double eventSize = static_cast<double>(i2oChain.totalDataSize());
190 
191  // look up the monitoring records that we need
192  bool pointersAreValid;
193  RBRecordPtr rbRecordPtr;
194  FURecordPtr fuRecordPtr;
195  {
196  boost::mutex::scoped_lock sl(collectionsMutex_);
197  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
198  if (pointersAreValid)
199  {
200  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
201  }
202  }
203 
204  // accumulate the data of interest
205  if (pointersAreValid)
206  {
207  rbRecordPtr->errorEventSize.addSample(eventSize);
208  fuRecordPtr->errorEventSize.addSample(eventSize);
209  }
210  }
211 
212 
214  {
215  // fetch basic data from the I2OChain
216  double eventSize = static_cast<double>(i2oChain.totalDataSize());
217 
218  // look up the monitoring records that we need
219  bool pointersAreValid;
220  RBRecordPtr rbRecordPtr;
221  FURecordPtr fuRecordPtr;
222  {
223  boost::mutex::scoped_lock sl(collectionsMutex_);
224  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
225  if (pointersAreValid)
226  {
227  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
228  }
229  }
230 
231  // accumulate the data of interest
232  if (pointersAreValid)
233  {
234  if (i2oChain.messageCode() == Header::DQM_EVENT)
235  {
236  rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
237  fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
238  }
239  else
240  {
241  rbRecordPtr->faultyEventSize.addSample(eventSize);
242  fuRecordPtr->faultyEventSize.addSample(eventSize);
243  }
244  }
245  }
246 
247 
249  {
250  // look up the monitoring records that we need
251  bool pointersAreValid;
252  RBRecordPtr rbRecordPtr;
253  FURecordPtr fuRecordPtr;
254  {
255  boost::mutex::scoped_lock sl(collectionsMutex_);
256  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
257  if (pointersAreValid)
258  {
259  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
260  }
261  }
262 
263  // accumulate the data of interest
264  if (pointersAreValid)
265  {
266  rbRecordPtr->dataDiscardCount.addSample(1);
267  fuRecordPtr->dataDiscardCount.addSample(1);
268  }
269  }
270 
271 
273  {
274  // look up the monitoring records that we need
275  bool pointersAreValid;
276  RBRecordPtr rbRecordPtr;
277  FURecordPtr fuRecordPtr;
278  {
279  boost::mutex::scoped_lock sl(collectionsMutex_);
280  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
281  if (pointersAreValid)
282  {
283  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
284  }
285  }
286 
287  // accumulate the data of interest
288  if (pointersAreValid)
289  {
290  rbRecordPtr->dqmDiscardCount.addSample(1);
291  fuRecordPtr->dqmDiscardCount.addSample(1);
292  }
293  }
294 
295 
297  {
298  // look up the monitoring records that we need
299  bool pointersAreValid;
300  RBRecordPtr rbRecordPtr;
301  FURecordPtr fuRecordPtr;
302  {
303  boost::mutex::scoped_lock sl(collectionsMutex_);
304  pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
305  if (pointersAreValid)
306  {
307  pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
308  }
309  }
310 
311  // accumulate the data of interest
312  if (pointersAreValid)
313  {
314  rbRecordPtr->skippedDiscardCount.addSample(1);
315  fuRecordPtr->skippedDiscardCount.addSample(1);
316  }
317  }
318 
319 
322  {
323  boost::mutex::scoped_lock sl(collectionsMutex_);
324 
326  }
327 
328 
331  {
332  boost::mutex::scoped_lock sl(collectionsMutex_);
333  ResourceBrokerResultsList resultsList;
334 
335  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
336  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
337  resourceBrokerMap_.end();
338  for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
339  {
340  RBRecordPtr rbRecordPtr = rbMapIter->second;
342  result->uniqueRBID = rbMapIter->first;
343  resultsList.push_back(result);
344  }
345 
346  return resultsList;
347  }
348 
349 
352  {
353  boost::mutex::scoped_lock sl(collectionsMutex_);
355 
356  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
357  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
358  if (rbMapIter != resourceBrokerMap_.end())
359  {
360  RBRecordPtr rbRecordPtr = rbMapIter->second;
361  result = buildResourceBrokerResult(rbRecordPtr);
362  result->uniqueRBID = rbMapIter->first;
363  }
364 
365  return result;
366  }
367 
368 
371  {
372  boost::mutex::scoped_lock sl(collectionsMutex_);
373  OutputModuleResultsList resultsList;
374 
375  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
376  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
377  if (rbMapIter != resourceBrokerMap_.end())
378  {
379  RBRecordPtr rbRecordPtr = rbMapIter->second;
380  resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
381  }
382 
383  return resultsList;
384  }
385 
386 
389  {
390  boost::mutex::scoped_lock sl(collectionsMutex_);
391  FilterUnitResultsList resultsList;
392 
393  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
394  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
395  if (rbMapIter != resourceBrokerMap_.end())
396  {
397  RBRecordPtr rbRecordPtr = rbMapIter->second;
398  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
399  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
400  rbRecordPtr->filterUnitMap.end();
401  for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
402  fuMapIter != fuMapEnd; ++fuMapIter)
403  {
404  FURecordPtr fuRecordPtr = fuMapIter->second;
405  FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
406  result->initMsgCount = fuRecordPtr->initMsgCount;
407  result->lastRunNumber = fuRecordPtr->lastRunNumber;
408  result->lastEventNumber = fuRecordPtr->lastEventNumber;
409  fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
410  fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
411  fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
412  fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
413  fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
414  fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
415  fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
416  fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
417  fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
418 
419  result->outstandingDataDiscardCount =
420  result->initMsgCount +
421  result->shortIntervalEventStats.getSampleCount() +
422  result->errorEventStats.getSampleCount() +
423  result->faultyEventStats.getSampleCount() -
424  result->dataDiscardStats.getSampleCount();
425  result->outstandingDQMDiscardCount =
426  result->dqmEventStats.getSampleCount() +
427  result->faultyDQMEventStats.getSampleCount() -
428  result->dqmDiscardStats.getSampleCount();
429 
430  resultsList.push_back(result);
431  }
432  }
433 
434  return resultsList;
435  }
436 
437 
439  {
440  boost::mutex::scoped_lock sl(collectionsMutex_);
441 
442  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
443  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
444  resourceBrokerMap_.end();
445  for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
446  {
447  RBRecordPtr rbRecordPtr = rbMapIter->second;
448  rbRecordPtr->eventSize.calculateStatistics();
449  rbRecordPtr->dqmEventSize.calculateStatistics();
450  rbRecordPtr->errorEventSize.calculateStatistics();
451  rbRecordPtr->faultyEventSize.calculateStatistics();
452  rbRecordPtr->faultyDQMEventSize.calculateStatistics();
453  rbRecordPtr->dataDiscardCount.calculateStatistics();
454  rbRecordPtr->dqmDiscardCount.calculateStatistics();
455  rbRecordPtr->skippedDiscardCount.calculateStatistics();
456  calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
457 
458  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
459  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
460  rbRecordPtr->filterUnitMap.end();
461  for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
462  fuMapIter != fuMapEnd; ++fuMapIter)
463  {
464  FURecordPtr fuRecordPtr = fuMapIter->second;
465  fuRecordPtr->shortIntervalEventSize.calculateStatistics();
466  fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
467  fuRecordPtr->dqmEventSize.calculateStatistics();
468  fuRecordPtr->errorEventSize.calculateStatistics();
469  fuRecordPtr->faultyEventSize.calculateStatistics();
470  fuRecordPtr->faultyDQMEventSize.calculateStatistics();
471  fuRecordPtr->dataDiscardCount.calculateStatistics();
472  fuRecordPtr->dqmDiscardCount.calculateStatistics();
473  fuRecordPtr->skippedDiscardCount.calculateStatistics();
474  calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
475  }
476  }
477 
479  }
480 
481 
483  {
484  boost::mutex::scoped_lock sl(collectionsMutex_);
485 
486  connectedRBs_ = 0;
487  connectedEPs_ = 0;
488  activeEPs_ = 0;
491  faultyEvents_ = 0;
492  ignoredDiscards_ = 0;
493  resourceBrokerMap_.clear();
494  outputModuleMap_.clear();
495  }
496 
497 
499  {
500  infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
501  infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
502  infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
503  infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
504  infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
505  infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
506  infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
507  }
508 
509 
511  {
512  boost::mutex::scoped_lock sl(collectionsMutex_);
513 
514  connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
515 
516  uint32_t localEPCount = 0;
517  uint32_t localActiveEPCount = 0;
518  int localMissingDataDiscardCount = 0;
519  int localMissingDQMDiscardCount = 0;
520  uint32_t localFaultyEventsCount = 0;
521  uint32_t localIgnoredDiscardCount = 0;
522  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
523  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
524  resourceBrokerMap_.end();
525  for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
526  {
527  RBRecordPtr rbRecordPtr = rbMapIter->second;
528  localEPCount += rbRecordPtr->filterUnitMap.size();
529 
530  MonitoredQuantity::Stats skippedDiscardStats;
531  rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
532  localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
533 
534  MonitoredQuantity::Stats eventStats;
535  MonitoredQuantity::Stats errorEventStats;
536  MonitoredQuantity::Stats dataDiscardStats;
537  rbRecordPtr->eventSize.getStats(eventStats);
538  rbRecordPtr->errorEventSize.getStats(errorEventStats);
539  rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
540  localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
541  errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
542  localEPCount -= errorEventStats.getSampleCount();
543 
544  MonitoredQuantity::Stats dqmEventStats;
545  MonitoredQuantity::Stats dqmDiscardStats;
546  rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
547  rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
548  localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
549  dqmDiscardStats.getSampleCount();
550 
551  MonitoredQuantity::Stats faultyEventStats;
552  rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
553  localFaultyEventsCount += faultyEventStats.getSampleCount();
554  MonitoredQuantity::Stats faultyDQMEventStats;
555  rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
556  localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
557 
558  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
559  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
560  rbRecordPtr->filterUnitMap.end();
561  for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
562  {
563  FURecordPtr fuRecordPtr = fuMapIter->second;
564  MonitoredQuantity::Stats fuMediumIntervalEventStats;
565  fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
566  if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
567  ++localActiveEPCount;
568  }
569  }
570  }
571  connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
572  activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
573  outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
574  outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
575  faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
576  ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
577 
578  faultyEventsAlarm(localFaultyEventsCount);
579  ignoredDiscardAlarm(localIgnoredDiscardCount);
580  }
581 
582 
583  void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
584  {
585  const std::string alarmName = "FaultyEvents";
586 
587  if (faultyEventsCount > 0)
588  {
589  std::ostringstream msg;
590  msg << "Missing or faulty I2O fragments for " <<
591  faultyEventsCount <<
592  " events. These events are lost!";
593  XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
594  alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
595  }
596  else
597  {
598  alarmHandler_->revokeAlarm(alarmName);
599  }
600  }
601 
602 
603  void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
604  {
605  const std::string alarmName = "IgnoredDiscard";
606 
607  if ( ignoredDiscardCount > 0)
608  {
609  std::ostringstream msg;
610  msg << ignoredDiscardCount <<
611  " discard messages ignored. These events might be stuck in the resource broker.";
612  XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
613  alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
614  }
615  else
616  {
617  alarmHandler_->revokeAlarm(alarmName);
618  }
619  }
620 
621 
623 
625  (
626  I2OChain const& i2oChain,
627  DSMC::RBRecordPtr& rbRecordPtr,
628  DSMC::FURecordPtr& fuRecordPtr,
629  DSMC::OutModRecordPtr& topLevelOutModPtr,
630  DSMC::OutModRecordPtr& rbSpecificOutModPtr,
631  DSMC::OutModRecordPtr& fuSpecificOutModPtr
632  )
633  {
634  ResourceBrokerKey rbKey(i2oChain);
635  if (! rbKey.isValid) {return false;}
636  FilterUnitKey fuKey(i2oChain);
637  if (! fuKey.isValid) {return false;}
638  OutputModuleKey outModKey = i2oChain.outputModuleId();
639 
640  topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
641 
642  rbRecordPtr = getResourceBrokerRecord(rbKey);
643  rbSpecificOutModPtr = getOutputModuleRecord(
644  rbRecordPtr->outputModuleMap,
645  outModKey);
646 
647  fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
648  fuSpecificOutModPtr = getOutputModuleRecord(
649  fuRecordPtr->outputModuleMap,
650  outModKey);
651 
652  return true;
653  }
654 
655 
657  (
658  I2OChain const& i2oChain,
659  DSMC::RBRecordPtr& rbRecordPtr
660  )
661  {
662  ResourceBrokerKey rbKey(i2oChain);
663  if (! rbKey.isValid) {return false;}
664 
665  rbRecordPtr = getResourceBrokerRecord(rbKey);
666  return true;
667  }
668 
669 
671  (
672  I2OChain const& i2oChain,
673  DSMC::RBRecordPtr& rbRecordPtr,
674  DSMC::FURecordPtr& fuRecordPtr
675  )
676  {
677  FilterUnitKey fuKey(i2oChain);
678  if (! fuKey.isValid) {return false;}
679 
680  fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
681  return true;
682  }
683 
684 
687  {
688  RBRecordPtr rbRecordPtr;
690  std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
691  rbMapIter = resourceBrokerMap_.find(uniqueRBID);
692  if (rbMapIter == resourceBrokerMap_.end())
693  {
694  rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
695  resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
696  }
697  else
698  {
699  rbRecordPtr = rbMapIter->second;
700  }
701  return rbRecordPtr;
702  }
703 
704 
707  {
708  UniqueResourceBrokerID_t uniqueID;
709  std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
710  rbMapIter = resourceBrokerIDs_.find(rbKey);
711  if (rbMapIter == resourceBrokerIDs_.end())
712  {
713  std::string workString = rbKey.hltURL +
714  boost::lexical_cast<std::string>(rbKey.hltTid) +
715  boost::lexical_cast<std::string>(rbKey.hltInstance) +
716  boost::lexical_cast<std::string>(rbKey.hltLocalId) +
717  rbKey.hltClassName;
718  uLong crc = crc32(0L, Z_NULL, 0);
719  Bytef* crcbuf = (Bytef*) workString.data();
720  crc = crc32(crc, crcbuf, workString.length());
721  uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
722  resourceBrokerIDs_[rbKey] = uniqueID;
723  }
724  else
725  {
726  uniqueID = rbMapIter->second;
727  }
728  return uniqueID;
729  }
730 
731 
734  (
735  DSMC::RBRecordPtr& rbRecordPtr,
736  DSMC::FilterUnitKey const& fuKey
737  )
738  {
739  FURecordPtr fuRecordPtr;
740  std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
741  fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
742  if (fuMapIter == rbRecordPtr->filterUnitMap.end())
743  {
744  fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
745  rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
746  }
747  else
748  {
749  fuRecordPtr = fuMapIter->second;
750  }
751  return fuRecordPtr;
752  }
753 
754 
757  (
758  OutputModuleRecordMap& outModMap,
759  DSMC::OutputModuleKey const& outModKey
760  )
761  {
762  OutModRecordPtr outModRecordPtr;
763  OutputModuleRecordMap::const_iterator omMapIter;
764  omMapIter = outModMap.find(outModKey);
765  if (omMapIter == outModMap.end())
766  {
767  outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
768 
769  outModRecordPtr->name = "Unknown";
770  outModRecordPtr->id = outModKey;
771  outModRecordPtr->initMsgSize = 0;
772 
773  outModMap[outModKey] = outModRecordPtr;
774  }
775  else
776  {
777  outModRecordPtr = omMapIter->second;
778  }
779  return outModRecordPtr;
780  }
781 
782 
785  {
786  OutputModuleResultsList resultsList;
787 
788  OutputModuleRecordMap::const_iterator omMapIter;
789  OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
790  for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
791  {
792  OutModRecordPtr outModRecordPtr = omMapIter->second;
793  boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
794  result->name = outModRecordPtr->name;
795  result->id = outModRecordPtr->id;
796  result->initMsgSize = outModRecordPtr->initMsgSize;
797  outModRecordPtr->eventSize.getStats(result->eventStats);
798  resultsList.push_back(result);
799  }
800 
801  return resultsList;
802  }
803 
804 
807  {
808  RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
809 
810  result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
811  result->initMsgCount = rbRecordPtr->initMsgCount;
812  result->lastRunNumber = rbRecordPtr->lastRunNumber;
813  result->lastEventNumber = rbRecordPtr->lastEventNumber;
814  rbRecordPtr->eventSize.getStats(result->eventStats);
815  rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
816  rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
817  rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
818  rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
819  rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
820  rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
821  rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
822 
823  result->outstandingDataDiscardCount =
824  result->initMsgCount +
825  result->eventStats.getSampleCount() +
826  result->errorEventStats.getSampleCount() +
827  result->faultyEventStats.getSampleCount() -
828  result->dataDiscardStats.getSampleCount();
829  result->outstandingDQMDiscardCount =
830  result->dqmEventStats.getSampleCount() +
831  result->faultyDQMEventStats.getSampleCount() -
832  result->dqmDiscardStats.getSampleCount();
833 
834  return result;
835  }
836 
837 
839  {
840  OutputModuleRecordMap::const_iterator omMapIter;
841  OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
842  for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
843  {
844  OutModRecordPtr outModRecordPtr = omMapIter->second;
845 
846  //outModRecordPtr->fragmentSize.calculateStatistics();
847  outModRecordPtr->eventSize.calculateStatistics();
848  }
849  }
850 
851 
853  (
855  DSMC::RBResultPtr secondValue
856  )
857  {
858  return *firstValue < *secondValue;
859  }
860 
861 } // namespace stor
862 
863 
unsigned int fragmentCount() const
Definition: I2OChain.cc:284
uint32_t runNumber() const
Definition: I2OChain.cc:575
RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const &)
DataSenderMonitorCollection(const utils::Duration_t &updateInterval, AlarmHandlerPtr)
std::map< UniqueResourceBrokerID_t, RBRecordPtr > resourceBrokerMap_
OutputModuleResultsList getTopLevelOutputModuleResults() const
boost::shared_ptr< FilterUnitRecord > FURecordPtr
void calcStatsForOutputModules(OutputModuleRecordMap &outputModuleMap)
RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const
std::map< OutputModuleKey, OutModRecordPtr > OutputModuleRecordMap
bool complete() const
Definition: I2OChain.cc:118
uint64_t getSampleCount(DataSetType t=FULL) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
boost::shared_ptr< ResourceBrokerRecord > RBRecordPtr
std::map< ResourceBrokerKey, UniqueResourceBrokerID_t > resourceBrokerIDs_
UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const &)
unsigned int messageCode() const
Definition: I2OChain.cc:218
FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
void ignoredDiscardAlarm(const uint32_t &) const
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
bool getAllNeededPointers(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr, OutModRecordPtr &topLevelOutModPtr, OutModRecordPtr &rbSpecificOutModPtr, OutModRecordPtr &fuSpecificOutModPtr)
bool getFURecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr)
tuple result
Definition: query.py:137
std::string outputModuleLabel() const
Definition: I2OChain.cc:475
bool getRBRecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr)
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
Definition: AlarmHandler.h:91
boost::shared_ptr< OutputModuleRecord > OutModRecordPtr
OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap &, OutputModuleKey const &)
std::vector< boost::shared_ptr< OutputModuleResult > > OutputModuleResultsList
OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const &) const
boost::shared_ptr< FilterUnitResult > FUResultPtr
std::vector< RBResultPtr > ResourceBrokerResultsList
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void faultyEventsAlarm(const uint32_t &) const
unsigned long totalDataSize() const
Definition: I2OChain.cc:432
boost::shared_ptr< ResourceBrokerResult > RBResultPtr
bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue, DataSenderMonitorCollection::RBResultPtr secondValue)
ResourceBrokerResultsList getAllResourceBrokerResults() const
uint32_t eventNumber() const
Definition: I2OChain.cc:595
OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
RBResultPtr buildResourceBrokerResult(RBRecordPtr const &) const
DataSenderMonitorCollection DSMC
std::vector< FUResultPtr > FilterUnitResultsList
FURecordPtr getFilterUnitRecord(RBRecordPtr &, FilterUnitKey const &)
uint32_t outputModuleId() const
Definition: I2OChain.cc:505