00001
00003
00004 #ifndef EventFilter_StorageManager_DataSenderMonitorCollection_h
00005 #define EventFilter_StorageManager_DataSenderMonitorCollection_h
00006
00007 #include <map>
00008
00009 #include "xdata/UnsignedInteger32.h"
00010 #include "xdata/Integer32.h"
00011
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014
00015 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00016 #include "EventFilter/StorageManager/interface/I2OChain.h"
00017 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00018 #include "IOPool/Streamer/interface/MsgHeader.h"
00019
00020 namespace stor {
00021
00031 class DataSenderMonitorCollection : public MonitorCollection
00032 {
00033 public:
00034
00038 struct ResourceBrokerKey
00039 {
00040 bool isValid;
00041 std::string hltURL;
00042 uint32_t hltTid;
00043 uint32_t hltInstance;
00044 uint32_t hltLocalId;
00045 std::string hltClassName;
00046
00047 explicit ResourceBrokerKey(I2OChain const& i2oChain)
00048 {
00049 if (i2oChain.messageCode() != Header::INVALID)
00050 {
00051 isValid = true;
00052 hltURL = i2oChain.hltURL();
00053 hltTid = i2oChain.hltTid();
00054 hltInstance = i2oChain.hltInstance();
00055 hltLocalId = i2oChain.hltLocalId();
00056 hltClassName = i2oChain.hltClassName();
00057 }
00058 else
00059 {
00060 isValid = false;
00061 }
00062 }
00063
00064 bool operator<(ResourceBrokerKey const& other) const
00065 {
00066 if (isValid != other.isValid) return isValid < other.isValid;
00067 if (hltURL != other.hltURL) return hltURL < other.hltURL;
00068 if (hltTid != other.hltTid) return hltTid < other.hltTid;
00069 if (hltInstance != other.hltInstance) return hltInstance < other.hltInstance;
00070 if (hltLocalId != other.hltLocalId) return hltLocalId < other.hltLocalId;
00071 return hltClassName < other.hltClassName;
00072 }
00073 };
00074
00078 struct FilterUnitKey
00079 {
00080 bool isValid;
00081 uint32_t fuProcessId;
00082 uint32_t fuGuid;
00083
00084 explicit FilterUnitKey(I2OChain const& i2oChain)
00085 {
00086 if (i2oChain.messageCode() != Header::INVALID)
00087 {
00088 isValid = true;
00089 fuProcessId = i2oChain.fuProcessId();
00090 fuGuid = i2oChain.fuGuid();
00091 }
00092 else
00093 {
00094 isValid = false;
00095 }
00096 }
00097
00098 bool operator<(FilterUnitKey const& other) const
00099 {
00100 if (isValid != other.isValid) return isValid < other.isValid;
00101
00102
00103
00104
00105
00106
00107
00108 return fuProcessId < other.fuProcessId;
00109 }
00110 };
00111
00115 typedef uint32_t OutputModuleKey;
00116
00117
00121 struct OutputModuleRecord
00122 {
00123 std::string name;
00124 OutputModuleKey id;
00125 uint32_t initMsgSize;
00126
00127 MonitoredQuantity eventSize;
00128
00129 OutputModuleRecord(const utils::Duration_t& updateInterval) :
00130 eventSize(updateInterval,boost::posix_time::seconds(10)) {}
00131 };
00132 typedef boost::shared_ptr<OutputModuleRecord> OutModRecordPtr;
00133 typedef std::map<OutputModuleKey, OutModRecordPtr> OutputModuleRecordMap;
00134
00138 struct FilterUnitRecord
00139 {
00140 FilterUnitKey key;
00141 OutputModuleRecordMap outputModuleMap;
00142 MonitoredQuantity shortIntervalEventSize;
00143 MonitoredQuantity mediumIntervalEventSize;
00144 MonitoredQuantity dqmEventSize;
00145 MonitoredQuantity errorEventSize;
00146 MonitoredQuantity faultyEventSize;
00147 MonitoredQuantity faultyDQMEventSize;
00148 MonitoredQuantity dataDiscardCount;
00149 MonitoredQuantity dqmDiscardCount;
00150 MonitoredQuantity skippedDiscardCount;
00151 uint32_t initMsgCount;
00152 uint32_t lastRunNumber;
00153 uint64_t lastEventNumber;
00154
00155 explicit FilterUnitRecord
00156 (
00157 FilterUnitKey fuKey,
00158 const utils::Duration_t& updateInterval
00159 ) :
00160 key(fuKey),
00161 shortIntervalEventSize(updateInterval,boost::posix_time::seconds(10)),
00162 mediumIntervalEventSize(updateInterval,boost::posix_time::seconds(300)),
00163 dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00164 errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00165 faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00166 faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00167 dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00168 dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00169 skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00170 initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00171 };
00172 typedef boost::shared_ptr<FilterUnitRecord> FURecordPtr;
00173
00177 struct ResourceBrokerRecord
00178 {
00179 ResourceBrokerKey key;
00180 std::map<FilterUnitKey, FURecordPtr> filterUnitMap;
00181 OutputModuleRecordMap outputModuleMap;
00182 MonitoredQuantity eventSize;
00183 MonitoredQuantity dqmEventSize;
00184 MonitoredQuantity errorEventSize;
00185 MonitoredQuantity faultyEventSize;
00186 MonitoredQuantity faultyDQMEventSize;
00187 MonitoredQuantity dataDiscardCount;
00188 MonitoredQuantity dqmDiscardCount;
00189 MonitoredQuantity skippedDiscardCount;
00190 uint32_t initMsgCount;
00191 uint32_t lastRunNumber;
00192 uint64_t lastEventNumber;
00193
00194 explicit ResourceBrokerRecord
00195 (
00196 ResourceBrokerKey rbKey,
00197 const utils::Duration_t& updateInterval
00198 ) :
00199 key(rbKey),
00200 eventSize(updateInterval,boost::posix_time::seconds(10)),
00201 dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00202 errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00203 faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00204 faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00205 dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00206 dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00207 skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00208 initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00209 };
00210 typedef boost::shared_ptr<ResourceBrokerRecord> RBRecordPtr;
00211
00212
00216 struct OutputModuleResult
00217 {
00218 std::string name;
00219 OutputModuleKey id;
00220 uint32_t initMsgSize;
00221 MonitoredQuantity::Stats eventStats;
00222 };
00223 typedef std::vector< boost::shared_ptr<OutputModuleResult> >
00224 OutputModuleResultsList;
00225
00229 typedef long long UniqueResourceBrokerID_t;
00230 struct ResourceBrokerResult
00231 {
00232 ResourceBrokerKey key;
00233 uint32_t filterUnitCount;
00234 uint32_t initMsgCount;
00235 uint32_t lastRunNumber;
00236 uint64_t lastEventNumber;
00237 MonitoredQuantity::Stats eventStats;
00238 MonitoredQuantity::Stats dqmEventStats;
00239 MonitoredQuantity::Stats errorEventStats;
00240 MonitoredQuantity::Stats faultyEventStats;
00241 MonitoredQuantity::Stats faultyDQMEventStats;
00242 MonitoredQuantity::Stats dataDiscardStats;
00243 MonitoredQuantity::Stats dqmDiscardStats;
00244 MonitoredQuantity::Stats skippedDiscardStats;
00245 UniqueResourceBrokerID_t uniqueRBID;
00246 int outstandingDataDiscardCount;
00247 int outstandingDQMDiscardCount;
00248
00249 explicit ResourceBrokerResult(ResourceBrokerKey const& rbKey):
00250 key(rbKey), filterUnitCount(0), initMsgCount(0),
00251 lastRunNumber(0), lastEventNumber(0), uniqueRBID(0),
00252 outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00253
00254 bool operator<(ResourceBrokerResult const& other) const
00255 {
00256 return key < other.key;
00257 }
00258 };
00259 typedef boost::shared_ptr<ResourceBrokerResult> RBResultPtr;
00260 typedef std::vector<RBResultPtr> ResourceBrokerResultsList;
00261
00265 struct FilterUnitResult
00266 {
00267 FilterUnitKey key;
00268 uint32_t initMsgCount;
00269 uint32_t lastRunNumber;
00270 uint64_t lastEventNumber;
00271 MonitoredQuantity::Stats shortIntervalEventStats;
00272 MonitoredQuantity::Stats mediumIntervalEventStats;
00273 MonitoredQuantity::Stats dqmEventStats;
00274 MonitoredQuantity::Stats errorEventStats;
00275 MonitoredQuantity::Stats faultyEventStats;
00276 MonitoredQuantity::Stats faultyDQMEventStats;
00277 MonitoredQuantity::Stats dataDiscardStats;
00278 MonitoredQuantity::Stats dqmDiscardStats;
00279 MonitoredQuantity::Stats skippedDiscardStats;
00280 int outstandingDataDiscardCount;
00281 int outstandingDQMDiscardCount;
00282
00283 explicit FilterUnitResult(FilterUnitKey const& fuKey):
00284 key(fuKey), initMsgCount(0), lastRunNumber(0), lastEventNumber(0),
00285 outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00286 };
00287 typedef boost::shared_ptr<FilterUnitResult> FUResultPtr;
00288 typedef std::vector<FUResultPtr> FilterUnitResultsList;
00289
00290
00294 DataSenderMonitorCollection
00295 (
00296 const utils::Duration_t& updateInterval,
00297 AlarmHandlerPtr
00298 );
00299
00303 void addFragmentSample(I2OChain const&);
00304
00308 void addInitSample(I2OChain const&);
00309
00313 void addEventSample(I2OChain const&);
00314
00318 void addDQMEventSample(I2OChain const&);
00319
00323 void addErrorEventSample(I2OChain const&);
00324
00328 void addFaultyEventSample(I2OChain const&);
00329
00333 void incrementDataDiscardCount(I2OChain const&);
00334
00338 void incrementDQMDiscardCount(I2OChain const&);
00339
00343 void incrementSkippedDiscardCount(I2OChain const&);
00344
00348 OutputModuleResultsList getTopLevelOutputModuleResults() const;
00349
00353 ResourceBrokerResultsList getAllResourceBrokerResults() const;
00354
00358 RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const;
00359
00363 OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00364
00368 FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00369
00370 private:
00371
00372
00373 DataSenderMonitorCollection(DataSenderMonitorCollection const&);
00374 DataSenderMonitorCollection& operator=(DataSenderMonitorCollection const&);
00375
00376 virtual void do_calculateStatistics();
00377 virtual void do_reset();
00378 virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00379 virtual void do_updateInfoSpaceItems();
00380
00381 void faultyEventsAlarm(const uint32_t&) const;
00382 void ignoredDiscardAlarm(const uint32_t&) const;
00383
00384 bool getAllNeededPointers(I2OChain const& i2oChain,
00385 RBRecordPtr& rbRecordPtr,
00386 FURecordPtr& fuRecordPtr,
00387 OutModRecordPtr& topLevelOutModPtr,
00388 OutModRecordPtr& rbSpecificOutModPtr,
00389 OutModRecordPtr& fuSpecificOutModPtr);
00390
00391 bool getRBRecordPointer(I2OChain const& i2oChain,
00392 RBRecordPtr& rbRecordPtr);
00393
00394 bool getFURecordPointer(I2OChain const& i2oChain,
00395 RBRecordPtr& rbRecordPtr,
00396 FURecordPtr& fuRecordPtr);
00397
00398 RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const&);
00399 UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const&);
00400
00401 FURecordPtr getFilterUnitRecord(RBRecordPtr&, FilterUnitKey const&);
00402
00403 OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap&,
00404 OutputModuleKey const&);
00405
00406 OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const&) const;
00407
00408 RBResultPtr buildResourceBrokerResult(RBRecordPtr const&) const;
00409
00410 void calcStatsForOutputModules(OutputModuleRecordMap& outputModuleMap);
00411
00412 mutable boost::mutex collectionsMutex_;
00413
00414 xdata::UnsignedInteger32 connectedRBs_;
00415 xdata::UnsignedInteger32 connectedEPs_;
00416 xdata::UnsignedInteger32 activeEPs_;
00417 xdata::Integer32 outstandingDataDiscards_;
00418 xdata::Integer32 outstandingDQMDiscards_;
00419 xdata::UnsignedInteger32 faultyEvents_;
00420 xdata::UnsignedInteger32 ignoredDiscards_;
00421
00422 OutputModuleRecordMap outputModuleMap_;
00423
00424 std::map<ResourceBrokerKey, UniqueResourceBrokerID_t> resourceBrokerIDs_;
00425 std::map<UniqueResourceBrokerID_t, RBRecordPtr> resourceBrokerMap_;
00426
00427 const utils::Duration_t updateInterval_;
00428 AlarmHandlerPtr alarmHandler_;
00429
00430 };
00431
00432 bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue,
00433 DataSenderMonitorCollection::RBResultPtr secondValue);
00434
00435 }
00436
00437 #endif // EventFilter_StorageManager_DataSenderMonitorCollection_h
00438
00439