CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/EventFilter/StorageManager/interface/EventServerProxy.h

Go to the documentation of this file.
00001 // $Id: EventServerProxy.h,v 1.5 2011/04/05 09:19:01 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_EventServerProxy_h
00005 #define EventFilter_StorageManager_EventServerProxy_h
00006 
00007 #include "EventFilter/StorageManager/interface/CurlInterface.h"
00008 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h"
00009 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h"
00010 #include "EventFilter/StorageManager/interface/Utils.h"
00011 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00012 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00013 #include "FWCore/Utilities/interface/Exception.h"
00014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00015 #include "IOPool/Streamer/interface/OtherMessage.h"
00016 
00017 #include <string>
00018 
00019 
00020 namespace stor {
00021 
00034   template<typename RegInfo>
00035   class EventServerProxy
00036   {
00037 
00038   public:
00039 
00040     EventServerProxy(edm::ParameterSet const&);
00041     virtual ~EventServerProxy() {};
00042 
00046     void getOneEvent(CurlInterface::Content& data);
00047 
00052     bool getEventMaybe(CurlInterface::Content& data);
00053 
00057     void getInitMsg(CurlInterface::Content& data);
00058     
00059     
00060   private:
00061 
00062     void getOneEventFromEventServer(CurlInterface::Content&);
00063     void checkEvent(CurlInterface::Content&);
00064     void getInitMsgFromEventServer(CurlInterface::Content&);
00065     void checkInitMsg(CurlInterface::Content&);
00066     void registerWithEventServer();
00067     void connectToEventServer(CurlInterface::Content&);
00068     bool extractConsumerId(CurlInterface::Content&);
00069 
00070     const RegInfo regInfo_;
00071     unsigned int consumerId_;
00072     stor::utils::TimePoint_t nextRequestTime_;
00073     const stor::utils::Duration_t minEventRequestInterval_;
00074     
00075     bool alreadySaidHalted_;
00076     bool alreadySaidWaiting_;
00077     unsigned int failedAttemptsToGetData_;
00078     
00079   };
00080 
00081 
00083   // Specializations for EventConsumerRegistrationInfo //
00085 
00086   template<>
00087   inline void
00088   EventServerProxy<EventConsumerRegistrationInfo>::
00089   getInitMsgFromEventServer(CurlInterface::Content& data)
00090   {
00091     // build the header request message to send to the event server
00092     char msgBuff[100];
00093     OtherMessageBuilder requestMessage(
00094       &msgBuff[0],
00095       Header::HEADER_REQUEST,
00096       sizeof(char_uint32)
00097     );
00098     uint8 *bodyPtr = requestMessage.msgBody();
00099     convert(consumerId_, bodyPtr);
00100     
00101     // send the header request
00102     stor::CurlInterface curl;
00103     CURLcode result = curl.postBinaryMessage(
00104       regInfo_.sourceURL() + "/getregdata",
00105       requestMessage.startAddress(),
00106       requestMessage.size(),
00107       data
00108     );
00109     
00110     if ( result != CURLE_OK )
00111     {
00112       // connection failed: try to reconnect
00113       edm::LogError("EventServerProxy") << "curl perform failed for header:"
00114         << std::string(&data[0]) << std::endl
00115         << ". Trying to reconnect.";
00116       data.clear();
00117       registerWithEventServer();
00118     }
00119     
00120     if( data.empty() )
00121     {
00122       if(!alreadySaidWaiting_) {
00123         edm::LogInfo("EventServerProxy") << "...waiting for header from event server...";
00124         alreadySaidWaiting_ = true;
00125       }
00126       // sleep for desired amount of time
00127       ::sleep(regInfo_.headerRetryInterval());
00128     }
00129     else
00130     {
00131       alreadySaidWaiting_ = false;
00132     }
00133   }
00134   
00135   template<>
00136   inline void
00137   EventServerProxy<EventConsumerRegistrationInfo>::
00138   checkInitMsg(CurlInterface::Content& data)
00139   {
00140     try {
00141       HeaderView hdrView(&data[0]);
00142       if (hdrView.code() != Header::INIT) {
00143         throw cms::Exception("EventServerProxy", "readHeader");
00144       }
00145     }
00146     catch (cms::Exception excpt) {
00147       const unsigned int MAX_DUMP_LENGTH = 1000;
00148       std::ostringstream dump;
00149       dump << "========================================" << std::endl;
00150       dump << "* Exception decoding the getregdata response from the event server!" << std::endl;
00151       if (data.size() <= MAX_DUMP_LENGTH)
00152       {
00153         dump << "* Here is the raw text that was returned:" << std::endl;
00154         dump << std::string(&data[0]) << std::endl;
00155       }
00156       else
00157       {
00158         dump << "* Here are the first " << MAX_DUMP_LENGTH <<
00159           " characters of the raw text that was returned:" << std::endl;
00160         dump << std::string(&data[0], MAX_DUMP_LENGTH) << std::endl;
00161       }
00162       dump << "========================================" << std::endl;
00163       edm::LogError("EventServerProxy") << dump.str();
00164       throw excpt;
00165     }
00166   }
00167 
00168   template<>
00169   inline void
00170   EventServerProxy<EventConsumerRegistrationInfo>::
00171   getInitMsg(CurlInterface::Content& data)
00172   {
00173     do
00174     {
00175       data.clear();
00176       getInitMsgFromEventServer(data);
00177     }
00178     while ( !edm::shutdown_flag && data.empty() );
00179     
00180     if (edm::shutdown_flag) {
00181       throw cms::Exception("readHeader","EventServerProxy")
00182         << "The header read was aborted by a shutdown request.\n";
00183     }
00184     
00185     checkInitMsg(data);
00186   }
00187 
00188 } // namespace stor
00189 
00190 #endif // EventFilter_StorageManager_EventServerProxy_h
00191 
00192