CMS 3D CMS Logo

DataProcessManager.h

Go to the documentation of this file.
00001 #ifndef SMPS_DATA_PROCESS_MANAGER_HPP
00002 #define SMPS_DATA_PROCESS_MANAGER_HPP
00003 // $Id: DataProcessManager.h,v 1.10 2008/10/14 15:02:18 hcheung Exp $
00004 
00005 #include "EventFilter/StorageManager/interface/EventServer.h"
00006 #include "EventFilter/StorageManager/interface/DQMEventServer.h"
00007 #include "EventFilter/StorageManager/interface/InitMsgCollection.h"
00008 #include "EventFilter/StorageManager/interface/DQMServiceManager.h"
00009 #include "EventFilter/StorageManager/interface/SMPerformanceMeter.h"
00010 #include "EventFilter/StorageManager/interface/ForeverCounter.h"
00011 #include "EventFilter/StorageManager/interface/RollingIntervalCounter.h"
00012 #include "FWCore/Utilities/interface/CPUTimer.h"
00013 
00014 #include "IOPool/Streamer/interface/EventBuffer.h"
00015 #include "IOPool/Streamer/interface/EventMessage.h"
00016 #include "IOPool/Streamer/interface/Messages.h"
00017 
00018 #include "boost/shared_ptr.hpp"
00019 #include "boost/thread/thread.hpp"
00020 
00021 #include <sys/time.h>
00022 #include <string>
00023 #include <vector>
00024 
00025 namespace stor
00026 {
00027 
00028   class DataProcessManager
00029   {
          // Declaration of the data-process manager: it exposes thread-style
          // control (start/stop/join), holds shared handles to the event server,
          // DQM event server, INIT message collection and DQM service manager,
          // keeps per-URL registration/request bookkeeping, and offers
          // performance/statistics accessors. Only the inline members are
          // defined here; everything else is implemented outside this header.
          // NOTE(review): presumably start() launches the static run() on the
          // boost::thread held in me_ -- confirm in the implementation.
          // NOTE(review): the class declares a destructor and holds raw pointers
          // (cmd_q_, pmeter_) but declares no copy constructor/assignment; if the
          // destructor frees them, copying an instance would be unsafe -- confirm.
00030   public:
          // Selectors accepted by getSampleCount(), getAverageValue() and
          // getDuration() below.
00031     enum STATS_TIME_FRAME { SHORT_TERM = 0, LONG_TERM = 1 };
00032     enum STATS_TIMING_TYPE { EVENT_FETCH = 10, DQMEVENT_FETCH = 11 };
00033 
          // String-keyed lookup tables used by the private members below
          // (smRegMap_/DQMsmRegMap_, smHeaderMap_, lastReqMap_/lastDQMReqMap_).
          // NOTE(review): keys are presumably SM URLs -- confirm in the .cc.
          // NOTE(review): std::map is used here but this header does not appear
          // to include <map> directly; it seems to rely on a transitive include.
00034     typedef std::vector<char> Buf;
00035     typedef std::map<std::string, unsigned int> RegConsumer_map;
00036     typedef std::map<std::string, bool> HeaderConsumer_map;
00037     typedef std::map<std::string, struct timeval> LastReqTime_map;
00038 
00039     DataProcessManager();
00040 
00041     ~DataProcessManager();
00042 
          // Lifecycle control; defined outside this header.
00043     void start();
00044     void stop();
00045     void join();
00046 
          // Stores a copy of the caller's shared_ptr in eventServer_.
00047     void setEventServer(boost::shared_ptr<EventServer>& es)
00048     {
00049       eventServer_ = es;
00050     }
          // Returns a reference to the member itself (not a copy), so a caller
          // can reseat eventServer_ through it.
00051     boost::shared_ptr<EventServer>& getEventServer() { return eventServer_; }
00052 
          // Stores a copy of the caller's shared_ptr in initMsgCollection_.
00053     void setInitMsgCollection(boost::shared_ptr<InitMsgCollection>& imColl)
00054     {
00055       initMsgCollection_ = imColl;
00056     }
          // Returns a reference to the initMsgCollection_ member itself.
00057     boost::shared_ptr<InitMsgCollection>& getInitMsgCollection() { return initMsgCollection_; }
00058 
          // Request-rate configuration; defined outside this header.
          // NOTE(review): presumably these feed minEventRequestInterval_ and
          // minDQMEventRequestInterval_ -- confirm in the .cc.
00059     void setMaxEventRequestRate(double rate);
00060     void setMaxDQMEventRequestRate(double rate);
00061 
00062     void updateMinEventRequestInterval();
00063 
          // The DQM setters below forward their argument straight to
          // dqmServiceManager_.
          // NOTE(review): unlike setDQMEventServer(), none of them checks
          // dqmServiceManager_ for null before dereferencing -- confirm it is
          // always set before these are called.
00064     void setCollateDQM(bool collateDQM)
00065     { dqmServiceManager_->setCollateDQM(collateDQM); }
00066 
00067     void setArchiveDQM(bool archiveDQM)
00068     { dqmServiceManager_->setArchiveDQM(archiveDQM); }
00069 
00070     void setArchiveIntervalDQM(int archiveInterval)
00071     { dqmServiceManager_->setArchiveInterval(archiveInterval); }
00072 
00073     void setPurgeTimeDQM(int purgeTimeDQM)
00074     { dqmServiceManager_->setPurgeTime(purgeTimeDQM);}
00075 
00076     void setReadyTimeDQM(int readyTimeDQM)
00077     { dqmServiceManager_->setReadyTime(readyTimeDQM);}
00078 
00079     void setFilePrefixDQM(std::string filePrefixDQM)
00080     { dqmServiceManager_->setFilePrefix(filePrefixDQM);}
00081 
00082     void setUseCompressionDQM(bool useCompressionDQM)
00083     { dqmServiceManager_->setUseCompression(useCompressionDQM);}
00084 
00085     void setCompressionLevelDQM(int compressionLevelDQM)
00086     { dqmServiceManager_->setCompressionLevel(compressionLevelDQM);}
00087 
          // Records the DQM event server in DQMeventServer_ and, when a DQM
          // service manager is present, hands the same server to it as well.
00088     void setDQMEventServer(boost::shared_ptr<DQMEventServer>& es)
00089     {
00090       // get() only observes the raw pointer; the shared_ptr keeps ownership
00091       if (dqmServiceManager_.get() != NULL) dqmServiceManager_->setDQMEventServer(es);
00092       DQMeventServer_ = es;
00093     }
          // Both getters return references to the members themselves.
00094     boost::shared_ptr<DQMEventServer>& getDQMEventServer() { return DQMeventServer_; }
00095     boost::shared_ptr<stor::DQMServiceManager>& getDQMServiceManager() { return dqmServiceManager_; }
00096 
          // Dereferences cmd_q_ unconditionally.
          // NOTE(review): assumes cmd_q_ is non-null for the object's lifetime;
          // its allocation is not visible in this header.
00097     edm::EventBuffer& getCommandQueue() { return *cmd_q_; }
00098 
          // Overwrite the consumer names used for events and DQM events.
00099     void setConsumerName(std::string s) { consumerName_ = s; }
00100     void setDQMConsumerName(std::string s) { DQMconsumerName_ = s; }
00101 
          // Registration bookkeeping; defined outside this header.
00102     void addSM2Register(std::string smURL);
00103     void addDQMSM2Register(std::string DQMsmURL);
00104     bool haveRegWithDQMServer();
00105     bool haveRegWithEventServer();
00106     bool haveHeader();
00107 
00108     // *** for performance measurements
          // The inline accessors return the counters/stats by value; samples(),
          // period4samples() and totalvolumemb() forward to pmeter_.
          // NOTE(review): pmeter_ is a raw pointer dereferenced without a null
          // check -- confirm it is always allocated before these are called.
00109     unsigned long receivedevents() { return receivedEvents_; }
00110     unsigned long receivedDQMevents() { return receivedDQMEvents_; }
00111     stor::SMPerfStats getStats() { return stats_; }
00112     void addMeasurement(unsigned long size);
00113     void setSamples(unsigned long num_samples);
00114     void setPeriod4Samples(unsigned long period4samples);
00115     unsigned long samples() { return pmeter_->getSetSamples(); }
00116     unsigned long period4samples() { return pmeter_->getPeriod4Samples(); }
00117     double totalvolumemb() { return pmeter_->totalvolumemb(); }
00118 
          // Statistics queries, selected by time frame and timing type. When
          // currentTime is omitted, BaseCounter::getCurrentTime() is evaluated
          // at the point of each call.
          // NOTE(review): presumably SHORT_TERM reads the st* rolling counters
          // and LONG_TERM the lt* forever counters declared below -- confirm.
00119     double getSampleCount(STATS_TIME_FRAME timeFrame = SHORT_TERM,
00120                           STATS_TIMING_TYPE timingType = EVENT_FETCH,
00121                           double currentTime = BaseCounter::getCurrentTime());
00122     double getAverageValue(STATS_TIME_FRAME timeFrame = SHORT_TERM,
00123                            STATS_TIMING_TYPE timingType = EVENT_FETCH,
00124                            double currentTime = BaseCounter::getCurrentTime());
00125     double getDuration(STATS_TIME_FRAME timeFrame = SHORT_TERM,
00126                        STATS_TIMING_TYPE timingType = EVENT_FETCH,
00127                        double currentTime = BaseCounter::getCurrentTime());
00128 
00129   private:
          // Internal helpers; all defined outside this header. run() is static
          // and receives the manager instance as its argument.
00130     void init();
00131     void processCommands();
00132     static void run(DataProcessManager*);
          // Event and DQM-event fetching, per URL and across all registered URLs.
00133     void getEventFromAllSM();
00134     double getTime2Wait(std::string smURL);
00135     void setTime2Now(std::string smURL);
00136     bool getOneEventFromSM(std::string smURL, double& time2wait);
00137     void getDQMEventFromAllSM();
00138     double getDQMTime2Wait(std::string smURL);
00139     void setDQMTime2Now(std::string smURL);
00140     bool getOneDQMEventFromSM(std::string smURL, double& time2wait);
00141 
          // Registration and header retrieval.
00142     bool registerWithAllSM();
00143     bool registerWithAllDQMSM();
00144     int registerWithSM(std::string smURL);
00145     int registerWithDQMSM(std::string smURL);
00146     bool getAnyHeaderFromSM();
00147     bool getHeaderFromAllSM();
00148     bool getHeaderFromSM(std::string smURL);
00149     void waitBetweenRegTrys();
00150 
          // Raw pointer returned (dereferenced) by getCommandQueue().
          // NOTE(review): ownership and deletion are not visible in this header.
00151     edm::EventBuffer* cmd_q_;
00152 
00153     bool alreadyRegistered_;
00154     bool alreadyRegisteredDQM_;
00155     bool headerRefetchRequested_;
00156     std::vector<unsigned char> buf_;
00157 
          // URL lists and per-URL maps for the event and DQM paths, followed by
          // the page-name strings.
00158     std::vector<std::string> smList_;
00159     RegConsumer_map smRegMap_;
00160     HeaderConsumer_map smHeaderMap_;
00161     std::vector<std::string> DQMsmList_;
00162     RegConsumer_map DQMsmRegMap_;
00163     std::string eventpage_;
00164     std::string regpage_;
00165     std::string DQMeventpage_;
00166     std::string DQMregpage_;
00167     std::string headerpage_;
          // Fixed-size character buffer.
          // NOTE(review): verify that every writer bounds its output to 2048.
00168     char subscriptionurl_[2048];
00169 
          // Consumer identity and request pacing state for events and DQM events.
00170     std::string consumerName_;
00171     std::string consumerPriority_;
00172     std::string consumerPSetString_;
00173     int headerRetryInterval_; // seconds
00174     double maxEventRequestRate_;
00175     double minEventRequestInterval_;
00176     unsigned int consumerId_;
00177     LastReqTime_map lastReqMap_;
00178     double minDQMEventRequestInterval_;
00179     unsigned int DQMconsumerId_;
00180     LastReqTime_map lastDQMReqMap_;
00181     std::string DQMconsumerName_;
00182     std::string DQMconsumerPriority_;
00183     std::string consumerTopFolderName_;
00184 
          // Shared handles exposed through the public getters above.
00185     //std::auto_ptr<stor::DQMServiceManager> dqmServiceManager_;
00186     boost::shared_ptr<stor::DQMServiceManager> dqmServiceManager_;
00187 
00188     boost::shared_ptr<EventServer> eventServer_;
00189     boost::shared_ptr<DQMEventServer> DQMeventServer_;
00190     boost::shared_ptr<InitMsgCollection> initMsgCollection_;
00191 
          // Thread handle (see the note at the top of the class).
00192     boost::shared_ptr<boost::thread> me_;
00193 
00194     // *** for performance measurements
00195     unsigned long receivedEvents_;
00196     unsigned long receivedDQMEvents_;
          // Raw pointer used by samples(), period4samples() and totalvolumemb().
00197     stor::SMPerformanceMeter *pmeter_;
00198     // *** measurements for last set of samples
          // NOTE(review): xdata::UnsignedInteger32 is used here but no xdata
          // header is included directly by this file -- presumably it arrives
          // through one of the project includes; confirm.
00199     xdata::UnsignedInteger32 samples_; // number of samples per measurement
00200     xdata::UnsignedInteger32 period4samples_; // time period for measurement
00201     stor::SMPerfStats stats_;
00202 
00203     // statistics (long term and short term)
00204     edm::CPUTimer eventFetchTimer_;
00205     edm::CPUTimer dqmFetchTimer_;
00206     boost::shared_ptr<ForeverCounter> ltEventFetchTimeCounter_;
00207     boost::shared_ptr<RollingIntervalCounter> stEventFetchTimeCounter_;
00208     boost::shared_ptr<ForeverCounter> ltDQMFetchTimeCounter_;
00209     boost::shared_ptr<RollingIntervalCounter> stDQMFetchTimeCounter_;
00210   };
00211 }
00212 
00213 #endif
00214 

Generated on Tue Jun 9 17:34:51 2009 for CMSSW by  doxygen 1.5.4