9 #include "XrdCl/XrdClFile.hh"
10 #include "XrdCl/XrdClDefaultEnv.hh"
11 #include "XrdCl/XrdClFileSystem.hh"
23 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
// Maximum chunk size constant, 512 * 1024 (presumably bytes — confirm at the use sites).
// The value is parenthesized so the macro behaves as a single operand when it is
// expanded inside a larger expression (e.g. `x / XRD_CL_MAX_CHUNK`).
#define XRD_CL_MAX_CHUNK (512 * 1024)
// Short open delay (units are not visible in this excerpt; presumably seconds — confirm).
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5
29 #ifdef XRD_FAKE_OPEN_PROBE
30 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
31 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20
33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
// Open-probe percentage (value 10).
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
// Long open delay, 2 * 60 (presumably seconds — confirm at the use sites).
// Parenthesized so the macro expands as one operand inside larger expressions
// (e.g. `x / XRD_ADAPTOR_LONG_OPEN_DELAY` or `x - XRD_ADAPTOR_LONG_OPEN_DELAY * y`).
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2 * 60)
// Fudge value applied to source quality (value 100; exact use is not visible here).
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
40 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
43 #include <mach/clock.h>
44 #include <mach/mach.h>
45 #define GET_CLOCK_MONOTONIC(ts) \
47 clock_serv_t cclock; \
48 mach_timespec_t mts; \
49 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
50 clock_get_time(cclock, &mts); \
51 mach_port_deallocate(mach_task_self(), cclock); \
52 ts.tv_sec = mts.tv_sec; \
53 ts.tv_nsec = mts.tv_nsec; \
56 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
// Whole-second part of (a - b), scaled by 1000, i.e. expressed in milliseconds.
62 long long diff = (
a.tv_sec -
b.tv_sec) * 1000;
// Add the nanosecond part of (a - b) divided by 1e6. Because 1e6 is a floating-point
// literal the division is done in floating point; the compound assignment then converts
// the sum back to long long (truncating any fractional millisecond).
63 diff += (
a.tv_nsec -
b.tv_nsec) / 1e6;
74 XrdCl::Buffer *
buffer =
nullptr;
76 response->Set(static_cast<int *>(
nullptr));
// Read the file's "LastURL" property into lastUrl.
103 file.GetProperty(
"LastURL", lastUrl);
// Continue only when jobId is set and a last URL was obtained.
104 if (jobId && !lastUrl.empty()) {
// NOTE(review): `url` is defined on a line not visible in this excerpt
// (presumably built from lastUrl) — confirm.
106 XrdCl::FileSystem fs(
url);
// Send jobId through SendInfo using nullHandler; the third argument is 30
// (presumably a timeout in seconds — confirm against the XrdCl::FileSystem API).
// A non-OK status from the call leads to the failure message below.
107 if (!(fs.SendInfo(jobId, &
nullHandler, 30).IsOK())) {
109 <<
"Failed to send the monitoring information, monitoring ID is " << jobId <<
".";
// Informational log recording the monitoring ID; the branch that encloses this
// statement is not visible in this excerpt.
111 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
// Member initializer list of a constructor whose signature is not visible in this excerpt.
// Starts with no server to advertise and the default timeout.
116 : m_serverToAdvertise(nullptr),
117 m_timeout(XRD_DEFAULT_TIMEOUT),
118 m_nextInitialSourceToggle(
false),
// m_distribution is constructed with the arguments (0, 100); its type is not visible here
// (presumably a distribution over that range, cf. the *_PROBE_PERCENT constant — confirm).
122 m_distribution(0, 100),
// Counter of excluded sources returned by the server starts at zero; the constructor body is empty.
123 m_excluded_active_count(0) {}
128 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
// File handle for the open attempts below; owned by a unique_ptr.
141 std::unique_ptr<XrdCl::File>
file;
// validFile starts false; where it is set is not visible in this excerpt.
143 bool validFile =
false;
// Upper bound on the number of open attempts.
144 const int retries = 5;
// Each iteration starts from a freshly constructed XrdCl::File.
146 for (
int idx = 0;
idx < retries;
idx++) {
147 file = std::make_unique<XrdCl::File>();
// Build the name to open: m_name, followed (only when opaque is non-empty) by '?' if m_name
// contains no '?' yet or '&' otherwise, and then opaque. The left-hand side of this expression
// is on a line not visible here (presumably new_filename — confirm).
150 m_name + (!opaque.empty() ? ((
m_name.find(
'?') ==
m_name.npos) ?
"?" :
"&") + opaque :
"");
// Issue the open with a SyncHostResponseHandler that is waited on further below.
151 SyncHostResponseHandler handler;
152 XrdCl::XRootDStatus openStatus =
file->Open(new_filename,
m_flags,
m_perms, &handler);
// Error text built from openStatus, i.e. from the status returned directly by Open();
// the condition guarding it is not visible here.
161 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0"
162 << std::oct <<
m_perms <<
std::dec <<
") => error '" << openStatus.ToStr() <<
"' (errno=" << openStatus.errNo
163 <<
", code=" << openStatus.code <<
")";
165 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
// Wait for the handler's response, then take ownership of the status and host list it provides.
168 handler.WaitForResponse();
169 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
170 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
// Error text built from the handler's status (as opposed to openStatus above).
180 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0"
182 <<
", code=" <<
status->code <<
")";
// Query the file for the data server and the last URL it used.
186 file->GetProperty(
"DataServer", dataServer);
187 file->GetProperty(
"LastURL", lastUrl);
// NOTE(review): the bodies of the following conditionals are only partly visible.
188 if (!dataServer.empty()) {
191 if (!lastUrl.empty()) {
193 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
197 ex <<
". No additional data servers were found.";
200 if (!dataServer.empty()) {
// When the last URL equals the name that was requested, log both values.
205 if (lastUrl == new_filename) {
206 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
// Wrap the raw hostname_ptr in a unique_ptr so the string is deleted when `hostname` goes out of scope.
250 std::unique_ptr<std::string> hostname(hostname_ptr);
// Create a heap-allocated std::string from `id`.
// NOTE(review): this second `hostname` presumably belongs to a different scope than the one above;
// the enclosing definitions are not visible in this excerpt.
259 auto hostname = std::make_unique<std::string>(
id);
// Site of the first source, when there is at least one source.
271 if (!iSources.empty()) {
272 siteA = iSources[0]->Site();
// Site of the second source, only when there are exactly two sources.
274 if (iSources.size() == 2) {
275 siteB = iSources[1]->Site();
// When a second, different site exists, the list becomes "siteA, siteB".
// The value of siteList in the other case is assigned on a line not visible here.
278 if (!siteB.empty() && (siteB != siteA)) {
279 siteList = siteA +
", " + siteB;
// Formatted site list of the new set of sources.
288 auto siteList = formatSites(iNew);
// An original site was given and differs from the new list: warn about the deviation.
289 if (!orig_site.empty() && (orig_site != siteList)) {
290 edm::LogWarning(
"XrdAdaptor") <<
"Data is served from " << siteList <<
" instead of original site " << orig_site;
// No original site was given: compare against the previous sources instead.
292 auto oldSites = formatSites(iOld);
293 if (orig_site.empty() && (siteList != oldSites)) {
// The warning is emitted only when there actually was a previous site list.
294 if (!oldSites.empty())
295 edm::LogWarning(
"XrdAdaptor") <<
"Data is now served from " << siteList <<
" instead of previous " << oldSites;
303 std::vector<std::shared_ptr<Source>> &inactiveSources) {
322 std::vector<std::shared_ptr<Source>> &inactiveSources)
const {
// Result flag; set to true below and returned at the end of the visible fragment.
327 bool findNewSource =
false;
// Log message for dropping activeSources[a] because of poor quality; the condition that
// selects the source and the rest of the statement are not visible in this excerpt.
332 <<
"Removing " <<
activeSources[
a]->PrettyID() <<
" from active sources due to poor quality ("
335 findNewSource =
true;
343 return findNewSource;
349 std::vector<std::shared_ptr<Source>> &inactiveSources) {
// Result flag; the condition under which it is first set to true is not visible here.
350 bool findNewSource =
false;
352 findNewSource =
true;
355 <<
", source 1 quality " <<
activeSources[1]->getQuality() << std::endl;
// Collect the inactive sources that are candidates for promotion. The filter applied inside
// the loop is on a line not visible in this excerpt.
361 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
362 eligibleInactiveSources.reserve(inactiveSources.size());
363 for (
const auto &
source : inactiveSources) {
365 eligibleInactiveSources.push_back(
source);
// The eligible inactive source with the smallest getQuality() is named "best", so smaller
// quality values are treated as better in this code.
368 auto bestInactiveSource =
369 std::min_element(eligibleInactiveSources.begin(),
370 eligibleInactiveSources.end(),
371 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
372 return s1->getQuality() <
s2->getQuality();
// The active source with the largest getQuality() is the "worst" one.
374 auto worstActiveSource = std::max_element(
activeSources.cbegin(),
376 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
377 return s1->getQuality() <
s2->getQuality();
// Log the best inactive source only when one exists and its pointer is non-null.
379 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
380 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " << (*bestInactiveSource)->PrettyID()
381 <<
", quality " << (*bestInactiveSource)->getQuality();
383 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " << (*worstActiveSource)->PrettyID()
384 <<
", quality " << (*worstActiveSource)->getQuality();
// Single active source: act when the best inactive source's quality value is below four
// times the active source's quality value. The action taken is only partly visible.
387 if ((bestInactiveSource != eligibleInactiveSources.end()) &&
activeSources.size() == 1 &&
388 ((*bestInactiveSource)->getQuality() < 4 *
activeSources[0]->getQuality())) {
// Remove the chosen source from inactiveSources, matching by raw pointer identity.
// NOTE(review): erase() invalidates `it`; the statement after the erase is not visible
// (presumably a break) — confirm the loop does not continue with the stale iterator.
392 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
393 if (it->get() == bestInactiveSource->get()) {
394 inactiveSources.erase(it);
// Swap loop: runs while an eligible inactive source exists and the worst active source's
// quality value exceeds a threshold whose expression is on a line not visible here.
398 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
399 (*worstActiveSource)->getQuality() >
402 <<
"Removing " << (*worstActiveSource)->PrettyID() <<
" from active sources due to quality ("
403 << (*worstActiveSource)->getQuality() <<
") and promoting " << (*bestInactiveSource)->PrettyID()
404 <<
" (quality: " << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
// Record the time of the downgrade on the source being demoted.
405 (*worstActiveSource)->setLastDowngrade(
now);
// Same pointer-identity removal as above (same NOTE(review) about the erased iterator).
406 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
407 if (it->get() == bestInactiveSource->get()) {
408 inactiveSources.erase(it);
// The demoted source joins the inactive list.
411 inactiveSources.emplace_back(*worstActiveSource);
// Recompute the candidate list and both extrema for the next loop iteration.
416 eligibleInactiveSources.clear();
417 for (
const auto &
source : inactiveSources)
419 eligibleInactiveSources.push_back(
source);
420 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
421 eligibleInactiveSources.end(),
422 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
423 return s1->getQuality() <
s2->getQuality();
427 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
428 return s1->getQuality() <
s2->getQuality();
434 findNewSource =
true;
456 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
457 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
458 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
489 std::vector<std::string>
sources;
502 std::shared_ptr<Source>
source =
nullptr;
515 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
516 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
517 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
532 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
540 std::shared_ptr<void *> guard(
nullptr, [
this, &
activeSources, &inactiveSources](
void *) {
551 return c_ptr->get_future();
// Accumulates a comma-separated list of identifiers.
555 std::stringstream
ss;
// For each entry, append the part of ExcludeID() before the first ':' followed by a comma.
// The loops providing `it` are not visible in this excerpt.
563 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
':')) <<
",";
567 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
':')) <<
",";
// Here `it` is string-like itself; the same "text before the first ':'" rule is applied.
572 ss << it.substr(0, it.find(
':')) <<
",";
// Return tmp_str without its last character (the trailing comma when entries were appended).
// NOTE(review): tmp_str is built on a line not visible here; if it can be empty, size() - 1
// wraps around and substr returns an empty string.
576 return tmp_str.substr(0, tmp_str.size() - 1);
// Hold m_source_mutex while the active/inactive source lists are inspected and modified.
582 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
584 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " <<
source->PrettyID() << std::endl;
// Scan the active sources; the comparison performed in the loop is not visible here, but the
// message shows the new source is ignored when it is one that should have been excluded.
585 for (
const auto &
s : m_activeSources) {
588 <<
"Xrootd server returned excluded source " <<
source->PrettyID() <<
"; ignoring" << std::endl;
// Count how often an excluded source has been returned; a count of 3 or more triggers
// handling that is on lines not visible in this excerpt.
589 unsigned returned_count = ++m_excluded_active_count;
591 if (returned_count >= 3) {
// Same scan over the inactive sources.
597 for (
const auto &
s : m_inactiveSources) {
600 <<
"Xrootd server returned excluded inactive source " <<
source->PrettyID() <<
"; ignoring" << std::endl;
// With fewer than two active sources the new source becomes active: the site change is
// reported against a copy of the previous list and a current-server update is queued.
605 if (m_activeSources.size() < 2) {
606 auto oldSources = m_activeSources;
607 m_activeSources.push_back(
source);
608 reportSiteChange(oldSources, m_activeSources);
609 queueUpdateCurrentServer(
source->ID());
// Otherwise (branch keyword not visible here) the source is kept as inactive.
611 m_inactiveSources.push_back(
source);
// Logged when the open did not succeed; the guarding condition is not visible in this excerpt.
614 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
// Local working copies of the source lists.
623 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
// Copy the member list under m_source_mutex. The scope of this lock and the copy of the
// active list are on lines not visible in this excerpt.
625 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
627 inactiveSources = m_inactiveSources;
// Scope guard: a shared_ptr holding nullptr whose custom deleter runs when `guard` is
// destroyed. The deleter re-takes m_source_mutex and moves the local list back into the member.
630 std::shared_ptr<void *> guard(
nullptr, [
this, &
activeSources, &inactiveSources](
void *) {
631 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
633 m_inactiveSources =
std::move(inactiveSources);
636 updateCurrentServer();
// Wrap the whole I/O list in one client request and return its future.
// The condition selecting this path is not visible here.
645 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
648 return c_ptr->get_future();
// Error text raised when a source is used after a fatal exception.
653 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
654 <<
", permissions=0" << std::oct << m_perms <<
std::dec <<
") => Source used after fatal exception.";
655 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
// Two sub-requests; how they are filled is on lines not visible in this excerpt.
661 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
662 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
// Re-evaluate the sources, passing the combined number of entries in both sub-requests.
665 checkSources(
now, req1->size() + req2->size(),
activeSources, inactiveSources);
668 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
670 return c_ptr->get_future();
// One client request and one future per non-empty sub-request.
673 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
674 std::future<IOSize> future1, future2;
675 if (!req1->empty()) {
678 future1 = c_ptr1->get_future();
680 if (!req2->empty()) {
683 future2 = c_ptr2->get_future();
// Both halves in use: combine them with a deferred task, so the lambda runs only when the
// returned future is waited on. The visible part of the lambda sums the two results,
// reading b before a.
685 if (!req1->empty() && !req2->empty()) {
686 std::future<IOSize>
task = std::async(
687 std::launch::deferred,
688 [](std::future<IOSize>
a, std::future<IOSize>
b) {
702 return b.get() +
a.get();
// Only one half in use: the bodies of these branches are not visible here.
709 }
else if (!req1->empty()) {
711 }
else if (!req2->empty()) {
// Fallback: a future obtained from a local promise (whether a value is set first is not visible).
714 std::promise<IOSize>
p;
716 return p.get_future();
// Source that was serving the failed request.
721 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
// An invalid-response status gets its own warning and exception text.
724 if (c_status.code == XrdCl::errInvalidResponse) {
725 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
727 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
728 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
729 <<
") => Invalid ReadV response from server";
730 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
// Generic warning for other failures.
734 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
// Ask the open handler for a replacement source; the result arrives through a shared future.
753 std::shared_ptr<Source> new_source;
755 std::shared_future<std::shared_ptr<Source>> future =
m_open_handler->open();
// Exception text for the case where waiting for the open timed out; the wait itself is on
// lines not visible in this excerpt.
766 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
767 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
768 <<
") => timeout when waiting for file open";
769 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
// Retrieve the new source; the addContext call below presumably sits in a handler for an
// exception from get() — the try/catch lines are not visible, confirm.
774 new_source = future.get();
776 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
// Exception text for the case where the server handed back a source that was excluded.
786 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
787 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
788 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
789 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
// Hand the failed request to the new source.
801 new_source->handle(c_ptr);
805 std::vector<IOPosBuffer> &
input,
806 std::vector<IOPosBuffer> &
output,
// Buffer larger than the remaining chunk budget: only part of it is consumed.
// NOTE(review): the lines between the two `consumed = chunksize` assignments are not visible
// in this excerpt, so the exact branch structure cannot be stated here.
811 if (io.
size() > chunksize) {
819 consumed = chunksize;
823 consumed = chunksize;
// Reduce the remaining budget by what was consumed.
826 chunksize -= consumed;
// Address of the unconsumed remainder: the buffer's data pointer advanced by `consumed` bytes.
829 void *newdata = static_cast<char *>(io.
data()) + consumed;
// Zero-length buffers are handled by their own branch (body not visible here).
833 }
else if (io.
size() == 0) {
// Buffer consumed whole: subtract its full size from the budget.
837 chunksize -= io.
size();
844 std::vector<IOPosBuffer> &
input,
845 std::vector<IOPosBuffer> &
output,
// Buffer larger than the remaining chunk budget: only part of it is consumed.
// NOTE(review): the lines between the two `consumed = chunksize` assignments are not visible
// in this excerpt, so the exact branch structure cannot be stated here.
850 if (io.
size() > chunksize) {
858 consumed = chunksize;
862 consumed = chunksize;
// Reduce the remaining budget by what was consumed.
865 chunksize -= consumed;
// Address of the unconsumed remainder: the buffer's data pointer advanced by `consumed` bytes.
868 void *newdata = static_cast<char *>(io.
data()) + consumed;
// Zero-length buffers are handled by their own branch (body not visible here).
872 }
else if (io.
size() == 0) {
// Buffer consumed whole: subtract its full size from the budget.
876 chunksize -= io.
size();
// Debug-time sanity checks on a request list (all checks are asserts).
// Starts at -1 so that a first offset of 0 passes the "strictly increasing" check.
884 off_t last_offset = -1;
885 for (
const auto &it : req) {
// Offsets must be strictly increasing from one entry to the next.
887 assert(it.offset() > last_offset);
888 last_offset = it.offset();
// Each offset must stay below 0x1ffffffffff.
890 assert(it.offset() < 0x1ffffffffff);
// The list may hold at most 1024 entries.
892 assert(req.size() <= 1024);
897 std::vector<IOPosBuffer> &req1,
898 std::vector<IOPosBuffer> &req2,
// Mutable working copy of the input list.
902 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
// Pre-size both outputs for roughly half of the input entries each.
903 req1.reserve(iolist.size() / 2 + 1);
904 req2.reserve(iolist.size() / 2 + 1);
// Tails of two statements whose beginnings are not visible here; each passes 1024 as an IOSize
// (presumably a lower bound on the per-source chunk size — confirm).
913 static_cast<IOSize>(1024));
915 static_cast<IOSize>(1024));
// Total number of bytes in the original request.
918 for (
const auto &it : iolist)
919 size_orig += it.size();
// Loop while entries remain between `front` and the end of the working list.
// NOTE(review): the type of `front` is not visible; if it is unsigned and can exceed
// tmp_iolist.size(), the subtraction wraps instead of going negative.
921 while (tmp_iolist.size() - front > 0) {
// Exception raised when the request cannot be split (guarding condition not visible).
930 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
931 <<
", permissions=0" << std::oct << m_perms <<
std::dec
932 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be "
933 "reported to CMSSW developers.";
// NOTE(review): the context string names requestFailure() although the message above is
// about splitClientRequest — looks like a copy/paste leftover; confirm before changing.
934 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
// Extra diagnostics: original request size, then both quality values minus 5.
936 std::stringstream
ss;
937 ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
939 std::stringstream ss2;
940 ss2 <<
"Quality source 1 " <<
q1 - 5 <<
", quality source 2: " <<
q2 - 5;
// Comparator bodies ordering entries by ascending offset (the calls using them are not
// visible; presumably sorts of req1 and req2 — confirm).
952 return left.
offset() < right.offset();
955 return left.
offset() < right.offset();
// The two halves together must account for every byte of the original request.
961 assert(size_orig == size1 + size2);
963 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig
964 <<
" bytes) split into requests size " << req1.size() <<
" (" << size1
965 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
976 XrdCl::HostList *hostList_ptr) {
// Local shared_ptr copy of m_self (presumably keeps this handler alive during the callback —
// confirm; the lines that follow are not visible).
978 std::shared_ptr<OpenHandler>
self = m_self;
// Tail of a guard object constructed on `this` whose deleter clears m_outstanding_open;
// the beginning of the declaration is on a line not visible in this excerpt.
985 this, [&](
OpenHandler *) { m_outstanding_open =
false; });
987 std::shared_ptr<Source>
source;
// Take ownership of the raw status and host-list pointers passed to the callback.
988 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
989 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
// Obtain a strong reference to the manager from the weak pointer (result check not visible).
991 auto manager = m_manager.lock();
999 std::unique_ptr<XrdCl::File> releaseFile;
// m_mutex is held from here on within the enclosing scope.
1001 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
// Deliver the opened source through the promise (success path; condition not visible).
1012 m_promise.set_value(
source);
// Failure path: build an exception describing the failed open, add the manager's
// connection information, and deliver it through the promise instead of a value.
1016 ex <<
"XrdCl::File::Open(name='" << manager->m_name <<
"', flags=0x" << std::hex << manager->m_flags
1017 <<
", permissions=0" << std::oct << manager->m_perms <<
std::dec <<
") => error '" <<
status->ToStr()
1018 <<
"' (errno=" <<
status->errNo <<
", code=" <<
status->code <<
")";
1019 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1020 manager->addConnections(ex);
1022 m_promise.set_exception(std::make_exception_ptr(ex));
// Hold m_mutex while m_file is inspected.
1029 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
// No file object means no open is in flight.
1031 if (!m_file.get()) {
1032 return "(no open in progress)";
// Ask the file for its "DataServer" property; an empty value is reported as unknown.
// The return for the non-empty case is on a line not visible in this excerpt.
1035 m_file->GetProperty(
"DataServer", dataServer);
1036 if (dataServer.empty()) {
1037 return "(unknown source)";
// Obtain a strong reference to the owning manager; the exception below is built for the case
// where that fails (the check itself is on a line not visible here).
1043 auto manager_ptr = m_manager.lock();
1046 ex <<
"XrdCl::File::Open() =>"
1047 <<
" error: OpenHandler called within an invalid RequestManager context."
1048 <<
" This is a logic error and should be reported to the CMSSW developers.";
1049 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
// Same pattern for the handler's own weak self-reference.
1053 auto self_ptr = m_self_weak.lock();
1056 ex <<
"XrdCl::File::Open() => error: "
1057 <<
"OpenHandler called after it was deleted. This is a logic error "
1058 <<
"and should be reported to the CMSSW developers.";
// NOTE(review): this context string spells "XrdAdapter" while the others use "XrdAdaptor".
1059 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
// An open is already in flight: hand back the existing shared future.
// NOTE(review): on the visible lines this flag is tested before m_mutex is taken; whether
// m_outstanding_open is atomic is not visible here — confirm.
1072 if (m_outstanding_open) {
1073 return m_shared_future;
// Under m_mutex, replace the promise with a fresh one and publish its shared future.
1075 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1076 std::promise<std::shared_ptr<Source>> new_promise;
1077 m_promise.swap(new_promise);
1078 m_shared_future = m_promise.get_future().share();
1082 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
// Fresh file object for this attempt; mark the open as outstanding.
1083 m_file = std::make_unique<XrdCl::File>();
1084 m_outstanding_open =
true;
// The flag is cleared again on a path whose surrounding lines are not visible
// (presumably the cleanup performed by exit_guard — confirm).
1088 m_outstanding_open =
false;
1092 XrdCl::XRootDStatus
status;
// Exception text for a failed open call; the guarding condition is not visible.
1095 ex <<
"XrdCl::File::Open(name='" << new_name <<
"', flags=0x" << std::hex << manager.
m_flags <<
", permissions=0"
1097 <<
", code=" <<
status.code <<
")";
1098 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
// Release exit_guard (defined on a line not visible here) so that its cleanup does not run
// on this path, then return the shared future for the pending open.
1102 exit_guard.release();
1105 return m_shared_future;