00001 // $Id: EventServerProxy.h,v 1.5 2011/04/05 09:19:01 mommsen Exp $ 00003 00004 #ifndef EventFilter_StorageManager_EventServerProxy_h 00005 #define EventFilter_StorageManager_EventServerProxy_h 00006 00007 #include "EventFilter/StorageManager/interface/CurlInterface.h" 00008 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h" 00009 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h" 00010 #include "EventFilter/StorageManager/interface/Utils.h" 00011 #include "FWCore/MessageLogger/interface/MessageLogger.h" 00012 #include "FWCore/ParameterSet/interface/ParameterSet.h" 00013 #include "FWCore/Utilities/interface/Exception.h" 00014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h" 00015 #include "IOPool/Streamer/interface/OtherMessage.h" 00016 00017 #include <string> 00018 00019 00020 namespace stor { 00021 00034 template<typename RegInfo> 00035 class EventServerProxy 00036 { 00037 00038 public: 00039 00040 EventServerProxy(edm::ParameterSet const&); 00041 virtual ~EventServerProxy() {}; 00042 00046 void getOneEvent(CurlInterface::Content& data); 00047 00052 bool getEventMaybe(CurlInterface::Content& data); 00053 00057 void getInitMsg(CurlInterface::Content& data); 00058 00059 00060 private: 00061 00062 void getOneEventFromEventServer(CurlInterface::Content&); 00063 void checkEvent(CurlInterface::Content&); 00064 void getInitMsgFromEventServer(CurlInterface::Content&); 00065 void checkInitMsg(CurlInterface::Content&); 00066 void registerWithEventServer(); 00067 void connectToEventServer(CurlInterface::Content&); 00068 bool extractConsumerId(CurlInterface::Content&); 00069 00070 const RegInfo regInfo_; 00071 unsigned int consumerId_; 00072 stor::utils::TimePoint_t nextRequestTime_; 00073 const stor::utils::Duration_t minEventRequestInterval_; 00074 00075 bool alreadySaidHalted_; 00076 bool alreadySaidWaiting_; 00077 unsigned int failedAttemptsToGetData_; 00078 00079 }; 00080 00081 00083 // Specializations for EventConsumerRegistrationInfo // 00085 00086 template<> 00087 inline void 00088 EventServerProxy<EventConsumerRegistrationInfo>:: 00089 getInitMsgFromEventServer(CurlInterface::Content& data) 00090 { 00091 // build the header request message to send to the event server 00092 char msgBuff[100]; 00093 OtherMessageBuilder requestMessage( 00094 &msgBuff[0], 00095 Header::HEADER_REQUEST, 00096 sizeof(char_uint32) 00097 ); 00098 uint8 *bodyPtr = requestMessage.msgBody(); 00099 convert(consumerId_, bodyPtr); 00100 00101 // send the header request 00102 stor::CurlInterface curl; 00103 CURLcode result = curl.postBinaryMessage( 00104 regInfo_.sourceURL() + "/getregdata", 00105 requestMessage.startAddress(), 00106 requestMessage.size(), 00107 data 00108 ); 00109 00110 if ( result != CURLE_OK ) 00111 { 00112 // connection failed: try to reconnect 00113 edm::LogError("EventServerProxy") << "curl perform failed for header:" 00114 << std::string(&data[0]) << std::endl 00115 << ". Trying to reconnect."; 00116 data.clear(); 00117 registerWithEventServer(); 00118 } 00119 00120 if( data.empty() ) 00121 { 00122 if(!alreadySaidWaiting_) { 00123 edm::LogInfo("EventServerProxy") << "...waiting for header from event server..."; 00124 alreadySaidWaiting_ = true; 00125 } 00126 // sleep for desired amount of time 00127 ::sleep(regInfo_.headerRetryInterval()); 00128 } 00129 else 00130 { 00131 alreadySaidWaiting_ = false; 00132 } 00133 } 00134 00135 template<> 00136 inline void 00137 EventServerProxy<EventConsumerRegistrationInfo>:: 00138 checkInitMsg(CurlInterface::Content& data) 00139 { 00140 try { 00141 HeaderView hdrView(&data[0]); 00142 if (hdrView.code() != Header::INIT) { 00143 throw cms::Exception("EventServerProxy", "readHeader"); 00144 } 00145 } 00146 catch (cms::Exception excpt) { 00147 const unsigned int MAX_DUMP_LENGTH = 1000; 00148 std::ostringstream dump; 00149 dump << "========================================" << std::endl; 00150 dump << "* Exception decoding the getregdata response from the event server!" << std::endl; 00151 if (data.size() <= MAX_DUMP_LENGTH) 00152 { 00153 dump << "* Here is the raw text that was returned:" << std::endl; 00154 dump << std::string(&data[0]) << std::endl; 00155 } 00156 else 00157 { 00158 dump << "* Here are the first " << MAX_DUMP_LENGTH << 00159 " characters of the raw text that was returned:" << std::endl; 00160 dump << std::string(&data[0], MAX_DUMP_LENGTH) << std::endl; 00161 } 00162 dump << "========================================" << std::endl; 00163 edm::LogError("EventServerProxy") << dump.str(); 00164 throw excpt; 00165 } 00166 } 00167 00168 template<> 00169 inline void 00170 EventServerProxy<EventConsumerRegistrationInfo>:: 00171 getInitMsg(CurlInterface::Content& data) 00172 { 00173 do 00174 { 00175 data.clear(); 00176 getInitMsgFromEventServer(data); 00177 } 00178 while ( !edm::shutdown_flag && data.empty() ); 00179 00180 if (edm::shutdown_flag) { 00181 throw cms::Exception("readHeader","EventServerProxy") 00182 << "The header read was aborted by a shutdown request.\n"; 00183 } 00184 00185 checkInitMsg(data); 00186 } 00187 00188 } // namespace stor 00189 00190 #endif // EventFilter_StorageManager_EventServerProxy_h 00191 00192