CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <cassert>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
16 
17 #include "XrdStatistics.h"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
20 
21 #define XRD_CL_MAX_CHUNK 512 * 1024
22 
23 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5
24 
25 #ifdef XRD_FAKE_OPEN_PROBE
26 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
27 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20
28 // This is the minimal difference in quality required to swap an active and inactive source
29 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
30 #else
31 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
32 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2 * 60
33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
34 #endif
35 
36 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
37 
38 #ifdef __MACH__
39 #include <mach/clock.h>
40 #include <mach/mach.h>
41 #define GET_CLOCK_MONOTONIC(ts) \
42  { \
43  clock_serv_t cclock; \
44  mach_timespec_t mts; \
45  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
46  clock_get_time(cclock, &mts); \
47  mach_port_deallocate(mach_task_self(), cclock); \
48  ts.tv_sec = mts.tv_sec; \
49  ts.tv_nsec = mts.tv_nsec; \
50  }
51 #else
52 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
53 #endif
54 
55 using namespace XrdAdaptor;
56 
57 long long timeDiffMS(const timespec &a, const timespec &b) {
58  long long diff = (a.tv_sec - b.tv_sec) * 1000;
59  diff += (a.tv_nsec - b.tv_nsec) / 1e6;
60  return diff;
61 }
62 
63 /*
64  * We do not care about the response of sending the monitoring information;
65  * this handler class simply frees any returned buffer to prevent memory leaks.
66  */
67 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler {
68  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
69  if (response) {
70  XrdCl::Buffer *buffer = nullptr;
71  response->Get(buffer);
72  response->Set(static_cast<int *>(nullptr));
73  delete buffer;
74  }
75  // Send Info has a response object; we must delete it.
76  delete response;
77  delete status;
78  }
79 };
80 
82 
84  // Do not send this to a dCache data server as they return an error.
85  // In some versions of dCache, sending the monitoring information causes
86  // the server to close the connection - resulting in failures.
87  if (Source::isDCachePool(file)) {
88  return;
89  }
90 
91  // Send the monitoring info, if available.
93  std::string lastUrl;
94  file.GetProperty("LastURL", lastUrl);
95  if (jobId && !lastUrl.empty()) {
96  XrdCl::URL url(lastUrl);
97  XrdCl::FileSystem fs(url);
98  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK())) {
99  edm::LogWarning("XrdAdaptorInternal")
100  << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
101  }
102  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
103  }
104 }
105 
107  : m_serverToAdvertise(nullptr),
108  m_timeout(XRD_DEFAULT_TIMEOUT),
109  m_nextInitialSourceToggle(false),
110  m_name(filename),
111  m_flags(flags),
112  m_perms(perms),
113  m_distribution(0, 100),
114  m_excluded_active_count(0) {}
115 
116 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
118 
119  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
120  if (env) {
121  env->GetInt("StreamErrorWindow", m_timeout);
122  }
123 
124  std::string orig_site;
125  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
126  std::string hostname;
127  if (Source::getHostname(orig_site, hostname)) {
128  Source::getDomain(hostname, orig_site);
129  }
130  }
131 
132  std::unique_ptr<XrdCl::File> file;
134  bool validFile = false;
135  const int retries = 5;
136  std::string excludeString;
137  for (int idx = 0; idx < retries; idx++) {
138  file.reset(new XrdCl::File());
139  auto opaque = prepareOpaqueString();
140  std::string new_filename =
141  m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
142  SyncHostResponseHandler handler;
143  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
144  if (!openStatus
145  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
146  // server and it was marked bad - xrootd couldn't even queue up the request internally!
147  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
148  // redirector being down or authentication failures.
149  ex.clearMessage();
150  ex.clearContext();
151  ex.clearAdditionalInfo();
152  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
153  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
154  << ", code=" << openStatus.code << ")";
155  ex.addContext("Calling XrdFile::open()");
156  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
157  throw ex;
158  }
159  handler.WaitForResponse();
160  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
161  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
162  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
163  assert(status);
164  if (status->IsOK()) {
165  validFile = true;
166  break;
167  } else {
168  ex.clearMessage();
169  ex.clearContext();
170  ex.clearAdditionalInfo();
171  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
172  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
173  << ", code=" << status->code << ")";
174  ex.addContext("Calling XrdFile::open()");
175  addConnections(ex);
176  std::string dataServer, lastUrl;
177  file->GetProperty("DataServer", dataServer);
178  file->GetProperty("LastURL", lastUrl);
179  if (!dataServer.empty()) {
180  ex.addAdditionalInfo("Problematic data server: " + dataServer);
181  }
182  if (!lastUrl.empty()) {
183  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
184  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
185  }
186  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
187  m_disabledSourceStrings.end()) {
188  ex << ". No additional data servers were found.";
189  throw ex;
190  }
191  if (!dataServer.empty()) {
192  m_disabledSourceStrings.insert(dataServer);
193  m_disabledExcludeStrings.insert(excludeString);
194  }
195  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
196  if (lastUrl == new_filename) {
197  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
198  throw ex;
199  }
200  }
201  }
202  if (!validFile) {
203  throw ex;
204  }
205  SendMonitoringInfo(*file);
206 
207  timespec ts;
209 
210  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
211  {
212  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
213  auto oldList = m_activeSources;
214  m_activeSources.push_back(source);
216  }
219 
220  m_lastSourceCheck = ts;
221  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
223 }
224 
233  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
234  // a pending update. *However*, since we call this for every read, we'll get it
235  // eventually.
236  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
237  return;
238  }
239  std::string *hostname_ptr;
240  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
241  std::unique_ptr<std::string> hostname(hostname_ptr);
243  if (statsService.isAvailable()) {
244  statsService->setCurrentServer(m_name, *hostname_ptr);
245  }
246  }
247 }
248 
250  auto hostname = std::make_unique<std::string>(id);
251  if (Source::getHostname(id, *hostname)) {
252  std::string *null_hostname = nullptr;
253  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
254  hostname.release();
255  }
256  }
257 }
258 
259 namespace {
260  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
261  std::string siteA, siteB;
262  if (!iSources.empty()) {
263  siteA = iSources[0]->Site();
264  }
265  if (iSources.size() == 2) {
266  siteB = iSources[1]->Site();
267  }
268  std::string siteList = siteA;
269  if (!siteB.empty() && (siteB != siteA)) {
270  siteList = siteA + ", " + siteB;
271  }
272  return siteList;
273  }
274 } // namespace
275 
276 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
277  std::vector<std::shared_ptr<Source>> const &iNew,
278  std::string orig_site) const {
279  auto siteList = formatSites(iNew);
280  if (!orig_site.empty() && (orig_site != siteList)) {
281  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
282  } else {
283  auto oldSites = formatSites(iOld);
284  if (orig_site.empty() && (siteList != oldSites)) {
285  if (!oldSites.empty())
286  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
287  }
288  }
289 }
290 
292  IOSize requestSize,
293  std::vector<std::shared_ptr<Source>> &activeSources,
294  std::vector<std::shared_ptr<Source>> &inactiveSources) {
295  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
296  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
297  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
298  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
299  { // Be more aggressive about getting rid of very bad sources.
300  compareSources(now, 0, 1, activeSources, inactiveSources);
301  compareSources(now, 1, 0, activeSources, inactiveSources);
302  }
303  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
304  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
305  }
306  }
307 }
308 
309 bool RequestManager::compareSources(const timespec &now,
310  unsigned a,
311  unsigned b,
312  std::vector<std::shared_ptr<Source>> &activeSources,
313  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
314  if (activeSources.size() < std::max(a, b) + 1) {
315  return false;
316  }
317 
318  bool findNewSource = false;
319  if ((activeSources[a]->getQuality() > 5130) ||
320  ((activeSources[a]->getQuality() > 260) &&
321  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
322  edm::LogVerbatim("XrdAdaptorInternal")
323  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
324  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
325  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
326  findNewSource = true;
327  }
328  activeSources[a]->setLastDowngrade(now);
329  inactiveSources.emplace_back(activeSources[a]);
330  auto oldSources = activeSources;
331  activeSources.erase(activeSources.begin() + a);
332  reportSiteChange(oldSources, activeSources);
333  }
334  return findNewSource;
335 }
336 
338  IOSize requestSize,
339  std::vector<std::shared_ptr<Source>> &activeSources,
340  std::vector<std::shared_ptr<Source>> &inactiveSources) {
341  bool findNewSource = false;
342  if (activeSources.size() <= 1) {
343  findNewSource = true;
344  } else if (activeSources.size() > 1) {
345  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
346  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
347  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
348  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
349 
350  // NOTE: We could probably replace the copy with a better sort function.
351  // However, there are typically very few sources and the correctness is more obvious right now.
352  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
353  eligibleInactiveSources.reserve(inactiveSources.size());
354  for (const auto &source : inactiveSources) {
355  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
356  eligibleInactiveSources.push_back(source);
357  }
358  }
359  auto bestInactiveSource =
360  std::min_element(eligibleInactiveSources.begin(),
361  eligibleInactiveSources.end(),
362  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
363  return s1->getQuality() < s2->getQuality();
364  });
365  auto worstActiveSource = std::max_element(activeSources.cbegin(),
366  activeSources.cend(),
367  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
368  return s1->getQuality() < s2->getQuality();
369  });
370  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
371  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
372  << ", quality " << (*bestInactiveSource)->getQuality();
373  }
374  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
375  << ", quality " << (*worstActiveSource)->getQuality();
376  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
377  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
378  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
379  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
380  auto oldSources = activeSources;
381  activeSources.push_back(*bestInactiveSource);
382  reportSiteChange(oldSources, activeSources);
383  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
384  if (it->get() == bestInactiveSource->get()) {
385  inactiveSources.erase(it);
386  break;
387  }
388  } else
389  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
390  (*worstActiveSource)->getQuality() >
391  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
392  edm::LogVerbatim("XrdAdaptorInternal")
393  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
394  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
395  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
396  (*worstActiveSource)->setLastDowngrade(now);
397  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
398  if (it->get() == bestInactiveSource->get()) {
399  inactiveSources.erase(it);
400  break;
401  }
402  inactiveSources.emplace_back(std::move(*worstActiveSource));
403  auto oldSources = activeSources;
404  activeSources.erase(worstActiveSource);
405  activeSources.emplace_back(std::move(*bestInactiveSource));
406  reportSiteChange(oldSources, activeSources);
407  eligibleInactiveSources.clear();
408  for (const auto &source : inactiveSources)
409  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
410  eligibleInactiveSources.push_back(source);
411  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
412  eligibleInactiveSources.end(),
413  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
414  return s1->getQuality() < s2->getQuality();
415  });
416  worstActiveSource = std::max_element(activeSources.begin(),
417  activeSources.end(),
418  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
419  return s1->getQuality() < s2->getQuality();
420  });
421  }
422  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
423  float r = m_distribution(m_generator);
425  findNewSource = true;
426  }
427  }
428  }
429  if (findNewSource) {
430  m_open_handler->open();
432  }
433 
434  // Only aggressively look for new sources if we don't have two.
435  if (activeSources.size() == 2) {
437  } else {
438  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
439  }
441 }
442 
443 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
444  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
445  if (m_activeSources.empty()) {
447  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
448  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
449  ex.addContext("In XrdAdaptor::RequestManager::handle()");
450  addConnections(ex);
451  throw ex;
452  }
453  return m_activeSources[0]->getFileHandle();
454 }
455 
456 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
457  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
458  sources.reserve(m_activeSources.size());
459  for (auto const &source : m_activeSources) {
460  sources.push_back(source->ID());
461  }
462 }
463 
464 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
465  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
466  sources.reserve(m_activeSources.size());
467  for (auto const &source : m_activeSources) {
468  sources.push_back(source->PrettyID());
469  }
470 }
471 
472 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
473  sources.reserve(m_disabledSourceStrings.size());
474  for (auto const &source : m_disabledSourceStrings) {
475  sources.push_back(source);
476  }
477 }
478 
480  std::vector<std::string> sources;
482  for (auto const &source : sources) {
483  ex.addAdditionalInfo("Active source: " + source);
484  }
485  sources.clear();
486  getDisabledSourceNames(sources);
487  for (auto const &source : sources) {
488  ex.addAdditionalInfo("Disabled source: " + source);
489  }
490 }
491 
492 std::shared_ptr<Source> RequestManager::pickSingleSource() {
493  std::shared_ptr<Source> source = nullptr;
494  {
495  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
496  if (m_activeSources.size() == 2) {
498  source = m_activeSources[0];
500  } else {
501  source = m_activeSources[1];
503  }
504  } else if (m_activeSources.empty()) {
506  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
507  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
508  ex.addContext("In XrdAdaptor::RequestManager::handle()");
509  addConnections(ex);
510  throw ex;
511  } else {
512  source = m_activeSources[0];
513  }
514  }
515  return source;
516 }
517 
518 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
519  assert(c_ptr.get());
520  timespec now;
521  GET_CLOCK_MONOTONIC(now);
522  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
523  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
524  {
525  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
526  activeSources = m_activeSources;
527  inactiveSources = m_inactiveSources;
528  }
529  {
530  //make sure we update values before calling pickSingelSource
531  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
532  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
533  m_activeSources = std::move(activeSources);
534  m_inactiveSources = std::move(inactiveSources);
535  });
536 
537  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
538  }
539 
540  std::shared_ptr<Source> source = pickSingleSource();
541  source->handle(c_ptr);
542  return c_ptr->get_future();
543 }
544 
546  std::stringstream ss;
547  ss << "tried=";
548  size_t count = 0;
549  {
550  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
551 
552  for (const auto &it : m_activeSources) {
553  count++;
554  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
555  }
556  for (const auto &it : m_inactiveSources) {
557  count++;
558  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
559  }
560  }
561  for (const auto &it : m_disabledExcludeStrings) {
562  count++;
563  ss << it.substr(0, it.find(":")) << ",";
564  }
565  if (count) {
566  std::string tmp_str = ss.str();
567  return tmp_str.substr(0, tmp_str.size() - 1);
568  }
569  return "";
570 }
571 
572 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
573  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
574  if (status.IsOK()) {
575  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
576  for (const auto &s : m_activeSources) {
577  if (source->ID() == s->ID()) {
578  edm::LogVerbatim("XrdAdaptorInternal")
579  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
580  unsigned returned_count = ++m_excluded_active_count;
582  if (returned_count >= 3) {
584  }
585  return;
586  }
587  }
588  for (const auto &s : m_inactiveSources) {
589  if (source->ID() == s->ID()) {
590  edm::LogVerbatim("XrdAdaptorInternal")
591  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
593  return;
594  }
595  }
596  if (m_activeSources.size() < 2) {
597  auto oldSources = m_activeSources;
598  m_activeSources.push_back(source);
599  reportSiteChange(oldSources, m_activeSources);
600  queueUpdateCurrentServer(source->ID());
601  } else {
602  m_inactiveSources.push_back(source);
603  }
604  } else { // File-open failure - wait at least 120s before next attempt.
605  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
607  }
608 }
609 
610 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
611  //Use a copy of m_activeSources and m_inactiveSources throughout this function
612  // in order to avoid holding the lock a long time and causing a deadlock.
613  // When the function is over we will update the values of the containers
614  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
615  {
616  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
617  activeSources = m_activeSources;
618  inactiveSources = m_inactiveSources;
619  }
620  //Make sure we update changes when we leave the function
621  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
622  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
623  m_activeSources = std::move(activeSources);
624  m_inactiveSources = std::move(inactiveSources);
625  });
626 
628 
629  timespec now;
630  GET_CLOCK_MONOTONIC(now);
631 
632  edm::CPUTimer timer;
633  timer.start();
634 
635  if (activeSources.size() == 1) {
636  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
637  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
638  activeSources[0]->handle(c_ptr);
639  return c_ptr->get_future();
640  }
641  // Make sure active
642  else if (activeSources.empty()) {
644  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
645  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
646  ex.addContext("In XrdAdaptor::RequestManager::handle()");
647  addConnections(ex);
648  throw ex;
649  }
650 
651  assert(iolist.get());
652  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
653  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
654  splitClientRequest(*iolist, *req1, *req2, activeSources);
655 
656  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
657  // CheckSources may have removed a source
658  if (activeSources.size() == 1) {
659  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
660  activeSources[0]->handle(c_ptr);
661  return c_ptr->get_future();
662  }
663 
664  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
665  std::future<IOSize> future1, future2;
666  if (!req1->empty()) {
667  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
668  activeSources[0]->handle(c_ptr1);
669  future1 = c_ptr1->get_future();
670  }
671  if (!req2->empty()) {
672  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
673  activeSources[1]->handle(c_ptr2);
674  future2 = c_ptr2->get_future();
675  }
676  if (!req1->empty() && !req2->empty()) {
677  std::future<IOSize> task =
678  std::async(std::launch::deferred,
679  [](std::future<IOSize> a, std::future<IOSize> b) {
680  // Wait until *both* results are available. This is essential
681  // as the callback may try referencing the RequestManager. If one
682  // throws an exception (causing the RequestManager to be destroyed by
683  // XrdFile) and the other has a failure, then the recovery code will
684  // reference the destroyed RequestManager.
685  //
686  // Unlike other places where we use shared/weak ptrs to maintain object
687  // lifetime and destruction asynchronously, we *cannot* destroy the request
688  // asynchronously as it is associated with a ROOT buffer. We must wait until we
689  // are guaranteed that XrdCl will not write into the ROOT buffer before we
690  // can return.
691  b.wait();
692  a.wait();
693  return b.get() + a.get();
694  },
695  std::move(future1),
696  std::move(future2));
697  timer.stop();
698  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
699  return task;
700  } else if (!req1->empty()) {
701  return future1;
702  } else if (!req2->empty()) {
703  return future2;
704  } else { // Degenerate case - no bytes to read.
705  std::promise<IOSize> p;
706  p.set_value(0);
707  return p.get_future();
708  }
709 }
710 
711 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
712  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
713 
714  // Fail early for invalid responses - XrdFile has a separate path for handling this.
715  if (c_status.code == XrdCl::errInvalidResponse) {
716  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
718  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
719  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
720  << ") => Invalid ReadV response from server";
721  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
722  addConnections(ex);
723  throw ex;
724  }
725  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
726 
727  // Note that we do not delete the Source itself. That is because this
728  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
729  // In such a case, if you close a file in the handler, it will deadlock
730  m_disabledSourceStrings.insert(source_ptr->ID());
731  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
732  m_disabledSources.insert(source_ptr);
733 
734  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
735  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
736  auto oldSources = m_activeSources;
737  m_activeSources.erase(m_activeSources.begin());
738  reportSiteChange(oldSources, m_activeSources);
739  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
740  auto oldSources = m_activeSources;
741  m_activeSources.erase(m_activeSources.begin() + 1);
742  reportSiteChange(oldSources, m_activeSources);
743  }
744  std::shared_ptr<Source> new_source;
745  if (m_activeSources.empty()) {
746  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
747  timespec now;
748  GET_CLOCK_MONOTONIC(now);
750  // Note we only wait for 180 seconds here. This is because we've already failed
751  // once and the likelihood the program has some inconsistent state is decent.
752  // We'd much rather fail hard than deadlock!
753  sentry.unlock();
754  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
755  if (status == std::future_status::timeout) {
757  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
758  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
759  << ") => timeout when waiting for file open";
760  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
761  addConnections(ex);
762  throw ex;
763  } else {
764  try {
765  new_source = future.get();
766  } catch (edm::Exception &ex) {
767  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
768  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
769  throw;
770  }
771  }
772 
773  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
774  m_disabledSourceStrings.end()) {
775  // The server gave us back a data node we requested excluded. Fatal!
777  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
778  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
779  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
780  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
781  addConnections(ex);
782  throw ex;
783  }
784  sentry.lock();
785 
786  auto oldSources = m_activeSources;
787  m_activeSources.push_back(new_source);
788  reportSiteChange(oldSources, m_activeSources);
789  } else {
790  new_source = m_activeSources[0];
791  }
792  new_source->handle(c_ptr);
793 }
794 
795 static void consumeChunkFront(size_t &front,
796  std::vector<IOPosBuffer> &input,
797  std::vector<IOPosBuffer> &output,
798  IOSize chunksize) {
799  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
800  IOPosBuffer &io = input[front];
801  IOPosBuffer &outio = output.back();
802  if (io.size() > chunksize) {
803  IOSize consumed;
804  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
805  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
806  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
807  consumed = (XRD_CL_MAX_CHUNK - outio.size());
808  outio.set_size(XRD_CL_MAX_CHUNK);
809  } else {
810  consumed = chunksize;
811  outio.set_size(outio.size() + consumed);
812  }
813  } else {
814  consumed = chunksize;
815  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
816  }
817  chunksize -= consumed;
818  IOSize newsize = io.size() - consumed;
819  IOOffset newoffset = io.offset() + consumed;
820  void *newdata = static_cast<char *>(io.data()) + consumed;
821  io.set_offset(newoffset);
822  io.set_data(newdata);
823  io.set_size(newsize);
824  } else if (io.size() == 0) {
825  front++;
826  } else {
827  output.push_back(io);
828  chunksize -= io.size();
829  front++;
830  }
831  }
832 }
833 
834 static void consumeChunkBack(size_t front,
835  std::vector<IOPosBuffer> &input,
836  std::vector<IOPosBuffer> &output,
837  IOSize chunksize) {
838  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
839  IOPosBuffer &io = input.back();
840  IOPosBuffer &outio = output.back();
841  if (io.size() > chunksize) {
842  IOSize consumed;
843  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
844  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
845  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
846  consumed = (XRD_CL_MAX_CHUNK - outio.size());
847  outio.set_size(XRD_CL_MAX_CHUNK);
848  } else {
849  consumed = chunksize;
850  outio.set_size(outio.size() + consumed);
851  }
852  } else {
853  consumed = chunksize;
854  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
855  }
856  chunksize -= consumed;
857  IOSize newsize = io.size() - consumed;
858  IOOffset newoffset = io.offset() + consumed;
859  void *newdata = static_cast<char *>(io.data()) + consumed;
860  io.set_offset(newoffset);
861  io.set_data(newdata);
862  io.set_size(newsize);
863  } else if (io.size() == 0) {
864  input.pop_back();
865  } else {
866  output.push_back(io);
867  chunksize -= io.size();
868  input.pop_back();
869  }
870  }
871 }
872 
873 static IOSize validateList(const std::vector<IOPosBuffer> req) {
874  IOSize total = 0;
875  off_t last_offset = -1;
876  for (const auto &it : req) {
877  total += it.size();
878  assert(it.offset() > last_offset);
879  last_offset = it.offset();
880  assert(it.size() <= XRD_CL_MAX_CHUNK);
881  assert(it.offset() < 0x1ffffffffff);
882  }
883  assert(req.size() <= 1024);
884  return total;
885 }
886 
887 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
888  std::vector<IOPosBuffer> &req1,
889  std::vector<IOPosBuffer> &req2,
890  std::vector<std::shared_ptr<Source>> const &activeSources) const {
891  if (iolist.empty())
892  return;
893  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
894  req1.reserve(iolist.size() / 2 + 1);
895  req2.reserve(iolist.size() / 2 + 1);
896  size_t front = 0;
897 
898  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
899  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
900  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
901  IOSize chunk1, chunk2;
902  // Make sure the chunk size is at least 1024; little point to reads less than that size.
903  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
904  static_cast<IOSize>(1024));
905  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
906  static_cast<IOSize>(1024));
907 
908  IOSize size_orig = 0;
909  for (const auto &it : iolist)
910  size_orig += it.size();
911 
912  while (tmp_iolist.size() - front > 0) {
913  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
914  (req2.size() >=
915  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
916  // are passed to the request manager. However, because we have a max chunk size, we increase
917  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
918  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
919  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
921  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
922  << ", permissions=0" << std::oct << m_perms << std::dec
923  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
924  "reported to CMSSW developers.";
925  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
926  addConnections(ex);
927  std::stringstream ss;
928  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
929  ex.addAdditionalInfo(ss.str());
930  std::stringstream ss2;
931  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
932  ex.addAdditionalInfo(ss2.str());
933  throw ex;
934  }
935  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
936  consumeChunkFront(front, tmp_iolist, req1, chunk1);
937  }
938  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
939  consumeChunkBack(front, tmp_iolist, req2, chunk2);
940  }
941  }
942  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
943  return left.offset() < right.offset();
944  });
945  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
946  return left.offset() < right.offset();
947  });
948 
949  IOSize size1 = validateList(req1);
950  IOSize size2 = validateList(req2);
951 
952  assert(size_orig == size1 + size2);
953 
954  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
955  << " bytes) split into requests size " << req1.size() << " (" << size1
956  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
957 }
958 
959 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
960 
961 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
962 // defined in the header.
964 
966  XrdCl::AnyObject *,
967  XrdCl::HostList *hostList_ptr) {
968  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
969  // Make sure that we set m_outstanding_open to false on exit from this function.
970  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
971  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
972  this, [&](OpenHandler *) { m_outstanding_open = false; });
973 
974  std::shared_ptr<Source> source;
975  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
976  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
977 
978  // Make sure we get rid of the strong self-reference when the callback finishes.
979  std::shared_ptr<OpenHandler> self = m_self;
980  m_self.reset();
981 
982  auto manager = m_manager.lock();
983  // Manager object has already been deleted. Cleanup the
984  // response objects, remove our self-reference, and ignore the response.
985  if (!manager) {
986  return;
987  }
988  //if we need to delete the File object we must do it outside
989  // of the lock to avoid a potential deadlock
990  std::unique_ptr<XrdCl::File> releaseFile;
991  {
992  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
993 
994  if (status->IsOK()) {
996  timespec now;
997  GET_CLOCK_MONOTONIC(now);
998 
999  std::string excludeString;
1000  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1001 
1002  source.reset(new Source(now, std::move(m_file), excludeString));
1003  m_promise.set_value(source);
1004  } else {
1005  releaseFile = std::move(m_file);
1007  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1008  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1009  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1010  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1011  manager->addConnections(ex);
1012 
1013  m_promise.set_exception(std::make_exception_ptr(ex));
1014  }
1015  }
1016  manager->handleOpen(*status, source);
1017 }
1018 
1020  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1021 
1022  if (!m_file.get()) {
1023  return "(no open in progress)";
1024  }
1025  std::string dataServer;
1026  m_file->GetProperty("DataServer", dataServer);
1027  if (dataServer.empty()) {
1028  return "(unknown source)";
1029  }
1030  return dataServer;
1031 }
1032 
1033 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1034  auto manager_ptr = m_manager.lock();
1035  if (!manager_ptr) {
1037  ex << "XrdCl::File::Open() =>"
1038  << " error: OpenHandler called within an invalid RequestManager context."
1039  << " This is a logic error and should be reported to the CMSSW developers.";
1040  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1041  throw ex;
1042  }
1043  RequestManager &manager = *manager_ptr;
1044  auto self_ptr = m_self_weak.lock();
1045  if (!self_ptr) {
1047  ex << "XrdCl::File::Open() => error: "
1048  << "OpenHandler called after it was deleted. This is a logic error "
1049  << "and should be reported to the CMSSW developers.";
1050  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1051  throw ex;
1052  }
1053 
1054  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1055  // is not thread-safe; the caller is responsible to verify it is not called from
1056  // multiple threads simultaneously.
1057  //
1058  // This is done because ::open may be called from a Xrootd callback; if we
1059  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1060  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1061  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1062  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1063  if (m_outstanding_open) {
1064  return m_shared_future;
1065  }
1066  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1067  std::promise<std::shared_ptr<Source>> new_promise;
1068  m_promise.swap(new_promise);
1069  m_shared_future = m_promise.get_future().share();
1070 
1071  auto opaque = manager.prepareOpaqueString();
1072  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1073  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1074  m_file.reset(new XrdCl::File());
1075  m_outstanding_open = true;
1076 
1077  // Always make sure we release m_file and set m_outstanding_open to false on error.
1078  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1079  m_outstanding_open = false;
1080  m_file.reset();
1081  });
1082 
1083  XrdCl::XRootDStatus status;
1084  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1086  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1087  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1088  << ", code=" << status.code << ")";
1089  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1090  manager.addConnections(ex);
1091  throw ex;
1092  }
1093  exit_guard.release();
1094  // Have a strong self-reference for as long as the callback is in-progress.
1095  m_self = self_ptr;
1096  return m_shared_future;
1097 }
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define XRD_CL_MAX_CHUNK
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
#define nullptr
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:51
double q2[4]
Definition: TauolaWrapper.h:88
std::weak_ptr< OpenHandler > m_self_weak
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:44
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
#define likely(x)
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:266
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:94
void clearMessage()
Definition: Exception.cc:215
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
static IOSize validateList(const std::vector< IOPosBuffer > req)
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
void clearAdditionalInfo()
Definition: Exception.cc:223
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)