9 #include <boost/lexical_cast.hpp>
26 outstandingDataDiscards_(0),
27 outstandingDQMDiscards_(0),
30 updateInterval_(updateInterval),
46 bool pointersAreValid;
49 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
53 i2oChain, rbRecordPtr, fuRecordPtr,
54 topLevelOutModPtr, rbSpecificOutModPtr,
61 topLevelOutModPtr->name = outModName;
62 topLevelOutModPtr->initMsgSize = msgSize;
64 ++rbRecordPtr->initMsgCount;
66 rbSpecificOutModPtr->name = outModName;
67 rbSpecificOutModPtr->initMsgSize = msgSize;
69 ++fuRecordPtr->initMsgCount;
70 fuSpecificOutModPtr->name = outModName;
71 fuSpecificOutModPtr->initMsgSize = msgSize;
83 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
88 bool pointersAreValid;
91 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
95 i2oChain, rbRecordPtr, fuRecordPtr,
96 topLevelOutModPtr, rbSpecificOutModPtr,
101 if (pointersAreValid)
103 topLevelOutModPtr->eventSize.addSample(eventSize);
106 rbRecordPtr->lastEventNumber = eventNumber;
107 rbRecordPtr->eventSize.addSample(eventSize);
108 rbSpecificOutModPtr->eventSize.addSample(eventSize);
111 fuRecordPtr->lastEventNumber = eventNumber;
112 fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
113 fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
114 fuSpecificOutModPtr->eventSize.addSample(eventSize);
123 if (! i2oChain.
complete()) {
return;}
126 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
129 bool pointersAreValid;
135 if (pointersAreValid)
142 if (pointersAreValid)
144 rbRecordPtr->dqmEventSize.addSample(eventSize);
145 fuRecordPtr->dqmEventSize.addSample(eventSize);
154 if (! i2oChain.
complete()) {
return;}
157 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
160 bool pointersAreValid;
166 if (pointersAreValid)
173 if (pointersAreValid)
175 rbRecordPtr->errorEventSize.addSample(eventSize);
176 fuRecordPtr->errorEventSize.addSample(eventSize);
184 double eventSize =
static_cast<double>(i2oChain.
totalDataSize());
187 bool pointersAreValid;
193 if (pointersAreValid)
200 if (pointersAreValid)
204 rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
205 fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
209 rbRecordPtr->faultyEventSize.addSample(eventSize);
210 fuRecordPtr->faultyEventSize.addSample(eventSize);
219 bool pointersAreValid;
225 if (pointersAreValid)
232 if (pointersAreValid)
234 rbRecordPtr->dataDiscardCount.addSample(1);
235 fuRecordPtr->dataDiscardCount.addSample(1);
243 bool pointersAreValid;
249 if (pointersAreValid)
256 if (pointersAreValid)
258 rbRecordPtr->dqmDiscardCount.addSample(1);
259 fuRecordPtr->dqmDiscardCount.addSample(1);
267 bool pointersAreValid;
273 if (pointersAreValid)
280 if (pointersAreValid)
282 rbRecordPtr->skippedDiscardCount.addSample(1);
283 fuRecordPtr->skippedDiscardCount.addSample(1);
303 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
304 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
310 result->uniqueRBID = rbMapIter->first;
311 resultsList.push_back(result);
324 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
330 result->uniqueRBID = rbMapIter->first;
343 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
361 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
366 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
367 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
368 rbRecordPtr->filterUnitMap.end();
369 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
370 fuMapIter != fuMapEnd; ++fuMapIter)
374 result->initMsgCount = fuRecordPtr->initMsgCount;
375 result->lastRunNumber = fuRecordPtr->lastRunNumber;
376 result->lastEventNumber = fuRecordPtr->lastEventNumber;
377 fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
378 fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
379 fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
380 fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
381 fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
382 fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
383 fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
384 fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
385 fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
387 result->outstandingDataDiscardCount =
388 result->initMsgCount +
389 result->shortIntervalEventStats.getSampleCount() +
390 result->errorEventStats.getSampleCount() +
391 result->faultyEventStats.getSampleCount() -
392 result->dataDiscardStats.getSampleCount();
393 result->outstandingDQMDiscardCount =
394 result->dqmEventStats.getSampleCount() +
395 result->faultyDQMEventStats.getSampleCount() -
396 result->dqmDiscardStats.getSampleCount();
398 resultsList.push_back(result);
410 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
411 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
416 rbRecordPtr->eventSize.calculateStatistics();
417 rbRecordPtr->dqmEventSize.calculateStatistics();
418 rbRecordPtr->errorEventSize.calculateStatistics();
419 rbRecordPtr->faultyEventSize.calculateStatistics();
420 rbRecordPtr->faultyDQMEventSize.calculateStatistics();
421 rbRecordPtr->dataDiscardCount.calculateStatistics();
422 rbRecordPtr->dqmDiscardCount.calculateStatistics();
423 rbRecordPtr->skippedDiscardCount.calculateStatistics();
426 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
427 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
428 rbRecordPtr->filterUnitMap.end();
429 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
430 fuMapIter != fuMapEnd; ++fuMapIter)
433 fuRecordPtr->shortIntervalEventSize.calculateStatistics();
434 fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
435 fuRecordPtr->dqmEventSize.calculateStatistics();
436 fuRecordPtr->errorEventSize.calculateStatistics();
437 fuRecordPtr->faultyEventSize.calculateStatistics();
438 fuRecordPtr->faultyDQMEventSize.calculateStatistics();
439 fuRecordPtr->dataDiscardCount.calculateStatistics();
440 fuRecordPtr->dqmDiscardCount.calculateStatistics();
441 fuRecordPtr->skippedDiscardCount.calculateStatistics();
468 infoSpaceItems.push_back(std::make_pair(
"connectedRBs", &
connectedRBs_));
469 infoSpaceItems.push_back(std::make_pair(
"connectedEPs", &
connectedEPs_));
470 infoSpaceItems.push_back(std::make_pair(
"activeEPs", &
activeEPs_));
473 infoSpaceItems.push_back(std::make_pair(
"faultyEvents", &
faultyEvents_));
474 infoSpaceItems.push_back(std::make_pair(
"ignoredDiscards", &
ignoredDiscards_));
484 uint32_t localEPCount = 0;
485 uint32_t localActiveEPCount = 0;
486 int localMissingDataDiscardCount = 0;
487 int localMissingDQMDiscardCount = 0;
488 uint32_t localFaultyEventsCount = 0;
489 uint32_t localIgnoredDiscardCount = 0;
490 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
491 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
496 if ( rbRecordPtr->initMsgCount > 0 )
497 localEPCount += rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount;
500 rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
506 rbRecordPtr->eventSize.getStats(eventStats);
507 rbRecordPtr->errorEventSize.getStats(errorEventStats);
508 rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
509 localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.
getSampleCount() +
514 rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
515 rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
520 rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
523 rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
526 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
527 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
528 rbRecordPtr->filterUnitMap.end();
529 for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
533 fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
535 ++localActiveEPCount;
539 connectedEPs_ =
static_cast<xdata::UnsignedInteger32
>(localEPCount);
540 activeEPs_ =
static_cast<xdata::UnsignedInteger32
>(localActiveEPCount);
543 faultyEvents_ =
static_cast<xdata::UnsignedInteger32
>(localFaultyEventsCount);
544 ignoredDiscards_ =
static_cast<xdata::UnsignedInteger32
>(localIgnoredDiscardCount);
555 if (faultyEventsCount > 0)
557 std::ostringstream
msg;
558 msg <<
"Missing or faulty I2O fragments for " <<
560 " events. These events are lost!";
561 XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
575 if ( ignoredDiscardCount > 0)
577 std::ostringstream
msg;
578 msg << ignoredDiscardCount <<
579 " discard messages ignored. These events might be stuck in the resource broker.";
580 XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
593 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
594 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
598 if ( rbMapIter->second->initMsgCount > 0 )
599 count += rbMapIter->second->nExpectedEPs / rbMapIter->second->initMsgCount;
618 if (! rbKey.
isValid) {
return false;}
620 if (! fuKey.
isValid) {
return false;}
623 topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
625 rbRecordPtr = getResourceBrokerRecord(rbKey);
626 rbSpecificOutModPtr = getOutputModuleRecord(
627 rbRecordPtr->outputModuleMap,
630 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
631 fuSpecificOutModPtr = getOutputModuleRecord(
632 fuRecordPtr->outputModuleMap,
646 if (! rbKey.
isValid) {
return false;}
648 rbRecordPtr = getResourceBrokerRecord(rbKey);
661 if (! fuKey.
isValid) {
return false;}
663 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
673 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
682 rbRecordPtr = rbMapIter->second;
692 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
698 boost::lexical_cast<std::string>(rbKey.
hltInstance) +
701 uLong crc = crc32(0
L, Z_NULL, 0);
702 Bytef* crcbuf = (Bytef*) workString.data();
703 crc = crc32(crc, crcbuf, workString.length());
709 uniqueID = rbMapIter->second;
723 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
724 fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
725 if (fuMapIter == rbRecordPtr->filterUnitMap.end())
728 rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
732 fuRecordPtr = fuMapIter->second;
746 OutputModuleRecordMap::const_iterator omMapIter;
747 omMapIter = outModMap.find(outModKey);
748 if (omMapIter == outModMap.end())
752 outModRecordPtr->name =
"Unknown";
753 outModRecordPtr->id = outModKey;
754 outModRecordPtr->initMsgSize = 0;
756 outModMap[outModKey] = outModRecordPtr;
760 outModRecordPtr = omMapIter->second;
762 return outModRecordPtr;
771 OutputModuleRecordMap::const_iterator omMapIter;
772 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
773 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
777 result->name = outModRecordPtr->name;
778 result->id = outModRecordPtr->id;
779 result->initMsgSize = outModRecordPtr->initMsgSize;
780 outModRecordPtr->eventSize.getStats(result->eventStats);
781 resultsList.push_back(result);
793 result->filterUnitCount = rbRecordPtr->initMsgCount>0 ? rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount : 0;
794 result->initMsgCount = rbRecordPtr->initMsgCount;
795 result->lastRunNumber = rbRecordPtr->lastRunNumber;
796 result->lastEventNumber = rbRecordPtr->lastEventNumber;
797 rbRecordPtr->eventSize.getStats(result->eventStats);
798 rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
799 rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
800 rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
801 rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
802 rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
803 rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
804 rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
806 result->outstandingDataDiscardCount =
807 result->initMsgCount +
808 result->eventStats.getSampleCount() +
809 result->errorEventStats.getSampleCount() +
810 result->faultyEventStats.getSampleCount() -
811 result->dataDiscardStats.getSampleCount();
812 result->outstandingDQMDiscardCount =
813 result->dqmEventStats.getSampleCount() +
814 result->faultyDQMEventStats.getSampleCount() -
815 result->dqmDiscardStats.getSampleCount();
823 OutputModuleRecordMap::const_iterator omMapIter;
824 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
825 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
829 outModRecordPtr->fragmentSize.calculateStatistics();
830 outModRecordPtr->eventSize.calculateStatistics();
841 return *firstValue < *secondValue;
uint32_t runNumber() const
void incrementDataDiscardCount(I2OChain const &)
long long UniqueResourceBrokerID_t
RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const &)
DataSenderMonitorCollection(const utils::Duration_t &updateInterval, AlarmHandlerPtr)
std::map< UniqueResourceBrokerID_t, RBRecordPtr > resourceBrokerMap_
OutputModuleResultsList getTopLevelOutputModuleResults() const
boost::shared_ptr< FilterUnitRecord > FURecordPtr
void calcStatsForOutputModules(OutputModuleRecordMap &outputModuleMap)
void incrementSkippedDiscardCount(I2OChain const &)
xdata::UnsignedInteger32 activeEPs_
RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const
std::map< OutputModuleKey, OutModRecordPtr > OutputModuleRecordMap
const utils::Duration_t updateInterval_
uint64_t getSampleCount(DataSetType t=FULL) const
virtual void do_appendInfoSpaceItems(InfoSpaceItems &)
boost::shared_ptr< ResourceBrokerRecord > RBRecordPtr
std::map< ResourceBrokerKey, UniqueResourceBrokerID_t > resourceBrokerIDs_
void incrementDQMDiscardCount(I2OChain const &)
UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const &)
virtual void do_calculateStatistics()
size_t getConnectedEPs() const
unsigned int messageCode() const
FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
xdata::UnsignedInteger32 connectedRBs_
void addDQMEventSample(I2OChain const &)
xdata::Integer32 outstandingDataDiscards_
void ignoredDiscardAlarm(const uint32_t &) const
uint32_t nExpectedEPs() const
boost::mutex collectionsMutex_
boost::posix_time::time_duration Duration_t
bool getAllNeededPointers(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr, OutModRecordPtr &topLevelOutModPtr, OutModRecordPtr &rbSpecificOutModPtr, OutModRecordPtr &fuSpecificOutModPtr)
bool getFURecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr, FURecordPtr &fuRecordPtr)
std::string outputModuleLabel() const
void addErrorEventSample(I2OChain const &)
void addEventSample(I2OChain const &)
bool getRBRecordPointer(I2OChain const &i2oChain, RBRecordPtr &rbRecordPtr)
boost::shared_ptr< AlarmHandler > AlarmHandlerPtr
virtual void do_updateInfoSpaceItems()
boost::shared_ptr< OutputModuleRecord > OutModRecordPtr
OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap &, OutputModuleKey const &)
xdata::UnsignedInteger32 faultyEvents_
AlarmHandlerPtr alarmHandler_
std::vector< boost::shared_ptr< OutputModuleResult > > OutputModuleResultsList
OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const &) const
boost::shared_ptr< FilterUnitResult > FUResultPtr
std::vector< RBResultPtr > ResourceBrokerResultsList
void addInitSample(I2OChain const &)
xdata::UnsignedInteger32 ignoredDiscards_
std::vector< std::pair< std::string, xdata::Serializable * > > InfoSpaceItems
void faultyEventsAlarm(const uint32_t &) const
unsigned long totalDataSize() const
boost::shared_ptr< ResourceBrokerResult > RBResultPtr
void addFaultyEventSample(I2OChain const &)
bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue, DataSenderMonitorCollection::RBResultPtr secondValue)
xdata::Integer32 outstandingDQMDiscards_
ResourceBrokerResultsList getAllResourceBrokerResults() const
uint32_t eventNumber() const
OutputModuleRecordMap outputModuleMap_
OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
RBResultPtr buildResourceBrokerResult(RBRecordPtr const &) const
DataSenderMonitorCollection DSMC
std::vector< FUResultPtr > FilterUnitResultsList
FURecordPtr getFilterUnitRecord(RBRecordPtr &, FilterUnitKey const &)
xdata::UnsignedInteger32 connectedEPs_
uint32_t outputModuleId() const