CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/EventFilter/StorageManager/interface/EventServerProxy.h

Go to the documentation of this file.
00001 // $Id: EventServerProxy.h,v 1.8 2012/04/23 08:41:27 mommsen Exp $
00003 
00004 #ifndef EventFilter_StorageManager_EventServerProxy_h
00005 #define EventFilter_StorageManager_EventServerProxy_h
00006 
00007 #include "EventFilter/StorageManager/interface/CurlInterface.h"
00008 #include "EventFilter/StorageManager/interface/EventConsumerRegistrationInfo.h"
00009 #include "EventFilter/StorageManager/interface/RegistrationInfoBase.h"
00010 #include "EventFilter/StorageManager/interface/Utils.h"
00011 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00012 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00013 #include "FWCore/Utilities/interface/Exception.h"
00014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00015 #include "IOPool/Streamer/interface/MsgTools.h"
00016 #include "IOPool/Streamer/interface/OtherMessage.h"
00017 
00018 #include <string>
00019 
00020 
00021 namespace stor {
00022 
00035   template<typename RegInfo>
00036   class EventServerProxy
00037   {
00038 
00039   public:
00040 
00041     EventServerProxy(edm::ParameterSet const&);
00042     virtual ~EventServerProxy() {};
00043 
00047     void reconnect();
00048 
00052     void getOneEvent(CurlInterface::Content& data);
00053 
00058     bool getEventMaybe(CurlInterface::Content& data);
00059 
00063     void getInitMsg(CurlInterface::Content& data);
00064     
00065     
00066   private:
00067 
00068     void getOneEventFromEventServer(CurlInterface::Content&);
00069     void checkEvent(CurlInterface::Content&);
00070     void getInitMsgFromEventServer(CurlInterface::Content&);
00071     void checkInitMsg(CurlInterface::Content&);
00072     void registerWithEventServer();
00073     void connectToEventServer(CurlInterface::Content&);
00074     bool extractConsumerId(CurlInterface::Content&);
00075 
00076     const RegInfo regInfo_;
00077     unsigned int consumerId_;
00078     stor::utils::TimePoint_t nextRequestTime_;
00079     const stor::utils::Duration_t minEventRequestInterval_;
00080     
00081     bool alreadySaidHalted_;
00082     bool alreadySaidWaiting_;
00083     unsigned int failedAttemptsToGetData_;
00084     
00085   };
00086 
00087 
00089   // Specializations for EventConsumerRegistrationInfo //
00091 
00092   template<>
00093   inline void
00094   EventServerProxy<EventConsumerRegistrationInfo>::
00095   getInitMsgFromEventServer(CurlInterface::Content& data)
00096   {
00097     // build the header request message to send to the event server
00098     char msgBuff[100];
00099     OtherMessageBuilder requestMessage(
00100       &msgBuff[0],
00101       Header::HEADER_REQUEST,
00102       sizeof(char_uint32)
00103     );
00104     uint8 *bodyPtr = requestMessage.msgBody();
00105     convert(consumerId_, bodyPtr);
00106     
00107     // send the header request
00108     stor::CurlInterfacePtr curl = stor::CurlInterface::getInterface();
00109     CURLcode result = curl->postBinaryMessage(
00110       regInfo_.sourceURL() + "/getregdata",
00111       requestMessage.startAddress(),
00112       requestMessage.size(),
00113       data
00114     );
00115     
00116     if ( result != CURLE_OK )
00117     {
00118       // connection failed: try to reconnect
00119       edm::LogError("EventServerProxy") << "curl perform failed for header:"
00120         << std::string(&data[0]) << std::endl
00121         << ". Trying to reconnect.";
00122       data.clear();
00123       registerWithEventServer();
00124     }
00125     
00126     if( data.empty() )
00127     {
00128       if(!alreadySaidWaiting_) {
00129         edm::LogInfo("EventServerProxy") << "...waiting for header from event server...";
00130         alreadySaidWaiting_ = true;
00131       }
00132       // sleep for desired amount of time
00133       ::sleep(regInfo_.headerRetryInterval());
00134     }
00135     else
00136     {
00137       alreadySaidWaiting_ = false;
00138     }
00139   }
00140   
00141   template<>
00142   inline void
00143   EventServerProxy<EventConsumerRegistrationInfo>::
00144   checkInitMsg(CurlInterface::Content& data)
00145   {
00146     try {
00147       HeaderView hdrView(&data[0]);
00148       if (hdrView.code() != Header::INIT) {
00149         throw cms::Exception("EventServerProxy", "readHeader");
00150       }
00151     }
00152     catch (cms::Exception excpt) {
00153       const unsigned int MAX_DUMP_LENGTH = 1000;
00154       std::ostringstream dump;
00155       dump << "========================================" << std::endl;
00156       dump << "* Exception decoding the getregdata response from the event server!" << std::endl;
00157       if (data.size() <= MAX_DUMP_LENGTH)
00158       {
00159         dump << "* Here is the raw text that was returned:" << std::endl;
00160         dump << std::string(&data[0]) << std::endl;
00161       }
00162       else
00163       {
00164         dump << "* Here are the first " << MAX_DUMP_LENGTH <<
00165           " characters of the raw text that was returned:" << std::endl;
00166         dump << std::string(&data[0], MAX_DUMP_LENGTH) << std::endl;
00167       }
00168       dump << "========================================" << std::endl;
00169       edm::LogError("EventServerProxy") << dump.str();
00170       throw excpt;
00171     }
00172   }
00173 
00174   template<>
00175   inline void
00176   EventServerProxy<EventConsumerRegistrationInfo>::
00177   getInitMsg(CurlInterface::Content& data)
00178   {
00179     do
00180     {
00181       data.clear();
00182       getInitMsgFromEventServer(data);
00183     }
00184     while ( !edm::shutdown_flag && data.empty() );
00185     
00186     if (edm::shutdown_flag) {
00187       throw cms::Exception("readHeader","EventServerProxy")
00188         << "The header read was aborted by a shutdown request.\n";
00189     }
00190     
00191     checkInitMsg(data);
00192   }
00193 
00194 } // namespace stor
00195 
00196 #endif // EventFilter_StorageManager_EventServerProxy_h
00197 
00198