CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EventServerProxy.h
Go to the documentation of this file.
1 // $Id: EventServerProxy.h,v 1.8 2012/04/23 08:41:27 mommsen Exp $
3 
4 #ifndef EventFilter_StorageManager_EventServerProxy_h
5 #define EventFilter_StorageManager_EventServerProxy_h
6 
17 
18 #include <string>
19 
20 
21 namespace stor {
22 
// Class template EventServerProxy, parameterised on the registration-info
// type RegInfo.
// NOTE(review): this is a rendered source listing with gaps. The line that
// carries the `class` keyword and several member declarations (listing lines
// 36, 41, 44-46, 49-63, 68-74, 78-83) are not visible here, so only the
// members shown below are documented.
35  template<typename RegInfo>
37  {
38 
39  public:
40 
// Virtual destructor with an empty body.
42  virtual ~EventServerProxy() {};
43 
// Declared only; the definition is not part of this listing.
// Presumably re-establishes the connection to the event server -- TODO confirm.
47  void reconnect();
48 
53 
59 
64 
65 
66  private:
67 
75 
// Immutable registration info. The specializations below read
// sourceURL() and headerRetryInterval() from it.
76  const RegInfo regInfo_;
// Consumer id; the specialization below writes it into the body of the
// "/getregdata" request via convert().
77  unsigned int consumerId_;
80 
84 
85  };
86 
87 
89  // Specializations for EventConsumerRegistrationInfo //
91 
// Specialization that performs ONE attempt to fetch the registration data
// (the "/getregdata" response) from the event server into `data`.
//
// Steps, as visible in this listing:
//  - build a request message whose body carries consumerId_;
//  - POST it to regInfo_.sourceURL() + "/getregdata";
//  - if curl does not return CURLE_OK: log the error, clear `data` and call
//    registerWithEventServer();
//  - if `data` is empty afterwards: log a "waiting" message once, then sleep
//    for regInfo_.headerRetryInterval(); otherwise re-arm the "waiting" log.
//
// NOTE(review): listing lines 94-95 (function name and parameter list), 101
// (second constructor argument) and 108 (declaration of `curl`) are missing
// from this rendering and are intentionally not reconstructed here. `data`
// is presumably the CurlInterface::Content& (std::vector<char>) parameter.
92  template<>
93  inline void
96  {
97  // build the header request message to send to the event server
98  char msgBuff[100];
99  OtherMessageBuilder requestMessage(
100  &msgBuff[0],
102  sizeof(char_uint32)
103  );
104  uint8 *bodyPtr = requestMessage.msgBody();
105  convert(consumerId_, bodyPtr);
106 
107  // send the header request
109  CURLcode result = curl->postBinaryMessage(
110  regInfo_.sourceURL() + "/getregdata",
111  requestMessage.startAddress(),
112  requestMessage.size(),
113  data
114  );
115 
116  if ( result != CURLE_OK )
117  {
118  // connection failed: try to reconnect
// The buffer is a std::vector<char> that may be empty and carries no
// guaranteed NUL terminator, so std::string(&data[0]) could index an empty
// vector or read past its end. Copy the bounded range instead; streaming
// c_str() keeps the old "stop at the first NUL" output.
119  edm::LogError("EventServerProxy") << "curl perform failed for header:"
120  << std::string(data.begin(), data.end()).c_str() << std::endl
121  << ". Trying to reconnect.";
122  data.clear();
123  registerWithEventServer();
124  }
125 
126  if( data.empty() )
127  {
// Emit the "waiting" message only once per waiting period.
128  if(!alreadySaidWaiting_) {
129  edm::LogInfo("EventServerProxy") << "...waiting for header from event server...";
130  alreadySaidWaiting_ = true;
131  }
132  // sleep for desired amount of time
133  ::sleep(regInfo_.headerRetryInterval());
134  }
135  else
136  {
// Data arrived: allow the "waiting" message to be logged again next time.
137  alreadySaidWaiting_ = false;
138  }
139  }
140 
// Specialization that validates the registration response held in `data`:
// it is viewed as a message header and must carry the code Header::INIT.
//
// On failure a cms::Exception is raised; before it propagates, a dump of the
// response (at most MAX_DUMP_LENGTH characters) is written to the error log.
//
// NOTE(review): listing lines 143-144 (function name and parameter list) are
// missing from this rendering and are intentionally not reconstructed here.
// `data` is presumably the CurlInterface::Content& (std::vector<char>)
// parameter and is assumed non-empty on entry (&data[0] is taken) -- confirm
// against the callers.
141  template<>
142  inline void
145  {
146  try {
147  HeaderView hdrView(&data[0]);
148  if (hdrView.code() != Header::INIT) {
149  throw cms::Exception("EventServerProxy", "readHeader");
150  }
151  }
// Catch by reference: catching by value copies the exception and would slice
// any type derived from cms::Exception.
152  catch (cms::Exception& excpt) {
153  const unsigned int MAX_DUMP_LENGTH = 1000;
154  std::ostringstream dump;
155  dump << "========================================" << std::endl;
156  dump << "* Exception decoding the getregdata response from the event server!" << std::endl;
157  if (data.size() <= MAX_DUMP_LENGTH)
158  {
159  dump << "* Here is the raw text that was returned:" << std::endl;
// The buffer has no guaranteed NUL terminator, so bound the copy by the
// vector's size; streaming c_str() still stops at the first NUL as before.
160  dump << std::string(data.begin(), data.end()).c_str() << std::endl;
161  }
162  else
163  {
164  dump << "* Here are the first " << MAX_DUMP_LENGTH <<
165  " characters of the raw text that was returned:" << std::endl;
166  dump << std::string(&data[0], MAX_DUMP_LENGTH) << std::endl;
167  }
168  dump << "========================================" << std::endl;
169  edm::LogError("EventServerProxy") << dump.str();
// Rethrow the original exception object, not a copy of it.
170  throw;
171  }
172  }
173 
// Specialization that keeps requesting the init message until `data` is
// non-empty or a shutdown is requested, then validates it via checkInitMsg().
// Throws cms::Exception if edm::shutdown_flag is set once the loop ends.
//
// NOTE(review): listing lines 176-177 (function name and parameter list) are
// missing from this rendering. Also, the cms::Exception arguments here are
// ("readHeader","EventServerProxy"), the reverse of the order used in the
// checkInitMsg specialization -- confirm which order is intended.
174  template<>
175  inline void
178  {
179  do
180  {
// Start every attempt from an empty buffer.
181  data.clear();
182  getInitMsgFromEventServer(data);
183  }
184  while ( !edm::shutdown_flag && data.empty() );
185 
// A shutdown request takes precedence, even if data was received.
186  if (edm::shutdown_flag) {
187  throw cms::Exception("readHeader","EventServerProxy")
188  << "The header read was aborted by a shutdown request.\n";
189  }
190 
191  checkInitMsg(data);
192  }
193 
194 } // namespace stor
195 
196 #endif // EventFilter_StorageManager_EventServerProxy_h
197 
198 
bool extractConsumerId(CurlInterface::Content &)
void connectToEventServer(CurlInterface::Content &)
void getInitMsgFromEventServer(CurlInterface::Content &)
uint32 code() const
Definition: MsgHeader.h:34
void sleep(Duration_t)
Definition: Utils.h:163
void convert(uint32 i, char_uint32 v)
Definition: MsgTools.h:46
bool getEventMaybe(CurlInterface::Content &data)
void checkInitMsg(CurlInterface::Content &)
boost::posix_time::time_duration Duration_t
Definition: Utils.h:41
static boost::shared_ptr< CurlInterface > getInterface()
uint8 * startAddress()
Definition: OtherMessage.h:32
tuple result
Definition: query.py:137
const stor::utils::Duration_t minEventRequestInterval_
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
void getInitMsg(CurlInterface::Content &data)
unsigned char char_uint32[sizeof(uint32)]
Definition: MsgTools.h:16
boost::shared_ptr< CurlInterface > CurlInterfacePtr
Definition: CurlInterface.h:71
void getOneEventFromEventServer(CurlInterface::Content &)
stor::utils::TimePoint_t nextRequestTime_
void checkEvent(CurlInterface::Content &)
unsigned char uint8
Definition: MsgTools.h:11
uint32 size() const
Definition: OtherMessage.h:30
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void getOneEvent(CurlInterface::Content &data)
EventServerProxy(edm::ParameterSet const &)
volatile bool shutdown_flag
unsigned int failedAttemptsToGetData_
std::vector< char > Content
Definition: CurlInterface.h:31