00001
00003
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007
00008 #include <zlib.h>
00009 #include <boost/lexical_cast.hpp>
00010
00011 #include "EventFilter/StorageManager/interface/Exception.h"
00012 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00013
00014
00015 namespace stor {
00016
00017 DataSenderMonitorCollection::DataSenderMonitorCollection
00018 (
00019 const utils::Duration_t& updateInterval,
00020 AlarmHandlerPtr ah
00021 ) :
00022 MonitorCollection(updateInterval),
00023 connectedRBs_(0),
00024 connectedEPs_(0),
00025 activeEPs_(0),
00026 outstandingDataDiscards_(0),
00027 outstandingDQMDiscards_(0),
00028 faultyEvents_(0),
00029 ignoredDiscards_(0),
00030 updateInterval_(updateInterval),
00031 alarmHandler_(ah)
00032 {}
00033
00034
00035 void DataSenderMonitorCollection::addFragmentSample(I2OChain const& i2oChain)
00036 {
00037
00038 if (i2oChain.messageCode() != Header::INIT &&
00039 i2oChain.messageCode() != Header::EVENT) {return;}
00040 if (i2oChain.fragmentCount() != 1) {return;}
00041
00042
00043
00044
00045
00046 bool pointersAreValid;
00047 RBRecordPtr rbRecordPtr;
00048 FURecordPtr fuRecordPtr;
00049 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00050 {
00051 boost::mutex::scoped_lock sl(collectionsMutex_);
00052 pointersAreValid = getAllNeededPointers(
00053 i2oChain, rbRecordPtr, fuRecordPtr,
00054 topLevelOutModPtr, rbSpecificOutModPtr,
00055 fuSpecificOutModPtr);
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065 }
00066
00067
00068 void DataSenderMonitorCollection::addInitSample(I2OChain const& i2oChain)
00069 {
00070
00071 if (i2oChain.messageCode() != Header::INIT) {return;}
00072 if (! i2oChain.complete()) {return;}
00073
00074
00075 std::string outModName = i2oChain.outputModuleLabel();
00076 uint32_t msgSize = i2oChain.totalDataSize();
00077
00078
00079 bool pointersAreValid;
00080 RBRecordPtr rbRecordPtr;
00081 FURecordPtr fuRecordPtr;
00082 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00083 {
00084 boost::mutex::scoped_lock sl(collectionsMutex_);
00085 pointersAreValid = getAllNeededPointers(
00086 i2oChain, rbRecordPtr, fuRecordPtr,
00087 topLevelOutModPtr, rbSpecificOutModPtr,
00088 fuSpecificOutModPtr);
00089 }
00090
00091
00092 if (pointersAreValid)
00093 {
00094 topLevelOutModPtr->name = outModName;
00095 topLevelOutModPtr->initMsgSize = msgSize;
00096
00097 ++rbRecordPtr->initMsgCount;
00098 rbSpecificOutModPtr->name = outModName;
00099 rbSpecificOutModPtr->initMsgSize = msgSize;
00100
00101 ++fuRecordPtr->initMsgCount;
00102 fuSpecificOutModPtr->name = outModName;
00103 fuSpecificOutModPtr->initMsgSize = msgSize;
00104 }
00105 }
00106
00107
00108 void DataSenderMonitorCollection::addEventSample(I2OChain const& i2oChain)
00109 {
00110
00111 if (i2oChain.messageCode() != Header::EVENT) {return;}
00112 if (! i2oChain.complete()) {return;}
00113
00114
00115 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00116 uint32_t runNumber = i2oChain.runNumber();
00117 uint32_t eventNumber = i2oChain.eventNumber();
00118
00119
00120 bool pointersAreValid;
00121 RBRecordPtr rbRecordPtr;
00122 FURecordPtr fuRecordPtr;
00123 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00124 {
00125 boost::mutex::scoped_lock sl(collectionsMutex_);
00126 pointersAreValid = getAllNeededPointers(
00127 i2oChain, rbRecordPtr, fuRecordPtr,
00128 topLevelOutModPtr, rbSpecificOutModPtr,
00129 fuSpecificOutModPtr);
00130 }
00131
00132
00133 if (pointersAreValid)
00134 {
00135 topLevelOutModPtr->eventSize.addSample(eventSize);
00136
00137 rbRecordPtr->lastRunNumber = runNumber;
00138 rbRecordPtr->lastEventNumber = eventNumber;
00139 rbRecordPtr->eventSize.addSample(eventSize);
00140 rbSpecificOutModPtr->eventSize.addSample(eventSize);
00141
00142 fuRecordPtr->lastRunNumber = runNumber;
00143 fuRecordPtr->lastEventNumber = eventNumber;
00144 fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
00145 fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
00146 fuSpecificOutModPtr->eventSize.addSample(eventSize);
00147 }
00148 }
00149
00150
00151 void DataSenderMonitorCollection::addDQMEventSample(I2OChain const& i2oChain)
00152 {
00153
00154 if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
00155 if (! i2oChain.complete()) {return;}
00156
00157
00158 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00159
00160
00161 bool pointersAreValid;
00162 RBRecordPtr rbRecordPtr;
00163 FURecordPtr fuRecordPtr;
00164 {
00165 boost::mutex::scoped_lock sl(collectionsMutex_);
00166 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00167 if (pointersAreValid)
00168 {
00169 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00170 }
00171 }
00172
00173
00174 if (pointersAreValid)
00175 {
00176 rbRecordPtr->dqmEventSize.addSample(eventSize);
00177 fuRecordPtr->dqmEventSize.addSample(eventSize);
00178 }
00179 }
00180
00181
00182 void DataSenderMonitorCollection::addErrorEventSample(I2OChain const& i2oChain)
00183 {
00184
00185 if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
00186 if (! i2oChain.complete()) {return;}
00187
00188
00189 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00190
00191
00192 bool pointersAreValid;
00193 RBRecordPtr rbRecordPtr;
00194 FURecordPtr fuRecordPtr;
00195 {
00196 boost::mutex::scoped_lock sl(collectionsMutex_);
00197 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00198 if (pointersAreValid)
00199 {
00200 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00201 }
00202 }
00203
00204
00205 if (pointersAreValid)
00206 {
00207 rbRecordPtr->errorEventSize.addSample(eventSize);
00208 fuRecordPtr->errorEventSize.addSample(eventSize);
00209 }
00210 }
00211
00212
00213 void DataSenderMonitorCollection::addFaultyEventSample(I2OChain const& i2oChain)
00214 {
00215
00216 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00217
00218
00219 bool pointersAreValid;
00220 RBRecordPtr rbRecordPtr;
00221 FURecordPtr fuRecordPtr;
00222 {
00223 boost::mutex::scoped_lock sl(collectionsMutex_);
00224 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00225 if (pointersAreValid)
00226 {
00227 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00228 }
00229 }
00230
00231
00232 if (pointersAreValid)
00233 {
00234 if (i2oChain.messageCode() == Header::DQM_EVENT)
00235 {
00236 rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
00237 fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
00238 }
00239 else
00240 {
00241 rbRecordPtr->faultyEventSize.addSample(eventSize);
00242 fuRecordPtr->faultyEventSize.addSample(eventSize);
00243 }
00244 }
00245 }
00246
00247
00248 void DataSenderMonitorCollection::incrementDataDiscardCount(I2OChain const& i2oChain)
00249 {
00250
00251 bool pointersAreValid;
00252 RBRecordPtr rbRecordPtr;
00253 FURecordPtr fuRecordPtr;
00254 {
00255 boost::mutex::scoped_lock sl(collectionsMutex_);
00256 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00257 if (pointersAreValid)
00258 {
00259 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00260 }
00261 }
00262
00263
00264 if (pointersAreValid)
00265 {
00266 rbRecordPtr->dataDiscardCount.addSample(1);
00267 fuRecordPtr->dataDiscardCount.addSample(1);
00268 }
00269 }
00270
00271
00272 void DataSenderMonitorCollection::incrementDQMDiscardCount(I2OChain const& i2oChain)
00273 {
00274
00275 bool pointersAreValid;
00276 RBRecordPtr rbRecordPtr;
00277 FURecordPtr fuRecordPtr;
00278 {
00279 boost::mutex::scoped_lock sl(collectionsMutex_);
00280 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00281 if (pointersAreValid)
00282 {
00283 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00284 }
00285 }
00286
00287
00288 if (pointersAreValid)
00289 {
00290 rbRecordPtr->dqmDiscardCount.addSample(1);
00291 fuRecordPtr->dqmDiscardCount.addSample(1);
00292 }
00293 }
00294
00295
00296 void DataSenderMonitorCollection::incrementSkippedDiscardCount(I2OChain const& i2oChain)
00297 {
00298
00299 bool pointersAreValid;
00300 RBRecordPtr rbRecordPtr;
00301 FURecordPtr fuRecordPtr;
00302 {
00303 boost::mutex::scoped_lock sl(collectionsMutex_);
00304 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00305 if (pointersAreValid)
00306 {
00307 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00308 }
00309 }
00310
00311
00312 if (pointersAreValid)
00313 {
00314 rbRecordPtr->skippedDiscardCount.addSample(1);
00315 fuRecordPtr->skippedDiscardCount.addSample(1);
00316 }
00317 }
00318
00319
00320 DataSenderMonitorCollection::OutputModuleResultsList
00321 DataSenderMonitorCollection::getTopLevelOutputModuleResults() const
00322 {
00323 boost::mutex::scoped_lock sl(collectionsMutex_);
00324
00325 return buildOutputModuleResults(outputModuleMap_);
00326 }
00327
00328
00329 DataSenderMonitorCollection::ResourceBrokerResultsList
00330 DataSenderMonitorCollection::getAllResourceBrokerResults() const
00331 {
00332 boost::mutex::scoped_lock sl(collectionsMutex_);
00333 ResourceBrokerResultsList resultsList;
00334
00335 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00336 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00337 resourceBrokerMap_.end();
00338 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00339 {
00340 RBRecordPtr rbRecordPtr = rbMapIter->second;
00341 RBResultPtr result = buildResourceBrokerResult(rbRecordPtr);
00342 result->uniqueRBID = rbMapIter->first;
00343 resultsList.push_back(result);
00344 }
00345
00346 return resultsList;
00347 }
00348
00349
00350 DataSenderMonitorCollection::RBResultPtr
00351 DataSenderMonitorCollection::getOneResourceBrokerResult(UniqueResourceBrokerID_t uniqueRBID) const
00352 {
00353 boost::mutex::scoped_lock sl(collectionsMutex_);
00354 RBResultPtr result;
00355
00356 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00357 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00358 if (rbMapIter != resourceBrokerMap_.end())
00359 {
00360 RBRecordPtr rbRecordPtr = rbMapIter->second;
00361 result = buildResourceBrokerResult(rbRecordPtr);
00362 result->uniqueRBID = rbMapIter->first;
00363 }
00364
00365 return result;
00366 }
00367
00368
00369 DataSenderMonitorCollection::OutputModuleResultsList
00370 DataSenderMonitorCollection::getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00371 {
00372 boost::mutex::scoped_lock sl(collectionsMutex_);
00373 OutputModuleResultsList resultsList;
00374
00375 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00376 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00377 if (rbMapIter != resourceBrokerMap_.end())
00378 {
00379 RBRecordPtr rbRecordPtr = rbMapIter->second;
00380 resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
00381 }
00382
00383 return resultsList;
00384 }
00385
00386
00387 DataSenderMonitorCollection::FilterUnitResultsList
00388 DataSenderMonitorCollection::getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00389 {
00390 boost::mutex::scoped_lock sl(collectionsMutex_);
00391 FilterUnitResultsList resultsList;
00392
00393 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00394 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00395 if (rbMapIter != resourceBrokerMap_.end())
00396 {
00397 RBRecordPtr rbRecordPtr = rbMapIter->second;
00398 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00399 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00400 rbRecordPtr->filterUnitMap.end();
00401 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00402 fuMapIter != fuMapEnd; ++fuMapIter)
00403 {
00404 FURecordPtr fuRecordPtr = fuMapIter->second;
00405 FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
00406 result->initMsgCount = fuRecordPtr->initMsgCount;
00407 result->lastRunNumber = fuRecordPtr->lastRunNumber;
00408 result->lastEventNumber = fuRecordPtr->lastEventNumber;
00409 fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
00410 fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
00411 fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00412 fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
00413 fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00414 fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00415 fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00416 fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00417 fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00418
00419 result->outstandingDataDiscardCount =
00420 result->initMsgCount +
00421 result->shortIntervalEventStats.getSampleCount() +
00422 result->errorEventStats.getSampleCount() +
00423 result->faultyEventStats.getSampleCount() -
00424 result->dataDiscardStats.getSampleCount();
00425 result->outstandingDQMDiscardCount =
00426 result->dqmEventStats.getSampleCount() +
00427 result->faultyDQMEventStats.getSampleCount() -
00428 result->dqmDiscardStats.getSampleCount();
00429
00430 resultsList.push_back(result);
00431 }
00432 }
00433
00434 return resultsList;
00435 }
00436
00437
00438 void DataSenderMonitorCollection::do_calculateStatistics()
00439 {
00440 boost::mutex::scoped_lock sl(collectionsMutex_);
00441
00442 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00443 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00444 resourceBrokerMap_.end();
00445 for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
00446 {
00447 RBRecordPtr rbRecordPtr = rbMapIter->second;
00448 rbRecordPtr->eventSize.calculateStatistics();
00449 rbRecordPtr->dqmEventSize.calculateStatistics();
00450 rbRecordPtr->errorEventSize.calculateStatistics();
00451 rbRecordPtr->faultyEventSize.calculateStatistics();
00452 rbRecordPtr->faultyDQMEventSize.calculateStatistics();
00453 rbRecordPtr->dataDiscardCount.calculateStatistics();
00454 rbRecordPtr->dqmDiscardCount.calculateStatistics();
00455 rbRecordPtr->skippedDiscardCount.calculateStatistics();
00456 calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
00457
00458 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00459 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00460 rbRecordPtr->filterUnitMap.end();
00461 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00462 fuMapIter != fuMapEnd; ++fuMapIter)
00463 {
00464 FURecordPtr fuRecordPtr = fuMapIter->second;
00465 fuRecordPtr->shortIntervalEventSize.calculateStatistics();
00466 fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
00467 fuRecordPtr->dqmEventSize.calculateStatistics();
00468 fuRecordPtr->errorEventSize.calculateStatistics();
00469 fuRecordPtr->faultyEventSize.calculateStatistics();
00470 fuRecordPtr->faultyDQMEventSize.calculateStatistics();
00471 fuRecordPtr->dataDiscardCount.calculateStatistics();
00472 fuRecordPtr->dqmDiscardCount.calculateStatistics();
00473 fuRecordPtr->skippedDiscardCount.calculateStatistics();
00474 calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
00475 }
00476 }
00477
00478 calcStatsForOutputModules(outputModuleMap_);
00479 }
00480
00481
00482 void DataSenderMonitorCollection::do_reset()
00483 {
00484 boost::mutex::scoped_lock sl(collectionsMutex_);
00485
00486 connectedRBs_ = 0;
00487 connectedEPs_ = 0;
00488 activeEPs_ = 0;
00489 outstandingDataDiscards_ = 0;
00490 outstandingDQMDiscards_ = 0;
00491 faultyEvents_ = 0;
00492 ignoredDiscards_ = 0;
00493 resourceBrokerMap_.clear();
00494 outputModuleMap_.clear();
00495 }
00496
00497
00498 void DataSenderMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00499 {
00500 infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
00501 infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
00502 infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
00503 infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
00504 infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
00505 infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
00506 infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
00507 }
00508
00509
00510 void DataSenderMonitorCollection::do_updateInfoSpaceItems()
00511 {
00512 boost::mutex::scoped_lock sl(collectionsMutex_);
00513
00514 connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
00515
00516 uint32_t localEPCount = 0;
00517 uint32_t localActiveEPCount = 0;
00518 int localMissingDataDiscardCount = 0;
00519 int localMissingDQMDiscardCount = 0;
00520 uint32_t localFaultyEventsCount = 0;
00521 uint32_t localIgnoredDiscardCount = 0;
00522 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00523 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00524 resourceBrokerMap_.end();
00525 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00526 {
00527 RBRecordPtr rbRecordPtr = rbMapIter->second;
00528 localEPCount += rbRecordPtr->filterUnitMap.size();
00529
00530 MonitoredQuantity::Stats skippedDiscardStats;
00531 rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
00532 localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
00533
00534 MonitoredQuantity::Stats eventStats;
00535 MonitoredQuantity::Stats errorEventStats;
00536 MonitoredQuantity::Stats dataDiscardStats;
00537 rbRecordPtr->eventSize.getStats(eventStats);
00538 rbRecordPtr->errorEventSize.getStats(errorEventStats);
00539 rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
00540 localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
00541 errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
00542 localEPCount -= errorEventStats.getSampleCount();
00543
00544 MonitoredQuantity::Stats dqmEventStats;
00545 MonitoredQuantity::Stats dqmDiscardStats;
00546 rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
00547 rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
00548 localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
00549 dqmDiscardStats.getSampleCount();
00550
00551 MonitoredQuantity::Stats faultyEventStats;
00552 rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
00553 localFaultyEventsCount += faultyEventStats.getSampleCount();
00554 MonitoredQuantity::Stats faultyDQMEventStats;
00555 rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
00556 localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
00557
00558 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00559 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00560 rbRecordPtr->filterUnitMap.end();
00561 for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
00562 {
00563 FURecordPtr fuRecordPtr = fuMapIter->second;
00564 MonitoredQuantity::Stats fuMediumIntervalEventStats;
00565 fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
00566 if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
00567 ++localActiveEPCount;
00568 }
00569 }
00570 }
00571 connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
00572 activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
00573 outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
00574 outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
00575 faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
00576 ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
00577
00578 faultyEventsAlarm(localFaultyEventsCount);
00579 ignoredDiscardAlarm(localIgnoredDiscardCount);
00580 }
00581
00582
00583 void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
00584 {
00585 const std::string alarmName = "FaultyEvents";
00586
00587 if (faultyEventsCount > 0)
00588 {
00589 std::ostringstream msg;
00590 msg << "Missing or faulty I2O fragments for " <<
00591 faultyEventsCount <<
00592 " events. These events are lost!";
00593 XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
00594 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00595 }
00596 else
00597 {
00598 alarmHandler_->revokeAlarm(alarmName);
00599 }
00600 }
00601
00602
00603 void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
00604 {
00605 const std::string alarmName = "IgnoredDiscard";
00606
00607 if ( ignoredDiscardCount > 0)
00608 {
00609 std::ostringstream msg;
00610 msg << ignoredDiscardCount <<
00611 " discard messages ignored. These events might be stuck in the resource broker.";
00612 XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
00613 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00614 }
00615 else
00616 {
00617 alarmHandler_->revokeAlarm(alarmName);
00618 }
00619 }
00620
00621
00622 typedef DataSenderMonitorCollection DSMC;
00623
00624 bool DSMC::getAllNeededPointers
00625 (
00626 I2OChain const& i2oChain,
00627 DSMC::RBRecordPtr& rbRecordPtr,
00628 DSMC::FURecordPtr& fuRecordPtr,
00629 DSMC::OutModRecordPtr& topLevelOutModPtr,
00630 DSMC::OutModRecordPtr& rbSpecificOutModPtr,
00631 DSMC::OutModRecordPtr& fuSpecificOutModPtr
00632 )
00633 {
00634 ResourceBrokerKey rbKey(i2oChain);
00635 if (! rbKey.isValid) {return false;}
00636 FilterUnitKey fuKey(i2oChain);
00637 if (! fuKey.isValid) {return false;}
00638 OutputModuleKey outModKey = i2oChain.outputModuleId();
00639
00640 topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
00641
00642 rbRecordPtr = getResourceBrokerRecord(rbKey);
00643 rbSpecificOutModPtr = getOutputModuleRecord(
00644 rbRecordPtr->outputModuleMap,
00645 outModKey);
00646
00647 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00648 fuSpecificOutModPtr = getOutputModuleRecord(
00649 fuRecordPtr->outputModuleMap,
00650 outModKey);
00651
00652 return true;
00653 }
00654
00655
00656 bool DSMC::getRBRecordPointer
00657 (
00658 I2OChain const& i2oChain,
00659 DSMC::RBRecordPtr& rbRecordPtr
00660 )
00661 {
00662 ResourceBrokerKey rbKey(i2oChain);
00663 if (! rbKey.isValid) {return false;}
00664
00665 rbRecordPtr = getResourceBrokerRecord(rbKey);
00666 return true;
00667 }
00668
00669
00670 bool DSMC::getFURecordPointer
00671 (
00672 I2OChain const& i2oChain,
00673 DSMC::RBRecordPtr& rbRecordPtr,
00674 DSMC::FURecordPtr& fuRecordPtr
00675 )
00676 {
00677 FilterUnitKey fuKey(i2oChain);
00678 if (! fuKey.isValid) {return false;}
00679
00680 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00681 return true;
00682 }
00683
00684
00685 DSMC::RBRecordPtr
00686 DSMC::getResourceBrokerRecord(DSMC::ResourceBrokerKey const& rbKey)
00687 {
00688 RBRecordPtr rbRecordPtr;
00689 UniqueResourceBrokerID_t uniqueRBID = getUniqueResourceBrokerID(rbKey);
00690 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00691 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00692 if (rbMapIter == resourceBrokerMap_.end())
00693 {
00694 rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
00695 resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
00696 }
00697 else
00698 {
00699 rbRecordPtr = rbMapIter->second;
00700 }
00701 return rbRecordPtr;
00702 }
00703
00704
00705 DSMC::UniqueResourceBrokerID_t
00706 DSMC::getUniqueResourceBrokerID(DSMC::ResourceBrokerKey const& rbKey)
00707 {
00708 UniqueResourceBrokerID_t uniqueID;
00709 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
00710 rbMapIter = resourceBrokerIDs_.find(rbKey);
00711 if (rbMapIter == resourceBrokerIDs_.end())
00712 {
00713 std::string workString = rbKey.hltURL +
00714 boost::lexical_cast<std::string>(rbKey.hltTid) +
00715 boost::lexical_cast<std::string>(rbKey.hltInstance) +
00716 boost::lexical_cast<std::string>(rbKey.hltLocalId) +
00717 rbKey.hltClassName;
00718 uLong crc = crc32(0L, Z_NULL, 0);
00719 Bytef* crcbuf = (Bytef*) workString.data();
00720 crc = crc32(crc, crcbuf, workString.length());
00721 uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
00722 resourceBrokerIDs_[rbKey] = uniqueID;
00723 }
00724 else
00725 {
00726 uniqueID = rbMapIter->second;
00727 }
00728 return uniqueID;
00729 }
00730
00731
00732 DSMC::FURecordPtr
00733 DSMC::getFilterUnitRecord
00734 (
00735 DSMC::RBRecordPtr& rbRecordPtr,
00736 DSMC::FilterUnitKey const& fuKey
00737 )
00738 {
00739 FURecordPtr fuRecordPtr;
00740 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00741 fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
00742 if (fuMapIter == rbRecordPtr->filterUnitMap.end())
00743 {
00744 fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
00745 rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
00746 }
00747 else
00748 {
00749 fuRecordPtr = fuMapIter->second;
00750 }
00751 return fuRecordPtr;
00752 }
00753
00754
00755 DSMC::OutModRecordPtr
00756 DSMC::getOutputModuleRecord
00757 (
00758 OutputModuleRecordMap& outModMap,
00759 DSMC::OutputModuleKey const& outModKey
00760 )
00761 {
00762 OutModRecordPtr outModRecordPtr;
00763 OutputModuleRecordMap::const_iterator omMapIter;
00764 omMapIter = outModMap.find(outModKey);
00765 if (omMapIter == outModMap.end())
00766 {
00767 outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
00768
00769 outModRecordPtr->name = "Unknown";
00770 outModRecordPtr->id = outModKey;
00771 outModRecordPtr->initMsgSize = 0;
00772
00773 outModMap[outModKey] = outModRecordPtr;
00774 }
00775 else
00776 {
00777 outModRecordPtr = omMapIter->second;
00778 }
00779 return outModRecordPtr;
00780 }
00781
00782
00783 DSMC::OutputModuleResultsList
00784 DSMC::buildOutputModuleResults(DSMC::OutputModuleRecordMap const& outputModuleMap) const
00785 {
00786 OutputModuleResultsList resultsList;
00787
00788 OutputModuleRecordMap::const_iterator omMapIter;
00789 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00790 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00791 {
00792 OutModRecordPtr outModRecordPtr = omMapIter->second;
00793 boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
00794 result->name = outModRecordPtr->name;
00795 result->id = outModRecordPtr->id;
00796 result->initMsgSize = outModRecordPtr->initMsgSize;
00797 outModRecordPtr->eventSize.getStats(result->eventStats);
00798 resultsList.push_back(result);
00799 }
00800
00801 return resultsList;
00802 }
00803
00804
00805 DSMC::RBResultPtr
00806 DSMC::buildResourceBrokerResult(DSMC::RBRecordPtr const& rbRecordPtr) const
00807 {
00808 RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
00809
00810 result->filterUnitCount = rbRecordPtr->filterUnitMap.size();
00811 result->initMsgCount = rbRecordPtr->initMsgCount;
00812 result->lastRunNumber = rbRecordPtr->lastRunNumber;
00813 result->lastEventNumber = rbRecordPtr->lastEventNumber;
00814 rbRecordPtr->eventSize.getStats(result->eventStats);
00815 rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00816 rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
00817 rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00818 rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00819 rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00820 rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00821 rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00822
00823 result->outstandingDataDiscardCount =
00824 result->initMsgCount +
00825 result->eventStats.getSampleCount() +
00826 result->errorEventStats.getSampleCount() +
00827 result->faultyEventStats.getSampleCount() -
00828 result->dataDiscardStats.getSampleCount();
00829 result->outstandingDQMDiscardCount =
00830 result->dqmEventStats.getSampleCount() +
00831 result->faultyDQMEventStats.getSampleCount() -
00832 result->dqmDiscardStats.getSampleCount();
00833
00834 return result;
00835 }
00836
00837
00838 void DSMC::calcStatsForOutputModules(DSMC::OutputModuleRecordMap& outputModuleMap)
00839 {
00840 OutputModuleRecordMap::const_iterator omMapIter;
00841 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00842 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00843 {
00844 OutModRecordPtr outModRecordPtr = omMapIter->second;
00845
00846
00847 outModRecordPtr->eventSize.calculateStatistics();
00848 }
00849 }
00850
00851
00852 bool compareRBResultPtrValues
00853 (
00854 DSMC::RBResultPtr firstValue,
00855 DSMC::RBResultPtr secondValue
00856 )
00857 {
00858 return *firstValue < *secondValue;
00859 }
00860
00861 }
00862
00863