00001 #ifndef STOR_EVENT_SERVER_H
00002 #define STOR_EVENT_SERVER_H
00003
00018 #include <sys/time.h>
00019 #include <string>
00020 #include <vector>
00021 #include <map>
00022 #include "IOPool/Streamer/interface/MsgTools.h"
00023 #include "IOPool/Streamer/interface/EventMessage.h"
00024 #include "EventFilter/StorageManager/interface/ConsumerPipe.h"
00025 #include "EventFilter/StorageManager/interface/ForeverCounter.h"
00026 #include "EventFilter/StorageManager/interface/RollingIntervalCounter.h"
00027 #include "EventFilter/StorageManager/interface/RateLimiter.h"
00028 #include "FWCore/Utilities/interface/CPUTimer.h"
00029 #include "boost/random.hpp"
00030 #include "boost/shared_ptr.hpp"
00031 #include "boost/thread/mutex.hpp"
00032 #include "boost/thread/thread.hpp"
00033
00034 namespace stor
00035 {
00036 class EventServer
00037 {
00038 public:
00039 enum STATS_TIME_FRAME { SHORT_TERM_STATS = 0, LONG_TERM_STATS = 1 };
00040 enum STATS_SAMPLE_TYPE { INPUT_STATS = 10, OUTPUT_STATS = 11,
00041 UNIQUE_ACCEPT_STATS = 12 };
00042 enum STATS_TIMING_TYPE { CPUTIME = 20, REALTIME = 21 };
00043
00044 EventServer(double maxEventRate, double maxDataRate,
00045 std::string hltOutputSelection, bool runFairShareAlgo = false);
00046 ~EventServer();
00047
00048 void addConsumer(boost::shared_ptr<ConsumerPipe> consumer);
00049 std::map< uint32, boost::shared_ptr<ConsumerPipe> > getConsumerTable();
00050 boost::shared_ptr<ConsumerPipe> getConsumer(uint32 consumerId);
00051
00052 void processEvent(const EventMsgView &eventView);
00053 boost::shared_ptr< std::vector<char> > getEvent(uint32 consumerId);
00054 void clearQueue();
00055
00056 void setStreamSelectionTable(std::map<std::string, Strings> const& selTable);
00057 std::map<std::string, Strings> getStreamSelectionTable()
00058 {
00059 return streamSelectionTable_;
00060 }
00061 int getSelectionTableStringSize()
00062 {
00063 return selTableStringSize_;
00064 }
00065 Strings updateTriggerSelectionForStreams(Strings const& selectionList);
00066
00067 double getMaxEventRate() const { return maxEventRate_; }
00068 double getMaxDataRate() const { return maxDataRate_; }
00069 std::string getHLTOutputSelection() const { return hltOutputSelection_; }
00070
00071 long long getEventCount(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00072 STATS_SAMPLE_TYPE sampleType = INPUT_STATS,
00073 uint32 outputModuleId = 0,
00074 double currentTime = ForeverCounter::getCurrentTime());
00075 double getEventRate(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00076 STATS_SAMPLE_TYPE sampleType = INPUT_STATS,
00077 uint32 outputModuleId = 0,
00078 double currentTime = ForeverCounter::getCurrentTime());
00079 double getDataRate(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00080 STATS_SAMPLE_TYPE sampleType = INPUT_STATS,
00081 uint32 outputModuleId = 0,
00082 double currentTime = ForeverCounter::getCurrentTime());
00083 double getDuration(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00084 STATS_SAMPLE_TYPE sampleType = INPUT_STATS,
00085 uint32 outputModuleId = 0,
00086 double currentTime = ForeverCounter::getCurrentTime());
00087
00088 double getInternalTime(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00089 STATS_TIMING_TYPE timingType = CPUTIME,
00090 double currentTime = ForeverCounter::getCurrentTime());
00091 double getTotalTime(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00092 STATS_TIMING_TYPE timingType = CPUTIME,
00093 double currentTime = ForeverCounter::getCurrentTime());
00094 double getTimeFraction(STATS_TIME_FRAME timeFrame = SHORT_TERM_STATS,
00095 STATS_TIMING_TYPE timingType = CPUTIME,
00096 double currentTime = ForeverCounter::getCurrentTime());
00097
00098 private:
00099
00100 double maxEventRate_;
00101 double maxDataRate_;
00102 std::string hltOutputSelection_;
00103 uint32 hltOutputModuleId_;
00104 bool runFairShareAlgo_;
00105
00106
00107 boost::shared_ptr<RateLimiter> rateLimiter_;
00108
00109
00110 int disconnectedConsumerTestCounter_;
00111
00112
00113 std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable_;
00114
00115
00116 std::map<std::string, Strings> streamSelectionTable_;
00117 int selTableStringSize_;
00118
00119
00120 std::map<uint32, boost::shared_ptr<ForeverCounter> > ltInputCounters_;
00121 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> > stInputCounters_;
00122 std::map<uint32, boost::shared_ptr<ForeverCounter> > ltAcceptCounters_;
00123 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> > stAcceptCounters_;
00124 std::map<uint32, boost::shared_ptr<ForeverCounter> > ltOutputCounters_;
00125 std::map<uint32, boost::shared_ptr<RollingIntervalCounter> > stOutputCounters_;
00126 edm::CPUTimer outsideTimer_;
00127 edm::CPUTimer insideTimer_;
00128 boost::shared_ptr<ForeverCounter> longTermInsideCPUTimeCounter_;
00129 boost::shared_ptr<RollingIntervalCounter> shortTermInsideCPUTimeCounter_;
00130 boost::shared_ptr<ForeverCounter> longTermInsideRealTimeCounter_;
00131 boost::shared_ptr<RollingIntervalCounter> shortTermInsideRealTimeCounter_;
00132 boost::shared_ptr<ForeverCounter> longTermOutsideCPUTimeCounter_;
00133 boost::shared_ptr<RollingIntervalCounter> shortTermOutsideCPUTimeCounter_;
00134 boost::shared_ptr<ForeverCounter> longTermOutsideRealTimeCounter_;
00135 boost::shared_ptr<RollingIntervalCounter> shortTermOutsideRealTimeCounter_;
00136
00137 boost::mt19937 baseGenerator_;
00138 boost::shared_ptr< boost::uniform_01<boost::mt19937> > generator_;
00139 };
00140 }
00141
00142 #endif