7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
21 #define XRD_CL_MAX_CHUNK 512 * 1024
23 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5
25 #ifdef XRD_FAKE_OPEN_PROBE
26 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
27 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20
29 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
31 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
32 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2 * 60
33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
36 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
39 #include <mach/clock.h>
40 #include <mach/mach.h>
41 #define GET_CLOCK_MONOTONIC(ts) \
43 clock_serv_t cclock; \
44 mach_timespec_t mts; \
45 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
46 clock_get_time(cclock, &mts); \
47 mach_port_deallocate(mach_task_self(), cclock); \
48 ts.tv_sec = mts.tv_sec; \
49 ts.tv_nsec = mts.tv_nsec; \
52 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
55 using namespace XrdAdaptor;
58 long long diff = (a.tv_sec - b.tv_sec) * 1000;
59 diff += (a.tv_nsec - b.tv_nsec) / 1e6;
73 XrdCl::Buffer *buffer =
nullptr;
74 response->Get(buffer);
75 response->Set(static_cast<int*>(
nullptr));
97 file.GetProperty(
"LastURL", lastUrl);
98 if (jobId && lastUrl.size())
101 XrdCl::FileSystem fs(url);
102 if (!(fs.SendInfo(jobId, &
nullHandler, 30).IsOK()))
104 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to send the monitoring information, monitoring ID is " << jobId <<
".";
106 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
111 : m_serverToAdvertise(
nullptr),
112 m_timeout(XRD_DEFAULT_TIMEOUT),
113 m_nextInitialSourceToggle(
false),
117 m_distribution(0, 100),
118 m_excluded_active_count(0) {}
123 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
125 env->GetInt(
"StreamErrorWindow",
m_timeout);
136 std::unique_ptr<XrdCl::File>
file;
138 bool validFile =
false;
139 const int retries = 5;
141 for (
int idx = 0;
idx < retries;
idx++) {
145 SyncHostResponseHandler handler;
146 XrdCl::XRootDStatus openStatus = file->Open(new_filename,
m_flags,
m_perms, &handler);
155 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0"
156 << std::oct <<
m_perms <<
std::dec <<
") => error '" << openStatus.ToStr() <<
"' (errno=" << openStatus.errNo
157 <<
", code=" << openStatus.code <<
")";
159 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
162 handler.WaitForResponse();
163 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
164 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
167 if (status->IsOK()) {
174 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0"
175 << std::oct <<
m_perms <<
std::dec <<
") => error '" << status->ToStr() <<
"' (errno=" << status->errNo
176 <<
", code=" << status->code <<
")";
180 file->GetProperty(
"DataServer", dataServer);
181 file->GetProperty(
"LastURL", lastUrl);
182 if (dataServer.size())
189 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
193 ex <<
". No additional data servers were found.";
196 if (dataServer.size())
202 if (lastUrl == new_filename) {
203 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
247 std::unique_ptr<std::string> hostname(hostname_ptr);
256 std::unique_ptr<std::string> hostname(
new std::string(
id));
266 std::string formatSites(std::vector<std::shared_ptr<Source>>
const &iSources) {
268 if (iSources.size()) {siteA = iSources[0]->Site();}
269 if (iSources.size() == 2) {siteB = iSources[1]->Site();}
271 if (siteB.size() && (siteB != siteA)) {siteList = siteA +
", " + siteB;}
278 std::vector<std::shared_ptr<Source> >
const& iNew,
281 auto siteList = formatSites(iNew);
282 if (orig_site.size() && (orig_site != siteList))
284 edm::LogWarning(
"XrdAdaptor") <<
"Data is served from " << siteList <<
" instead of original site " << orig_site;
286 auto oldSites = formatSites(iOld);
287 if (!orig_site.size() && (siteList != oldSites))
289 if (oldSites.size() >0 )
290 edm::LogWarning(
"XrdAdaptor") <<
"Data is now served from " << siteList <<
" instead of previous " << oldSites;
298 std::vector<std::shared_ptr<Source>> &inactiveSources) {
317 std::vector<std::shared_ptr<Source>> &inactiveSources)
const {
322 bool findNewSource =
false;
327 <<
"Removing " <<
activeSources[
a]->PrettyID() <<
" from active sources due to poor quality ("
330 findNewSource =
true;
338 return findNewSource;
344 std::vector<std::shared_ptr<Source>> &inactiveSources) {
345 bool findNewSource =
false;
347 findNewSource =
true;
350 <<
", source 1 quality " <<
activeSources[1]->getQuality() << std::endl;
356 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
357 eligibleInactiveSources.reserve(inactiveSources.size());
358 for (
const auto &
source : inactiveSources) {
360 eligibleInactiveSources.push_back(
source);
363 auto bestInactiveSource =
364 std::min_element(eligibleInactiveSources.begin(),
365 eligibleInactiveSources.end(),
366 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
367 return s1->getQuality() <
s2->getQuality();
369 auto worstActiveSource = std::max_element(
activeSources.cbegin(),
371 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
372 return s1->getQuality() <
s2->getQuality();
374 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
375 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " << (*bestInactiveSource)->PrettyID()
376 <<
", quality " << (*bestInactiveSource)->getQuality();
378 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " << (*worstActiveSource)->PrettyID()
379 <<
", quality " << (*worstActiveSource)->getQuality();
382 if ((bestInactiveSource != eligibleInactiveSources.end()) &&
activeSources.size() == 1 &&
383 ((*bestInactiveSource)->getQuality() < 4 *
activeSources[0]->getQuality())) {
387 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
388 if (it->get() == bestInactiveSource->get()) {
389 inactiveSources.erase(it);
393 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
394 (*worstActiveSource)->getQuality() >
397 <<
"Removing " << (*worstActiveSource)->PrettyID() <<
" from active sources due to quality ("
398 << (*worstActiveSource)->getQuality() <<
") and promoting " << (*bestInactiveSource)->PrettyID()
399 <<
" (quality: " << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
400 (*worstActiveSource)->setLastDowngrade(now);
401 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
402 if (it->get() == bestInactiveSource->get()) {
403 inactiveSources.erase(it);
406 inactiveSources.emplace_back(
std::move(*worstActiveSource));
411 eligibleInactiveSources.clear();
412 for (
const auto &
source : inactiveSources)
414 eligibleInactiveSources.push_back(
source);
415 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
416 eligibleInactiveSources.end(),
417 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
418 return s1->getQuality() <
s2->getQuality();
422 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
423 return s1->getQuality() <
s2->getQuality();
429 findNewSource =
true;
451 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
452 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
453 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
464 sources.push_back(
source->ID());
472 sources.push_back(
source->PrettyID());
479 sources.push_back(
source);
484 std::vector<std::string> sources;
486 for (
auto const &
source : sources) {
491 for (
auto const &
source : sources) {
497 std::shared_ptr<Source>
source =
nullptr;
510 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
511 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
512 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
527 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
535 std::shared_ptr<void *> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
545 source->handle(c_ptr);
546 return c_ptr->get_future();
550 std::stringstream
ss;
558 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
":")) <<
",";
562 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
":")) <<
",";
567 ss << it.substr(0, it.find(
":")) <<
",";
571 return tmp_str.substr(0, tmp_str.size() - 1);
577 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
579 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " << source->PrettyID() << std::endl;
580 for (
const auto &
s : m_activeSources) {
581 if (source->ID() ==
s->ID()) {
583 <<
"Xrootd server returned excluded source " << source->PrettyID() <<
"; ignoring" << std::endl;
584 unsigned returned_count = ++m_excluded_active_count;
586 if (returned_count >= 3) {
592 for (
const auto &
s : m_inactiveSources) {
593 if (source->ID() ==
s->ID()) {
595 <<
"Xrootd server returned excluded inactive source " << source->PrettyID() <<
"; ignoring" << std::endl;
600 if (m_activeSources.size() < 2) {
601 auto oldSources = m_activeSources;
602 m_activeSources.push_back(source);
603 reportSiteChange(oldSources, m_activeSources);
604 queueUpdateCurrentServer(source->ID());
606 m_inactiveSources.push_back(source);
609 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
618 std::vector<std::shared_ptr<Source>>
activeSources, inactiveSources;
620 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
621 activeSources = m_activeSources;
622 inactiveSources = m_inactiveSources;
625 std::shared_ptr<void *> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
626 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
627 m_activeSources =
std::move(activeSources);
628 m_inactiveSources =
std::move(inactiveSources);
631 updateCurrentServer();
639 if (activeSources.size() == 1) {
641 checkSources(now, c_ptr->getSize(),
activeSources, inactiveSources);
642 activeSources[0]->handle(c_ptr);
643 return c_ptr->get_future();
646 else if (activeSources.empty()) {
648 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
649 <<
", permissions=0" << std::oct << m_perms <<
std::dec <<
") => Source used after fatal exception.";
650 ex.addContext(
"In XrdAdaptor::RequestManager::handle()");
656 std::shared_ptr<std::vector<IOPosBuffer> > req1(
new std::vector<IOPosBuffer>);
657 std::shared_ptr<std::vector<IOPosBuffer> > req2(
new std::vector<IOPosBuffer>);
658 splitClientRequest(*iolist, *req1, *req2, activeSources);
660 checkSources(now, req1->size() + req2->size(),
activeSources, inactiveSources);
662 if (activeSources.size() == 1) {
664 activeSources[0]->handle(c_ptr);
665 return c_ptr->get_future();
668 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
669 std::future<IOSize> future1, future2;
673 activeSources[0]->handle(c_ptr1);
674 future1 = c_ptr1->get_future();
679 activeSources[1]->handle(c_ptr2);
680 future2 = c_ptr2->get_future();
682 if (req1->size() && req2->size()) {
683 std::future<IOSize> task =
684 std::async(std::launch::deferred,
685 [](std::future<IOSize>
a, std::future<IOSize>
b) {
699 return b.get() + a.get();
707 else if (req1->size()) {
return future1; }
708 else if (req2->size()) {
return future2; }
711 std::promise<IOSize>
p; p.set_value(0);
712 return p.get_future();
717 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
720 if (c_status.code == XrdCl::errInvalidResponse) {
721 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
723 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
724 <<
", permissions=0" << std::oct <<
m_perms << std::dec <<
", old source=" << source_ptr->PrettyID()
725 <<
") => Invalid ReadV response from server";
726 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
730 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
752 std::shared_ptr<Source> new_source;
754 std::shared_future<std::shared_ptr<Source>> future =
m_open_handler->open();
765 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
766 <<
", permissions=0" << std::oct <<
m_perms << std::dec <<
", old source=" << source_ptr->PrettyID()
767 <<
") => timeout when waiting for file open";
768 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
773 new_source = future.get();
775 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
785 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags
786 <<
", permissions=0" << std::oct <<
m_perms << std::dec <<
", old source=" << source_ptr->PrettyID()
787 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
788 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
800 new_source->handle(c_ptr);
804 std::vector<IOPosBuffer> &
input,
805 std::vector<IOPosBuffer> &
output,
810 if (io.
size() > chunksize) {
818 consumed = chunksize;
822 consumed = chunksize;
825 chunksize -= consumed;
828 void *newdata =
static_cast<char *
>(io.
data()) + consumed;
832 }
else if (io.
size() == 0) {
835 output.push_back(io);
836 chunksize -= io.
size();
843 std::vector<IOPosBuffer> &
input,
844 std::vector<IOPosBuffer> &
output,
849 if (io.
size() > chunksize) {
857 consumed = chunksize;
861 consumed = chunksize;
864 chunksize -= consumed;
867 void *newdata =
static_cast<char *
>(io.
data()) + consumed;
871 }
else if (io.
size() == 0) {
874 output.push_back(io);
875 chunksize -= io.
size();
883 off_t last_offset = -1;
884 for (
const auto &it : req) {
886 assert(it.offset() > last_offset);
887 last_offset = it.offset();
889 assert(it.offset() < 0x1ffffffffff);
891 assert(req.size() <= 1024);
898 if (iolist.size() == 0)
return;
899 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
900 req1.reserve(iolist.size() / 2 + 1);
901 req2.reserve(iolist.size() / 2 + 1);
905 float q1 =
static_cast<float>(activeSources[0]->getQuality()) + 5;
906 float q2 =
static_cast<float>(activeSources[1]->getQuality()) + 5;
910 static_cast<IOSize>(1024));
912 static_cast<IOSize>(1024));
915 for (
const auto &it : iolist)
916 size_orig += it.size();
918 while (tmp_iolist.size() - front > 0) {
927 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name <<
"', flags=0x" << std::hex << m_flags
928 <<
", permissions=0" << std::oct << m_perms << std::dec
929 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be "
930 "reported to CMSSW developers.";
931 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
933 std::stringstream
ss;
934 ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
936 std::stringstream ss2;
937 ss2 <<
"Quality source 1 " << q1 - 5 <<
", quality source 2: " << q2 - 5;
949 return left.
offset() < right.offset();
952 return left.
offset() < right.offset();
958 assert(size_orig == size1 + size2);
960 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig
961 <<
" bytes) split into requests size " << req1.size() <<
" (" << size1
962 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
973 XrdCl::HostList *hostList_ptr) {
976 std::unique_ptr<char, std::function<void(char*)>> outstanding_guard(
nullptr, [&](
char*){m_outstanding_open=
false;});
978 std::shared_ptr<Source>
source;
979 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
980 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
983 std::shared_ptr<OpenHandler>
self = m_self;
986 auto manager = m_manager.lock();
994 std::unique_ptr<XrdCl::File> releaseFile;
996 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
998 if (status->IsOK()) {
1007 m_promise.set_value(source);
1011 ex <<
"XrdCl::File::Open(name='" << manager->m_name <<
"', flags=0x" << std::hex << manager->m_flags
1012 <<
", permissions=0" << std::oct << manager->m_perms << std::dec <<
") => error '" << status->ToStr()
1013 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
1014 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1015 manager->addConnections(ex);
1017 m_promise.set_exception(std::make_exception_ptr(ex));
1020 manager->handleOpen(*status, source);
1024 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1026 if (!m_file.get()) {
1027 return "(no open in progress)";
1030 m_file->GetProperty(
"DataServer", dataServer);
1031 if (!dataServer.size()) {
return "(unknown source)"; }
1036 auto manager_ptr = m_manager.lock();
1039 ex <<
"XrdCl::File::Open() =>"
1040 <<
" error: OpenHandler called within an invalid RequestManager context."
1041 <<
" This is a logic error and should be reported to the CMSSW developers.";
1042 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1046 auto self_ptr = m_self_weak.lock();
1049 ex <<
"XrdCl::File::Open() => error: "
1050 <<
"OpenHandler called after it was deleted. This is a logic error "
1051 <<
"and should be reported to the CMSSW developers.";
1052 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
1065 if (m_outstanding_open) {
1066 return m_shared_future;
1068 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1069 std::promise<std::shared_ptr<Source>> new_promise;
1070 m_promise.swap(new_promise);
1071 m_shared_future = m_promise.get_future().share();
1075 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
1077 m_outstanding_open =
true;
1080 std::unique_ptr<char, std::function<void(char*)>> exit_guard(
nullptr, [&](
char*){m_outstanding_open =
false; m_file.reset();});
1082 XrdCl::XRootDStatus
status;
1083 if (!(status = m_file->Open(new_name, manager.
m_flags, manager.
m_perms,
this)).IsOK()) {
1085 ex <<
"XrdCl::File::Open(name='" << new_name <<
"', flags=0x" << std::hex << manager.
m_flags <<
", permissions=0"
1086 << std::oct << manager.
m_perms << std::dec <<
") => error '" << status.ToStr() <<
"' (errno=" << status.errNo
1087 <<
", code=" << status.code <<
")";
1088 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1092 exit_guard.release();
1095 return m_shared_future;
std::shared_future< std::shared_ptr< Source > > open()
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
OpenHandler(std::weak_ptr< RequestManager > manager)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
SendMonitoringInfoHandler nullHandler
static std::string const input
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
void set_size(IOSize new_size)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
static bool getDomain(const std::string &host, std::string &domain)
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
std::atomic< std::string * > m_serverToAdvertise
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
XrdSiteStatisticsInformation * statsService
void addAdditionalInfo(std::string const &info)
timespec m_nextActiveSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static const char * getJobID()
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void updateCurrentServer()
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
static std::string const source
void clearAdditionalInfo()
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)