00001 #ifndef SMPS_DATA_PROCESS_MANAGER_HPP
00002 #define SMPS_DATA_PROCESS_MANAGER_HPP
00003
00004
00005 #include "EventFilter/StorageManager/interface/EventServer.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventServer.h"
00007 #include "EventFilter/StorageManager/interface/InitMsgCollection.h"
00008 #include "EventFilter/StorageManager/interface/DQMServiceManager.h"
00009 #include "EventFilter/StorageManager/interface/SMPerformanceMeter.h"
00010 #include "EventFilter/StorageManager/interface/ForeverCounter.h"
00011 #include "EventFilter/StorageManager/interface/RollingIntervalCounter.h"
00012 #include "FWCore/Utilities/interface/CPUTimer.h"
00013
00014 #include "IOPool/Streamer/interface/EventBuffer.h"
00015 #include "IOPool/Streamer/interface/EventMessage.h"
00016 #include "IOPool/Streamer/interface/Messages.h"
00017
00018 #include "boost/shared_ptr.hpp"
00019 #include "boost/thread/thread.hpp"
00020
00021 #include <sys/time.h>
00022 #include <string>
00023 #include <vector>
00024
00025 namespace stor
00026 {
00027
00028 class DataProcessManager
00029 {
00030 public:
00031 enum STATS_TIME_FRAME { SHORT_TERM = 0, LONG_TERM = 1 };
00032 enum STATS_TIMING_TYPE { EVENT_FETCH = 10, DQMEVENT_FETCH = 11 };
00033
00034 typedef std::vector<char> Buf;
00035 typedef std::map<std::string, unsigned int> RegConsumer_map;
00036 typedef std::map<std::string, bool> HeaderConsumer_map;
00037 typedef std::map<std::string, struct timeval> LastReqTime_map;
00038
00039 DataProcessManager();
00040
00041 ~DataProcessManager();
00042
00043 void start();
00044 void stop();
00045 void join();
00046
00047 void setEventServer(boost::shared_ptr<EventServer>& es)
00048 {
00049 eventServer_ = es;
00050 }
00051 boost::shared_ptr<EventServer>& getEventServer() { return eventServer_; }
00052
00053 void setInitMsgCollection(boost::shared_ptr<InitMsgCollection>& imColl)
00054 {
00055 initMsgCollection_ = imColl;
00056 }
00057 boost::shared_ptr<InitMsgCollection>& getInitMsgCollection() { return initMsgCollection_; }
00058
00059 void setMaxEventRequestRate(double rate);
00060 void setMaxDQMEventRequestRate(double rate);
00061
00062 void updateMinEventRequestInterval();
00063
00064 void setCollateDQM(bool collateDQM)
00065 { dqmServiceManager_->setCollateDQM(collateDQM); }
00066
00067 void setArchiveDQM(bool archiveDQM)
00068 { dqmServiceManager_->setArchiveDQM(archiveDQM); }
00069
00070 void setArchiveIntervalDQM(int archiveInterval)
00071 { dqmServiceManager_->setArchiveInterval(archiveInterval); }
00072
00073 void setPurgeTimeDQM(int purgeTimeDQM)
00074 { dqmServiceManager_->setPurgeTime(purgeTimeDQM);}
00075
00076 void setReadyTimeDQM(int readyTimeDQM)
00077 { dqmServiceManager_->setReadyTime(readyTimeDQM);}
00078
00079 void setFilePrefixDQM(std::string filePrefixDQM)
00080 { dqmServiceManager_->setFilePrefix(filePrefixDQM);}
00081
00082 void setUseCompressionDQM(bool useCompressionDQM)
00083 { dqmServiceManager_->setUseCompression(useCompressionDQM);}
00084
00085 void setCompressionLevelDQM(int compressionLevelDQM)
00086 { dqmServiceManager_->setCompressionLevel(compressionLevelDQM);}
00087
00088 void setDQMEventServer(boost::shared_ptr<DQMEventServer>& es)
00089 {
00090
00091 if (dqmServiceManager_.get() != NULL) dqmServiceManager_->setDQMEventServer(es);
00092 DQMeventServer_ = es;
00093 }
00094 boost::shared_ptr<DQMEventServer>& getDQMEventServer() { return DQMeventServer_; }
00095 boost::shared_ptr<stor::DQMServiceManager>& getDQMServiceManager() { return dqmServiceManager_; }
00096
00097 edm::EventBuffer& getCommandQueue() { return *cmd_q_; }
00098
00099 void setConsumerName(std::string s) { consumerName_ = s; }
00100 void setDQMConsumerName(std::string s) { DQMconsumerName_ = s; }
00101
00102 void addSM2Register(std::string smURL);
00103 void addDQMSM2Register(std::string DQMsmURL);
00104 bool haveRegWithDQMServer();
00105 bool haveRegWithEventServer();
00106 bool haveHeader();
00107
00108
00109 unsigned long receivedevents() { return receivedEvents_; }
00110 unsigned long receivedDQMevents() { return receivedDQMEvents_; }
00111 stor::SMPerfStats getStats() { return stats_; }
00112 void addMeasurement(unsigned long size);
00113 void setSamples(unsigned long num_samples);
00114 void setPeriod4Samples(unsigned long period4samples);
00115 unsigned long samples() { return pmeter_->getSetSamples(); }
00116 unsigned long period4samples() { return pmeter_->getPeriod4Samples(); }
00117 double totalvolumemb() { return pmeter_->totalvolumemb(); }
00118
00119 double getSampleCount(STATS_TIME_FRAME timeFrame = SHORT_TERM,
00120 STATS_TIMING_TYPE timingType = EVENT_FETCH,
00121 double currentTime = BaseCounter::getCurrentTime());
00122 double getAverageValue(STATS_TIME_FRAME timeFrame = SHORT_TERM,
00123 STATS_TIMING_TYPE timingType = EVENT_FETCH,
00124 double currentTime = BaseCounter::getCurrentTime());
00125 double getDuration(STATS_TIME_FRAME timeFrame = SHORT_TERM,
00126 STATS_TIMING_TYPE timingType = EVENT_FETCH,
00127 double currentTime = BaseCounter::getCurrentTime());
00128
00129 private:
00130 void init();
00131 void processCommands();
00132 static void run(DataProcessManager*);
00133 void getEventFromAllSM();
00134 double getTime2Wait(std::string smURL);
00135 void setTime2Now(std::string smURL);
00136 bool getOneEventFromSM(std::string smURL, double& time2wait);
00137 void getDQMEventFromAllSM();
00138 double getDQMTime2Wait(std::string smURL);
00139 void setDQMTime2Now(std::string smURL);
00140 bool getOneDQMEventFromSM(std::string smURL, double& time2wait);
00141
00142 bool registerWithAllSM();
00143 bool registerWithAllDQMSM();
00144 int registerWithSM(std::string smURL);
00145 int registerWithDQMSM(std::string smURL);
00146 bool getAnyHeaderFromSM();
00147 bool getHeaderFromAllSM();
00148 bool getHeaderFromSM(std::string smURL);
00149 void waitBetweenRegTrys();
00150
00151 edm::EventBuffer* cmd_q_;
00152
00153 bool alreadyRegistered_;
00154 bool alreadyRegisteredDQM_;
00155 bool headerRefetchRequested_;
00156 std::vector<unsigned char> buf_;
00157
00158 std::vector<std::string> smList_;
00159 RegConsumer_map smRegMap_;
00160 HeaderConsumer_map smHeaderMap_;
00161 std::vector<std::string> DQMsmList_;
00162 RegConsumer_map DQMsmRegMap_;
00163 std::string eventpage_;
00164 std::string regpage_;
00165 std::string DQMeventpage_;
00166 std::string DQMregpage_;
00167 std::string headerpage_;
00168 char subscriptionurl_[2048];
00169
00170 std::string consumerName_;
00171 std::string consumerPriority_;
00172 std::string consumerPSetString_;
00173 int headerRetryInterval_;
00174 double maxEventRequestRate_;
00175 double minEventRequestInterval_;
00176 unsigned int consumerId_;
00177 LastReqTime_map lastReqMap_;
00178 double minDQMEventRequestInterval_;
00179 unsigned int DQMconsumerId_;
00180 LastReqTime_map lastDQMReqMap_;
00181 std::string DQMconsumerName_;
00182 std::string DQMconsumerPriority_;
00183 std::string consumerTopFolderName_;
00184
00185
00186 boost::shared_ptr<stor::DQMServiceManager> dqmServiceManager_;
00187
00188 boost::shared_ptr<EventServer> eventServer_;
00189 boost::shared_ptr<DQMEventServer> DQMeventServer_;
00190 boost::shared_ptr<InitMsgCollection> initMsgCollection_;
00191
00192 boost::shared_ptr<boost::thread> me_;
00193
00194
00195 unsigned long receivedEvents_;
00196 unsigned long receivedDQMEvents_;
00197 stor::SMPerformanceMeter *pmeter_;
00198
00199 xdata::UnsignedInteger32 samples_;
00200 xdata::UnsignedInteger32 period4samples_;
00201 stor::SMPerfStats stats_;
00202
00203
00204 edm::CPUTimer eventFetchTimer_;
00205 edm::CPUTimer dqmFetchTimer_;
00206 boost::shared_ptr<ForeverCounter> ltEventFetchTimeCounter_;
00207 boost::shared_ptr<RollingIntervalCounter> stEventFetchTimeCounter_;
00208 boost::shared_ptr<ForeverCounter> ltDQMFetchTimeCounter_;
00209 boost::shared_ptr<RollingIntervalCounter> stDQMFetchTimeCounter_;
00210 };
00211 }
00212
00213 #endif
00214