CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/EventFilter/StorageManager/interface/DataSenderMonitorCollection.h

Go to the documentation of this file.
00001 // $Id: DataSenderMonitorCollection.h,v 1.18 2011/03/07 15:31:31 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_DataSenderMonitorCollection_h
00005 #define EventFilter_StorageManager_DataSenderMonitorCollection_h
00006 
00007 #include <map>
00008 
00009 #include "xdata/UnsignedInteger32.h"
00010 #include "xdata/Integer32.h"
00011 
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014 
00015 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00016 #include "EventFilter/StorageManager/interface/I2OChain.h"
00017 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00018 #include "IOPool/Streamer/interface/MsgHeader.h"
00019 
00020 namespace stor {
00021 
00031   class DataSenderMonitorCollection : public MonitorCollection
00032   {
00033   public:
00034 
00038     struct ResourceBrokerKey
00039     {
00040       bool isValid;
00041       std::string hltURL;
00042       uint32_t hltTid;
00043       uint32_t hltInstance;
00044       uint32_t hltLocalId;
00045       std::string hltClassName;
00046 
00047       explicit ResourceBrokerKey(I2OChain const& i2oChain)
00048       {
00049         if (i2oChain.messageCode() != Header::INVALID)
00050           {
00051             isValid = true;
00052             hltURL = i2oChain.hltURL();
00053             hltTid = i2oChain.hltTid();
00054             hltInstance = i2oChain.hltInstance();
00055             hltLocalId = i2oChain.hltLocalId();
00056             hltClassName = i2oChain.hltClassName();
00057           }
00058         else
00059           {
00060             isValid = false;
00061           }
00062       }
00063 
00064       bool operator<(ResourceBrokerKey const& other) const
00065       {
00066         if (isValid != other.isValid) return isValid < other.isValid;
00067         if (hltURL != other.hltURL) return hltURL < other.hltURL;
00068         if (hltTid != other.hltTid) return hltTid < other.hltTid;
00069         if (hltInstance != other.hltInstance) return hltInstance < other.hltInstance;
00070         if (hltLocalId != other.hltLocalId) return hltLocalId < other.hltLocalId;
00071         return hltClassName < other.hltClassName;
00072       }
00073     };
00074 
00078     struct FilterUnitKey
00079     {
00080       bool isValid;
00081       uint32_t fuProcessId;
00082       uint32_t fuGuid;
00083 
00084       explicit FilterUnitKey(I2OChain const& i2oChain)
00085       {
00086         if (i2oChain.messageCode() != Header::INVALID)
00087           {
00088             isValid = true;
00089             fuProcessId = i2oChain.fuProcessId();
00090             fuGuid = i2oChain.fuGuid();
00091           }
00092         else
00093           {
00094             isValid = false;
00095           }
00096       }
00097 
00098       bool operator<(FilterUnitKey const& other) const
00099       {
00100         if (isValid != other.isValid) return isValid < other.isValid;
00101 
00102         // 30-Jun-2009, KAB - we want to keep stats for each filter unit without
00103         // separating out the output modules.  So, we should only use the process ID
00104         // in the key.  (The GUID is different for each output module.)
00105         //if (fuProcessId != other.fuProcessId) return fuProcessId < other.fuProcessId;
00106         //return fuGuid < other.fuGuid;
00107 
00108         return fuProcessId < other.fuProcessId;
00109       }
00110     };
00111 
00115     typedef uint32_t OutputModuleKey;
00116 
00117 
00121     struct OutputModuleRecord
00122     {
00123       std::string name;
00124       OutputModuleKey id;
00125       uint32_t initMsgSize;
00126       //MonitoredQuantity fragmentSize;
00127       MonitoredQuantity eventSize;
00128       
00129       OutputModuleRecord(const utils::Duration_t& updateInterval) :
00130       eventSize(updateInterval,boost::posix_time::seconds(10)) {}
00131     };
00132     typedef boost::shared_ptr<OutputModuleRecord> OutModRecordPtr;
00133     typedef std::map<OutputModuleKey, OutModRecordPtr> OutputModuleRecordMap;
00134 
00138     struct FilterUnitRecord
00139     {
00140       FilterUnitKey key;
00141       OutputModuleRecordMap outputModuleMap;
00142       MonitoredQuantity shortIntervalEventSize;
00143       MonitoredQuantity mediumIntervalEventSize;
00144       MonitoredQuantity dqmEventSize;
00145       MonitoredQuantity errorEventSize;
00146       MonitoredQuantity faultyEventSize;
00147       MonitoredQuantity faultyDQMEventSize;
00148       MonitoredQuantity dataDiscardCount;
00149       MonitoredQuantity dqmDiscardCount;
00150       MonitoredQuantity skippedDiscardCount;
00151       uint32_t initMsgCount;
00152       uint32_t lastRunNumber;
00153       uint64_t lastEventNumber;
00154 
00155       explicit FilterUnitRecord
00156       (
00157         FilterUnitKey fuKey,
00158         const utils::Duration_t& updateInterval
00159       ) :
00160         key(fuKey),
00161         shortIntervalEventSize(updateInterval,boost::posix_time::seconds(10)),
00162         mediumIntervalEventSize(updateInterval,boost::posix_time::seconds(300)),
00163         dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00164         errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00165         faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00166         faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00167         dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00168         dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00169         skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00170         initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00171     };
00172     typedef boost::shared_ptr<FilterUnitRecord> FURecordPtr;
00173 
00177     struct ResourceBrokerRecord
00178     {
00179       ResourceBrokerKey key;
00180       std::map<FilterUnitKey, FURecordPtr> filterUnitMap;
00181       OutputModuleRecordMap outputModuleMap;
00182       MonitoredQuantity eventSize;
00183       MonitoredQuantity dqmEventSize;
00184       MonitoredQuantity errorEventSize;
00185       MonitoredQuantity faultyEventSize;
00186       MonitoredQuantity faultyDQMEventSize;
00187       MonitoredQuantity dataDiscardCount;
00188       MonitoredQuantity dqmDiscardCount;
00189       MonitoredQuantity skippedDiscardCount;
00190       uint32_t initMsgCount;
00191       uint32_t lastRunNumber;
00192       uint64_t lastEventNumber;
00193 
00194       explicit ResourceBrokerRecord
00195       (
00196         ResourceBrokerKey rbKey,
00197         const utils::Duration_t& updateInterval
00198       ) :
00199         key(rbKey),
00200         eventSize(updateInterval,boost::posix_time::seconds(10)),
00201         dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00202         errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00203         faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00204         faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00205         dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00206         dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00207         skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00208         initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00209     };
00210     typedef boost::shared_ptr<ResourceBrokerRecord> RBRecordPtr;
00211 
00212 
00216     struct OutputModuleResult
00217     {
            // Plain result holder for one output module.  It has no
            // constructor, so id and initMsgSize hold whatever the creator
            // assigns (or what value-initialization provides).
00218       std::string name;                      // output module name
00219       OutputModuleKey id;                    // key of the output module
00220       uint32_t initMsgSize;                  // presumably the INIT message size in bytes - confirm
00221       MonitoredQuantity::Stats eventStats;   // statistics snapshot for events
00222     };
          // List of shared pointers to output module results.
00223     typedef std::vector< boost::shared_ptr<OutputModuleResult> >
00224       OutputModuleResultsList;
00225 
00229     typedef long long UniqueResourceBrokerID_t;
00230     struct ResourceBrokerResult
00231     {
00232       ResourceBrokerKey key;
00233       uint32_t filterUnitCount;
00234       uint32_t initMsgCount;
00235       uint32_t lastRunNumber;
00236       uint64_t lastEventNumber;
00237       MonitoredQuantity::Stats eventStats;
00238       MonitoredQuantity::Stats dqmEventStats;
00239       MonitoredQuantity::Stats errorEventStats;
00240       MonitoredQuantity::Stats faultyEventStats;
00241       MonitoredQuantity::Stats faultyDQMEventStats;
00242       MonitoredQuantity::Stats dataDiscardStats;
00243       MonitoredQuantity::Stats dqmDiscardStats;
00244       MonitoredQuantity::Stats skippedDiscardStats;
00245       UniqueResourceBrokerID_t uniqueRBID;
00246       int outstandingDataDiscardCount;
00247       int outstandingDQMDiscardCount;
00248 
00249       explicit ResourceBrokerResult(ResourceBrokerKey const& rbKey):
00250         key(rbKey), filterUnitCount(0), initMsgCount(0),
00251         lastRunNumber(0), lastEventNumber(0), uniqueRBID(0),
00252         outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00253 
00254       bool operator<(ResourceBrokerResult const& other) const
00255       {
00256         return key < other.key;
00257       }
00258     };
00259     typedef boost::shared_ptr<ResourceBrokerResult> RBResultPtr;
00260     typedef std::vector<RBResultPtr> ResourceBrokerResultsList;
00261 
00265     struct FilterUnitResult
00266     {
00267       FilterUnitKey key;
00268       uint32_t initMsgCount;
00269       uint32_t lastRunNumber;
00270       uint64_t lastEventNumber;
00271       MonitoredQuantity::Stats shortIntervalEventStats;
00272       MonitoredQuantity::Stats mediumIntervalEventStats;
00273       MonitoredQuantity::Stats dqmEventStats;
00274       MonitoredQuantity::Stats errorEventStats;
00275       MonitoredQuantity::Stats faultyEventStats;
00276       MonitoredQuantity::Stats faultyDQMEventStats;
00277       MonitoredQuantity::Stats dataDiscardStats;
00278       MonitoredQuantity::Stats dqmDiscardStats;
00279       MonitoredQuantity::Stats skippedDiscardStats;
00280       int outstandingDataDiscardCount;
00281       int outstandingDQMDiscardCount;
00282 
00283       explicit FilterUnitResult(FilterUnitKey const& fuKey):
00284         key(fuKey), initMsgCount(0), lastRunNumber(0), lastEventNumber(0),
00285         outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00286     };
00287     typedef boost::shared_ptr<FilterUnitResult> FUResultPtr;
00288     typedef std::vector<FUResultPtr> FilterUnitResultsList;
00289 
00290 
00294     DataSenderMonitorCollection
00295     (
00296       const utils::Duration_t& updateInterval,  // presumably kept in updateInterval_ - confirm in the .cc file
00297       AlarmHandlerPtr                           // presumably kept in alarmHandler_ - confirm in the .cc file
00298     );
00299 
          // NOTE(review): the functions below are only declared here; their
          // behaviour is defined in the implementation file.  The summaries
          // are inferred from the names and signatures and should be
          // confirmed against the implementation.

          // Presumably records a sample for the fragment held by the given chain.
00303     void addFragmentSample(I2OChain const&);
00304 
          // Presumably records a sample for the INIT message held by the given chain.
00308     void addInitSample(I2OChain const&);
00309 
          // Presumably records a sample for the event held by the given chain.
00313     void addEventSample(I2OChain const&);
00314 
          // Presumably records a sample for the DQM event held by the given chain.
00318     void addDQMEventSample(I2OChain const&);
00319 
          // Presumably records a sample for the error event held by the given chain.
00323     void addErrorEventSample(I2OChain const&);
00324 
          // Presumably records a sample for a faulty event held by the given chain.
00328     void addFaultyEventSample(I2OChain const&);
00329 
          // Presumably counts one data discard for the sender of the given chain.
00333     void incrementDataDiscardCount(I2OChain const&);
00334 
          // Presumably counts one DQM discard for the sender of the given chain.
00338     void incrementDQMDiscardCount(I2OChain const&);
00339 
          // Presumably counts one skipped discard for the sender of the given chain.
00343     void incrementSkippedDiscardCount(I2OChain const&);
00344 
          // Returns a list of output module results; per the name, for the
          // top-level (collection-wide) output modules.
00348     OutputModuleResultsList getTopLevelOutputModuleResults() const;
00349 
          // Returns a list of resource broker results; per the name, one per
          // known resource broker.
00353     ResourceBrokerResultsList getAllResourceBrokerResults() const;
00354 
          // Returns the result for the resource broker with the given unique ID.
          // NOTE(review): behaviour for an unknown ID is not visible here
          // (the returned pointer may be empty) - confirm.
00358     RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const;
00359 
          // Returns output module results for the resource broker with the given unique ID.
00363     OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00364 
          // Returns filter unit results for the resource broker with the given unique ID.
00368     FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00369 
00370   private:
00371 
00372     //Prevent copying of the DataSenderMonitorCollection
          // (copy constructor and copy assignment are declared private)
00373     DataSenderMonitorCollection(DataSenderMonitorCollection const&);
00374     DataSenderMonitorCollection& operator=(DataSenderMonitorCollection const&);
00375 
          // Virtual hooks; presumably overrides of the MonitorCollection
          // base class interface - confirm against MonitorCollection.h.
00376     virtual void do_calculateStatistics();
00377     virtual void do_reset();
00378     virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00379     virtual void do_updateInfoSpaceItems();
00380 
          // Alarm helpers taking a count; presumably raise or clear alarms via
          // alarmHandler_ depending on the value - confirm in the .cc file.
00381     void faultyEventsAlarm(const uint32_t&) const;
00382     void ignoredDiscardAlarm(const uint32_t&) const;
00383 
          // Looks up the records relevant to the given chain and hands them
          // back through the reference parameters.
          // NOTE(review): the meaning of the bool return value (presumably
          // "all pointers were found") is not visible here - confirm.
00384     bool getAllNeededPointers(I2OChain const& i2oChain,
00385                               RBRecordPtr& rbRecordPtr,
00386                               FURecordPtr& fuRecordPtr,
00387                               OutModRecordPtr& topLevelOutModPtr,
00388                               OutModRecordPtr& rbSpecificOutModPtr,
00389                               OutModRecordPtr& fuSpecificOutModPtr);
00390 
          // Same idea, restricted to the resource broker record.
00391     bool getRBRecordPointer(I2OChain const& i2oChain,
00392                             RBRecordPtr& rbRecordPtr);
00393 
          // Same idea, for the resource broker and filter unit records.
00394     bool getFURecordPointer(I2OChain const& i2oChain,
00395                             RBRecordPtr& rbRecordPtr,
00396                             FURecordPtr& fuRecordPtr);
00397 
          // Key-based lookups.  These are non-const, so they presumably create
          // missing entries on demand - confirm in the .cc file.
00398     RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const&);
00399     UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const&);
00400 
00401     FURecordPtr getFilterUnitRecord(RBRecordPtr&, FilterUnitKey const&);
00402 
00403     OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap&,
00404                                           OutputModuleKey const&);
00405 
          // Const helpers that build result objects from records.
