7 #include "XrdCl/XrdClFile.hh" 8 #include "XrdCl/XrdClDefaultEnv.hh" 9 #include "XrdCl/XrdClFileSystem.hh" 19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh" 21 #define XRD_CL_MAX_CHUNK 512*1024 23 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5 25 #ifdef XRD_FAKE_OPEN_PROBE 26 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100 27 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20 29 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0 31 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10 32 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2*60 33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100 36 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000 40 #include <mach/clock.h> 41 #include <mach/mach.h> 42 #define GET_CLOCK_MONOTONIC(ts) \ 44 clock_serv_t cclock; \ 45 mach_timespec_t mts; \ 46 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \ 47 clock_get_time(cclock, &mts); \ 48 mach_port_deallocate(mach_task_self(), cclock); \ 49 ts.tv_sec = mts.tv_sec; \ 50 ts.tv_nsec = mts.tv_nsec; \ 53 #define GET_CLOCK_MONOTONIC(ts) \ 54 clock_gettime(CLOCK_MONOTONIC, &ts); 61 long long diff = (a.tv_sec - b.tv_sec) * 1000;
62 diff += (a.tv_nsec - b.tv_nsec) / 1e6;
76 XrdCl::Buffer *
buffer =
nullptr;
77 response->Get(buffer);
78 response->Set(static_cast<int*>(
nullptr));
101 file.GetProperty(
"LastURL", lastUrl);
102 if (jobId && lastUrl.size())
104 XrdCl::URL
url(lastUrl);
105 XrdCl::FileSystem fs(url);
106 if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK()))
108 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to send the monitoring information, monitoring ID is " << jobId <<
".";
110 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
116 : m_serverToAdvertise(
nullptr),
117 m_timeout(XRD_DEFAULT_TIMEOUT),
118 m_nextInitialSourceToggle(
false),
122 m_distribution(0,100),
123 m_excluded_active_count(0)
133 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
134 if (env) {env->GetInt(
"StreamErrorWindow",
m_timeout);}
146 std::unique_ptr<XrdCl::File>
file;
148 bool validFile =
false;
149 const int retries = 5;
154 file->SetProperty(
"ReadRecovery",
"false");
157 SyncHostResponseHandler handler;
158 XrdCl::XRootDStatus openStatus = file->Open(new_filename,
m_flags,
m_perms, &handler);
159 if (!openStatus.IsOK())
167 ex <<
"XrdCl::File::Open(name='" <<
m_name 168 <<
"', flags=0x" << std::hex <<
m_flags 170 <<
") => error '" << openStatus.ToStr()
171 <<
"' (errno=" << openStatus.errNo <<
", code=" << openStatus.code <<
")";
173 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
176 handler.WaitForResponse();
177 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
178 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
191 ex <<
"XrdCl::File::Open(name='" <<
m_name 192 <<
"', flags=0x" << std::hex <<
m_flags 194 <<
") => error '" << status->ToStr()
195 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
199 file->GetProperty(
"DataServer", dataServer);
200 file->GetProperty(
"LastURL", lastUrl);
201 if (dataServer.size())
208 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
212 ex <<
". No additional data servers were found.";
215 if (dataServer.size())
221 if (lastUrl == new_filename)
223 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
237 auto source = std::make_shared<Source>(ts,
std::move(file), excludeString);
269 std::unique_ptr<std::string> hostname(hostname_ptr);
281 auto hostname = std::make_unique<std::string>(
id);
293 std::string formatSites(std::vector<std::shared_ptr<Source> >
const& iSources) {
295 if (iSources.size()) {siteA = iSources[0]->Site();}
296 if (iSources.size() == 2) {siteB = iSources[1]->Site();}
298 if (siteB.size() && (siteB != siteA)) {siteList = siteA +
", " + siteB;}
305 std::vector<std::shared_ptr<Source> >
const& iNew,
308 auto siteList = formatSites(iNew);
309 if (orig_site.size() && (orig_site != siteList))
311 edm::LogWarning(
"XrdAdaptor") <<
"Data is served from " << siteList <<
" instead of original site " << orig_site;
314 auto oldSites = formatSites(iOld);
315 if (!orig_site.size() && (siteList != oldSites))
317 if (oldSites.size() >0 )
318 edm::LogWarning(
"XrdAdaptor") <<
"Data is now served from " << siteList <<
" instead of previous " << oldSites;
326 std::vector<std::shared_ptr<Source>>& activeSources,
327 std::vector<std::shared_ptr<Source>>& inactiveSources)
349 std::vector<std::shared_ptr<Source>>& activeSources,
350 std::vector<std::shared_ptr<Source>>& inactiveSources)
const 352 if (activeSources.size() <
std::max(a, b)+1) {
return false;}
354 bool findNewSource =
false;
355 if ((activeSources[a]->getQuality() > 5130) ||
356 ((activeSources[
a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[
a]->getQuality())))
359 << activeSources[
a]->PrettyID() <<
" from active sources due to poor quality (" 360 << activeSources[
a]->getQuality() <<
" vs " << activeSources[
b]->getQuality() <<
")" << std::endl;
361 if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource =
true;}
362 activeSources[
a]->setLastDowngrade(now);
363 inactiveSources.emplace_back(activeSources[a]);
364 auto oldSources = activeSources;
365 activeSources.erase(activeSources.begin()+
a);
368 return findNewSource;
374 std::vector<std::shared_ptr<Source>>& activeSources,
375 std::vector<std::shared_ptr<Source>>& inactiveSources)
378 bool findNewSource =
false;
379 if (activeSources.size() <= 1)
381 findNewSource =
true;
383 else if (activeSources.size() > 1)
385 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Source 0 quality " << activeSources[0]->getQuality() <<
", source 1 quality " << activeSources[1]->getQuality() << std::endl;
386 findNewSource |=
compareSources(now, 0, 1, activeSources,inactiveSources);
387 findNewSource |=
compareSources(now, 1, 0,activeSources, inactiveSources);
391 std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
392 for (
const auto &
source : inactiveSources)
396 auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
397 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
398 auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
399 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
400 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
402 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " <<(*bestInactiveSource)->PrettyID()
403 <<
", quality " << (*bestInactiveSource)->getQuality();
405 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " <<(*worstActiveSource)->PrettyID()
406 <<
", quality " << (*worstActiveSource)->getQuality();
409 if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
411 auto oldSources = activeSources;
412 activeSources.push_back(*bestInactiveSource);
414 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it);
break;}
416 else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+
XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
418 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Removing " << (*worstActiveSource)->PrettyID()
419 <<
" from active sources due to quality (" << (*worstActiveSource)->getQuality()
420 <<
") and promoting " << (*bestInactiveSource)->PrettyID() <<
" (quality: " 421 << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
422 (*worstActiveSource)->setLastDowngrade(now);
423 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it);
break;}
424 inactiveSources.emplace_back(
std::move(*worstActiveSource));
425 auto oldSources = activeSources;
426 activeSources.erase(worstActiveSource);
427 activeSources.emplace_back(
std::move(*bestInactiveSource));
429 eligibleInactiveSources.clear();
431 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
432 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
433 worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
434 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
441 findNewSource =
true;
452 if (activeSources.size() == 2)
463 std::shared_ptr<XrdCl::File>
470 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name 471 <<
"', flags=0x" << std::hex <<
m_flags 473 <<
") => Source used after fatal exception.";
474 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
487 sources.push_back(
source->ID());
497 sources.push_back(
source->PrettyID());
506 sources.push_back(
source);
513 std::vector<std::string> sources;
515 for (
auto const&
source : sources)
521 for (
auto const&
source : sources)
527 std::shared_ptr<Source>
530 std::shared_ptr<Source>
source =
nullptr;
549 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name 550 <<
"', flags=0x" << std::hex <<
m_flags 552 <<
") => Source used after fatal exception.";
553 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
572 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
580 std::shared_ptr<void*> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
586 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
590 source->handle(c_ptr);
591 return c_ptr->get_future();
597 std::stringstream ss;
606 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
":")) <<
",";
611 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
":")) <<
",";
617 ss << it.substr(0, it.find(
":")) <<
",";
622 return tmp_str.substr(0, tmp_str.size()-1);
633 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " << source->PrettyID() << std::endl;
636 if (source->ID() ==
s->ID())
638 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Xrootd server returned excluded source " << source->PrettyID()
639 <<
"; ignoring" << std::endl;
648 if (source->ID() ==
s->ID())
650 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Xrootd server returned excluded inactive source " << source->PrettyID()
651 <<
"; ignoring" << std::endl;
656 if (m_activeSources.size() < 2)
659 m_activeSources.push_back(source);
665 m_inactiveSources.push_back(source);
670 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
681 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
688 std::shared_ptr<void*> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
702 if (activeSources.size() == 1)
704 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
705 checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
706 activeSources[0]->handle(c_ptr);
707 return c_ptr->get_future();
710 else if (activeSources.empty())
713 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" <<
m_name 714 <<
"', flags=0x" << std::hex <<
m_flags 716 <<
") => Source used after fatal exception.";
717 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
722 assert(iolist.get());
723 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
724 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
727 checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
729 if (activeSources.size() == 1)
731 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
732 activeSources[0]->handle(c_ptr);
733 return c_ptr->get_future();
736 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
737 std::future<IOSize> future1, future2;
741 activeSources[0]->handle(c_ptr1);
742 future1 = c_ptr1->get_future();
747 activeSources[1]->handle(c_ptr2);
748 future2 = c_ptr2->get_future();
750 if (req1->size() && req2->size())
752 std::future<IOSize> task = std::async(std::launch::deferred,
753 [](std::future<IOSize>
a, std::future<IOSize>
b){
766 return b.get() + a.get();
774 else if (req1->size()) {
return future1; }
775 else if (req2->size()) {
return future2; }
778 std::promise<IOSize>
p; p.set_value(0);
779 return p.get_future();
786 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
789 if (c_status.code == XrdCl::errInvalidResponse)
791 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
793 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name 794 <<
"', flags=0x" << std::hex <<
m_flags 796 <<
", old source=" << source_ptr->PrettyID()
797 <<
") => Invalid ReadV response from server";
798 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
802 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
824 std::shared_ptr<Source> new_source;
827 std::shared_future<std::shared_ptr<Source> > future =
m_open_handler->open();
839 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name 840 <<
"', flags=0x" << std::hex <<
m_flags 842 <<
", old source=" << source_ptr->PrettyID()
843 <<
") => timeout when waiting for file open";
844 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
852 new_source = future.get();
856 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
866 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name 867 <<
"', flags=0x" << std::hex <<
m_flags 869 <<
", old source=" << source_ptr->PrettyID()
870 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
871 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
885 new_source->handle(c_ptr);
895 if (io.
size() > chunksize)
907 consumed = chunksize;
913 consumed = chunksize;
916 chunksize -= consumed;
919 void* newdata =
static_cast<char*
>(io.
data()) + consumed;
924 else if (io.
size() == 0)
930 output.push_back(io);
931 chunksize -= io.
size();
944 if (io.
size() > chunksize)
956 consumed = chunksize;
962 consumed = chunksize;
965 chunksize -= consumed;
968 void* newdata =
static_cast<char*
>(io.
data()) + consumed;
973 else if (io.
size() == 0)
979 output.push_back(io);
980 chunksize -= io.
size();
989 off_t last_offset = -1;
990 for (
const auto & it : req)
993 assert(it.offset() > last_offset);
994 last_offset = it.offset();
996 assert(it.offset() < 0x1ffffffffff);
998 assert(req.size() <= 1024);
1005 if (iolist.size() == 0)
return;
1006 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1007 req1.reserve(iolist.size()/2+1);
1008 req2.reserve(iolist.size()/2+1);
1012 float q1 =
static_cast<float>(activeSources[0]->getQuality())+5;
1013 float q2 =
static_cast<float>(activeSources[1]->getQuality())+5;
1016 chunk1 =
std::max(static_cast<IOSize>(static_cast<float>(
XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1017 chunk2 =
std::max(static_cast<IOSize>(static_cast<float>(
XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1020 for (
const auto & it : iolist) size_orig += it.size();
1022 while (tmp_iolist.size()-front > 0)
1031 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" <<
m_name 1032 <<
"', flags=0x" << std::hex <<
m_flags 1034 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
1035 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
1037 std::stringstream ss; ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
1039 std::stringstream ss2; ss2 <<
"Quality source 1 " << q1-5 <<
", quality source 2: " << q2-5;
1052 assert(size_orig == size1 + size2);
1054 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig <<
" bytes) split into requests size " << req1.size() <<
" (" << size1 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
1058 : m_manager(manager)
1078 std::shared_ptr<Source>
source;
1079 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
1080 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1083 std::shared_ptr<OpenHandler>
self =
m_self;
1095 std::unique_ptr<XrdCl::File> releaseFile;
1097 std::lock_guard<std::recursive_mutex> sentry(
m_mutex);
1115 ex <<
"XrdCl::File::Open(name='" << manager->m_name
1116 <<
"', flags=0x" << std::hex << manager->m_flags
1117 <<
", permissions=0" << std::oct << manager->m_perms <<
std::dec 1118 <<
") => error '" << status->ToStr()
1119 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
1120 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1121 manager->addConnections(ex);
1123 m_promise.set_exception(std::make_exception_ptr(ex));
1126 manager->handleOpen(*status, source);
1132 std::lock_guard<std::recursive_mutex> sentry(
m_mutex);
1136 return "(no open in progress)";
1139 m_file->GetProperty(
"DataServer", dataServer);
1140 if (!dataServer.size()) {
return "(unknown source)"; }
1144 std::shared_future<std::shared_ptr<Source> >
1151 ex <<
"XrdCl::File::Open() =>" 1152 <<
" error: OpenHandler called within an invalid RequestManager context." 1153 <<
" This is a logic error and should be reported to the CMSSW developers.";
1154 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1162 ex <<
"XrdCl::File::Open() => error: " 1163 <<
"OpenHandler called after it was deleted. This is a logic error " 1164 <<
"and should be reported to the CMSSW developers.";
1165 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
1182 std::lock_guard<std::recursive_mutex> sentry(
m_mutex);
1183 std::promise<std::shared_ptr<Source> > new_promise;
1189 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
1191 m_file->SetProperty(
"ReadRecovery",
"false");
1197 XrdCl::XRootDStatus
status;
1201 ex <<
"XrdCl::File::Open(name='" << new_name
1202 <<
"', flags=0x" << std::hex << manager.
m_flags 1204 <<
") => error '" << status.ToStr()
1205 <<
"' (errno=" << status.errNo <<
", code=" << status.code <<
")";
1206 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1210 exit_guard.release();
void setCurrentServer(const std::string &servername)
std::shared_future< std::shared_ptr< Source > > open()
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
std::atomic< bool > m_outstanding_open
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
std::weak_ptr< OpenHandler > m_self_weak
SendMonitoringInfoHandler nullHandler
static std::string const input
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
void set_size(IOSize new_size)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
static bool getDomain(const std::string &host, std::string &domain)
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
std::atomic< std::string * > m_serverToAdvertise
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
timespec m_nextActiveSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
std::recursive_mutex m_mutex
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
static const char * getJobID()
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void updateCurrentServer()
std::unique_ptr< XrdCl::File > m_file
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
static bool getHostname(const std::string &id, std::string &hostname)
static std::string const source
void clearAdditionalInfo()
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)