CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <cassert>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
18 
19 #include "XrdStatistics.h"
21 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
22 
// Tuning constants for the request manager.
// Arithmetic macro bodies are parenthesized so that they expand as a single
// value inside any surrounding expression (e.g. `x / XRD_CL_MAX_CHUNK`).

// Cap applied to the size() of a single IOPosBuffer entry (512 * 1024).
#define XRD_CL_MAX_CHUNK (512 * 1024)

// Short delay, in seconds (it is added to timespec::tv_sec and scaled by 1000
// when compared against millisecond differences).
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
// Long delay, in seconds (scaled by 1000 when compared against milliseconds).
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
// Long delay, in seconds (scaled by 1000 when compared against milliseconds).
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2 * 60)
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// Once an output request list holds more than this many entries, the chunk
// consumers stop appending to it.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
39 
// GET_CLOCK_MONOTONIC(ts): store the current time into the timespec-like object `ts`
// (its tv_sec and tv_nsec members are written).
//  - When __MACH__ is defined, the time is read from the Mach SYSTEM_CLOCK service
//    and the clock port is released again before the fields are copied.
//  - Otherwise it expands to clock_gettime(CLOCK_MONOTONIC, &ts).
// NOTE(review): the non-Mach expansion already ends in ';' and the Mach expansion is a
// braced block, so a call written as `GET_CLOCK_MONOTONIC(ts);` leaves an extra empty
// statement behind; avoid using it as the unbraced body of an if/else.
40 #ifdef __MACH__
41 #include <mach/clock.h>
42 #include <mach/mach.h>
43 #define GET_CLOCK_MONOTONIC(ts) \
44  { \
45  clock_serv_t cclock; \
46  mach_timespec_t mts; \
47  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
48  clock_get_time(cclock, &mts); \
49  mach_port_deallocate(mach_task_self(), cclock); \
50  ts.tv_sec = mts.tv_sec; \
51  ts.tv_nsec = mts.tv_nsec; \
52  }
53 #else
54 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
55 #endif
56 
57 using namespace XrdAdaptor;
58 
/**
 * Difference `a - b` between two timestamps, expressed in milliseconds.
 *
 * The whole-second part is scaled to milliseconds, the nanosecond part is
 * converted with floating-point division, and the sum is truncated toward zero
 * when converted back to an integer.
 *
 * @param a the later (minuend) timestamp
 * @param b the earlier (subtrahend) timestamp
 * @return milliseconds from b to a; negative when a precedes b
 */
long long timeDiffMS(const timespec &a, const timespec &b) {
  const long long wholeSecondsMs = (a.tv_sec - b.tv_sec) * 1000;
  const double fractionalMs = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(wholeSecondsMs + fractionalMs);
}
64 
65 /*
66  * We do not care about the response of sending the monitoring information;
67  * this handler class simply frees any returned buffer to prevent memory leaks.
68  */
// Self-deleting response handler that also carries the XrdCl::FileSystem object
// (m_fs) on which the request is issued; see fs().
// NOTE(review): one line of the original listing (between `public:` and the deleted
// assignment operator) is not visible here.
69 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler {
 // Response callback: frees the buffer held by `response` (if any), then `response`,
 // `status` and finally the handler object itself.
70  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
71  if (response) {
72  XrdCl::Buffer *buffer = nullptr;
73  response->Get(buffer);
 // Replace the stored pointer with a null before the buffer is deleted by hand.
 // Presumably this keeps `delete response` from freeing the buffer a second time;
 // confirm against the XrdCl::AnyObject ownership rules.
74  response->Set(static_cast<int *>(nullptr));
75  delete buffer;
76  }
77  // Send Info has a response object; we must delete it.
78  delete response;
79  delete status;
 // The handler destroys itself, so instances must be created with `new` and must not
 // be touched after the callback has run.
80  delete this;
81  }
82 
 // File-system object the request is sent through; constructed from the URL passed to
 // the constructor (default-constructed otherwise).
83  XrdCl::FileSystem m_fs;
84 
85 public:
87  SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
88  SendMonitoringInfoHandler() = default;
89 
 // Bind the handler's file-system object to `url`.
90  SendMonitoringInfoHandler(const std::string &url) : m_fs(url) {}
91 
 // Access the file-system object owned by this handler.
