CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/StorageManager/interface/DataSenderMonitorCollection.h

Go to the documentation of this file.
00001 // $Id: DataSenderMonitorCollection.h,v 1.20 2012/04/20 10:48:18 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_DataSenderMonitorCollection_h
00005 #define EventFilter_StorageManager_DataSenderMonitorCollection_h
00006 
00007 #include <map>
00008 
00009 #include "xdata/UnsignedInteger32.h"
00010 #include "xdata/Integer32.h"
00011 
00012 #include <boost/thread/mutex.hpp>
00013 #include <boost/shared_ptr.hpp>
00014 
00015 #include "EventFilter/StorageManager/interface/AlarmHandler.h"
00016 #include "EventFilter/StorageManager/interface/I2OChain.h"
00017 #include "EventFilter/StorageManager/interface/MonitorCollection.h"
00018 #include "IOPool/Streamer/interface/MsgHeader.h"
00019 
00020 namespace stor {
00021 
00031   class DataSenderMonitorCollection : public MonitorCollection
00032   {
00033   public:
00034 
00038     struct ResourceBrokerKey
00039     {
00040       bool isValid;
00041       std::string hltURL;
00042       uint32_t hltTid;
00043       uint32_t hltInstance;
00044       uint32_t hltLocalId;
00045       std::string hltClassName;
00046 
00047       explicit ResourceBrokerKey(I2OChain const& i2oChain)
00048       {
00049         if (i2oChain.messageCode() != Header::INVALID)
00050           {
00051             isValid = true;
00052             hltURL = i2oChain.hltURL();
00053             hltTid = i2oChain.hltTid();
00054             hltInstance = i2oChain.hltInstance();
00055             hltLocalId = i2oChain.hltLocalId();
00056             hltClassName = i2oChain.hltClassName();
00057           }
00058         else
00059           {
00060             isValid = false;
00061           }
00062       }
00063 
00064       bool operator<(ResourceBrokerKey const& other) const
00065       {
00066         if (isValid != other.isValid) return isValid < other.isValid;
00067         if (hltURL != other.hltURL) return hltURL < other.hltURL;
00068         if (hltTid != other.hltTid) return hltTid < other.hltTid;
00069         if (hltInstance != other.hltInstance) return hltInstance < other.hltInstance;
00070         if (hltLocalId != other.hltLocalId) return hltLocalId < other.hltLocalId;
00071         return hltClassName < other.hltClassName;
00072       }
00073     };
00074 
00078     struct FilterUnitKey
00079     {
00080       bool isValid;
00081       uint32_t fuProcessId;
00082       uint32_t fuGuid;
00083 
00084       explicit FilterUnitKey(I2OChain const& i2oChain)
00085       {
00086         if (i2oChain.messageCode() != Header::INVALID)
00087           {
00088             isValid = true;
00089             fuProcessId = i2oChain.fuProcessId();
00090             fuGuid = i2oChain.fuGuid();
00091           }
00092         else
00093           {
00094             isValid = false;
00095           }
00096       }
00097 
00098       bool operator<(FilterUnitKey const& other) const
00099       {
00100         if (isValid != other.isValid) return isValid < other.isValid;
00101 
00102         // 30-Jun-2009, KAB - we want to keep stats for each filter unit without
00103         // separating out the output modules.  So, we should only use the process ID
00104         // in the key.  (The GUID is different for each output module.)
00105         //if (fuProcessId != other.fuProcessId) return fuProcessId < other.fuProcessId;
00106         //return fuGuid < other.fuGuid;
00107 
00108         return fuProcessId < other.fuProcessId;
00109       }
00110     };
00111 
00115     typedef uint32_t OutputModuleKey;
00116 
00117 
00121     struct OutputModuleRecord
00122     {
00123       std::string name;
00124       OutputModuleKey id;
00125       uint32_t initMsgSize;
00126       MonitoredQuantity fragmentSize;
00127       MonitoredQuantity eventSize;
00128       
00129       OutputModuleRecord(const utils::Duration_t& updateInterval) :
00130       fragmentSize(updateInterval,boost::posix_time::seconds(10)),
00131       eventSize(updateInterval,boost::posix_time::seconds(10)) {}
00132     };
00133     typedef boost::shared_ptr<OutputModuleRecord> OutModRecordPtr;
00134     typedef std::map<OutputModuleKey, OutModRecordPtr> OutputModuleRecordMap;
00135 
00139     struct FilterUnitRecord
00140     {
00141       FilterUnitKey key;
00142       OutputModuleRecordMap outputModuleMap;
00143       MonitoredQuantity shortIntervalEventSize;
00144       MonitoredQuantity mediumIntervalEventSize;
00145       MonitoredQuantity dqmEventSize;
00146       MonitoredQuantity errorEventSize;
00147       MonitoredQuantity faultyEventSize;
00148       MonitoredQuantity faultyDQMEventSize;
00149       MonitoredQuantity dataDiscardCount;
00150       MonitoredQuantity dqmDiscardCount;
00151       MonitoredQuantity skippedDiscardCount;
00152       uint32_t initMsgCount;
00153       uint32_t lastRunNumber;
00154       uint64_t lastEventNumber;
00155 
00156       explicit FilterUnitRecord
00157       (
00158         FilterUnitKey fuKey,
00159         const utils::Duration_t& updateInterval
00160       ) :
00161         key(fuKey),
00162         shortIntervalEventSize(updateInterval,boost::posix_time::seconds(10)),
00163         mediumIntervalEventSize(updateInterval,boost::posix_time::seconds(300)),
00164         dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00165         errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00166         faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00167         faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00168         dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00169         dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00170         skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00171         initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00172     };
00173     typedef boost::shared_ptr<FilterUnitRecord> FURecordPtr;
00174 
00178     struct ResourceBrokerRecord
00179     {
00180       ResourceBrokerKey key;
00181       std::map<FilterUnitKey, FURecordPtr> filterUnitMap;
00182       OutputModuleRecordMap outputModuleMap;
00183       MonitoredQuantity eventSize;
00184       MonitoredQuantity dqmEventSize;
00185       MonitoredQuantity errorEventSize;
00186       MonitoredQuantity faultyEventSize;
00187       MonitoredQuantity faultyDQMEventSize;
00188       MonitoredQuantity dataDiscardCount;
00189       MonitoredQuantity dqmDiscardCount;
00190       MonitoredQuantity skippedDiscardCount;
00191       uint32_t nExpectedEPs;
00192       uint32_t initMsgCount;
00193       uint32_t lastRunNumber;
00194       uint64_t lastEventNumber;
00195 
00196       explicit ResourceBrokerRecord
00197       (
00198         ResourceBrokerKey rbKey,
00199         const utils::Duration_t& updateInterval
00200       ) :
00201         key(rbKey),
00202         eventSize(updateInterval,boost::posix_time::seconds(10)),
00203         dqmEventSize(updateInterval,boost::posix_time::seconds(10)),
00204         errorEventSize(updateInterval,boost::posix_time::seconds(10)),
00205         faultyEventSize(updateInterval,boost::posix_time::seconds(10)),
00206         faultyDQMEventSize(updateInterval,boost::posix_time::seconds(10)),
00207         dataDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00208         dqmDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00209         skippedDiscardCount(updateInterval,boost::posix_time::seconds(10)),
00210         nExpectedEPs(0), initMsgCount(0), lastRunNumber(0), lastEventNumber(0) {}
00211     };
00212     typedef boost::shared_ptr<ResourceBrokerRecord> RBRecordPtr;
00213 
00214 
00218     struct OutputModuleResult
00219     {
00220       std::string name;
00221       OutputModuleKey id;
00222       uint32_t initMsgSize;
00223       MonitoredQuantity::Stats eventStats;
00224     };
00225     typedef std::vector< boost::shared_ptr<OutputModuleResult> >
00226       OutputModuleResultsList;
00227 
00231     typedef long long UniqueResourceBrokerID_t;
00232     struct ResourceBrokerResult
00233     {
00234       ResourceBrokerKey key;
00235       uint32_t filterUnitCount;
00236       uint32_t initMsgCount;
00237       uint32_t lastRunNumber;
00238       uint64_t lastEventNumber;
00239       MonitoredQuantity::Stats eventStats;
00240       MonitoredQuantity::Stats dqmEventStats;
00241       MonitoredQuantity::Stats errorEventStats;
00242       MonitoredQuantity::Stats faultyEventStats;
00243       MonitoredQuantity::Stats faultyDQMEventStats;
00244       MonitoredQuantity::Stats dataDiscardStats;
00245       MonitoredQuantity::Stats dqmDiscardStats;
00246       MonitoredQuantity::Stats skippedDiscardStats;
00247       UniqueResourceBrokerID_t uniqueRBID;
00248       int outstandingDataDiscardCount;
00249       int outstandingDQMDiscardCount;
00250 
00251       explicit ResourceBrokerResult(ResourceBrokerKey const& rbKey):
00252         key(rbKey), filterUnitCount(0), initMsgCount(0),
00253         lastRunNumber(0), lastEventNumber(0), uniqueRBID(0),
00254         outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00255 
00256       bool operator<(ResourceBrokerResult const& other) const
00257       {
00258         return key < other.key;
00259       }
00260     };
00261     typedef boost::shared_ptr<ResourceBrokerResult> RBResultPtr;
00262     typedef std::vector<RBResultPtr> ResourceBrokerResultsList;
00263 
00267     struct FilterUnitResult
00268     {
00269       FilterUnitKey key;
00270       uint32_t initMsgCount;
00271       uint32_t lastRunNumber;
00272       uint64_t lastEventNumber;
00273       MonitoredQuantity::Stats shortIntervalEventStats;
00274       MonitoredQuantity::Stats mediumIntervalEventStats;
00275       MonitoredQuantity::Stats dqmEventStats;
00276       MonitoredQuantity::Stats errorEventStats;
00277       MonitoredQuantity::Stats faultyEventStats;
00278       MonitoredQuantity::Stats faultyDQMEventStats;
00279       MonitoredQuantity::Stats dataDiscardStats;
00280       MonitoredQuantity::Stats dqmDiscardStats;
00281       MonitoredQuantity::Stats skippedDiscardStats;
00282       int outstandingDataDiscardCount;
00283       int outstandingDQMDiscardCount;
00284 
00285       explicit FilterUnitResult(FilterUnitKey const& fuKey):
00286         key(fuKey), initMsgCount(0), lastRunNumber(0), lastEventNumber(0),
00287         outstandingDataDiscardCount(0), outstandingDQMDiscardCount(0) {}
00288     };
00289     typedef boost::shared_ptr<FilterUnitResult> FUResultPtr;
00290     typedef std::vector<FUResultPtr> FilterUnitResultsList;
00291 
00292 
00296     DataSenderMonitorCollection
00297     (
00298       const utils::Duration_t& updateInterval,
00299       AlarmHandlerPtr
00300     );
00301 
00305     void addInitSample(I2OChain const&);
00306 
00310     void addEventSample(I2OChain const&);
00311 
00315     void addDQMEventSample(I2OChain const&);
00316 
00320     void addErrorEventSample(I2OChain const&);
00321 
00325     void addFaultyEventSample(I2OChain const&);
00326 
00330     void incrementDataDiscardCount(I2OChain const&);
00331 
00335     void incrementDQMDiscardCount(I2OChain const&);
00336 
00340     void incrementSkippedDiscardCount(I2OChain const&);
00341 
00345     OutputModuleResultsList getTopLevelOutputModuleResults() const;
00346 
00350     ResourceBrokerResultsList getAllResourceBrokerResults() const;
00351 
00355     RBResultPtr getOneResourceBrokerResult(UniqueResourceBrokerID_t) const;
00356 
00360     OutputModuleResultsList getOutputModuleResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00361 
00365     FilterUnitResultsList getFilterUnitResultsForRB(UniqueResourceBrokerID_t uniqueRBID) const;
00366 
00370     size_t getConnectedEPs() const;
00371 
00372 
00373   private:
00374 
00375     //Prevent copying of the DataSenderMonitorCollection
00376     DataSenderMonitorCollection(DataSenderMonitorCollection const&);
00377     DataSenderMonitorCollection& operator=(DataSenderMonitorCollection const&);
00378 
00379     virtual void do_calculateStatistics();
00380     virtual void do_reset();
00381     virtual void do_appendInfoSpaceItems(InfoSpaceItems&);
00382     virtual void do_updateInfoSpaceItems();
00383 
00384     void faultyEventsAlarm(const uint32_t&) const;
00385     void ignoredDiscardAlarm(const uint32_t&) const;
00386 
00387     bool getAllNeededPointers(I2OChain const& i2oChain,
00388                               RBRecordPtr& rbRecordPtr,
00389                               FURecordPtr& fuRecordPtr,
00390                               OutModRecordPtr& topLevelOutModPtr,
00391                               OutModRecordPtr& rbSpecificOutModPtr,
00392                               OutModRecordPtr& fuSpecificOutModPtr);
00393 
00394     bool getRBRecordPointer(I2OChain const& i2oChain,
00395                             RBRecordPtr& rbRecordPtr);
00396 
00397     bool getFURecordPointer(I2OChain const& i2oChain,
00398                             RBRecordPtr& rbRecordPtr,
00399                             FURecordPtr& fuRecordPtr);
00400 
00401     RBRecordPtr getResourceBrokerRecord(ResourceBrokerKey const&);
00402     UniqueResourceBrokerID_t getUniqueResourceBrokerID(ResourceBrokerKey const&);
00403 
00404     FURecordPtr getFilterUnitRecord(RBRecordPtr&, FilterUnitKey const&);
00405 
00406     OutModRecordPtr getOutputModuleRecord(OutputModuleRecordMap&,
00407                                           OutputModuleKey const&);
00408 
00409     OutputModuleResultsList buildOutputModuleResults(OutputModuleRecordMap const&) const;
00410 
00411     RBResultPtr buildResourceBrokerResult(RBRecordPtr const&) const;
00412 
00413     void calcStatsForOutputModules(OutputModuleRecordMap& outputModuleMap);
00414 
00415     mutable boost::mutex collectionsMutex_;
00416 
00417     xdata::UnsignedInteger32 connectedRBs_;
00418     xdata::UnsignedInteger32 connectedEPs_;
00419     xdata::UnsignedInteger32 activeEPs_;
00420     xdata::Integer32 outstandingDataDiscards_;
00421     xdata::Integer32 outstandingDQMDiscards_;
00422     xdata::UnsignedInteger32 faultyEvents_;
00423     xdata::UnsignedInteger32 ignoredDiscards_;
00424 
00425     OutputModuleRecordMap outputModuleMap_;
00426 
00427     std::map<ResourceBrokerKey, UniqueResourceBrokerID_t> resourceBrokerIDs_;
00428     std::map<UniqueResourceBrokerID_t, RBRecordPtr> resourceBrokerMap_;
00429 
00430     const utils::Duration_t updateInterval_;
00431     AlarmHandlerPtr alarmHandler_;
00432 
00433   };
00434 
00435   bool compareRBResultPtrValues(DataSenderMonitorCollection::RBResultPtr firstValue,
00436                                 DataSenderMonitorCollection::RBResultPtr secondValue);
00437 
00438 } // namespace stor
00439 
00440 #endif // EventFilter_StorageManager_DataSenderMonitorCollection_h 
00441 
00442