00406     OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const&) const;
00407 
00408     RBResultPtr buildResourceBrokerResult(RBRecordPtr const&) const;
00409 
          // Presumably recalculates statistics for every record in the given map.
00410     void calcStatsForOutputModules(OutputModuleRecordMap& outputModuleMap);
00411 
          // Declared mutable so it can be locked from const member functions;
          // presumably guards the record maps below - confirm in the .cc file.
00412     mutable boost::mutex collectionsMutex_;
00413 
          // xdata-typed values; presumably the items published through the
          // info space by do_appendInfoSpaceItems/do_updateInfoSpaceItems - confirm.
00414     xdata::UnsignedInteger32 connectedRBs_;
00415     xdata::UnsignedInteger32 connectedEPs_;
00416     xdata::UnsignedInteger32 activeEPs_;
00417     xdata::Integer32 outstandingDataDiscards_;
00418     xdata::Integer32 outstandingDQMDiscards_;
00419     xdata::UnsignedInteger32 faultyEvents_;
00420     xdata::UnsignedInteger32 ignoredDiscards_;
00421 
          // Output module records held directly by the collection.
00422     OutputModuleRecordMap outputModuleMap_;
00423 
          // Two-step lookup: resource broker key -> unique ID -> record.
00424     std::map<ResourceBrokerKey, UniqueResourceBrokerID_t> resourceBrokerIDs_;
00425     std::map<UniqueResourceBrokerID_t, RBRecordPtr> resourceBrokerMap_;
00426 
00427     const utils::Duration_t updateInterval_;
00428     AlarmHandlerPtr alarmHandler_;
00429 
00430   };
00431 
00432   bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue,
00433                                 DataSenderMonitorCollection::RBResultPtr secondValue);
        // Free comparison function over two shared pointers to
        // ResourceBrokerResult, taken by value; defined elsewhere.
        // NOTE(review): presumably compares the pointed-to results (e.g. for
        // sorting a ResourceBrokerResultsList) rather than the pointer
        // values - confirm in the .cc file.
00434 
00435 } // namespace stor
00436 
00437 #endif // EventFilter_StorageManager_DataSenderMonitorCollection_h 
00438 
00439