3 #define _GLIBCXX_USE_NANOSLEEP
13 #include "XrdCl/XrdClFile.hh"
// Size limits, expressed as multiples of 1024 (presumably byte counts; their
// uses are not visible in this section -- confirm against the callers).
// The replacement lists are parenthesized so that the macros expand as a
// single value inside larger expressions such as `x / MAX_REQUEST` or
// `x % XRD_CL_MAX_CHUNK`; without the parentheses those would be parsed as
// `(x / 256) * 1024` and `(x % 512) * 1024`.
#define MAX_REQUEST (256 * 1024)
#define XRD_CL_MAX_CHUNK (512 * 1024)

// Delay, in milliseconds, passed to std::this_thread::sleep_for() before a
// read is issued in this file.
#define XRD_DELAY 1000
// NOTE(review): presumably the rate at which the artificial slowdown above is
// applied -- its use is not visible in this section; confirm before relying on it.
#define XRD_SLOW_RATE 2
35 using namespace XrdAdaptor;
49 if (!
m_fh->Close(
this).IsOK()) {
59 XrdCl::HostList *hostList)
override {
60 if (status && !status->IsOK()) {
61 edm::LogWarning(
"XrdFileWarning") <<
"Source delayed close failed with error '" << status->ToStr()
62 <<
"' (errno=" << status->errNo <<
", code=" << status->code
63 <<
", server=" <<
m_id <<
", site=" << m_site <<
")";
83 friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
95 std::chrono::milliseconds
timeout,
97 auto handler = std::make_unique<QueryAttrHandler>(
url);
98 auto l_state = std::make_shared<QueryAttrState>();
99 handler->m_state = l_state;
100 XrdCl::Buffer
arg(attr.size());
101 arg.FromString(attr);
111 std::unique_lock<std::mutex> guard(l_state->m_mutex);
113 l_state->m_condvar.wait_for(guard, timeout, [&] {
return l_state->m_status.get(); });
115 if (l_state->m_status) {
116 if (l_state->m_status->IsOK()) {
117 result = l_state->m_response->ToString();
119 return *(l_state->m_status);
121 return XrdCl::XRootDStatus(
122 XrdCl::stError, XrdCl::errSocketTimeout, 1,
"Timeout when waiting for query callback.");
129 std::unique_ptr<XrdCl::AnyObject> response_mgr;
130 response_mgr.reset(response);
133 auto l_state = m_state.lock();
140 std::unique_ptr<char, std::function<void(char *)>> notify_guard(
nullptr,
141 [&](
char *) { l_state->m_condvar.notify_all(); });
145 std::unique_ptr<char, std::function<void(char *)>> exit_guard(
nullptr, [&](
char *) {
146 if (!l_state->m_status)
147 l_state->m_status = std::make_unique<XrdCl::XRootDStatus>(XrdCl::stError, XrdCl::errInternal);
152 if (status->IsOK()) {
156 XrdCl::Buffer *buf_ptr;
157 response->Get(buf_ptr);
159 response->Set(static_cast<int *>(
nullptr));
160 l_state->m_response.reset(buf_ptr);
162 l_state->m_status.reset(status);
184 : m_lastDowngrade({0, 0}),
197 if (!
m_fh->GetProperty(
"DataServer",
m_id)) {
198 edm::LogWarning(
"XrdFileWarning") <<
"Source::Source() failed to determine data server name.'";
207 if (getDomain(
m_id, domain_id)) {
210 m_site =
"Unknown (" +
m_id +
")";
222 size_t pos =
id.find(
':');
224 if ((pos != std::string::npos) && (pos > 0)) {
225 hostname =
id.substr(0, pos);
229 if (!hostname.empty() && ((hostname[0] ==
'[') || isdigit(hostname[0]))) {
231 struct addrinfo hints;
232 memset(&hints, 0,
sizeof(
struct addrinfo));
233 hints.ai_family = AF_UNSPEC;
235 if (!getaddrinfo(hostname.c_str(),
nullptr, &hints, &
result)) {
236 std::vector<char>
host;
238 if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255,
nullptr, 0, NI_NAMEREQD)) {
242 freeaddrinfo(result);
250 size_t pos = domain.find(
'.');
251 if (pos != std::string::npos && (pos < domain.size())) {
252 domain = domain.substr(pos + 1);
255 return !domain.empty();
266 file.GetProperty(
"LastURL", lastUrl);
267 if (!lastUrl.empty()) {
269 if (result && hostList && (hostList->size() > 1)) {
270 if (
isDCachePool((*hostList)[hostList->size() - 2].url.GetURL())) {
282 XrdCl::URL::ParamsMap map = url.GetParams();
284 if (map.find(
"org.dcache.uuid") != map.end()) {
297 if (hostList && (hostList->size() > 3) &&
isDCachePool(file, hostList)) {
298 const XrdCl::HostInfo &
info = (*hostList)[hostList->size() - 3];
299 exclude = info.url.GetHostName();
301 file.GetProperty(
"LastURL", lastUrl);
302 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Changing exclude list for URL " << lastUrl <<
" to " << exclude;
308 fh.GetProperty(
"LastURL", lastUrl);
311 if (!fh.GetProperty(
"DataServer", server)) {
316 if (lastUrl.empty()) {
317 edm::LogWarning(
"XrdFileWarning") <<
"Unable to determine the URL associated with server " <<
id;
320 if (!server.empty()) {
331 XrdCl::Buffer
arg(attr.size());
332 arg.FromString(attr);
342 if (!rsite.empty() && (rsite[rsite.size() - 1] ==
'\n')) {
343 rsite = rsite.substr(0, rsite.size() - 1);
345 if (rsite ==
"sitename") {
359 <<
" did not provide a sitename. Monitoring may be incomplete.";
372 off_t last_offset = -1;
373 for (
const auto &ci : cl) {
374 assert(static_cast<off_t>(ci.offset) > last_offset);
375 last_offset = ci.offset;
377 assert(ci.offset < 0x1ffffffffff);
380 assert(cl.size() <= 1024);
384 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Reading from " <<
ID() <<
", quality " <<
m_qm->get() << std::endl;
385 c->m_source = shared_from_this();
386 c->m_self_reference =
c;
387 m_qm->startWatch(c->m_qmw);
390 c->setStatistics(readStats);
394 std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
397 XrdCl::XRootDStatus
status;
400 status =
m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
403 cl.reserve(c->m_iolist->size());
404 for (
const auto &it : *c->m_iolist) {
405 cl.emplace_back(it.offset(), it.size(), it.data());
408 status =
m_fh->VectorRead(cl,
nullptr, c.get());
411 if (!status.IsOK()) {
413 ex <<
"XrdFile::Read or XrdFile::VectorRead failed with error: '" << status.ToStr() <<
"' (errNo = " << status.errNo
Log< level::Info, true > LogVerbatim
static bool getXrootdSite(XrdCl::File &file, std::string &site)
std::atomic< int > g_delayCount
const edm::EventSetup & c
uint16_t *__restrict__ id
std::unique_ptr< XrdCl::Buffer > m_response
edm::propagate_const< std::shared_ptr< XrdSiteStatistics > > m_stats
static void validateList(const XrdCl::ChunkList &cl)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
QueryAttrHandler(const std::string &url)
DelayedClose(std::shared_ptr< XrdCl::File > fh, const std::string &id, const std::string &site)
void handle(std::shared_ptr< ClientRequest >)
std::shared_ptr< XrdSiteStatistics const > stats() const
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
static std::unique_ptr< QualityMetricSource > get(timespec now, const std::string &id)
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
static std::shared_ptr< XrdReadStatistics > startRead(std::shared_ptr< XrdSiteStatistics > parent, std::shared_ptr< ClientRequest > req)
static bool getDomain(const std::string &host, std::string &domain)
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
static XrdCl::XRootDStatus query(const std::string &url, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
XrdSiteStatisticsInformation * statsService
std::shared_ptr< XrdCl::File const > fh() const
Log< level::Info, false > LogInfo
void addContext(std::string const &context)
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
std::condition_variable m_condvar
std::shared_ptr< XrdCl::File > getFileHandle()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Log< level::Warning, false > LogWarning
edm::propagate_const< std::unique_ptr< QualityMetricSource > > m_qm
std::weak_ptr< QueryAttrState > m_state
std::unique_ptr< XrdCl::XRootDStatus > m_status
static bool getHostname(const std::string &id, std::string &hostname)
const std::string & ID() const
Source(const Source &)=delete