92  XrdCl::FileSystem& fs() { return m_fs; }
93 };
94 
96  // Do not send this to a dCache data server as they return an error.
97  // In some versions of dCache, sending the monitoring information causes
98  // the server to close the connection - resulting in failures.
99  if (Source::isDCachePool(file)) {
100  return;
101  }
102 
103  // Send the monitoring info, if available.
105  std::string lastUrl;
106  file.GetProperty("LastURL", lastUrl);
107  if (jobId && !lastUrl.empty()) {
108  auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
109  if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
110  edm::LogWarning("XrdAdaptorInternal")
111  << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
112  delete sm_handler;
113  }
114  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
115  }
116 }
117 
119  : m_serverToAdvertise(nullptr),
120  m_timeout(XRD_DEFAULT_TIMEOUT),
121  m_nextInitialSourceToggle(false),
122  m_name(filename),
123  m_flags(flags),
124  m_perms(perms),
125  m_distribution(0, 100),
126  m_excluded_active_count(0) {}
127 
// Open the file named by m_name and register it as the first active source.
//
// Up to `retries` (5) open attempts are made. Each failed attempt records the failing
// data server in m_disabledSourceStrings / m_disabledExcludeStrings and refreshes the
// exception `ex`; the function throws `ex` when the open request cannot even be queued,
// when the failing data server was already disabled, when the last URL tried equals the
// URL that was requested, or when no attempt succeeded. On success the opened file is
// wrapped in a Source and appended to m_activeSources under m_source_mutex.
//
// NOTE(review): several lines of the original are missing from this listing (among them
// the declaration of `ex`, the statement that fills `ts`, and the uses of `self`,
// `oldList` and the final value of `ts`); those parts cannot be documented from here.
128 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
130 
 // Read the "StreamErrorWindow" setting into m_timeout when an XrdCl environment exists.
131  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
132  if (env) {
133  env->GetInt("StreamErrorWindow", m_timeout);
134  }
135 
 // Determine orig_site: if getXrootdSiteFromURL() returns false and the value it left in
 // orig_site contains no '.', try to resolve that value as a hostname and reduce the
 // hostname to its domain.
136  std::string orig_site;
137  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
138  std::string hostname;
139  if (Source::getHostname(orig_site, hostname)) {
140  Source::getDomain(hostname, orig_site);
141  }
142  }
143 
144  std::unique_ptr<XrdCl::File> file;
146  bool validFile = false;
147  const int retries = 5;
148  std::string excludeString;
149  for (int idx = 0; idx < retries; idx++) {
 // A fresh File object is used for every attempt.
150  file.reset(new XrdCl::File());
 // Append the opaque string to the URL, joined with '?' or '&' depending on whether
 // m_name already contains a '?'.
151  auto opaque = prepareOpaqueString();
152  std::string new_filename =
153  m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
154  SyncHostResponseHandler handler;
155  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
156  if (!openStatus
157  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
158  // server and it was marked bad - xrootd couldn't even queue up the request internally!
159  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
160  // redirector being down or authentication failures.
161  ex.clearMessage();
162  ex.clearContext();
163  ex.clearAdditionalInfo();
164  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
165  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
166  << ", code=" << openStatus.code << ")";
167  ex.addContext("Calling XrdFile::open()");
168  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
169  throw ex;
170  }
 // The request was queued: block until the handler has the result of the open.
171  handler.WaitForResponse();
172  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
173  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
174  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
175  assert(status);
176  if (status->IsOK()) {
177  validFile = true;
178  break;
179  } else {
 // Rebuild the exception from scratch so it only describes the most recent failure.
180  ex.clearMessage();
181  ex.clearContext();
182  ex.clearAdditionalInfo();
183  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
184  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
185  << ", code=" << status->code << ")";
186  ex.addContext("Calling XrdFile::open()");
187  addConnections(ex);
188  std::string dataServer, lastUrl;
189  file->GetProperty("DataServer", dataServer);
190  file->GetProperty("LastURL", lastUrl);
191  if (!dataServer.empty()) {
192  ex.addAdditionalInfo("Problematic data server: " + dataServer);
193  }
194  if (!lastUrl.empty()) {
195  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
196  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
197  }
 // The same data server failed before (it is already in the disabled list): give up.
 // Note that an empty dataServer is also looked up here.
198  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
199  m_disabledSourceStrings.end()) {
200  ex << ". No additional data servers were found.";
201  throw ex;
202  }
 // Remember the failing server so that later attempts can exclude it.
203  if (!dataServer.empty()) {
204  m_disabledSourceStrings.insert(dataServer);
205  m_disabledExcludeStrings.insert(excludeString);
206  }
207  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
208  if (lastUrl == new_filename) {
209  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
210  throw ex;
211  }
212  }
213  }
 // All attempts failed without hitting one of the early throws above.
214  if (!validFile) {
215  throw ex;
216  }
217  SendMonitoringInfo(*file);
218 
219  timespec ts;
221 
 // Hand ownership of the opened file to a new Source and publish it as active.
