CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <cassert>
4 #include <iostream>
5 #include <memory>
6 
7 #include <netdb.h>
8 
9 #include "XrdCl/XrdClPostMasterInterfaces.hh"
10 #include "XrdCl/XrdClPostMaster.hh"
11 
12 #include "XrdCl/XrdClFile.hh"
13 #include "XrdCl/XrdClDefaultEnv.hh"
14 #include "XrdCl/XrdClFileSystem.hh"
15 
23 
24 #include "XrdStatistics.h"
26 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
27 
28 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
29 
30 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
31 
32 #ifdef XRD_FAKE_OPEN_PROBE
33 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
34 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
35 // This is the minimal difference in quality required to swap an active and inactive source
36 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
37 #else
38 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
39 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
40 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
41 #endif
42 
43 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
44 
45 #ifdef __MACH__
46 #include <mach/clock.h>
47 #include <mach/mach.h>
48 #define GET_CLOCK_MONOTONIC(ts) \
49  { \
50  clock_serv_t cclock; \
51  mach_timespec_t mts; \
52  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
53  clock_get_time(cclock, &mts); \
54  mach_port_deallocate(mach_task_self(), cclock); \
55  ts.tv_sec = mts.tv_sec; \
56  ts.tv_nsec = mts.tv_nsec; \
57  }
58 #else
59 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
60 #endif
61 
62 using namespace XrdAdaptor;
63 using namespace edm::storage;
64 
/**
 * Difference `a - b` between two timestamps, expressed in milliseconds.
 *
 * The whole-second part is scaled to milliseconds with integer arithmetic;
 * the nanosecond part is converted with a floating-point division.  The two
 * parts are summed in floating point and the sum is truncated toward zero.
 *
 * @param a  Later (minuend) timestamp.
 * @param b  Earlier (subtrahend) timestamp.
 * @return   Signed millisecond difference; negative when `a` precedes `b`.
 */
long long timeDiffMS(const timespec &a, const timespec &b) {
  const long long wholeSecondsMs = (a.tv_sec - b.tv_sec) * 1000;
  const double nanosAsMs = (a.tv_nsec - b.tv_nsec) / 1e6;
  // Truncation (not rounding) of the combined value is intentional: it is what
  // callers comparing against millisecond thresholds have always observed.
  return static_cast<long long>(wholeSecondsMs + nanosAsMs);
}
70 
71 /*
72  * We do not care about the response of sending the monitoring information;
73  * this handler class simply frees any returned buffer to prevent memory leaks.
74  */
// Response handler used when sending monitoring information through an
// XrdCl::FileSystem.  It owns the FileSystem object (m_fs) the request is sent
// through and destroys itself from inside HandleResponse(), so instances must
// be allocated with `new` and must not be touched after the response arrives.
75 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
  // Callback invoked with the outcome of the request.  Releases every object
  // handed to it (buffer, response, status) and finally the handler itself.
76  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
77  if (response) {
78  XrdCl::Buffer *buffer = nullptr;
79  response->Get(buffer);
  // Replace the stored object with a null pointer before deleting the buffer
  // by hand; presumably this stops `response` from also deleting it -- TODO
  // confirm against XrdCl::AnyObject ownership rules.
80  response->Set(static_cast<int *>(nullptr));
81  delete buffer;
82  }
83  // Send Info has a response object; we must delete it.
84  delete response;
85  delete status;
  // Self-destruction: nothing may access this object after this line.
86  delete this;
87  }
88 
  // File system handle the monitoring request is issued on; exposed via fs().
89  XrdCl::FileSystem m_fs;
90 
91 public:
  // NOTE(review): listing line 92 is missing here.
93  SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
94  SendMonitoringInfoHandler() = delete;
95 
  // NOTE(review): listing line 96 is missing here; the call site constructs the
  // handler from a URL string, so a constructor taking one presumably lives on
  // that line.
97 
  // Accessor for the owned file system object.
