3 #define _GLIBCXX_USE_NANOSLEEP
11 #include "XrdCl/XrdClFile.hh"
// Size constants: 256 * 1024 and 512 * 1024 (presumably request/chunk size
// limits in bytes -- confirm at the use sites).
// The replacement lists are parenthesized so that each macro always expands to
// a single value. Without the parentheses an expression such as
// `x / MAX_REQUEST` would be parsed as `(x / 256) * 1024`.
#define MAX_REQUEST (256 * 1024)
#define XRD_CL_MAX_CHUNK (512 * 1024)
// Length of the artificial pause, in milliseconds: the value is handed to
// std::chrono::milliseconds() in a std::this_thread::sleep_for() call further
// down this file.
26 #define XRD_DELAY 1000
// NOTE(review): the use of XRD_SLOW_RATE is not visible in this part of the
// file; presumably it selects how often the XRD_DELAY pause is applied -- confirm.
27 #define XRD_SLOW_RATE 2
39 class DelayedClose : boost::noncopyable,
public XrdCl::ResponseHandler {
42 : m_fh(
std::
move(fh)), m_id(
id), m_site(site) {
43 if (m_fh && m_fh->IsOpen()) {
44 if (!m_fh->Close(
this).IsOK()) {
53 XrdCl::AnyObject *response,
54 XrdCl::HostList *hostList)
override {
57 <<
"' (errno=" <<
status->errNo <<
", code=" <<
status->code
58 <<
", server=" << m_id <<
", site=" << m_site <<
")";
78 friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
85 static XrdCl::XRootDStatus
query(XrdCl::FileSystem &fs,
87 std::chrono::milliseconds
timeout,
89 auto handler = std::make_unique<QueryAttrHandler>();
90 auto l_state = std::make_shared<QueryAttrState>();
91 handler->m_state = l_state;
92 XrdCl::Buffer
arg(attr.size());
103 std::unique_lock<std::mutex> guard(l_state->m_mutex);
105 l_state->m_condvar.wait_for(guard,
timeout, [&] {
return l_state->m_status.get(); });
107 if (l_state->m_status) {
108 if (l_state->m_status->IsOK()) {
109 result = l_state->m_response->ToString();
111 return *(l_state->m_status);
113 return XrdCl::XRootDStatus(
114 XrdCl::stError, XrdCl::errSocketTimeout, 1,
"Timeout when waiting for query callback.");
123 std::unique_ptr<XrdCl::AnyObject> response_mgr;
124 response_mgr.reset(response);
127 auto l_state = m_state.lock();
135 [&](
char *) { l_state->m_condvar.notify_all(); });
140 if (!l_state->m_status)
141 l_state->m_status.reset(
new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal));
150 XrdCl::Buffer *buf_ptr;
151 response->Get(buf_ptr);
153 response->Set(static_cast<int *>(
nullptr));
154 l_state->m_response.reset(buf_ptr);
156 l_state->m_status.reset(
status);
177 : m_lastDowngrade({0, 0}),
190 if (!m_fh->GetProperty(
"DataServer", m_id)) {
191 edm::LogWarning(
"XrdFileWarning") <<
"Source::Source() failed to determine data server name.'";
193 if (m_exclude.empty()) {
198 m_prettyid = m_id +
" (unknown site)";
200 if (getDomain(m_id, domain_id)) {
203 m_site =
"Unknown (" + m_id +
")";
210 m_stats = statsService->getStatisticsForSite(m_site);
215 size_t pos =
id.find(
":");
217 if ((
pos != std::string::npos) && (
pos > 0)) {
218 hostname =
id.substr(0,
pos);
222 if (!hostname.empty() && ((hostname[0] ==
'[') || isdigit(hostname[0]))) {
224 struct addrinfo hints;
225 memset(&hints, 0,
sizeof(
struct addrinfo));
226 hints.ai_family = AF_UNSPEC;
228 if (!getaddrinfo(hostname.c_str(),
nullptr, &hints, &
result)) {
229 std::vector<char>
host;
231 if (!getnameinfo(
result->ai_addr,
result->ai_addrlen, &
host[0], 255,
nullptr, 0, NI_NAMEREQD)) {
243 size_t pos = domain.find(
".");
244 if (
pos != std::string::npos && (
pos < domain.size())) {
245 domain = domain.substr(
pos + 1);
248 return !domain.empty();
259 file.GetProperty(
"LastURL", lastUrl);
260 if (!lastUrl.empty()) {
262 if (
result && hostList && (hostList->size() > 1)) {
263 if (
isDCachePool((*hostList)[hostList->size() - 2].url.GetURL())) {
275 XrdCl::URL::ParamsMap
map =
url.GetParams();
277 if (
map.find(
"org.dcache.uuid") !=
map.end()) {
291 const XrdCl::HostInfo &
info = (*hostList)[hostList->size() - 3];
292 exclude =
info.url.GetHostName();
294 file.GetProperty(
"LastURL", lastUrl);
295 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Changing exclude list for URL " << lastUrl <<
" to " << exclude;
301 fh.GetProperty(
"LastURL", lastUrl);
304 if (!
fh.GetProperty(
"DataServer",
server)) {
309 if (lastUrl.empty()) {
310 edm::LogWarning(
"XrdFileWarning") <<
"Unable to determine the URL associated with server " <<
id;
323 XrdCl::Buffer *response =
nullptr;
324 XrdCl::Buffer
arg(attr.size());
325 arg.FromString(attr);
327 XrdCl::FileSystem fs(
url);
336 if (!rsite.empty() && (rsite[rsite.size() - 1] ==
'\n')) {
337 rsite = rsite.substr(0, rsite.size() - 1);
339 if (rsite ==
"sitename") {
353 <<
" did not provide a sitename. Monitoring may be incomplete.";
366 off_t last_offset = -1;
367 for (
const auto &ci :
cl) {
368 assert(static_cast<off_t>(ci.offset) > last_offset);
369 last_offset = ci.offset;
371 assert(ci.offset < 0x1ffffffffff);
378 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Reading from " <<
ID() <<
", quality " <<
m_qm->get() << std::endl;
379 c->m_source = shared_from_this();
380 c->m_self_reference =
c;
381 m_qm->startWatch(
c->m_qmw);
384 c->setStatistics(readStats);
388 std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
391 XrdCl::XRootDStatus
status;
397 cl.reserve(
c->m_iolist->size());
398 for (
const auto &it : *
c->m_iolist) {
399 cl.emplace_back(it.offset(), it.size(), it.data());
407 ex <<
"XrdFile::Read or XrdFile::VectorRead failed with error: '" <<
status.ToStr() <<
"' (errNo = " <<
status.errNo