222  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
223  {
224  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
225  auto oldList = m_activeSources;
226  m_activeSources.push_back(source);
228  }
231 
232  m_lastSourceCheck = ts;
233  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
235 }
236 
245  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
246  // a pending update. *However*, since we call this for every read, we'll get it
247  // eventually.
248  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
249  return;
250  }
251  std::string *hostname_ptr;
252  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
253  std::unique_ptr<std::string> hostname(hostname_ptr);
255  if (statsService.isAvailable()) {
256  statsService->setCurrentServer(m_name, *hostname_ptr);
257  }
258  }
259 }
260 
262  auto hostname = std::make_unique<std::string>(id);
263  if (Source::getHostname(id, *hostname)) {
264  std::string *null_hostname = nullptr;
265  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
266  hostname.release();
267  }
268  }
269 }
270 
271 namespace {
272  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
273  std::string siteA, siteB;
274  if (!iSources.empty()) {
275  siteA = iSources[0]->Site();
276  }
277  if (iSources.size() == 2) {
278  siteB = iSources[1]->Site();
279  }
280  std::string siteList = siteA;
281  if (!siteB.empty() && (siteB != siteA)) {
282  siteList = siteA + ", " + siteB;
283  }
284  return siteList;
285  }
286 } // namespace
287 
288 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
289  std::vector<std::shared_ptr<Source>> const &iNew,
290  std::string orig_site) const {
291  auto siteList = formatSites(iNew);
292  if (!orig_site.empty() && (orig_site != siteList)) {
293  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
294  } else {
295  auto oldSites = formatSites(iOld);
296  if (orig_site.empty() && (siteList != oldSites)) {
297  if (!oldSites.empty())
298  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
299  }
300  }
301 }
302 
304  IOSize requestSize,
305  std::vector<std::shared_ptr<Source>> &activeSources,
306  std::vector<std::shared_ptr<Source>> &inactiveSources) {
307  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
308  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
309  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
310  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
311  { // Be more aggressive about getting rid of very bad sources.
312  compareSources(now, 0, 1, activeSources, inactiveSources);
313  compareSources(now, 1, 0, activeSources, inactiveSources);
314  }
315  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
316  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
317  }
318  }
319 }
320 
321 bool RequestManager::compareSources(const timespec &now,
322  unsigned a,
323  unsigned b,
324  std::vector<std::shared_ptr<Source>> &activeSources,
325  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
326  if (activeSources.size() < std::max(a, b) + 1) {
327  return false;
328  }
329 
330  bool findNewSource = false;
331  if ((activeSources[a]->getQuality() > 5130) ||
332  ((activeSources[a]->getQuality() > 260) &&
333  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
334  edm::LogVerbatim("XrdAdaptorInternal")
335  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
336  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
337  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
338  findNewSource = true;
339  }
340  activeSources[a]->setLastDowngrade(now);
341  inactiveSources.emplace_back(activeSources[a]);
342  auto oldSources = activeSources;
343  activeSources.erase(activeSources.begin() + a);
344  reportSiteChange(oldSources, activeSources);
345  }
346  return findNewSource;
347 }
348 
350  IOSize requestSize,
351  std::vector<std::shared_ptr<Source>> &activeSources,
352  std::vector<std::shared_ptr<Source>> &inactiveSources) {
353  bool findNewSource = false;
354  if (activeSources.size() <= 1) {
355  findNewSource = true;
356  } else if (activeSources.size() > 1) {
357  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
358  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
359  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
360  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
361 
362  // NOTE: We could probably replace the copy with a better sort function.
363  // However, there are typically very few sources and the correctness is more obvious right now.
364  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
365  eligibleInactiveSources.reserve(inactiveSources.size());
366  for (const auto &source : inactiveSources) {
367  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
368  eligibleInactiveSources.push_back(source);
369  }
370  }
371  auto bestInactiveSource =
372  std::min_element(eligibleInactiveSources.begin(),
373  eligibleInactiveSources.end(),
374  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
375  return s1->getQuality() < s2->getQuality();
376  });
377  auto worstActiveSource = std::max_element(activeSources.cbegin(),
378  activeSources.cend(),
379  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
380  return s1->getQuality() < s2->getQuality();
381  });
382  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
383  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
384  << ", quality " << (*bestInactiveSource)->getQuality();
385  }
386  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
387  << ", quality " << (*worstActiveSource)->getQuality();
388  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
389  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
390  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
391  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
392  auto oldSources = activeSources;
393  activeSources.push_back(*bestInactiveSource);
394  reportSiteChange(oldSources, activeSources);
395  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
396  if (it->get() == bestInactiveSource->get()) {
397  inactiveSources.erase(it);
398  break;
399  }
400  } else
401  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
402  (*worstActiveSource)->getQuality() >
403  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
404  edm::LogVerbatim("XrdAdaptorInternal")
405  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
406  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
407  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
408  (*worstActiveSource)->setLastDowngrade(now);
409  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
410  if (it->get() == bestInactiveSource->get()) {
411  inactiveSources.erase(it);
412  break;
413  }
414  inactiveSources.emplace_back(std::move(*worstActiveSource));
415  auto oldSources = activeSources;
416  activeSources.erase(worstActiveSource);
417  activeSources.emplace_back(std::move(*bestInactiveSource));
418  reportSiteChange(oldSources, activeSources);
419  eligibleInactiveSources.clear();
420  for (const auto &source : inactiveSources)
421  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
422  eligibleInactiveSources.push_back(source);
423  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
424  eligibleInactiveSources.end(),
425  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
426  return s1->getQuality() < s2->getQuality();
427  });
428  worstActiveSource = std::max_element(activeSources.begin(),
429  activeSources.end(),
430  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
431  return s1->getQuality() < s2->getQuality();
432  });
433  }
434  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
435  float r = m_distribution(m_generator);
437  findNewSource = true;
438  }
439  }
440  }
441  if (findNewSource) {
442  m_open_handler->open();
444  }
445 
446  // Only aggressively look for new sources if we don't have two.
447  if (activeSources.size() == 2) {
449  } else {
450  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
451  }
453 }
454 
// Return the file handle of the first active source.
// m_source_mutex is held for the whole call. When there is no active source, the
// exception `ex` is filled with the file name, flags and permissions, extended by
// addConnections(), and thrown.
// NOTE(review): the declaration of `ex` is not visible in this listing (a line is
// missing between the `if` and the first `ex <<`), so its type cannot be confirmed here.
455 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
456  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
457  if (m_activeSources.empty()) {
459  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
460  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
461  ex.addContext("In XrdAdaptor::RequestManager::handle()");
462  addConnections(ex);
463  throw ex;
464  }
465  return m_activeSources[0]->getFileHandle();
466 }
467 
468 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
469  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
470  sources.reserve(m_activeSources.size());
471  for (auto const &source : m_activeSources) {
472  sources.push_back(source->ID());
473  }
474 }
475 
476 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
477  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
478  sources.reserve(m_activeSources.size());
479  for (auto const &source : m_activeSources) {
480  sources.push_back(source->PrettyID());
481  }
482 }
483 
484 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
485  sources.reserve(m_disabledSourceStrings.size());
486  for (auto const &source : m_disabledSourceStrings) {
487  sources.push_back(source);
488  }
489 }
490 
492  std::vector<std::string> sources;
494  for (auto const &source : sources) {
495  ex.addAdditionalInfo("Active source: " + source);
496  }
497  sources.clear();
498  getDisabledSourceNames(sources);
499  for (auto const &source : sources) {
500  ex.addAdditionalInfo("Disabled source: " + source);
501  }
502 }
503 
// Choose one active source to serve a single request.
// Under m_source_mutex: with two active sources either index 0 or index 1 is selected;
// with none, the exception `ex` is populated and thrown; otherwise index 0 is used.
// NOTE(review): three lines of the original are missing from this listing inside the
// two-source branch (which is why the braces below look unbalanced), as is the
// declaration of `ex`. Presumably the missing lines alternate between the two sources
// using m_nextInitialSourceToggle; confirm against the real file.
504 std::shared_ptr<Source> RequestManager::pickSingleSource() {
505  std::shared_ptr<Source> source = nullptr;
506  {
507  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
508  if (m_activeSources.size() == 2) {
510  source = m_activeSources[0];
512  } else {
513  source = m_activeSources[1];
515  }
516  } else if (m_activeSources.empty()) {
518  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
519  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
520  ex.addContext("In XrdAdaptor::RequestManager::handle()");
521  addConnections(ex);
522  throw ex;
523  } else {
 // Exactly one active source (or more than two): use the first.
524  source = m_activeSources[0];
525  }
526  }
527  return source;
528 }
529 
530 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
531  assert(c_ptr.get());
532  timespec now;
533  GET_CLOCK_MONOTONIC(now);
534  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
535  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
536  {
537  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
538  activeSources = m_activeSources;
539  inactiveSources = m_inactiveSources;
540  }
541  {
542  //make sure we update values before calling pickSingelSource
543  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
544  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
545  m_activeSources = std::move(activeSources);
546  m_inactiveSources = std::move(inactiveSources);
547  });
548 
549  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
550  }
551 
552  std::shared_ptr<Source> source = pickSingleSource();
553  source->handle(c_ptr);
554  return c_ptr->get_future();
555 }
556 
558  struct {
559  std::stringstream ss;
560  size_t count = 0;
561  bool has_active = false;
562 
563  void append_tried(const std::string &id, bool active = false) {
564  ss << (count ? "," : "tried=") << id;
565  count++;
566  if (active) {
567  has_active = true;
568  }
569  }
570  } state;
571  {
572  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
573 
574  for (const auto &it : m_activeSources) {
575  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
576  }
577  for (const auto &it : m_inactiveSources) {
578  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
579  }
580  }
581  for (const auto &it : m_disabledExcludeStrings) {
582  state.append_tried(it.substr(0, it.find(':')));
583  }
584  if (state.has_active) {
585  state.ss << "&triedrc=resel";
586  }
587 
588  return state.ss.str();
589 }
590 
// Callback for the outcome of an additional file-open attempt.
// Runs entirely under m_source_mutex. On success, a source whose ID() matches an
// existing active or inactive source is ignored; otherwise it becomes active when
// fewer than two sources are active, and inactive when two or more already are.
// On failure only a log line is visible here.
// NOTE(review): several lines of the original are missing from this listing (after the
// m_excluded_active_count increment, inside the `returned_count >= 3` branch, before the
// second `return`, and at the end of the failure branch); presumably they reschedule
// the next source check, which cannot be confirmed from here.
591 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
592  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
593  if (status.IsOK()) {
594  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
 // The server handed back a source that is already active: count it and ignore it.
595  for (const auto &s : m_activeSources) {
596  if (source->ID() == s->ID()) {
597  edm::LogVerbatim("XrdAdaptorInternal")
598  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
599  unsigned returned_count = ++m_excluded_active_count;
601  if (returned_count >= 3) {
603  }
604  return;
605  }
606  }
 // Likewise ignore a source that is already on the inactive list.
607  for (const auto &s : m_inactiveSources) {
608  if (source->ID() == s->ID()) {
609  edm::LogVerbatim("XrdAdaptorInternal")
610  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
612  return;
613  }
614  }
 // A genuinely new source: activate it if there is room, otherwise park it as inactive.
615  if (m_activeSources.size() < 2) {
616  auto oldSources = m_activeSources;
617  m_activeSources.push_back(source);
618  reportSiteChange(oldSources, m_activeSources);
619  queueUpdateCurrentServer(source->ID());
620  } else {
621  m_inactiveSources.push_back(source);
622  }
623  } else { // File-open failure - wait at least 120s before next attempt.
624  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
626  }
627 }
628 
// Serve a vectored read described by `iolist`.
//
// Works on copies of the active/inactive source lists; the copies are written back to
// the members (under m_source_mutex) by the `guard` deleter when the function returns
// or throws. With one active source the whole list goes to it. With none, `ex` is
// thrown. Otherwise the list is split into two sub-requests, one per active source, and
// the returned future yields the sum of both results.
//
// NOTE(review): two lines of the original are missing from this listing (one after the
// guard is created, and the declaration of `ex`).
629 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
630  //Use a copy of m_activeSources and m_inactiveSources throughout this function
631  // in order to avoid holding the lock a long time and causing a deadlock.
632  // When the function is over we will update the values of the containers
633  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
634  {
635  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
636  activeSources = m_activeSources;
637  inactiveSources = m_inactiveSources;
638  }
639  //Make sure we update changes when we leave the function
 // The shared_ptr holds no object; it exists only so that its deleter runs at scope exit.