98  XrdCl::FileSystem &fs() { return m_fs; }
99 };
100 
102  // Do not send this to a dCache data server as they return an error.
103  // In some versions of dCache, sending the monitoring information causes
104  // the server to close the connection - resulting in failures.
105  if (Source::isDCachePool(file)) {
106  return;
107  }
108 
109  // Send the monitoring info, if available.
111  std::string lastUrl;
112  file.GetProperty("LastURL", lastUrl);
113  if (jobId && !lastUrl.empty()) {
114  auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
115  if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
116  edm::LogWarning("XrdAdaptorInternal")
117  << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
118  delete sm_handler;
119  }
120  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
121  }
122 }
123 
124 namespace {
125  std::unique_ptr<std::string> getQueryTransport(const XrdCl::URL &url, uint16_t query) {
126  XrdCl::AnyObject result;
127  XrdCl::DefaultEnv::GetPostMaster()->QueryTransport(url, query, result);
128  std::string *tmp;
129  result.Get(tmp);
130  return std::unique_ptr<std::string>(tmp);
131  }
132 
// Emit one "XrdAdaptorLvl2" info message listing every host in `hostList`:
// a running index followed by host name, IP stack, IP address, port and
// resource type ("endpoint" or "load balancer"), each obtained through
// getQueryTransport() or from the host entry itself.
//
// `hostList` is dereferenced without a null check, and so are the pointers
// returned by getQueryTransport(); the caller must pass a valid list.
133  void tracerouteRedirections(const XrdCl::HostList *hostList) {
  // The message is built inside the lambda handed to log().
134  edm::LogInfo("XrdAdaptorLvl2").log([hostList](auto &li) {
  // 1-based position of the host in the list.
135  int idx_redirection = 1;
136  li << "-------------------------------\nTraceroute:\n";
137  for (auto const &host : *hostList) {
138  // Query info
139  std::unique_ptr<std::string> stack_ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpStack);
140  std::unique_ptr<std::string> ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpAddr);
141  std::unique_ptr<std::string> auth_method = getQueryTransport(host.url, XrdCl::TransportQuery::Auth);
142  std::unique_ptr<std::string> hostname_method = getQueryTransport(host.url, XrdCl::StreamQuery::HostName);
143  std::string type_resource = "endpoint";
  // NOTE(review): listing line 144 is missing; `authentication`, assigned
  // below, is presumably declared there.
145  // Organize redirection info
146  if (!auth_method->empty()) {
147  authentication = *auth_method;
148  } else {
149  authentication = "no auth";
150  };
151  if (host.loadBalancer == 1) {
152  type_resource = "load balancer";
153  };
  // Seven placeholders; one argument sits on the missing listing line 160.
154  li.format("{}. || {} / {} / {} / {} / {} / {} ||\n",
155  idx_redirection,
156  *hostname_method,
157  *stack_ip_method,
158  *ip_method,
159  host.url.GetPort(),
161  type_resource);
162  ++idx_redirection;
163  }
164  li.format("-------------------------------");
165  });
166  }
167 } // namespace
168 
170  : m_serverToAdvertise(nullptr),
171  m_timeout(XRD_DEFAULT_TIMEOUT),
172  m_nextInitialSourceToggle(false),
173  m_redirectLimitDelayScale(1),
174  m_name(filename),
175  m_flags(flags),
176  m_perms(perms),
177  m_distribution(0, 100),
178  m_excluded_active_count(0) {}
179 
// Open the file named by m_name and register the first active source.
//
// Up to five open attempts are made.  Each attempt uses a fresh XrdCl::File
// and appends the string from prepareOpaqueString() to the URL.  A failed
// attempt records the offending data server so a later attempt can exclude
// it; the exception object `ex` is refilled on every failure and thrown when
// no attempt succeeds.
//
// NOTE(review): several listing lines are missing from this function (181,
// 197, 270, 273, 280, 282-283, 287).  In particular the declaration of `ex`,
// the initialisation of `ts` and every use of `self` are not visible here.
180 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
182 
  // Let the client environment's "StreamErrorWindow" setting fill m_timeout
  // when an environment object is available.
183  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
184  if (env) {
185  env->GetInt("StreamErrorWindow", m_timeout);
186  }
187 
  // Derive the original site; when the lookup reports failure and the value
  // contains no '.', resolve it as a host name and take its domain instead.
188  std::string orig_site;
189  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
190  std::string hostname;
191  if (Source::getHostname(orig_site, hostname)) {
192  Source::getDomain(hostname, orig_site);
193  }
194  }
195 
196  std::unique_ptr<XrdCl::File> file;
198  bool validFile = false;
199  const int retries = 5;
200  std::string excludeString;
201  for (int idx = 0; idx < retries; idx++) {
  // A new File object per attempt; the previous one is released here.
202  file = std::make_unique<XrdCl::File>();
203  auto opaque = prepareOpaqueString();
  // Append the opaque string with '?' if the name has no query part yet,
  // otherwise with '&'.
204  std::string new_filename =
205  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
206  SyncHostResponseHandler handler;
207  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
208  if (!openStatus
209  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
210  // server and it was marked bad - xrootd couldn't even queue up the request internally!
211  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
212  // redirector being down or authentication failures.
213  ex.clearMessage();
214  ex.clearContext();
215  ex.clearAdditionalInfo();
216  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
217  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
218  << ", code=" << openStatus.code << ")";
219  ex.addContext("Calling XrdFile::open()");
220  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
221  throw ex;
222  }
  // The request was queued: block until the handler has the answer.
223  handler.WaitForResponse();
224  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
225  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
  // Note: hostList is passed on (and dereferenced by tracerouteRedirections)
  // before any check that the handler actually delivered one.
226  tracerouteRedirections(hostList.get());
227  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
228  assert(status);
229  if (status->IsOK()) {
230  validFile = true;
231  break;
232  } else {
  // Rebuild the exception from scratch so it describes this attempt only.
233  ex.clearMessage();
234  ex.clearContext();
235  ex.clearAdditionalInfo();
236  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
237  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
238  << ", code=" << status->code << ")";
239  ex.addContext("Calling XrdFile::open()");
240  addConnections(ex);
241  std::string dataServer, lastUrl;
242  file->GetProperty("DataServer", dataServer);
243  file->GetProperty("LastURL", lastUrl);
244  if (!dataServer.empty()) {
245  ex.addAdditionalInfo("Problematic data server: " + dataServer);
246  }
247  if (!lastUrl.empty()) {
248  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
249  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
250  }
  // The same data server failing again (already in the disabled set) means
  // the exclusion did not produce an alternative: give up.
251  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
252  m_disabledSourceStrings.end()) {
253  ex << ". No additional data servers were found.";
254  throw ex;
255  }
  // Remember the failing server so the next attempt's opaque string can
  // exclude it.
256  if (!dataServer.empty()) {
257  m_disabledSourceStrings.insert(dataServer);
258  m_disabledExcludeStrings.insert(excludeString);
259  }
260  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
261  if (lastUrl == new_filename) {
262  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
263  throw ex;
264  }
265  }
266  }
  // All attempts used up without success: throw the last recorded failure.
267  if (!validFile) {
268  throw ex;
269  }
271 
  // NOTE(review): `ts` is used below without a visible initialisation; it is
  // presumably filled on the missing listing line 273 -- confirm.
272  timespec ts;
274 
  // Wrap the opened file in a Source and make it the first active source.
275  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
276  {
277  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
  // Snapshot of the list before insertion; its consumer is not visible here
  // (listing line 280 is missing).
278  auto oldList = m_activeSources;
279  m_activeSources.push_back(source);
281  }
284 
285  m_lastSourceCheck = ts;
  // `ts` is advanced by the short open delay; where the advanced value is
  // stored is not visible (listing line 287 is missing).
