9 #include "XrdCl/XrdClPostMasterInterfaces.hh" 10 #include "XrdCl/XrdClPostMaster.hh" 12 #include "XrdCl/XrdClFile.hh" 13 #include "XrdCl/XrdClDefaultEnv.hh" 14 #include "XrdCl/XrdClFileSystem.hh" 26 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh" 32 #ifdef XRD_FAKE_OPEN_PROBE 46 #include <mach/clock.h> 47 #include <mach/mach.h> 48 #define GET_CLOCK_MONOTONIC(ts) \ 50 clock_serv_t cclock; \ 51 mach_timespec_t mts; \ 52 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \ 53 clock_get_time(cclock, &mts); \ 54 mach_port_deallocate(mach_task_self(), cclock); \ 55 ts.tv_sec = mts.tv_sec; \ 56 ts.tv_nsec = mts.tv_nsec; \ 59 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts); 66 long long diff = (
a.tv_sec -
b.tv_sec) * 1000;
67 diff += (
a.tv_nsec -
b.tv_nsec) / 1e6;
78 XrdCl::Buffer *
buffer =
nullptr;
80 response->Set(static_cast<int *>(
nullptr));
98 XrdCl::FileSystem &
fs() {
return m_fs; }
112 file.GetProperty(
"LastURL", lastUrl);
113 if (jobId && !lastUrl.empty()) {
115 if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
117 <<
"Failed to send the monitoring information, monitoring ID is " << jobId <<
".";
120 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
125 std::unique_ptr<std::string> getQueryTransport(
const XrdCl::URL &
url, uint16_t
query) {
127 XrdCl::DefaultEnv::GetPostMaster()->QueryTransport(
url,
query,
result);
130 return std::unique_ptr<std::string>(
tmp);
133 void tracerouteRedirections(
const XrdCl::HostList *hostList) {
134 edm::LogInfo(
"XrdAdaptorLvl2").log([hostList](
auto &li) {
135 int idx_redirection = 1;
136 li <<
"-------------------------------\nTraceroute:\n";
137 for (
auto const &
host : *hostList) {
139 std::unique_ptr<std::string> stack_ip_method = getQueryTransport(
host.url, XrdCl::StreamQuery::IpStack);
140 std::unique_ptr<std::string> ip_method = getQueryTransport(
host.url, XrdCl::StreamQuery::IpAddr);
141 std::unique_ptr<std::string> auth_method = getQueryTransport(
host.url, XrdCl::TransportQuery::Auth);
142 std::unique_ptr<std::string> hostname_method = getQueryTransport(
host.url, XrdCl::StreamQuery::HostName);
146 if (!auth_method->empty()) {
151 if (
host.loadBalancer == 1) {
152 type_resource =
"load balancer";
154 li.format(
"{}. || {} / {} / {} / {} / {} / {} ||\n",
164 li.format(
"-------------------------------");
170 : m_serverToAdvertise(nullptr),
171 m_timeout(XRD_DEFAULT_TIMEOUT),
172 m_nextInitialSourceToggle(
false),
173 m_redirectLimitDelayScale(1),
177 m_distribution(0, 100),
178 m_excluded_active_count(0) {}
183 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
196 std::unique_ptr<XrdCl::File>
file;
198 bool validFile =
false;
199 const int retries = 5;
201 for (
int idx = 0;
idx < retries;
idx++) {
202 file = std::make_unique<XrdCl::File>();
205 m_name + (!opaque.empty() ? ((
m_name.find(
'?') ==
m_name.npos) ?
"?" :
"&") + opaque :
"");
206 SyncHostResponseHandler handler;
207 XrdCl::XRootDStatus openStatus =
file->Open(new_filename,
m_flags,
m_perms, &handler);
216 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0" 217 << std::oct <<
m_perms <<
std::dec <<
") => error '" << openStatus.ToStr() <<
"' (errno=" << openStatus.errNo
218 <<
", code=" << openStatus.code <<
")";
220 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
223 handler.WaitForResponse();
224 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
225 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
226 tracerouteRedirections(hostList.get());
236 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0" 238 <<
", code=" <<
status->code <<
")";
242 file->GetProperty(
"DataServer", dataServer);
243 file->GetProperty(
"LastURL", lastUrl);
244 if (!dataServer.empty()) {
247 if (!lastUrl.empty()) {
249 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
253 ex <<
". No additional data servers were found.";
256 if (!dataServer.empty()) {
261 if (lastUrl == new_filename) {
262 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
306 std::unique_ptr<std::string> hostname(hostname_ptr);
315 auto hostname = std::make_unique<std::string>(
id);
327 if (!iSources.empty()) {
328 siteA = iSources[0]->Site();
330 if (iSources.size() == 2) {
331 siteB = iSources[1]->Site();
334 if (!siteB.empty() && (siteB != siteA)) {
335 siteList = siteA +
", " + siteB;
344 auto siteList = formatSites(iNew);
345 if (orig_site.empty() || (orig_site == siteList)) {
346 auto oldSites = formatSites(iOld);
350 li <<
"Serving data from: ";
351 int size_active_sources = iNew.size();
352 for (
int i = 0;
i < size_active_sources; ++
i) {
355 li.format(
" [{}] {}",
i + 1, hostname_a);
360 li <<
"The quality of the active sources is: ";
361 int size_active_sources = iNew.size();
362 for (
int i = 0;
i < size_active_sources; ++
i) {
366 li.format(
" [{}] {} for {}",
i + 1,
quality, hostname_a);
374 std::vector<std::shared_ptr<Source>> &inactiveSources) {
393 std::vector<std::shared_ptr<Source>> &inactiveSources)
const {
399 bool findNewSource =
false;
400 if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
403 if (quality_a > 5130) {
404 edm::LogWarning(
"XrdAdaptorLvl3") <<
"Deactivating " << hostname_a <<
" from active sources because the quality (" 405 << quality_a <<
") is above 5130 and it is not the only active server";
407 if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
410 edm::LogWarning(
"XrdAdaptorLvl3") <<
"Deactivating " << hostname_a <<
" from active sources because its quality (" 412 <<
") is higher than 260 and 4 times larger than the other active server " 413 << hostname_b <<
" (" << quality_b <<
") ";
415 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Removing " << hostname_a <<
" from active sources due to poor quality (" 416 << quality_a <<
" vs " << quality_b <<
")" << std::endl;
418 findNewSource =
true;
426 return findNewSource;
432 std::vector<std::shared_ptr<Source>> &inactiveSources) {
433 bool findNewSource =
false;
435 findNewSource =
true;
437 <<
"Looking for an additional source because the number of active sources is smaller than 2";
440 <<
", source 1 quality " <<
activeSources[1]->getQuality() << std::endl;
446 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
447 eligibleInactiveSources.reserve(inactiveSources.size());
448 for (
const auto &
source : inactiveSources) {
450 eligibleInactiveSources.push_back(
source);
453 auto bestInactiveSource =
454 std::min_element(eligibleInactiveSources.begin(),
455 eligibleInactiveSources.end(),
456 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
457 return s1->getQuality() < s2->getQuality();
459 auto worstActiveSource = std::max_element(
activeSources.cbegin(),
461 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
462 return s1->getQuality() < s2->getQuality();
464 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
465 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " << (*bestInactiveSource)->PrettyID()
466 <<
", quality " << (*bestInactiveSource)->getQuality();
468 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " << (*worstActiveSource)->PrettyID()
469 <<
", quality " << (*worstActiveSource)->getQuality();
472 if ((bestInactiveSource != eligibleInactiveSources.end()) &&
activeSources.size() == 1 &&
473 ((*bestInactiveSource)->getQuality() < 4 *
activeSources[0]->getQuality())) {
477 for (
auto it = inactiveSources.begin();
it != inactiveSources.end();
it++)
478 if (
it->get() == bestInactiveSource->get()) {
479 inactiveSources.erase(
it);
483 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
484 (*worstActiveSource)->getQuality() >
487 <<
"Removing " << (*worstActiveSource)->PrettyID() <<
" from active sources due to quality (" 488 << (*worstActiveSource)->getQuality() <<
") and promoting " << (*bestInactiveSource)->PrettyID()
489 <<
" (quality: " << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
490 (*worstActiveSource)->setLastDowngrade(
now);
491 for (
auto it = inactiveSources.begin();
it != inactiveSources.end();
it++)
492 if (
it->get() == bestInactiveSource->get()) {
493 inactiveSources.erase(
it);
496 inactiveSources.emplace_back(*worstActiveSource);
501 eligibleInactiveSources.clear();
502 for (
const auto &
source : inactiveSources)
504 eligibleInactiveSources.push_back(
source);
505 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
506 eligibleInactiveSources.end(),
507 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
508 return s1->getQuality() < s2->getQuality();
512 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
513 return s1->getQuality() < s2->getQuality();
519 findNewSource =
true;
541 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 542 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
543 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
574 std::vector<std::string>
sources;
587 std::shared_ptr<Source>
source =
nullptr;
600 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 601 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
602 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
617 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
625 std::shared_ptr<void *> guard(
nullptr, [
this, &
activeSources, &inactiveSources](
void *) {
636 return c_ptr->get_future();
641 std::stringstream
ss;
643 bool has_active =
false;
645 void append_tried(
const std::string &
id,
bool active =
false) {
646 ss << (
count ?
"," :
"tried=") <<
id;
657 state.append_tried(
it->ExcludeID().substr(0,
it->ExcludeID().find(
':')),
true);
660 state.append_tried(
it->ExcludeID().substr(0,
it->ExcludeID().find(
':')));
664 state.append_tried(
it.substr(0,
it.find(
':')));
666 if (
state.has_active) {
667 state.ss <<
"&triedrc=resel";
670 return state.ss.str();
674 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
676 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " <<
source->PrettyID() << std::endl;
677 m_redirectLimitDelayScale = 1;
678 for (
const auto &
s : m_activeSources) {
681 <<
"Xrootd server returned excluded source " <<
source->PrettyID() <<
"; ignoring" << std::endl;
682 unsigned returned_count = ++m_excluded_active_count;
684 if (returned_count >= 3) {
690 for (
const auto &
s : m_inactiveSources) {
693 <<
"Xrootd server returned excluded inactive source " <<
source->PrettyID() <<
"; ignoring" << std::endl;
698 if (m_activeSources.size() < 2) {
699 auto oldSources = m_activeSources;
700 m_activeSources.push_back(
source);
701 reportSiteChange(oldSources, m_activeSources);
702 queueUpdateCurrentServer(
source->ID());
704 m_inactiveSources.push_back(
source);
707 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
709 if (
status.status == XrdCl::errRedirectLimit) {
710 m_redirectLimitDelayScale =
std::min(2 * m_redirectLimitDelayScale, 100);
711 delayScale = m_redirectLimitDelayScale;
721 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
723 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
725 inactiveSources = m_inactiveSources;
728 std::shared_ptr<void *> guard(
nullptr, [
this, &
activeSources, &inactiveSources](
void *) {
729 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
731 m_inactiveSources =
std::move(inactiveSources);
734 updateCurrentServer();
743 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
746 return c_ptr->get_future();
751 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
752 <<
", permissions=0" << std::oct << m_perms <<
std::dec <<
") => Source used after fatal exception.";
753 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
759 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
760 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
763 checkSources(
now, req1->size() + req2->size(),
activeSources, inactiveSources);
766 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
768 return c_ptr->get_future();
771 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
772 std::future<IOSize> future1, future2;
773 if (!req1->empty()) {
776 future1 = c_ptr1->get_future();
778 if (!req2->empty()) {
781 future2 = c_ptr2->get_future();
783 if (!req1->empty() && !req2->empty()) {
784 std::future<IOSize>
task = std::async(
785 std::launch::deferred,
786 [](std::future<IOSize>
a, std::future<IOSize>
b) {
800 return b.get() +
a.get();
807 }
else if (!req1->empty()) {
809 }
else if (!req2->empty()) {
812 std::promise<IOSize>
p;
814 return p.get_future();
819 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
822 if (c_status.code == XrdCl::errInvalidResponse) {
823 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
825 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 826 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
827 <<
") => Invalid ReadV response from server";
828 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
832 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
851 std::shared_ptr<Source> new_source;
853 std::shared_future<std::shared_ptr<Source>> future =
m_open_handler->open();
864 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 865 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
866 <<
") => timeout when waiting for file open";
867 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
872 new_source = future.get();
874 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
884 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 885 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
886 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
887 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
899 new_source->handle(c_ptr);
903 std::vector<IOPosBuffer> &
input,
904 std::vector<IOPosBuffer> &
output,
909 if (
io.size() > chunksize) {
917 consumed = chunksize;
921 consumed = chunksize;
924 chunksize -= consumed;
925 IOSize newsize =
io.size() - consumed;
927 void *newdata =
static_cast<char *
>(
io.data()) + consumed;
928 io.set_offset(newoffset);
929 io.set_data(newdata);
930 io.set_size(newsize);
931 }
else if (
io.size() == 0) {
935 chunksize -=
io.size();
942 std::vector<IOPosBuffer> &
input,
943 std::vector<IOPosBuffer> &
output,
948 if (
io.size() > chunksize) {
956 consumed = chunksize;
960 consumed = chunksize;
963 chunksize -= consumed;
964 IOSize newsize =
io.size() - consumed;
966 void *newdata =
static_cast<char *
>(
io.data()) + consumed;
967 io.set_offset(newoffset);
968 io.set_data(newdata);
969 io.set_size(newsize);
970 }
else if (
io.size() == 0) {
974 chunksize -=
io.size();
982 off_t last_offset = -1;
983 for (
const auto &
it : req) {
986 last_offset =
it.offset();
988 assert(
it.offset() < 0x1ffffffffff);
990 assert(req.size() <= 1024);
995 std::vector<IOPosBuffer> &req1,
996 std::vector<IOPosBuffer> &req2,
1000 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1001 req1.reserve(iolist.size() / 2 + 1);
1002 req2.reserve(iolist.size() / 2 + 1);
1006 float q1 =
static_cast<float>(
activeSources[0]->getQuality()) + 5;
1007 float q2 =
static_cast<float>(
activeSources[1]->getQuality()) + 5;
1011 static_cast<IOSize>(1024));
1013 static_cast<IOSize>(1024));
1016 for (
const auto &
it : iolist)
1017 size_orig +=
it.size();
1019 while (tmp_iolist.size() - front > 0) {
1028 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
1029 <<
", permissions=0" << std::oct << m_perms <<
std::dec 1030 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be " 1031 "reported to CMSSW developers.";
1032 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
1034 std::stringstream
ss;
1035 ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
1037 std::stringstream ss2;
1038 ss2 <<
"Quality source 1 " << q1 - 5 <<
", quality source 2: " << q2 - 5;
1050 return left.
offset() < right.offset();
1053 return left.
offset() < right.offset();
1059 assert(size_orig == size1 + size2);
1061 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig
1062 <<
" bytes) split into requests size " << req1.size() <<
" (" << size1
1063 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
1074 XrdCl::HostList *hostList_ptr) {
1076 std::shared_ptr<OpenHandler>
self = m_self;
1082 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1083 this, [&](
OpenHandler *) { m_outstanding_open =
false; });
1085 std::shared_ptr<Source>
source;
1086 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
1087 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1088 tracerouteRedirections(hostList.get());
1089 auto manager = m_manager.lock();
1097 std::unique_ptr<XrdCl::File> releaseFile;
1099 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1110 m_promise.set_value(
source);
1114 ex <<
"XrdCl::File::Open(name='" << manager->m_name <<
"', flags=0x" << std::hex << manager->m_flags
1115 <<
", permissions=0" << std::oct << manager->m_perms <<
std::dec <<
") => error '" <<
status->ToStr()
1116 <<
"' (errno=" <<
status->errNo <<
", code=" <<
status->code <<
")";
1117 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1118 manager->addConnections(ex);
1119 m_promise.set_exception(std::make_exception_ptr(ex));
1126 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1128 if (!m_file.get()) {
1129 return "(no open in progress)";
1132 m_file->GetProperty(
"DataServer", dataServer);
1133 if (dataServer.empty()) {
1134 return "(unknown source)";
1140 auto manager_ptr = m_manager.lock();
1143 ex <<
"XrdCl::File::Open() =>" 1144 <<
" error: OpenHandler called within an invalid RequestManager context." 1145 <<
" This is a logic error and should be reported to the CMSSW developers.";
1146 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1150 auto self_ptr = m_self_weak.lock();
1153 ex <<
"XrdCl::File::Open() => error: " 1154 <<
"OpenHandler called after it was deleted. This is a logic error " 1155 <<
"and should be reported to the CMSSW developers.";
1156 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
1169 if (m_outstanding_open) {
1170 return m_shared_future;
1172 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1173 std::promise<std::shared_ptr<Source>> new_promise;
1174 m_promise.swap(new_promise);
1175 m_shared_future = m_promise.get_future().share();
1179 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
1180 m_file = std::make_unique<XrdCl::File>();
1181 m_outstanding_open =
true;
1184 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(
this, [&](
OpenHandler *) {
1185 m_outstanding_open =
false;
1189 XrdCl::XRootDStatus
status;
1192 ex <<
"XrdCl::File::Open(name='" << new_name <<
"', flags=0x" << std::hex << manager.
m_flags <<
", permissions=0" 1194 <<
", code=" <<
status.code <<
")";
1195 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1199 exit_guard.release();
1202 return m_shared_future;
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
Basic3DVector & operator=(const Basic3DVector &)=default
Assignment operator.
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_future< std::shared_ptr< Source > > open()
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static constexpr int XRD_CL_MAX_CHUNK
void set_size(IOSize new_size)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
std::string prepareOpaqueString() const
static std::string to_string(const XMLCh *ch)
static std::string const input
static void SendMonitoringInfo(XrdCl::File &file)
OpenHandler(const OpenHandler &)=delete
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static bool getDomain(const std::string &host, std::string &domain)
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void addAdditionalInfo(std::string const &info)
timespec m_nextActiveSourceCheck
void addConnections(cms::Exception &) const
std::shared_ptr< XrdCl::File > getActiveFile() const
std::atomic< std::string * > m_serverToAdvertise
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Log< level::Info, false > LogInfo
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static const char * getJobID()
void getDisabledSourceNames(std::vector< std::string > &sources) const
std::shared_ptr< Source > pickSingleSource()
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
static bool getXrootdSiteFromURL(std::string url, std::string &site)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void updateCurrentServer()
edm::storage::IOSize IOSize
Log< level::Warning, false > LogWarning
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
SendMonitoringInfoHandler(const std::string &url)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static std::string const source
void clearAdditionalInfo()
RequestManager(const RequestManager &)=delete
void getActiveSourceNames(std::vector< std::string > &sources) const
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
timespec m_lastSourceCheck
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)