00001
00003
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007
00008 #include <zlib.h>
00009 #include <boost/lexical_cast.hpp>
00010
00011 #include "EventFilter/StorageManager/interface/Exception.h"
00012 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00013
00014
00015 namespace stor {
00016
00017 DataSenderMonitorCollection::DataSenderMonitorCollection
00018 (
00019 const utils::Duration_t& updateInterval,
00020 AlarmHandlerPtr ah
00021 ) :
00022 MonitorCollection(updateInterval),
00023 connectedRBs_(0),
00024 connectedEPs_(0),
00025 activeEPs_(0),
00026 outstandingDataDiscards_(0),
00027 outstandingDQMDiscards_(0),
00028 faultyEvents_(0),
00029 ignoredDiscards_(0),
00030 updateInterval_(updateInterval),
00031 alarmHandler_(ah)
00032 {}
00033
00034
00035 void DataSenderMonitorCollection::addFragmentSample(I2OChain const& i2oChain)
00036 {
00037
00038 if (i2oChain.messageCode() != Header::INIT &&
00039 i2oChain.messageCode() != Header::EVENT) {return;}
00040 if (i2oChain.fragmentCount() != 1) {return;}
00041
00042
00043
00044
00045
00046 bool pointersAreValid;
00047 RBRecordPtr rbRecordPtr;
00048 FURecordPtr fuRecordPtr;
00049 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00050 {
00051 boost::mutex::scoped_lock sl(collectionsMutex_);
00052 pointersAreValid = getAllNeededPointers(
00053 i2oChain, rbRecordPtr, fuRecordPtr,
00054 topLevelOutModPtr, rbSpecificOutModPtr,
00055 fuSpecificOutModPtr);
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065 }
00066
00067
00068 void DataSenderMonitorCollection::addInitSample(I2OChain const& i2oChain)
00069 {
00070
00071 if (i2oChain.messageCode() != Header::INIT) {return;}
00072 if (! i2oChain.complete()) {return;}
00073
00074
00075 std::string outModName = i2oChain.outputModuleLabel();
00076 uint32_t msgSize = i2oChain.totalDataSize();
00077
00078
00079 bool pointersAreValid;
00080 RBRecordPtr rbRecordPtr;
00081 FURecordPtr fuRecordPtr;
00082 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00083 {
00084 boost::mutex::scoped_lock sl(collectionsMutex_);
00085 pointersAreValid = getAllNeededPointers(
00086 i2oChain, rbRecordPtr, fuRecordPtr,
00087 topLevelOutModPtr, rbSpecificOutModPtr,
00088 fuSpecificOutModPtr);
00089 }
00090
00091
00092 if (pointersAreValid)
00093 {
00094 topLevelOutModPtr->name = outModName;
00095 topLevelOutModPtr->initMsgSize = msgSize;
00096
00097 ++rbRecordPtr->initMsgCount;
00098 rbSpecificOutModPtr->name = outModName;
00099 rbSpecificOutModPtr->initMsgSize = msgSize;
00100
00101 ++fuRecordPtr->initMsgCount;
00102 fuSpecificOutModPtr->name = outModName;
00103 fuSpecificOutModPtr->initMsgSize = msgSize;
00104 }
00105 }
00106
00107
00108 void DataSenderMonitorCollection::addEventSample(I2OChain const& i2oChain)
00109 {
00110
00111 if (i2oChain.messageCode() != Header::EVENT) {return;}
00112 if (! i2oChain.complete()) {return;}
00113
00114
00115 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00116 uint32_t runNumber = i2oChain.runNumber();
00117 uint32_t eventNumber = i2oChain.eventNumber();
00118
00119
00120 bool pointersAreValid;
00121 RBRecordPtr rbRecordPtr;
00122 FURecordPtr fuRecordPtr;
00123 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00124 {
00125 boost::mutex::scoped_lock sl(collectionsMutex_);
00126 pointersAreValid = getAllNeededPointers(
00127 i2oChain, rbRecordPtr, fuRecordPtr,
00128 topLevelOutModPtr, rbSpecificOutModPtr,
00129 fuSpecificOutModPtr);
00130 }
00131
00132
00133 if (pointersAreValid)
00134 {
00135 topLevelOutModPtr->eventSize.addSample(eventSize);
00136
00137 rbRecordPtr->lastRunNumber = runNumber;
00138 rbRecordPtr->lastEventNumber = eventNumber;
00139 rbRecordPtr->eventSize.addSample(eventSize);
00140 rbSpecificOutModPtr->eventSize.addSample(eventSize);
00141
00142 fuRecordPtr->lastRunNumber = runNumber;
00143 fuRecordPtr->lastEventNumber = eventNumber;
00144 fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
00145 fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
00146 fuSpecificOutModPtr->eventSize.addSample(eventSize);
00147 }
00148 }
00149
00150
00151 void DataSenderMonitorCollection::addDQMEventSample(I2OChain const& i2oChain)
00152 {
00153
00154 if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
00155 if (! i2oChain.complete()) {return;}
00156
00157
00158 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00159
00160
00161 bool pointersAreValid;
00162 RBRecordPtr rbRecordPtr;
00163 FURecordPtr fuRecordPtr;
00164 {
00165 boost::mutex::scoped_lock sl(collectionsMutex_);
00166 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00167 if (pointersAreValid)
00168 {
00169 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00170 }
00171 }
00172
00173
00174 if (pointersAreValid)
00175 {
00176 rbRecordPtr->dqmEventSize.addSample(eventSize);
00177 fuRecordPtr->dqmEventSize.addSample(eventSize);
00178 }
00179 }
00180
00181
00182 void DataSenderMonitorCollection::addErrorEventSample(I2OChain const& i2oChain)
00183 {
00184
00185 if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
00186 if (! i2oChain.complete()) {return;}
00187
00188
00189 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00190
00191
00192 bool pointersAreValid;
00193 RBRecordPtr rbRecordPtr;
00194 FURecordPtr fuRecordPtr;
00195 {
00196 boost::mutex::scoped_lock sl(collectionsMutex_);
00197 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00198 if (pointersAreValid)
00199 {
00200 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00201 }
00202 }
00203
00204
00205 if (pointersAreValid)
00206 {
00207 rbRecordPtr->errorEventSize.addSample(eventSize);
00208 fuRecordPtr->errorEventSize.addSample(eventSize);
00209 }
00210 }
00211
00212
00213 void DataSenderMonitorCollection::addFaultyEventSample(I2OChain const& i2oChain)
00214 {
00215
00216 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00217
00218
00219 bool pointersAreValid;
00220 RBRecordPtr rbRecordPtr;
00221 FURecordPtr fuRecordPtr;
00222 {
00223 boost::mutex::scoped_lock sl(collectionsMutex_);
00224 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00225 if (pointersAreValid)
00226 {
00227 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00228 }
00229 }
00230
00231
00232 if (pointersAreValid)
00233 {
00234 if (i2oChain.messageCode() == Header::DQM_EVENT)
00235 {
00236 rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
00237 fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
00238 }
00239 else
00240 {
00241 rbRecordPtr->faultyEventSize.addSample(eventSize);
00242 fuRecordPtr->faultyEventSize.addSample(eventSize);
00243 }
00244 }
00245 }
00246
00247
00248 void DataSenderMonitorCollection::incrementDataDiscardCount(I2OChain const& i2oChain)
00249 {
00250
00251 bool pointersAreValid;
00252 RBRecordPtr rbRecordPtr;
00253 FURecordPtr fuRecordPtr;
00254 {
00255 boost::mutex::scoped_lock sl(collectionsMutex_);
00256 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00257 if (pointersAreValid)
00258 {
00259 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00260 }
00261 }
00262
00263
00264 if (pointersAreValid)
00265 {
00266 rbRecordPtr->dataDiscardCount.addSample(1);
00267 fuRecordPtr->dataDiscardCount.addSample(1);
00268 }
00269 }
00270
00271
00272 void DataSenderMonitorCollection::incrementDQMDiscardCount(I2OChain const& i2oChain)
00273 {
00274
00275 bool pointersAreValid;
00276 RBRecordPtr rbRecordPtr;
00277 FURecordPtr fuRecordPtr;
00278 {
00279 boost::mutex::scoped_lock sl(collectionsMutex_);
00280 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00281 if (pointersAreValid)
00282 {
00283 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00284 }
00285 }
00286
00287
00288 if (pointersAreValid)
00289 {
00290 rbRecordPtr->dqmDiscardCount.addSample(1);
00291 fuRecordPtr->dqmDiscardCount.addSample(1);
00292 }
00293 }
00294
00295
00296 void DataSenderMonitorCollection::incrementSkippedDiscardCount(I2OChain const& i2oChain)
00297 {
00298
00299 bool pointersAreValid;
00300 RBRecordPtr rbRecordPtr;
00301 FURecordPtr fuRecordPtr;
00302 {
00303 boost::mutex::scoped_lock sl(collectionsMutex_);
00304 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00305 if (pointersAreValid)
00306 {
00307 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00308 }
00309 }
00310
00311
00312 if (pointersAreValid)
00313 {
00314 rbRecordPtr->skippedDiscardCount.addSample(1);
00315 fuRecordPtr->skippedDiscardCount.addSample(1);
00316 }
00317 }
00318
00319
00320 DataSenderMonitorCollection::OutputModuleResultsList
00321 DataSenderMonitorCollection::getTopLevelOutputModuleResults() const
00322 {
00323 boost::mutex::scoped_lock sl(collectionsMutex_);
00324
00325 return buildOutputModuleResults(outputModuleMap_);
00326 }
00327
00328
00329 DataSenderMonitorCollection::ResourceBrokerResultsList
00330 DataSenderMonitorCollection::getAllResourceBrokerResults() const
00331 {
00332 boost::mutex::scoped_lock sl(collectionsMutex_);
00333 ResourceBrokerResultsList resultsList;
00334
00335 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00336 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00337 resourceBrokerMap_.end();
00338 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00339 {
00340 RBRecordPtr rbRecordPtr = rbMapIter->second;
00341 RBResultPtr result = buildResourceBrokerResult(rbRecordPtr);
00342 result->uniqueRBID = rbMapIter->first;
00343 resultsList.push_back(result);
00344 }
00345
00346 return resultsList;
00347 }
00348
00349
00350 DataSenderMonitorCollection::RBResultPtr
00351 DataSenderMonitorCollection::getOneResourceBrokerResult(UniqueResourceBrokerID_t uniqueRBID) const
00352 {
00353 boost::mutex::scoped_lock sl(collectionsMutex_);
00354 RBResultPtr result;
00355
00356 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00357 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00358 if (rbMapIter != resourceBrokerMap_.end())
00359 {
00360 RBRecordPtr rbRecordPtr = rbMapIter->second;
00361 result = buildResourceBrokerResult(rbRecordPtr);
00362 result->uniqueRBID = rbMapIter->first;
00363 }
00364
00365 return result;
00366 }
00367
00368
00369 DataSenderMonitorCollection::OutputModuleResultsList
00370 DataSenderMonitorCollection::getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00371 {
00372 boost::mutex::scoped_lock sl(collectionsMutex_);
00373 OutputModuleResultsList resultsList;
00374
00375 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00376 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00377 if (rbMapIter != resourceBrokerMap_.end())
00378 {
00379 RBRecordPtr rbRecordPtr = rbMapIter->second;
00380 resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
00381 }
00382
00383 return resultsList;
00384 }
00385
00386
00387 DataSenderMonitorCollection::FilterUnitResultsList
00388 DataSenderMonitorCollection::getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00389 {
00390 boost::mutex::scoped_lock sl(collectionsMutex_);
00391 FilterUnitResultsList resultsList;
00392
00393 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00394 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00395 if (rbMapIter != resourceBrokerMap_.end())
00396 {
00397 RBRecordPtr rbRecordPtr = rbMapIter->second;
00398 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00399 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00400 rbRecordPtr->filterUnitMap.end();
00401 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00402 fuMapIter != fuMapEnd; ++fuMapIter)
00403 {
00404 FURecordPtr fuRecordPtr = fuMapIter->second;
00405 FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
00406 result->initMsgCount = fuRecordPtr->initMsgCount;
00407 result->lastRunNumber = fuRecordPtr->lastRunNumber;
00408 result->lastEventNumber = fuRecordPtr->lastEventNumber;
00409 fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
00410 fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
00411 fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00412 fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
00413 fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00414 fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00415 fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00416 fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00417 fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00418
00419 result->outstandingDataDiscardCount =
00420 result->initMsgCount +
00421 result->shortIntervalEventStats.getSampleCount() +
00422 result->errorEventStats.getSampleCount() +
00423 result->faultyEventStats.getSampleCount() -
00424 result->dataDiscardStats.getSampleCount();
00425 result->outstandingDQMDiscardCount =
00426 result->dqmEventStats.getSampleCount() +
00427 result->faultyDQMEventStats.getSampleCount() -
00428 result->dqmDiscardStats.getSampleCount();
00429
00430 resultsList.push_back(result);
00431 }
00432 }
00433
00434 return resultsList;
00435 }
00436
00437
00438 void DataSenderMonitorCollection::do_calculateStatistics()
00439 {
00440 boost::mutex::scoped_lock sl(collectionsMutex_);
00441
00442 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00443 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00444 resourceBrokerMap_.end();
00445 for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
00446 {
00447 RBRecordPtr rbRecordPtr = rbMapIter->second;
00448 rbRecordPtr->eventSize.calculateStatistics();
00449 rbRecordPtr->dqmEventSize.calculateStatistics();
00450 rbRecordPtr->errorEventSize.calculateStatistics();
00451 rbRecordPtr->faultyEventSize.calculateStatistics();
00452 rbRecordPtr->faultyDQMEventSize.calculateStatistics();
00453 rbRecordPtr->dataDiscardCount.calculateStatistics();
00454 rbRecordPtr->dqmDiscardCount.calculateStatistics();
00455 rbRecordPtr->skippedDiscardCount.calculateStatistics();
00456 calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
00457
00458 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00459 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00460 rbRecordPtr->filterUnitMap.end();
00461 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00462 fuMapIter != fuMapEnd; ++fuMapIter)
00463 {
00464 FURecordPtr fuRecordPtr = fuMapIter->second;
00465 fuRecordPtr->shortIntervalEventSize.calculateStatistics();
00466 fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
00467 fuRecordPtr->dqmEventSize.calculateStatistics();
00468 fuRecordPtr->errorEventSize.calculateStatistics();
00469 fuRecordPtr->faultyEventSize.calculateStatistics();
00470 fuRecordPtr->faultyDQMEventSize.calculateStatistics();
00471 fuRecordPtr->dataDiscardCount.calculateStatistics();
00472 fuRecordPtr->dqmDiscardCount.calculateStatistics();
00473 fuRecordPtr->skippedDiscardCount.calculateStatistics();
00474 calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
00475 }
00476 }
00477
00478 calcStatsForOutputModules(outputModuleMap_);
00479 }
00480
00481
00482 void DataSenderMonitorCollection::do_reset()
00483 {
00484 boost::mutex::scoped_lock sl(collectionsMutex_);
00485
00486 connectedRBs_ = 0;
00487 connectedEPs_ = 0;
00488 activeEPs_ = 0;
00489 outstandingDataDiscards_ = 0;
00490 outstandingDQMDiscards_ = 0;
00491 faultyEvents_ = 0;
00492 ignoredDiscards_ = 0;
00493 resourceBrokerMap_.clear();
00494 outputModuleMap_.clear();
00495 }
00496
00497
00498 void DataSenderMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00499 {
00500 infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
00501 infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
00502 infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
00503 infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
00504 infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
00505 infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
00506 infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
00507 }
00508
00509
00510 void DataSenderMonitorCollection::do_updateInfoSpaceItems()
00511 {
00512 boost::mutex::scoped_lock sl(collectionsMutex_);
00513
00514 connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
00515
00516 uint32_t localEPCount = 0;
00517 uint32_t localActiveEPCount = 0;
00518 int localMissingDataDiscardCount = 0;
00519 int localMissingDQMDiscardCount = 0;
00520 uint32_t localFaultyEventsCount = 0;
00521 uint32_t localIgnoredDiscardCount = 0;
00522 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00523 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00524 resourceBrokerMap_.end();
00525 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00526 {
00527 RBRecordPtr rbRecordPtr = rbMapIter->second;
00528 localEPCount += rbRecordPtr->filterUnitMap.size();
00529
00530 MonitoredQuantity::Stats skippedDiscardStats;
00531 rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
00532 localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
00533
00534 MonitoredQuantity::Stats eventStats;
00535 MonitoredQuantity::Stats errorEventStats;
00536 MonitoredQuantity::Stats dataDiscardStats;
00537 rbRecordPtr->eventSize.getStats(eventStats);
00538 rbRecordPtr->errorEventSize.getStats(errorEventStats);
00539 rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
00540 localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
00541 errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
00542
00543 MonitoredQuantity::Stats dqmEventStats;
00544 MonitoredQuantity::Stats dqmDiscardStats;
00545 rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
00546 rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
00547 localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
00548 dqmDiscardStats.getSampleCount();
00549
00550 MonitoredQuantity::Stats faultyEventStats;
00551 rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
00552 localFaultyEventsCount += faultyEventStats.getSampleCount();
00553 MonitoredQuantity::Stats faultyDQMEventStats;
00554 rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
00555 localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
00556
00557 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00558 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00559 rbRecordPtr->filterUnitMap.end();
00560 for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
00561 {
00562 FURecordPtr fuRecordPtr = fuMapIter->second;
00563 MonitoredQuantity::Stats fuMediumIntervalEventStats;
00564 fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
00565 if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
00566 ++localActiveEPCount;
00567 }
00568 }
00569 }
00570 connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
00571 activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
00572 outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
00573 outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
00574 faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
00575 ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
00576
00577 faultyEventsAlarm(localFaultyEventsCount);
00578 ignoredDiscardAlarm(localIgnoredDiscardCount);
00579 }
00580
00581
00582 void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
00583 {
00584 const std::string alarmName = "FaultyEvents";
00585
00586 if (faultyEventsCount > 0)
00587 {
00588 std::ostringstream msg;
00589 msg << "Missing or faulty I2O fragments for " <<
00590 faultyEventsCount <<
00591 " events. These events are lost!";
00592 XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
00593 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00594 }
00595 else
00596 {
00597 alarmHandler_->revokeAlarm(alarmName);
00598 }
00599 }
00600
00601
00602 void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
00603 {
00604 const std::string alarmName = "IgnoredDiscard";
00605
00606 if ( ignoredDiscardCount > 0)
00607 {
00608 std::ostringstream msg;
00609 msg << ignoredDiscardCount <<
00610 " discard messages ignored. These events might be stuck in the resource broker.";
00611 XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
00612 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00613 }
00614 else
00615 {
00616 alarmHandler_->revokeAlarm(alarmName);
00617 }
00618 }
00619
00620
00621 typedef DataSenderMonitorCollection DSMC;
00622
00623 bool DSMC::getAllNeededPointers
00624 (
00625 I2OChain const& i2oChain,
00626 DSMC::RBRecordPtr& rbRecordPtr,
00627 DSMC::FURecordPtr& fuRecordPtr,
00628 DSMC::OutModRecordPtr& topLevelOutModPtr,
00629 DSMC::OutModRecordPtr& rbSpecificOutModPtr,
00630 DSMC::OutModRecordPtr& fuSpecificOutModPtr
00631 )
00632 {
00633 ResourceBrokerKey rbKey(i2oChain);
00634 if (! rbKey.isValid) {return false;}
00635 FilterUnitKey fuKey(i2oChain);
00636 if (! fuKey.isValid) {return false;}
00637 OutputModuleKey outModKey = i2oChain.outputModuleId();
00638
00639 topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
00640
00641 rbRecordPtr = getResourceBrokerRecord(rbKey);
00642 rbSpecificOutModPtr = getOutputModuleRecord(
00643 rbRecordPtr->outputModuleMap,
00644 outModKey);
00645
00646 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00647 fuSpecificOutModPtr = getOutputModuleRecord(
00648 fuRecordPtr->outputModuleMap,
00649 outModKey);
00650
00651 return true;
00652 }
00653
00654
00655 bool DSMC::getRBRecordPointer
00656 (
00657 I2OChain const& i2oChain,
00658 DSMC::RBRecordPtr& rbRecordPtr
00659 )
00660 {
00661 ResourceBrokerKey rbKey(i2oChain);
00662 if (! rbKey.isValid) {return false;}
00663
00664 rbRecordPtr = getResourceBrokerRecord(rbKey);
00665 return true;
00666 }
00667
00668
00669 bool DSMC::getFURecordPointer
00670 (
00671 I2OChain const& i2oChain,
00672 DSMC::RBRecordPtr& rbRecordPtr,
00673 DSMC::FURecordPtr& fuRecordPtr
00674 )
00675 {
00676 FilterUnitKey fuKey(i2oChain);
00677 if (! fuKey.isValid) {return false;}
00678
00679 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00680 return true;
00681 }
00682
00683
00684 DSMC::RBRecordPtr
00685 DSMC::getResourceBrokerRecord(DSMC::ResourceBrokerKey const& rbKey)
00686 {
00687 RBRecordPtr rbRecordPtr;
00688 UniqueResourceBrokerID_t uniqueRBID = getUniqueResourceBrokerID(rbKey);
00689 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00690 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00691 if (rbMapIter == resourceBrokerMap_.end())
00692 {
00693 rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
00694 resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
00695 }
00696 else
00697 {
00698 rbRecordPtr = rbMapIter->second;
00699 }
00700 return rbRecordPtr;
00701 }
00702
00703
00704 DSMC::UniqueResourceBrokerID_t
00705 DSMC::getUniqueResourceBrokerID(DSMC::ResourceBrokerKey const& rbKey)
00706 {
00707 UniqueResourceBrokerID_t uniqueID;
00708 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
00709 rbMapIter = resourceBrokerIDs_.find(rbKey);
00710 if (rbMapIter == resourceBrokerIDs_.end())
00711 {
00712 std::string workString = rbKey.hltURL +
00713 boost::lexical_cast<std::string>(rbKey.hltTid) +
00714 boost::lexical_cast<std::string>(rbKey.hltInstance) +
00715 boost::lexical_cast<std::string>(rbKey.hltLocalId) +
00716 rbKey.hltClassName;
00717 uLong crc = crc32(0L, Z_NULL, 0);
00718 Bytef* crcbuf = (Bytef*) workString.data();
00719 crc = crc32(crc, crcbuf, workString.length());
00720 uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
00721 resourceBrokerIDs_[rbKey] = uniqueID;
00722 }
00723 else
00724 {
00725 uniqueID = rbMapIter->second;
00726 }
00727 return uniqueID;
00728 }
00729
00730
00731 DSMC::FURecordPtr
00732 DSMC::getFilterUnitRecord
00733 (
00734 DSMC::RBRecordPtr& rbRecordPtr,
00735 DSMC::FilterUnitKey const& fuKey
00736 )
00737 {
00738 FURecordPtr fuRecordPtr;
00739 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00740 fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
00741 if (fuMapIter == rbRecordPtr->filterUnitMap.end())
00742 {
00743 fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
00744 rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
00745 }
00746 else
00747 {
00748 fuRecordPtr = fuMapIter->second;
00749 }
00750 return fuRecordPtr;
00751 }
00752
00753
00754 DSMC::OutModRecordPtr
00755 DSMC::getOutputModuleRecord
00756 (
00757 OutputModuleRecordMap& outModMap,
00758 DSMC::OutputModuleKey const& outModKey
00759 )
00760 {
00761 OutModRecordPtr outModRecordPtr;
00762 OutputModuleRecordMap::const_iterator omMapIter;
00763 omMapIter = outModMap.find(outModKey);
00764 if (omMapIter == outModMap.end())
00765 {
00766 outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
00767
00768 outModRecordPtr->name = "Unknown";
00769 outModRecordPtr->id = outModKey;
00770 outModRecordPtr->initMsgSize = 0;
00771
00772 outModMap[outModKey] = outModRecordPtr;
00773 }
00774 else
00775 {
00776 outModRecordPtr = omMapIter->second;
00777 }
00778 return outModRecordPtr;
00779 }
00780
00781
00782 DSMC::OutputModuleResultsList
00783 DSMC::buildOutputModuleResults(DSMC::OutputModuleRecordMap const& outputModuleMap) const
00784 {
00785 OutputModuleResultsList resultsList;
00786
00787 OutputModuleRecordMap::const_iterator omMapIter;
00788 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00789 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00790 {
00791 OutModRecordPtr outModRecordPtr = omMapIter->second;
00792 boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
00793 result->name = outModRecordPtr->name;
00794 result->id = outModRecordPtr->id;
00795 result->initMsgSize = outModRecordPtr->initMsgSize;
00796 outModRecordPtr->eventSize.getStats(result->eventStats);
00797 resultsList.push_back(result);
00798 }
00799
00800 return resultsList;
00801 }
00802
00803
00804 DSMC::RBResultPtr
00805 DSMC::buildResourceBrokerResult(DSMC::RBRecordPtr const& rbRecordPtr) const
00806 {
00807 RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
00808
00809 result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
00810 result->initMsgCount = rbRecordPtr->initMsgCount;
00811 result->lastRunNumber = rbRecordPtr->lastRunNumber;
00812 result->lastEventNumber = rbRecordPtr->lastEventNumber;
00813 rbRecordPtr->eventSize.getStats(result->eventStats);
00814 rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00815 rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
00816 rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00817 rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00818 rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00819 rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00820 rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00821
00822 result->outstandingDataDiscardCount =
00823 result->initMsgCount +
00824 result->eventStats.getSampleCount() +
00825 result->errorEventStats.getSampleCount() +
00826 result->faultyEventStats.getSampleCount() -
00827 result->dataDiscardStats.getSampleCount();
00828 result->outstandingDQMDiscardCount =
00829 result->dqmEventStats.getSampleCount() +
00830 result->faultyDQMEventStats.getSampleCount() -
00831 result->dqmDiscardStats.getSampleCount();
00832
00833 return result;
00834 }
00835
00836
00837 void DSMC::calcStatsForOutputModules(DSMC::OutputModuleRecordMap& outputModuleMap)
00838 {
00839 OutputModuleRecordMap::const_iterator omMapIter;
00840 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00841 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00842 {
00843 OutModRecordPtr outModRecordPtr = omMapIter->second;
00844
00845
00846 outModRecordPtr->eventSize.calculateStatistics();
00847 }
00848 }
00849
00850
00851 bool compareRBResultPtrValues
00852 (
00853 DSMC::RBResultPtr firstValue,
00854 DSMC::RBResultPtr secondValue
00855 )
00856 {
00857 return *firstValue < *secondValue;
00858 }
00859
00860 }
00861
00862