286  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
288 }
289 
298  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
299  // a pending update. *However*, since we call this for every read, we'll get it
300  // eventually.
301  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
302  return;
303  }
304  std::string *hostname_ptr;
305  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
306  std::unique_ptr<std::string> hostname(hostname_ptr);
308  if (statsService.isAvailable()) {
309  statsService->setCurrentServer(m_name, *hostname_ptr);
310  }
311  }
312 }
313 
315  auto hostname = std::make_unique<std::string>(id);
316  if (Source::getHostname(id, *hostname)) {
317  std::string *null_hostname = nullptr;
318  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
319  hostname.release();
320  }
321  }
322 }
323 
324 namespace {
325  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
326  std::string siteA, siteB;
327  if (!iSources.empty()) {
328  siteA = iSources[0]->Site();
329  }
330  if (iSources.size() == 2) {
331  siteB = iSources[1]->Site();
332  }
333  std::string siteList = siteA;
334  if (!siteB.empty() && (siteB != siteA)) {
335  siteList = siteA + ", " + siteB;
336  }
337  return siteList;
338  }
339 } // namespace
340 
341 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
342  std::vector<std::shared_ptr<Source>> const &iNew,
343  std::string orig_site) const {
344  auto siteList = formatSites(iNew);
345  if (orig_site.empty() || (orig_site == siteList)) {
346  auto oldSites = formatSites(iOld);
347  }
348 
349  edm::LogInfo("XrdAdaptorLvl1").log([&](auto &li) {
350  li << "Serving data from: ";
351  int size_active_sources = iNew.size();
352  for (int i = 0; i < size_active_sources; ++i) {
353  std::string hostname_a;
354  Source::getHostname(iNew[i]->PrettyID(), hostname_a);
355  li.format(" [{}] {}", i + 1, hostname_a);
356  }
357  });
358 
359  edm::LogInfo("XrdAdaptorLvl3").log([&](auto &li) {
360  li << "The quality of the active sources is: ";
361  int size_active_sources = iNew.size();
362  for (int i = 0; i < size_active_sources; ++i) {
363  std::string hostname_a;
364  Source::getHostname(iNew[i]->PrettyID(), hostname_a);
365  std::string quality = std::to_string(iNew[i]->getQuality());
366  li.format(" [{}] {} for {}", i + 1, quality, hostname_a);
367  }
368  });
369 }
370 
372  IOSize requestSize,
373  std::vector<std::shared_ptr<Source>> &activeSources,
374  std::vector<std::shared_ptr<Source>> &inactiveSources) {
375  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
376  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
377  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
378  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
379  { // Be more aggressive about getting rid of very bad sources.
380  compareSources(now, 0, 1, activeSources, inactiveSources);
381  compareSources(now, 1, 0, activeSources, inactiveSources);
382  }
384  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
385  }
386  }
387 }
388 
389 bool RequestManager::compareSources(const timespec &now,
390  unsigned a,
391  unsigned b,
392  std::vector<std::shared_ptr<Source>> &activeSources,
393  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
394  if (activeSources.size() < std::max(a, b) + 1) {
395  return false;
396  }
397  unsigned quality_a = activeSources[a]->getQuality();
398  unsigned quality_b = activeSources[b]->getQuality();
399  bool findNewSource = false;
400  if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
401  std::string hostname_a;
402  Source::getHostname(activeSources[a]->ID(), hostname_a);
403  if (quality_a > 5130) {
404  edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because the quality ("
405  << quality_a << ") is above 5130 and it is not the only active server";
406  }
407  if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
408  std::string hostname_b;
409  Source::getHostname(activeSources[b]->ID(), hostname_b);
410  edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because its quality ("
411  << quality_a
412  << ") is higher than 260 and 4 times larger than the other active server "
413  << hostname_b << " (" << quality_b << ") ";
414  }
415  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << hostname_a << " from active sources due to poor quality ("
416  << quality_a << " vs " << quality_b << ")" << std::endl;
417  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
418  findNewSource = true;
419  }
420  activeSources[a]->setLastDowngrade(now);
421  inactiveSources.emplace_back(activeSources[a]);
422  auto oldSources = activeSources;
423  activeSources.erase(activeSources.begin() + a);
424  reportSiteChange(oldSources, activeSources);
425  }
426  return findNewSource;
427 }
428 
430  IOSize requestSize,
431  std::vector<std::shared_ptr<Source>> &activeSources,
432  std::vector<std::shared_ptr<Source>> &inactiveSources) {
433  bool findNewSource = false;
434  if (activeSources.size() <= 1) {
435  findNewSource = true;
436  edm::LogInfo("XrdAdaptorLvl3")
437  << "Looking for an additional source because the number of active sources is smaller than 2";
438  } else if (activeSources.size() > 1) {
439  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
440  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
441  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
442  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
443 
444  // NOTE: We could probably replace the copy with a better sort function.
445  // However, there are typically very few sources and the correctness is more obvious right now.
446  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
447  eligibleInactiveSources.reserve(inactiveSources.size());
448  for (const auto &source : inactiveSources) {
449  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
450  eligibleInactiveSources.push_back(source);
451  }
452  }
453  auto bestInactiveSource =
454  std::min_element(eligibleInactiveSources.begin(),
455  eligibleInactiveSources.end(),
456  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
457  return s1->getQuality() < s2->getQuality();
458  });
459  auto worstActiveSource = std::max_element(activeSources.cbegin(),
460  activeSources.cend(),
461  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
462  return s1->getQuality() < s2->getQuality();
463  });
464  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
465  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
466  << ", quality " << (*bestInactiveSource)->getQuality();
467  }
468  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
469  << ", quality " << (*worstActiveSource)->getQuality();
470  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
471  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
472  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
473  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
474  auto oldSources = activeSources;
475  activeSources.push_back(*bestInactiveSource);
476  reportSiteChange(oldSources, activeSources);
477  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
478  if (it->get() == bestInactiveSource->get()) {
479  inactiveSources.erase(it);
480  break;
481  }
482  } else
483  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
484  (*worstActiveSource)->getQuality() >
485  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
486  edm::LogVerbatim("XrdAdaptorInternal")
487  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
488  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
489  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
490  (*worstActiveSource)->setLastDowngrade(now);
491  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
492  if (it->get() == bestInactiveSource->get()) {
493  inactiveSources.erase(it);
494  break;
495  }
496  inactiveSources.emplace_back(*worstActiveSource);
497  auto oldSources = activeSources;
498  activeSources.erase(worstActiveSource);
499  activeSources.emplace_back(std::move(*bestInactiveSource));
500  reportSiteChange(oldSources, activeSources);
501  eligibleInactiveSources.clear();
502  for (const auto &source : inactiveSources)
503  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
504  eligibleInactiveSources.push_back(source);
505  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
506  eligibleInactiveSources.end(),
507  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
508  return s1->getQuality() < s2->getQuality();
509  });
510  worstActiveSource = std::max_element(activeSources.begin(),
511  activeSources.end(),
512  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
513  return s1->getQuality() < s2->getQuality();
514  });
515  }
516  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
517  float r = m_distribution(m_generator);
519  findNewSource = true;
520  }
521  }
522  }
523  if (findNewSource) {
524  m_open_handler->open();
526  }
527 
528  // Only aggressively look for new sources if we don't have two.
529  if (activeSources.size() == 2) {
531  } else {
533  }
535 }
536 
// Return the file handle of the first active source.
//
// The source mutex is held for the whole call.  When no active source exists
// an exception describing the file (name, flags, permissions) and the known
// connections is thrown instead.
537 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
538  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
539  if (m_activeSources.empty()) {
  // NOTE(review): listing line 540 is missing; `ex` is presumably declared
  // there.
541  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
542  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
  // The context string names handle() although this is getActiveFile().
543  ex.addContext("In XrdAdaptor::RequestManager::handle()");
  // Attach the active and disabled source names to the exception.
544  addConnections(ex);
545  throw ex;
546  }
547  return m_activeSources[0]->getFileHandle();
548 }
549 
550 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
551  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
552  sources.reserve(m_activeSources.size());
553  for (auto const &source : m_activeSources) {
554  sources.push_back(source->ID());
555  }
556 }
557 
558 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
559  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
560  sources.reserve(m_activeSources.size());
561  for (auto const &source : m_activeSources) {
562  sources.push_back(source->PrettyID());
563  }
564 }
565 
566 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
567  sources.reserve(m_disabledSourceStrings.size());
568  for (auto const &source : m_disabledSourceStrings) {
569  sources.push_back(source);
570  }
571 }
572 
574  std::vector<std::string> sources;
576  for (auto const &source : sources) {
577  ex.addAdditionalInfo("Active source: " + source);
578  }
579  sources.clear();
581  for (auto const &source : sources) {
582  ex.addAdditionalInfo("Disabled source: " + source);
583  }
584 }
585 
// Choose the active source that will serve a single (non-vector) request.
//
// With exactly two active sources one of the two is selected; with one
// active source that source is returned; with none an exception is thrown.
// The source mutex is held while the list is examined.
//
// NOTE(review): listing lines 591, 593, 596 and 599 are missing.  The
// condition that selects between index 0 and index 1, and the declaration
// of `ex`, are therefore not visible here.
586 std::shared_ptr<Source> RequestManager::pickSingleSource() {
587  std::shared_ptr<Source> source = nullptr;
588  {
589  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
590  if (m_activeSources.size() == 2) {
  // Two candidates: the branch condition sits on a missing listing line.
592  source = m_activeSources[0];
594  } else {
595  source = m_activeSources[1];
597  }
598  } else if (m_activeSources.empty()) {
  // No source left to read from: report the file and known connections.
600  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
601  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
602  ex.addContext("In XrdAdaptor::RequestManager::handle()");
603  addConnections(ex);
604  throw ex;
605  } else {
  // Any other size: use the first active source.
606  source = m_activeSources[0];
607  }
608  }
609  return source;
610 }
611 
// Dispatch a single client request to one active source and return the
// future through which its result will be delivered.
//
// The source lists are copied under the mutex, re-evaluated by checkSources()
// without the mutex held, and written back under the mutex before a source
// is picked.
//
// NOTE(review): listing lines 615, 620 and 627 are missing.  The
// initialisation of `now` and the copy / write-back of the active list are
// presumably on those lines -- confirm.
612 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
613  assert(c_ptr.get());
614  timespec now;
616  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
617  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
618  {
  // Take local snapshots so checkSources() can run without the mutex.
619  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
621  inactiveSources = m_inactiveSources;
622  }
623  {
624  //make sure we update values before calling pickSingleSource
  // Scope guard: a shared_ptr holding nullptr with a custom deleter.  The
  // deleter runs when `guard` leaves this scope (also on an exception) and
  // moves the local lists back into the members under the mutex.
625  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
626  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
628  m_inactiveSources = std::move(inactiveSources);
629  });
630 
631  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
632  }
633 
  // pickSingleSource() throws when no active source is left.
