00001 // $Id: EventRetriever.h,v 1.2 2011/03/07 15:41:54 mommsen Exp $ 00003 00004 #ifndef EventFilter_SMProxyServer_EventRetriever_h 00005 #define EventFilter_SMProxyServer_EventRetriever_h 00006 00007 #include "EventFilter/SMProxyServer/interface/Configuration.h" 00008 #include "EventFilter/SMProxyServer/interface/ConnectionID.h" 00009 #include "EventFilter/SMProxyServer/interface/DataRetrieverMonitorCollection.h" 00010 #include "EventFilter/SMProxyServer/interface/DQMEventMsg.h" 00011 #include "EventFilter/SMProxyServer/interface/EventQueueCollection.h" 00012 #include "EventFilter/StorageManager/interface/DQMEventStore.h" 00013 #include "EventFilter/StorageManager/interface/EventServerProxy.h" 00014 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h" 00015 #include "EventFilter/StorageManager/interface/QueueID.h" 00016 #include "EventFilter/StorageManager/interface/Utils.h" 00017 #include "FWCore/ParameterSet/interface/ParameterSet.h" 00018 00019 #include <boost/scoped_ptr.hpp> 00020 #include <boost/shared_ptr.hpp> 00021 #include <boost/thread/thread.hpp> 00022 00023 #include <string> 00024 #include <vector> 00025 00026 00027 00028 namespace smproxy { 00029 00030 class StateMachine; 00031 00032 00041 template<class RegInfo, class QueueCollectionPtr> 00042 class EventRetriever 00043 { 00044 public: 00045 00046 typedef boost::shared_ptr<RegInfo> RegInfoPtr; 00047 00048 EventRetriever 00049 ( 00050 StateMachine*, 00051 const RegInfoPtr 00052 ); 00053 00054 ~EventRetriever(); 00055 00059 void addConsumer(const RegInfoPtr); 00060 00064 void stop(); 00065 00069 const stor::QueueIDs& getQueueIDs() const 00070 { return queueIDs_; } 00071 00075 size_t getConnectedSMCount() const 00076 { return eventServers_.size(); } 00077 00078 00079 private: 00080 00081 void activity(const edm::ParameterSet&); 00082 void doIt(const edm::ParameterSet&); 00083 void do_stop(); 00084 bool connect(const edm::ParameterSet&); 00085 void connectToSM(const std::string& sourceURL, const edm::ParameterSet&); 00086 bool openConnection(const ConnectionID&, const RegInfoPtr); 00087 bool tryToReconnect(); 00088 void getInitMsg(); 00089 bool getNextEvent(stor::CurlInterface::Content&); 00090 bool adjustMinEventRequestInterval(const stor::utils::Duration_t&); 00091 void updateConsumersSetting(const stor::utils::Duration_t&); 00092 bool anyActiveConsumers(QueueCollectionPtr) const; 00093 void disconnectFromCurrentSM(); 00094 void processCompletedTopLevelFolders(); 00095 00096 //Prevent copying of the EventRetriever 00097 EventRetriever(EventRetriever const&); 00098 EventRetriever& operator=(EventRetriever const&); 00099 00100 StateMachine* stateMachine_; 00101 const DataRetrieverParams dataRetrieverParams_; 00102 DataRetrieverMonitorCollection& dataRetrieverMonitorCollection_; 00103 00104 stor::utils::TimePoint_t nextRequestTime_; 00105 stor::utils::Duration_t minEventRequestInterval_; 00106 00107 boost::scoped_ptr<boost::thread> thread_; 00108 static size_t retrieverCount_; 00109 size_t instance_; 00110 00111 typedef stor::EventServerProxy<RegInfo> EventServer; 00112 typedef boost::shared_ptr<EventServer> EventServerPtr; 00113 typedef std::map<ConnectionID, EventServerPtr> EventServers; 00114 EventServers eventServers_; 00115 typename EventServers::iterator nextSMtoUse_; 00116 00117 typedef std::vector<ConnectionID> ConnectionIDs; 00118 ConnectionIDs connectionIDs_; 00119 mutable boost::mutex connectionIDsLock_; 00120 00121 stor::utils::TimePoint_t nextReconnectTry_; 00122 00123 stor::QueueIDs queueIDs_; 00124 mutable boost::mutex queueIDsLock_; 00125 00126 stor::DQMEventStore<DQMEventMsg, 00127 EventRetriever<RegInfo,QueueCollectionPtr>, 00128 StateMachine 00129 > dqmEventStore_; 00130 00131 }; 00132 00133 } // namespace smproxy 00134 00135 #endif // EventFilter_SMProxyServer_EventRetriever_h 00136 00137