3 #define _GLIBCXX_USE_NANOSLEEP
10 #include "XrdCl/XrdClFile.hh"
19 #define MAX_REQUEST 256*1024
20 #define XRD_CL_MAX_CHUNK 512*1024
24 #define XRD_DELAY 1000
25 #define XRD_SLOW_RATE 2
31 using namespace XrdAdaptor;
37 class DelayedClose : boost::noncopyable,
public XrdCl::ResponseHandler
42 :
m_fh(std::move(fh)),
48 if (!
m_fh->Close(
this).IsOK())
64 edm::LogWarning(
"XrdFileWarning") <<
"Source delayed close failed with error '" << status->ToStr()
65 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
", server=" <<
m_id <<
", site=" << m_site <<
")";
74 std::shared_ptr<XrdCl::File>
m_fh;
80 : m_lastDowngrade({0, 0}),
94 if (!
m_fh->GetProperty(
"DataServer",
m_id))
97 <<
"Source::Source() failed to determine data server name.'";
101 m_prettyid =
m_id +
" (unknown site)";
104 else {m_site =
"Unknown (" +
m_id +
")";}
118 size_t pos =
id.find(
":");
120 if ((pos != std::string::npos) && (pos > 0)) {hostname =
id.substr(0, pos);}
123 if (hostname.size() && ((hostname[0] ==
'[') || isdigit(hostname[0])))
126 struct addrinfo hints; memset(&hints, 0,
sizeof(
struct addrinfo));
127 hints.ai_family = AF_UNSPEC;
129 if (!getaddrinfo(hostname.c_str(),
NULL, &hints, &
result))
131 std::vector<char>
host; host.reserve(256);
132 if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255,
NULL, 0, NI_NAMEREQD))
137 freeaddrinfo(result);
147 size_t pos = domain.find(
".");
148 if (pos != std::string::npos && (pos < domain.size())) {domain = domain.substr(pos+1);}
150 return domain.size();
164 file.GetProperty(
"LastURL", lastUrl);
168 if (result && hostList && (hostList->size() > 1))
170 if (
isDCachePool((*hostList)[hostList->size()-2].url.GetURL()))
184 XrdCl::URL
url(lastUrl);
185 XrdCl::URL::ParamsMap
map = url.GetParams();
187 if (map.find(
"org.dcache.uuid") != map.end())
204 if (hostList && (hostList->size() > 3) &&
isDCachePool(file, hostList))
206 const XrdCl::HostInfo &
info = (*hostList)[hostList->size()-3];
207 exclude = info.url.GetHostName();
208 std::string lastUrl; file.GetProperty(
"LastURL", lastUrl);
209 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Changing exclude list for URL " << lastUrl <<
" to " << exclude;
218 fh.GetProperty(
"LastURL", lastUrl);
222 if (!fh.GetProperty(
"DataServer", server)) {
id =
"(unknown)";}
224 if (!lastUrl.size()) {
edm::LogWarning(
"XrdFileWarning") <<
"Unable to determine the URL associated with server " << id;}
226 if (server.size()) {
getDomain(server, site);}
236 XrdCl::Buffer *response = 0;
237 XrdCl::Buffer
arg( attr.size() );
238 arg.FromString( attr );
240 XrdCl::FileSystem fs(url);
244 XrdCl::URL xurl(url);
251 if (rsite.size() && (rsite[rsite.size()-1] ==
'\n'))
253 rsite = rsite.substr(0, rsite.size()-1);
255 if (rsite ==
"sitename")
257 XrdCl::URL xurl(url);
273 <<
"Xrootd server at " <<
m_id <<
" did not provide a sitename. Monitoring may be incomplete.";
288 std::shared_ptr<XrdCl::File>
297 off_t last_offset = -1;
298 for (
const auto & ci : cl)
300 assert(static_cast<off_t>(ci.offset) > last_offset);
301 last_offset = ci.offset;
303 assert(ci.offset < 0x1ffffffffff);
306 assert(cl.size() <= 1024);
312 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Reading from " <<
ID() <<
", quality " <<
m_qm->get() << std::endl;
313 c->m_source = shared_from_this();
314 c->m_self_reference =
c;
315 m_qm->startWatch(c->m_qmw);
319 c->setStatistics(readStats);
322 if (m_slow) std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
327 m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
332 cl.reserve(c->m_iolist->size());
333 for (
const auto & it : *c->m_iolist)
335 cl.emplace_back(it.offset(), it.size(), it.data());
338 m_fh->VectorRead(cl,
nullptr, c.get());
std::shared_ptr< XrdCl::File > m_fh
static bool getXrootdSite(XrdCl::File &file, std::string &site)
static void validateList(const XrdCl::ChunkList &cl)
m_qm(QualityMetricFactory::get(now, m_id))
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
DelayedClose(std::shared_ptr< XrdCl::File > fh, const std::string &id, const std::string &site)
void handle(std::shared_ptr< ClientRequest >)
std::shared_ptr< XrdCl::File > m_fh
static std::unique_ptr< QualityMetricSource > get(timespec now, const std::string &id)
static std::shared_ptr< XrdReadStatistics > startRead(std::shared_ptr< XrdSiteStatistics > parent, std::shared_ptr< ClientRequest > req)
static bool getDomain(const std::string &host, std::string &domain)
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Source(timespec now, std::unique_ptr< XrdCl::File > fileHandle, const std::string &exclude)
XrdSiteStatisticsInformation * statsService
std::unique_ptr< QualityMetricSource > m_qm
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
std::shared_ptr< XrdSiteStatistics > m_stats
std::shared_ptr< XrdCl::File > getFileHandle()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
static bool getHostname(const std::string &id, std::string &hostname)
const std::string & ID() const