CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
16 
17 #include "XrdStatistics.h"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
20 
21 #define XRD_CL_MAX_CHUNK 512 * 1024
22 
23 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5
24 
25 #ifdef XRD_FAKE_OPEN_PROBE
26 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
27 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20
28 // This is the minimal difference in quality required to swap an active and inactive source
29 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
30 #else
31 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
32 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2 * 60
33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
34 #endif
35 
36 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
37 
38 #ifdef __MACH__
39 #include <mach/clock.h>
40 #include <mach/mach.h>
41 #define GET_CLOCK_MONOTONIC(ts) \
42  { \
43  clock_serv_t cclock; \
44  mach_timespec_t mts; \
45  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
46  clock_get_time(cclock, &mts); \
47  mach_port_deallocate(mach_task_self(), cclock); \
48  ts.tv_sec = mts.tv_sec; \
49  ts.tv_nsec = mts.tv_nsec; \
50  }
51 #else
52 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
53 #endif
54 
55 using namespace XrdAdaptor;
56 
57 long long timeDiffMS(const timespec &a, const timespec &b) {
58  long long diff = (a.tv_sec - b.tv_sec) * 1000;
59  diff += (a.tv_nsec - b.tv_nsec) / 1e6;
60  return diff;
61 }
62 
63 /*
64  * We do not care about the response of sending the monitoring information;
65  * this handler class simply frees any returned buffer to prevent memory leaks.
66  */
67 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
68 {
69  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
70  {
71  if (response)
72  {
73  XrdCl::Buffer *buffer = nullptr;
74  response->Get(buffer);
75  response->Set(static_cast<int*>(nullptr));
76  delete buffer;
77  }
78  // Send Info has a response object; we must delete it.
79  delete response;
80  delete status;
81  }
82 };
83 
85 
87  // Do not send this to a dCache data server as they return an error.
88  // In some versions of dCache, sending the monitoring information causes
89  // the server to close the connection - resulting in failures.
90  if (Source::isDCachePool(file)) {
91  return;
92  }
93 
94  // Send the monitoring info, if available.
96  std::string lastUrl;
97  file.GetProperty("LastURL", lastUrl);
98  if (jobId && lastUrl.size())
99  {
100  XrdCl::URL url(lastUrl);
101  XrdCl::FileSystem fs(url);
102  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK()))
103  {
104  edm::LogWarning("XrdAdaptorInternal") << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
105  }
106  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
107  }
108 }
109 
111  : m_serverToAdvertise(nullptr),
112  m_timeout(XRD_DEFAULT_TIMEOUT),
113  m_nextInitialSourceToggle(false),
114  m_name(filename),
115  m_flags(flags),
116  m_perms(perms),
117  m_distribution(0, 100),
118  m_excluded_active_count(0) {}
119 
120 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
122 
123  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
124  if (env) {
125  env->GetInt("StreamErrorWindow", m_timeout);
126  }
127 
128  std::string orig_site;
129  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
130  std::string hostname;
131  if (Source::getHostname(orig_site, hostname)) {
132  Source::getDomain(hostname, orig_site);
133  }
134  }
135 
136  std::unique_ptr<XrdCl::File> file;
138  bool validFile = false;
139  const int retries = 5;
140  std::string excludeString;
141  for (int idx = 0; idx < retries; idx++) {
142  file.reset(new XrdCl::File());
143  auto opaque = prepareOpaqueString();
144  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
145  SyncHostResponseHandler handler;
146  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
147  if (!openStatus
148  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
149  // server and it was marked bad - xrootd couldn't even queue up the request internally!
150  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
151  // redirector being down or authentication failures.
152  ex.clearMessage();
153  ex.clearContext();
154  ex.clearAdditionalInfo();
155  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
156  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
157  << ", code=" << openStatus.code << ")";
158  ex.addContext("Calling XrdFile::open()");
159  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
160  throw ex;
161  }
162  handler.WaitForResponse();
163  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
164  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
165  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
166  assert(status);
167  if (status->IsOK()) {
168  validFile = true;
169  break;
170  } else {
171  ex.clearMessage();
172  ex.clearContext();
173  ex.clearAdditionalInfo();
174  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
175  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
176  << ", code=" << status->code << ")";
177  ex.addContext("Calling XrdFile::open()");
178  addConnections(ex);
179  std::string dataServer, lastUrl;
180  file->GetProperty("DataServer", dataServer);
181  file->GetProperty("LastURL", lastUrl);
182  if (dataServer.size())
183  {
184  ex.addAdditionalInfo("Problematic data server: " + dataServer);
185  }
186  if (lastUrl.size())
187  {
188  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
189  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
190  }
191  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
192  m_disabledSourceStrings.end()) {
193  ex << ". No additional data servers were found.";
194  throw ex;
195  }
196  if (dataServer.size())
197  {
198  m_disabledSourceStrings.insert(dataServer);
199  m_disabledExcludeStrings.insert(excludeString);
200  }
201  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
202  if (lastUrl == new_filename) {
203  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
204  throw ex;
205  }
206  }
207  }
208  if (!validFile) {
209  throw ex;
210  }
211  SendMonitoringInfo(*file);
212 
213  timespec ts;
215 
216  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
217  {
218  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
219  auto oldList = m_activeSources;
220  m_activeSources.push_back(source);
222  }
223  queueUpdateCurrentServer(source->ID());
225 
226  m_lastSourceCheck = ts;
227  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
229 }
230 
239  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
240  // a pending update. *However*, since we call this for every read, we'll get it
241  // eventually.
242  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
243  return;
244  }
245  std::string *hostname_ptr;
246  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
247  std::unique_ptr<std::string> hostname(hostname_ptr);
249  if (statsService.isAvailable()) {
250  statsService->setCurrentServer(m_name, *hostname_ptr);
251  }
252  }
253 }
254 
256  std::unique_ptr<std::string> hostname(new std::string(id));
257  if (Source::getHostname(id, *hostname)) {
258  std::string *null_hostname = nullptr;
259  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
260  hostname.release();
261  }
262  }
263 }
264 
265 namespace {
266  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
267  std::string siteA, siteB;
268  if (iSources.size()) {siteA = iSources[0]->Site();}
269  if (iSources.size() == 2) {siteB = iSources[1]->Site();}
270  std::string siteList = siteA;
271  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
272  return siteList;
273  }
274 }
275 
276 void
277 RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source> > const& iOld,
278  std::vector<std::shared_ptr<Source> > const& iNew,
279  std::string orig_site) const
280 {
281  auto siteList = formatSites(iNew);
282  if (orig_site.size() && (orig_site != siteList))
283  {
284  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
285  } else {
286  auto oldSites = formatSites(iOld);
287  if (!orig_site.size() && (siteList != oldSites))
288  {
289  if (oldSites.size() >0 )
290  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
291  }
292  }
293 }
294 
296  IOSize requestSize,
297  std::vector<std::shared_ptr<Source>> &activeSources,
298  std::vector<std::shared_ptr<Source>> &inactiveSources) {
299  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
300  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
301  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
302  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
303  { // Be more aggressive about getting rid of very bad sources.
304  compareSources(now, 0, 1, activeSources, inactiveSources);
305  compareSources(now, 1, 0, activeSources, inactiveSources);
306  }
307  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
308  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
309  }
310  }
311 }
312 
313 bool RequestManager::compareSources(const timespec &now,
314  unsigned a,
315  unsigned b,
316  std::vector<std::shared_ptr<Source>> &activeSources,
317  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
318  if (activeSources.size() < std::max(a, b) + 1) {
319  return false;
320  }
321 
322  bool findNewSource = false;
323  if ((activeSources[a]->getQuality() > 5130) ||
324  ((activeSources[a]->getQuality() > 260) &&
325  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
326  edm::LogVerbatim("XrdAdaptorInternal")
327  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
328  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
329  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
330  findNewSource = true;
331  }
332  activeSources[a]->setLastDowngrade(now);
333  inactiveSources.emplace_back(activeSources[a]);
334  auto oldSources = activeSources;
335  activeSources.erase(activeSources.begin() + a);
336  reportSiteChange(oldSources, activeSources);
337  }
338  return findNewSource;
339 }
340 
342  IOSize requestSize,
343  std::vector<std::shared_ptr<Source>> &activeSources,
344  std::vector<std::shared_ptr<Source>> &inactiveSources) {
345  bool findNewSource = false;
346  if (activeSources.size() <= 1) {
347  findNewSource = true;
348  } else if (activeSources.size() > 1) {
349  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
350  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
351  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
352  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
353 
354  // NOTE: We could probably replace the copy with a better sort function.
355  // However, there are typically very few sources and the correctness is more obvious right now.
356  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
357  eligibleInactiveSources.reserve(inactiveSources.size());
358  for (const auto &source : inactiveSources) {
359  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
360  eligibleInactiveSources.push_back(source);
361  }
362  }
363  auto bestInactiveSource =
364  std::min_element(eligibleInactiveSources.begin(),
365  eligibleInactiveSources.end(),
366  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
367  return s1->getQuality() < s2->getQuality();
368  });
369  auto worstActiveSource = std::max_element(activeSources.cbegin(),
370  activeSources.cend(),
371  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
372  return s1->getQuality() < s2->getQuality();
373  });
374  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
375  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
376  << ", quality " << (*bestInactiveSource)->getQuality();
377  }
378  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
379  << ", quality " << (*worstActiveSource)->getQuality();
380  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
381  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
382  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
383  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
384  auto oldSources = activeSources;
385  activeSources.push_back(*bestInactiveSource);
386  reportSiteChange(oldSources, activeSources);
387  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
388  if (it->get() == bestInactiveSource->get()) {
389  inactiveSources.erase(it);
390  break;
391  }
392  } else
393  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
394  (*worstActiveSource)->getQuality() >
395  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
396  edm::LogVerbatim("XrdAdaptorInternal")
397  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
398  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
399  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
400  (*worstActiveSource)->setLastDowngrade(now);
401  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
402  if (it->get() == bestInactiveSource->get()) {
403  inactiveSources.erase(it);
404  break;
405  }
406  inactiveSources.emplace_back(std::move(*worstActiveSource));
407  auto oldSources = activeSources;
408  activeSources.erase(worstActiveSource);
409  activeSources.emplace_back(std::move(*bestInactiveSource));
410  reportSiteChange(oldSources, activeSources);
411  eligibleInactiveSources.clear();
412  for (const auto &source : inactiveSources)
413  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
414  eligibleInactiveSources.push_back(source);
415  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
416  eligibleInactiveSources.end(),
417  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
418  return s1->getQuality() < s2->getQuality();
419  });
420  worstActiveSource = std::max_element(activeSources.begin(),
421  activeSources.end(),
422  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
423  return s1->getQuality() < s2->getQuality();
424  });
425  }
426  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
427  float r = m_distribution(m_generator);
429  findNewSource = true;
430  }
431  }
432  }
433  if (findNewSource) {
434  m_open_handler->open();
436  }
437 
438  // Only aggressively look for new sources if we don't have two.
439  if (activeSources.size() == 2) {
441  } else {
442  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
443  }
445 }
446 
447 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
448  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
449  if (m_activeSources.empty()) {
451  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
452  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
453  ex.addContext("In XrdAdaptor::RequestManager::handle()");
454  addConnections(ex);
455  throw ex;
456  }
457  return m_activeSources[0]->getFileHandle();
458 }
459 
460 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
461  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
462  sources.reserve(m_activeSources.size());
463  for (auto const &source : m_activeSources) {
464  sources.push_back(source->ID());
465  }
466 }
467 
468 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
469  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
470  sources.reserve(m_activeSources.size());
471  for (auto const &source : m_activeSources) {
472  sources.push_back(source->PrettyID());
473  }
474 }
475 
476 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
477  sources.reserve(m_disabledSourceStrings.size());
478  for (auto const &source : m_disabledSourceStrings) {
479  sources.push_back(source);
480  }
481 }
482 
484  std::vector<std::string> sources;
486  for (auto const &source : sources) {
487  ex.addAdditionalInfo("Active source: " + source);
488  }
489  sources.clear();
490  getDisabledSourceNames(sources);
491  for (auto const &source : sources) {
492  ex.addAdditionalInfo("Disabled source: " + source);
493  }
494 }
495 
496 std::shared_ptr<Source> RequestManager::pickSingleSource() {
497  std::shared_ptr<Source> source = nullptr;
498  {
499  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
500  if (m_activeSources.size() == 2) {
502  source = m_activeSources[0];
504  } else {
505  source = m_activeSources[1];
507  }
508  } else if (m_activeSources.empty()) {
510  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
511  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
512  ex.addContext("In XrdAdaptor::RequestManager::handle()");
513  addConnections(ex);
514  throw ex;
515  } else {
516  source = m_activeSources[0];
517  }
518  }
519  return source;
520 }
521 
522 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
523  assert(c_ptr.get());
524  timespec now;
525  GET_CLOCK_MONOTONIC(now);
526  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
527  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
528  {
529  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
530  activeSources = m_activeSources;
531  inactiveSources = m_inactiveSources;
532  }
533  {
534  //make sure we update values before calling pickSingelSource
535  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
536  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
537  m_activeSources = std::move(activeSources);
538  m_inactiveSources = std::move(inactiveSources);
539  });
540 
541  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
542  }
543 
544  std::shared_ptr<Source> source = pickSingleSource();
545  source->handle(c_ptr);
546  return c_ptr->get_future();
547 }
548 
550  std::stringstream ss;
551  ss << "tried=";
552  size_t count = 0;
553  {
554  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
555 
556  for (const auto &it : m_activeSources) {
557  count++;
558  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
559  }
560  for (const auto &it : m_inactiveSources) {
561  count++;
562  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
563  }
564  }
565  for (const auto &it : m_disabledExcludeStrings) {
566  count++;
567  ss << it.substr(0, it.find(":")) << ",";
568  }
569  if (count) {
570  std::string tmp_str = ss.str();
571  return tmp_str.substr(0, tmp_str.size() - 1);
572  }
573  return "";
574 }
575 
576 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
577  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
578  if (status.IsOK()) {
579  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
580  for (const auto &s : m_activeSources) {
581  if (source->ID() == s->ID()) {
582  edm::LogVerbatim("XrdAdaptorInternal")
583  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
584  unsigned returned_count = ++m_excluded_active_count;
585  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
586  if (returned_count >= 3) {
587  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
588  }
589  return;
590  }
591  }
592  for (const auto &s : m_inactiveSources) {
593  if (source->ID() == s->ID()) {
594  edm::LogVerbatim("XrdAdaptorInternal")
595  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
596  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
597  return;
598  }
599  }
600  if (m_activeSources.size() < 2) {
601  auto oldSources = m_activeSources;
602  m_activeSources.push_back(source);
603  reportSiteChange(oldSources, m_activeSources);
604  queueUpdateCurrentServer(source->ID());
605  } else {
606  m_inactiveSources.push_back(source);
607  }
608  } else { // File-open failure - wait at least 120s before next attempt.
609  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
610  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
611  }
612 }
613 
614 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
615  //Use a copy of m_activeSources and m_inactiveSources throughout this function
616  // in order to avoid holding the lock a long time and causing a deadlock.
617  // When the function is over we will update the values of the containers
618  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
619  {
620  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
621  activeSources = m_activeSources;
622  inactiveSources = m_inactiveSources;
623  }
624  //Make sure we update changes when we leave the function
625  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
626  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
627  m_activeSources = std::move(activeSources);
628  m_inactiveSources = std::move(inactiveSources);
629  });
630 
631  updateCurrentServer();
632 
633  timespec now;
634  GET_CLOCK_MONOTONIC(now);
635 
636  edm::CPUTimer timer;
637  timer.start();
638 
639  if (activeSources.size() == 1) {
640  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
641  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
642  activeSources[0]->handle(c_ptr);
643  return c_ptr->get_future();
644  }
645  // Make sure active
646  else if (activeSources.empty()) {
648  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
649  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
650  ex.addContext("In XrdAdaptor::RequestManager::handle()");
651  addConnections(ex);
652  throw ex;
653  }
654 
655  assert(iolist.get());
656  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
657  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
658  splitClientRequest(*iolist, *req1, *req2, activeSources);
659 
660  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
661  // CheckSources may have removed a source
662  if (activeSources.size() == 1) {
663  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
664  activeSources[0]->handle(c_ptr);
665  return c_ptr->get_future();
666  }
667 
668  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
669  std::future<IOSize> future1, future2;
670  if (req1->size())
671  {
672  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
673  activeSources[0]->handle(c_ptr1);
674  future1 = c_ptr1->get_future();
675  }
676  if (req2->size())
677  {
678  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
679  activeSources[1]->handle(c_ptr2);
680  future2 = c_ptr2->get_future();
681  }
682  if (req1->size() && req2->size()) {
683  std::future<IOSize> task =
684  std::async(std::launch::deferred,
685  [](std::future<IOSize> a, std::future<IOSize> b) {
686  // Wait until *both* results are available. This is essential
687  // as the callback may try referencing the RequestManager. If one
688  // throws an exception (causing the RequestManager to be destroyed by
689  // XrdFile) and the other has a failure, then the recovery code will
690  // reference the destroyed RequestManager.
691  //
692  // Unlike other places where we use shared/weak ptrs to maintain object
693  // lifetime and destruction asynchronously, we *cannot* destroy the request
694  // asynchronously as it is associated with a ROOT buffer. We must wait until we
695  // are guaranteed that XrdCl will not write into the ROOT buffer before we
696  // can return.
697  b.wait();
698  a.wait();
699  return b.get() + a.get();
700  },
701  std::move(future1),
702  std::move(future2));
703  timer.stop();
704  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
705  return task;
706  }
707  else if (req1->size()) { return future1; }
708  else if (req2->size()) { return future2; }
709  else
710  { // Degenerate case - no bytes to read.
711  std::promise<IOSize> p; p.set_value(0);
712  return p.get_future();
713  }
714 }
715 
716 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
717  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
718 
719  // Fail early for invalid responses - XrdFile has a separate path for handling this.
720  if (c_status.code == XrdCl::errInvalidResponse) {
721  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
723  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
724  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
725  << ") => Invalid ReadV response from server";
726  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
727  addConnections(ex);
728  throw ex;
729  }
730  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
731 
732  // Note that we do not delete the Source itself. That is because this
733  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
734  // In such a case, if you close a file in the handler, it will deadlock
735  m_disabledSourceStrings.insert(source_ptr->ID());
736  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
737  m_disabledSources.insert(source_ptr);
738 
739  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
740  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
741  {
742  auto oldSources = m_activeSources;
743  m_activeSources.erase(m_activeSources.begin());
744  reportSiteChange(oldSources, m_activeSources);
745  }
746  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
747  {
748  auto oldSources = m_activeSources;
749  m_activeSources.erase(m_activeSources.begin()+1);
750  reportSiteChange(oldSources, m_activeSources);
751  }
752  std::shared_ptr<Source> new_source;
753  if (m_activeSources.size() == 0) {
754  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
755  timespec now;
756  GET_CLOCK_MONOTONIC(now);
758  // Note we only wait for 180 seconds here. This is because we've already failed
759  // once and the likelihood the program has some inconsistent state is decent.
760  // We'd much rather fail hard than deadlock!
761  sentry.unlock();
762  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
763  if (status == std::future_status::timeout) {
765  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
766  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
767  << ") => timeout when waiting for file open";
768  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
769  addConnections(ex);
770  throw ex;
771  } else {
772  try {
773  new_source = future.get();
774  } catch (edm::Exception &ex) {
775  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
776  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
777  throw;
778  }
779  }
780 
781  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
782  m_disabledSourceStrings.end()) {
783  // The server gave us back a data node we requested excluded. Fatal!
785  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
786  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
787  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
788  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
789  addConnections(ex);
790  throw ex;
791  }
792  sentry.lock();
793 
794  auto oldSources = m_activeSources;
795  m_activeSources.push_back(new_source);
796  reportSiteChange(oldSources, m_activeSources);
797  } else {
798  new_source = m_activeSources[0];
799  }
800  new_source->handle(c_ptr);
801 }
802 
803 static void consumeChunkFront(size_t &front,
804  std::vector<IOPosBuffer> &input,
805  std::vector<IOPosBuffer> &output,
806  IOSize chunksize) {
807  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
808  IOPosBuffer &io = input[front];
809  IOPosBuffer &outio = output.back();
810  if (io.size() > chunksize) {
811  IOSize consumed;
812  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) &&
813  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
814  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
815  consumed = (XRD_CL_MAX_CHUNK - outio.size());
816  outio.set_size(XRD_CL_MAX_CHUNK);
817  } else {
818  consumed = chunksize;
819  outio.set_size(outio.size() + consumed);
820  }
821  } else {
822  consumed = chunksize;
823  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
824  }
825  chunksize -= consumed;
826  IOSize newsize = io.size() - consumed;
827  IOOffset newoffset = io.offset() + consumed;
828  void *newdata = static_cast<char *>(io.data()) + consumed;
829  io.set_offset(newoffset);
830  io.set_data(newdata);
831  io.set_size(newsize);
832  } else if (io.size() == 0) {
833  front++;
834  } else {
835  output.push_back(io);
836  chunksize -= io.size();
837  front++;
838  }
839  }
840 }
841 
842 static void consumeChunkBack(size_t front,
843  std::vector<IOPosBuffer> &input,
844  std::vector<IOPosBuffer> &output,
845  IOSize chunksize) {
846  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
847  IOPosBuffer &io = input.back();
848  IOPosBuffer &outio = output.back();
849  if (io.size() > chunksize) {
850  IOSize consumed;
851  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) &&
852  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
853  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
854  consumed = (XRD_CL_MAX_CHUNK - outio.size());
855  outio.set_size(XRD_CL_MAX_CHUNK);
856  } else {
857  consumed = chunksize;
858  outio.set_size(outio.size() + consumed);
859  }
860  } else {
861  consumed = chunksize;
862  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
863  }
864  chunksize -= consumed;
865  IOSize newsize = io.size() - consumed;
866  IOOffset newoffset = io.offset() + consumed;
867  void *newdata = static_cast<char *>(io.data()) + consumed;
868  io.set_offset(newoffset);
869  io.set_data(newdata);
870  io.set_size(newsize);
871  } else if (io.size() == 0) {
872  input.pop_back();
873  } else {
874  output.push_back(io);
875  chunksize -= io.size();
876  input.pop_back();
877  }
878  }
879 }
880 
881 static IOSize validateList(const std::vector<IOPosBuffer> req) {
882  IOSize total = 0;
883  off_t last_offset = -1;
884  for (const auto &it : req) {
885  total += it.size();
886  assert(it.offset() > last_offset);
887  last_offset = it.offset();
888  assert(it.size() <= XRD_CL_MAX_CHUNK);
889  assert(it.offset() < 0x1ffffffffff);
890  }
891  assert(req.size() <= 1024);
892  return total;
893 }
894 
895 void
896 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2, std::vector<std::shared_ptr<Source>> const& activeSources) const
897 {
898  if (iolist.size() == 0) return;
899  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
900  req1.reserve(iolist.size() / 2 + 1);
901  req2.reserve(iolist.size() / 2 + 1);
902  size_t front = 0;
903 
904  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
905  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
906  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
907  IOSize chunk1, chunk2;
908  // Make sure the chunk size is at least 1024; little point to reads less than that size.
909  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
910  static_cast<IOSize>(1024));
911  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
912  static_cast<IOSize>(1024));
913 
914  IOSize size_orig = 0;
915  for (const auto &it : iolist)
916  size_orig += it.size();
917 
918  while (tmp_iolist.size() - front > 0) {
919  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
920  (req2.size() >=
921  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
922  // are passed to the request manager. However, because we have a max chunk size, we increase
923  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
924  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
925  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
927  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
928  << ", permissions=0" << std::oct << m_perms << std::dec
929  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
930  "reported to CMSSW developers.";
931  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
932  addConnections(ex);
933  std::stringstream ss;
934  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
935  ex.addAdditionalInfo(ss.str());
936  std::stringstream ss2;
937  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
938  ex.addAdditionalInfo(ss2.str());
939  throw ex;
940  }
941  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
942  consumeChunkFront(front, tmp_iolist, req1, chunk1);
943  }
944  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
945  consumeChunkBack(front, tmp_iolist, req2, chunk2);
946  }
947  }
948  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
949  return left.offset() < right.offset();
950  });
951  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
952  return left.offset() < right.offset();
953  });
954 
955  IOSize size1 = validateList(req1);
956  IOSize size2 = validateList(req2);
957 
958  assert(size_orig == size1 + size2);
959 
960  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
961  << " bytes) split into requests size " << req1.size() << " (" << size1
962  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
963 }
964 
965 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
966 
967 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
968 // defined in the header.
970 
972  XrdCl::AnyObject *,
973  XrdCl::HostList *hostList_ptr) {
974  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
975  // Make sure that we set m_outstanding_open to false on exit from this function.
976  std::unique_ptr<char, std::function<void(char*)>> outstanding_guard(nullptr, [&](char*){m_outstanding_open=false;});
977 
978  std::shared_ptr<Source> source;
979  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
980  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
981 
982  // Make sure we get rid of the strong self-reference when the callback finishes.
983  std::shared_ptr<OpenHandler> self = m_self;
984  m_self.reset();
985 
986  auto manager = m_manager.lock();
987  // Manager object has already been deleted. Cleanup the
988  // response objects, remove our self-reference, and ignore the response.
989  if (!manager) {
990  return;
991  }
992  //if we need to delete the File object we must do it outside
993  // of the lock to avoid a potential deadlock
994  std::unique_ptr<XrdCl::File> releaseFile;
995  {
996  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
997 
998  if (status->IsOK()) {
999  SendMonitoringInfo(*m_file);
1000  timespec now;
1001  GET_CLOCK_MONOTONIC(now);
1002 
1003  std::string excludeString;
1004  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1005 
1006  source.reset(new Source(now, std::move(m_file), excludeString));
1007  m_promise.set_value(source);
1008  } else {
1009  releaseFile = std::move(m_file);
1011  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1012  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1013  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1014  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1015  manager->addConnections(ex);
1016 
1017  m_promise.set_exception(std::make_exception_ptr(ex));
1018  }
1019  }
1020  manager->handleOpen(*status, source);
1021 }
1022 
1024  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1025 
1026  if (!m_file.get()) {
1027  return "(no open in progress)";
1028  }
1029  std::string dataServer;
1030  m_file->GetProperty("DataServer", dataServer);
1031  if (!dataServer.size()) { return "(unknown source)"; }
1032  return dataServer;
1033 }
1034 
1035 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1036  auto manager_ptr = m_manager.lock();
1037  if (!manager_ptr) {
1039  ex << "XrdCl::File::Open() =>"
1040  << " error: OpenHandler called within an invalid RequestManager context."
1041  << " This is a logic error and should be reported to the CMSSW developers.";
1042  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1043  throw ex;
1044  }
1045  RequestManager &manager = *manager_ptr;
1046  auto self_ptr = m_self_weak.lock();
1047  if (!self_ptr) {
1049  ex << "XrdCl::File::Open() => error: "
1050  << "OpenHandler called after it was deleted. This is a logic error "
1051  << "and should be reported to the CMSSW developers.";
1052  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1053  throw ex;
1054  }
1055 
1056  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1057  // is not thread-safe; the caller is responsible to verify it is not called from
1058  // multiple threads simultaneously.
1059  //
1060  // This is done because ::open may be called from a Xrootd callback; if we
1061  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1062  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1063  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1064  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1065  if (m_outstanding_open) {
1066  return m_shared_future;
1067  }
1068  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1069  std::promise<std::shared_ptr<Source>> new_promise;
1070  m_promise.swap(new_promise);
1071  m_shared_future = m_promise.get_future().share();
1072 
1073  auto opaque = manager.prepareOpaqueString();
1074  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1075  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1076  m_file.reset(new XrdCl::File());
1077  m_outstanding_open = true;
1078 
1079  // Always make sure we release m_file and set m_outstanding_open to false on error.
1080  std::unique_ptr<char, std::function<void(char*)>> exit_guard(nullptr, [&](char*){m_outstanding_open = false; m_file.reset();});
1081 
1082  XrdCl::XRootDStatus status;
1083  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1085  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1086  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1087  << ", code=" << status.code << ")";
1088  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1089  manager.addConnections(ex);
1090  throw ex;
1091  }
1092  exit_guard.release();
1093  // Have a strong self-reference for as long as the callback is in-progress.
1094  m_self = self_ptr;
1095  return m_shared_future;
1096 }
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define XRD_CL_MAX_CHUNK
OpenHandler(std::weak_ptr< RequestManager > manager)
#define nullptr
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
int timeout
Definition: mps_check.py:51
list diff
Definition: mps_update.py:85
double q2[4]
Definition: TauolaWrapper.h:88
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:44
tuple s2
Definition: indexGen.py:106
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
#define likely(x)
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
string URL
Definition: rrClient.py:4
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:266
std::atomic< std::string * > m_serverToAdvertise
def move
Definition: eostools.py:510
bool isAvailable() const
Definition: Service.h:46
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:219
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:94
void clearMessage()
Definition: Exception.cc:215
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
void clearAdditionalInfo()
Definition: Exception.cc:223
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
tuple status
Definition: mps_update.py:57
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)