00001 // $Id: EventServerProxy.h,v 1.8 2012/04/23 08:41:27 mommsen Exp $ 00003 00004 #ifndef EventFilter_StorageManager_EventServerProxy_h 00005 #define EventFilter_StorageManager_EventServerProxy_h 00006 00007 #include "EventFilter/StorageManager/interface/CurlInterface.h" 00008 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h" 00009 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h" 00010 #include "EventFilter/StorageManager/interface/Utils.h" 00011 #include "FWCore/MessageLogger/interface/MessageLogger.h" 00012 #include "FWCore/ParameterSet/interface/ParameterSet.h" 00013 #include "FWCore/Utilities/interface/Exception.h" 00014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h" 00015 #include "IOPool/Streamer/interface/MsgTools.h" 00016 #include "IOPool/Streamer/interface/OtherMessage.h" 00017 00018 #include <string> 00019 00020 00021 namespace stor { 00022 00035 template<typename RegInfo> 00036 class EventServerProxy 00037 { 00038 00039 public: 00040 00041 EventServerProxy(edm::ParameterSet const&); 00042 virtual ~EventServerProxy() {}; 00043 00047 void reconnect(); 00048 00052 void getOneEvent(CurlInterface::Content& data); 00053 00058 bool getEventMaybe(CurlInterface::Content& data); 00059 00063 void getInitMsg(CurlInterface::Content& data); 00064 00065 00066 private: 00067 00068 void getOneEventFromEventServer(CurlInterface::Content&); 00069 void checkEvent(CurlInterface::Content&); 00070 void getInitMsgFromEventServer(CurlInterface::Content&); 00071 void checkInitMsg(CurlInterface::Content&); 00072 void registerWithEventServer(); 00073 void connectToEventServer(CurlInterface::Content&); 00074 bool extractConsumerId(CurlInterface::Content&); 00075 00076 const RegInfo regInfo_; 00077 unsigned int consumerId_; 00078 stor::utils::TimePoint_t nextRequestTime_; 00079 const stor::utils::Duration_t minEventRequestInterval_; 00080 00081 bool alreadySaidHalted_; 00082 bool alreadySaidWaiting_; 00083 unsigned int failedAttemptsToGetData_; 00084 00085 }; 00086 00087 00089 // Specializations for EventConsumerRegistrationInfo // 00091 00092 template<> 00093 inline void 00094 EventServerProxy<EventConsumerRegistrationInfo>:: 00095 getInitMsgFromEventServer(CurlInterface::Content& data) 00096 { 00097 // build the header request message to send to the event server 00098 char msgBuff[100]; 00099 OtherMessageBuilder requestMessage( 00100 &msgBuff[0], 00101 Header::HEADER_REQUEST, 00102 sizeof(char_uint32) 00103 ); 00104 uint8 *bodyPtr = requestMessage.msgBody(); 00105 convert(consumerId_, bodyPtr); 00106 00107 // send the header request 00108 stor::CurlInterfacePtr curl = stor::CurlInterface::getInterface(); 00109 CURLcode result = curl->postBinaryMessage( 00110 regInfo_.sourceURL() + "/getregdata", 00111 requestMessage.startAddress(), 00112 requestMessage.size(), 00113 data 00114 ); 00115 00116 if ( result != CURLE_OK ) 00117 { 00118 // connection failed: try to reconnect 00119 edm::LogError("EventServerProxy") << "curl perform failed for header:" 00120 << std::string(&data[0]) << std::endl 00121 << ". Trying to reconnect."; 00122 data.clear(); 00123 registerWithEventServer(); 00124 } 00125 00126 if( data.empty() ) 00127 { 00128 if(!alreadySaidWaiting_) { 00129 edm::LogInfo("EventServerProxy") << "...waiting for header from event server..."; 00130 alreadySaidWaiting_ = true; 00131 } 00132 // sleep for desired amount of time 00133 ::sleep(regInfo_.headerRetryInterval()); 00134 } 00135 else 00136 { 00137 alreadySaidWaiting_ = false; 00138 } 00139 } 00140 00141 template<> 00142 inline void 00143 EventServerProxy<EventConsumerRegistrationInfo>:: 00144 checkInitMsg(CurlInterface::Content& data) 00145 { 00146 try { 00147 HeaderView hdrView(&data[0]); 00148 if (hdrView.code() != Header::INIT) { 00149 throw cms::Exception("EventServerProxy", "readHeader"); 00150 } 00151 } 00152 catch (cms::Exception excpt) { 00153 const unsigned int MAX_DUMP_LENGTH = 1000; 00154 std::ostringstream dump; 00155 dump << "========================================" << std::endl; 00156 dump << "* Exception decoding the getregdata response from the event server!" << std::endl; 00157 if (data.size() <= MAX_DUMP_LENGTH) 00158 { 00159 dump << "* Here is the raw text that was returned:" << std::endl; 00160 dump << std::string(&data[0]) << std::endl; 00161 } 00162 else 00163 { 00164 dump << "* Here are the first " << MAX_DUMP_LENGTH << 00165 " characters of the raw text that was returned:" << std::endl; 00166 dump << std::string(&data[0], MAX_DUMP_LENGTH) << std::endl; 00167 } 00168 dump << "========================================" << std::endl; 00169 edm::LogError("EventServerProxy") << dump.str(); 00170 throw excpt; 00171 } 00172 } 00173 00174 template<> 00175 inline void 00176 EventServerProxy<EventConsumerRegistrationInfo>:: 00177 getInitMsg(CurlInterface::Content& data) 00178 { 00179 do 00180 { 00181 data.clear(); 00182 getInitMsgFromEventServer(data); 00183 } 00184 while ( !edm::shutdown_flag && data.empty() ); 00185 00186 if (edm::shutdown_flag) { 00187 throw cms::Exception("readHeader","EventServerProxy") 00188 << "The header read was aborted by a shutdown request.\n"; 00189 } 00190 00191 checkInitMsg(data); 00192 } 00193 00194 } // namespace stor 00195 00196 #endif // EventFilter_StorageManager_EventServerProxy_h 00197 00198