640  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
641  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
642  m_activeSources = std::move(activeSources);
643  m_inactiveSources = std::move(inactiveSources);
644  });
645 
647 
648  timespec now;
649  GET_CLOCK_MONOTONIC(now);
650 
651  edm::CPUTimer timer;
652  timer.start();
653 
 // Single source: no splitting is needed.
654  if (activeSources.size() == 1) {
655  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
656  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
657  activeSources[0]->handle(c_ptr);
658  return c_ptr->get_future();
659  }
660  // Make sure active
661  else if (activeSources.empty()) {
663  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
664  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
665  ex.addContext("In XrdAdaptor::RequestManager::handle()");
666  addConnections(ex);
667  throw ex;
668  }
669 
 // NOTE(review): iolist has already been handed to ClientRequest in the single-source
 // branch above before this null check is reached.
670  assert(iolist.get());
671  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
672  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
673  splitClientRequest(*iolist, *req1, *req2, activeSources);
674 
 // NOTE(review): the value passed as request size here is the number of entries in the
 // two sub-lists, whereas the single-source branch passes c_ptr->getSize().
675  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
676  // CheckSources may have removed a source
677  if (activeSources.size() == 1) {
678  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
679  activeSources[0]->handle(c_ptr);
680  return c_ptr->get_future();
681  }
682 
 // Issue each non-empty half to its own source.
