7 #include "XrdCl/XrdClFile.hh" 8 #include "XrdCl/XrdClDefaultEnv.hh" 9 #include "XrdCl/XrdClFileSystem.hh" 21 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh" 23 #define XRD_CL_MAX_CHUNK 512 * 1024 25 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5 27 #ifdef XRD_FAKE_OPEN_PROBE 28 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100 29 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20 31 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0 33 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10 34 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2 * 60 35 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100 38 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000 41 #include <mach/clock.h> 42 #include <mach/mach.h> 43 #define GET_CLOCK_MONOTONIC(ts) \ 45 clock_serv_t cclock; \ 46 mach_timespec_t mts; \ 47 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \ 48 clock_get_time(cclock, &mts); \ 49 mach_port_deallocate(mach_task_self(), cclock); \ 50 ts.tv_sec = mts.tv_sec; \ 51 ts.tv_nsec = mts.tv_nsec; \ 54 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts); 60 long long diff = (a.tv_sec - b.tv_sec) * 1000;
61 diff += (a.tv_nsec - b.tv_nsec) / 1e6;
72 XrdCl::Buffer *
buffer =
nullptr;
73 response->Get(buffer);
74 response->Set(static_cast<int *>(
nullptr));
92 XrdCl::FileSystem&
fs() {
return m_fs; }
106 file.GetProperty(
"LastURL", lastUrl);
107 if (jobId && !lastUrl.empty()) {
109 if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
111 <<
"Failed to send the monitoring information, monitoring ID is " << jobId <<
".";
114 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
119 : m_serverToAdvertise(
nullptr),
120 m_timeout(XRD_DEFAULT_TIMEOUT),
121 m_nextInitialSourceToggle(
false),
125 m_distribution(0, 100),
126 m_excluded_active_count(0) {}
131 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
133 env->GetInt(
"StreamErrorWindow",
m_timeout);
144 std::unique_ptr<XrdCl::File>
file;
146 bool validFile =
false;
147 const int retries = 5;
149 for (
int idx = 0;
idx < retries;
idx++) {
153 m_name + (!opaque.empty() ? ((
m_name.find(
"?") ==
m_name.npos) ?
"?" :
"&") + opaque :
"");
154 SyncHostResponseHandler handler;
155 XrdCl::XRootDStatus openStatus = file->Open(new_filename,
m_flags,
m_perms, &handler);
164 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0" 165 << std::oct <<
m_perms <<
std::dec <<
") => error '" << openStatus.ToStr() <<
"' (errno=" << openStatus.errNo
166 <<
", code=" << openStatus.code <<
")";
168 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
171 handler.WaitForResponse();
172 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
173 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
176 if (status->IsOK()) {
183 ex <<
"XrdCl::File::Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags <<
", permissions=0" 184 << std::oct <<
m_perms <<
std::dec <<
") => error '" << status->ToStr() <<
"' (errno=" << status->errNo
185 <<
", code=" << status->code <<
")";
189 file->GetProperty(
"DataServer", dataServer);
190 file->GetProperty(
"LastURL", lastUrl);
191 if (!dataServer.empty()) {
194 if (!lastUrl.empty()) {
196 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
200 ex <<
". No additional data servers were found.";
203 if (!dataServer.empty()) {
208 if (lastUrl == new_filename) {
209 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
222 auto source = std::make_shared<Source>(ts,
std::move(file), excludeString);
253 std::unique_ptr<std::string> hostname(hostname_ptr);
262 auto hostname = std::make_unique<std::string>(
id);
272 std::string formatSites(std::vector<std::shared_ptr<Source>>
const &iSources) {
274 if (!iSources.empty()) {
275 siteA = iSources[0]->Site();
277 if (iSources.size() == 2) {
278 siteB = iSources[1]->Site();
281 if (!siteB.empty() && (siteB != siteA)) {
282 siteList = siteA +
", " + siteB;
289 std::vector<std::shared_ptr<Source>>
const &iNew,
291 auto siteList = formatSites(iNew);
292 if (!orig_site.empty() && (orig_site != siteList)) {
293 edm::LogWarning(
"XrdAdaptor") <<
"Data is served from " << siteList <<
" instead of original site " << orig_site;
295 auto oldSites = formatSites(iOld);
296 if (orig_site.empty() && (siteList != oldSites)) {
297 if (!oldSites.empty())
298 edm::LogWarning(
"XrdAdaptor") <<
"Data is now served from " << siteList <<
" instead of previous " << oldSites;
305 std::vector<std::shared_ptr<Source>> &activeSources,
306 std::vector<std::shared_ptr<Source>> &inactiveSources) {
324 std::vector<std::shared_ptr<Source>> &activeSources,
325 std::vector<std::shared_ptr<Source>> &inactiveSources)
const {
326 if (activeSources.size() <
std::max(a, b) + 1) {
330 bool findNewSource =
false;
331 if ((activeSources[a]->getQuality() > 5130) ||
332 ((activeSources[
a]->getQuality() > 260) &&
333 (activeSources[b]->getQuality() * 4 < activeSources[
a]->getQuality()))) {
335 <<
"Removing " << activeSources[
a]->PrettyID() <<
" from active sources due to poor quality (" 336 << activeSources[
a]->getQuality() <<
" vs " << activeSources[
b]->getQuality() <<
")" << std::endl;
337 if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
338 findNewSource =
true;
340 activeSources[
a]->setLastDowngrade(now);
341 inactiveSources.emplace_back(activeSources[a]);
342 auto oldSources = activeSources;
343 activeSources.erase(activeSources.begin() +
a);
346 return findNewSource;
351 std::vector<std::shared_ptr<Source>> &activeSources,
352 std::vector<std::shared_ptr<Source>> &inactiveSources) {
353 bool findNewSource =
false;
354 if (activeSources.size() <= 1) {
355 findNewSource =
true;
356 }
else if (activeSources.size() > 1) {
357 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Source 0 quality " << activeSources[0]->getQuality()
358 <<
", source 1 quality " << activeSources[1]->getQuality() << std::endl;
359 findNewSource |=
compareSources(now, 0, 1, activeSources, inactiveSources);
360 findNewSource |=
compareSources(now, 1, 0, activeSources, inactiveSources);
364 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
365 eligibleInactiveSources.reserve(inactiveSources.size());
366 for (
const auto &
source : inactiveSources) {
368 eligibleInactiveSources.push_back(
source);
371 auto bestInactiveSource =
372 std::min_element(eligibleInactiveSources.begin(),
373 eligibleInactiveSources.end(),
374 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
375 return s1->getQuality() <
s2->getQuality();
377 auto worstActiveSource = std::max_element(activeSources.cbegin(),
378 activeSources.cend(),
379 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
380 return s1->getQuality() <
s2->getQuality();
382 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
383 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " << (*bestInactiveSource)->PrettyID()
384 <<
", quality " << (*bestInactiveSource)->getQuality();
386 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " << (*worstActiveSource)->PrettyID()
387 <<
", quality " << (*worstActiveSource)->getQuality();
390 if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
391 ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
392 auto oldSources = activeSources;
393 activeSources.push_back(*bestInactiveSource);
395 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
396 if (it->get() == bestInactiveSource->get()) {
397 inactiveSources.erase(it);
401 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
402 (*worstActiveSource)->getQuality() >
405 <<
"Removing " << (*worstActiveSource)->PrettyID() <<
" from active sources due to quality (" 406 << (*worstActiveSource)->getQuality() <<
") and promoting " << (*bestInactiveSource)->PrettyID()
407 <<
" (quality: " << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
408 (*worstActiveSource)->setLastDowngrade(now);
409 for (
auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
410 if (it->get() == bestInactiveSource->get()) {
411 inactiveSources.erase(it);
414 inactiveSources.emplace_back(
std::move(*worstActiveSource));
415 auto oldSources = activeSources;
416 activeSources.erase(worstActiveSource);
417 activeSources.emplace_back(
std::move(*bestInactiveSource));
419 eligibleInactiveSources.clear();
420 for (
const auto &
source : inactiveSources)
422 eligibleInactiveSources.push_back(
source);
423 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
424 eligibleInactiveSources.end(),
425 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
426 return s1->getQuality() <
s2->getQuality();
428 worstActiveSource = std::max_element(activeSources.begin(),
430 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
431 return s1->getQuality() <
s2->getQuality();
437 findNewSource =
true;
447 if (activeSources.size() == 2) {
459 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 460 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
461 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
472 sources.push_back(
source->ID());
480 sources.push_back(
source->PrettyID());
487 sources.push_back(
source);
492 std::vector<std::string> sources;
494 for (
auto const &
source : sources) {
499 for (
auto const &
source : sources) {
505 std::shared_ptr<Source>
source =
nullptr;
518 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 519 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
520 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
535 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
543 std::shared_ptr<void *> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
549 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
553 source->handle(c_ptr);
554 return c_ptr->get_future();
559 std::stringstream ss;
561 bool has_active =
false;
563 void append_tried(
const std::string &
id,
bool active =
false) {
564 ss << (count ?
"," :
"tried=") <<
id;
575 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(
':')),
true);
578 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(
':')));
582 state.append_tried(it.substr(0, it.find(
':')));
584 if (state.has_active) {
585 state.ss <<
"&triedrc=resel";
588 return state.ss.str();
594 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " << source->PrettyID() << std::endl;
596 if (source->ID() ==
s->ID()) {
598 <<
"Xrootd server returned excluded source " << source->PrettyID() <<
"; ignoring" << std::endl;
601 if (returned_count >= 3) {
608 if (source->ID() ==
s->ID()) {
610 <<
"Xrootd server returned excluded inactive source " << source->PrettyID() <<
"; ignoring" << std::endl;
615 if (m_activeSources.size() < 2) {
617 m_activeSources.push_back(source);
621 m_inactiveSources.push_back(source);
624 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
633 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
640 std::shared_ptr<void *> guard(
nullptr, [
this, &activeSources, &inactiveSources](
void *) {
654 if (activeSources.size() == 1) {
655 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
656 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
657 activeSources[0]->handle(c_ptr);
658 return c_ptr->get_future();
661 else if (activeSources.empty()) {
663 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 664 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
") => Source used after fatal exception.";
665 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
670 assert(iolist.get());
671 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
672 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
675 checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
677 if (activeSources.size() == 1) {
678 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, iolist);
679 activeSources[0]->handle(c_ptr);
680 return c_ptr->get_future();
683 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
684 std::future<IOSize> future1, future2;
685 if (!req1->empty()) {
687 activeSources[0]->handle(c_ptr1);
688 future1 = c_ptr1->get_future();
690 if (!req2->empty()) {
692 activeSources[1]->handle(c_ptr2);
693 future2 = c_ptr2->get_future();
695 if (!req1->empty() && !req2->empty()) {
696 std::future<IOSize>
task =
697 std::async(std::launch::deferred,
698 [](std::future<IOSize>
a, std::future<IOSize>
b) {
712 return b.get() + a.get();
719 }
else if (!req1->empty()) {
721 }
else if (!req2->empty()) {
724 std::promise<IOSize>
p;
726 return p.get_future();
731 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
734 if (c_status.code == XrdCl::errInvalidResponse) {
735 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
737 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 738 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
739 <<
") => Invalid ReadV response from server";
740 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
744 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
763 std::shared_ptr<Source> new_source;
765 std::shared_future<std::shared_ptr<Source>> future =
m_open_handler->open();
776 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 777 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
778 <<
") => timeout when waiting for file open";
779 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
784 new_source = future.get();
786 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
796 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 797 <<
", permissions=0" << std::oct <<
m_perms <<
std::dec <<
", old source=" << source_ptr->PrettyID()
798 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
799 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
811 new_source->handle(c_ptr);
815 std::vector<IOPosBuffer> &
input,
816 std::vector<IOPosBuffer> &
output,
821 if (io.
size() > chunksize) {
829 consumed = chunksize;
833 consumed = chunksize;
836 chunksize -= consumed;
839 void *newdata =
static_cast<char *
>(io.
data()) + consumed;
843 }
else if (io.
size() == 0) {
846 output.push_back(io);
847 chunksize -= io.
size();
854 std::vector<IOPosBuffer> &
input,
855 std::vector<IOPosBuffer> &
output,
860 if (io.
size() > chunksize) {
868 consumed = chunksize;
872 consumed = chunksize;
875 chunksize -= consumed;
878 void *newdata =
static_cast<char *
>(io.
data()) + consumed;
882 }
else if (io.
size() == 0) {
885 output.push_back(io);
886 chunksize -= io.
size();
894 off_t last_offset = -1;
895 for (
const auto &it : req) {
897 assert(it.offset() > last_offset);
898 last_offset = it.offset();
900 assert(it.offset() < 0x1ffffffffff);
902 assert(req.size() <= 1024);
907 std::vector<IOPosBuffer> &req1,
908 std::vector<IOPosBuffer> &req2,
909 std::vector<std::shared_ptr<Source>>
const &activeSources)
const {
912 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
913 req1.reserve(iolist.size() / 2 + 1);
914 req2.reserve(iolist.size() / 2 + 1);
918 float q1 =
static_cast<float>(activeSources[0]->getQuality()) + 5;
919 float q2 =
static_cast<float>(activeSources[1]->getQuality()) + 5;
923 static_cast<IOSize>(1024));
925 static_cast<IOSize>(1024));
928 for (
const auto &it : iolist)
929 size_orig += it.size();
931 while (tmp_iolist.size() - front > 0) {
940 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" <<
m_name <<
"', flags=0x" << std::hex <<
m_flags 942 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be " 943 "reported to CMSSW developers.";
944 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
946 std::stringstream ss;
947 ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
949 std::stringstream ss2;
950 ss2 <<
"Quality source 1 " << q1 - 5 <<
", quality source 2: " << q2 - 5;
962 return left.
offset() < right.offset();
965 return left.
offset() < right.offset();
971 assert(size_orig == size1 + size2);
973 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig
974 <<
" bytes) split into requests size " << req1.size() <<
" (" << size1
975 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
986 XrdCl::HostList *hostList_ptr) {
988 std::shared_ptr<OpenHandler>
self =
m_self;
994 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
997 std::shared_ptr<Source>
source;
998 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
999 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1009 std::unique_ptr<XrdCl::File> releaseFile;
1011 std::lock_guard<std::recursive_mutex> sentry(
m_mutex);
1013 if (status->IsOK()) {
1026 ex <<
"XrdCl::File::Open(name='" << manager->m_name <<
"', flags=0x" << std::hex << manager->m_flags
1027 <<
", permissions=0" << std::oct << manager->m_perms <<
std::dec <<
") => error '" << status->ToStr()
1028 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
1029 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1030 manager->addConnections(ex);
1039 m_promise.set_exception(std::make_exception_ptr(ex));
1042 manager->handleOpen(*status, source);
1046 std::lock_guard<std::recursive_mutex> sentry(
m_mutex);
1049 return "(no open in progress)";
1052 m_file->GetProperty(
"DataServer", dataServer);
1053 if (dataServer.empty()) {
1054 return "(unknown source)";
1063 ex <<
"XrdCl::File::Open() =>" 1064 <<
" error: OpenHandler called within an invalid RequestManager context." 1065 <<
" This is a logic error and should be reported to the CMSSW developers.";
1066 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1073 ex <<
"XrdCl::File::Open() => error: " 1074 <<
"OpenHandler called after it was deleted. This is a logic error " 1075 <<
"and should be reported to the CMSSW developers.";
1076 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
1092 std::lock_guard<std::recursive_mutex> sentry(
m_mutex);
1093 std::promise<std::shared_ptr<Source>> new_promise;
1099 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
1104 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(
this, [&](
OpenHandler *) {
1109 XrdCl::XRootDStatus
status;
1112 ex <<
"XrdCl::File::Open(name='" << new_name <<
"', flags=0x" << std::hex << manager.
m_flags <<
", permissions=0" 1113 << std::oct << manager.
m_perms <<
std::dec <<
") => error '" << status.ToStr() <<
"' (errno=" << status.errNo
1114 <<
", code=" << status.code <<
")";
1115 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1119 exit_guard.release();
std::shared_future< std::shared_ptr< Source > > open()
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
std::atomic< bool > m_outstanding_open
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
std::weak_ptr< OpenHandler > m_self_weak
static std::string const input
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
void set_size(IOSize new_size)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
static bool getDomain(const std::string &host, std::string &domain)
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
std::atomic< std::string * > m_serverToAdvertise
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
timespec m_nextActiveSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
XrdCl::OpenFlags::Flags m_flags
std::recursive_mutex m_mutex
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
static const char * getJobID()
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void updateCurrentServer()
std::unique_ptr< XrdCl::File > m_file
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
SendMonitoringInfoHandler(const std::string &url)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
static bool getHostname(const std::string &id, std::string &hostname)
static std::string const source
void clearAdditionalInfo()
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)