00001
00003
00004 #ifndef EventFilter_StorageManager_DataSenderMonitorCollection_h
00005 #define EventFilter_StorageManager_DataSenderMonitorCollection_h
00006
00007 #include <map>
00008
00009 #include "xdata/UnsignedInteger32.h"
00010 #include "xdata/Integer32.h"
00011
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014
00015 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00016 #include "EventFilter/StorageManager/interface/I2OChain.h"
00017 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00018 #include "IOPool/Streamer/interface/MsgHeader.h"
00019
00020 namespace stor {
00021
00031 class DataSenderMonitorCollection : public MonitorCollection
00032 {
00033 public:
00034
00038 struct ResourceBrokerKey
00039 {
00040 bool isValid;
00041 std::string hltURL;
00042 uint32_t hltTid;
00043 uint32_t hltInstance;
00044 uint32_t hltLocalId;
00045 std::string hltClassName;
00046
00047 explicit ResourceBrokerKey(I2OChain const& i2oChain)
00048 {
00049 if (i2oChain.messageCode() != Header::INVALID)
00050 {
00051 isValid = true;
00052 hltURL = i2oChain.hltURL();
00053 hltTid = i2oChain.hltTid();
00054 hltInstance = i2oChain.hltInstance();
00055 hltLocalId = i2oChain.hltLocalId();
00056 hltClassName = i2oChain.hltClassName();
00057 }
00058 else
00059 {
00060 isValid = false;
00061 }
00062 }
00063
00064 bool operator<(ResourceBrokerKey const& other) const
00065 {
00066 if (isValid != other.isValid) return isValid < other.isValid;
00067 if (hltURL != other.hltURL) return hltURL < other.hltURL;
00068 if (hltTid != other.hltTid) return hltTid < other.hltTid;
00069 if (hltInstance != other.hltInstance) return hltInstance < other.hltInstance;
00070 if (hltLocalId != other.hltLocalId) return hltLocalId < other.hltLocalId;
00071 return hltClassName < other.hltClassName;
00072 }
00073 };
00074
00078 struct FilterUnitKey
00079 {
00080 bool isValid;
00081 uint32_t fuProcessId;
00082 uint32_t fuGuid;
00083
00084 explicit FilterUnitKey(I2OChain const& i2oChain)
00085 {
00086 if (i2oChain.messageCode() != Header::INVALID)
00087 {
00088 isValid = true;
00089 fuProcessId = i2oChain.fuProcessId();
00090 fuGuid = i2oChain.fuGuid();
00091 }
00092 else
00093 {
00094 isValid = false;
00095 }
00096 }
00097
00098 bool operator<(FilterUnitKey const& other) const
00099 {
00100 if (isValid != other.isValid) return isValid < other.isValid;
00101
00102
00103
00104
00105
00106
00107
00108 return fuProcessId < other.fuProcessId;
00109 }
00110 };
00111
00115 typedef uint32_t OutputModuleKey;
00116
00117
00121 struct OutputModuleRecord
00122 {
00123 std::string name;
00124 OutputModuleKey id;
00125 uint32_t initMsgSize;
00126 MonitoredQuantity fragmentSize;
00127 MonitoredQuantity eventSize;
00128
00129 OutputModuleRecord(const utils::Duration_t& updateInterval) :
00130 fragmentSize(updateInterval,boost::posix_time::seconds(10)),
00131 eventSize(updateInterval,boost::posix_time::seconds(10)) {}
00132 };
00133 typedef boost::shared_ptr<OutputModuleRecord> OutModRecordPtr;
00134 typedef std::map<OutputModuleKey, OutModRecordPtr> OutputModuleRecordMap;
00135
00139 struct FilterUnitRecord
00140 {
00141 FilterUnitKey key;
00142 OutputModuleRecordMap outputModuleMap;
00143 MonitoredQuantity shortIntervalEventSize;
00144 MonitoredQuantity mediumIntervalEventSize;
00145 MonitoredQuantity dqmEventSize;
00146 MonitoredQuantity errorEventSize;
00147 MonitoredQuantity faultyEventSize;
00148 MonitoredQuantity faultyDQMEventSize;
00149 MonitoredQuantity dataDiscardCount;
00150 MonitoredQuantity dqmDiscardCount;
00151 MonitoredQuantity skippedDiscardCount;
00152 uint32_t initMsgCount;
00153 uint32_t lastRunNumber;
00154 uint64_t lastEventNumber;
00155
00156 explicit FilterUnitRecord
00157 (
00158 FilterUnitKey fuKey,
00159 const utils::Duration_t& updateInterval
00160 ) :
00161 key(fuKey),
00162 shortIntervalEventSize(updateInterval,boost::posix_time::seconds(10)),
00163 mediumIntervalEventSize(updateInterval,boost::posix_time::seconds(300)),
00164 dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00165 errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00166 faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00167 faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00168 dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00169 dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00170 skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00171 initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00172 };
00173 typedef boost::shared_ptr<FilterUnitRecord> FURecordPtr;
00174
00178 struct ResourceBrokerRecord
00179 {
00180 ResourceBrokerKey key;
00181 std::map<FilterUnitKey, FURecordPtr> filterUnitMap;
00182 OutputModuleRecordMap outputModuleMap;
00183 MonitoredQuantity eventSize;
00184 MonitoredQuantity dqmEventSize;
00185 MonitoredQuantity errorEventSize;
00186 MonitoredQuantity faultyEventSize;
00187 MonitoredQuantity faultyDQMEventSize;
00188 MonitoredQuantity dataDiscardCount;
00189 MonitoredQuantity dqmDiscardCount;
00190 MonitoredQuantity skippedDiscardCount;
00191 uint32_t nExpectedEPs;
00192 uint32_t initMsgCount;
00193 uint32_t lastRunNumber;
00194 uint64_t lastEventNumber;
00195
00196 explicit ResourceBrokerRecord
00197 (
00198 ResourceBrokerKey rbKey,
00199 const utils::Duration_t& updateInterval
00200 ) :
00201 key(rbKey),
00202 eventSize(updateInterval,boost::posix_time::seconds(10)),
00203 dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00204 errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00205 faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00206 faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00207 dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00208 dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00209 skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00210 nExpectedEPs(0), initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00211 };
00212 typedef boost::shared_ptr<ResourceBrokerRecord> RBRecordPtr;
00213
00214
00218 struct OutputModuleResult
00219 {
00220 std::string name;
00221 OutputModuleKey id;
00222 uint32_t initMsgSize;
00223 MonitoredQuantity::Stats eventStats;
00224 };
00225 typedef std::vector< boost::shared_ptr<OutputModuleResult> >
00226 OutputModuleResultsList;
00227
00231 typedef long long UniqueResourceBrokerID_t;
00232 struct ResourceBrokerResult
00233 {
00234 ResourceBrokerKey key;
00235 uint32_t filterUnitCount;
00236 uint32_t initMsgCount;
00237 uint32_t lastRunNumber;
00238 uint64_t lastEventNumber;
00239 MonitoredQuantity::Stats eventStats;
00240 MonitoredQuantity::Stats dqmEventStats;
00241 MonitoredQuantity::Stats errorEventStats;
00242 MonitoredQuantity::Stats faultyEventStats;
00243 MonitoredQuantity::Stats faultyDQMEventStats;
00244 MonitoredQuantity::Stats dataDiscardStats;
00245 MonitoredQuantity::Stats dqmDiscardStats;
00246 MonitoredQuantity::Stats skippedDiscardStats;
00247 UniqueResourceBrokerID_t uniqueRBID;
00248 int outstandingDataDiscardCount;
00249 int outstandingDQMDiscardCount;
00250
00251 explicit ResourceBrokerResult(ResourceBrokerKey const& rbKey):
00252 key(rbKey), filterUnitCount(0), initMsgCount(0),
00253 lastRunNumber(0), lastEventNumber(0), uniqueRBID(0),
00254 outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00255
00256 bool operator<(ResourceBrokerResult const& other) const
00257 {
00258 return key < other.key;
00259 }
00260 };
00261 typedef boost::shared_ptr<ResourceBrokerResult> RBResultPtr;
00262 typedef std::vector<RBResultPtr> ResourceBrokerResultsList;
00263
00267 struct FilterUnitResult
00268 {
00269 FilterUnitKey key;
00270 uint32_t initMsgCount;
00271 uint32_t lastRunNumber;
00272 uint64_t lastEventNumber;
00273 MonitoredQuantity::Stats shortIntervalEventStats;
00274 MonitoredQuantity::Stats mediumIntervalEventStats;
00275 MonitoredQuantity::Stats dqmEventStats;
00276 MonitoredQuantity::Stats errorEventStats;
00277 MonitoredQuantity::Stats faultyEventStats;
00278 MonitoredQuantity::Stats faultyDQMEventStats;
00279 MonitoredQuantity::Stats dataDiscardStats;
00280 MonitoredQuantity::Stats dqmDiscardStats;
00281 MonitoredQuantity::Stats skippedDiscardStats;
00282 int outstandingDataDiscardCount;
00283 int outstandingDQMDiscardCount;
00284
00285 explicit FilterUnitResult(FilterUnitKey const& fuKey):
00286 key(fuKey), initMsgCount(0), lastRunNumber(0), lastEventNumber(0),
00287 outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00288 };
00289 typedef boost::shared_ptr<FilterUnitResult> FUResultPtr;
00290 typedef std::vector<FUResultPtr> FilterUnitResultsList;
00291
00292
00296 DataSenderMonitorCollection
00297 (
00298 const utils::Duration_t& updateInterval,
00299 AlarmHandlerPtr
00300 );
00301
00305 void addInitSample(I2OChain const&);
00306
00310 void addEventSample(I2OChain const&);
00311
00315 void addDQMEventSample(I2OChain const&);
00316
00320 void addErrorEventSample(I2OChain const&);
00321
00325 void addFaultyEventSample(I2OChain const&);
00326
00330 void incrementDataDiscardCount(I2OChain const&);
00331
00335 void incrementDQMDiscardCount(I2OChain const&);
00336
00340 void incrementSkippedDiscardCount(I2OChain const&);
00341
00345 OutputModuleResultsList getTopLevelOutputModuleResults() const;
00346
00350 ResourceBrokerResultsList getAllResourceBrokerResults() const;
00351
00355 RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const;
00356
00360 OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00361
00365 FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00366
00370 size_t getConnectedEPs() const;
00371
00372
00373 private:
00374
00375
00376 DataSenderMonitorCollection(DataSenderMonitorCollection const&);
00377 DataSenderMonitorCollection& operator=(DataSenderMonitorCollection const&);
00378
00379 virtual void do_calculateStatistics();
00380 virtual void do_reset();
00381 virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00382 virtual void do_updateInfoSpaceItems();
00383
00384 void faultyEventsAlarm(const uint32_t&) const;
00385 void ignoredDiscardAlarm(const uint32_t&) const;
00386
00387 bool getAllNeededPointers(I2OChain const& i2oChain,
00388 RBRecordPtr& rbRecordPtr,
00389 FURecordPtr& fuRecordPtr,
00390 OutModRecordPtr& topLevelOutModPtr,
00391 OutModRecordPtr& rbSpecificOutModPtr,
00392 OutModRecordPtr& fuSpecificOutModPtr);
00393
00394 bool getRBRecordPointer(I2OChain const& i2oChain,
00395 RBRecordPtr& rbRecordPtr);
00396
00397 bool getFURecordPointer(I2OChain const& i2oChain,
00398 RBRecordPtr& rbRecordPtr,
00399 FURecordPtr& fuRecordPtr);
00400
00401 RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const&);
00402 UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const&);
00403
00404 FURecordPtr getFilterUnitRecord(RBRecordPtr&, FilterUnitKey const&);
00405
00406 OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap&,
00407 OutputModuleKey const&);
00408
00409 OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const&) const;
00410
00411 RBResultPtr buildResourceBrokerResult(RBRecordPtr const&) const;
00412
00413 void calcStatsForOutputModules(OutputModuleRecordMap& outputModuleMap);
00414
00415 mutable boost::mutex collectionsMutex_;
00416
00417 xdata::UnsignedInteger32 connectedRBs_;
00418 xdata::UnsignedInteger32 connectedEPs_;
00419 xdata::UnsignedInteger32 activeEPs_;
00420 xdata::Integer32 outstandingDataDiscards_;
00421 xdata::Integer32 outstandingDQMDiscards_;
00422 xdata::UnsignedInteger32 faultyEvents_;
00423 xdata::UnsignedInteger32 ignoredDiscards_;
00424
00425 OutputModuleRecordMap outputModuleMap_;
00426
00427 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t> resourceBrokerIDs_;
00428 std::map<UniqueResourceBrokerID_t, RBRecordPtr> resourceBrokerMap_;
00429
00430 const utils::Duration_t updateInterval_;
00431 AlarmHandlerPtr alarmHandler_;
00432
00433 };
00434
00435 bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue,
00436 DataSenderMonitorCollection::RBResultPtr secondValue);
00437
00438 }
00439
00440 #endif // EventFilter_StorageManager_DataSenderMonitorCollection_h
00441
00442