9 #include <boost/lexical_cast.hpp>
26 outstandingDataDiscards_(0),
27 outstandingDQMDiscards_(0),
30 updateInterval_(updateInterval),
46 bool pointersAreValid;
49 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
53 i2oChain, rbRecordPtr, fuRecordPtr,
54 topLevelOutModPtr, rbSpecificOutModPtr,
79 bool pointersAreValid;
82 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
86 i2oChain, rbRecordPtr, fuRecordPtr,
87 topLevelOutModPtr, rbSpecificOutModPtr,
94 topLevelOutModPtr->name = outModName;
95 topLevelOutModPtr->initMsgSize = msgSize;
97 ++rbRecordPtr->initMsgCount;
98 rbSpecificOutModPtr->name = outModName;
99 rbSpecificOutModPtr->initMsgSize = msgSize;
101 ++fuRecordPtr->initMsgCount;
102 fuSpecificOutModPtr->name = outModName;
103 fuSpecificOutModPtr->initMsgSize = msgSize;
112 if (! i2oChain.
complete()) {
return;}
115 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
120 bool pointersAreValid;
123 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
127 i2oChain, rbRecordPtr, fuRecordPtr,
128 topLevelOutModPtr, rbSpecificOutModPtr,
129 fuSpecificOutModPtr);
133 if (pointersAreValid)
135 topLevelOutModPtr->eventSize.addSample(eventSize);
138 rbRecordPtr->lastEventNumber = eventNumber;
139 rbRecordPtr->eventSize.addSample(eventSize);
140 rbSpecificOutModPtr->eventSize.addSample(eventSize);
143 fuRecordPtr->lastEventNumber = eventNumber;
144 fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
145 fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
146 fuSpecificOutModPtr->eventSize.addSample(eventSize);
155 if (! i2oChain.
complete()) {
return;}
158 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
161 bool pointersAreValid;
167 if (pointersAreValid)
174 if (pointersAreValid)
176 rbRecordPtr->dqmEventSize.addSample(eventSize);
177 fuRecordPtr->dqmEventSize.addSample(eventSize);
186 if (! i2oChain.
complete()) {
return;}
189 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
192 bool pointersAreValid;
198 if (pointersAreValid)
205 if (pointersAreValid)
207 rbRecordPtr->errorEventSize.addSample(eventSize);
208 fuRecordPtr->errorEventSize.addSample(eventSize);
216 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
219 bool pointersAreValid;
225 if (pointersAreValid)
232 if (pointersAreValid)
236 rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
237 fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
241 rbRecordPtr->faultyEventSize.addSample(eventSize);
242 fuRecordPtr->faultyEventSize.addSample(eventSize);
251 bool pointersAreValid;
257 if (pointersAreValid)
264 if (pointersAreValid)
266 rbRecordPtr->dataDiscardCount.addSample(1);
267 fuRecordPtr->dataDiscardCount.addSample(1);
275 bool pointersAreValid;
281 if (pointersAreValid)
288 if (pointersAreValid)
290 rbRecordPtr->dqmDiscardCount.addSample(1);
291 fuRecordPtr->dqmDiscardCount.addSample(1);
299 bool pointersAreValid;
305 if (pointersAreValid)
312 if (pointersAreValid)
314 rbRecordPtr->skippedDiscardCount.addSample(1);
315 fuRecordPtr->skippedDiscardCount.addSample(1);
335 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
336 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
342 result->uniqueRBID = rbMapIter->first;
343 resultsList.push_back(result);
356 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
362 result->uniqueRBID = rbMapIter->first;
375 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
393 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
398 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
399 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
400 rbRecordPtr->filterUnitMap.end();
401 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
402 fuMapIter != fuMapEnd; ++fuMapIter)
406 result->initMsgCount = fuRecordPtr->initMsgCount;
407 result->lastRunNumber = fuRecordPtr->lastRunNumber;
408 result->lastEventNumber = fuRecordPtr->lastEventNumber;
409 fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
410 fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
411 fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
412 fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
413 fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
414 fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
415 fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
416 fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
417 fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
419 result->outstandingDataDiscardCount =
420 result->initMsgCount +
421 result->shortIntervalEventStats.getSampleCount() +
422 result->errorEventStats.getSampleCount() +
423 result->faultyEventStats.getSampleCount() -
424 result->dataDiscardStats.getSampleCount();
425 result->outstandingDQMDiscardCount =
426 result->dqmEventStats.getSampleCount() +
427 result->faultyDQMEventStats.getSampleCount() -
428 result->dqmDiscardStats.getSampleCount();
430 resultsList.push_back(result);
442 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
443 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
448 rbRecordPtr->eventSize.calculateStatistics();
449 rbRecordPtr->dqmEventSize.calculateStatistics();
450 rbRecordPtr->errorEventSize.calculateStatistics();
451 rbRecordPtr->faultyEventSize.calculateStatistics();
452 rbRecordPtr->faultyDQMEventSize.calculateStatistics();
453 rbRecordPtr->dataDiscardCount.calculateStatistics();
454 rbRecordPtr->dqmDiscardCount.calculateStatistics();
455 rbRecordPtr->skippedDiscardCount.calculateStatistics();
458 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
459 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
460 rbRecordPtr->filterUnitMap.end();
461 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
462 fuMapIter != fuMapEnd; ++fuMapIter)
465 fuRecordPtr->shortIntervalEventSize.calculateStatistics();
466 fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
467 fuRecordPtr->dqmEventSize.calculateStatistics();
468 fuRecordPtr->errorEventSize.calculateStatistics();
469 fuRecordPtr->faultyEventSize.calculateStatistics();
470 fuRecordPtr->faultyDQMEventSize.calculateStatistics();
471 fuRecordPtr->dataDiscardCount.calculateStatistics();
472 fuRecordPtr->dqmDiscardCount.calculateStatistics();
473 fuRecordPtr->skippedDiscardCount.calculateStatistics();
500 infoSpaceItems.push_back(std::make_pair(
"connectedRBs", &
connectedRBs_));
501 infoSpaceItems.push_back(std::make_pair(
"connectedEPs", &
connectedEPs_));
502 infoSpaceItems.push_back(std::make_pair(
"activeEPs", &
activeEPs_));
505 infoSpaceItems.push_back(std::make_pair(
"faultyEvents", &
faultyEvents_));
506 infoSpaceItems.push_back(std::make_pair(
"ignoredDiscards", &
ignoredDiscards_));
516 uint32_t localEPCount = 0;
517 uint32_t localActiveEPCount = 0;
518 int localMissingDataDiscardCount = 0;
519 int localMissingDQMDiscardCount = 0;
520 uint32_t localFaultyEventsCount = 0;
521 uint32_t localIgnoredDiscardCount = 0;
522 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
523 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
528 localEPCount += rbRecordPtr->filterUnitMap.size();
531 rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
537 rbRecordPtr->eventSize.getStats(eventStats);
538 rbRecordPtr->errorEventSize.getStats(errorEventStats);
539 rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
540 localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.
getSampleCount() +
545 rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
546 rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
551 rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
554 rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
557 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
558 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
559 rbRecordPtr->filterUnitMap.end();
560 for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
564 fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
566 ++localActiveEPCount;
570 connectedEPs_ =
static_cast<xdata::UnsignedInteger32
>(localEPCount);
571 activeEPs_ =
static_cast<xdata::UnsignedInteger32
>(localActiveEPCount);
574 faultyEvents_ =
static_cast<xdata::UnsignedInteger32
>(localFaultyEventsCount);
575 ignoredDiscards_ =
static_cast<xdata::UnsignedInteger32
>(localIgnoredDiscardCount);
584 const std::string alarmName =
"FaultyEvents";
586 if (faultyEventsCount > 0)
588 std::ostringstream
msg;
589 msg <<
"Missing or faulty I2O fragments for " <<
591 " events. These events are lost!";
592 XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
604 const std::string alarmName =
"IgnoredDiscard";
606 if ( ignoredDiscardCount > 0)
608 std::ostringstream
msg;
609 msg << ignoredDiscardCount <<
610 " discard messages ignored. These events might be stuck in the resource broker.";
611 XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
634 if (! rbKey.
isValid) {
return false;}
636 if (! fuKey.
isValid) {
return false;}
639 topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
641 rbRecordPtr = getResourceBrokerRecord(rbKey);
642 rbSpecificOutModPtr = getOutputModuleRecord(
643 rbRecordPtr->outputModuleMap,
646 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
647 fuSpecificOutModPtr = getOutputModuleRecord(
648 fuRecordPtr->outputModuleMap,
662 if (! rbKey.
isValid) {
return false;}
664 rbRecordPtr = getResourceBrokerRecord(rbKey);
677 if (! fuKey.
isValid) {
return false;}
679 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
689 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
698 rbRecordPtr = rbMapIter->second;
708 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
712 std::string workString = rbKey.
hltURL +
713 boost::lexical_cast<std::string>(rbKey.
hltTid) +
714 boost::lexical_cast<std::string>(rbKey.
hltInstance) +
715 boost::lexical_cast<std::string>(rbKey.
hltLocalId) +
717 uLong crc = crc32(0
L, Z_NULL, 0);
718 Bytef* crcbuf = (Bytef*) workString.data();
719 crc = crc32(crc, crcbuf, workString.length());
725 uniqueID = rbMapIter->second;
739 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
740 fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
741 if (fuMapIter == rbRecordPtr->filterUnitMap.end())
744 rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
748 fuRecordPtr = fuMapIter->second;
762 OutputModuleRecordMap::const_iterator omMapIter;
763 omMapIter = outModMap.find(outModKey);
764 if (omMapIter == outModMap.end())
768 outModRecordPtr->name =
"Unknown";
769 outModRecordPtr->id = outModKey;
770 outModRecordPtr->initMsgSize = 0;
772 outModMap[outModKey] = outModRecordPtr;
776 outModRecordPtr = omMapIter->second;
778 return outModRecordPtr;
787 OutputModuleRecordMap::const_iterator omMapIter;
788 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
789 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
793 result->name = outModRecordPtr->name;
794 result->id = outModRecordPtr->id;
795 result->initMsgSize = outModRecordPtr->initMsgSize;
796 outModRecordPtr->eventSize.getStats(result->eventStats);
797 resultsList.push_back(result);
809 result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
810 result->initMsgCount = rbRecordPtr->initMsgCount;
811 result->lastRunNumber = rbRecordPtr->lastRunNumber;
812 result->lastEventNumber = rbRecordPtr->lastEventNumber;
813 rbRecordPtr->eventSize.getStats(result->eventStats);
814 rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
815 rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
816 rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
817 rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
818 rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
819 rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
820 rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
822 result->outstandingDataDiscardCount =
823 result->initMsgCount +
824 result->eventStats.getSampleCount() +
825 result->errorEventStats.getSampleCount() +
826 result->faultyEventStats.getSampleCount() -
827 result->dataDiscardStats.getSampleCount();
828 result->outstandingDQMDiscardCount =
829 result->dqmEventStats.getSampleCount() +
830 result->faultyDQMEventStats.getSampleCount() -
831 result->dqmDiscardStats.getSampleCount();
839 OutputModuleRecordMap::const_iterator omMapIter;
840 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
841 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
846 outModRecordPtr->eventSize.calculateStatistics();
857 return *firstValue < *secondValue;
unsigned int fragmentCount() const
uint32_t runNumber() const
void incrementDataDiscardCount(I2OChain const &)
long long UniqueResourceBrokerID_t
RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const &)
DataSenderMonitorCollection(const utils::Duration_t &updateInterval, AlarmHandlerPtr)
std::map< UniqueResourceBrokerID_t, RBRecordPtr > resourceBrokerMap_
OutputModuleResultsList getTopLevelOutputModuleResults() const
boost::shared_ptr< FilterUnitRecord > FURecordPtr
void calcStatsForOutputModules(OutputModuleRecordMap &outputModuleMap)
void incrementSkippedDiscardCount(I2OChain const &)
xdata::UnsignedInteger32 activeEPs_
RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const
std::map< OutputModuleKey, OutModRecordPtr > OutputModuleRecordMap
const utils::Duration_t updateInterval_
uint64_t getSampleCount(DataSetType t=FULL) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
boost::shared_ptr< ResourceBrokerRecord > RBRecordPtr
std::map< ResourceBrokerKey, UniqueResourceBrokerID_t > resourceBrokerIDs_
void incrementDQMDiscardCount(I2OChain const &)
UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const &)
virtual void do_calculateStatistics()
unsigned int messageCode() const
FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
xdata::UnsignedInteger32 connectedRBs_
void addDQMEventSample(I2OChain const &)
xdata::Integer32 outstandingDataDiscards_
void ignoredDiscardAlarm(const uint32_t &) const
boost::mutex collectionsMutex_
boost::posix_time::time_duration Duration_t
bool getAllNeededPointers(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr, OutModRecordPtr &topLevelOutModPtr, OutModRecordPtr &rbSpecificOutModPtr, OutModRecordPtr &fuSpecificOutModPtr)
bool getFURecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr)
std::string outputModuleLabel() const
void addErrorEventSample(I2OChain const &)
void addEventSample(I2OChain const &)
bool getRBRecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr)
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
virtual void do_updateInfoSpaceItems()
boost::shared_ptr< OutputModuleRecord > OutModRecordPtr
OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap &, OutputModuleKey const &)
xdata::UnsignedInteger32 faultyEvents_
AlarmHandlerPtr alarmHandler_
std::vector< boost::shared_ptr< OutputModuleResult > > OutputModuleResultsList
OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const &) const
boost::shared_ptr< FilterUnitResult > FUResultPtr
std::vector< RBResultPtr > ResourceBrokerResultsList
void addFragmentSample(I2OChain const &)
void addInitSample(I2OChain const &)
xdata::UnsignedInteger32 ignoredDiscards_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void faultyEventsAlarm(const uint32_t &) const
unsigned long totalDataSize() const
boost::shared_ptr< ResourceBrokerResult > RBResultPtr
void addFaultyEventSample(I2OChain const &)
bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue, DataSenderMonitorCollection::RBResultPtr secondValue)
xdata::Integer32 outstandingDQMDiscards_
ResourceBrokerResultsList getAllResourceBrokerResults() const
uint32_t eventNumber() const
OutputModuleRecordMap outputModuleMap_
OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
RBResultPtr buildResourceBrokerResult(RBRecordPtr const &) const
DataSenderMonitorCollection DSMC
std::vector< FUResultPtr > FilterUnitResultsList
FURecordPtr getFilterUnitRecord(RBRecordPtr &, FilterUnitKey const &)
xdata::UnsignedInteger32 connectedEPs_
uint32_t outputModuleId() const