CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdSource.cc
Go to the documentation of this file.
1 
2 // See http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about
3 #define _GLIBCXX_USE_NANOSLEEP
4 #include <thread>
5 #include <chrono>
6 #include <atomic>
7 #include <iostream>
8 #include <assert.h>
9 #include <netdb.h>
10 
11 #include "XrdCl/XrdClFile.hh"
12 
15 
16 #include "XrdSource.h"
17 #include "XrdRequest.h"
18 #include "QualityMetric.h"
19 #include "XrdStatistics.h"
20 
// Request-size constant; not referenced in the visible part of this file
// (presumably used by request-splitting code elsewhere -- TODO confirm).
// Parenthesized so the macro expands safely inside larger expressions.
#define MAX_REQUEST (256*1024)
// Largest length (bytes) allowed for one element of a vector read; enforced
// by validateList().
#define XRD_CL_MAX_CHUNK (512*1024)

#ifdef XRD_FAKE_SLOW
// Testing aid: every XRD_SLOW_RATE-th Source constructed is marked slow and
// sleeps XRD_DELAY milliseconds before each read.
//#define XRD_DELAY 5140
#define XRD_DELAY 1000
#define XRD_SLOW_RATE 2
#endif

// Number of Sources constructed so far. Only incremented when XRD_FAKE_SLOW is
// defined, but defined unconditionally (both configurations previously defined
// it identically).
std::atomic<int> g_delayCount {0};
32 
33 using namespace XrdAdaptor;
34 
35 // File::Close() can take a while - slow servers (which are probably
36 // inactive anyway!) can even time out. Rather than wait around for
37 // a few minutes in the main thread, this class asynchronously closes
38 // and deletes the XrdCl::File.
39 class DelayedClose : boost::noncopyable, public XrdCl::ResponseHandler
40 {
41 public:
42 
43  DelayedClose(std::shared_ptr<XrdCl::File> fh, const std::string &id, const std::string &site)
44  : m_fh(std::move(fh)),
45  m_id(id),
46  m_site(site)
47  {
48  if (m_fh && m_fh->IsOpen())
49  {
50  if (!m_fh->Close(this).IsOK())
51  {
52  delete this;
53  }
54  }
55  }
56 
57 
58  virtual ~DelayedClose() = default;
59 
60 
61  virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
62  {
63  if (status && !status->IsOK())
64  {
65 
66  edm::LogWarning("XrdFileWarning") << "Source delayed close failed with error '" << status->ToStr()
67  << "' (errno=" << status->errNo << ", code=" << status->code << ", server=" << m_id << ", site=" << m_site << ")";
68  }
69  delete status;
70  delete hostList;
71  // NOTE: we do not delete response (copying behavior from XrdCl).
72  delete this;
73  }
74 
75 
76 private:
80 };
81 
82 
88 class QueryAttrHandler : public XrdCl::ResponseHandler
89 {
90  friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
91 
92 public:
93 
94  virtual ~QueryAttrHandler() = default;
95  QueryAttrHandler(const QueryAttrHandler&) = delete;
96  QueryAttrHandler& operator=(const QueryAttrHandler&) = delete;
97 
98 
99  static XrdCl::XRootDStatus query(XrdCl::FileSystem &fs, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
100  {
101  auto handler = std::make_unique<QueryAttrHandler>();
102  auto l_state = std::make_shared<QueryAttrState>();
103  handler->m_state = l_state;
104  XrdCl::Buffer arg(attr.size());
105  arg.FromString(attr);
106 
107  XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, arg, handler.get());
108  if (!st.IsOK())
109  {
110  return st;
111  }
112 
113  // Successfully registered the callback; it will always delete itself, so we shouldn't.
114  handler.release();
115 
116  std::unique_lock<std::mutex> guard(l_state->m_mutex);
117  // Wait until some status is available or a timeout.
118  l_state->m_condvar.wait_for(guard, timeout, [&]{return l_state->m_status.get();});
119 
120  if (l_state->m_status)
121  {
122  if (l_state->m_status->IsOK())
123  {
124  result = l_state->m_response->ToString();
125  }
126  return *(l_state->m_status);
127  }
128  else
129  { // We had a timeout; construct a reasonable message.
130  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errSocketTimeout, 1, "Timeout when waiting for query callback.");
131  }
132  }
133 
134 
135 private:
136 
138 
139 
140  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response ) override
141  {
142  // NOTE: we own the status and response pointers.
143  std::unique_ptr<XrdCl::AnyObject> response_mgr;
144  response_mgr.reset(response);
145 
146  // Lock our state information then dispose of our object.
147  auto l_state = m_state.lock();
148  delete this;
149  if (!l_state) {return;}
150 
151  // On function exit, notify any waiting threads.
152  std::unique_ptr<char, std::function<void(char*)>> notify_guard(nullptr, [&](char *) {l_state->m_condvar.notify_all();});
153 
154  {
155  // On exit from the block, make sure m_status is set; it needs to be set before we notify threads.
156  std::unique_ptr<char, std::function<void(char*)>> exit_guard(nullptr, [&](char *) {if (!l_state->m_status) l_state->m_status.reset(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal));});
157  if (!status) {return;}
158  if (status->IsOK())
159  {
160  if (!response) {return;}
161  XrdCl::Buffer *buf_ptr;
162  response->Get(buf_ptr);
163  // AnyObject::Set lacks specialization for nullptr
164  response->Set(static_cast<int *>(nullptr));
165  l_state->m_response.reset(buf_ptr);
166  }
167  l_state->m_status.reset(status);
168  }
169  }
170 
171 
172  // Represents the current state of the callback. The parent class only manages a weak_ptr
173  // to the state. If the asynchronous callback cannot lock the weak_ptr, then it assumes the
174  // main thread has given up and doesn't touch any of the state variables.
175  struct QueryAttrState {
176 
177  // Synchronize between the callback thread and the main thread; condvar predicate
178  // is having m_status set. m_mutex protects m_status.
180  std::condition_variable m_condvar;
181 
182  // Results from the server
183  std::unique_ptr<XrdCl::XRootDStatus> m_status;
184  std::unique_ptr<XrdCl::Buffer> m_response;
185  };
186  std::weak_ptr<QueryAttrState> m_state;
187 };
188 
189 
// Construct a Source around an already-opened XrdCl::File handle.
//
//   now     - not referenced in the lines visible in this listing; presumably
//             consumed by the m_qm initialisation on the missing original line
//             211 (m_qm is asserted non-null below) -- TODO confirm.
//   fh      - opened file handle; ownership moves into m_fh.
//   exclude - exclude string for this source; defaults to the data server id
//             (m_id) when empty.
//
// NOTE(review): original lines 211, 213 and 219 are missing from this listing.
// The cross-reference index places `std::string domain_id` at line 213 and
// `XrdSiteStatisticsInformation *statsService` at line 219.
190 Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string &exclude)
191  : m_lastDowngrade({0, 0}),
192  m_id("(unknown)"),
193  m_exclude(exclude),
194  m_fh(std::move(fh)),
195  m_stats(nullptr)
// Testing aid: every XRD_SLOW_RATE-th Source constructed is marked slow
// (handle() then sleeps XRD_DELAY ms before each read).
196 #ifdef XRD_FAKE_SLOW
197  , m_slow(++g_delayCount % XRD_SLOW_RATE == 0)
198  //, m_slow(++g_delayCount >= XRD_SLOW_RATE)
199  //, m_slow(true)
200 #endif
201 {
// Identify the data server; m_id stays "(unknown)" if the property lookup fails.
// NOTE(review): the warning text below ends with a stray single quote.
202  if (m_fh.get())
203  {
204  if (!m_fh->GetProperty("DataServer", m_id))
205  {
206  edm::LogWarning("XrdFileWarning")
207  << "Source::Source() failed to determine data server name.'";
208  }
209  if (!m_exclude.size()) {m_exclude = m_id;}
210  }
// Provisional identification: DNS domain of the server, or "Unknown (<id>)".
// setXrootdSite() replaces m_site and m_prettyid when the server reports a sitename.
212  m_prettyid = m_id + " (unknown site)";
214  if (getDomain(m_id, domain_id)) {m_site = domain_id;}
215  else {m_site = "Unknown (" + m_id + ")";}
// NOTE(review): m_fh is null-checked above, but setXrootdSite() dereferences
// *m_fh before assert(m_fh.get()) runs, so a null handle would crash first.
216  setXrootdSite();
217  assert(m_qm.get());
218  assert(m_fh.get());
// Attach per-site read statistics when the statistics service is available.
220  if (statsService)
221  {
222  m_stats = statsService->getStatisticsForSite(m_site);
223  }
224 }
225 
226 
227 bool Source::getHostname(const std::string &id, std::string &hostname)
228 {
229  size_t pos = id.find(":");
230  hostname = id;
231  if ((pos != std::string::npos) && (pos > 0)) {hostname = id.substr(0, pos);}
232 
233  bool retval = true;
234  if (hostname.size() && ((hostname[0] == '[') || isdigit(hostname[0])))
235  {
236  retval = false;
237  struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo));
238  hints.ai_family = AF_UNSPEC;
239  struct addrinfo *result;
240  if (!getaddrinfo(hostname.c_str(), NULL, &hints, &result))
241  {
242  std::vector<char> host; host.reserve(256);
243  if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255, NULL, 0, NI_NAMEREQD))
244  {
245  hostname = &host[0];
246  retval = true;
247  }
248  freeaddrinfo(result);
249  }
250  }
251  return retval;
252 }
253 
254 
// Derive a domain name from a server id such as "host" or "host:port".
// The id is first passed through getHostname(). Its return value is ignored, so
// on failure `domain` keeps whatever getHostname() left in it. Everything up to
// and including the first '.' is then stripped
// (e.g. "xrootd.example.org" -> "example.org").
// Returns true iff the resulting `domain` is non-empty.
// NOTE(review): the signature line (original line 255) is missing from this
// listing; the cross-reference index gives it as
// getDomain(const std::string &host, std::string &domain).
256 {
257  getHostname(host, domain);
258  size_t pos = domain.find(".");
// `pos < domain.size()` always holds once pos != npos; the second test is redundant.
259  if (pos != std::string::npos && (pos < domain.size())) {domain = domain.substr(pos+1);}
260 
261  return domain.size();
262 }
263 
264 
265 bool
266 Source::isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList)
267 {
268  // WORKAROUND: On open-file recovery in the Xrootd client, it'll carry around the
269  // dCache opaque information to other sites, causing isDCachePool to erroneously return
270  // true. We are working with the upstream developers to solve this.
271  //
272  // For now, we see if the previous server also looks like a dCache pool - something that
273  // wouldn't happen at a real site, as the previous server should look like a dCache door.
274  std::string lastUrl;
275  file.GetProperty("LastURL", lastUrl);
276  if (lastUrl.size())
277  {
278  bool result = isDCachePool(lastUrl);
279  if (result && hostList && (hostList->size() > 1))
280  {
281  if (isDCachePool((*hostList)[hostList->size()-2].url.GetURL()))
282  {
283  return false;
284  }
285  return true;
286  }
287  return result;
288  }
289  return false;
290 }
291 
// Return true if the URL `lastUrl` carries the "org.dcache.uuid" opaque
// parameter, which this code treats as the signature of a dCache pool.
// NOTE(review): the signature line (original line 293) is missing from this
// listing; presumably Source::isDCachePool(const std::string &lastUrl) --
// TODO confirm against XrdSource.h.
292 bool
294 {
295  XrdCl::URL url(lastUrl);
296  XrdCl::URL::ParamsMap map = url.GetParams();
297  // dCache pools always utilize this opaque identifier.
298  if (map.find("org.dcache.uuid") != map.end())
299  {
300  return true;
301  }
302  return false;
303 }
304 
305 
306 void
307 Source::determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
308 {
309  // Detect a dCache pool and, if we are in the federation context, give a custom
310  // exclude parameter.
311  // We assume this is a federation context if there's at least a regional, dCache door,
312  // and dCache pool server (so, more than 2 servers!).
313 
314  exclude = "";
315  if (hostList && (hostList->size() > 3) && isDCachePool(file, hostList))
316  {
317  const XrdCl::HostInfo &info = (*hostList)[hostList->size()-3];
318  exclude = info.url.GetHostName();
319  std::string lastUrl; file.GetProperty("LastURL", lastUrl);
320  edm::LogVerbatim("XrdAdaptorInternal") << "Changing exclude list for URL " << lastUrl << " to " << exclude;
321  }
322 }
323 
324 
// Determine the site name of the server behind an open file handle.
//
// Uses the handle's "LastURL" property. If that is empty, or points at a dCache
// pool, the server is not queried. Instead `site` is set to the DNS domain of
// the "DataServer" property (or "Unknown" if that property is unavailable) and
// false is returned. Otherwise the result of getXrootdSiteFromURL() is returned.
// NOTE(review): the signature line (original line 326) is missing from this
// listing. The cross-reference index declares it as
// getXrootdSite(XrdCl::File &file, std::string &site), but this body refers to
// the file parameter as `fh`.
325 bool
327 {
328  std::string lastUrl;
329  fh.GetProperty("LastURL", lastUrl);
330  if (!lastUrl.size() || isDCachePool(lastUrl))
331  {
332  std::string server, id;
333  if (!fh.GetProperty("DataServer", server)) {id = "(unknown)";}
334  else {id = server;}
// Warn only for a missing URL, not for the dCache-pool case.
335  if (!lastUrl.size()) {edm::LogWarning("XrdFileWarning") << "Unable to determine the URL associated with server " << id;}
336  site = "Unknown";
337  if (server.size()) {getDomain(server, site);}
338  return false;
339  }
340  return getXrootdSiteFromURL(lastUrl, site);
341 }
342 
343 bool
345 {
346  const std::string attr = "sitename";
347  XrdCl::Buffer *response = 0;
348  XrdCl::Buffer arg( attr.size() );
349  arg.FromString( attr );
350 
351  XrdCl::FileSystem fs(url);
352  std::string rsite;
353  XrdCl::XRootDStatus st = QueryAttrHandler::query(fs, "sitename", std::chrono::seconds(1), rsite);
354  if (!st.IsOK())
355  {
356  XrdCl::URL xurl(url);
357  getDomain(xurl.GetHostName(), site);
358  delete response;
359  return false;
360  }
361  if (!rsite.empty() && (rsite[rsite.size()-1] == '\n'))
362  {
363  rsite = rsite.substr(0, rsite.size()-1);
364  }
365  if (rsite == "sitename")
366  {
367  XrdCl::URL xurl(url);
368  getDomain(xurl.GetHostName(), site);
369  return false;
370  }
371  site = rsite;
372  return true;
373 }
374 
// Refine m_site / m_prettyid using the sitename reported for this source.
//
// Calls getXrootdSite(*m_fh, ...), so m_fh must be non-null. When a sitename is
// obtained, m_site and m_prettyid are overwritten. Otherwise they keep their
// previous values and an info message notes that monitoring may be incomplete.
// Either way, the server and site in use are logged.
// NOTE(review): the signature line (original line 376) is missing from this
// listing; presumably Source::setXrootdSite().
375 void
377 {
378  std::string site;
379  bool goodSitename = getXrootdSite(*m_fh, site);
380  if (!goodSitename)
381  {
382  edm::LogInfo("XrdAdaptorInternal")
383  << "Xrootd server at " << m_id << " did not provide a sitename. Monitoring may be incomplete.";
384  }
385  else
386  {
387  m_site = site;
388  m_prettyid = m_id + " (site " + m_site + ")";
389  }
390  edm::LogInfo("XrdAdaptorInternal") << "Reading from new server " << m_id << " at site " << m_site;
391 }
392 
// NOTE(review): the signature line (original line 393) is missing from this
// listing; presumably Source::~Source().
// Closing an XrdCl::File can take a long time, so the close is handed to a
// heap-allocated DelayedClose instead of being done here. The result of the
// new-expression is intentionally discarded: DelayedClose deletes itself.
394 {
395  new DelayedClose(fh(), m_id, m_site);
396 }
397 
// Return the underlying XrdCl::File as a (non-const) shared_ptr; the caller
// shares ownership of the handle with this Source.
// NOTE(review): the signature line (original line 399) is missing from this
// listing; the cross-reference index gives it as getFileHandle().
398 std::shared_ptr<XrdCl::File>
400 {
401  return fh();
402 }
403 
404 static void
405 validateList(const XrdCl::ChunkList& cl)
406 {
407  off_t last_offset = -1;
408  for (const auto & ci : cl)
409  {
410  assert(static_cast<off_t>(ci.offset) > last_offset);
411  last_offset = ci.offset;
412  assert(ci.length <= XRD_CL_MAX_CHUNK);
413  assert(ci.offset < 0x1ffffffffff);
414  assert(ci.offset > 0);
415  }
416  assert(cl.size() <= 1024);
417 }
418 
// Submit a ClientRequest to this source asynchronously.
//
// Steps:
//  - Binds the request to this Source (c->m_source).
//  - Binds the request to itself (c->m_self_reference), presumably to keep it
//    alive until the XrdCl callback fires -- see ClientRequest.
//  - Starts the quality-metric watch.
//  - If per-site statistics are enabled, starts a read-statistics record.
//  - Issues either a single Read (when c->m_into is set) or a VectorRead built
//    from c->m_iolist, passing the request itself as the response handler.
//
// Throws `ex` (declared on original line 455, which is missing from this
// listing) if XrdCl refuses to submit the read.
419 void
420 Source::handle(std::shared_ptr<ClientRequest> c)
421 {
422  edm::LogVerbatim("XrdAdaptorInternal") << "Reading from " << ID() << ", quality " << m_qm->get() << std::endl;
423  c->m_source = shared_from_this();
424  c->m_self_reference = c;
425  m_qm->startWatch(c->m_qmw);
426  if (m_stats)
427  {
428  std::shared_ptr<XrdReadStatistics> readStats = XrdSiteStatistics::startRead(stats(), c);
429  c->setStatistics(readStats);
430  }
// Testing aid: sources marked slow at construction delay every read.
431 #ifdef XRD_FAKE_SLOW
432  if (m_slow) std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
433 #endif
434 
435  XrdCl::XRootDStatus status;
436  if (c->m_into)
437  {
438  // See notes in ClientRequest definition to understand this voodoo.
439  status = m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
440  }
441  else
442  {
// Translate the request's I/O list into an XrdCl chunk list; validateList()
// asserts its invariants in debug builds.
443  XrdCl::ChunkList cl;
444  cl.reserve(c->m_iolist->size());
445  for (const auto & it : *c->m_iolist)
446  {
447  cl.emplace_back(it.offset(), it.size(), it.data());
448  }
449  validateList(cl);
450  status = m_fh->VectorRead(cl, nullptr, c.get());
451  }
452 
// A non-OK status here means submission failed; the request's own callback
// will not report it.
453  if (!status.IsOK())
454  {
456  ex << "XrdFile::Read or XrdFile::VectorRead failed with error: '"
457  << status.ToStr() << "' (errNo = " << status.errNo << ")";
458  ex.addContext("Calling Source::handle");
459  throw ex;
460  }
461 }
462 
static bool getXrootdSite(XrdCl::File &file, std::string &site)
Definition: XrdSource.cc:326
std::string m_id
Definition: XrdSource.h:74
std::atomic< int > g_delayCount
Definition: XrdSource.cc:30
static const TGPicture * info(bool iBackgroundIsBlack)
double seconds()
static boost::mutex mutex
Definition: LHEProxy.cc:11
std::unique_ptr< XrdCl::Buffer > m_response
Definition: XrdSource.cc:184
std::shared_ptr< XrdSiteStatistics > getStatisticsForSite(std::string const &site)
edm::propagate_const< std::shared_ptr< XrdSiteStatistics > > m_stats
Definition: XrdSource.h:81
static void validateList(const XrdCl::ChunkList &cl)
Definition: XrdSource.cc:405
static XrdCl::XRootDStatus query(XrdCl::FileSystem &fs, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
Definition: XrdSource.cc:99
std::string m_site
Definition: XrdSource.cc:79
assert(m_qm.get())
setXrootdSite()
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
DelayedClose(std::shared_ptr< XrdCl::File > fh, const std::string &id, const std::string &site)
Definition: XrdSource.cc:43
#define NULL
Definition: scimark2.h:8
std::string m_site
Definition: XrdSource.h:76
void handle(std::shared_ptr< ClientRequest >)
Definition: XrdSource.cc:420
A arg
Definition: Factorize.h:36
std::shared_ptr< XrdSiteStatistics const > stats() const
Definition: XrdSource.h:70
std::string m_id
Definition: XrdSource.cc:78
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
Definition: XrdSource.cc:77
static std::unique_ptr< QualityMetricSource > get(timespec now, const std::string &id)
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
Definition: XrdSource.h:78
m_exclude(exclude)
static std::shared_ptr< XrdReadStatistics > startRead(std::shared_ptr< XrdSiteStatistics > parent, std::shared_ptr< ClientRequest > req)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
Definition: XrdSource.cc:140
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:266
Source(timespec now, std::unique_ptr< XrdCl::File > fileHandle, const std::string &exclude)
Definition: XrdSource.cc:190
tuple result
Definition: query.py:137
def move
Definition: eostools.py:510
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:219
static XrdSiteStatisticsInformation * getInstance()
m_prettyid
Definition: XrdSource.cc:212
std::string domain_id
Definition: XrdSource.cc:213
std::shared_ptr< XrdCl::File const > fh() const
Definition: XrdSource.h:68
m_id("(unknown)")
string host
Definition: query.py:114
#define XRD_CL_MAX_CHUNK
Definition: XrdSource.cc:22
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Definition: XrdSource.cc:61
void addContext(std::string const &context)
Definition: Exception.cc:227
std::string m_prettyid
Definition: XrdSource.h:75
std::condition_variable m_condvar
Definition: XrdSource.cc:180
std::shared_ptr< XrdCl::File > getFileHandle()
Definition: XrdSource.cc:399
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
tuple Config
Definition: helper.py:9
edm::propagate_const< std::unique_ptr< QualityMetricSource > > m_qm
Definition: XrdSource.h:80
tuple status
Definition: ntuplemaker.py:245
m_fh(std::move(fh))
std::weak_ptr< QueryAttrState > m_state
Definition: XrdSource.cc:186
std::unique_ptr< XrdCl::XRootDStatus > m_status
Definition: XrdSource.cc:183
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
const std::string & ID() const
Definition: XrdSource.h:37