3 #define _GLIBCXX_USE_NANOSLEEP 11 #include "XrdCl/XrdClFile.hh" 21 #define MAX_REQUEST 256*1024 22 #define XRD_CL_MAX_CHUNK 512*1024 26 #define XRD_DELAY 1000 27 #define XRD_SLOW_RATE 2 39 class DelayedClose : boost::noncopyable,
public XrdCl::ResponseHandler
48 if (m_fh && m_fh->IsOpen())
50 if (!m_fh->Close(
this).IsOK())
63 if (status && !status->IsOK())
66 edm::LogWarning(
"XrdFileWarning") <<
"Source delayed close failed with error '" << status->ToStr()
67 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
", server=" << m_id <<
", site=" << m_site <<
")";
90 friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
101 auto handler = std::make_unique<QueryAttrHandler>();
102 auto l_state = std::make_shared<QueryAttrState>();
103 handler->m_state = l_state;
104 XrdCl::Buffer
arg(attr.size());
105 arg.FromString(attr);
107 XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config,
arg, handler.get());
116 std::unique_lock<std::mutex> guard(l_state->m_mutex);
118 l_state->m_condvar.wait_for(guard, timeout, [&]{
return l_state->m_status.get();});
120 if (l_state->m_status)
122 if (l_state->m_status->IsOK())
124 result = l_state->m_response->ToString();
126 return *(l_state->m_status);
130 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errSocketTimeout, 1,
"Timeout when waiting for query callback.");
143 std::unique_ptr<XrdCl::AnyObject> response_mgr;
144 response_mgr.reset(response);
147 auto l_state = m_state.lock();
149 if (!l_state) {
return;}
152 std::unique_ptr<char, std::function<void(char*)>> notify_guard(
nullptr, [&](
char *) {l_state->m_condvar.notify_all();});
156 std::unique_ptr<char, std::function<void(char*)>> exit_guard(
nullptr, [&](
char *) {
if (!l_state->m_status) l_state->m_status.reset(
new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal));});
157 if (!status) {
return;}
160 if (!response) {
return;}
161 XrdCl::Buffer *buf_ptr;
162 response->Get(buf_ptr);
164 response->Set(static_cast<int *>(
nullptr));
165 l_state->m_response.reset(buf_ptr);
167 l_state->m_status.reset(status);
191 : m_lastDowngrade({0, 0}),
204 if (!
m_fh->GetProperty(
"DataServer",
m_id))
207 <<
"Source::Source() failed to determine data server name.'";
229 size_t pos =
id.find(
":");
231 if ((pos != std::string::npos) && (pos > 0)) {hostname =
id.substr(0, pos);}
234 if (hostname.size() && ((hostname[0] ==
'[') || isdigit(hostname[0])))
237 struct addrinfo hints; memset(&hints, 0,
sizeof(
struct addrinfo));
238 hints.ai_family = AF_UNSPEC;
240 if (!getaddrinfo(hostname.c_str(),
NULL, &hints, &
result))
242 std::vector<char>
host; host.reserve(256);
243 if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255,
NULL, 0, NI_NAMEREQD))
248 freeaddrinfo(result);
258 size_t pos = domain.find(
".");
259 if (pos != std::string::npos && (pos < domain.size())) {domain = domain.substr(pos+1);}
261 return domain.size();
275 file.GetProperty(
"LastURL", lastUrl);
279 if (result && hostList && (hostList->size() > 1))
281 if (
isDCachePool((*hostList)[hostList->size()-2].url.GetURL()))
295 XrdCl::URL
url(lastUrl);
296 XrdCl::URL::ParamsMap
map = url.GetParams();
298 if (map.find(
"org.dcache.uuid") != map.end())
315 if (hostList && (hostList->size() > 3) &&
isDCachePool(file, hostList))
317 const XrdCl::HostInfo &
info = (*hostList)[hostList->size()-3];
318 exclude = info.url.GetHostName();
319 std::string lastUrl; file.GetProperty(
"LastURL", lastUrl);
320 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Changing exclude list for URL " << lastUrl <<
" to " << exclude;
329 fh.GetProperty(
"LastURL", lastUrl);
333 if (!fh.GetProperty(
"DataServer", server)) {
id =
"(unknown)";}
335 if (!lastUrl.size()) {
edm::LogWarning(
"XrdFileWarning") <<
"Unable to determine the URL associated with server " <<
id;}
337 if (server.size()) {
getDomain(server, site);}
347 XrdCl::Buffer *response = 0;
348 XrdCl::Buffer
arg( attr.size() );
349 arg.FromString( attr );
351 XrdCl::FileSystem fs(url);
356 XrdCl::URL xurl(url);
361 if (!rsite.empty() && (rsite[rsite.size()-1] ==
'\n'))
363 rsite = rsite.substr(0, rsite.size()-1);
365 if (rsite ==
"sitename")
367 XrdCl::URL xurl(url);
383 <<
"Xrootd server at " <<
m_id <<
" did not provide a sitename. Monitoring may be incomplete.";
398 std::shared_ptr<XrdCl::File>
407 off_t last_offset = -1;
408 for (
const auto & ci : cl)
410 assert(static_cast<off_t>(ci.offset) > last_offset);
411 last_offset = ci.offset;
413 assert(ci.offset < 0x1ffffffffff);
414 assert(ci.offset > 0);
416 assert(cl.size() <= 1024);
422 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Reading from " <<
ID() <<
", quality " <<
m_qm->get() << std::endl;
423 c->m_source = shared_from_this();
424 c->m_self_reference =
c;
425 m_qm->startWatch(c->m_qmw);
429 c->setStatistics(readStats);
432 if (m_slow) std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
435 XrdCl::XRootDStatus
status;
439 status =
m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
444 cl.reserve(c->m_iolist->size());
445 for (
const auto & it : *c->m_iolist)
447 cl.emplace_back(it.offset(), it.size(), it.data());
450 status =
m_fh->VectorRead(cl,
nullptr, c.get());
456 ex <<
"XrdFile::Read or XrdFile::VectorRead failed with error: '" 457 << status.ToStr() <<
"' (errNo = " << status.errNo <<
")";
static bool getXrootdSite(XrdCl::File &file, std::string &site)
std::atomic< int > g_delayCount
static boost::mutex mutex
std::unique_ptr< XrdCl::Buffer > m_response
edm::propagate_const< std::shared_ptr< XrdSiteStatistics > > m_stats
static void validateList(const XrdCl::ChunkList &cl)
static XrdCl::XRootDStatus query(XrdCl::FileSystem &fs, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
DelayedClose(std::shared_ptr< XrdCl::File > fh, const std::string &id, const std::string &site)
void handle(std::shared_ptr< ClientRequest >)
std::shared_ptr< XrdSiteStatistics const > stats() const
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
static std::unique_ptr< QualityMetricSource > get(timespec now, const std::string &id)
edm::propagate_const< std::shared_ptr< XrdCl::File > > m_fh
static std::shared_ptr< XrdReadStatistics > startRead(std::shared_ptr< XrdSiteStatistics > parent, std::shared_ptr< ClientRequest > req)
static bool getDomain(const std::string &host, std::string &domain)
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Source(timespec now, std::unique_ptr< XrdCl::File > fileHandle, const std::string &exclude)
std::shared_ptr< XrdCl::File const > fh() const
element_type const * get() const
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void addContext(std::string const &context)
std::condition_variable m_condvar
std::shared_ptr< XrdCl::File > getFileHandle()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
edm::propagate_const< std::unique_ptr< QualityMetricSource > > m_qm
std::weak_ptr< QueryAttrState > m_state
std::unique_ptr< XrdCl::XRootDStatus > m_status
static bool getHostname(const std::string &id, std::string &hostname)
const std::string & ID() const