CMS 3D CMS Logo

XrdSource.cc
Go to the documentation of this file.
1 
2 // See http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about
3 #define _GLIBCXX_USE_NANOSLEEP
4 #include <memory>
5 
6 #include <thread>
7 #include <chrono>
8 #include <atomic>
9 #include <iostream>
10 #include <cassert>
11 #include <netdb.h>
12 
13 #include "XrdCl/XrdClFile.hh"
14 
17 
18 #include "XrdSource.h"
19 #include "XrdRequest.h"
20 #include "QualityMetric.h"
21 #include "XrdStatistics.h"
22 
23 #define MAX_REQUEST 256 * 1024
24 #define XRD_CL_MAX_CHUNK 512 * 1024
25 
26 #ifdef XRD_FAKE_SLOW
27 //#define XRD_DELAY 5140
28 #define XRD_DELAY 1000
29 #define XRD_SLOW_RATE 2
30 std::atomic<int> g_delayCount{0};
31 #else
32 std::atomic<int> g_delayCount{0};
33 #endif
34 
35 using namespace XrdAdaptor;
36 
37 // File::Close() can take awhile - slow servers (which are probably
38 // inactive anyway!) can even timeout. Rather than wait around for
39 // a few minutes in the main thread, this class asynchronously closes
40 // and deletes the XrdCl::File
41 class DelayedClose : public XrdCl::ResponseHandler {
42 public:
43  DelayedClose(const DelayedClose &) = delete;
44  DelayedClose &operator=(const DelayedClose &) = delete;
45 
46  DelayedClose(std::shared_ptr<XrdCl::File> fh, const std::string &id, const std::string &site)
47  : m_fh(std::move(fh)), m_id(id), m_site(site) {
48  if (m_fh && m_fh->IsOpen()) {
49  if (!m_fh->Close(this).IsOK()) {
50  delete this;
51  }
52  }
53  }
54 
55  ~DelayedClose() override = default;
56 
57  void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
58  XrdCl::AnyObject *response,
59  XrdCl::HostList *hostList) override {
60  if (status && !status->IsOK()) {
61  edm::LogWarning("XrdFileWarning") << "Source delayed close failed with error '" << status->ToStr()
62  << "' (errno=" << status->errNo << ", code=" << status->code
63  << ", server=" << m_id << ", site=" << m_site << ")";
64  }
65  delete status;
66  delete hostList;
67  // NOTE: we do not delete response (copying behavior from XrdCl).
68  delete this;
69  }
70 
71 private:
75 };
76 
82 class QueryAttrHandler : public XrdCl::ResponseHandler {
83  friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
84 
85 public:
86  QueryAttrHandler() = delete;
87  ~QueryAttrHandler() override = default;
88  QueryAttrHandler(const QueryAttrHandler &) = delete;
89  QueryAttrHandler &operator=(const QueryAttrHandler &) = delete;
90 
91  QueryAttrHandler(const std::string &url) : m_fs(url) {}
92 
93  static XrdCl::XRootDStatus query(const std::string &url,
94  const std::string &attr,
95  std::chrono::milliseconds timeout,
97  auto handler = std::make_unique<QueryAttrHandler>(url);
98  auto l_state = std::make_shared<QueryAttrState>();
99  handler->m_state = l_state;
100  XrdCl::Buffer arg(attr.size());
101  arg.FromString(attr);
102 
103  XrdCl::XRootDStatus st = handler->m_fs.Query(XrdCl::QueryCode::Config, arg, handler.get());
104  if (!st.IsOK()) {
105  return st;
106  }
107 
108  // Successfully registered the callback; it will always delete itself, so we shouldn't.
109  handler.release();
110 
111  std::unique_lock<std::mutex> guard(l_state->m_mutex);
112  // Wait until some status is available or a timeout.
113  l_state->m_condvar.wait_for(guard, timeout, [&] { return l_state->m_status.get(); });
114 
115  if (l_state->m_status) {
116  if (l_state->m_status->IsOK()) {
117  result = l_state->m_response->ToString();
118  }
119  return *(l_state->m_status);
120  } else { // We had a timeout; construct a reasonable message.
121  return XrdCl::XRootDStatus(
122  XrdCl::stError, XrdCl::errSocketTimeout, 1, "Timeout when waiting for query callback.");
123  }
124  }
125 
126 private:
127  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
128  // NOTE: we own the status and response pointers.
129  std::unique_ptr<XrdCl::AnyObject> response_mgr;
130  response_mgr.reset(response);
131 
132  // Lock our state information then dispose of our object.
133  auto l_state = m_state.lock();
134  delete this;
135  if (!l_state) {
136  return;
137  }
138 
139  // On function exit, notify any waiting threads.
140  std::unique_ptr<char, std::function<void(char *)>> notify_guard(nullptr,
141  [&](char *) { l_state->m_condvar.notify_all(); });
142 
143  {
144  // On exit from the block, make sure m_status is set; it needs to be set before we notify threads.
145  std::unique_ptr<char, std::function<void(char *)>> exit_guard(nullptr, [&](char *) {
146  if (!l_state->m_status)
147  l_state->m_status = std::make_unique<XrdCl::XRootDStatus>(XrdCl::stError, XrdCl::errInternal);
148  });
149  if (!status) {
150  return;
151  }
152  if (status->IsOK()) {
153  if (!response) {
154  return;
155  }
156  XrdCl::Buffer *buf_ptr;
157  response->Get(buf_ptr);
158  // AnyObject::Set lacks specialization for nullptr
159  response->Set(static_cast<int *>(nullptr));
160  l_state->m_response.reset(buf_ptr);
161  }
162  l_state->m_status.reset(status);
163  }
164  }
165 
166  // Represents the current state of the callback. The parent class only manages a weak_ptr
167  // to the state. If the asynchronous callback cannot lock the weak_ptr, then it assumes the
168  // main thread has given up and doesn't touch any of the state variables.
169  struct QueryAttrState {
170  // Synchronize between the callback thread and the main thread; condvar predicate
171  // is having m_status set. m_mutex protects m_status.
173  std::condition_variable m_condvar;
174 
175  // Results from the server
176  std::unique_ptr<XrdCl::XRootDStatus> m_status;
177  std::unique_ptr<XrdCl::Buffer> m_response;
178  };
179  std::weak_ptr<QueryAttrState> m_state;
180  XrdCl::FileSystem m_fs;
181 };
182 
183 Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string &exclude)
184  : m_lastDowngrade({0, 0}),
185  m_id("(unknown)"),
186  m_exclude(exclude),
187  m_fh(std::move(fh)),
188  m_stats(nullptr)
189 #ifdef XRD_FAKE_SLOW
190  ,
191  m_slow(++g_delayCount % XRD_SLOW_RATE == 0)
192 //, m_slow(++g_delayCount >= XRD_SLOW_RATE)
193 //, m_slow(true)
194 #endif
195 {
196  if (m_fh.get()) {
197  if (!m_fh->GetProperty("DataServer", m_id)) {
198  edm::LogWarning("XrdFileWarning") << "Source::Source() failed to determine data server name.'";
199  }
200  if (m_exclude.empty()) {
201  m_exclude = m_id;
202  }
203  }
204  m_qm = QualityMetricFactory::get(now, m_id);
205  m_prettyid = m_id + " (unknown site)";
206  std::string domain_id;
207  if (getDomain(m_id, domain_id)) {
208  m_site = domain_id;
209  } else {
210  m_site = "Unknown (" + m_id + ")";
211  }
212  setXrootdSite();
213  assert(m_qm.get());
214  assert(m_fh.get());
216  if (statsService) {
217  m_stats = statsService->getStatisticsForSite(m_site);
218  }
219 }
220 
221 bool Source::getHostname(const std::string &id, std::string &hostname) {
222  size_t pos = id.find_last_of(':');
223  hostname = id;
224  if ((pos != std::string::npos) && (pos > 0)) {
225  hostname = id.substr(0, pos);
226  }
227 
228  bool retval = true;
229  if (!hostname.empty() && ((hostname[0] == '[') || isdigit(hostname[0]))) {
230  retval = false;
231  struct addrinfo hints;
232  memset(&hints, 0, sizeof(struct addrinfo));
233  hints.ai_family = AF_UNSPEC;
234  struct addrinfo *result;
235  if (!getaddrinfo(hostname.c_str(), nullptr, &hints, &result)) {
236  std::vector<char> host;
237  host.reserve(256);
238  if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255, nullptr, 0, NI_NAMEREQD)) {
239  hostname = &host[0];
240  retval = true;
241  }
242  freeaddrinfo(result);
243  }
244  }
245  return retval;
246 }
247 
249  getHostname(host, domain);
250  size_t pos = domain.find('.');
251  if (pos != std::string::npos && (pos < domain.size())) {
252  domain = domain.substr(pos + 1);
253  }
254 
255  return !domain.empty();
256 }
257 
258 bool Source::isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList) {
259  // WORKAROUND: On open-file recovery in the Xrootd client, it'll carry around the
260  // dCache opaque information to other sites, causing isDCachePool to erroneously return
261  // true. We are working with the upstream developers to solve this.
262  //
263  // For now, we see if the previous server also looks like a dCache pool - something that
264  // wouldn't happen at a real site, as the previous server should look like a dCache door.
265  std::string lastUrl;
266  file.GetProperty("LastURL", lastUrl);
267  if (!lastUrl.empty()) {
268  bool result = isDCachePool(lastUrl);
269  if (result && hostList && (hostList->size() > 1)) {
270  if (isDCachePool((*hostList)[hostList->size() - 2].url.GetURL())) {
271  return false;
272  }
273  return true;
274  }
275  return result;
276  }
277  return false;
278 }
279 
280 bool Source::isDCachePool(const std::string &lastUrl) {
281  XrdCl::URL url(lastUrl);
282  XrdCl::URL::ParamsMap map = url.GetParams();
283  // dCache pools always utilize this opaque identifier.
284  if (map.find("org.dcache.uuid") != map.end()) {
285  return true;
286  }
287  return false;
288 }
289 
290 void Source::determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude) {
291  // Detect a dCache pool and, if we are in the federation context, give a custom
292  // exclude parameter.
293  // We assume this is a federation context if there's at least a regional, dCache door,
294  // and dCache pool server (so, more than 2 servers!).
295 
296  exclude = "";
297  if (hostList && (hostList->size() > 3) && isDCachePool(file, hostList)) {
298  const XrdCl::HostInfo &info = (*hostList)[hostList->size() - 3];
299  exclude = info.url.GetHostName();
300  std::string lastUrl;
301  file.GetProperty("LastURL", lastUrl);
302  edm::LogVerbatim("XrdAdaptorInternal") << "Changing exclude list for URL " << lastUrl << " to " << exclude;
303  }
304 }
305 
307  std::string lastUrl;
308  fh.GetProperty("LastURL", lastUrl);
309  if (lastUrl.empty() || isDCachePool(lastUrl)) {
311  if (!fh.GetProperty("DataServer", server)) {
312  id = "(unknown)";
313  } else {
314  id = server;
315  }
316  if (lastUrl.empty()) {
317  edm::LogWarning("XrdFileWarning") << "Unable to determine the URL associated with server " << id;
318  }
319  site = "Unknown";
320  if (!server.empty()) {
322  }
323  return false;
324  }
325  return getXrootdSiteFromURL(lastUrl, site);
326 }
327 
329  const std::string attr = "sitename";
330  XrdCl::Buffer *response = nullptr;
331  XrdCl::Buffer arg(attr.size());
332  arg.FromString(attr);
333 
334  std::string rsite;
335  XrdCl::XRootDStatus st = QueryAttrHandler::query(url, "sitename", std::chrono::seconds(1), rsite);
336  if (!st.IsOK()) {
337  XrdCl::URL xurl(url);
338  getDomain(xurl.GetHostName(), site);
339  delete response;
340  return false;
341  }
342  if (!rsite.empty() && (rsite[rsite.size() - 1] == '\n')) {
343  rsite = rsite.substr(0, rsite.size() - 1);
344  }
345  if (rsite == "sitename") {
346  XrdCl::URL xurl(url);
347  getDomain(xurl.GetHostName(), site);
348  return false;
349  }
350  site = rsite;
351  return true;
352 }
353 
356  bool goodSitename = getXrootdSite(*m_fh, site);
357  if (!goodSitename) {
358  edm::LogInfo("XrdAdaptorInternal") << "Xrootd server at " << m_id
359  << " did not provide a sitename. Monitoring may be incomplete.";
360  } else {
361  m_site = site;
362  m_prettyid = m_id + " (site " + m_site + ")";
363  }
364  edm::LogInfo("XrdAdaptorInternal") << "Reading from new server " << m_id << " at site " << m_site;
365 }
366 
368 
369 std::shared_ptr<XrdCl::File> Source::getFileHandle() { return fh(); }
370 
371 static void validateList(const XrdCl::ChunkList &cl) {
372  off_t last_offset = -1;
373  for (const auto &ci : cl) {
374  assert(static_cast<off_t>(ci.offset) > last_offset);
375  last_offset = ci.offset;
376  assert(ci.length <= XRD_CL_MAX_CHUNK);
377  assert(ci.offset < 0x1ffffffffff);
378  assert(ci.offset > 0);
379  }
380  assert(cl.size() <= 1024);
381 }
382 
383 void Source::handle(std::shared_ptr<ClientRequest> c) {
384  edm::LogVerbatim("XrdAdaptorInternal") << "Reading from " << ID() << ", quality " << m_qm->get() << std::endl;
385  c->m_source = shared_from_this();
386  c->m_self_reference = c;
387  m_qm->startWatch(c->m_qmw);
388  if (m_stats) {
389  std::shared_ptr<XrdReadStatistics> readStats = XrdSiteStatistics::startRead(stats(), c);
390  c->setStatistics(readStats);
391  }
392 #ifdef XRD_FAKE_SLOW
393  if (m_slow)
394  std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
395 #endif
396 
397  XrdCl::XRootDStatus status;
398  if (c->m_into) {
399  // See notes in ClientRequest definition to understand this voodoo.
400  status = m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
401  } else {
402  XrdCl::ChunkList cl;
403  cl.reserve(c->m_iolist->size());
404  for (const auto &it : *c->m_iolist) {
405  cl.emplace_back(it.offset(), it.size(), it.data());
406  }
407  validateList(cl);
408  status = m_fh->VectorRead(cl, nullptr, c.get());
409  }
410 
411  if (!status.IsOK()) {
413  ex << "XrdFile::Read or XrdFile::VectorRead failed with error: '" << status.ToStr() << "' (errNo = " << status.errNo
414  << ")";
415  ex.addContext("Calling Source::handle");
416  throw ex;
417  }
418 }
XrdCl::FileSystem m_fs
Definition: XrdSource.cc:180
Log< level::Info, true > LogVerbatim
static bool getXrootdSite(XrdCl::File &file, std::string &site)
Definition: XrdSource.cc:306
std::string m_id
Definition: XrdSource.h:75
std::atomic< int > g_delayCount
Definition: XrdSource.cc:32
Basic3DVector & operator=(const Basic3DVector &)=default
Assignment operator.
static const TGPicture * info(bool iBackgroundIsBlack)
std::shared_ptr< XrdCl::File const > fh() const
Definition: XrdSource.h:69
double seconds()
string host
Definition: query.py:115
std::unique_ptr< XrdCl::Buffer > m_response
Definition: XrdSource.cc:177
const std::string & ID() const
Definition: XrdSource.h:38
edm::propagate_const< std::shared_ptr< XrdSiteStatistics > > m_stats
Definition: XrdSource.h:82
static void validateList(const XrdCl::ChunkList &cl)
Definition: XrdSource.cc:371
static std::mutex mutex
Definition: Proxy.cc:8
std::string m_site
Definition: XrdSource.cc:74
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
QueryAttrHandler(const std::string &url)
Definition: XrdSource.cc:91
DelayedClose(std::shared_ptr< XrdCl::File > fh, const std::string &id, const std::string &site)
Definition: XrdSource.cc:46
std::string m_site
Definition: XrdSource.h:77
void handle(std::shared_ptr< ClientRequest >)
Definition: XrdSource.cc:383
assert(be >=bs)
A arg
Definition: Factorize.h:31
std::string m_id
Definition: XrdSource.cc:73
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
Definition: XrdSource.cc:72
static std::unique_ptr< QualityMetricSource > get(timespec now, const std::string &id)
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
Definition: XrdSource.h:79
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
Definition: XrdSource.cc:127
static std::shared_ptr< XrdReadStatistics > startRead(std::shared_ptr< XrdSiteStatistics > parent, std::shared_ptr< ClientRequest > req)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:258
Config
Definition: helper.py:10
static XrdCl::XRootDStatus query(const std::string &url, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
Definition: XrdSource.cc:93
static XrdSiteStatisticsInformation * getInstance()
Log< level::Info, false > LogInfo
#define XRD_CL_MAX_CHUNK
Definition: XrdSource.cc:24
void addContext(std::string const &context)
Definition: Exception.cc:169
std::string m_prettyid
Definition: XrdSource.h:76
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Definition: XrdSource.cc:57
std::condition_variable m_condvar
Definition: XrdSource.cc:173
std::shared_ptr< XrdCl::File > getFileHandle()
Definition: XrdSource.cc:369
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
std::shared_ptr< XrdSiteStatistics const > stats() const
Definition: XrdSource.h:71
Log< level::Warning, false > LogWarning
edm::propagate_const< std::unique_ptr< QualityMetricSource > > m_qm
Definition: XrdSource.h:81
std::weak_ptr< QueryAttrState > m_state
Definition: XrdSource.cc:179
std::unique_ptr< XrdCl::XRootDStatus > m_status
Definition: XrdSource.cc:176
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
def move(src, dest)
Definition: eostools.py:511
Definition: server.py:1
Source(const Source &)=delete