CMS 3D CMS Logo

XrdSource.cc
Go to the documentation of this file.
1 
2 // See http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about
3 #define _GLIBCXX_USE_NANOSLEEP
4 #include <thread>
5 #include <chrono>
6 #include <atomic>
7 #include <iostream>
8 #include <cassert>
9 #include <netdb.h>
10 
11 #include "XrdCl/XrdClFile.hh"
12 
15 
16 #include "XrdSource.h"
17 #include "XrdRequest.h"
18 #include "QualityMetric.h"
19 #include "XrdStatistics.h"
20 
// Size limits. Parenthesized so the macros expand correctly inside larger
// expressions (e.g. `x / XRD_CL_MAX_CHUNK`).
#define MAX_REQUEST (256 * 1024)
#define XRD_CL_MAX_CHUNK (512 * 1024)

#ifdef XRD_FAKE_SLOW
// Debug aid: XRD_DELAY is the artificial delay (used as milliseconds in
// Source::handle) and XRD_SLOW_RATE selects every Nth Source as "slow".
//#define XRD_DELAY 5140
#define XRD_DELAY 1000
#define XRD_SLOW_RATE 2
#endif

// Defined unconditionally (both branches of the original #ifdef/#else were
// identical); only incremented by Source's constructor under XRD_FAKE_SLOW.
std::atomic<int> g_delayCount{0};
32 
33 using namespace XrdAdaptor;
34 
35 // File::Close() can take awhile - slow servers (which are probably
36 // inactive anyway!) can even timeout. Rather than wait around for
37 // a few minutes in the main thread, this class asynchronously closes
38 // and deletes the XrdCl::File
39 class DelayedClose : boost::noncopyable, public XrdCl::ResponseHandler {
40 public:
41  DelayedClose(std::shared_ptr<XrdCl::File> fh, const std::string &id, const std::string &site)
42  : m_fh(std::move(fh)), m_id(id), m_site(site) {
43  if (m_fh && m_fh->IsOpen()) {
44  if (!m_fh->Close(this).IsOK()) {
45  delete this;
46  }
47  }
48  }
49 
50  ~DelayedClose() override = default;
51 
52  void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
53  XrdCl::AnyObject *response,
54  XrdCl::HostList *hostList) override {
55  if (status && !status->IsOK()) {
56  edm::LogWarning("XrdFileWarning") << "Source delayed close failed with error '" << status->ToStr()
57  << "' (errno=" << status->errNo << ", code=" << status->code
58  << ", server=" << m_id << ", site=" << m_site << ")";
59  }
60  delete status;
61  delete hostList;
62  // NOTE: we do not delete response (copying behavior from XrdCl).
63  delete this;
64  }
65 
66 private:
70 };
71 
77 class QueryAttrHandler : public XrdCl::ResponseHandler {
78  friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
79 
80 public:
81  ~QueryAttrHandler() override = default;
82  QueryAttrHandler(const QueryAttrHandler &) = delete;
83  QueryAttrHandler &operator=(const QueryAttrHandler &) = delete;
84 
85  static XrdCl::XRootDStatus query(XrdCl::FileSystem &fs,
86  const std::string &attr,
87  std::chrono::milliseconds timeout,
89  auto handler = std::make_unique<QueryAttrHandler>();
90  auto l_state = std::make_shared<QueryAttrState>();
91  handler->m_state = l_state;
92  XrdCl::Buffer arg(attr.size());
93  arg.FromString(attr);
94 
95  XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, arg, handler.get());
96  if (!st.IsOK()) {
97  return st;
98  }
99 
100  // Successfully registered the callback; it will always delete itself, so we shouldn't.
101  handler.release();
102 
103  std::unique_lock<std::mutex> guard(l_state->m_mutex);
104  // Wait until some status is available or a timeout.
105  l_state->m_condvar.wait_for(guard, timeout, [&] { return l_state->m_status.get(); });
106 
107  if (l_state->m_status) {
108  if (l_state->m_status->IsOK()) {
109  result = l_state->m_response->ToString();
110  }
111  return *(l_state->m_status);
112  } else { // We had a timeout; construct a reasonable message.
113  return XrdCl::XRootDStatus(
114  XrdCl::stError, XrdCl::errSocketTimeout, 1, "Timeout when waiting for query callback.");
115  }
116  }
117 
118 private:
120 
121  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
122  // NOTE: we own the status and response pointers.
123  std::unique_ptr<XrdCl::AnyObject> response_mgr;
124  response_mgr.reset(response);
125 
126  // Lock our state information then dispose of our object.
127  auto l_state = m_state.lock();
128  delete this;
129  if (!l_state) {
130  return;
131  }
132 
133  // On function exit, notify any waiting threads.
134  std::unique_ptr<char, std::function<void(char *)>> notify_guard(nullptr,
135  [&](char *) { l_state->m_condvar.notify_all(); });
136 
137  {
138  // On exit from the block, make sure m_status is set; it needs to be set before we notify threads.
139  std::unique_ptr<char, std::function<void(char *)>> exit_guard(nullptr, [&](char *) {
140  if (!l_state->m_status)
141  l_state->m_status.reset(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal));
142  });
143  if (!status) {
144  return;
145  }
146  if (status->IsOK()) {
147  if (!response) {
148  return;
149  }
150  XrdCl::Buffer *buf_ptr;
151  response->Get(buf_ptr);
152  // AnyObject::Set lacks specialization for nullptr
153  response->Set(static_cast<int *>(nullptr));
154  l_state->m_response.reset(buf_ptr);
155  }
156  l_state->m_status.reset(status);
157  }
158  }
159 
160  // Represents the current state of the callback. The parent class only manages a weak_ptr
161  // to the state. If the asynchronous callback cannot lock the weak_ptr, then it assumes the
162  // main thread has given up and doesn't touch any of the state variables.
163  struct QueryAttrState {
164  // Synchronize between the callback thread and the main thread; condvar predicate
165  // is having m_status set. m_mutex protects m_status.
167  std::condition_variable m_condvar;
168 
169  // Results from the server
170  std::unique_ptr<XrdCl::XRootDStatus> m_status;
171  std::unique_ptr<XrdCl::Buffer> m_response;
172  };
173  std::weak_ptr<QueryAttrState> m_state;
174 };
175 
176 Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string &exclude)
177  : m_lastDowngrade({0, 0}),
178  m_id("(unknown)"),
179  m_exclude(exclude),
180  m_fh(std::move(fh)),
181  m_stats(nullptr)
182 #ifdef XRD_FAKE_SLOW
183  ,
184  m_slow(++g_delayCount % XRD_SLOW_RATE == 0)
185 //, m_slow(++g_delayCount >= XRD_SLOW_RATE)
186 //, m_slow(true)
187 #endif
188 {
189  if (m_fh.get()) {
190  if (!m_fh->GetProperty("DataServer", m_id)) {
191  edm::LogWarning("XrdFileWarning") << "Source::Source() failed to determine data server name.'";
192  }
193  if (m_exclude.empty()) {
194  m_exclude = m_id;
195  }
196  }
197  m_qm = QualityMetricFactory::get(now, m_id);
198  m_prettyid = m_id + " (unknown site)";
199  std::string domain_id;
200  if (getDomain(m_id, domain_id)) {
201  m_site = domain_id;
202  } else {
203  m_site = "Unknown (" + m_id + ")";
204  }
205  setXrootdSite();
206  assert(m_qm.get());
207  assert(m_fh.get());
209  if (statsService) {
210  m_stats = statsService->getStatisticsForSite(m_site);
211  }
212 }
213 
214 bool Source::getHostname(const std::string &id, std::string &hostname) {
215  size_t pos = id.find(":");
216  hostname = id;
217  if ((pos != std::string::npos) && (pos > 0)) {
218  hostname = id.substr(0, pos);
219  }
220 
221  bool retval = true;
222  if (!hostname.empty() && ((hostname[0] == '[') || isdigit(hostname[0]))) {
223  retval = false;
224  struct addrinfo hints;
225  memset(&hints, 0, sizeof(struct addrinfo));
226  hints.ai_family = AF_UNSPEC;
227  struct addrinfo *result;
228  if (!getaddrinfo(hostname.c_str(), nullptr, &hints, &result)) {
229  std::vector<char> host;
230  host.reserve(256);
231  if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255, nullptr, 0, NI_NAMEREQD)) {
232  hostname = &host[0];
233  retval = true;
234  }
235  freeaddrinfo(result);
236  }
237  }
238  return retval;
239 }
240 
242  getHostname(host, domain);
243  size_t pos = domain.find(".");
244  if (pos != std::string::npos && (pos < domain.size())) {
245  domain = domain.substr(pos + 1);
246  }
247 
248  return !domain.empty();
249 }
250 
251 bool Source::isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList) {
252  // WORKAROUND: On open-file recovery in the Xrootd client, it'll carry around the
253  // dCache opaque information to other sites, causing isDCachePool to erroneously return
254  // true. We are working with the upstream developers to solve this.
255  //
256  // For now, we see if the previous server also looks like a dCache pool - something that
257  // wouldn't happen at a real site, as the previous server should look like a dCache door.
258  std::string lastUrl;
259  file.GetProperty("LastURL", lastUrl);
260  if (!lastUrl.empty()) {
261  bool result = isDCachePool(lastUrl);
262  if (result && hostList && (hostList->size() > 1)) {
263  if (isDCachePool((*hostList)[hostList->size() - 2].url.GetURL())) {
264  return false;
265  }
266  return true;
267  }
268  return result;
269  }
270  return false;
271 }
272 
273 bool Source::isDCachePool(const std::string &lastUrl) {
274  XrdCl::URL url(lastUrl);
275  XrdCl::URL::ParamsMap map = url.GetParams();
276  // dCache pools always utilize this opaque identifier.
277  if (map.find("org.dcache.uuid") != map.end()) {
278  return true;
279  }
280  return false;
281 }
282 
283 void Source::determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude) {
284  // Detect a dCache pool and, if we are in the federation context, give a custom
285  // exclude parameter.
286  // We assume this is a federation context if there's at least a regional, dCache door,
287  // and dCache pool server (so, more than 2 servers!).
288 
289  exclude = "";
290  if (hostList && (hostList->size() > 3) && isDCachePool(file, hostList)) {
291  const XrdCl::HostInfo &info = (*hostList)[hostList->size() - 3];
292  exclude = info.url.GetHostName();
293  std::string lastUrl;
294  file.GetProperty("LastURL", lastUrl);
295  edm::LogVerbatim("XrdAdaptorInternal") << "Changing exclude list for URL " << lastUrl << " to " << exclude;
296  }
297 }
298 
300  std::string lastUrl;
301  fh.GetProperty("LastURL", lastUrl);
302  if (lastUrl.empty() || isDCachePool(lastUrl)) {
304  if (!fh.GetProperty("DataServer", server)) {
305  id = "(unknown)";
306  } else {
307  id = server;
308  }
309  if (lastUrl.empty()) {
310  edm::LogWarning("XrdFileWarning") << "Unable to determine the URL associated with server " << id;
311  }
312  site = "Unknown";
313  if (!server.empty()) {
314  getDomain(server, site);
315  }
316  return false;
317  }
318  return getXrootdSiteFromURL(lastUrl, site);
319 }
320 
322  const std::string attr = "sitename";
323  XrdCl::Buffer *response = nullptr;
324  XrdCl::Buffer arg(attr.size());
325  arg.FromString(attr);
326 
327  XrdCl::FileSystem fs(url);
328  std::string rsite;
329  XrdCl::XRootDStatus st = QueryAttrHandler::query(fs, "sitename", std::chrono::seconds(1), rsite);
330  if (!st.IsOK()) {
331  XrdCl::URL xurl(url);
332  getDomain(xurl.GetHostName(), site);
333  delete response;
334  return false;
335  }
336  if (!rsite.empty() && (rsite[rsite.size() - 1] == '\n')) {
337  rsite = rsite.substr(0, rsite.size() - 1);
338  }
339  if (rsite == "sitename") {
340  XrdCl::URL xurl(url);
341  getDomain(xurl.GetHostName(), site);
342  return false;
343  }
344  site = rsite;
345  return true;
346 }
347 
349  std::string site;
350  bool goodSitename = getXrootdSite(*m_fh, site);
351  if (!goodSitename) {
352  edm::LogInfo("XrdAdaptorInternal") << "Xrootd server at " << m_id
353  << " did not provide a sitename. Monitoring may be incomplete.";
354  } else {
355  m_site = site;
356  m_prettyid = m_id + " (site " + m_site + ")";
357  }
358  edm::LogInfo("XrdAdaptorInternal") << "Reading from new server " << m_id << " at site " << m_site;
359 }
360 
362 
363 std::shared_ptr<XrdCl::File> Source::getFileHandle() { return fh(); }
364 
365 static void validateList(const XrdCl::ChunkList &cl) {
366  off_t last_offset = -1;
367  for (const auto &ci : cl) {
368  assert(static_cast<off_t>(ci.offset) > last_offset);
369  last_offset = ci.offset;
370  assert(ci.length <= XRD_CL_MAX_CHUNK);
371  assert(ci.offset < 0x1ffffffffff);
372  assert(ci.offset > 0);
373  }
374  assert(cl.size() <= 1024);
375 }
376 
377 void Source::handle(std::shared_ptr<ClientRequest> c) {
378  edm::LogVerbatim("XrdAdaptorInternal") << "Reading from " << ID() << ", quality " << m_qm->get() << std::endl;
379  c->m_source = shared_from_this();
380  c->m_self_reference = c;
381  m_qm->startWatch(c->m_qmw);
382  if (m_stats) {
383  std::shared_ptr<XrdReadStatistics> readStats = XrdSiteStatistics::startRead(stats(), c);
384  c->setStatistics(readStats);
385  }
386 #ifdef XRD_FAKE_SLOW
387  if (m_slow)
388  std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
389 #endif
390 
391  XrdCl::XRootDStatus status;
392  if (c->m_into) {
393  // See notes in ClientRequest definition to understand this voodoo.
394  status = m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
395  } else {
396  XrdCl::ChunkList cl;
397  cl.reserve(c->m_iolist->size());
398  for (const auto &it : *c->m_iolist) {
399  cl.emplace_back(it.offset(), it.size(), it.data());
400  }
401  validateList(cl);
402  status = m_fh->VectorRead(cl, nullptr, c.get());
403  }
404 
405  if (!status.IsOK()) {
407  ex << "XrdFile::Read or XrdFile::VectorRead failed with error: '" << status.ToStr() << "' (errNo = " << status.errNo
408  << ")";
409  ex.addContext("Calling Source::handle");
410  throw ex;
411  }
412 }
XrdAdaptor::XrdSiteStatisticsInformation
Definition: XrdStatistics.h:55
XrdAdaptor::XrdSiteStatisticsInformation::getInstance
static XrdSiteStatisticsInformation * getInstance()
Definition: XrdStatistics.cc:86
XrdAdaptor::Source::m_id
std::string m_id
Definition: XrdSource.h:76
g_delayCount
std::atomic< int > g_delayCount
Definition: XrdSource.cc:30
XrdAdaptor::Source::stats
std::shared_ptr< XrdSiteStatistics const > stats() const
Definition: XrdSource.h:72
QueryAttrHandler::QueryAttrHandler
QueryAttrHandler()
Definition: XrdSource.cc:119
XrdAdaptor::Source::determineHostExcludeString
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:283
DelayedClose::DelayedClose
DelayedClose(std::shared_ptr< XrdCl::File > fh, const std::string &id, const std::string &site)
Definition: XrdSource.cc:41
relmon_authenticated_wget.url
url
Definition: relmon_authenticated_wget.py:22
MessageLogger.h
XrdStatistics.h
XrdAdaptor::Source::m_site
std::string m_site
Definition: XrdSource.h:78
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
QueryAttrHandler::QueryAttrState::m_response
std::unique_ptr< XrdCl::Buffer > m_response
Definition: XrdSource.cc:171
XrdAdaptor::Source::m_qm
edm::propagate_const< std::unique_ptr< QualityMetricSource > > m_qm
Definition: XrdSource.h:82
mps_update.status
status
Definition: mps_update.py:69
QueryAttrHandler::QueryAttrState
Definition: XrdSource.cc:163
XrdAdaptor::Source::fh
std::shared_ptr< XrdCl::File const > fh() const
Definition: XrdSource.h:70
pos
Definition: PixelAliasList.h:18
edm::LogInfo
Definition: MessageLogger.h:254
XrdSource.h
validateList
static void validateList(const XrdCl::ChunkList &cl)
Definition: XrdSource.cc:365
cms::cuda::assert
assert(be >=bs)
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:152
XrdAdaptor::Source::m_stats
edm::propagate_const< std::shared_ptr< XrdSiteStatistics > > m_stats
Definition: XrdSource.h:83
XrdAdaptor::Source::handle
void handle(std::shared_ptr< ClientRequest >)
Definition: XrdSource.cc:377
edm::Exception
Definition: EDMException.h:77
query.host
host
Definition: query.py:115
EDMException.h
GetRecoTauVFromDQM_MC_cff.cl
cl
Definition: GetRecoTauVFromDQM_MC_cff.py:38
fileCollector.now
now
Definition: fileCollector.py:207
edm::propagate_const
Definition: propagate_const.h:32
XrdAdaptor::Source::m_fh
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
Definition: XrdSource.h:80
XrdAdaptor::XrdSiteStatistics::startRead
static std::shared_ptr< XrdReadStatistics > startRead(std::shared_ptr< XrdSiteStatistics > parent, std::shared_ptr< ClientRequest > req)
Definition: XrdStatistics.cc:110
seconds
double seconds()
QueryAttrHandler::query
static XrdCl::XRootDStatus query(XrdCl::FileSystem &fs, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
Definition: XrdSource.cc:85
XrdAdaptor::Source::getDomain
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:241
mutex
static boost::mutex mutex
Definition: Proxy.cc:9
XRD_CL_MAX_CHUNK
#define XRD_CL_MAX_CHUNK
Definition: XrdSource.cc:22
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::LogWarning
Definition: MessageLogger.h:141
DelayedClose::m_fh
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
Definition: XrdSource.cc:67
XrdAdaptor::Source::isDCachePool
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:251
XrdRequest.h
DelayedClose::m_id
std::string m_id
Definition: XrdSource.cc:68
contentValuesFiles.server
server
Definition: contentValuesFiles.py:37
XrdAdaptor::Source::getXrootdSiteFromURL
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:321
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
edm::LogVerbatim
Definition: MessageLogger.h:297
QueryAttrHandler::HandleResponse
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
Definition: XrdSource.cc:121
HltBtagPostValidation_cff.c
c
Definition: HltBtagPostValidation_cff.py:31
QueryAttrHandler::QueryAttrState::m_condvar
std::condition_variable m_condvar
Definition: XrdSource.cc:167
timeout
Definition: timeout.py:1
XrdAdaptor::Source::m_prettyid
std::string m_prettyid
Definition: XrdSource.h:77
DelayedClose::m_site
std::string m_site
Definition: XrdSource.cc:69
QualityMetric.h
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
VtxSmearedBeamProfile_cfi.File
File
Definition: VtxSmearedBeamProfile_cfi.py:30
eostools.move
def move(src, dest)
Definition: eostools.py:511
XrdAdaptor::QualityMetricFactory::get
static std::unique_ptr< QualityMetricSource > get(timespec now, const std::string &id)
Definition: QualityMetric.cc:134
std
Definition: JetResolutionObject.h:76
triggerObjects_cff.id
id
Definition: triggerObjects_cff.py:31
XrdAdaptor::Source::getXrootdSite
static bool getXrootdSite(XrdCl::File &file, std::string &site)
Definition: XrdSource.cc:299
HiBiasedCentrality_cfi.function
function
Definition: HiBiasedCentrality_cfi.py:4
XrdAdaptor::Source::getFileHandle
std::shared_ptr< XrdCl::File > getFileHandle()
Definition: XrdSource.cc:363
XrdAdaptor::Source::Source
Source(timespec now, std::unique_ptr< XrdCl::File > fileHandle, const std::string &exclude)
Definition: XrdSource.cc:176
funct::void
TEMPL(T2) struct Divides void
Definition: Factorize.h:29
DelayedClose
Definition: XrdSource.cc:39
funct::arg
A arg
Definition: Factorize.h:36
hcal_runs.URL
URL
Definition: hcal_runs.py:4
XrdAdaptor::Source::setXrootdSite
void setXrootdSite()
Definition: XrdSource.cc:348
mps_fire.result
result
Definition: mps_fire.py:303
genParticles_cff.map
map
Definition: genParticles_cff.py:11
XrdAdaptor::Source::ID
const std::string & ID() const
Definition: XrdSource.h:37
QueryAttrHandler::m_state
std::weak_ptr< QueryAttrState > m_state
Definition: XrdSource.cc:173
edm::errors::FileReadError
Definition: EDMException.h:50
DelayedClose::HandleResponseWithHosts
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Definition: XrdSource.cc:52
QueryAttrHandler
Definition: XrdSource.cc:77
XrdAdaptor::Source::getHostname
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:214
XrdAdaptor
Definition: QualityMetric.h:15
QueryAttrHandler::QueryAttrState::m_status
std::unique_ptr< XrdCl::XRootDStatus > m_status
Definition: XrdSource.cc:170
helper.Config
Config
Definition: helper.py:10
QueryAttrHandler::QueryAttrState::m_mutex
std::mutex m_mutex
Definition: XrdSource.cc:166
XrdAdaptor::Source::~Source
~Source()
Definition: XrdSource.cc:361