634  std::shared_ptr<Source> source = pickSingleSource();
635  source->handle(c_ptr);
636  return c_ptr->get_future();
637 }
638 
640  struct {
641  std::stringstream ss;
642  size_t count = 0;
643  bool has_active = false;
644 
645  void append_tried(const std::string &id, bool active = false) {
646  ss << (count ? "," : "tried=") << id;
647  count++;
648  if (active) {
649  has_active = true;
650  }
651  }
652  } state;
653  {
654  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
655 
656  for (const auto &it : m_activeSources) {
657  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
658  }
659  for (const auto &it : m_inactiveSources) {
660  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
661  }
662  }
663  for (const auto &it : m_disabledExcludeStrings) {
664  state.append_tried(it.substr(0, it.find(':')));
665  }
666  if (state.has_active) {
667  state.ss << "&triedrc=resel";
668  }
669 
670  return state.ss.str();
671 }
672 
673 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
674  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
675  if (status.IsOK()) {
676  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
677  m_redirectLimitDelayScale = 1;
678  for (const auto &s : m_activeSources) {
679  if (source->ID() == s->ID()) {
680  edm::LogVerbatim("XrdAdaptorInternal")
681  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
682  unsigned returned_count = ++m_excluded_active_count;
683  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
684  if (returned_count >= 3) {
685  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
686  }
687  return;
688  }
689  }
690  for (const auto &s : m_inactiveSources) {
691  if (source->ID() == s->ID()) {
692  edm::LogVerbatim("XrdAdaptorInternal")
693  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
694  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
695  return;
696  }
697  }
698  if (m_activeSources.size() < 2) {
699  auto oldSources = m_activeSources;
700  m_activeSources.push_back(source);
701  reportSiteChange(oldSources, m_activeSources);
702  queueUpdateCurrentServer(source->ID());
703  } else {
704  m_inactiveSources.push_back(source);
705  }
706  } else { // File-open failure - wait at least 120s before next attempt.
707  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
708  int delayScale = 1;
709  if (status.status == XrdCl::errRedirectLimit) {
710  m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
711  delayScale = m_redirectLimitDelayScale;
712  }
713  m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
714  }
715 }
716 
// Dispatch a vector read (list of position/buffer pairs) and return a future
// for the total number of bytes.
//
// With one active source the whole list goes to that source.  With two, the
// list is split by splitClientRequest() into two sub-requests, one per
// source, and the returned future yields the sum of both results.  With no
// active source an exception is thrown.
//
// NOTE(review): listing lines 737 and 750 are missing; the initialisation of
// `now` and the declaration of `ex` are presumably on those lines.
717 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
718  //Use a copy of m_activeSources and m_inactiveSources throughout this function
719  // in order to avoid holding the lock a long time and causing a deadlock.
720  // When the function is over we will update the values of the containers
721  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
722  {
723  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
724  activeSources = m_activeSources;
725  inactiveSources = m_inactiveSources;
726  }
727  //Make sure we update changes when we leave the function
  // Scope guard: shared_ptr holding nullptr with a custom deleter.  The
  // deleter runs on every exit path (return or exception) and moves the
  // local lists back into the members under the mutex.
