00001
00003
00004 #include <string>
00005 #include <sstream>
00006 #include <iomanip>
00007
00008 #include <zlib.h>
00009 #include <boost/lexical_cast.hpp>
00010
00011 #include "EventFilter/StorageManager/interface/Exception.h"
00012 #include "EventFilter/StorageManager/interface/DataSenderMonitorCollection.h"
00013
00014
00015 namespace stor {
00016
00017 DataSenderMonitorCollection::DataSenderMonitorCollection
00018 (
00019 const utils::Duration_t& updateInterval,
00020 AlarmHandlerPtr ah
00021 ) :
00022 MonitorCollection(updateInterval),
00023 connectedRBs_(0),
00024 connectedEPs_(0),
00025 activeEPs_(0),
00026 outstandingDataDiscards_(0),
00027 outstandingDQMDiscards_(0),
00028 faultyEvents_(0),
00029 ignoredDiscards_(0),
00030 updateInterval_(updateInterval),
00031 alarmHandler_(ah)
00032 {}
00033
00034
00035 void DataSenderMonitorCollection::addInitSample(I2OChain const& i2oChain)
00036 {
00037
00038 if (i2oChain.messageCode() != Header::INIT) {return;}
00039 if (! i2oChain.complete()) {return;}
00040
00041
00042 std::string outModName = i2oChain.outputModuleLabel();
00043 uint32_t msgSize = i2oChain.totalDataSize();
00044
00045
00046 bool pointersAreValid;
00047 RBRecordPtr rbRecordPtr;
00048 FURecordPtr fuRecordPtr;
00049 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00050 {
00051 boost::mutex::scoped_lock sl(collectionsMutex_);
00052 pointersAreValid = getAllNeededPointers(
00053 i2oChain, rbRecordPtr, fuRecordPtr,
00054 topLevelOutModPtr, rbSpecificOutModPtr,
00055 fuSpecificOutModPtr);
00056 }
00057
00058
00059 if (pointersAreValid)
00060 {
00061 topLevelOutModPtr->name = outModName;
00062 topLevelOutModPtr->initMsgSize = msgSize;
00063
00064 ++rbRecordPtr->initMsgCount;
00065 rbRecordPtr->nExpectedEPs += i2oChain.nExpectedEPs();
00066 rbSpecificOutModPtr->name = outModName;
00067 rbSpecificOutModPtr->initMsgSize = msgSize;
00068
00069 ++fuRecordPtr->initMsgCount;
00070 fuSpecificOutModPtr->name = outModName;
00071 fuSpecificOutModPtr->initMsgSize = msgSize;
00072 }
00073 }
00074
00075
00076 void DataSenderMonitorCollection::addEventSample(I2OChain const& i2oChain)
00077 {
00078
00079 if (i2oChain.messageCode() != Header::EVENT) {return;}
00080 if (! i2oChain.complete()) {return;}
00081
00082
00083 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00084 uint32_t runNumber = i2oChain.runNumber();
00085 uint32_t eventNumber = i2oChain.eventNumber();
00086
00087
00088 bool pointersAreValid;
00089 RBRecordPtr rbRecordPtr;
00090 FURecordPtr fuRecordPtr;
00091 OutModRecordPtr topLevelOutModPtr, rbSpecificOutModPtr, fuSpecificOutModPtr;
00092 {
00093 boost::mutex::scoped_lock sl(collectionsMutex_);
00094 pointersAreValid = getAllNeededPointers(
00095 i2oChain, rbRecordPtr, fuRecordPtr,
00096 topLevelOutModPtr, rbSpecificOutModPtr,
00097 fuSpecificOutModPtr);
00098 }
00099
00100
00101 if (pointersAreValid)
00102 {
00103 topLevelOutModPtr->eventSize.addSample(eventSize);
00104
00105 rbRecordPtr->lastRunNumber = runNumber;
00106 rbRecordPtr->lastEventNumber = eventNumber;
00107 rbRecordPtr->eventSize.addSample(eventSize);
00108 rbSpecificOutModPtr->eventSize.addSample(eventSize);
00109
00110 fuRecordPtr->lastRunNumber = runNumber;
00111 fuRecordPtr->lastEventNumber = eventNumber;
00112 fuRecordPtr->shortIntervalEventSize.addSample(eventSize);
00113 fuRecordPtr->mediumIntervalEventSize.addSample(eventSize);
00114 fuSpecificOutModPtr->eventSize.addSample(eventSize);
00115 }
00116 }
00117
00118
00119 void DataSenderMonitorCollection::addDQMEventSample(I2OChain const& i2oChain)
00120 {
00121
00122 if (i2oChain.messageCode() != Header::DQM_EVENT) {return;}
00123 if (! i2oChain.complete()) {return;}
00124
00125
00126 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00127
00128
00129 bool pointersAreValid;
00130 RBRecordPtr rbRecordPtr;
00131 FURecordPtr fuRecordPtr;
00132 {
00133 boost::mutex::scoped_lock sl(collectionsMutex_);
00134 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00135 if (pointersAreValid)
00136 {
00137 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00138 }
00139 }
00140
00141
00142 if (pointersAreValid)
00143 {
00144 rbRecordPtr->dqmEventSize.addSample(eventSize);
00145 fuRecordPtr->dqmEventSize.addSample(eventSize);
00146 }
00147 }
00148
00149
00150 void DataSenderMonitorCollection::addErrorEventSample(I2OChain const& i2oChain)
00151 {
00152
00153 if (i2oChain.messageCode() != Header::ERROR_EVENT) {return;}
00154 if (! i2oChain.complete()) {return;}
00155
00156
00157 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00158
00159
00160 bool pointersAreValid;
00161 RBRecordPtr rbRecordPtr;
00162 FURecordPtr fuRecordPtr;
00163 {
00164 boost::mutex::scoped_lock sl(collectionsMutex_);
00165 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00166 if (pointersAreValid)
00167 {
00168 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00169 }
00170 }
00171
00172
00173 if (pointersAreValid)
00174 {
00175 rbRecordPtr->errorEventSize.addSample(eventSize);
00176 fuRecordPtr->errorEventSize.addSample(eventSize);
00177 }
00178 }
00179
00180
00181 void DataSenderMonitorCollection::addFaultyEventSample(I2OChain const& i2oChain)
00182 {
00183
00184 double eventSize = static_cast<double>(i2oChain.totalDataSize());
00185
00186
00187 bool pointersAreValid;
00188 RBRecordPtr rbRecordPtr;
00189 FURecordPtr fuRecordPtr;
00190 {
00191 boost::mutex::scoped_lock sl(collectionsMutex_);
00192 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00193 if (pointersAreValid)
00194 {
00195 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00196 }
00197 }
00198
00199
00200 if (pointersAreValid)
00201 {
00202 if (i2oChain.messageCode() == Header::DQM_EVENT)
00203 {
00204 rbRecordPtr->faultyDQMEventSize.addSample(eventSize);
00205 fuRecordPtr->faultyDQMEventSize.addSample(eventSize);
00206 }
00207 else
00208 {
00209 rbRecordPtr->faultyEventSize.addSample(eventSize);
00210 fuRecordPtr->faultyEventSize.addSample(eventSize);
00211 }
00212 }
00213 }
00214
00215
00216 void DataSenderMonitorCollection::incrementDataDiscardCount(I2OChain const& i2oChain)
00217 {
00218
00219 bool pointersAreValid;
00220 RBRecordPtr rbRecordPtr;
00221 FURecordPtr fuRecordPtr;
00222 {
00223 boost::mutex::scoped_lock sl(collectionsMutex_);
00224 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00225 if (pointersAreValid)
00226 {
00227 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00228 }
00229 }
00230
00231
00232 if (pointersAreValid)
00233 {
00234 rbRecordPtr->dataDiscardCount.addSample(1);
00235 fuRecordPtr->dataDiscardCount.addSample(1);
00236 }
00237 }
00238
00239
00240 void DataSenderMonitorCollection::incrementDQMDiscardCount(I2OChain const& i2oChain)
00241 {
00242
00243 bool pointersAreValid;
00244 RBRecordPtr rbRecordPtr;
00245 FURecordPtr fuRecordPtr;
00246 {
00247 boost::mutex::scoped_lock sl(collectionsMutex_);
00248 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00249 if (pointersAreValid)
00250 {
00251 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00252 }
00253 }
00254
00255
00256 if (pointersAreValid)
00257 {
00258 rbRecordPtr->dqmDiscardCount.addSample(1);
00259 fuRecordPtr->dqmDiscardCount.addSample(1);
00260 }
00261 }
00262
00263
00264 void DataSenderMonitorCollection::incrementSkippedDiscardCount(I2OChain const& i2oChain)
00265 {
00266
00267 bool pointersAreValid;
00268 RBRecordPtr rbRecordPtr;
00269 FURecordPtr fuRecordPtr;
00270 {
00271 boost::mutex::scoped_lock sl(collectionsMutex_);
00272 pointersAreValid = getRBRecordPointer(i2oChain, rbRecordPtr);
00273 if (pointersAreValid)
00274 {
00275 pointersAreValid = getFURecordPointer(i2oChain, rbRecordPtr, fuRecordPtr);
00276 }
00277 }
00278
00279
00280 if (pointersAreValid)
00281 {
00282 rbRecordPtr->skippedDiscardCount.addSample(1);
00283 fuRecordPtr->skippedDiscardCount.addSample(1);
00284 }
00285 }
00286
00287
00288 DataSenderMonitorCollection::OutputModuleResultsList
00289 DataSenderMonitorCollection::getTopLevelOutputModuleResults() const
00290 {
00291 boost::mutex::scoped_lock sl(collectionsMutex_);
00292
00293 return buildOutputModuleResults(outputModuleMap_);
00294 }
00295
00296
00297 DataSenderMonitorCollection::ResourceBrokerResultsList
00298 DataSenderMonitorCollection::getAllResourceBrokerResults() const
00299 {
00300 boost::mutex::scoped_lock sl(collectionsMutex_);
00301 ResourceBrokerResultsList resultsList;
00302
00303 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00304 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00305 resourceBrokerMap_.end();
00306 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00307 {
00308 RBRecordPtr rbRecordPtr = rbMapIter->second;
00309 RBResultPtr result = buildResourceBrokerResult(rbRecordPtr);
00310 result->uniqueRBID = rbMapIter->first;
00311 resultsList.push_back(result);
00312 }
00313
00314 return resultsList;
00315 }
00316
00317
00318 DataSenderMonitorCollection::RBResultPtr
00319 DataSenderMonitorCollection::getOneResourceBrokerResult(UniqueResourceBrokerID_t uniqueRBID) const
00320 {
00321 boost::mutex::scoped_lock sl(collectionsMutex_);
00322 RBResultPtr result;
00323
00324 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00325 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00326 if (rbMapIter != resourceBrokerMap_.end())
00327 {
00328 RBRecordPtr rbRecordPtr = rbMapIter->second;
00329 result = buildResourceBrokerResult(rbRecordPtr);
00330 result->uniqueRBID = rbMapIter->first;
00331 }
00332
00333 return result;
00334 }
00335
00336
00337 DataSenderMonitorCollection::OutputModuleResultsList
00338 DataSenderMonitorCollection::getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00339 {
00340 boost::mutex::scoped_lock sl(collectionsMutex_);
00341 OutputModuleResultsList resultsList;
00342
00343 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00344 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00345 if (rbMapIter != resourceBrokerMap_.end())
00346 {
00347 RBRecordPtr rbRecordPtr = rbMapIter->second;
00348 resultsList = buildOutputModuleResults(rbRecordPtr->outputModuleMap);
00349 }
00350
00351 return resultsList;
00352 }
00353
00354
00355 DataSenderMonitorCollection::FilterUnitResultsList
00356 DataSenderMonitorCollection::getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const
00357 {
00358 boost::mutex::scoped_lock sl(collectionsMutex_);
00359 FilterUnitResultsList resultsList;
00360
00361 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00362 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00363 if (rbMapIter != resourceBrokerMap_.end())
00364 {
00365 RBRecordPtr rbRecordPtr = rbMapIter->second;
00366 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00367 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00368 rbRecordPtr->filterUnitMap.end();
00369 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00370 fuMapIter != fuMapEnd; ++fuMapIter)
00371 {
00372 FURecordPtr fuRecordPtr = fuMapIter->second;
00373 FUResultPtr result(new FilterUnitResult(fuRecordPtr->key));
00374 result->initMsgCount = fuRecordPtr->initMsgCount;
00375 result->lastRunNumber = fuRecordPtr->lastRunNumber;
00376 result->lastEventNumber = fuRecordPtr->lastEventNumber;
00377 fuRecordPtr->shortIntervalEventSize.getStats(result->shortIntervalEventStats);
00378 fuRecordPtr->mediumIntervalEventSize.getStats(result->mediumIntervalEventStats);
00379 fuRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00380 fuRecordPtr->errorEventSize.getStats(result->errorEventStats);
00381 fuRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00382 fuRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00383 fuRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00384 fuRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00385 fuRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00386
00387 result->outstandingDataDiscardCount =
00388 result->initMsgCount +
00389 result->shortIntervalEventStats.getSampleCount() +
00390 result->errorEventStats.getSampleCount() +
00391 result->faultyEventStats.getSampleCount() -
00392 result->dataDiscardStats.getSampleCount();
00393 result->outstandingDQMDiscardCount =
00394 result->dqmEventStats.getSampleCount() +
00395 result->faultyDQMEventStats.getSampleCount() -
00396 result->dqmDiscardStats.getSampleCount();
00397
00398 resultsList.push_back(result);
00399 }
00400 }
00401
00402 return resultsList;
00403 }
00404
00405
00406 void DataSenderMonitorCollection::do_calculateStatistics()
00407 {
00408 boost::mutex::scoped_lock sl(collectionsMutex_);
00409
00410 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00411 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00412 resourceBrokerMap_.end();
00413 for (rbMapIter=resourceBrokerMap_.begin(); rbMapIter!=rbMapEnd; ++rbMapIter)
00414 {
00415 RBRecordPtr rbRecordPtr = rbMapIter->second;
00416 rbRecordPtr->eventSize.calculateStatistics();
00417 rbRecordPtr->dqmEventSize.calculateStatistics();
00418 rbRecordPtr->errorEventSize.calculateStatistics();
00419 rbRecordPtr->faultyEventSize.calculateStatistics();
00420 rbRecordPtr->faultyDQMEventSize.calculateStatistics();
00421 rbRecordPtr->dataDiscardCount.calculateStatistics();
00422 rbRecordPtr->dqmDiscardCount.calculateStatistics();
00423 rbRecordPtr->skippedDiscardCount.calculateStatistics();
00424 calcStatsForOutputModules(rbRecordPtr->outputModuleMap);
00425
00426 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00427 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00428 rbRecordPtr->filterUnitMap.end();
00429 for (fuMapIter = rbRecordPtr->filterUnitMap.begin();
00430 fuMapIter != fuMapEnd; ++fuMapIter)
00431 {
00432 FURecordPtr fuRecordPtr = fuMapIter->second;
00433 fuRecordPtr->shortIntervalEventSize.calculateStatistics();
00434 fuRecordPtr->mediumIntervalEventSize.calculateStatistics();
00435 fuRecordPtr->dqmEventSize.calculateStatistics();
00436 fuRecordPtr->errorEventSize.calculateStatistics();
00437 fuRecordPtr->faultyEventSize.calculateStatistics();
00438 fuRecordPtr->faultyDQMEventSize.calculateStatistics();
00439 fuRecordPtr->dataDiscardCount.calculateStatistics();
00440 fuRecordPtr->dqmDiscardCount.calculateStatistics();
00441 fuRecordPtr->skippedDiscardCount.calculateStatistics();
00442 calcStatsForOutputModules(fuRecordPtr->outputModuleMap);
00443 }
00444 }
00445
00446 calcStatsForOutputModules(outputModuleMap_);
00447 }
00448
00449
00450 void DataSenderMonitorCollection::do_reset()
00451 {
00452 boost::mutex::scoped_lock sl(collectionsMutex_);
00453
00454 connectedRBs_ = 0;
00455 connectedEPs_ = 0;
00456 activeEPs_ = 0;
00457 outstandingDataDiscards_ = 0;
00458 outstandingDQMDiscards_ = 0;
00459 faultyEvents_ = 0;
00460 ignoredDiscards_ = 0;
00461 resourceBrokerMap_.clear();
00462 outputModuleMap_.clear();
00463 }
00464
00465
00466 void DataSenderMonitorCollection::do_appendInfoSpaceItems(InfoSpaceItems& infoSpaceItems)
00467 {
00468 infoSpaceItems.push_back(std::make_pair("connectedRBs", &connectedRBs_));
00469 infoSpaceItems.push_back(std::make_pair("connectedEPs", &connectedEPs_));
00470 infoSpaceItems.push_back(std::make_pair("activeEPs", &activeEPs_));
00471 infoSpaceItems.push_back(std::make_pair("outstandingDataDiscards", &outstandingDataDiscards_));
00472 infoSpaceItems.push_back(std::make_pair("outstandingDQMDiscards", &outstandingDQMDiscards_));
00473 infoSpaceItems.push_back(std::make_pair("faultyEvents", &faultyEvents_));
00474 infoSpaceItems.push_back(std::make_pair("ignoredDiscards", &ignoredDiscards_));
00475 }
00476
00477
00478 void DataSenderMonitorCollection::do_updateInfoSpaceItems()
00479 {
00480 boost::mutex::scoped_lock sl(collectionsMutex_);
00481
00482 connectedRBs_ = static_cast<xdata::UnsignedInteger32>(resourceBrokerMap_.size());
00483
00484 uint32_t localEPCount = 0;
00485 uint32_t localActiveEPCount = 0;
00486 int localMissingDataDiscardCount = 0;
00487 int localMissingDQMDiscardCount = 0;
00488 uint32_t localFaultyEventsCount = 0;
00489 uint32_t localIgnoredDiscardCount = 0;
00490 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00491 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00492 resourceBrokerMap_.end();
00493 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00494 {
00495 RBRecordPtr rbRecordPtr = rbMapIter->second;
00496 if ( rbRecordPtr->initMsgCount > 0 )
00497 localEPCount += rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount;
00498
00499 MonitoredQuantity::Stats skippedDiscardStats;
00500 rbRecordPtr->skippedDiscardCount.getStats(skippedDiscardStats);
00501 localIgnoredDiscardCount += skippedDiscardStats.getSampleCount();
00502
00503 MonitoredQuantity::Stats eventStats;
00504 MonitoredQuantity::Stats errorEventStats;
00505 MonitoredQuantity::Stats dataDiscardStats;
00506 rbRecordPtr->eventSize.getStats(eventStats);
00507 rbRecordPtr->errorEventSize.getStats(errorEventStats);
00508 rbRecordPtr->dataDiscardCount.getStats(dataDiscardStats);
00509 localMissingDataDiscardCount += rbRecordPtr->initMsgCount + eventStats.getSampleCount() +
00510 errorEventStats.getSampleCount() - dataDiscardStats.getSampleCount();
00511
00512 MonitoredQuantity::Stats dqmEventStats;
00513 MonitoredQuantity::Stats dqmDiscardStats;
00514 rbRecordPtr->dqmEventSize.getStats(dqmEventStats);
00515 rbRecordPtr->dqmDiscardCount.getStats(dqmDiscardStats);
00516 localMissingDQMDiscardCount += dqmEventStats.getSampleCount() -
00517 dqmDiscardStats.getSampleCount();
00518
00519 MonitoredQuantity::Stats faultyEventStats;
00520 rbRecordPtr->faultyEventSize.getStats(faultyEventStats);
00521 localFaultyEventsCount += faultyEventStats.getSampleCount();
00522 MonitoredQuantity::Stats faultyDQMEventStats;
00523 rbRecordPtr->faultyDQMEventSize.getStats(faultyDQMEventStats);
00524 localFaultyEventsCount += faultyDQMEventStats.getSampleCount();
00525
00526 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00527 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapEnd =
00528 rbRecordPtr->filterUnitMap.end();
00529 for (fuMapIter = rbRecordPtr->filterUnitMap.begin(); fuMapIter != fuMapEnd; ++fuMapIter)
00530 {
00531 FURecordPtr fuRecordPtr = fuMapIter->second;
00532 MonitoredQuantity::Stats fuMediumIntervalEventStats;
00533 fuRecordPtr->mediumIntervalEventSize.getStats(fuMediumIntervalEventStats);
00534 if (fuMediumIntervalEventStats.getSampleCount(MonitoredQuantity::RECENT) > 0) {
00535 ++localActiveEPCount;
00536 }
00537 }
00538 }
00539 connectedEPs_ = static_cast<xdata::UnsignedInteger32>(localEPCount);
00540 activeEPs_ = static_cast<xdata::UnsignedInteger32>(localActiveEPCount);
00541 outstandingDataDiscards_ = static_cast<xdata::Integer32>(localMissingDataDiscardCount);
00542 outstandingDQMDiscards_ = static_cast<xdata::Integer32>(localMissingDQMDiscardCount);
00543 faultyEvents_ = static_cast<xdata::UnsignedInteger32>(localFaultyEventsCount);
00544 ignoredDiscards_ = static_cast<xdata::UnsignedInteger32>(localIgnoredDiscardCount);
00545
00546 faultyEventsAlarm(localFaultyEventsCount);
00547 ignoredDiscardAlarm(localIgnoredDiscardCount);
00548 }
00549
00550
00551 void DataSenderMonitorCollection::faultyEventsAlarm(const uint32_t& faultyEventsCount) const
00552 {
00553 const std::string alarmName = "FaultyEvents";
00554
00555 if (faultyEventsCount > 0)
00556 {
00557 std::ostringstream msg;
00558 msg << "Missing or faulty I2O fragments for " <<
00559 faultyEventsCount <<
00560 " events. These events are lost!";
00561 XCEPT_DECLARE(stor::exception::FaultyEvents, ex, msg.str());
00562 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00563 }
00564 else
00565 {
00566 alarmHandler_->revokeAlarm(alarmName);
00567 }
00568 }
00569
00570
00571 void DataSenderMonitorCollection::ignoredDiscardAlarm(const uint32_t& ignoredDiscardCount) const
00572 {
00573 const std::string alarmName = "IgnoredDiscard";
00574
00575 if ( ignoredDiscardCount > 0)
00576 {
00577 std::ostringstream msg;
00578 msg << ignoredDiscardCount <<
00579 " discard messages ignored. These events might be stuck in the resource broker.";
00580 XCEPT_DECLARE(stor::exception::IgnoredDiscard, ex, msg.str());
00581 alarmHandler_->raiseAlarm(alarmName, AlarmHandler::ERROR, ex);
00582 }
00583 else
00584 {
00585 alarmHandler_->revokeAlarm(alarmName);
00586 }
00587 }
00588
00589
00590 size_t DataSenderMonitorCollection::getConnectedEPs() const
00591 {
00592 size_t count = 0;
00593 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00594 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapEnd =
00595 resourceBrokerMap_.end();
00596 for (rbMapIter = resourceBrokerMap_.begin(); rbMapIter != rbMapEnd; ++rbMapIter)
00597 {
00598 if ( rbMapIter->second->initMsgCount > 0 )
00599 count += rbMapIter->second->nExpectedEPs / rbMapIter->second->initMsgCount;
00600 }
00601 return count;
00602 }
00603
00604
00605 typedef DataSenderMonitorCollection DSMC;
00606
00607 bool DSMC::getAllNeededPointers
00608 (
00609 I2OChain const& i2oChain,
00610 DSMC::RBRecordPtr& rbRecordPtr,
00611 DSMC::FURecordPtr& fuRecordPtr,
00612 DSMC::OutModRecordPtr& topLevelOutModPtr,
00613 DSMC::OutModRecordPtr& rbSpecificOutModPtr,
00614 DSMC::OutModRecordPtr& fuSpecificOutModPtr
00615 )
00616 {
00617 ResourceBrokerKey rbKey(i2oChain);
00618 if (! rbKey.isValid) {return false;}
00619 FilterUnitKey fuKey(i2oChain);
00620 if (! fuKey.isValid) {return false;}
00621 OutputModuleKey outModKey = i2oChain.outputModuleId();
00622
00623 topLevelOutModPtr = getOutputModuleRecord(outputModuleMap_, outModKey);
00624
00625 rbRecordPtr = getResourceBrokerRecord(rbKey);
00626 rbSpecificOutModPtr = getOutputModuleRecord(
00627 rbRecordPtr->outputModuleMap,
00628 outModKey);
00629
00630 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00631 fuSpecificOutModPtr = getOutputModuleRecord(
00632 fuRecordPtr->outputModuleMap,
00633 outModKey);
00634
00635 return true;
00636 }
00637
00638
00639 bool DSMC::getRBRecordPointer
00640 (
00641 I2OChain const& i2oChain,
00642 DSMC::RBRecordPtr& rbRecordPtr
00643 )
00644 {
00645 ResourceBrokerKey rbKey(i2oChain);
00646 if (! rbKey.isValid) {return false;}
00647
00648 rbRecordPtr = getResourceBrokerRecord(rbKey);
00649 return true;
00650 }
00651
00652
00653 bool DSMC::getFURecordPointer
00654 (
00655 I2OChain const& i2oChain,
00656 DSMC::RBRecordPtr& rbRecordPtr,
00657 DSMC::FURecordPtr& fuRecordPtr
00658 )
00659 {
00660 FilterUnitKey fuKey(i2oChain);
00661 if (! fuKey.isValid) {return false;}
00662
00663 fuRecordPtr = getFilterUnitRecord(rbRecordPtr, fuKey);
00664 return true;
00665 }
00666
00667
00668 DSMC::RBRecordPtr
00669 DSMC::getResourceBrokerRecord(DSMC::ResourceBrokerKey const& rbKey)
00670 {
00671 RBRecordPtr rbRecordPtr;
00672 UniqueResourceBrokerID_t uniqueRBID = getUniqueResourceBrokerID(rbKey);
00673 std::map<UniqueResourceBrokerID_t, RBRecordPtr>::const_iterator rbMapIter;
00674 rbMapIter = resourceBrokerMap_.find(uniqueRBID);
00675 if (rbMapIter == resourceBrokerMap_.end())
00676 {
00677 rbRecordPtr.reset(new ResourceBrokerRecord(rbKey,updateInterval_));
00678 resourceBrokerMap_[uniqueRBID] = rbRecordPtr;
00679 }
00680 else
00681 {
00682 rbRecordPtr = rbMapIter->second;
00683 }
00684 return rbRecordPtr;
00685 }
00686
00687
00688 DSMC::UniqueResourceBrokerID_t
00689 DSMC::getUniqueResourceBrokerID(DSMC::ResourceBrokerKey const& rbKey)
00690 {
00691 UniqueResourceBrokerID_t uniqueID;
00692 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t>::const_iterator rbMapIter;
00693 rbMapIter = resourceBrokerIDs_.find(rbKey);
00694 if (rbMapIter == resourceBrokerIDs_.end())
00695 {
00696 std::string workString = rbKey.hltURL +
00697 boost::lexical_cast<std::string>(rbKey.hltTid) +
00698 boost::lexical_cast<std::string>(rbKey.hltInstance) +
00699 boost::lexical_cast<std::string>(rbKey.hltLocalId) +
00700 rbKey.hltClassName;
00701 uLong crc = crc32(0L, Z_NULL, 0);
00702 Bytef* crcbuf = (Bytef*) workString.data();
00703 crc = crc32(crc, crcbuf, workString.length());
00704 uniqueID = static_cast<UniqueResourceBrokerID_t>(crc);
00705 resourceBrokerIDs_[rbKey] = uniqueID;
00706 }
00707 else
00708 {
00709 uniqueID = rbMapIter->second;
00710 }
00711 return uniqueID;
00712 }
00713
00714
00715 DSMC::FURecordPtr
00716 DSMC::getFilterUnitRecord
00717 (
00718 DSMC::RBRecordPtr& rbRecordPtr,
00719 DSMC::FilterUnitKey const& fuKey
00720 )
00721 {
00722 FURecordPtr fuRecordPtr;
00723 std::map<FilterUnitKey, FURecordPtr>::const_iterator fuMapIter;
00724 fuMapIter = rbRecordPtr->filterUnitMap.find(fuKey);
00725 if (fuMapIter == rbRecordPtr->filterUnitMap.end())
00726 {
00727 fuRecordPtr.reset(new FilterUnitRecord(fuKey,updateInterval_));
00728 rbRecordPtr->filterUnitMap[fuKey] = fuRecordPtr;
00729 }
00730 else
00731 {
00732 fuRecordPtr = fuMapIter->second;
00733 }
00734 return fuRecordPtr;
00735 }
00736
00737
00738 DSMC::OutModRecordPtr
00739 DSMC::getOutputModuleRecord
00740 (
00741 OutputModuleRecordMap& outModMap,
00742 DSMC::OutputModuleKey const& outModKey
00743 )
00744 {
00745 OutModRecordPtr outModRecordPtr;
00746 OutputModuleRecordMap::const_iterator omMapIter;
00747 omMapIter = outModMap.find(outModKey);
00748 if (omMapIter == outModMap.end())
00749 {
00750 outModRecordPtr.reset(new OutputModuleRecord(updateInterval_));
00751
00752 outModRecordPtr->name = "Unknown";
00753 outModRecordPtr->id = outModKey;
00754 outModRecordPtr->initMsgSize = 0;
00755
00756 outModMap[outModKey] = outModRecordPtr;
00757 }
00758 else
00759 {
00760 outModRecordPtr = omMapIter->second;
00761 }
00762 return outModRecordPtr;
00763 }
00764
00765
00766 DSMC::OutputModuleResultsList
00767 DSMC::buildOutputModuleResults(DSMC::OutputModuleRecordMap const& outputModuleMap) const
00768 {
00769 OutputModuleResultsList resultsList;
00770
00771 OutputModuleRecordMap::const_iterator omMapIter;
00772 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00773 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00774 {
00775 OutModRecordPtr outModRecordPtr = omMapIter->second;
00776 boost::shared_ptr<OutputModuleResult> result(new OutputModuleResult());
00777 result->name = outModRecordPtr->name;
00778 result->id = outModRecordPtr->id;
00779 result->initMsgSize = outModRecordPtr->initMsgSize;
00780 outModRecordPtr->eventSize.getStats(result->eventStats);
00781 resultsList.push_back(result);
00782 }
00783
00784 return resultsList;
00785 }
00786
00787
00788 DSMC::RBResultPtr
00789 DSMC::buildResourceBrokerResult(DSMC::RBRecordPtr const& rbRecordPtr) const
00790 {
00791 RBResultPtr result(new ResourceBrokerResult(rbRecordPtr->key));
00792
00793 result->filterUnitCount = rbRecordPtr->initMsgCount>0 ? rbRecordPtr->nExpectedEPs / rbRecordPtr->initMsgCount : 0;
00794 result->initMsgCount = rbRecordPtr->initMsgCount;
00795 result->lastRunNumber = rbRecordPtr->lastRunNumber;
00796 result->lastEventNumber = rbRecordPtr->lastEventNumber;
00797 rbRecordPtr->eventSize.getStats(result->eventStats);
00798 rbRecordPtr->dqmEventSize.getStats(result->dqmEventStats);
00799 rbRecordPtr->errorEventSize.getStats(result->errorEventStats);
00800 rbRecordPtr->faultyEventSize.getStats(result->faultyEventStats);
00801 rbRecordPtr->faultyDQMEventSize.getStats(result->faultyDQMEventStats);
00802 rbRecordPtr->dataDiscardCount.getStats(result->dataDiscardStats);
00803 rbRecordPtr->dqmDiscardCount.getStats(result->dqmDiscardStats);
00804 rbRecordPtr->skippedDiscardCount.getStats(result->skippedDiscardStats);
00805
00806 result->outstandingDataDiscardCount =
00807 result->initMsgCount +
00808 result->eventStats.getSampleCount() +
00809 result->errorEventStats.getSampleCount() +
00810 result->faultyEventStats.getSampleCount() -
00811 result->dataDiscardStats.getSampleCount();
00812 result->outstandingDQMDiscardCount =
00813 result->dqmEventStats.getSampleCount() +
00814 result->faultyDQMEventStats.getSampleCount() -
00815 result->dqmDiscardStats.getSampleCount();
00816
00817 return result;
00818 }
00819
00820
00821 void DSMC::calcStatsForOutputModules(DSMC::OutputModuleRecordMap& outputModuleMap)
00822 {
00823 OutputModuleRecordMap::const_iterator omMapIter;
00824 OutputModuleRecordMap::const_iterator omMapEnd = outputModuleMap.end();
00825 for (omMapIter = outputModuleMap.begin(); omMapIter != omMapEnd; ++omMapIter)
00826 {
00827 OutModRecordPtr outModRecordPtr = omMapIter->second;
00828
00829 outModRecordPtr->fragmentSize.calculateStatistics();
00830 outModRecordPtr->eventSize.calculateStatistics();
00831 }
00832 }
00833
00834
00835 bool compareRBResultPtrValues
00836 (
00837 DSMC::RBResultPtr firstValue,
00838 DSMC::RBResultPtr secondValue
00839 )
00840 {
00841 return *firstValue < *secondValue;
00842 }
00843
00844 }
00845
00846