683  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
684  std::future<IOSize> future1, future2;
685  if (!req1->empty()) {
686  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
687  activeSources[0]->handle(c_ptr1);
688  future1 = c_ptr1->get_future();
689  }
690  if (!req2->empty()) {
691  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
692  activeSources[1]->handle(c_ptr2);
693  future2 = c_ptr2->get_future();
694  }
695  if (!req1->empty() && !req2->empty()) {
 // Deferred task: it runs in the thread that first waits on the returned future.
696  std::future<IOSize> task =
697  std::async(std::launch::deferred,
698  [](std::future<IOSize> a, std::future<IOSize> b) {
699  // Wait until *both* results are available. This is essential
700  // as the callback may try referencing the RequestManager. If one
701  // throws an exception (causing the RequestManager to be destroyed by
702  // XrdFile) and the other has a failure, then the recovery code will
703  // reference the destroyed RequestManager.
704  //
705  // Unlike other places where we use shared/weak ptrs to maintain object
706  // lifetime and destruction asynchronously, we *cannot* destroy the request
707  // asynchronously as it is associated with a ROOT buffer. We must wait until we
708  // are guaranteed that XrdCl will not write into the ROOT buffer before we
709  // can return.
710  b.wait();
711  a.wait();
712  return b.get() + a.get();
713  },
714  std::move(future1),
715  std::move(future2));
716  timer.stop();
717  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
718  return task;
719  } else if (!req1->empty()) {
720  return future1;
721  } else if (!req2->empty()) {
722  return future2;
723  } else { // Degenerate case - no bytes to read.
724  std::promise<IOSize> p;
725  p.set_value(0);
726  return p.get_future();
727  }
728 }
729 
// Recover from a failed read on the source that was serving `c_ptr`.
//
// An invalid-response status is turned into an exception immediately. Otherwise the
// failing source is added to the disabled collections and removed from the active list
// (when it is at index 0 or 1). If another active source remains, the request is
// re-issued on it; if none remains, a new open is started through m_open_handler and
// awaited for m_timeout + 10 seconds, with m_source_mutex released while waiting.
// Throws `ex` on timeout or when the newly opened source is one that was disabled.
//
// NOTE(review): several lines of the original are missing from this listing (each
// declaration of `ex`, and one line after GET_CLOCK_MONOTONIC), so the exception type
// and the use of `now` cannot be documented from here.
730 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
731  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
732 
733  // Fail early for invalid responses - XrdFile has a separate path for handling this.
734  if (c_status.code == XrdCl::errInvalidResponse) {
735  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
737  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
738  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
739  << ") => Invalid ReadV response from server";
740  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
741  addConnections(ex);
742  throw ex;
743  }
744  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
745 
746  // Note that we do not delete the Source itself. That is because this
747  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
748  // In such a case, if you close a file in the handler, it will deadlock
 // Keeping the shared_ptr in m_disabledSources keeps the Source object alive.
 // NOTE(review): these three inserts happen before m_source_mutex is taken; this is only
 // safe if the containers are themselves thread-safe - confirm in the header.