728  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
729  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
730  m_activeSources = std::move(activeSources);
731  m_inactiveSources = std::move(inactiveSources);
732  });
733 
734  updateCurrentServer();
735 
736  timespec now;
738 
  // The timer is started here; in the visible code it is stopped only on the
  // two-request path and its value is read only by a commented-out log line.
739  edm::CPUTimer timer;
740  timer.start();
741 
  // Single active source: no splitting needed.
742  if (activeSources.size() == 1) {
743  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
744  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
745  activeSources[0]->handle(c_ptr);
746  return c_ptr->get_future();
747  }
748  // Make sure active
749  else if (activeSources.empty()) {
751  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
752  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
753  ex.addContext("In XrdAdaptor::RequestManager::handle()");
754  addConnections(ex);
755  throw ex;
756  }
757 
  // Note: this assertion comes after `iolist` has already been handed to a
  // ClientRequest on the single-source path above.
758  assert(iolist.get());
759  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
760  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
761  splitClientRequest(*iolist, *req1, *req2, activeSources);
762 
  // The size passed here is the number of list entries in both halves.
763  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
764  // CheckSources may have removed a source
765  if (activeSources.size() == 1) {
766  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
767  activeSources[0]->handle(c_ptr);
768  return c_ptr->get_future();
769  }
770 
  // Issue each non-empty half to its source: req1 to source 0, req2 to
  // source 1.
