CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <cassert>
4 #include <iostream>
5 #include <memory>
6 
7 #include <netdb.h>
8 
9 #include "XrdCl/XrdClFile.hh"
10 #include "XrdCl/XrdClDefaultEnv.hh"
11 #include "XrdCl/XrdClFileSystem.hh"
12 
20 
21 #include "XrdStatistics.h"
23 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
24 
// Upper bound used by consumeChunkFront/consumeChunkBack when growing a merged output buffer.
25 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
26 
// Added to timespec::tv_sec values and compared (x1000) against millisecond differences,
// i.e. this delay is expressed in seconds.
27 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
28 
// XRD_FAKE_OPEN_PROBE selects a second set of values: a shorter long delay, no quality
// fudge, and a probe percentage of 100.
// NOTE(review): the use of XRD_ADAPTOR_OPEN_PROBE_PERCENT is not visible in this listing;
// presumably it is the chance (in percent) of probing for a new source - confirm.
29 #ifdef XRD_FAKE_OPEN_PROBE
30 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
31 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
32 // This is the minimal difference in quality required to swap an active and inactive source
33 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
34 #else
35 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
// Long delay, in seconds (used the same way as the short delay above).
36 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
37 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
38 #endif
39 
// The chunk-consuming helpers stop adding output buffers once the output list exceeds this count.
40 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
41 
// GET_CLOCK_MONOTONIC(ts) fills the timespec `ts` with the current time.
// When __MACH__ is defined the value is read from the Mach SYSTEM_CLOCK service (the
// clock service port is released again before the fields are copied); otherwise it
// comes from clock_gettime(CLOCK_MONOTONIC).
// Note that the non-Mach expansion already ends with a semicolon, while the Mach
// expansion is a braced block.
42 #ifdef __MACH__
43 #include <mach/clock.h>
44 #include <mach/mach.h>
45 #define GET_CLOCK_MONOTONIC(ts) \
46  { \
47  clock_serv_t cclock; \
48  mach_timespec_t mts; \
49  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
50  clock_get_time(cclock, &mts); \
51  mach_port_deallocate(mach_task_self(), cclock); \
52  ts.tv_sec = mts.tv_sec; \
53  ts.tv_nsec = mts.tv_nsec; \
54  }
55 #else
56 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
57 #endif
58 
59 using namespace XrdAdaptor;
60 using namespace edm::storage;
61 
/**
 * Return the signed difference `a - b` in milliseconds.
 *
 * @param a  later (minuend) time stamp
 * @param b  earlier (subtrahend) time stamp
 * @return   (a - b) in milliseconds, truncated toward zero; negative when a precedes b
 */
