7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
21 #define XRD_CL_MAX_CHUNK 512*1024
23 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5
25 #ifdef XRD_FAKE_OPEN_PROBE
26 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
27 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20
29 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
31 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
32 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2*60
33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
36 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
40 #include <mach/clock.h>
41 #include <mach/mach.h>
42 #define GET_CLOCK_MONOTONIC(ts) \
44 clock_serv_t cclock; \
45 mach_timespec_t mts; \
46 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
47 clock_get_time(cclock, &mts); \
48 mach_port_deallocate(mach_task_self(), cclock); \
49 ts.tv_sec = mts.tv_sec; \
50 ts.tv_nsec = mts.tv_nsec; \
53 #define GET_CLOCK_MONOTONIC(ts) \
54 clock_gettime(CLOCK_MONOTONIC, &ts);
57 using namespace XrdAdaptor;
61 long long diff = (a.tv_sec - b.tv_sec) * 1000;
62 diff += (a.tv_nsec - b.tv_nsec) / 1e6;
76 XrdCl::Buffer *buffer =
nullptr;
77 response->Get(buffer);
97 file.GetProperty(
"LastURL", lastUrl);
98 if (jobId && lastUrl.size())
100 XrdCl::URL
url(lastUrl);
101 XrdCl::FileSystem fs(url);
103 edm::LogInfo(
"XrdAdaptorInternal") <<
"Set monitoring ID to " << jobId <<
".";
109 : m_serverToAdvertise(
nullptr),
110 m_timeout(XRD_DEFAULT_TIMEOUT),
111 m_nextInitialSourceToggle(
false),
115 m_distribution(0,100),
116 m_excluded_active_count(0)
126 XrdCl::Env *
env = XrdCl::DefaultEnv::GetEnv();
127 if (env) {env->GetInt(
"StreamErrorWindow",
m_timeout);}
139 std::unique_ptr<XrdCl::File>
file;
141 bool validFile =
false;
142 const int retries = 5;
149 SyncHostResponseHandler handler;
150 XrdCl::XRootDStatus openStatus = file->Open(new_filename,
m_flags,
m_perms, &handler);
151 if (!openStatus.IsOK())
159 ex <<
"XrdCl::File::Open(name='" <<
m_name
160 <<
"', flags=0x" << std::hex <<
m_flags
162 <<
") => error '" << openStatus.ToStr()
163 <<
"' (errno=" << openStatus.errNo <<
", code=" << openStatus.code <<
")";
165 ex.
addAdditionalInfo(
"Remote server already encountered a fatal error; no redirections were performed.");
168 handler.WaitForResponse();
169 std::unique_ptr<XrdCl::XRootDStatus>
status = handler.GetStatus();
170 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
183 ex <<
"XrdCl::File::Open(name='" <<
m_name
184 <<
"', flags=0x" << std::hex <<
m_flags
186 <<
") => error '" << status->ToStr()
187 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
191 file->GetProperty(
"DataServer", dataServer);
192 file->GetProperty(
"LastURL", lastUrl);
193 if (dataServer.size())
200 edm::LogWarning(
"XrdAdaptorInternal") <<
"Failed to open file at URL " << lastUrl <<
".";
204 ex <<
". No additional data servers were found.";
207 if (dataServer.size())
213 if (lastUrl == new_filename)
215 edm::LogWarning(
"XrdAdaptorInternal") << lastUrl <<
", " << new_filename;
260 std::unique_ptr<std::string> hostname(hostname_ptr);
272 std::unique_ptr<std::string> hostname(
new std::string(
id));
290 if (siteB.size() && (siteB != siteA)) {siteList = siteA +
", " + siteB;}
291 if (orig_site.size() && (orig_site != siteList))
293 edm::LogWarning(
"XrdAdaptor") <<
"Data is served from " << siteList <<
" instead of original site " << orig_site;
300 m_activeSites = siteList;
332 bool findNewSource =
false;
337 <<
m_activeSources[
a]->PrettyID() <<
" from active sources due to poor quality ("
339 if (
m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource =
true;}
345 return findNewSource;
353 bool findNewSource =
false;
356 findNewSource =
true;
366 std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(
m_inactiveSources.size());
371 std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
372 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
374 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
375 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
377 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Best inactive source: " <<(*bestInactiveSource)->PrettyID()
378 <<
", quality " << (*bestInactiveSource)->getQuality();
380 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Worst active source: " <<(*worstActiveSource)->PrettyID()
381 <<
", quality " << (*worstActiveSource)->getQuality();
384 if ((bestInactiveSource != eligibleInactiveSources.end()) &&
m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*
m_activeSources[0]->getQuality()))
388 for (
auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++)
if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it);
break;}
390 else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+
XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
392 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Removing " << (*worstActiveSource)->PrettyID()
393 <<
" from active sources due to quality (" << (*worstActiveSource)->getQuality()
394 <<
") and promoting " << (*bestInactiveSource)->PrettyID() <<
" (quality: "
395 << (*bestInactiveSource)->getQuality() <<
")" << std::endl;
396 (*worstActiveSource)->setLastDowngrade(now);
397 for (
auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++)
if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it);
break;}
398 m_inactiveSources.emplace_back(
std::move(*worstActiveSource));
402 eligibleInactiveSources.clear();
404 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
405 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
407 [](
const std::shared_ptr<Source> &s1,
const std::shared_ptr<Source> &
s2) {
return s1->getQuality() <
s2->getQuality();});
414 findNewSource =
true;
436 std::shared_ptr<XrdCl::File>
443 ex <<
"XrdAdaptor::RequestManager::getActiveFile(name='" <<
m_name
444 <<
"', flags=0x" << std::hex <<
m_flags
446 <<
") => Source used after fatal exception.";
447 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
460 sources.push_back(
source->ID());
470 sources.push_back(
source->PrettyID());
480 sources.push_back(
source);
487 std::vector<std::string> sources;
489 for (
auto const&
source : sources)
495 for (
auto const&
source : sources)
501 std::shared_ptr<Source>
504 std::shared_ptr<Source>
source =
nullptr;
523 ex <<
"XrdAdaptor::RequestManager::handle read(name='" <<
m_name
524 <<
"', flags=0x" << std::hex <<
m_flags
526 <<
") => Source used after fatal exception.";
527 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
548 source->handle(c_ptr);
549 return c_ptr->get_future();
556 std::stringstream
ss;
562 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
":")) <<
",";
567 ss << it->ExcludeID().substr(0, it->ExcludeID().find(
":")) <<
",";
572 ss << it.substr(0, it.find(
":")) <<
",";
577 return tmp_str.substr(0, tmp_str.size()-1);
585 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
588 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Successfully opened new source: " << source->PrettyID() << std::endl;
589 for (
const auto &
s : m_activeSources)
591 if (source->ID() ==
s->ID())
593 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Xrootd server returned excluded source " << source->PrettyID()
594 <<
"; ignoring" << std::endl;
595 unsigned returned_count = ++m_excluded_active_count;
601 for (
const auto &
s : m_inactiveSources)
603 if (source->ID() ==
s->ID())
605 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Xrootd server returned excluded inactive source " << source->PrettyID()
606 <<
"; ignoring" << std::endl;
611 if (m_activeSources.size() < 2)
613 m_activeSources.push_back(source);
615 queueUpdateCurrentServer(source->ID());
619 m_inactiveSources.push_back(source);
624 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Got failure when trying to open a new source" << std::endl;
632 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
633 updateCurrentServer();
641 if (m_activeSources.size() == 1)
644 checkSources(now, c_ptr->getSize());
645 m_activeSources[0]->handle(c_ptr);
646 return c_ptr->get_future();
649 else if (m_activeSources.empty())
652 ex <<
"XrdAdaptor::RequestManager::handle readv(name='" << m_name
653 <<
"', flags=0x" << std::hex << m_flags
654 <<
", permissions=0" << std::oct << m_perms <<
std::dec
655 <<
") => Source used after fatal exception.";
656 ex.
addContext(
"In XrdAdaptor::RequestManager::handle()");
662 std::shared_ptr<std::vector<IOPosBuffer> > req1(
new std::vector<IOPosBuffer>);
663 std::shared_ptr<std::vector<IOPosBuffer> > req2(
new std::vector<IOPosBuffer>);
664 splitClientRequest(*iolist, *req1, *req2);
666 checkSources(now, req1->size() + req2->size());
668 if (m_activeSources.size() == 1)
671 m_activeSources[0]->handle(c_ptr);
672 return c_ptr->get_future();
675 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
676 std::future<IOSize> future1, future2;
680 m_activeSources[0]->handle(c_ptr1);
681 future1 = c_ptr1->get_future();
686 m_activeSources[1]->handle(c_ptr2);
687 future2 = c_ptr2->get_future();
689 if (req1->size() && req2->size())
691 std::future<IOSize> task = std::async(std::launch::deferred,
692 [](std::future<IOSize>
a, std::future<IOSize>
b){
705 return b.get() + a.get();
713 else if (req1->size()) {
return future1; }
714 else if (req2->size()) {
return future2; }
717 std::promise<IOSize>
p; p.set_value(0);
718 return p.get_future();
726 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
729 if (c_status.code == XrdCl::errInvalidResponse)
731 edm::LogWarning(
"XrdAdaptorInternal") <<
"Invalid response when reading from " << source_ptr->PrettyID();
733 ex <<
"XrdAdaptor::RequestManager::requestFailure readv(name='" <<
m_name
734 <<
"', flags=0x" << std::hex <<
m_flags
736 <<
", old source=" << source_ptr->PrettyID()
737 <<
") => Invalid ReadV response from server";
738 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
742 edm::LogWarning(
"XrdAdaptorInternal") <<
"Request failure when reading from " << source_ptr->PrettyID();
761 std::shared_ptr<Source> new_source;
764 std::shared_future<std::shared_ptr<Source> > future =
m_open_handler->open();
773 if (status == std::future_status::timeout)
776 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name
777 <<
"', flags=0x" << std::hex <<
m_flags
779 <<
", old source=" << source_ptr->PrettyID()
780 <<
") => timeout when waiting for file open";
781 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
789 new_source = future.get();
793 ex.
addContext(
"Handling XrdAdaptor::RequestManager::requestFailure()");
804 ex <<
"XrdAdaptor::RequestManager::requestFailure Open(name='" <<
m_name
805 <<
"', flags=0x" << std::hex <<
m_flags
807 <<
", old source=" << source_ptr->PrettyID()
808 <<
", new source=" << new_source->PrettyID() <<
") => Xrootd server returned an excluded source";
809 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
820 new_source->handle(c_ptr);
830 if (io.
size() > chunksize)
842 consumed = chunksize;
848 consumed = chunksize;
851 chunksize -= consumed;
854 void* newdata =
static_cast<char*
>(io.
data()) + consumed;
859 else if (io.
size() == 0)
865 output.push_back(io);
866 chunksize -= io.
size();
879 if (io.
size() > chunksize)
891 consumed = chunksize;
897 consumed = chunksize;
900 chunksize -= consumed;
903 void* newdata =
static_cast<char*
>(io.
data()) + consumed;
908 else if (io.
size() == 0)
914 output.push_back(io);
915 chunksize -= io.
size();
924 off_t last_offset = -1;
925 for (
const auto & it : req)
928 assert(it.offset() > last_offset);
929 last_offset = it.offset();
931 assert(it.offset() < 0x1ffffffffff);
933 assert(req.size() <= 1024);
940 if (iolist.size() == 0)
return;
941 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
942 req1.reserve(iolist.size()/2+1);
943 req2.reserve(iolist.size()/2+1);
947 float q1 =
static_cast<float>(m_activeSources[0]->getQuality())+5;
948 float q2 =
static_cast<float>(m_activeSources[1]->getQuality())+5;
951 chunk1 =
std::max(static_cast<IOSize>(static_cast<float>(
XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
952 chunk2 =
std::max(static_cast<IOSize>(static_cast<float>(
XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
955 for (
const auto & it : iolist) size_orig += it.size();
957 while (tmp_iolist.size()-front > 0)
966 ex <<
"XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
967 <<
"', flags=0x" << std::hex << m_flags
968 <<
", permissions=0" << std::oct << m_perms <<
std::dec
969 <<
") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
970 ex.
addContext(
"In XrdAdaptor::RequestManager::requestFailure()");
972 std::stringstream
ss; ss <<
"Original request size " << iolist.size() <<
"(" << size_orig <<
" bytes)";
974 std::stringstream ss2; ss2 <<
"Quality source 1 " << q1-5 <<
", quality source 2: " << q2-5;
987 assert(size_orig == size1 + size2);
989 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Original request size " << iolist.size() <<
" (" << size_orig <<
" bytes) split into requests size " << req1.size() <<
" (" << size1 <<
" bytes) and " << req2.size() <<
" (" << size2 <<
" bytes)" << std::endl;
1008 std::shared_ptr<Source>
source;
1009 std::unique_ptr<XrdCl::XRootDStatus>
status(status_ptr);
1010 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1013 std::shared_ptr<OpenHandler>
self = m_self;
1016 auto manager = m_manager.lock();
1025 std::unique_ptr<XrdCl::File> releaseFile;
1027 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1039 m_promise.set_value(source);
1045 ex <<
"XrdCl::File::Open(name='" << manager->m_name
1046 <<
"', flags=0x" << std::hex << manager->m_flags
1047 <<
", permissions=0" << std::oct << manager->m_perms <<
std::dec
1048 <<
") => error '" << status->ToStr()
1049 <<
"' (errno=" << status->errNo <<
", code=" << status->code <<
")";
1050 ex.
addContext(
"In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1051 manager->addConnections(ex);
1053 m_promise.set_exception(std::make_exception_ptr(ex));
1056 manager->handleOpen(*status, source);
1062 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1066 return "(no open in progress)";
1069 m_file->GetProperty(
"DataServer", dataServer);
1070 if (!dataServer.size()) {
return "(unknown source)"; }
1074 std::shared_future<std::shared_ptr<Source> >
1077 auto manager_ptr = m_manager.lock();
1081 ex <<
"XrdCl::File::Open() =>"
1082 <<
" error: OpenHandler called within an invalid RequestManager context."
1083 <<
" This is a logic error and should be reported to the CMSSW developers.";
1084 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1088 auto self_ptr = m_self_weak.lock();
1092 ex <<
"XrdCl::File::Open() => error: "
1093 <<
"OpenHandler called after it was deleted. This is a logic error "
1094 <<
"and should be reported to the CMSSW developers.";
1095 ex.
addContext(
"Calling XrdAdapter::RequestManager::OpenHandler::open()");
1110 return m_shared_future;
1112 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1113 std::promise<std::shared_ptr<Source> > new_promise;
1114 m_promise.swap(new_promise);
1115 m_shared_future = m_promise.get_future().share();
1119 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"Trying to open URL: " << new_name;
1121 XrdCl::XRootDStatus
status;
1122 if (!(status = m_file->Open(new_name, manager.
m_flags, manager.
m_perms,
this)).IsOK())
1125 ex <<
"XrdCl::File::Open(name='" << new_name
1126 <<
"', flags=0x" << std::hex << manager.
m_flags
1128 <<
") => error '" << status.ToStr()
1129 <<
"' (errno=" << status.errNo <<
", code=" << status.code <<
")";
1130 ex.
addContext(
"Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1136 return m_shared_future;
void setCurrentServer(const std::string &servername)
std::shared_future< std::shared_ptr< Source > > open()
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
std::vector< Variable::Flags > flags
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
SendMonitoringInfoHandler nullHandler
static std::string const input
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
void set_size(IOSize new_size)
void queueUpdateCurrentServer(const std::string &)
void addConnections(cms::Exception &)
long long timeDiffMS(const timespec &a, const timespec &b)
std::set< std::shared_ptr< Source > > m_disabledSources
void set_offset(IOOffset new_offset)
static bool getDomain(const std::string &host, std::string &domain)
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
std::atomic< std::string * > m_serverToAdvertise
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
XrdSiteStatisticsInformation * statsService
void addAdditionalInfo(std::string const &info)
timespec m_nextActiveSourceCheck
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
std::string prepareOpaqueString()
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
std::string m_activeSites
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
void checkSourcesImpl(timespec &now, IOSize requestSize)
static const char * getJobID()
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
void getDisabledSourceNames(std::vector< std::string > &sources)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
void updateCurrentServer()
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
static std::string const source
void clearAdditionalInfo()
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void initialize(std::weak_ptr< RequestManager > selfref)