771  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
772  std::future<IOSize> future1, future2;
773  if (!req1->empty()) {
774  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
775  activeSources[0]->handle(c_ptr1);
776  future1 = c_ptr1->get_future();
777  }
778  if (!req2->empty()) {
779  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
780  activeSources[1]->handle(c_ptr2);
781  future2 = c_ptr2->get_future();
782  }
783  if (!req1->empty() && !req2->empty()) {
  // Deferred task: the lambda runs in the thread that first waits on or
  // gets the returned future, not at this point.
784  std::future<IOSize> task = std::async(
785  std::launch::deferred,
786  [](std::future<IOSize> a, std::future<IOSize> b) {
787  // Wait until *both* results are available. This is essential
788  // as the callback may try referencing the RequestManager. If one
789  // throws an exception (causing the RequestManager to be destroyed by
790  // XrdFile) and the other has a failure, then the recovery code will
791  // reference the destroyed RequestManager.
792  //
793  // Unlike other places where we use shared/weak ptrs to maintain object
794  // lifetime and destruction asynchronously, we *cannot* destroy the request
795  // asynchronously as it is associated with a ROOT buffer. We must wait until we
796  // are guaranteed that XrdCl will not write into the ROOT buffer before we
797  // can return.
798  b.wait();
799  a.wait();
800  return b.get() + a.get();
801  },
802  std::move(future1),
803  std::move(future2));
804  timer.stop();
805  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
806  return task;
807  } else if (!req1->empty()) {
808  return future1;
809  } else if (!req2->empty()) {
810  return future2;
811  } else { // Degenerate case - no bytes to read.
  // Hand back an already-satisfied future carrying 0.
812  std::promise<IOSize> p;
813  p.set_value(0);
814  return p.get_future();
815  }
816 }
817 
// Handle a failed read request `c_ptr` whose failure status is `c_status`.
//
// - If the status code is XrdCl::errInvalidResponse, a warning is logged and an exception is thrown
//   immediately.
// - Otherwise the source that served the request is added to the disabled sets, removed from
//   m_activeSources when it occupies slot 0 or 1, and the request is handed to another source:
//   the first remaining active source, or a freshly opened one when no active source is left.
//
// Throws when the replacement open fails, when the open handler hands back a source whose ID is
// already in m_disabledSourceStrings, and (per the message text) when the open times out.
//
// NOTE(review): the lines that declare `ex` and the condition that tests `status` for a timeout are
// not present in this listing; the comments below describe only what is visible.
818 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
819  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
820 
821  // Fail early for invalid responses - XrdFile has a separate path for handling this.
822  if (c_status.code == XrdCl::errInvalidResponse) {
823  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
825  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
826  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
827  << ") => Invalid ReadV response from server";
828  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
829  addConnections(ex);
830  throw ex;
831  }
832  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
833 
834  // Note that we do not delete the Source itself. That is because this
835  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
836  // In such a case, if you close a file in the handler, it will deadlock
// These three inserts happen before m_source_mutex is taken.
837  m_disabledSourceStrings.insert(source_ptr->ID());
838  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
839  m_disabledSources.insert(source_ptr);
840 
841  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
// Drop the failed source from the active list; only slots 0 and 1 are examined.
842  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
843  auto oldSources = m_activeSources;
844  m_activeSources.erase(m_activeSources.begin());
845  reportSiteChange(oldSources, m_activeSources);
846  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
847  auto oldSources = m_activeSources;
848  m_activeSources.erase(m_activeSources.begin() + 1);
849  reportSiteChange(oldSources, m_activeSources);
850  }
851  std::shared_ptr<Source> new_source;
852  if (m_activeSources.empty()) {
// No active source is left: ask the open handler for a new one.
853  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
854  timespec now;
857  // Note we wait at most m_timeout + 10 seconds here (see wait_for below). This is because we've already failed
858  // once and the likelihood the program has some inconsistent state is decent.
859  // We'd much rather fail hard than deadlock!
// The source mutex is released for the duration of the wait and re-acquired further down, just
// before m_activeSources is modified.
860  sentry.unlock();
861  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
864  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
865  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
866  << ") => timeout when waiting for file open";
867  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
868  addConnections(ex);
869  throw ex;
870  } else {
871  try {
// future.get() rethrows any exception stored by the open handler; annotate it and let it propagate.
872  new_source = future.get();
873  } catch (edm::Exception &ex) {
874  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
875  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
876  throw;
877  }
878  }
879 
// NOTE(review): std::find walks the whole container; if m_disabledSourceStrings is a set type, its
// own lookup member would avoid the linear scan - confirm against the member's declaration.
880  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
881  m_disabledSourceStrings.end()) {
882  // The server gave us back a data node we requested excluded. Fatal!
884  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
885  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
886  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
887  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
888  addConnections(ex);
889  throw ex;
890  }
891  sentry.lock();
892 
893  auto oldSources = m_activeSources;
894  m_activeSources.push_back(new_source);
895  reportSiteChange(oldSources, m_activeSources);
896  } else {
// At least one active source remains: retry on the first one.
897  new_source = m_activeSources[0];
898  }
// Re-issue the failed request on the chosen source.
899  new_source->handle(c_ptr);
900 }
901 
902 static void consumeChunkFront(size_t &front,
903  std::vector<IOPosBuffer> &input,
904  std::vector<IOPosBuffer> &output,
905  IOSize chunksize) {
906  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
907  IOPosBuffer &io = input[front];
908  IOPosBuffer &outio = output.back();
909  if (io.size() > chunksize) {
910  IOSize consumed;
911  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
912  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
913  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
914  consumed = (XRD_CL_MAX_CHUNK - outio.size());
915  outio.set_size(XRD_CL_MAX_CHUNK);
916  } else {
917  consumed = chunksize;
918  outio.set_size(outio.size() + consumed);
919  }
920  } else {
921  consumed = chunksize;
922  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
923  }
924  chunksize -= consumed;
925  IOSize newsize = io.size() - consumed;
926  IOOffset newoffset = io.offset() + consumed;
927  void *newdata = static_cast<char *>(io.data()) + consumed;
928  io.set_offset(newoffset);
929  io.set_data(newdata);
930  io.set_size(newsize);
931  } else if (io.size() == 0) {
932  front++;
933  } else {
934  output.push_back(io);
935  chunksize -= io.size();
936  front++;
937  }
938  }
939 }
940 
941 static void consumeChunkBack(size_t front,
942  std::vector<IOPosBuffer> &input,
943  std::vector<IOPosBuffer> &output,
944  IOSize chunksize) {
945  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
946  IOPosBuffer &io = input.back();
947  IOPosBuffer &outio = output.back();
948  if (io.size() > chunksize) {
949  IOSize consumed;
950  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
951  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
952  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
953  consumed = (XRD_CL_MAX_CHUNK - outio.size());
954  outio.set_size(XRD_CL_MAX_CHUNK);
955  } else {
956  consumed = chunksize;
957  outio.set_size(outio.size() + consumed);
958  }
959  } else {
960  consumed = chunksize;
961  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
962  }
963  chunksize -= consumed;
964  IOSize newsize = io.size() - consumed;
965  IOOffset newoffset = io.offset() + consumed;
966  void *newdata = static_cast<char *>(io.data()) + consumed;
967  io.set_offset(newoffset);
968  io.set_data(newdata);
969  io.set_size(newsize);
970  } else if (io.size() == 0) {
971  input.pop_back();
972  } else {
973  output.push_back(io);
974  chunksize -= io.size();
975  input.pop_back();
976  }
977  }
978 }
979 
980 static IOSize validateList(const std::vector<IOPosBuffer> req) {
981  IOSize total = 0;
982  off_t last_offset = -1;
983  for (const auto &it : req) {
984  total += it.size();
985  assert(it.offset() > last_offset);
986  last_offset = it.offset();
987  assert(it.size() <= XRD_CL_MAX_CHUNK);
988  assert(it.offset() < 0x1ffffffffff);
989  }
990  assert(req.size() <= 1024);
991  return total;
992 }
993 
// Split the read list `iolist` into two request lists, `req1` and `req2`.
//
// The split works on a private copy of `iolist`. In each round a chunk is taken from the front of
// the copy into `req1` and a chunk from the back of the copy into `req2`, until the copy is used up.
// Both output lists are then sorted by offset and checked with validateList(); the sum of their
// sizes is asserted to equal the total size of `iolist`.
//
// Chunk sizes are derived from the two sources' getQuality() values (each raised by 5):
// req1's chunk is XRD_CL_MAX_CHUNK * q2^2 / (q1^2 + q2^2) and req2's is the mirror image, each with
// a floor of 1024 bytes - so the source with the smaller quality value is given the larger chunk.
//
// `activeSources[0]` and `activeSources[1]` are read without a size check; the caller must pass at
// least two sources. An empty `iolist` returns immediately and leaves both outputs untouched.
//
// Throws when both output lists have reached XRD_ADAPTOR_CHUNK_THRESHOLD entries while input remains.
// NOTE(review): the line declaring `ex` is not present in this listing.
994 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
995  std::vector<IOPosBuffer> &req1,
996  std::vector<IOPosBuffer> &req2,
997  std::vector<std::shared_ptr<Source>> const &activeSources) const {
998  if (iolist.empty())
999  return;
// Work on a copy: the consume helpers shrink and remove entries in place.
1000  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1001  req1.reserve(iolist.size() / 2 + 1);
1002  req2.reserve(iolist.size() / 2 + 1);
// `front` is the index of the first entry of tmp_iolist not yet consumed from the front side.
1003  size_t front = 0;
1004 
1005  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1006  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
1007  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
1008  IOSize chunk1, chunk2;
1009  // Make sure the chunk size is at least 1024; little point to reads less than that size.
1010  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
1011  static_cast<IOSize>(1024));
1012  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
1013  static_cast<IOSize>(1024));
1014 
// Total bytes requested; used for the error message and the final consistency assert.
1015  IOSize size_orig = 0;
1016  for (const auto &it : iolist)
1017  size_orig += it.size();
1018 
// Loop until every entry between `front` and the end of tmp_iolist has been consumed.
1019  while (tmp_iolist.size() - front > 0) {
1020  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
1021  (req2.size() >=
1022  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1023  // are passed to the request manager. However, because we have a max chunk size, we increase
1024  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1025  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1026  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1028  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
1029  << ", permissions=0" << std::oct << m_perms << std::dec
1030  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
1031  "reported to CMSSW developers.";
// NOTE(review): this context string names requestFailure(), but the enclosing function is
// splitClientRequest() - looks like a copy-paste slip; confirm and correct the message.
1032  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1033  addConnections(ex);
1034  std::stringstream ss;
1035  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1036  ex.addAdditionalInfo(ss.str());
1037  std::stringstream ss2;
// Subtract the 5 added above so the message shows the raw quality values.
1038  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
1039  ex.addAdditionalInfo(ss2.str());
1040  throw ex;
1041  }
// A list that has reached the threshold is skipped; the other list keeps absorbing input.
1042  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1043  consumeChunkFront(front, tmp_iolist, req1, chunk1);
1044  }
1045  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1046  consumeChunkBack(front, tmp_iolist, req2, chunk2);
1047  }
1048  }
// validateList() asserts strictly increasing offsets, so both lists are sorted by offset first.
1049  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1050  return left.offset() < right.offset();
1051  });
1052  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1053  return left.offset() < right.offset();
1054  });
1055 
1056  IOSize size1 = validateList(req1);
1057  IOSize size2 = validateList(req2);
1058 
// No byte may be lost or duplicated by the split.
1059  assert(size_orig == size1 + size2);
1060 
1061  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
1062  << " bytes) split into requests size " << req1.size() << " (" << size1
1063  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1064 }
1065 
1066 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
1067 
1068 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
1069 // defined in the header.
1071 
1073  XrdCl::AnyObject *,
1074  XrdCl::HostList *hostList_ptr) {
1075  // Make sure we get rid of the strong self-reference when the callback finishes.
1076  std::shared_ptr<OpenHandler> self = m_self;
1077  m_self.reset();
1078 
1079  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1080  // Make sure that we set m_outstanding_open to false on exit from this function.
1081  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
1082  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1083  this, [&](OpenHandler *) { m_outstanding_open = false; });
1084 
1085  std::shared_ptr<Source> source;
1086  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1087  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1088  tracerouteRedirections(hostList.get());
1089  auto manager = m_manager.lock();
1090  // Manager object has already been deleted. Cleanup the
1091  // response objects, remove our self-reference, and ignore the response.
1092  if (!manager) {
1093  return;
1094  }
1095  //if we need to delete the File object we must do it outside
1096  // of the lock to avoid a potential deadlock
1097  std::unique_ptr<XrdCl::File> releaseFile;
1098  {
1099  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1100 
1101  if (status->IsOK()) {
1102  SendMonitoringInfo(*m_file);
1103  timespec now;
1105 
1106  std::string excludeString;
1107  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1108 
1109  source.reset(new Source(now, std::move(m_file), excludeString));
1110  m_promise.set_value(source);
1111  } else {
1112  releaseFile = std::move(m_file);
1114  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1115  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1116  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1117  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1118  manager->addConnections(ex);
1119  m_promise.set_exception(std::make_exception_ptr(ex));
1120  }
1121  }
1122  manager->handleOpen(*status, source);
1123 }
1124 
1126  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1127 
1128  if (!m_file.get()) {
1129  return "(no open in progress)";
1130  }
1131  std::string dataServer;
1132  m_file->GetProperty("DataServer", dataServer);
1133  if (dataServer.empty()) {
1134  return "(unknown source)";
1135  }
1136  return dataServer;
1137 }
1138 
// Start an asynchronous open of the manager's file, or join one that is already in flight.
//
// Returns a shared future that yields the newly opened Source. If m_outstanding_open is already
// set, the existing m_shared_future is returned and no new open is issued. Otherwise a fresh
// promise/future pair is installed, a new XrdCl::File is created and File::Open() is called with
// `this` as the response handler.
//
// Throws if the owning RequestManager is gone, if this handler's own weak self-reference has
// expired, or if File::Open() returns a non-OK status.
// NOTE(review): the lines declaring `ex` in the three error paths are not present in this listing.
1139 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1140  auto manager_ptr = m_manager.lock();
1141  if (!manager_ptr) {
1143  ex << "XrdCl::File::Open() =>"
1144  << " error: OpenHandler called within an invalid RequestManager context."
1145  << " This is a logic error and should be reported to the CMSSW developers.";
1146  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1147  throw ex;
1148  }
1149  RequestManager &manager = *manager_ptr;
1150  auto self_ptr = m_self_weak.lock();
1151  if (!self_ptr) {
1153  ex << "XrdCl::File::Open() => error: "
1154  << "OpenHandler called after it was deleted. This is a logic error "
1155  << "and should be reported to the CMSSW developers.";
// NOTE(review): "XrdAdapter" here differs from the "XrdAdaptor" spelling used in every other
// context string in this function - probably a typo in the message; confirm before changing.
1156  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1157  throw ex;
1158  }
1159 
1160  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1161  // is not thread-safe; the caller is responsible to verify it is not called from
1162  // multiple threads simultaneously.
1163  //
1164  // This is done because ::open may be called from a Xrootd callback; if we
1165  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1166  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1167  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1168  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1169  if (m_outstanding_open) {
1170  return m_shared_future;
1171  }
1172  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
// Replace the previous promise with a fresh one and publish its future.
1173  std::promise<std::shared_ptr<Source>> new_promise;
1174  m_promise.swap(new_promise);
1175  m_shared_future = m_promise.get_future().share();
1176 
// Append the opaque string to the name, using '?' if the name has no query part yet and '&' otherwise.
1177  auto opaque = manager.prepareOpaqueString();
1178  std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1179  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1180  m_file = std::make_unique<XrdCl::File>();
1181  m_outstanding_open = true;
1182 
1183  // Always make sure we release m_file and set m_outstanding_open to false on error.
// The unique_ptr is used purely as a scope guard: its deleter performs the cleanup and never
// deletes `this`.
1184  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1185  m_outstanding_open = false;
1186  m_file.reset();
1187  });
1188 
1189  XrdCl::XRootDStatus status;
1190  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1192  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1193  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1194  << ", code=" << status.code << ")";
1195  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1196  manager.addConnections(ex);
1197  throw ex;
1198  }
// Open() accepted the request: disarm the guard so m_file and m_outstanding_open stay set.
1199  exit_guard.release();
1200  // Have a strong self-reference for as long as the callback is in-progress.
1201  m_self = self_ptr;
1202  return m_shared_future;
1203 }
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Log< level::Info, true > LogVerbatim
int64_t IOOffset
Definition: IOTypes.h:20
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:68
string host
Definition: query.py:115
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
uint32_t ID
Definition: Definitions.h:24
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
#define LIKELY(x)
Definition: Likely.h:20
std::string to_string(const V &value)
Definition: OMSAccess.h:77
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static constexpr int XRD_CL_MAX_CHUNK
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:56
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::string prepareOpaqueString() const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:50
IOOffset offset() const
Definition: IOPosBuffer.h:41
static std::string const input
Definition: EdmProvDump.cc:50
Definition: query.py:1
static void SendMonitoringInfo(XrdCl::File &file)
OpenHandler(const OpenHandler &)=delete
string quality
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:258
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void addConnections(cms::Exception &) const
Times stop()
Definition: CPUTimer.cc:87
void clearMessage()
Definition: Exception.cc:159
std::shared_ptr< XrdCl::File > getActiveFile() const
std::atomic< std::string * > m_serverToAdvertise
std::shared_ptr< OpenHandler > m_open_handler
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:161
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Log< level::Info, false > LogInfo
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
void addContext(std::string const &context)
Definition: Exception.cc:165
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:53
void getDisabledSourceNames(std::vector< std::string > &sources) const
double a
Definition: hdecay.h:119
std::shared_ptr< Source > pickSingleSource()
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
static IOSize validateList(const std::vector< IOPosBuffer > req)
edm::storage::IOSize IOSize
bool isAvailable() const
Definition: Service.h:40
Definition: output.py:1
XrdCl::FileSystem & fs()
Log< level::Warning, false > LogWarning
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
SendMonitoringInfoHandler(const std::string &url)
tmp
align.sh
Definition: createJobs.py:716
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static std::string const source
Definition: EdmProvDump.cc:49
void clearAdditionalInfo()
Definition: Exception.cc:163
RequestManager(const RequestManager &)=delete
void getActiveSourceNames(std::vector< std::string > &sources) const
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)