749  m_disabledSourceStrings.insert(source_ptr->ID());
750  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
751  m_disabledSources.insert(source_ptr);
752 
 // unique_lock (rather than lock_guard) because the lock is released and re-acquired below.
753  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
754  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
755  auto oldSources = m_activeSources;
756  m_activeSources.erase(m_activeSources.begin());
757  reportSiteChange(oldSources, m_activeSources);
758  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
759  auto oldSources = m_activeSources;
760  m_activeSources.erase(m_activeSources.begin() + 1);
761  reportSiteChange(oldSources, m_activeSources);
762  }
763  std::shared_ptr<Source> new_source;
764  if (m_activeSources.empty()) {
 // Nothing left to fall back on: ask for a brand-new source.
765  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
766  timespec now;
767  GET_CLOCK_MONOTONIC(now);
769  // Note we only wait for 180 seconds here. This is because we've already failed
770  // once and the likelihood the program has some inconsistent state is decent.
771  // We'd much rather fail hard than deadlock!
 // NOTE(review): the wait below is m_timeout + 10 seconds, which is 180 only when
 // m_timeout is 170.
772  sentry.unlock();
773  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
774  if (status == std::future_status::timeout) {
776  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
777  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
778  << ") => timeout when waiting for file open";
779  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
780  addConnections(ex);
781  throw ex;
782  } else {
783  try {
784  new_source = future.get();
785  } catch (edm::Exception &ex) {
 // Annotate the exception raised by the open and rethrow the same object.
786  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
787  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
788  throw;
789  }
790  }
791 
792  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
793  m_disabledSourceStrings.end()) {
794  // The server gave us back a data node we requested excluded. Fatal!
796  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
797  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
798  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
799  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
800  addConnections(ex);
801  throw ex;
802  }
 // Re-acquire the lock before touching m_activeSources again.
