9 #include <boost/lexical_cast.hpp>
26 outstandingDataDiscards_(0),
27 outstandingDQMDiscards_(0),
30 updateInterval_(updateInterval),
46 bool pointersAreValid;
49 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
53 i2oChain, rbRecordPtr, fuRecordPtr,
54 topLevelOutModPtr, rbSpecificOutModPtr,
79 bool pointersAreValid;
82 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
86 i2oChain, rbRecordPtr, fuRecordPtr,
87 topLevelOutModPtr, rbSpecificOutModPtr,
94 topLevelOutModPtr->name = outModName;
95 topLevelOutModPtr->initMsgSize = msgSize;
97 ++rbRecordPtr->initMsgCount;
98 rbSpecificOutModPtr->name = outModName;
99 rbSpecificOutModPtr->initMsgSize = msgSize;
101 ++fuRecordPtr->initMsgCount;
102 fuSpecificOutModPtr->name = outModName;
103 fuSpecificOutModPtr->initMsgSize = msgSize;
112 if (! i2oChain.
complete()) {
return;}
115 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
120 bool pointersAreValid;
123 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
127 i2oChain, rbRecordPtr, fuRecordPtr,
128 topLevelOutModPtr, rbSpecificOutModPtr,
129 fuSpecificOutModPtr);
133 if (pointersAreValid)
135 topLevelOutModPtr->eventSize.addSample(eventSize);
138 rbRecordPtr->lastEventNumber = eventNumber;
139 rbRecordPtr->eventSize.addSample(eventSize);
140 rbSpecificOutModPtr->eventSize.addSample(eventSize);
143 fuRecordPtr->lastEventNumber = eventNumber;
144 fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
145 fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
146 fuSpecificOutModPtr->eventSize.addSample(eventSize);
155 if (! i2oChain.
complete()) {
return;}
158 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
161 bool pointersAreValid;
167 if (pointersAreValid)
174 if (pointersAreValid)
176 rbRecordPtr->dqmEventSize.addSample(eventSize);
177 fuRecordPtr->dqmEventSize.addSample(eventSize);
186 if (! i2oChain.
complete()) {
return;}
189 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
192 bool pointersAreValid;
198 if (pointersAreValid)
205 if (pointersAreValid)
207 rbRecordPtr->errorEventSize.addSample(eventSize);
208 fuRecordPtr->errorEventSize.addSample(eventSize);
216 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
219 bool pointersAreValid;
225 if (pointersAreValid)
232 if (pointersAreValid)
236 rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
237 fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
241 rbRecordPtr->faultyEventSize.addSample(eventSize);
242 fuRecordPtr->faultyEventSize.addSample(eventSize);
251 bool pointersAreValid;
257 if (pointersAreValid)
264 if (pointersAreValid)
266 rbRecordPtr->dataDiscardCount.addSample(1);
267 fuRecordPtr->dataDiscardCount.addSample(1);
275 bool pointersAreValid;
281 if (pointersAreValid)
288 if (pointersAreValid)
290 rbRecordPtr->dqmDiscardCount.addSample(1);
291 fuRecordPtr->dqmDiscardCount.addSample(1);
299 bool pointersAreValid;
305 if (pointersAreValid)
312 if (pointersAreValid)
314 rbRecordPtr->skippedDiscardCount.addSample(1);
315 fuRecordPtr->skippedDiscardCount.addSample(1);
335 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
336 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
342 result->uniqueRBID = rbMapIter->first;
343 resultsList.push_back(result);
356 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
362 result->uniqueRBID = rbMapIter->first;
375 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
393 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
398 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
399 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
400 rbRecordPtr->filterUnitMap.end();
401 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
402 fuMapIter != fuMapEnd; ++fuMapIter)
406 result->initMsgCount = fuRecordPtr->initMsgCount;
407 result->lastRunNumber = fuRecordPtr->lastRunNumber;
408 result->lastEventNumber = fuRecordPtr->lastEventNumber;
409 fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
410 fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
411 fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
412 fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
413 fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
414 fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
415 fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
416 fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
417 fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
419 result->outstandingDataDiscardCount =
420 result->initMsgCount +
421 result->shortIntervalEventStats.getSampleCount() +
422 result->errorEventStats.getSampleCount() +
423 result->faultyEventStats.getSampleCount() -
424 result->dataDiscardStats.getSampleCount();
425 result->outstandingDQMDiscardCount =
426 result->dqmEventStats.getSampleCount() +
427 result->faultyDQMEventStats.getSampleCount() -
428 result->dqmDiscardStats.getSampleCount();
430 resultsList.push_back(result);
442 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
443 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
448 rbRecordPtr->eventSize.calculateStatistics();
449 rbRecordPtr->dqmEventSize.calculateStatistics();
450 rbRecordPtr->errorEventSize.calculateStatistics();
451 rbRecordPtr->faultyEventSize.calculateStatistics();
452 rbRecordPtr->faultyDQMEventSize.calculateStatistics();
453 rbRecordPtr->dataDiscardCount.calculateStatistics();
454 rbRecordPtr->dqmDiscardCount.calculateStatistics();
455 rbRecordPtr->skippedDiscardCount.calculateStatistics();
458 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
459 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
460 rbRecordPtr->filterUnitMap.end();
461 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
462 fuMapIter != fuMapEnd; ++fuMapIter)
465 fuRecordPtr->shortIntervalEventSize.calculateStatistics();
466 fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
467 fuRecordPtr->dqmEventSize.calculateStatistics();
468 fuRecordPtr->errorEventSize.calculateStatistics();
469 fuRecordPtr->faultyEventSize.calculateStatistics();
470 fuRecordPtr->faultyDQMEventSize.calculateStatistics();
471 fuRecordPtr->dataDiscardCount.calculateStatistics();
472 fuRecordPtr->dqmDiscardCount.calculateStatistics();
473 fuRecordPtr->skippedDiscardCount.calculateStatistics();
500 infoSpaceItems.push_back(std::make_pair(
"connectedRBs", &
connectedRBs_));
501 infoSpaceItems.push_back(std::make_pair(
"connectedEPs", &
connectedEPs_));
502 infoSpaceItems.push_back(std::make_pair(
"activeEPs", &
activeEPs_));
505 infoSpaceItems.push_back(std::make_pair(
"faultyEvents", &
faultyEvents_));
506 infoSpaceItems.push_back(std::make_pair(
"ignoredDiscards", &
ignoredDiscards_));
516 uint32_t localEPCount = 0;
517 uint32_t localActiveEPCount = 0;
518 int localMissingDataDiscardCount = 0;
519 int localMissingDQMDiscardCount = 0;
520 uint32_t localFaultyEventsCount = 0;
521 uint32_t localIgnoredDiscardCount = 0;
522 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
523 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
528 localEPCount += rbRecordPtr->filterUnitMap.size();
531 rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
537 rbRecordPtr->eventSize.getStats(eventStats);
538 rbRecordPtr->errorEventSize.getStats(errorEventStats);
539 rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
540 localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.
getSampleCount() +
546 rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
547 rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
552 rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
555 rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
558 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
559 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
560 rbRecordPtr->filterUnitMap.end();
561 for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
565 fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
567 ++localActiveEPCount;
571 connectedEPs_ =
static_cast<xdata::UnsignedInteger32
>(localEPCount);
572 activeEPs_ =
static_cast<xdata::UnsignedInteger32
>(localActiveEPCount);
575 faultyEvents_ =
static_cast<xdata::UnsignedInteger32
>(localFaultyEventsCount);
576 ignoredDiscards_ =
static_cast<xdata::UnsignedInteger32
>(localIgnoredDiscardCount);
585 const std::string alarmName =
"FaultyEvents";
587 if (faultyEventsCount > 0)
589 std::ostringstream
msg;
590 msg <<
"Missing or faulty I2O fragments for " <<
592 " events. These events are lost!";
593 XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
605 const std::string alarmName =
"IgnoredDiscard";
607 if ( ignoredDiscardCount > 0)
609 std::ostringstream
msg;
610 msg << ignoredDiscardCount <<
611 " discard messages ignored. These events might be stuck in the resource broker.";
612 XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
635 if (! rbKey.
isValid) {
return false;}
637 if (! fuKey.
isValid) {
return false;}
640 topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
642 rbRecordPtr = getResourceBrokerRecord(rbKey);
643 rbSpecificOutModPtr = getOutputModuleRecord(
644 rbRecordPtr->outputModuleMap,
647 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
648 fuSpecificOutModPtr = getOutputModuleRecord(
649 fuRecordPtr->outputModuleMap,
663 if (! rbKey.
isValid) {
return false;}
665 rbRecordPtr = getResourceBrokerRecord(rbKey);
678 if (! fuKey.
isValid) {
return false;}
680 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
690 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
699 rbRecordPtr = rbMapIter->second;
709 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
713 std::string workString = rbKey.
hltURL +
714 boost::lexical_cast<std::string>(rbKey.
hltTid) +
715 boost::lexical_cast<std::string>(rbKey.
hltInstance) +
716 boost::lexical_cast<std::string>(rbKey.
hltLocalId) +
718 uLong crc = crc32(0
L, Z_NULL, 0);
719 Bytef* crcbuf = (Bytef*) workString.data();
720 crc = crc32(crc, crcbuf, workString.length());
726 uniqueID = rbMapIter->second;
740 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
741 fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
742 if (fuMapIter == rbRecordPtr->filterUnitMap.end())
745 rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
749 fuRecordPtr = fuMapIter->second;
763 OutputModuleRecordMap::const_iterator omMapIter;
764 omMapIter = outModMap.find(outModKey);
765 if (omMapIter == outModMap.end())
769 outModRecordPtr->name =
"Unknown";
770 outModRecordPtr->id = outModKey;
771 outModRecordPtr->initMsgSize = 0;
773 outModMap[outModKey] = outModRecordPtr;
777 outModRecordPtr = omMapIter->second;
779 return outModRecordPtr;
788 OutputModuleRecordMap::const_iterator omMapIter;
789 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
790 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
794 result->name = outModRecordPtr->name;
795 result->id = outModRecordPtr->id;
796 result->initMsgSize = outModRecordPtr->initMsgSize;
797 outModRecordPtr->eventSize.getStats(result->eventStats);
798 resultsList.push_back(result);
810 result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
811 result->initMsgCount = rbRecordPtr->initMsgCount;
812 result->lastRunNumber = rbRecordPtr->lastRunNumber;
813 result->lastEventNumber = rbRecordPtr->lastEventNumber;
814 rbRecordPtr->eventSize.getStats(result->eventStats);
815 rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
816 rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
817 rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
818 rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
819 rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
820 rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
821 rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
823 result->outstandingDataDiscardCount =
824 result->initMsgCount +
825 result->eventStats.getSampleCount() +
826 result->errorEventStats.getSampleCount() +
827 result->faultyEventStats.getSampleCount() -
828 result->dataDiscardStats.getSampleCount();
829 result->outstandingDQMDiscardCount =
830 result->dqmEventStats.getSampleCount() +
831 result->faultyDQMEventStats.getSampleCount() -
832 result->dqmDiscardStats.getSampleCount();
840 OutputModuleRecordMap::const_iterator omMapIter;
841 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
842 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
847 outModRecordPtr->eventSize.calculateStatistics();
858 return *firstValue < *secondValue;
unsigned int fragmentCount() const
uint32_t runNumber() const
void incrementDataDiscardCount(I2OChain const &)
long long UniqueResourceBrokerID_t
RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const &)
DataSenderMonitorCollection(const utils::Duration_t &updateInterval, AlarmHandlerPtr)
std::map< UniqueResourceBrokerID_t, RBRecordPtr > resourceBrokerMap_
OutputModuleResultsList getTopLevelOutputModuleResults() const
boost::shared_ptr< FilterUnitRecord > FURecordPtr
void calcStatsForOutputModules(OutputModuleRecordMap &outputModuleMap)
void incrementSkippedDiscardCount(I2OChain const &)
xdata::UnsignedInteger32 activeEPs_
RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const
std::map< OutputModuleKey, OutModRecordPtr > OutputModuleRecordMap
const utils::Duration_t updateInterval_
uint64_t getSampleCount(DataSetType t=FULL) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
boost::shared_ptr< ResourceBrokerRecord > RBRecordPtr
std::map< ResourceBrokerKey, UniqueResourceBrokerID_t > resourceBrokerIDs_
void incrementDQMDiscardCount(I2OChain const &)
UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const &)
virtual void do_calculateStatistics()
unsigned int messageCode() const
FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
xdata::UnsignedInteger32 connectedRBs_
void addDQMEventSample(I2OChain const &)
xdata::Integer32 outstandingDataDiscards_
void ignoredDiscardAlarm(const uint32_t &) const
boost::mutex collectionsMutex_
boost::posix_time::time_duration Duration_t
bool getAllNeededPointers(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr, OutModRecordPtr &topLevelOutModPtr, OutModRecordPtr &rbSpecificOutModPtr, OutModRecordPtr &fuSpecificOutModPtr)
bool getFURecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr)
std::string outputModuleLabel() const
void addErrorEventSample(I2OChain const &)
void addEventSample(I2OChain const &)
bool getRBRecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr)
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
virtual void do_updateInfoSpaceItems()
boost::shared_ptr< OutputModuleRecord > OutModRecordPtr
OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap &, OutputModuleKey const &)
xdata::UnsignedInteger32 faultyEvents_
AlarmHandlerPtr alarmHandler_
std::vector< boost::shared_ptr< OutputModuleResult > > OutputModuleResultsList
OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const &) const
boost::shared_ptr< FilterUnitResult > FUResultPtr
std::vector< RBResultPtr > ResourceBrokerResultsList
void addFragmentSample(I2OChain const &)
void addInitSample(I2OChain const &)
xdata::UnsignedInteger32 ignoredDiscards_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void faultyEventsAlarm(const uint32_t &) const
unsigned long totalDataSize() const
boost::shared_ptr< ResourceBrokerResult > RBResultPtr
void addFaultyEventSample(I2OChain const &)
bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue, DataSenderMonitorCollection::RBResultPtr secondValue)
xdata::Integer32 outstandingDQMDiscards_
ResourceBrokerResultsList getAllResourceBrokerResults() const
uint32_t eventNumber() const
OutputModuleRecordMap outputModuleMap_
OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
RBResultPtr buildResourceBrokerResult(RBRecordPtr const &) const
DataSenderMonitorCollection DSMC
std::vector< FUResultPtr > FilterUnitResultsList
FURecordPtr getFilterUnitRecord(RBRecordPtr &, FilterUnitKey const &)
xdata::UnsignedInteger32 connectedEPs_
uint32_t outputModuleId() const