CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <cassert>
4 #include <iostream>
5 #include <memory>
6 
7 #include <netdb.h>
8 
9 #include "XrdCl/XrdClFile.hh"
10 #include "XrdCl/XrdClDefaultEnv.hh"
11 #include "XrdCl/XrdClFileSystem.hh"
12 
20 
21 #include "XrdStatistics.h"
23 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
24 
// Upper bound, in bytes, on the size a merged output buffer may reach in
// consumeChunkFront()/consumeChunkBack().
25 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
26 
// Short delay, in seconds: it is added to timespec::tv_sec values when the
// next source check is scheduled.
27 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
28 
// Defining XRD_FAKE_OPEN_PROBE selects a much more aggressive set of tuning
// values (always probe, short "long" delay, no quality fudge).
29 #ifdef XRD_FAKE_OPEN_PROBE
// NOTE(review): presumably compared against a draw from m_distribution(0, 100);
// the comparison itself is not on the lines visible here - confirm.
30 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
// Long delay, in seconds (it is multiplied by 1000 before being compared with
// timeDiffMS() results and is also added to tv_sec values).
31 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
32 // This is the minimal difference in quality required to swap an active and inactive source
33 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
34 #else
35 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
36 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
37 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
38 #endif
39 
// The chunk-consuming helpers stop adding entries once the output list holds
// more than this many buffers.
40 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
41 
// GET_CLOCK_MONOTONIC(ts): store the current time in the timespec `ts`.
//  - On Mach the time is read from the host's SYSTEM_CLOCK clock service and
//    copied field by field into ts.tv_sec / ts.tv_nsec; the clock port is
//    deallocated again before the macro finishes.
//  - Everywhere else it expands to clock_gettime(CLOCK_MONOTONIC, &ts).
// Note that the non-Mach expansion already ends with ';' and the Mach
// expansion is a plain brace block, so the macro is only safe as a
// stand-alone statement (not as the body of an unbraced if/else).
42 #ifdef __MACH__
43 #include <mach/clock.h>
44 #include <mach/mach.h>
45 #define GET_CLOCK_MONOTONIC(ts) \
46  { \
47  clock_serv_t cclock; \
48  mach_timespec_t mts; \
49  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
50  clock_get_time(cclock, &mts); \
51  mach_port_deallocate(mach_task_self(), cclock); \
52  ts.tv_sec = mts.tv_sec; \
53  ts.tv_nsec = mts.tv_nsec; \
54  }
55 #else
56 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
57 #endif
58 
59 using namespace XrdAdaptor;
60 using namespace edm::storage;
61 
/// Return the signed difference `a - b` in milliseconds.
///
/// The whole-second part is scaled to milliseconds, the nanosecond part is
/// converted with floating-point division, and the sum is truncated toward
/// zero when it is converted back to an integer.
long long timeDiffMS(const timespec &a, const timespec &b) {
  const long long wholeSecondsMs = (a.tv_sec - b.tv_sec) * 1000;
  const double fractionalMs = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(wholeSecondsMs + fractionalMs);
}
67 
68 /*
69  * We do not care about the response of sending the monitoring information;
70  * this handler class simply frees any returned buffer to prevent memory leaks.
71  */
72 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
73  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
74  if (response) {
75  XrdCl::Buffer *buffer = nullptr;
76  response->Get(buffer);
77  response->Set(static_cast<int *>(nullptr));
78  delete buffer;
79  }
80  // Send Info has a response object; we must delete it.
81  delete response;
82  delete status;
83  }
84 
85 public:
87  SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
88  SendMonitoringInfoHandler() = default;
89 };
90 
92 
94  // Do not send this to a dCache data server as they return an error.
95  // In some versions of dCache, sending the monitoring information causes
96  // the server to close the connection - resulting in failures.
97  if (Source::isDCachePool(file)) {
98  return;
99  }
100 
101  // Send the monitoring info, if available.
103  std::string lastUrl;
104  file.GetProperty("LastURL", lastUrl);
105  if (jobId && !lastUrl.empty()) {
106  XrdCl::URL url(lastUrl);
107  XrdCl::FileSystem fs(url);
108  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK())) {
109  edm::LogWarning("XrdAdaptorInternal")
110  << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
111  }
112  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
113  }
114 }
115 
// Construct a manager for `filename`. The constructor only stores its
// arguments (name, open flags, permissions) and puts the bookkeeping members
// into their initial state: nothing is queued for advertising, the timeout is
// XRD_DEFAULT_TIMEOUT, the random distribution is built with the arguments
// (0, 100) and the excluded-source counter starts at zero. No file is opened
// here; the open is done by initialize().
116 RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
117  : m_serverToAdvertise(nullptr),
118  m_timeout(XRD_DEFAULT_TIMEOUT),
119  m_nextInitialSourceToggle(false),
120  m_name(filename),
121  m_flags(flags),
122  m_perms(perms),
123  m_distribution(0, 100),
124  m_excluded_active_count(0) {}
125 
// Perform the initial open of m_name and register the first active source.
//
// Visible steps:
//  1. Read "StreamErrorWindow" from the XrdCl default environment into m_timeout.
//  2. Work out the originating site (orig_site) from the URL.
//  3. Try to open the file up to `retries` times, each time on a fresh
//     XrdCl::File and with the opaque string from prepareOpaqueString()
//     appended; data servers that fail are recorded as disabled.
//  4. On success send the monitoring info, wrap the file in a Source and
//     append it to m_activeSources under m_source_mutex.
// Throws `ex` when the open cannot be queued, when a failing data server was
// already disabled, when the failure came from the URL we asked for, or when
// all attempts fail.
// NOTE(review): `self` is not used and `ex` is not declared on the lines
// visible here - presumably both appear on lines outside this view; confirm.
126 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
128 
129  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
130  if (env) {
131  env->GetInt("StreamErrorWindow", m_timeout);
132  }
133 
134  std::string orig_site;
// If the site lookup returns false and what it left in orig_site contains no
// '.', resolve it as a hostname and replace it with that host's domain.
135  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
136  std::string hostname;
137  if (Source::getHostname(orig_site, hostname)) {
138  Source::getDomain(hostname, orig_site);
139  }
140  }
141 
142  std::unique_ptr<XrdCl::File> file;
144  bool validFile = false;
145  const int retries = 5;
146  std::string excludeString;
147  for (int idx = 0; idx < retries; idx++) {
148  file = std::make_unique<XrdCl::File>();
// Append the opaque string with '?' when the name has no query part yet,
// otherwise with '&'.
149  auto opaque = prepareOpaqueString();
150  std::string new_filename =
151  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
152  SyncHostResponseHandler handler;
153  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
154  if (!openStatus
155  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
156  // server and it was marked bad - xrootd couldn't even queue up the request internally!
157  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
158  // redirector being down or authentication failures.
159  ex.clearMessage();
160  ex.clearContext();
161  ex.clearAdditionalInfo();
162  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
163  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
164  << ", code=" << openStatus.code << ")";
165  ex.addContext("Calling XrdFile::open()");
166  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
167  throw ex;
168  }
// The open was queued: block until the handler has the final status and the
// list of hosts that were contacted.
169  handler.WaitForResponse();
170  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
171  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
172  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
173  assert(status);
174  if (status->IsOK()) {
175  validFile = true;
176  break;
177  } else {
// Rebuild the exception from scratch for this attempt; it is thrown either
// below or after the loop if no attempt succeeds.
178  ex.clearMessage();
179  ex.clearContext();
180  ex.clearAdditionalInfo();
181  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
182  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
183  << ", code=" << status->code << ")";
184  ex.addContext("Calling XrdFile::open()");
185  addConnections(ex);
186  std::string dataServer, lastUrl;
187  file->GetProperty("DataServer", dataServer);
188  file->GetProperty("LastURL", lastUrl);
189  if (!dataServer.empty()) {
190  ex.addAdditionalInfo("Problematic data server: " + dataServer);
191  }
192  if (!lastUrl.empty()) {
193  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
194  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
195  }
// Give up when the failing data server is one we have already disabled.
196  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
197  m_disabledSourceStrings.end()) {
198  ex << ". No additional data servers were found.";
199  throw ex;
200  }
// Remember the failing server so that it is reported as disabled and its
// exclude string is included by later prepareOpaqueString() calls.
201  if (!dataServer.empty()) {
202  m_disabledSourceStrings.insert(dataServer);
203  m_disabledExcludeStrings.insert(excludeString);
204  }
205  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
206  if (lastUrl == new_filename) {
207  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
208  throw ex;
209  }
210  }
211  }
// Every attempt failed: throw the exception built by the last attempt.
212  if (!validFile) {
213  throw ex;
214  }
215  SendMonitoringInfo(*file);
216 
// NOTE(review): `ts` is not assigned on the lines visible here before it is
// used - presumably it is filled in on a line outside this view; confirm.
217  timespec ts;
219 
220  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
221  {
222  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
223  auto oldList = m_activeSources;
224  m_activeSources.push_back(source);
226  }
229 
230  m_lastSourceCheck = ts;
231  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
233 }
234 
243  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
244  // a pending update. *However*, since we call this for every read, we'll get it
245  // eventually.
246  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
247  return;
248  }
249  std::string *hostname_ptr;
250  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
251  std::unique_ptr<std::string> hostname(hostname_ptr);
253  if (statsService.isAvailable()) {
254  statsService->setCurrentServer(m_name, *hostname_ptr);
255  }
256  }
257 }
258 
260  auto hostname = std::make_unique<std::string>(id);
261  if (Source::getHostname(id, *hostname)) {
262  std::string *null_hostname = nullptr;
263  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
264  hostname.release();
265  }
266  }
267 }
268 
269 namespace {
270  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
271  std::string siteA, siteB;
272  if (!iSources.empty()) {
273  siteA = iSources[0]->Site();
274  }
275  if (iSources.size() == 2) {
276  siteB = iSources[1]->Site();
277  }
278  std::string siteList = siteA;
279  if (!siteB.empty() && (siteB != siteA)) {
280  siteList = siteA + ", " + siteB;
281  }
282  return siteList;
283  }
284 } // namespace
285 
286 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
287  std::vector<std::shared_ptr<Source>> const &iNew,
288  std::string orig_site) const {
289  auto siteList = formatSites(iNew);
290  if (!orig_site.empty() && (orig_site != siteList)) {
291  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
292  } else {
293  auto oldSites = formatSites(iOld);
294  if (orig_site.empty() && (siteList != oldSites)) {
295  if (!oldSites.empty())
296  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
297  }
298  }
299 }
300 
302  IOSize requestSize,
303  std::vector<std::shared_ptr<Source>> &activeSources,
304  std::vector<std::shared_ptr<Source>> &inactiveSources) {
305  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
306  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
307  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
308  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
309  { // Be more aggressive about getting rid of very bad sources.
310  compareSources(now, 0, 1, activeSources, inactiveSources);
311  compareSources(now, 1, 0, activeSources, inactiveSources);
312  }
313  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
314  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
315  }
316  }
317 }
318 
319 bool RequestManager::compareSources(const timespec &now,
320  unsigned a,
321  unsigned b,
322  std::vector<std::shared_ptr<Source>> &activeSources,
323  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
324  if (activeSources.size() < std::max(a, b) + 1) {
325  return false;
326  }
327 
328  bool findNewSource = false;
329  if ((activeSources[a]->getQuality() > 5130) ||
330  ((activeSources[a]->getQuality() > 260) &&
331  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
332  edm::LogVerbatim("XrdAdaptorInternal")
333  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
334  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
335  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
336  findNewSource = true;
337  }
338  activeSources[a]->setLastDowngrade(now);
339  inactiveSources.emplace_back(activeSources[a]);
340  auto oldSources = activeSources;
341  activeSources.erase(activeSources.begin() + a);
342  reportSiteChange(oldSources, activeSources);
343  }
344  return findNewSource;
345 }
346 
348  IOSize requestSize,
349  std::vector<std::shared_ptr<Source>> &activeSources,
350  std::vector<std::shared_ptr<Source>> &inactiveSources) {
351  bool findNewSource = false;
352  if (activeSources.size() <= 1) {
353  findNewSource = true;
354  } else if (activeSources.size() > 1) {
355  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
356  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
357  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
358  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
359 
360  // NOTE: We could probably replace the copy with a better sort function.
361  // However, there are typically very few sources and the correctness is more obvious right now.
362  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
363  eligibleInactiveSources.reserve(inactiveSources.size());
364  for (const auto &source : inactiveSources) {
365  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
366  eligibleInactiveSources.push_back(source);
367  }
368  }
369  auto bestInactiveSource =
370  std::min_element(eligibleInactiveSources.begin(),
371  eligibleInactiveSources.end(),
372  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
373  return s1->getQuality() < s2->getQuality();
374  });
375  auto worstActiveSource = std::max_element(activeSources.cbegin(),
376  activeSources.cend(),
377  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
378  return s1->getQuality() < s2->getQuality();
379  });
380  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
381  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
382  << ", quality " << (*bestInactiveSource)->getQuality();
383  }
384  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
385  << ", quality " << (*worstActiveSource)->getQuality();
386  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
387  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
388  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
389  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
390  auto oldSources = activeSources;
391  activeSources.push_back(*bestInactiveSource);
392  reportSiteChange(oldSources, activeSources);
393  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
394  if (it->get() == bestInactiveSource->get()) {
395  inactiveSources.erase(it);
396  break;
397  }
398  } else
399  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
400  (*worstActiveSource)->getQuality() >
401  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
402  edm::LogVerbatim("XrdAdaptorInternal")
403  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
404  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
405  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
406  (*worstActiveSource)->setLastDowngrade(now);
407  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
408  if (it->get() == bestInactiveSource->get()) {
409  inactiveSources.erase(it);
410  break;
411  }
412  inactiveSources.emplace_back(*worstActiveSource);
413  auto oldSources = activeSources;
414  activeSources.erase(worstActiveSource);
415  activeSources.emplace_back(std::move(*bestInactiveSource));
416  reportSiteChange(oldSources, activeSources);
417  eligibleInactiveSources.clear();
418  for (const auto &source : inactiveSources)
419  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
420  eligibleInactiveSources.push_back(source);
421  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
422  eligibleInactiveSources.end(),
423  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
424  return s1->getQuality() < s2->getQuality();
425  });
426  worstActiveSource = std::max_element(activeSources.begin(),
427  activeSources.end(),
428  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
429  return s1->getQuality() < s2->getQuality();
430  });
431  }
432  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
433  float r = m_distribution(m_generator);
435  findNewSource = true;
436  }
437  }
438  }
439  if (findNewSource) {
440  m_open_handler->open();
442  }
443 
444  // Only aggressively look for new sources if we don't have two.
445  if (activeSources.size() == 2) {
447  } else {
448  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
449  }
451 }
452 
// Return the file handle of the first active source.
// m_source_mutex is held for the whole call. When there is no active source
// an exception describing the file (name, flags, permissions) and the known
// connections is thrown instead.
// NOTE(review): `ex` is not declared on the lines visible here - presumably
// it is declared on a line outside this view; confirm.
453 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
454  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
455  if (m_activeSources.empty()) {
457  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
458  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
459  ex.addContext("In XrdAdaptor::RequestManager::handle()");
460  addConnections(ex);
461  throw ex;
462  }
463  return m_activeSources[0]->getFileHandle();
464 }
465 
466 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  sources.reserve(m_activeSources.size());
469  for (auto const &source : m_activeSources) {
470  sources.push_back(source->ID());
471  }
472 }
473 
474 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
475  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
476  sources.reserve(m_activeSources.size());
477  for (auto const &source : m_activeSources) {
478  sources.push_back(source->PrettyID());
479  }
480 }
481 
482 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
483  sources.reserve(m_disabledSourceStrings.size());
484  for (auto const &source : m_disabledSourceStrings) {
485  sources.push_back(source);
486  }
487 }
488 
490  std::vector<std::string> sources;
492  for (auto const &source : sources) {
493  ex.addAdditionalInfo("Active source: " + source);
494  }
495  sources.clear();
496  getDisabledSourceNames(sources);
497  for (auto const &source : sources) {
498  ex.addAdditionalInfo("Disabled source: " + source);
499  }
500 }
501 
// Choose the active source that will serve a single (non-vector) read.
// m_source_mutex is held while the choice is made.
//  - exactly two active sources: one of the two is returned.
//    NOTE(review): the condition that selects between index 0 and index 1 is
//    not on the lines visible here (presumably it involves
//    m_nextInitialSourceToggle) - confirm against the full file.
//  - no active source: an exception is thrown.
//    NOTE(review): `ex` is not declared on the lines visible here - confirm.
//  - otherwise: the first active source is returned.
502 std::shared_ptr<Source> RequestManager::pickSingleSource() {
503  std::shared_ptr<Source> source = nullptr;
504  {
505  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
506  if (m_activeSources.size() == 2) {
508  source = m_activeSources[0];
510  } else {
511  source = m_activeSources[1];
513  }
514  } else if (m_activeSources.empty()) {
516  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
517  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
518  ex.addContext("In XrdAdaptor::RequestManager::handle()");
519  addConnections(ex);
520  throw ex;
521  } else {
522  source = m_activeSources[0];
523  }
524  }
525  return source;
526 }
527 
528 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
529  assert(c_ptr.get());
530  timespec now;
531  GET_CLOCK_MONOTONIC(now);
532  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
533  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
534  {
535  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
536  activeSources = m_activeSources;
537  inactiveSources = m_inactiveSources;
538  }
539  {
540  //make sure we update values before calling pickSingelSource
541  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
542  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
543  m_activeSources = std::move(activeSources);
544  m_inactiveSources = std::move(inactiveSources);
545  });
546 
547  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
548  }
549 
550  std::shared_ptr<Source> source = pickSingleSource();
551  source->handle(c_ptr);
552  return c_ptr->get_future();
553 }
554 
556  std::stringstream ss;
557  ss << "tried=";
558  size_t count = 0;
559  {
560  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
561 
562  for (const auto &it : m_activeSources) {
563  count++;
564  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
565  }
566  for (const auto &it : m_inactiveSources) {
567  count++;
568  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
569  }
570  }
571  for (const auto &it : m_disabledExcludeStrings) {
572  count++;
573  ss << it.substr(0, it.find(':')) << ",";
574  }
575  if (count) {
576  std::string tmp_str = ss.str();
577  return tmp_str.substr(0, tmp_str.size() - 1);
578  }
579  return "";
580 }
581 
582 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
583  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
584  if (status.IsOK()) {
585  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
586  for (const auto &s : m_activeSources) {
587  if (source->ID() == s->ID()) {
588  edm::LogVerbatim("XrdAdaptorInternal")
589  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
590  unsigned returned_count = ++m_excluded_active_count;
591  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
592  if (returned_count >= 3) {
593  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
594  }
595  return;
596  }
597  }
598  for (const auto &s : m_inactiveSources) {
599  if (source->ID() == s->ID()) {
600  edm::LogVerbatim("XrdAdaptorInternal")
601  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
602  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
603  return;
604  }
605  }
606  if (m_activeSources.size() < 2) {
607  auto oldSources = m_activeSources;
608  m_activeSources.push_back(source);
609  reportSiteChange(oldSources, m_activeSources);
610  queueUpdateCurrentServer(source->ID());
611  } else {
612  m_inactiveSources.push_back(source);
613  }
614  } else { // File-open failure - wait at least 120s before next attempt.
615  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
616  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
617  }
618 }
619 
// Dispatch a vector read (list of position/buffer pairs).
//
// With a single active source the whole list is sent to it. With two, the
// list is split by splitClientRequest() into req1/req2, one part per source,
// and the returned future yields the sum of both results. With no active
// source an exception is thrown.
// NOTE(review): `ex` is not declared on the lines visible here - presumably
// it is declared on a line outside this view; confirm.
620 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
621  //Use a copy of m_activeSources and m_inactiveSources throughout this function
622  // in order to avoid holding the lock a long time and causing a deadlock.
623  // When the function is over we will update the values of the containers
624  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
625  {
626  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
627  activeSources = m_activeSources;
628  inactiveSources = m_inactiveSources;
629  }
630  //Make sure we update changes when we leave the function
// The shared_ptr holds no object; it exists only so that its deleter runs at
// scope exit (on return or exception) and writes the working copies back
// under the lock.
631  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
632  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
633  m_activeSources = std::move(activeSources);
634  m_inactiveSources = std::move(inactiveSources);
635  });
636 
637  updateCurrentServer();
638 
639  timespec now;
640  GET_CLOCK_MONOTONIC(now);
641 
642  edm::CPUTimer timer;
643  timer.start();
644 
// Single source: no splitting is needed, send the entire list to it.
645  if (activeSources.size() == 1) {
646  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
647  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
648  activeSources[0]->handle(c_ptr);
649  return c_ptr->get_future();
650  }
651  // Make sure active
652  else if (activeSources.empty()) {
654  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
655  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
656  ex.addContext("In XrdAdaptor::RequestManager::handle()");
657  addConnections(ex);
658  throw ex;
659  }
660 
661  assert(iolist.get());
662  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
663  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
664  splitClientRequest(*iolist, *req1, *req2, activeSources);
665 
// NOTE(review): the second argument here is the number of list entries in
// req1 and req2, whereas the single-source branch above passes
// c_ptr->getSize() - confirm that the difference is intended.
666  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
667  // CheckSources may have removed a source
668  if (activeSources.size() == 1) {
669  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
670  activeSources[0]->handle(c_ptr);
671  return c_ptr->get_future();
672  }
673 
// Hand each non-empty half to its own source: req1 to source 0, req2 to
// source 1.
674  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
675  std::future<IOSize> future1, future2;
676  if (!req1->empty()) {
677  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
678  activeSources[0]->handle(c_ptr1);
679  future1 = c_ptr1->get_future();
680  }
681  if (!req2->empty()) {
682  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
683  activeSources[1]->handle(c_ptr2);
684  future2 = c_ptr2->get_future();
685  }
686  if (!req1->empty() && !req2->empty()) {
// Deferred launch: the lambda runs in the thread that waits on / gets the
// returned future, not in a new thread.
687  std::future<IOSize> task = std::async(
688  std::launch::deferred,
689  [](std::future<IOSize> a, std::future<IOSize> b) {
690  // Wait until *both* results are available. This is essential
691  // as the callback may try referencing the RequestManager. If one
692  // throws an exception (causing the RequestManager to be destroyed by
693  // XrdFile) and the other has a failure, then the recovery code will
694  // reference the destroyed RequestManager.
695  //
696  // Unlike other places where we use shared/weak ptrs to maintain object
697  // lifetime and destruction asynchronously, we *cannot* destroy the request
698  // asynchronously as it is associated with a ROOT buffer. We must wait until we
699  // are guaranteed that XrdCl will not write into the ROOT buffer before we
700  // can return.
701  b.wait();
702  a.wait();
703  return b.get() + a.get();
704  },
705  std::move(future1),
706  std::move(future2));
707  timer.stop();
708  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
709  return task;
710  } else if (!req1->empty()) {
711  return future1;
712  } else if (!req2->empty()) {
713  return future2;
714  } else { // Degenerate case - no bytes to read.
715  std::promise<IOSize> p;
716  p.set_value(0);
717  return p.get_future();
718  }
719 }
720 
// Recover from a failed request: disable the source that failed and re-issue
// the request on another source.
//
// c_ptr    - the failed request; its current source is the one disabled.
// c_status - failure status. errInvalidResponse is not recovered from here;
//            it is turned into an exception immediately.
//
// If another active source remains, the request is handed to it. Otherwise a
// new source is opened through m_open_handler and the call blocks (with the
// source mutex released) for at most m_timeout + 10 seconds; a timeout, or a
// new source that is already in the disabled list, raises an exception.
// NOTE(review): `ex` is not declared on the lines visible here - presumably
// it is declared on lines outside this view; confirm.
721 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
722  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
723 
724  // Fail early for invalid responses - XrdFile has a separate path for handling this.
725  if (c_status.code == XrdCl::errInvalidResponse) {
726  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
728  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
729  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
730  << ") => Invalid ReadV response from server";
731  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
732  addConnections(ex);
733  throw ex;
734  }
735  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
736 
737  // Note that we do not delete the Source itself. That is because this
738  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
739  // In such a case, if you close a file in the handler, it will deadlock
// These three inserts happen before m_source_mutex is taken below.
740  m_disabledSourceStrings.insert(source_ptr->ID());
741  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
742  m_disabledSources.insert(source_ptr);
743 
// unique_lock rather than lock_guard because the lock is released while
// waiting for the new open and taken again afterwards.
744  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
// Drop the failed source from the active list; only positions 0 and 1 are
// checked.
745  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
746  auto oldSources = m_activeSources;
747  m_activeSources.erase(m_activeSources.begin());
748  reportSiteChange(oldSources, m_activeSources);
749  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
750  auto oldSources = m_activeSources;
751  m_activeSources.erase(m_activeSources.begin() + 1);
752  reportSiteChange(oldSources, m_activeSources);
753  }
754  std::shared_ptr<Source> new_source;
755  if (m_activeSources.empty()) {
756  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
757  timespec now;
758  GET_CLOCK_MONOTONIC(now);
760  // Note we only wait for 180 seconds here. This is because we've already failed
761  // once and the likelihood the program has some inconsistent state is decent.
762  // We'd much rather fail hard than deadlock!
// NOTE(review): the wait below is m_timeout + 10 seconds, which matches the
// "180 seconds" above only if m_timeout is 170 - confirm.
763  sentry.unlock();
764  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
765  if (status == std::future_status::timeout) {
767  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
768  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
769  << ") => timeout when waiting for file open";
770  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
771  addConnections(ex);
772  throw ex;
773  } else {
774  try {
775  new_source = future.get();
776  } catch (edm::Exception &ex) {
// Annotate the exception from the open and rethrow the same object.
777  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
778  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
779  throw;
780  }
781  }
782 
783  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
784  m_disabledSourceStrings.end()) {
785  // The server gave us back a data node we requested excluded. Fatal!
787  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
788  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
789  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
790  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
791  addConnections(ex);
792  throw ex;
793  }
// Take the lock again before the active list is modified.
794  sentry.lock();
795 
796  auto oldSources = m_activeSources;
797  m_activeSources.push_back(new_source);
798  reportSiteChange(oldSources, m_activeSources);
799  } else {
800  new_source = m_activeSources[0];
801  }
// Re-issue the failed request on the replacement source.
802  new_source->handle(c_ptr);
803 }
804 
805 static void consumeChunkFront(size_t &front,
806  std::vector<IOPosBuffer> &input,
807  std::vector<IOPosBuffer> &output,
808  IOSize chunksize) {
809  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
810  IOPosBuffer &io = input[front];
811  IOPosBuffer &outio = output.back();
812  if (io.size() > chunksize) {
813  IOSize consumed;
814  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
815  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
816  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
817  consumed = (XRD_CL_MAX_CHUNK - outio.size());
818  outio.set_size(XRD_CL_MAX_CHUNK);
819  } else {
820  consumed = chunksize;
821  outio.set_size(outio.size() + consumed);
822  }
823  } else {
824  consumed = chunksize;
825  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
826  }
827  chunksize -= consumed;
828  IOSize newsize = io.size() - consumed;
829  IOOffset newoffset = io.offset() + consumed;
830  void *newdata = static_cast<char *>(io.data()) + consumed;
831  io.set_offset(newoffset);
832  io.set_data(newdata);
833  io.set_size(newsize);
834  } else if (io.size() == 0) {
835  front++;
836  } else {
837  output.push_back(io);
838  chunksize -= io.size();
839  front++;
840  }
841  }
842 }
843 
844 static void consumeChunkBack(size_t front,
845  std::vector<IOPosBuffer> &input,
846  std::vector<IOPosBuffer> &output,
847  IOSize chunksize) {
848  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
849  IOPosBuffer &io = input.back();
850  IOPosBuffer &outio = output.back();
851  if (io.size() > chunksize) {
852  IOSize consumed;
853  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
854  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
855  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
856  consumed = (XRD_CL_MAX_CHUNK - outio.size());
857  outio.set_size(XRD_CL_MAX_CHUNK);
858  } else {
859  consumed = chunksize;
860  outio.set_size(outio.size() + consumed);
861  }
862  } else {
863  consumed = chunksize;
864  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
865  }
866  chunksize -= consumed;
867  IOSize newsize = io.size() - consumed;
868  IOOffset newoffset = io.offset() + consumed;
869  void *newdata = static_cast<char *>(io.data()) + consumed;
870  io.set_offset(newoffset);
871  io.set_data(newdata);
872  io.set_size(newsize);
873  } else if (io.size() == 0) {
874  input.pop_back();
875  } else {
876  output.push_back(io);
877  chunksize -= io.size();
878  input.pop_back();
879  }
880  }
881 }
882 
883 static IOSize validateList(const std::vector<IOPosBuffer> req) {
884  IOSize total = 0;
885  off_t last_offset = -1;
886  for (const auto &it : req) {
887  total += it.size();
888  assert(it.offset() > last_offset);
889  last_offset = it.offset();
890  assert(it.size() <= XRD_CL_MAX_CHUNK);
891  assert(it.offset() < 0x1ffffffffff);
892  }
893  assert(req.size() <= 1024);
894  return total;
895 }
896 
897 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
898  std::vector<IOPosBuffer> &req1,
899  std::vector<IOPosBuffer> &req2,
900  std::vector<std::shared_ptr<Source>> const &activeSources) const {
901  if (iolist.empty())
902  return;
903  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
904  req1.reserve(iolist.size() / 2 + 1);
905  req2.reserve(iolist.size() / 2 + 1);
906  size_t front = 0;
907 
908  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
909  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
910  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
911  IOSize chunk1, chunk2;
912  // Make sure the chunk size is at least 1024; little point to reads less than that size.
913  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
914  static_cast<IOSize>(1024));
915  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
916  static_cast<IOSize>(1024));
917 
918  IOSize size_orig = 0;
919  for (const auto &it : iolist)
920  size_orig += it.size();
921 
922  while (tmp_iolist.size() - front > 0) {
923  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
924  (req2.size() >=
925  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
926  // are passed to the request manager. However, because we have a max chunk size, we increase
927  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
928  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
929  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
931  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
932  << ", permissions=0" << std::oct << m_perms << std::dec
933  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
934  "reported to CMSSW developers.";
935  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
936  addConnections(ex);
937  std::stringstream ss;
938  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
939  ex.addAdditionalInfo(ss.str());
940  std::stringstream ss2;
941  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
942  ex.addAdditionalInfo(ss2.str());
943  throw ex;
944  }
945  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
946  consumeChunkFront(front, tmp_iolist, req1, chunk1);
947  }
948  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
949  consumeChunkBack(front, tmp_iolist, req2, chunk2);
950  }
951  }
952  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
953  return left.offset() < right.offset();
954  });
955  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
956  return left.offset() < right.offset();
957  });
958 
959  IOSize size1 = validateList(req1);
960  IOSize size2 = validateList(req2);
961 
962  assert(size_orig == size1 + size2);
963 
964  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
965  << " bytes) split into requests size " << req1.size() << " (" << size1
966  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
967 }
968 
969 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
970 
971 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
972 // defined in the header.
974 
// Callback run when an asynchronous file open completes.
//
// NOTE(review): the first line of the signature (return type, qualified name and the
// `status_ptr` parameter) and the declaration of `ex` in the failure branch are absent
// from this listing - confirm against the original file.
//
// Ownership of `status_ptr` and `hostList_ptr` is taken by the unique_ptrs declared below;
// the AnyObject response is ignored.  On success a Source is built from the opened file
// and handed to m_promise; on failure an exception describing the status is stored in
// m_promise instead.  If the manager still exists, handleOpen() is called on it last.
976  XrdCl::AnyObject *,
977  XrdCl::HostList *hostList_ptr) {
978  // Make sure we get rid of the strong self-reference when the callback finishes.
979  std::shared_ptr<OpenHandler> self = m_self;
980  m_self.reset();
981 
982  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
983  // Make sure that we set m_outstanding_open to false on exit from this function.
984  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
985  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
986  this, [&](OpenHandler *) { m_outstanding_open = false; });
987 
988  std::shared_ptr<Source> source;
// Take ownership of the raw pointers handed to the callback.
989  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
990  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
991 
992  auto manager = m_manager.lock();
993  // Manager object has already been deleted. Cleanup the
994  // response objects, remove our self-reference, and ignore the response.
995  if (!manager) {
996  return;
997  }
998  //if we need to delete the File object we must do it outside
999  // of the lock to avoid a potential deadlock
1000  std::unique_ptr<XrdCl::File> releaseFile;
1001  {
1002  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1003 
1004  if (status->IsOK()) {
1005  SendMonitoringInfo(*m_file);
1006  timespec now;
1007  GET_CLOCK_MONOTONIC(now);
1008 
1009  std::string excludeString;
1010  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1011 
// The new Source takes over m_file; m_file is empty afterwards.
1012  source.reset(new Source(now, std::move(m_file), excludeString));
1013  m_promise.set_value(source);
1014  } else {
// Park the file in releaseFile (declared outside this scope) so that it is
// destroyed only after `sentry` has been released.
1015  releaseFile = std::move(m_file);
1017  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1018  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1019  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1020  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1021  manager->addConnections(ex);
1022 
// Holders of the shared future see this exception when they call get().
1023  m_promise.set_exception(std::make_exception_ptr(ex));
1024  }
1025  }
// Called after the lock scope above has ended; `source` is still null on failure.
1026  manager->handleOpen(*status, source);
1027 }
1028 
// Body of an OpenHandler member that reports which data server the pending open is using.
//
// NOTE(review): the signature line is absent from this listing; presumably this is
// OpenHandler::current_source() returning std::string - confirm against the original file.
//
// Returns "(no open in progress)" when m_file is null, "(unknown source)" when the file's
// "DataServer" property is empty, and the property value otherwise.  m_mutex is held for the
// whole body.
1030  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1031 
1032  if (!m_file.get()) {
1033  return "(no open in progress)";
1034  }
1035  std::string dataServer;
// The result of GetProperty() is not checked; an unset property is detected through the
// string still being empty.
1036  m_file->GetProperty("DataServer", dataServer);
1037  if (dataServer.empty()) {
1038  return "(unknown source)";
1039  }
1040  return dataServer;
1041 }
1042 
1043 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1044  auto manager_ptr = m_manager.lock();
1045  if (!manager_ptr) {
1047  ex << "XrdCl::File::Open() =>"
1048  << " error: OpenHandler called within an invalid RequestManager context."
1049  << " This is a logic error and should be reported to the CMSSW developers.";
1050  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1051  throw ex;
1052  }
1053  RequestManager &manager = *manager_ptr;
1054  auto self_ptr = m_self_weak.lock();
1055  if (!self_ptr) {
1057  ex << "XrdCl::File::Open() => error: "
1058  << "OpenHandler called after it was deleted. This is a logic error "
1059  << "and should be reported to the CMSSW developers.";
1060  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1061  throw ex;
1062  }
1063 
1064  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1065  // is not thread-safe; the caller is responsible to verify it is not called from
1066  // multiple threads simultaneously.
1067  //
1068  // This is done because ::open may be called from a Xrootd callback; if we
1069  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1070  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1071  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1072  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1073  if (m_outstanding_open) {
1074  return m_shared_future;
1075  }
1076  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1077  std::promise<std::shared_ptr<Source>> new_promise;
1078  m_promise.swap(new_promise);
1079  m_shared_future = m_promise.get_future().share();
1080 
1081  auto opaque = manager.prepareOpaqueString();
1082  std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1083  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1084  m_file = std::make_unique<XrdCl::File>();
1085  m_outstanding_open = true;
1086 
1087  // Always make sure we release m_file and set m_outstanding_open to false on error.
1088  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1089  m_outstanding_open = false;
1090  m_file.reset();
1091  });
1092 
1093  XrdCl::XRootDStatus status;
1094  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1096  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1097  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1098  << ", code=" << status.code << ")";
1099  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1100  manager.addConnections(ex);
1101  throw ex;
1102  }
1103  exit_guard.release();
1104  // Have a strong self-reference for as long as the callback is in-progress.
1105  m_self = self_ptr;
1106  return m_shared_future;
1107 }
Log< level::Info, true > LogVerbatim
int64_t IOOffset
Definition: IOTypes.h:20
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:68
std::shared_ptr< XrdCl::File > getActiveFile() const
uint16_t *__restrict__ id
void getDisabledSourceNames(std::vector< std::string > &sources) const
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:288
list status
Definition: mps_update.py:107
std::string prepareOpaqueString() const
#define LIKELY(x)
Definition: Likely.h:20
void * data() const
Definition: IOPosBuffer.h:44
static constexpr int XRD_CL_MAX_CHUNK
IOSize size() const
Definition: IOPosBuffer.h:47
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:56
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
assert(be >=bs)
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:50
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:47
static void SendMonitoringInfo(XrdCl::File &file)
OpenHandler(const OpenHandler &)=delete
string URL
Definition: hcal_runs.py:4
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
void queueUpdateCurrentServer(const std::string &)
SiPixelHitStatus Status
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:246
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:256
def move
Definition: eostools.py:511
bool isAvailable() const
Definition: Service.h:40
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:213
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
#define CMS_THREAD_SAFE
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:87
void clearMessage()
Definition: Exception.cc:159
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::atomic< std::string * > m_serverToAdvertise
std::shared_ptr< OpenHandler > m_open_handler
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:161
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
Log< level::Info, false > LogInfo
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
void addContext(std::string const &context)
Definition: Exception.cc:165
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:53
double a
Definition: hdecay.h:119
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:326
static IOSize validateList(const std::vector< IOPosBuffer > req)
edm::storage::IOSize IOSize
Log< level::Warning, false > LogWarning
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
IOOffset offset() const
Definition: IOPosBuffer.h:41
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:219
static std::string const source
Definition: EdmProvDump.cc:46
void clearAdditionalInfo()
Definition: Exception.cc:163
RequestManager(const RequestManager &)=delete
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)