long long timeDiffMS(const timespec &a, const timespec &b) {
  // Widen to long long *before* scaling so the multiplication cannot overflow
  // on platforms where time_t is narrower than 64 bits.
  const long long wholeSecondsMs = (static_cast<long long>(a.tv_sec) - static_cast<long long>(b.tv_sec)) * 1000LL;
  // The nanosecond part is combined in floating point and the sum is truncated once,
  // so sub-millisecond remainders are handled exactly as before.
  const double fractionalMs = static_cast<double>(a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(static_cast<double>(wholeSecondsMs) + fractionalMs);
}
67 
68 /*
69  * We do not care about the response of sending the monitoring information;
70  * this handler class simply frees any returned buffer to prevent memory leaks.
71  */
// NOTE(review): this listing omits some lines of the class (the embedded line numbers
// skip 89 and 93); the constructor taking a URL that is used by
// `new SendMonitoringInfoHandler(lastUrl)` is presumably among them - confirm against the full file.
72 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
// Response callback: releases the Buffer carried by `response` (if any), then deletes
// `response`, `status` and finally this handler itself. The object must therefore be
// heap-allocated and must not be touched after the callback has run.
73  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
74  if (response) {
75  XrdCl::Buffer *buffer = nullptr;
76  response->Get(buffer);
// Store a null pointer in the AnyObject before deleting the buffer ourselves;
// presumably this keeps `delete response` below from freeing it a second time.
77  response->Set(static_cast<int *>(nullptr));
78  delete buffer;
79  }
80  // Send Info has a response object; we must delete it.
81  delete response;
82  delete status;
83  delete this;
84  }
85 
// File-system object the monitoring information is sent through (see fs()).
86  XrdCl::FileSystem m_fs;
87 
88 public:
// Copy-assignment and default construction are disabled.
90  SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
91  SendMonitoringInfoHandler() = delete;
92 
94 
// Accessor for the owned XrdCl::FileSystem.
95  XrdCl::FileSystem &fs() { return m_fs; }
96 };
97 
99  // Do not send this to a dCache data server as they return an error.
100  // In some versions of dCache, sending the monitoring information causes
101  // the server to close the connection - resulting in failures.
102  if (Source::isDCachePool(file)) {
103  return;
104  }
105 
106  // Send the monitoring info, if available.
108  std::string lastUrl;
109  file.GetProperty("LastURL", lastUrl);
110  if (jobId && !lastUrl.empty()) {
111  auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
112  if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
113  edm::LogWarning("XrdAdaptorInternal")
114  << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
115  delete sm_handler;
116  }
117  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
118  }
119 }
120 
122  : m_serverToAdvertise(nullptr),
123  m_timeout(XRD_DEFAULT_TIMEOUT),
124  m_nextInitialSourceToggle(false),
125  m_redirectLimitDelayScale(1),
126  m_name(filename),
127  m_flags(flags),
128  m_perms(perms),
129  m_distribution(0, 100),
130  m_excluded_active_count(0) {}
131 
// Opens the file named by m_name and registers it as the first active source.
// Up to `retries` open attempts are made; each failed attempt records the failing data
// server as disabled so that the next attempt can exclude it. If no attempt succeeds the
// exception object `ex` is thrown.
// NOTE(review): this listing omits several source lines (the embedded numbers skip, e.g.
// 133, 149, 221, 224, 231-234, 238). `ex` is declared on one of the omitted lines, and the
// use of `self` is not visible here.
132 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
134 
// Let XrdCl's "StreamErrorWindow" setting override m_timeout when a default environment exists.
135  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
136  if (env) {
137  env->GetInt("StreamErrorWindow", m_timeout);
138  }
139 
// Determine the originating site. When the lookup returns false and the value contains no
// '.', resolve it via getHostname() and replace it with what getDomain() reports.
140  std::string orig_site;
141  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
142  std::string hostname;
143  if (Source::getHostname(orig_site, hostname)) {
144  Source::getDomain(hostname, orig_site);
145  }
146  }
147 
148  std::unique_ptr<XrdCl::File> file;
150  bool validFile = false;
151  const int retries = 5;
152  std::string excludeString;
153  for (int idx = 0; idx < retries; idx++) {
// Every attempt uses a fresh XrdCl::File and a freshly built opaque string, appended
// with '?' or '&' depending on whether m_name already has a query part.
154  file = std::make_unique<XrdCl::File>();
155  auto opaque = prepareOpaqueString();
156  std::string new_filename =
157  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
158  SyncHostResponseHandler handler;
159  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
160  if (!openStatus
161  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
162  // server and it was marked bad - xrootd couldn't even queue up the request internally!
163  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
164  // redirector being down or authentication failures.
165  ex.clearMessage();
166  ex.clearContext();
167  ex.clearAdditionalInfo();
168  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
169  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
170  << ", code=" << openStatus.code << ")";
171  ex.addContext("Calling XrdFile::open()");
172  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
173  throw ex;
174  }
// The request was queued; block until the handler has the result of the open.
175  handler.WaitForResponse();
176  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
177  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
178  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
179  assert(status);
180  if (status->IsOK()) {
181  validFile = true;
182  break;
183  } else {
// Rebuild the exception from scratch so it describes only the most recent failure.
184  ex.clearMessage();
185  ex.clearContext();
186  ex.clearAdditionalInfo();
187  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
188  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
189  << ", code=" << status->code << ")";
190  ex.addContext("Calling XrdFile::open()");
191  addConnections(ex);
192  std::string dataServer, lastUrl;
193  file->GetProperty("DataServer", dataServer);
194  file->GetProperty("LastURL", lastUrl);
195  if (!dataServer.empty()) {
196  ex.addAdditionalInfo("Problematic data server: " + dataServer);
197  }
198  if (!lastUrl.empty()) {
199  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
200  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
201  }
// The failing data server is one we had already disabled: retrying cannot help, give up.
202  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
203  m_disabledSourceStrings.end()) {
204  ex << ". No additional data servers were found.";
205  throw ex;
206  }
// Remember the failing server (and its exclude string) for subsequent attempts.
207  if (!dataServer.empty()) {
208  m_disabledSourceStrings.insert(dataServer);
209  m_disabledExcludeStrings.insert(excludeString);
210  }
211  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
212  if (lastUrl == new_filename) {
213  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
214  throw ex;
215  }
216  }
217  }
// All attempts failed: `ex` still describes the last failure.
218  if (!validFile) {
219  throw ex;
220  }
222 
// NOTE(review): the statement that fills `ts` is not part of this listing; presumably
// GET_CLOCK_MONOTONIC(ts) - confirm.
223  timespec ts;
225 
// Wrap the opened file in a Source and make it the first active source.
226  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
227  {
228  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
229  auto oldList = m_activeSources;
230  m_activeSources.push_back(source);
232  }
235 
236  m_lastSourceCheck = ts;
237  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
239 }
240 
249  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
250  // a pending update. *However*, since we call this for every read, we'll get it
251  // eventually.
252  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
253  return;
254  }
255  std::string *hostname_ptr;
256  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
257  std::unique_ptr<std::string> hostname(hostname_ptr);
259  if (statsService.isAvailable()) {
260  statsService->setCurrentServer(m_name, *hostname_ptr);
261  }
262  }
263 }
264 
266  auto hostname = std::make_unique<std::string>(id);
267  if (Source::getHostname(id, *hostname)) {
268  std::string *null_hostname = nullptr;
269  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
270  hostname.release();
271  }
272  }
273 }
274 
275 namespace {
276  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
277  std::string siteA, siteB;
278  if (!iSources.empty()) {
279  siteA = iSources[0]->Site();
280  }
281  if (iSources.size() == 2) {
282  siteB = iSources[1]->Site();
283  }
284  std::string siteList = siteA;
285  if (!siteB.empty() && (siteB != siteA)) {
286  siteList = siteA + ", " + siteB;
287  }
288  return siteList;
289  }
290 } // namespace
291 
292 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
293  std::vector<std::shared_ptr<Source>> const &iNew,
294  std::string orig_site) const {
295  auto siteList = formatSites(iNew);
296  if (!orig_site.empty() && (orig_site != siteList)) {
297  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
298  } else {
299  auto oldSites = formatSites(iOld);
300  if (orig_site.empty() && (siteList != oldSites)) {
301  if (!oldSites.empty())
302  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
303  }
304  }
305 }
306 
308  IOSize requestSize,
309  std::vector<std::shared_ptr<Source>> &activeSources,
310  std::vector<std::shared_ptr<Source>> &inactiveSources) {
311  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
312  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
313  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
314  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
315  { // Be more aggressive about getting rid of very bad sources.
316  compareSources(now, 0, 1, activeSources, inactiveSources);
317  compareSources(now, 1, 0, activeSources, inactiveSources);
318  }
320  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
321  }
322  }
323 }
324 
325 bool RequestManager::compareSources(const timespec &now,
326  unsigned a,
327  unsigned b,
328  std::vector<std::shared_ptr<Source>> &activeSources,
329  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
330  if (activeSources.size() < std::max(a, b) + 1) {
331  return false;
332  }
333 
334  bool findNewSource = false;
335  if ((activeSources[a]->getQuality() > 5130) ||
336  ((activeSources[a]->getQuality() > 260) &&
337  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
338  edm::LogVerbatim("XrdAdaptorInternal")
339  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
340  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
341  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
342  findNewSource = true;
343  }
344  activeSources[a]->setLastDowngrade(now);
345  inactiveSources.emplace_back(activeSources[a]);
346  auto oldSources = activeSources;
347  activeSources.erase(activeSources.begin() + a);
348  reportSiteChange(oldSources, activeSources);
349  }
350  return findNewSource;
351 }
352 
354  IOSize requestSize,
355  std::vector<std::shared_ptr<Source>> &activeSources,
356  std::vector<std::shared_ptr<Source>> &inactiveSources) {
357  bool findNewSource = false;
358  if (activeSources.size() <= 1) {
359  findNewSource = true;
360  } else if (activeSources.size() > 1) {
361  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
362  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
363  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
364  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
365 
366  // NOTE: We could probably replace the copy with a better sort function.
367  // However, there are typically very few sources and the correctness is more obvious right now.
368  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
369  eligibleInactiveSources.reserve(inactiveSources.size());
370  for (const auto &source : inactiveSources) {
371  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
372  eligibleInactiveSources.push_back(source);
373  }
374  }
375  auto bestInactiveSource =
376  std::min_element(eligibleInactiveSources.begin(),
377  eligibleInactiveSources.end(),
378  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
379  return s1->getQuality() < s2->getQuality();
380  });
381  auto worstActiveSource = std::max_element(activeSources.cbegin(),
382  activeSources.cend(),
383  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
384  return s1->getQuality() < s2->getQuality();
385  });
386  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
387  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
388  << ", quality " << (*bestInactiveSource)->getQuality();
389  }
390  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
391  << ", quality " << (*worstActiveSource)->getQuality();
392  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
393  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
394  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
395  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
396  auto oldSources = activeSources;
397  activeSources.push_back(*bestInactiveSource);
398  reportSiteChange(oldSources, activeSources);
399  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
400  if (it->get() == bestInactiveSource->get()) {
401  inactiveSources.erase(it);
402  break;
403  }
404  } else
405  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
406  (*worstActiveSource)->getQuality() >
407  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
408  edm::LogVerbatim("XrdAdaptorInternal")
409  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
410  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
411  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
412  (*worstActiveSource)->setLastDowngrade(now);
413  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
414  if (it->get() == bestInactiveSource->get()) {
415  inactiveSources.erase(it);
416  break;
417  }
418  inactiveSources.emplace_back(*worstActiveSource);
419  auto oldSources = activeSources;
420  activeSources.erase(worstActiveSource);
421  activeSources.emplace_back(std::move(*bestInactiveSource));
422  reportSiteChange(oldSources, activeSources);
423  eligibleInactiveSources.clear();
424  for (const auto &source : inactiveSources)
425  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
426  eligibleInactiveSources.push_back(source);
427  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
428  eligibleInactiveSources.end(),
429  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
430  return s1->getQuality() < s2->getQuality();
431  });
432  worstActiveSource = std::max_element(activeSources.begin(),
433  activeSources.end(),
434  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
435  return s1->getQuality() < s2->getQuality();
436  });
437  }
438  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
439  float r = m_distribution(m_generator);
441  findNewSource = true;
442  }
443  }
444  }
445  if (findNewSource) {
446  m_open_handler->open();
448  }
449 
450  // Only aggressively look for new sources if we don't have two.
451  if (activeSources.size() == 2) {
453  } else {
455  }
457 }
458 
// Returns the file handle of the first active source.
// Holds m_source_mutex for the duration of the call. When there is no active source,
// the exception `ex` is filled with the file name, flags and permissions and thrown.
// NOTE(review): the declaration of `ex` is on a line this listing omits (number 462).
459 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
460  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
461  if (m_activeSources.empty()) {
463  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
464  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
465  ex.addContext("In XrdAdaptor::RequestManager::handle()");
// Attach the active/disabled source names to the exception before throwing.
466  addConnections(ex);
467  throw ex;
468  }
469  return m_activeSources[0]->getFileHandle();
470 }
471 
472 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
473  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
474  sources.reserve(m_activeSources.size());
475  for (auto const &source : m_activeSources) {
476  sources.push_back(source->ID());
477  }
478 }
479 
480 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
481  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
482  sources.reserve(m_activeSources.size());
483  for (auto const &source : m_activeSources) {
484  sources.push_back(source->PrettyID());
485  }
486 }
487 
488 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
489  sources.reserve(m_disabledSourceStrings.size());
490  for (auto const &source : m_disabledSourceStrings) {
491  sources.push_back(source);
492  }
493 }
494 
496  std::vector<std::string> sources;
498  for (auto const &source : sources) {
499  ex.addAdditionalInfo("Active source: " + source);
500  }
501  sources.clear();
503  for (auto const &source : sources) {
504  ex.addAdditionalInfo("Disabled source: " + source);
505  }
506 }
507 
// Chooses the active source that should serve a single request.
// With two active sources either index 0 or index 1 is returned; with one, index 0;
// with none, the exception `ex` is thrown. m_source_mutex is held while choosing.
// NOTE(review): this listing omits lines 513, 515, 518 and 521, which contain the
// condition that selects between the two sources and the declaration of `ex`.
508 std::shared_ptr<Source> RequestManager::pickSingleSource() {
509  std::shared_ptr<Source> source = nullptr;
510  {
511  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
512  if (m_activeSources.size() == 2) {
514  source = m_activeSources[0];
516  } else {
517  source = m_activeSources[1];
519  }
520  } else if (m_activeSources.empty()) {
522  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
523  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
524  ex.addContext("In XrdAdaptor::RequestManager::handle()");
// Attach the active/disabled source names to the exception before throwing.
525  addConnections(ex);
526  throw ex;
527  } else {
// Any other size (in particular exactly one source): use the first entry.
528  source = m_activeSources[0];
529  }
530  }
531  return source;
532 }
533 
// Dispatches a single client request: re-evaluates the sources on local copies of the
// source lists, picks one active source, hands the request to it and returns the
// request's future.
// NOTE(review): this listing omits lines 537, 542 and 549; presumably they fill `now`
// and copy/restore m_activeSources alongside m_inactiveSources - confirm.
534 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
535  assert(c_ptr.get());
536  timespec now;
538  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
539  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
540  {
541  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
543  inactiveSources = m_inactiveSources;
544  }
545  {
546  //make sure we update values before calling pickSingleSource
// The shared_ptr owns nothing; it exists only so that its deleter runs at the end of
// this scope (also when checkSources throws) and writes the local list back under the lock.
547  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
548  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
550  m_inactiveSources = std::move(inactiveSources);
551  });
552 
553  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
554  }
555 
556  std::shared_ptr<Source> source = pickSingleSource();
557  source->handle(c_ptr);
558  return c_ptr->get_future();
559 }
560 
562  struct {
563  std::stringstream ss;
564  size_t count = 0;
565  bool has_active = false;
566 
567  void append_tried(const std::string &id, bool active = false) {
568  ss << (count ? "," : "tried=") << id;
569  count++;
570  if (active) {
571  has_active = true;
572  }
573  }
574  } state;
575  {
576  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
577 
578  for (const auto &it : m_activeSources) {
579  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
580  }
581  for (const auto &it : m_inactiveSources) {
582  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
583  }
584  }
585  for (const auto &it : m_disabledExcludeStrings) {
586  state.append_tried(it.substr(0, it.find(':')));
587  }
588  if (state.has_active) {
589  state.ss << "&triedrc=resel";
590  }
591 
592  return state.ss.str();
593 }
594 
595 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
596  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
597  if (status.IsOK()) {
598  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
599  m_redirectLimitDelayScale = 1;
600  for (const auto &s : m_activeSources) {
601  if (source->ID() == s->ID()) {
602  edm::LogVerbatim("XrdAdaptorInternal")
603  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
604  unsigned returned_count = ++m_excluded_active_count;
605  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
606  if (returned_count >= 3) {
607  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
608  }
609  return;
610  }
611  }
612  for (const auto &s : m_inactiveSources) {
613  if (source->ID() == s->ID()) {
614  edm::LogVerbatim("XrdAdaptorInternal")
615  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
616  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
617  return;
618  }
619  }
620  if (m_activeSources.size() < 2) {
621  auto oldSources = m_activeSources;
622  m_activeSources.push_back(source);
623  reportSiteChange(oldSources, m_activeSources);
624  queueUpdateCurrentServer(source->ID());
625  } else {
626  m_inactiveSources.push_back(source);
627  }
628  } else { // File-open failure - wait at least 120s before next attempt.
629  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
630  int delayScale = 1;
631  if (status.status == XrdCl::errRedirectLimit) {
632  m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
633  delayScale = m_redirectLimitDelayScale;
634  }
635  m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
636  }
637 }
638 
// Dispatches a vectored read. With one active source the whole list goes to it; with two,
// the list is split into two requests, one per source, and the returned future yields the
// sum of both results. With no active source the exception `ex` is thrown.
// NOTE(review): this listing omits lines 659 and 672; presumably they fill `now` and
// declare `ex` - confirm.
639 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
640  //Use a copy of m_activeSources and m_inactiveSources throughout this function
641  // in order to avoid holding the lock a long time and causing a deadlock.
642  // When the function is over we will update the values of the containers
643  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
644  {
645  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
646  activeSources = m_activeSources;
647  inactiveSources = m_inactiveSources;
648  }
649  //Make sure we update changes when we leave the function
// The shared_ptr owns nothing; only its deleter matters. It runs on every exit path
// (return or throw) and moves the local copies back into the members under the lock.
650  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
651  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
652  m_activeSources = std::move(activeSources);
653  m_inactiveSources = std::move(inactiveSources);
654  });
655 
656  updateCurrentServer();
657 
658  timespec now;
660 
661  edm::CPUTimer timer;
662  timer.start();
663 
// Single source: no splitting needed, send the complete list to it.
664  if (activeSources.size() == 1) {
665  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
666  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
667  activeSources[0]->handle(c_ptr);
668  return c_ptr->get_future();
669  }
670  // Make sure active
671  else if (activeSources.empty()) {
673  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
674  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
675  ex.addContext("In XrdAdaptor::RequestManager::handle()");
676  addConnections(ex);
677  throw ex;
678  }
679 
// NOTE(review): on the single-source path above, `iolist` has already been handed to a
// ClientRequest before this assertion is reached.
680  assert(iolist.get());
// Two sources: split the list into one sub-request per source.
681  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
682  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
683  splitClientRequest(*iolist, *req1, *req2, activeSources);
684 
// NOTE(review): the size passed here is the number of buffers in the two sub-requests,
// whereas the single-source path passes c_ptr->getSize() - confirm this is intended.
685  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
686  // CheckSources may have removed a source
687  if (activeSources.size() == 1) {
688  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
689  activeSources[0]->handle(c_ptr);
690  return c_ptr->get_future();
691  }
692 
693  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
694  std::future<IOSize> future1, future2;
// Only non-empty sub-requests are sent; req1 goes to source 0 and req2 to source 1.
695  if (!req1->empty()) {
696  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
697  activeSources[0]->handle(c_ptr1);
698  future1 = c_ptr1->get_future();
699  }
700  if (!req2->empty()) {
701  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
702  activeSources[1]->handle(c_ptr2);
703  future2 = c_ptr2->get_future();
704  }
705  if (!req1->empty() && !req2->empty()) {
// std::launch::deferred: the lambda runs in the thread that first waits on or gets
// the returned future, not in a new thread.
706  std::future<IOSize> task = std::async(
707  std::launch::deferred,
708  [](std::future<IOSize> a, std::future<IOSize> b) {
709  // Wait until *both* results are available. This is essential
710  // as the callback may try referencing the RequestManager. If one
711  // throws an exception (causing the RequestManager to be destroyed by
712  // XrdFile) and the other has a failure, then the recovery code will
713  // reference the destroyed RequestManager.
714  //
715  // Unlike other places where we use shared/weak ptrs to maintain object
716  // lifetime and destruction asynchronously, we *cannot* destroy the request
717  // asynchronously as it is associated with a ROOT buffer. We must wait until we
718  // are guaranteed that XrdCl will not write into the ROOT buffer before we
719  // can return.
720  b.wait();
721  a.wait();
722  return b.get() + a.get();
723  },
724  std::move(future1),
725  std::move(future2));
726  timer.stop();
727  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
728  return task;
729  } else if (!req1->empty()) {
730  return future1;
731  } else if (!req2->empty()) {
732  return future2;
733  } else { // Degenerate case - no bytes to read.
// Return an already-satisfied future carrying 0.
734  std::promise<IOSize> p;
735  p.set_value(0);
736  return p.get_future();
737  }
738 }
739 
// Recovers from a failed request: the source that served it is disabled and removed from
// the active list, and the request is handed to another source. If no active source
// remains, a new one is opened first and this function waits for that open to finish.
// Throws `ex` for an invalid server response, when the wait for the open does not
// complete, or when the server returns a source that is already disabled.
// NOTE(review): this listing omits lines 746, 777-778, 784-785 and 805; they contain the
// declarations of `ex` and the `if` that the `} else {` after the timeout block belongs to.
740 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
741  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
742 
743  // Fail early for invalid responses - XrdFile has a separate path for handling this.
744  if (c_status.code == XrdCl::errInvalidResponse) {
745  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
747  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
748  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
749  << ") => Invalid ReadV response from server";
750  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
751  addConnections(ex);
752  throw ex;
753  }
754  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
755 
756  // Note that we do not delete the Source itself. That is because this
757  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
758  // In such a case, if you close a file in the handler, it will deadlock
// The disabled containers are updated before m_source_mutex is taken.
759  m_disabledSourceStrings.insert(source_ptr->ID());
760  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
761  m_disabledSources.insert(source_ptr);
762 
// unique_lock rather than lock_guard: the lock is released and re-acquired below.
763  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
// Drop the failed source from the active list; only positions 0 and 1 are examined.
764  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
765  auto oldSources = m_activeSources;
766  m_activeSources.erase(m_activeSources.begin());
767  reportSiteChange(oldSources, m_activeSources);
768  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
769  auto oldSources = m_activeSources;
770  m_activeSources.erase(m_activeSources.begin() + 1);
771  reportSiteChange(oldSources, m_activeSources);
772  }
773  std::shared_ptr<Source> new_source;
774  if (m_activeSources.empty()) {
// Nothing left to read from: start opening a replacement source.
775  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
776  timespec now;
779  // Note we only wait for 180 seconds here. This is because we've already failed
780  // once and the likelihood the program has some inconsistent state is decent.
781  // We'd much rather fail hard than deadlock!
// NOTE(review): the wait below is m_timeout + 10 seconds, not a fixed 180 seconds.
// The lock is released while waiting for the open to complete.
782  sentry.unlock();
783  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
786  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
787  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
788  << ") => timeout when waiting for file open";
789  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
790  addConnections(ex);
791  throw ex;
792  } else {
793  try {
794  new_source = future.get();
795  } catch (edm::Exception &ex) {
// Add context about the original failure and rethrow the same exception.
796  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
797  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
798  throw;
799  }
800  }
801 
802  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
803  m_disabledSourceStrings.end()) {
804  // The server gave us back a data node we requested excluded. Fatal!
806  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
807  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
808  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
809  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
810  addConnections(ex);
811  throw ex;
812  }
// Re-acquire the lock before touching m_activeSources again.
813  sentry.lock();
814 
815  auto oldSources = m_activeSources;
816  m_activeSources.push_back(new_source);
817  reportSiteChange(oldSources, m_activeSources);
818  } else {
// Another active source is still available: retry the request on it.
819  new_source = m_activeSources[0];
820  }
821  new_source->handle(c_ptr);
822 }
823 
824 static void consumeChunkFront(size_t &front,
825  std::vector<IOPosBuffer> &input,
826  std::vector<IOPosBuffer> &output,
827  IOSize chunksize) {
828  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
829  IOPosBuffer &io = input[front];
830  IOPosBuffer &outio = output.back();
831  if (io.size() > chunksize) {
832  IOSize consumed;
833  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
834  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
835  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
836  consumed = (XRD_CL_MAX_CHUNK - outio.size());
837  outio.set_size(XRD_CL_MAX_CHUNK);
838  } else {
839  consumed = chunksize;
840  outio.set_size(outio.size() + consumed);
841  }
842  } else {
843  consumed = chunksize;
844  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
845  }
846  chunksize -= consumed;
847  IOSize newsize = io.size() - consumed;
848  IOOffset newoffset = io.offset() + consumed;
849  void *newdata = static_cast<char *>(io.data()) + consumed;
850  io.set_offset(newoffset);
851  io.set_data(newdata);
852  io.set_size(newsize);
853  } else if (io.size() == 0) {
854  front++;
855  } else {
856  output.push_back(io);
857  chunksize -= io.size();
858  front++;
859  }
860  }
861 }
862 
863 static void consumeChunkBack(size_t front,
864  std::vector<IOPosBuffer> &input,
865  std::vector<IOPosBuffer> &output,
866  IOSize chunksize) {
867  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
868  IOPosBuffer &io = input.back();
869  IOPosBuffer &outio = output.back();
870  if (io.size() > chunksize) {
871  IOSize consumed;
872  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
873  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
874  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
875  consumed = (XRD_CL_MAX_CHUNK - outio.size());
876  outio.set_size(XRD_CL_MAX_CHUNK);
877  } else {
878  consumed = chunksize;
879  outio.set_size(outio.size() + consumed);
880  }
881  } else {
882  consumed = chunksize;
883  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
884  }
885  chunksize -= consumed;
886  IOSize newsize = io.size() - consumed;
887  IOOffset newoffset = io.offset() + consumed;
888  void *newdata = static_cast<char *>(io.data()) + consumed;
889  io.set_offset(newoffset);
890  io.set_data(newdata);
891  io.set_size(newsize);
892  } else if (io.size() == 0) {
893  input.pop_back();
894  } else {
895  output.push_back(io);
896  chunksize -= io.size();
897  input.pop_back();
898  }
899  }
900 }
901 
902 static IOSize validateList(const std::vector<IOPosBuffer> req) {
903  IOSize total = 0;
904  off_t last_offset = -1;
905  for (const auto &it : req) {
906  total += it.size();
907  assert(it.offset() > last_offset);
908  last_offset = it.offset();
909  assert(it.size() <= XRD_CL_MAX_CHUNK);
910  assert(it.offset() < 0x1ffffffffff);
911  }
912  assert(req.size() <= 1024);
913  return total;
914 }
915 
// Split the vectored read `iolist` into two request lists, `req1` and `req2`
// (presumably destined for activeSources[0] and activeSources[1] respectively;
// confirm against the caller).
//
//  - Returns immediately, leaving req1/req2 untouched, when `iolist` is empty.
//  - Works on a private copy (tmp_iolist) because the consume helpers shrink
//    and remove entries as they go.
//  - req1 is filled from the front of the list and req2 from the back, in
//    alternating chunks of chunk1/chunk2 bytes, until every entry is consumed.
//  - Throws the exception built below if both outputs have reached
//    XRD_ADAPTOR_CHUNK_THRESHOLD entries while input still remains.
//  - Before returning, both lists are sorted by offset and validated, and their
//    byte totals are asserted to add up to the original total.
//
// NOTE(review): activeSources[0] and activeSources[1] are read without a size
// check, so the caller presumably guarantees at least two active sources.
916 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
917  std::vector<IOPosBuffer> &req1,
918  std::vector<IOPosBuffer> &req2,
919  std::vector<std::shared_ptr<Source>> const &activeSources) const {
920  if (iolist.empty())
921  return;
922  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
923  req1.reserve(iolist.size() / 2 + 1);
924  req2.reserve(iolist.size() / 2 + 1);
// Index of the first entry of tmp_iolist not yet consumed from the front.
925  size_t front = 0;
926 
927  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
928  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
929  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
930  IOSize chunk1, chunk2;
931  // Make sure the chunk size is at least 1024; little point to reads less than that size.
// Each chunk size is XRD_CL_MAX_CHUNK weighted by the *other* source's squared
// quality, so a larger quality value yields a smaller share for that source
// (presumably lower quality values mean a better source; confirm against
// Source::getQuality()).
932  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
933  static_cast<IOSize>(1024));
934  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
935  static_cast<IOSize>(1024));
936 
// Total bytes requested; used for the error message and the final consistency assert.
937  IOSize size_orig = 0;
938  for (const auto &it : iolist)
939  size_orig += it.size();
940 
// Keep going while there are entries between `front` and the end of tmp_iolist.
941  while (tmp_iolist.size() - front > 0) {
942  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
943  (req2.size() >=
944  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
945  // are passed to the request manager. However, because we have a max chunk size, we increase
946  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
947  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
948  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
// NOTE(review): the declaration of `ex` is not visible in this listing; it is
// presumably on a line missing from this view.
// NOTE(review): the addContext() string below names requestFailure() although
// this is splitClientRequest(); looks like a copy/paste slip, confirm before
// changing in case anything matches on the text.
950  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
951  << ", permissions=0" << std::oct << m_perms << std::dec
952  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
953  "reported to CMSSW developers.";
954  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
955  addConnections(ex);
956  std::stringstream ss;
957  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
958  ex.addAdditionalInfo(ss.str());
959  std::stringstream ss2;
// The +5 added above is subtracted again so the message shows the raw qualities.
960  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
961  ex.addAdditionalInfo(ss2.str());
962  throw ex;
963  }
// Take one chunk for each output that still has room.
964  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
965  consumeChunkFront(front, tmp_iolist, req1, chunk1);
966  }
967  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
968  consumeChunkBack(front, tmp_iolist, req2, chunk2);
969  }
970  }
// validateList() asserts strictly increasing offsets, so both lists are sorted
// by offset first.
971  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
972  return left.offset() < right.offset();
973  });
974  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
975  return left.offset() < right.offset();
976  });
977 
978  IOSize size1 = validateList(req1);
979  IOSize size2 = validateList(req2);
980 
// No byte of the original request may be lost or duplicated by the split.
981  assert(size_orig == size1 + size2);
982 
983  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
984  << " bytes) split into requests size " << req1.size() << " (" << size1
985  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
986 }
987 
988 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
989 
990 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
991 // defined in the header.
993 
995  XrdCl::AnyObject *,
996  XrdCl::HostList *hostList_ptr) {
// Response handler for an open request (the context string below names it
// OpenHandler::HandleResponseWithHosts()). Takes ownership of status_ptr and
// hostList_ptr (both wrapped in unique_ptr below), fulfils m_promise with
// either a new Source or an exception, and finally reports to the manager via
// handleOpen(), unless the manager no longer exists.
// NOTE(review): the first line of the signature is not visible in this listing.
997  // Make sure we get rid of the strong self-reference when the callback finishes.
998  std::shared_ptr<OpenHandler> self = m_self;
999  m_self.reset();
1000 
1001  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1002  // Make sure that we set m_outstanding_open to false on exit from this function.
1003  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
// The custom deleter does not delete anything; it only clears the flag when the
// guard goes out of scope.
1004  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1005  this, [&](OpenHandler *) { m_outstanding_open = false; });
1006 
1007  std::shared_ptr<Source> source;
1008  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1009  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1010 
1011  auto manager = m_manager.lock();
1012  // Manager object has already been deleted. Cleanup the
1013  // response objects, remove our self-reference, and ignore the response.
1014  if (!manager) {
1015  return;
1016  }
1017  //if we need to delete the File object we must do it outside
1018  // of the lock to avoid a potential deadlock
// releaseFile is declared before the locked scope, so it is destroyed only
// after the lock_guard below has been released.
1019  std::unique_ptr<XrdCl::File> releaseFile;
1020  {
1021  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1022 
1023  if (status->IsOK()) {
1024  SendMonitoringInfo(*m_file);
// NOTE(review): no assignment to `now` is visible in this listing before it is
// passed to Source; presumably it is filled on a line missing from this view.
1025  timespec now;
1027 
1028  std::string excludeString;
1029  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1030 
// Ownership of the opened file moves into the new Source; m_file is empty afterwards.
1031  source.reset(new Source(now, std::move(m_file), excludeString));
1032  m_promise.set_value(source);
1033  } else {
// Open failed: hand the file to releaseFile and publish the failure through
// the promise.
// NOTE(review): the declaration of `ex` is not visible in this listing.
1034  releaseFile = std::move(m_file);
1036  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1037  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1038  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1039  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1040  manager->addConnections(ex);
1041  m_promise.set_exception(std::make_exception_ptr(ex));
1042  }
1043  }
// Called after the lock is released; `source` is still empty when the open failed.
1044  manager->handleOpen(*status, source);
1045 }
1046 
// Returns a printable name for the server of the open currently in progress:
//   "(no open in progress)" when there is no m_file,
//   "(unknown source)" when the file's "DataServer" property is empty,
//   otherwise the "DataServer" property value.
// m_mutex is held for the whole call.
// NOTE(review): the signature line of this function is not visible in this listing.
1048  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1049 
1050  if (!m_file.get()) {
1051  return "(no open in progress)";
1052  }
1053  std::string dataServer;
// The return value of GetProperty() is ignored; a failed lookup is detected
// through the string remaining empty.
1054  m_file->GetProperty("DataServer", dataServer);
1055  if (dataServer.empty()) {
1056  return "(unknown source)";
1057  }
1058  return dataServer;
1059 }
1060 
// Start an open of the manager's file, with this object registered as the
// response handler, and return a shared future for the resulting Source.
//
//  - Throws if the owning RequestManager, or this handler's own weak
//    self-reference, has already expired.
//  - If an open is already outstanding, returns the existing shared future
//    instead of starting another one.
//  - Otherwise installs a fresh promise/future pair, creates a new XrdCl::File
//    and calls Open() on it. If Open() returns a non-OK status, the exit guard
//    clears m_outstanding_open and m_file, and an exception is thrown.
//  - On success a strong self-reference (m_self) is stored before returning.
//
// Not thread-safe; see the comment on m_outstanding_open below.
// NOTE(review): the declarations of the three `ex` objects used below are not
// visible in this listing.
1061 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1062  auto manager_ptr = m_manager.lock();
1063  if (!manager_ptr) {
1065  ex << "XrdCl::File::Open() =>"
1066  << " error: OpenHandler called within an invalid RequestManager context."
1067  << " This is a logic error and should be reported to the CMSSW developers.";
1068  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1069  throw ex;
1070  }
1071  RequestManager &manager = *manager_ptr;
1072  auto self_ptr = m_self_weak.lock();
1073  if (!self_ptr) {
// NOTE(review): the context string below spells "XrdAdapter" where the other
// messages in this function use "XrdAdaptor"; looks like a typo.
1075  ex << "XrdCl::File::Open() => error: "
1076  << "OpenHandler called after it was deleted. This is a logic error "
1077  << "and should be reported to the CMSSW developers.";
1078  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1079  throw ex;
1080  }
1081 
1082  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1083  // is not thread-safe; the caller is responsible to verify it is not called from
1084  // multiple threads simultaneously.
1085  //
1086  // This is done because ::open may be called from a Xrootd callback; if we
1087  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1088  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1089  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1090  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1091  if (m_outstanding_open) {
1092  return m_shared_future;
1093  }
1094  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
// Replace the previous promise with a fresh one and publish its future.
1095  std::promise<std::shared_ptr<Source>> new_promise;
1096  m_promise.swap(new_promise);
1097  m_shared_future = m_promise.get_future().share();
1098 
// Append the opaque string to the file name, using '?' if the name has no
// query part yet and '&' otherwise.
1099  auto opaque = manager.prepareOpaqueString();
1100  std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1101  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1102  m_file = std::make_unique<XrdCl::File>();
1103  m_outstanding_open = true;
1104 
1105  // Always make sure we release m_file and set m_outstanding_open to false on error.
// The custom deleter does not delete `this`; it only runs the cleanup when the
// guard is destroyed while still armed.
1106  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1107  m_outstanding_open = false;
1108  m_file.reset();
1109  });
1110 
1111  XrdCl::XRootDStatus status;
1112  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1114  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1115  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1116  << ", code=" << status.code << ")";
1117  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1118  manager.addConnections(ex);
1119  throw ex;
1120  }
// Open() was accepted: disarm the guard so m_file and m_outstanding_open stay set.
1121  exit_guard.release();
1122  // Have a strong self-reference for as long as the callback is in-progress.
1123  m_self = self_ptr;
1124  return m_shared_future;
1125 }
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Log< level::Info, true > LogVerbatim
int64_t IOOffset
Definition: IOTypes.h:20
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:68
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
#define LIKELY(x)
Definition: Likely.h:20
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static constexpr int XRD_CL_MAX_CHUNK
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:56
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::string prepareOpaqueString() const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:50
IOOffset offset() const
Definition: IOPosBuffer.h:41
static std::string const input
Definition: EdmProvDump.cc:47
static void SendMonitoringInfo(XrdCl::File &file)
OpenHandler(const OpenHandler &)=delete
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:258
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void addConnections(cms::Exception &) const
Times stop()
Definition: CPUTimer.cc:87
void clearMessage()
Definition: Exception.cc:159
std::shared_ptr< XrdCl::File > getActiveFile() const
std::atomic< std::string * > m_serverToAdvertise
std::shared_ptr< OpenHandler > m_open_handler
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:161
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Log< level::Info, false > LogInfo
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
void addContext(std::string const &context)
Definition: Exception.cc:165
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:53
void getDisabledSourceNames(std::vector< std::string > &sources) const
double a
Definition: hdecay.h:119
std::shared_ptr< Source > pickSingleSource()
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
static IOSize validateList(const std::vector< IOPosBuffer > req)
edm::storage::IOSize IOSize
bool isAvailable() const
Definition: Service.h:40
XrdCl::FileSystem & fs()
Log< level::Warning, false > LogWarning
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
SendMonitoringInfoHandler(const std::string &url)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static std::string const source
Definition: EdmProvDump.cc:46
void clearAdditionalInfo()
Definition: Exception.cc:163
RequestManager(const RequestManager &)=delete
void getActiveSourceNames(std::vector< std::string > &sources) const
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)