9 #include "XrdCl/XrdClFile.hh"
10 #include "XrdCl/XrdClDefaultEnv.hh"
11 #include "XrdCl/XrdClFileSystem.hh"
23 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
29 #ifdef XRD_FAKE_OPEN_PROBE
43 #include <mach/clock.h>
44 #include <mach/mach.h>
45 #define GET_CLOCK_MONOTONIC(ts) \
47 clock_serv_t cclock; \
48 mach_timespec_t mts; \
49 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
50 clock_get_time(cclock, &mts); \
51 mach_port_deallocate(mach_task_self(), cclock); \
52 ts.tv_sec = mts.tv_sec; \
53 ts.tv_nsec = mts.tv_nsec; \
56 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
59 using namespace XrdAdaptor;
60 using namespace edm::storage;
63 long long diff = (a.tv_sec - b.tv_sec) * 1000;
64 diff += (a.tv_nsec - b.tv_nsec) / 1e6;
75 XrdCl::Buffer *
buffer =
nullptr;
76 response->Get(buffer);
77 response->Set(static_cast<int *>(
nullptr));
95 XrdCl::FileSystem &
fs() {
return m_fs; }
109 file.GetProperty(
"LastURL", lastUrl);
110 if (jobId && !lastUrl.empty()) {
112 if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
114 <<
"Failed to send the monitoring information, monitoring ID is " << jobId <<
".";
117 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
122 : m_serverToAdvertise(nullptr),
123 m_timeout(XRD_DEFAULT_TIMEOUT),
124 m_nextInitialSourceToggle(
false),
125 m_redirectLimitDelayScale(1),
129 m_distribution(0, 100),
130 m_excluded_active_count(0) {}
135 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
137 env->GetInt(
"StreamErrorWindow",
m_timeout);
148 std::unique_ptr<XrdCl::File>
file;
150 bool validFile =
false;
151 const int retries = 5;
153 for (
int idx = 0; idx < retries; idx++) {
154 file = std::make_unique<XrdCl::File>();
157 m_name + (!opaque.empty() ? ((
m_name.find(
'?') ==
m_name.npos) ?
"?" :
"&") + opaque :
"");
158 SyncHostResponseHandler handler;
159 XrdCl::XRootDStatus openStatus = file->Open(new_filename,
m_flags,
m_perms, &handler);
168 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0"
169 << std::oct <<
m_perms <<
std::dec <<
") => error '" << openStatus.ToStr() <<
"' (errno=" << openStatus.errNo
170 <<
", code=" << openStatus.code <<
")";
172 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
175 handler.WaitForResponse();
176 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
177 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
180 if (status->IsOK()) {
187 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0"
188 << std::oct <<
m_perms <<
std::dec <<
") => error '" << status->ToStr() <<
"' (errno=" << status->errNo
189 <<
", code=" << status->code <<
")";
193 file->GetProperty(
"DataServer", dataServer);
194 file->GetProperty(
"LastURL", lastUrl);
195 if (!dataServer.empty()) {
198 if (!lastUrl.empty()) {
200 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
204 ex <<
". No additional data servers were found.";
207 if (!dataServer.empty()) {
212 if (lastUrl == new_filename) {
213 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
226 auto source = std::make_shared<Source>(ts,
std::move(file), excludeString);
257 std::unique_ptr<std::string> hostname(hostname_ptr);
266 auto hostname = std::make_unique<std::string>(
id);
278 if (!iSources.empty()) {
279 siteA = iSources[0]->Site();
281 if (iSources.size() == 2) {
282 siteB = iSources[1]->Site();
285 if (!siteB.empty() && (siteB != siteA)) {
286 siteList = siteA +
", " + siteB;
295 auto siteList = formatSites(iNew);
296 if (!orig_site.empty() && (orig_site != siteList)) {
297 edm::LogWarning(
"XrdAdaptor") <<
"Data is served from " << siteList <<
" instead of original site " << orig_site;
299 auto oldSites = formatSites(iOld);
300 if (orig_site.empty() && (siteList != oldSites)) {
301 if (!oldSites.empty())
302 edm::LogWarning(
"XrdAdaptor") <<
"Data is now served from " << siteList <<
" instead of previous " << oldSites;
310 std::vector<std::shared_ptr<Source>> &inactiveSources) {
329 std::vector<std::shared_ptr<Source>> &inactiveSources)
const {
334 bool findNewSource =
false;
339 <<
"Removing " <<
activeSources[
a]->PrettyID() <<
" from active sources due to poor quality ("
342 findNewSource =
true;
350 return findNewSource;
356 std::vector<std::shared_ptr<Source>> &inactiveSources) {
357 bool findNewSource =
false;
359 findNewSource =
true;
362 <<
", source 1 quality " <<
activeSources[1]->getQuality() << std::endl;
368 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
369 eligibleInactiveSources.reserve(inactiveSources.size());
370 for (
const auto &
source : inactiveSources) {
372 eligibleInactiveSources.push_back(
source);
375 auto bestInactiveSource =
376 std::min_element(eligibleInactiveSources.begin(),
377 eligibleInactiveSources.end(),
378 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
379 return s1->getQuality() < s2->getQuality();
381 auto worstActiveSource = std::max_element(
activeSources.cbegin(),
383 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
384 return s1->getQuality() < s2->getQuality();
386 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
387 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " << (*bestInactiveSource)->PrettyID()
388 <<
", quality " << (*bestInactiveSource)->getQuality();
390 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " << (*worstActiveSource)->PrettyID()
391 <<
", quality " << (*worstActiveSource)->getQuality();
394 if ((bestInactiveSource != eligibleInactiveSources.end()) &&
activeSources.size() == 1 &&
395 ((*bestInactiveSource)->getQuality() < 4 *
activeSources[0]->getQuality())) {
399 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
400 if (it->get() == bestInactiveSource->get()) {
401 inactiveSources.erase(it);
405 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
406 (*worstActiveSource)->getQuality() >
409 <<
"Removing " << (*worstActiveSource)->PrettyID() <<
" from active sources due to quality ("
410 << (*worstActiveSource)->getQuality() <<
") and promoting " << (*bestInactiveSource)->PrettyID()
411 <<
" (quality: " << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
412 (*worstActiveSource)->setLastDowngrade(now);
413 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
414 if (it->get() == bestInactiveSource->get()) {
415 inactiveSources.erase(it);
418 inactiveSources.emplace_back(*worstActiveSource);
423 eligibleInactiveSources.clear();
424 for (
const auto &
source : inactiveSources)
426 eligibleInactiveSources.push_back(
source);
427 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
428 eligibleInactiveSources.end(),
429 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
430 return s1->getQuality() < s2->getQuality();
434 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &s2) {
435 return s1->getQuality() < s2->getQuality();
441 findNewSource =
true;
463 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
464 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
465 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
476 sources.push_back(
source->ID());
484 sources.push_back(
source->PrettyID());
491 sources.push_back(
source);
496 std::vector<std::string> sources;
498 for (
auto const &
source : sources) {
503 for (
auto const &
source : sources) {
509 std::shared_ptr<Source>
source =
nullptr;
522 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
523 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
524 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
539 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
547 std::shared_ptr<void *> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
557 source->handle(c_ptr);
558 return c_ptr->get_future();
563 std::stringstream
ss;
565 bool has_active =
false;
567 void append_tried(
const std::string &
id,
bool active =
false) {
568 ss << (count ?
"," :
"tried=") <<
id;
579 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(
':')),
true);
582 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(
':')));
586 state.append_tried(it.substr(0, it.find(
':')));
588 if (
state.has_active) {
589 state.ss <<
"&triedrc=resel";
592 return state.ss.str();
596 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
598 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " << source->PrettyID() << std::endl;
599 m_redirectLimitDelayScale = 1;
600 for (
const auto &
s : m_activeSources) {
601 if (source->ID() ==
s->ID()) {
603 <<
"Xrootd server returned excluded source " << source->PrettyID() <<
"; ignoring" << std::endl;
604 unsigned returned_count = ++m_excluded_active_count;
606 if (returned_count >= 3) {
612 for (
const auto &
s : m_inactiveSources) {
613 if (source->ID() ==
s->ID()) {
615 <<
"Xrootd server returned excluded inactive source " << source->PrettyID() <<
"; ignoring" << std::endl;
620 if (m_activeSources.size() < 2) {
621 auto oldSources = m_activeSources;
622 m_activeSources.push_back(source);
623 reportSiteChange(oldSources, m_activeSources);
624 queueUpdateCurrentServer(source->ID());
626 m_inactiveSources.push_back(source);
629 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
631 if (status.status == XrdCl::errRedirectLimit) {
632 m_redirectLimitDelayScale =
std::min(2 * m_redirectLimitDelayScale, 100);
633 delayScale = m_redirectLimitDelayScale;
643 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
645 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
646 activeSources = m_activeSources;
647 inactiveSources = m_inactiveSources;
650 std::shared_ptr<void *> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
651 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
652 m_activeSources =
std::move(activeSources);
653 m_inactiveSources =
std::move(inactiveSources);
656 updateCurrentServer();
664 if (activeSources.size() == 1) {
665 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
666 checkSources(now, c_ptr->getSize(),
activeSources, inactiveSources);
667 activeSources[0]->handle(c_ptr);
668 return c_ptr->get_future();
671 else if (activeSources.empty()) {
673 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
674 <<
", permissions=0" << std::oct << m_perms <<
std::dec <<
") => Source used after fatal exception.";
675 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
681 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
682 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
683 splitClientRequest(*iolist, *req1, *req2, activeSources);
685 checkSources(now, req1->size() + req2->size(),
activeSources, inactiveSources);
687 if (activeSources.size() == 1) {
688 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
689 activeSources[0]->handle(c_ptr);
690 return c_ptr->get_future();
693 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
694 std::future<IOSize> future1, future2;
695 if (!req1->empty()) {
697 activeSources[0]->handle(c_ptr1);
698 future1 = c_ptr1->get_future();
700 if (!req2->empty()) {
702 activeSources[1]->handle(c_ptr2);
703 future2 = c_ptr2->get_future();
705 if (!req1->empty() && !req2->empty()) {
706 std::future<IOSize> task = std::async(
707 std::launch::deferred,
708 [](std::future<IOSize>
a, std::future<IOSize>
b) {
722 return b.get() + a.get();
729 }
else if (!req1->empty()) {
731 }
else if (!req2->empty()) {
734 std::promise<IOSize>
p;
736 return p.get_future();
741 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
744 if (c_status.code == XrdCl::errInvalidResponse) {
745 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
747 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
748 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
749 <<
") => Invalid ReadV response from server";
750 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
754 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
773 std::shared_ptr<Source> new_source;
775 std::shared_future<std::shared_ptr<Source>> future =
m_open_handler->open();
786 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
787 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
788 <<
") => timeout when waiting for file open";
789 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
794 new_source = future.get();
796 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
806 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
807 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
808 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
809 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
821 new_source->handle(c_ptr);
825 std::vector<IOPosBuffer> &
input,
826 std::vector<IOPosBuffer> &
output,
831 if (io.
size() > chunksize) {
839 consumed = chunksize;
843 consumed = chunksize;
846 chunksize -= consumed;
849 void *newdata =
static_cast<char *
>(io.
data()) + consumed;
853 }
else if (io.
size() == 0) {
856 output.push_back(io);
857 chunksize -= io.
size();
864 std::vector<IOPosBuffer> &
input,
865 std::vector<IOPosBuffer> &
output,
870 if (io.
size() > chunksize) {
878 consumed = chunksize;
882 consumed = chunksize;
885 chunksize -= consumed;
888 void *newdata =
static_cast<char *
>(io.
data()) + consumed;
892 }
else if (io.
size() == 0) {
895 output.push_back(io);
896 chunksize -= io.
size();
904 off_t last_offset = -1;
905 for (
const auto &it : req) {
907 assert(it.offset() > last_offset);
908 last_offset = it.offset();
910 assert(it.offset() < 0x1ffffffffff);
912 assert(req.size() <= 1024);
917 std::vector<IOPosBuffer> &req1,
918 std::vector<IOPosBuffer> &req2,
922 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
923 req1.reserve(iolist.size() / 2 + 1);
924 req2.reserve(iolist.size() / 2 + 1);
928 float q1 =
static_cast<float>(
activeSources[0]->getQuality()) + 5;
929 float q2 =
static_cast<float>(
activeSources[1]->getQuality()) + 5;
933 static_cast<IOSize>(1024));
935 static_cast<IOSize>(1024));
938 for (
const auto &it : iolist)
939 size_orig += it.size();
941 while (tmp_iolist.size() - front > 0) {
950 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
951 <<
", permissions=0" << std::oct << m_perms <<
std::dec
952 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be "
953 "reported to CMSSW developers.";
954 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
956 std::stringstream
ss;
957 ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
959 std::stringstream ss2;
960 ss2 <<
"Quality source 1 " << q1 - 5 <<
", quality source 2: " << q2 - 5;
972 return left.
offset() < right.offset();
975 return left.
offset() < right.offset();
981 assert(size_orig == size1 + size2);
983 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig
984 <<
" bytes) split into requests size " << req1.size() <<
" (" << size1
985 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
996 XrdCl::HostList *hostList_ptr) {
998 std::shared_ptr<OpenHandler>
self = m_self;
1004 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1005 this, [&](
OpenHandler *) { m_outstanding_open =
false; });
1007 std::shared_ptr<Source>
source;
1008 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
1009 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1011 auto manager = m_manager.lock();
1019 std::unique_ptr<XrdCl::File> releaseFile;
1021 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1023 if (status->IsOK()) {
1032 m_promise.set_value(source);
1036 ex <<
"XrdCl::File::Open(name='" << manager->m_name <<
"', flags=0x" << std::hex << manager->m_flags
1037 <<
", permissions=0" << std::oct << manager->m_perms <<
std::dec <<
") => error '" << status->ToStr()
1038 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
1039 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1040 manager->addConnections(ex);
1041 m_promise.set_exception(std::make_exception_ptr(ex));
1044 manager->handleOpen(*status, source);
1048 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1050 if (!m_file.get()) {
1051 return "(no open in progress)";
1054 m_file->GetProperty(
"DataServer", dataServer);
1055 if (dataServer.empty()) {
1056 return "(unknown source)";
1062 auto manager_ptr = m_manager.lock();
1065 ex <<
"XrdCl::File::Open() =>"
1066 <<
" error: OpenHandler called within an invalid RequestManager context."
1067 <<
" This is a logic error and should be reported to the CMSSW developers.";
1068 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1072 auto self_ptr = m_self_weak.lock();
1075 ex <<
"XrdCl::File::Open() => error: "
1076 <<
"OpenHandler called after it was deleted. This is a logic error "
1077 <<
"and should be reported to the CMSSW developers.";
1078 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
1091 if (m_outstanding_open) {
1092 return m_shared_future;
1094 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1095 std::promise<std::shared_ptr<Source>> new_promise;
1096 m_promise.swap(new_promise);
1097 m_shared_future = m_promise.get_future().share();
1101 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
1102 m_file = std::make_unique<XrdCl::File>();
1103 m_outstanding_open =
true;
1106 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(
this, [&](
OpenHandler *) {
1107 m_outstanding_open =
false;
1111 XrdCl::XRootDStatus
status;
1112 if (!(status = m_file->Open(new_name, manager.
m_flags, manager.
m_perms,
this)).IsOK()) {
1114 ex <<
"XrdCl::File::Open(name='" << new_name <<
"', flags=0x" << std::hex << manager.
m_flags <<
", permissions=0"
1115 << std::oct << manager.
m_perms <<
std::dec <<
") => error '" << status.ToStr() <<
"' (errno=" << status.errNo
1116 <<
", code=" << status.code <<
")";
1117 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1121 exit_guard.release();
1124 return m_shared_future;
Log< level::Info, true > LogVerbatim
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_future< std::shared_ptr< Source > > open()
std::shared_ptr< XrdCl::File > getActiveFile() const
uint16_t *__restrict__ id
void getDisabledSourceNames(std::vector< std::string > &sources) const
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
std::string prepareOpaqueString() const
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static constexpr int XRD_CL_MAX_CHUNK
void set_size(IOSize new_size)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
void set_offset(IOOffset new_offset)
static std::string const input
static void SendMonitoringInfo(XrdCl::File &file)
OpenHandler(const OpenHandler &)=delete
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static bool getDomain(const std::string &host, std::string &domain)
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
XrdSiteStatisticsInformation * statsService
void addAdditionalInfo(std::string const &info)
timespec m_nextActiveSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
std::atomic< std::string * > m_serverToAdvertise
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
Log< level::Info, false > LogInfo
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static const char * getJobID()
void set_data(void *new_buffer)
std::shared_ptr< Source > pickSingleSource()
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
static bool getXrootdSiteFromURL(std::string url, std::string &site)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void updateCurrentServer()
edm::storage::IOSize IOSize
Log< level::Warning, false > LogWarning
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
SendMonitoringInfoHandler(const std::string &url)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
static std::string const source
void clearAdditionalInfo()
RequestManager(const RequestManager &)=delete
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)