803  sentry.lock();
804 
805  auto oldSources = m_activeSources;
806  m_activeSources.push_back(new_source);
807  reportSiteChange(oldSources, m_activeSources);
808  } else {
 // Another active source is still available: retry there.
809  new_source = m_activeSources[0];
810  }
811  new_source->handle(c_ptr);
812 }
813 
814 static void consumeChunkFront(size_t &front,
815  std::vector<IOPosBuffer> &input,
816  std::vector<IOPosBuffer> &output,
817  IOSize chunksize) {
818  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
819  IOPosBuffer &io = input[front];
820  IOPosBuffer &outio = output.back();
821  if (io.size() > chunksize) {
822  IOSize consumed;
823  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
824  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
825  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
826  consumed = (XRD_CL_MAX_CHUNK - outio.size());
827  outio.set_size(XRD_CL_MAX_CHUNK);
828  } else {
829  consumed = chunksize;
830  outio.set_size(outio.size() + consumed);
831  }
832  } else {
833  consumed = chunksize;
834  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
835  }
836  chunksize -= consumed;
837  IOSize newsize = io.size() - consumed;
838  IOOffset newoffset = io.offset() + consumed;
839  void *newdata = static_cast<char *>(io.data()) + consumed;
840  io.set_offset(newoffset);
841  io.set_data(newdata);
842  io.set_size(newsize);
843  } else if (io.size() == 0) {
844  front++;
845  } else {
846  output.push_back(io);
847  chunksize -= io.size();
848  front++;
849  }
850  }
851 }
852 
853 static void consumeChunkBack(size_t front,
854  std::vector<IOPosBuffer> &input,
855  std::vector<IOPosBuffer> &output,
856  IOSize chunksize) {
857  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
858  IOPosBuffer &io = input.back();
859  IOPosBuffer &outio = output.back();
860  if (io.size() > chunksize) {
861  IOSize consumed;
862  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
863  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
864  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
865  consumed = (XRD_CL_MAX_CHUNK - outio.size());
866  outio.set_size(XRD_CL_MAX_CHUNK);
867  } else {
868  consumed = chunksize;
869  outio.set_size(outio.size() + consumed);
870  }
871  } else {
872  consumed = chunksize;
873  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
874  }
875  chunksize -= consumed;
876  IOSize newsize = io.size() - consumed;
877  IOOffset newoffset = io.offset() + consumed;
878  void *newdata = static_cast<char *>(io.data()) + consumed;
879  io.set_offset(newoffset);
880  io.set_data(newdata);
881  io.set_size(newsize);
882  } else if (io.size() == 0) {
883  input.pop_back();
884  } else {
885  output.push_back(io);
886  chunksize -= io.size();
887  input.pop_back();
888  }
889  }
890 }
891 
892 static IOSize validateList(const std::vector<IOPosBuffer> req) {
893  IOSize total = 0;
894  off_t last_offset = -1;
895  for (const auto &it : req) {
896  total += it.size();
897  assert(it.offset() > last_offset);
898  last_offset = it.offset();
899  assert(it.size() <= XRD_CL_MAX_CHUNK);
900  assert(it.offset() < 0x1ffffffffff);
901  }
902  assert(req.size() <= 1024);
903  return total;
904 }
905 
906 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
907  std::vector<IOPosBuffer> &req1,
908  std::vector<IOPosBuffer> &req2,
909  std::vector<std::shared_ptr<Source>> const &activeSources) const {
910  if (iolist.empty())
911  return;
912  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
913  req1.reserve(iolist.size() / 2 + 1);
914  req2.reserve(iolist.size() / 2 + 1);
915  size_t front = 0;
916 
917  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
918  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
919  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
920  IOSize chunk1, chunk2;
921  // Make sure the chunk size is at least 1024; little point to reads less than that size.
922  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
923  static_cast<IOSize>(1024));
924  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
925  static_cast<IOSize>(1024));
926 
927  IOSize size_orig = 0;
928  for (const auto &it : iolist)
929  size_orig += it.size();
930 
931  while (tmp_iolist.size() - front > 0) {
932  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
933  (req2.size() >=
934  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
935  // are passed to the request manager. However, because we have a max chunk size, we increase
936  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
937  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
938  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
940  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
941  << ", permissions=0" << std::oct << m_perms << std::dec
942  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
943  "reported to CMSSW developers.";
944  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
945  addConnections(ex);
946  std::stringstream ss;
947  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
948  ex.addAdditionalInfo(ss.str());
949  std::stringstream ss2;
950  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
951  ex.addAdditionalInfo(ss2.str());
952  throw ex;
953  }
954  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
955  consumeChunkFront(front, tmp_iolist, req1, chunk1);
956  }
957  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
958  consumeChunkBack(front, tmp_iolist, req2, chunk2);
959  }
960  }
961  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
962  return left.offset() < right.offset();
963  });
964  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
965  return left.offset() < right.offset();
966  });
967 
968  IOSize size1 = validateList(req1);
969  IOSize size2 = validateList(req2);
970 
971  assert(size_orig == size1 + size2);
972 
973  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
974  << " bytes) split into requests size " << req1.size() << " (" << size1
975  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
976 }
977 
978 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
979 
980 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
981 // defined in the header.
983 
985  XrdCl::AnyObject *,
986  XrdCl::HostList *hostList_ptr) {
// Callback run when an asynchronous open completes.
//  - Takes ownership of the status and host-list pointers; both are freed on return.
//  - Drops the handler's strong self-reference and clears m_outstanding_open on exit.
//  - On success, wraps m_file in a new Source and fulfils m_promise with it.
//  - On failure, stores an exception in m_promise and hands m_file to a local
//    so that it is destroyed only after m_mutex has been released.
//  - Finally reports the outcome to the manager via handleOpen(), unless the
//    manager no longer exists.
987  // Make sure we get rid of the strong self-reference when the callback finishes.
988  std::shared_ptr<OpenHandler> self = m_self;
989  m_self.reset();
990 
991  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
992  // Make sure that we set m_outstanding_open to false on exit from this function.
993  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
994  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
995  this, [&](OpenHandler *) { m_outstanding_open = false; });
996 
// `source` stays null unless the open succeeded.
997  std::shared_ptr<Source> source;
// Own the raw response pointers so that every return path deletes them.
998  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
999  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1000 
1001  auto manager = m_manager.lock();
1002  // Manager object has already been deleted. Cleanup the
1003  // response objects, remove our self-reference, and ignore the response.
1004  if (!manager) {
1005  return;
1006  }
1007  // If we need to delete the File object we must do it outside
1008  // of the lock to avoid a potential deadlock
1009  std::unique_ptr<XrdCl::File> releaseFile;
1010  {
1011  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1012 
1013  if (status->IsOK()) {
1015  timespec now;
1016  GET_CLOCK_MONOTONIC(now);
1017 
1018  std::string excludeString;
1019  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1020 
// Ownership of m_file moves into the new Source; m_file is empty afterwards.
1021  source.reset(new Source(now, std::move(m_file), excludeString));
1022  m_promise.set_value(source);
1023  } else {
// Park the file in releaseFile; it is destroyed at function exit, after the lock scope ends.
1024  releaseFile = std::move(m_file);
1026  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1027  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1028  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1029  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1030  manager->addConnections(ex);
1031 
1032  // Brian, should we do something like this:
1033  // if (status.status == XrdCl::errRedirectLimit) {
1034  // // The following method does not exist (yet), would probably need a multiplier for OPEN_DELAY.
1035  // // Note that with XCache cluster one will never get multiple sources.
1036  // manager->increaseMultiSourceInterval();
1037  // }
1038 
// Waiters on the shared future receive the exception instead of a Source.
1039  m_promise.set_exception(std::make_exception_ptr(ex));
1040  }
1041  }
// Called without holding m_mutex; `source` is null when the open failed.
1042  manager->handleOpen(*status, source);
1043 }
1044 
// Returns a human-readable name for the server of the open currently in
// progress: the file's "DataServer" property, or a placeholder string when no
// file object exists or the property is empty. Holds m_mutex for the duration.
1046  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1047 
// No file object means no open is currently outstanding.
1048  if (!m_file.get()) {
1049  return "(no open in progress)";
1050  }
1051  std::string dataServer;
// The return value of GetProperty is ignored; an empty string is treated as "unknown".
1052  m_file->GetProperty("DataServer", dataServer);
1053  if (dataServer.empty()) {
1054  return "(unknown source)";
1055  }
1056  return dataServer;
1057 }
1058 
// Start an asynchronous open of the manager's file, or join one that is
// already in flight.
//
// Returns a shared future that yields the new Source once the open completes
// (or rethrows the failure). If m_outstanding_open is already set, the future
// of the in-flight open is returned and no new request is issued.
//
// Throws if the owning RequestManager or this handler's own shared_ptr is no
// longer alive, or if XrdCl::File::Open() reports an error when called.
1059 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1060  auto manager_ptr = m_manager.lock();
1061  if (!manager_ptr) {
1063  ex << "XrdCl::File::Open() =>"
1064  << " error: OpenHandler called within an invalid RequestManager context."
1065  << " This is a logic error and should be reported to the CMSSW developers.";
1066  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1067  throw ex;
1068  }
1069  RequestManager &manager = *manager_ptr;
// A strong reference to ourselves is needed later to keep the handler alive during the callback.
1070  auto self_ptr = m_self_weak.lock();
1071  if (!self_ptr) {
1073  ex << "XrdCl::File::Open() => error: "
1074  << "OpenHandler called after it was deleted. This is a logic error "
1075  << "and should be reported to the CMSSW developers.";
// NOTE(review): "XrdAdapter" in the context string below looks like a typo for "XrdAdaptor".
1076  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1077  throw ex;
1078  }
1079 
1080  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1081  // is not thread-safe; the caller is responsible to verify it is not called from
1082  // multiple threads simultaneously.
1083  //
1084  // This is done because ::open may be called from a Xrootd callback; if we
1085  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1086  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1087  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1088  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1089  if (m_outstanding_open) {
1090  return m_shared_future;
1091  }
1092  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
// Replace the promise with a fresh one and publish its future.
1093  std::promise<std::shared_ptr<Source>> new_promise;
1094  m_promise.swap(new_promise);
1095  m_shared_future = m_promise.get_future().share();
1096 
// Append the opaque string as a query: '?' starts it, '&' extends an existing one.
1097  auto opaque = manager.prepareOpaqueString();
1098  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1099  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1100  m_file.reset(new XrdCl::File());
1101  m_outstanding_open = true;
1102 
1103  // Always make sure we release m_file and set m_outstanding_open to false on error.
1104  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1105  m_outstanding_open = false;
1106  m_file.reset();
1107  });
1108 
// `this` is registered as the response handler for the open.
1109  XrdCl::XRootDStatus status;
1110  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1112  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1113  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1114  << ", code=" << status.code << ")";
1115  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1116  manager.addConnections(ex);
1117  throw ex;
1118  }
// The request was accepted: disarm the guard so m_file and m_outstanding_open stay set.
1119  exit_guard.release();
1120  // Have a strong self-reference for as long as the callback is in-progress.
1121  m_self = self_ptr;
1122  return m_shared_future;
1123 }
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:68
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
#define nullptr
std::string prepareOpaqueString() const
#define LIKELY(x)
Definition: Likely.h:20
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define XRD_CL_MAX_CHUNK
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:53
double q2[4]
Definition: TauolaWrapper.h:88
std::weak_ptr< OpenHandler > m_self_weak
static std::string const input
Definition: EdmProvDump.cc:48
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:258
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:40
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:87
void clearMessage()
Definition: Exception.cc:159
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:161
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:165
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
static IOSize validateList(const std::vector< IOPosBuffer > req)
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
XrdCl::FileSystem & fs()
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
SendMonitoringInfoHandler(const std::string &url)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
static std::string const source
Definition: EdmProvDump.cc:47
void clearAdditionalInfo()
Definition: Exception.cc:163
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)