CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
16 
17 #include "XrdStatistics.h"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
20 
21 #define XRD_CL_MAX_CHUNK 512*1024
22 
23 #define XRD_ADAPTOR_SHORT_OPEN_DELAY 5
24 
25 #ifdef XRD_FAKE_OPEN_PROBE
26 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
27 #define XRD_ADAPTOR_LONG_OPEN_DELAY 20
28 // This is the minimal difference in quality required to swap an active and inactive source
29 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
30 #else
31 #define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
32 #define XRD_ADAPTOR_LONG_OPEN_DELAY 2*60
33 #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
34 #endif
35 
36 #define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
37 
38 
39 #ifdef __MACH__
40 #include <mach/clock.h>
41 #include <mach/mach.h>
42 #define GET_CLOCK_MONOTONIC(ts) \
43 { \
44  clock_serv_t cclock; \
45  mach_timespec_t mts; \
46  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
47  clock_get_time(cclock, &mts); \
48  mach_port_deallocate(mach_task_self(), cclock); \
49  ts.tv_sec = mts.tv_sec; \
50  ts.tv_nsec = mts.tv_nsec; \
51 }
52 #else
53 #define GET_CLOCK_MONOTONIC(ts) \
54  clock_gettime(CLOCK_MONOTONIC, &ts);
55 #endif
56 
57 using namespace XrdAdaptor;
58 
59 long long timeDiffMS(const timespec &a, const timespec &b)
60 {
61  long long diff = (a.tv_sec - b.tv_sec) * 1000;
62  diff += (a.tv_nsec - b.tv_nsec) / 1e6;
63  return diff;
64 }
65 
66 /*
67  * We do not care about the response of sending the monitoring information;
68  * this handler class simply frees any returned buffer to prevent memory leaks.
69  */
70 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
71 {
72  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
73  {
74  if (response)
75  {
76  XrdCl::Buffer *buffer = nullptr;
77  response->Get(buffer);
78  response->Set(static_cast<int*>(nullptr));
79  delete buffer;
80  }
81  // Send Info has a response object; we must delete it.
82  delete response;
83  delete status;
84  }
85 };
86 
88 
89 
90 static void
92 {
93  // Do not send this to a dCache data server as they return an error.
94  // In some versions of dCache, sending the monitoring information causes
95  // the server to close the connection - resulting in failures.
96  if (Source::isDCachePool(file)) {return;}
97 
98  // Send the monitoring info, if available.
100  std::string lastUrl;
101  file.GetProperty("LastURL", lastUrl);
102  if (jobId && lastUrl.size())
103  {
104  XrdCl::URL url(lastUrl);
105  XrdCl::FileSystem fs(url);
106  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK()))
107  {
108  edm::LogWarning("XrdAdaptorInternal") << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
109  }
110  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
111  }
112 }
113 
114 
116  : m_serverToAdvertise(nullptr),
117  m_timeout(XRD_DEFAULT_TIMEOUT),
118  m_nextInitialSourceToggle(false),
119  m_name(filename),
120  m_flags(flags),
121  m_perms(perms),
122  m_distribution(0,100),
123  m_excluded_active_count(0)
124 {
125 }
126 
127 
128 void
129 RequestManager::initialize(std::weak_ptr<RequestManager> self)
130 {
132 
133  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
134  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
135 
136  std::string orig_site;
137  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
138  {
139  std::string hostname;
140  if (Source::getHostname(orig_site, hostname))
141  {
142  Source::getDomain(hostname, orig_site);
143  }
144  }
145 
146  std::unique_ptr<XrdCl::File> file;
148  bool validFile = false;
149  const int retries = 5;
150  std::string excludeString;
151  for (int idx=0; idx<retries; idx++)
152  {
153  file.reset(new XrdCl::File());
154  file->SetProperty("ReadRecovery", "false");
155  auto opaque = prepareOpaqueString();
156  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
157  SyncHostResponseHandler handler;
158  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
159  if (!openStatus.IsOK())
160  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
161  // server and it was marked bad - xrootd couldn't even queue up the request internally!
162  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
163  // redirector being down or authentication failures.
164  ex.clearMessage();
165  ex.clearContext();
166  ex.clearAdditionalInfo();
167  ex << "XrdCl::File::Open(name='" << m_name
168  << "', flags=0x" << std::hex << m_flags
169  << ", permissions=0" << std::oct << m_perms << std::dec
170  << ") => error '" << openStatus.ToStr()
171  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
172  ex.addContext("Calling XrdFile::open()");
173  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
174  throw ex;
175  }
176  handler.WaitForResponse();
177  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
178  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
179  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
180  assert(status);
181  if (status->IsOK())
182  {
183  validFile = true;
184  break;
185  }
186  else
187  {
188  ex.clearMessage();
189  ex.clearContext();
190  ex.clearAdditionalInfo();
191  ex << "XrdCl::File::Open(name='" << m_name
192  << "', flags=0x" << std::hex << m_flags
193  << ", permissions=0" << std::oct << m_perms << std::dec
194  << ") => error '" << status->ToStr()
195  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
196  ex.addContext("Calling XrdFile::open()");
197  addConnections(ex);
198  std::string dataServer, lastUrl;
199  file->GetProperty("DataServer", dataServer);
200  file->GetProperty("LastURL", lastUrl);
201  if (dataServer.size())
202  {
203  ex.addAdditionalInfo("Problematic data server: " + dataServer);
204  }
205  if (lastUrl.size())
206  {
207  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
208  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
209  }
210  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
211  {
212  ex << ". No additional data servers were found.";
213  throw ex;
214  }
215  if (dataServer.size())
216  {
217  m_disabledSourceStrings.insert(dataServer);
218  m_disabledExcludeStrings.insert(excludeString);
219  }
220  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
221  if (lastUrl == new_filename)
222  {
223  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
224  throw ex;
225  }
226  }
227  }
228  if (!validFile)
229  {
230  throw ex;
231  }
232  SendMonitoringInfo(*file);
233 
234  timespec ts;
236 
237  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
238  {
239  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
240  auto oldList = m_activeSources;
241  m_activeSources.push_back(source);
243  }
246 
247  m_lastSourceCheck = ts;
248  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
250 }
251 
259 void
261 {
262  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
263  // a pending update. *However*, since we call this for every read, we'll get it
264  // eventually.
265  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
266  std::string *hostname_ptr;
267  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
268  {
269  std::unique_ptr<std::string> hostname(hostname_ptr);
271  if (statsService.isAvailable()) {
272  statsService->setCurrentServer(*hostname_ptr);
273  }
274  }
275 }
276 
277 
278 void
280 {
281  auto hostname = std::make_unique<std::string>(id);
282  if (Source::getHostname(id, *hostname))
283  {
284  std::string *null_hostname = nullptr;
285  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
286  {
287  hostname.release();
288  }
289  }
290 }
291 
292 namespace {
293  std::string formatSites(std::vector<std::shared_ptr<Source> > const& iSources) {
294  std::string siteA, siteB;
295  if (iSources.size()) {siteA = iSources[0]->Site();}
296  if (iSources.size() == 2) {siteB = iSources[1]->Site();}
297  std::string siteList = siteA;
298  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
299  return siteList;
300  }
301 }
302 
303 void
304 RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source> > const& iOld,
305  std::vector<std::shared_ptr<Source> > const& iNew,
306  std::string orig_site) const
307 {
308  auto siteList = formatSites(iNew);
309  if (orig_site.size() && (orig_site != siteList))
310  {
311  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
312  }
313  else {
314  auto oldSites = formatSites(iOld);
315  if (!orig_site.size() && (siteList != oldSites))
316  {
317  if (oldSites.size() >0 )
318  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
319  }
320  }
321 }
322 
323 
324 void
325 RequestManager::checkSources(timespec &now, IOSize requestSize,
326  std::vector<std::shared_ptr<Source>>& activeSources,
327  std::vector<std::shared_ptr<Source>>& inactiveSources)
328 {
329  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
330  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
331  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
332  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
333  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
334  {
335  { // Be more aggressive about getting rid of very bad sources.
336  compareSources(now, 0, 1, activeSources, inactiveSources);
337  compareSources(now, 1, 0, activeSources, inactiveSources);
338  }
339  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
340  {
341  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
342  }
343  }
344 }
345 
346 
347 bool
348 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b,
349  std::vector<std::shared_ptr<Source>>& activeSources,
350  std::vector<std::shared_ptr<Source>>& inactiveSources) const
351 {
352  if (activeSources.size() < std::max(a, b)+1) {return false;}
353 
354  bool findNewSource = false;
355  if ((activeSources[a]->getQuality() > 5130) ||
356  ((activeSources[a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[a]->getQuality())))
357  {
358  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
359  << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
360  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
361  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
362  activeSources[a]->setLastDowngrade(now);
363  inactiveSources.emplace_back(activeSources[a]);
364  auto oldSources = activeSources;
365  activeSources.erase(activeSources.begin()+a);
366  reportSiteChange(oldSources,activeSources);
367  }
368  return findNewSource;
369 }
370 
371 void
373  IOSize requestSize,
374  std::vector<std::shared_ptr<Source>>& activeSources,
375  std::vector<std::shared_ptr<Source>>& inactiveSources)
376 {
377 
378  bool findNewSource = false;
379  if (activeSources.size() <= 1)
380  {
381  findNewSource = true;
382  }
383  else if (activeSources.size() > 1)
384  {
385  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality() << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
386  findNewSource |= compareSources(now, 0, 1, activeSources,inactiveSources);
387  findNewSource |= compareSources(now, 1, 0,activeSources, inactiveSources);
388 
389  // NOTE: We could probably replace the copy with a better sort function.
390  // However, there are typically very few sources and the correctness is more obvious right now.
391  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
392  for (const auto & source : inactiveSources)
393  {
394  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
395  }
396  auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
397  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
398  auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
399  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
400  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
401  {
402  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
403  << ", quality " << (*bestInactiveSource)->getQuality();
404  }
405  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
406  << ", quality " << (*worstActiveSource)->getQuality();
407  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
408  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
409  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
410  {
411  auto oldSources = activeSources;
412  activeSources.push_back(*bestInactiveSource);
413  reportSiteChange(oldSources, activeSources);
414  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
415  }
416  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
417  {
418  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
419  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
420  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
421  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
422  (*worstActiveSource)->setLastDowngrade(now);
423  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
424  inactiveSources.emplace_back(std::move(*worstActiveSource));
425  auto oldSources = activeSources;
426  activeSources.erase(worstActiveSource);
427  activeSources.emplace_back(std::move(*bestInactiveSource));
428  reportSiteChange(oldSources, activeSources);
429  eligibleInactiveSources.clear();
430  for (const auto & source : inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
431  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
432  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
433  worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
434  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
435  }
436  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
437  {
438  float r = m_distribution(m_generator);
440  {
441  findNewSource = true;
442  }
443  }
444  }
445  if (findNewSource)
446  {
447  m_open_handler->open();
449  }
450 
451  // Only aggressively look for new sources if we don't have two.
452  if (activeSources.size() == 2)
453  {
455  }
456  else
457  {
458  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
459  }
461 }
462 
463 std::shared_ptr<XrdCl::File>
465 {
466  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
467  if (m_activeSources.empty())
468  {
470  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
471  << "', flags=0x" << std::hex << m_flags
472  << ", permissions=0" << std::oct << m_perms << std::dec
473  << ") => Source used after fatal exception.";
474  ex.addContext("In XrdAdaptor::RequestManager::handle()");
475  addConnections(ex);
476  throw ex;
477  }
478  return m_activeSources[0]->getFileHandle();
479 }
480 
481 void
482 RequestManager::getActiveSourceNames(std::vector<std::string> & sources) const
483 {
484  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
485  sources.reserve(m_activeSources.size());
486  for (auto const& source : m_activeSources) {
487  sources.push_back(source->ID());
488  }
489 }
490 
491 void
492 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources) const
493 {
494  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
495  sources.reserve(m_activeSources.size());
496  for (auto const& source : m_activeSources) {
497  sources.push_back(source->PrettyID());
498  }
499 }
500 
501 void
502 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources) const
503 {
504  sources.reserve(m_disabledSourceStrings.size());
505  for (auto const& source : m_disabledSourceStrings) {
506  sources.push_back(source);
507  }
508 }
509 
510 void
512 {
513  std::vector<std::string> sources;
515  for (auto const& source : sources)
516  {
517  ex.addAdditionalInfo("Active source: " + source);
518  }
519  sources.clear();
520  getDisabledSourceNames(sources);
521  for (auto const& source : sources)
522  {
523  ex.addAdditionalInfo("Disabled source: " + source);
524  }
525 }
526 
527 std::shared_ptr<Source>
529 {
530  std::shared_ptr<Source> source = nullptr;
531  {
532  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
533  if (m_activeSources.size() == 2)
534  {
536  {
537  source = m_activeSources[0];
539  }
540  else
541  {
542  source = m_activeSources[1];
544  }
545  }
546  else if (m_activeSources.empty())
547  {
549  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
550  << "', flags=0x" << std::hex << m_flags
551  << ", permissions=0" << std::oct << m_perms << std::dec
552  << ") => Source used after fatal exception.";
553  ex.addContext("In XrdAdaptor::RequestManager::handle()");
554  addConnections(ex);
555  throw ex;
556  }
557  else
558  {
559  source = m_activeSources[0];
560  }
561  }
562  return source;
563 }
564 
565 std::future<IOSize>
566 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
567 {
568  assert(c_ptr.get());
569  timespec now;
570  GET_CLOCK_MONOTONIC(now);
571  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
572  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
573  {
574  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
575  activeSources = m_activeSources;
576  inactiveSources = m_inactiveSources;
577  }
578  {
579  //make sure we update values before calling pickSingelSource
580  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
581  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
582  m_activeSources = std::move(activeSources);
583  m_inactiveSources = std::move(inactiveSources);
584  });
585 
586  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
587  }
588 
589  std::shared_ptr<Source> source = pickSingleSource();
590  source->handle(c_ptr);
591  return c_ptr->get_future();
592 }
593 
596 {
597  std::stringstream ss;
598  ss << "tried=";
599  size_t count = 0;
600  {
601  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
602 
603  for ( const auto & it : m_activeSources )
604  {
605  count++;
606  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
607  }
608  for ( const auto & it : m_inactiveSources )
609  {
610  count++;
611  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
612  }
613  }
614  for ( const auto & it : m_disabledExcludeStrings )
615  {
616  count++;
617  ss << it.substr(0, it.find(":")) << ",";
618  }
619  if (count)
620  {
621  std::string tmp_str = ss.str();
622  return tmp_str.substr(0, tmp_str.size()-1);
623  }
624  return "";
625 }
626 
627 void
628 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
629 {
630  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
631  if (status.IsOK())
632  {
633  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
634  for (const auto & s : m_activeSources)
635  {
636  if (source->ID() == s->ID())
637  {
638  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
639  << "; ignoring" << std::endl;
640  unsigned returned_count = ++m_excluded_active_count;
643  return;
644  }
645  }
646  for (const auto & s : m_inactiveSources)
647  {
648  if (source->ID() == s->ID())
649  {
650  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
651  << "; ignoring" << std::endl;
653  return;
654  }
655  }
656  if (m_activeSources.size() < 2)
657  {
658  auto oldSources = m_activeSources;
659  m_activeSources.push_back(source);
660  reportSiteChange(oldSources, m_activeSources);
661  queueUpdateCurrentServer(source->ID());
662  }
663  else
664  {
665  m_inactiveSources.push_back(source);
666  }
667  }
668  else
669  { // File-open failure - wait at least 120s before next attempt.
670  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
672  }
673 }
674 
675 std::future<IOSize>
676 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
677 {
678  //Use a copy of m_activeSources and m_inactiveSources throughout this function
679  // in order to avoid holding the lock a long time and causing a deadlock.
680  // When the function is over we will update the values of the containers
681  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
682  {
683  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
684  activeSources = m_activeSources;
685  inactiveSources = m_inactiveSources;
686  }
687  //Make sure we update changes when we leave the function
688  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
689  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
690  m_activeSources = std::move(activeSources);
691  m_inactiveSources = std::move(inactiveSources);
692  });
693 
695 
696  timespec now;
697  GET_CLOCK_MONOTONIC(now);
698 
699  edm::CPUTimer timer;
700  timer.start();
701 
702  if (activeSources.size() == 1)
703  {
704  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
705  checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
706  activeSources[0]->handle(c_ptr);
707  return c_ptr->get_future();
708  }
709  // Make sure active
710  else if (activeSources.empty())
711  {
713  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
714  << "', flags=0x" << std::hex << m_flags
715  << ", permissions=0" << std::oct << m_perms << std::dec
716  << ") => Source used after fatal exception.";
717  ex.addContext("In XrdAdaptor::RequestManager::handle()");
718  addConnections(ex);
719  throw ex;
720  }
721 
722  assert(iolist.get());
723  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
724  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
725  splitClientRequest(*iolist, *req1, *req2, activeSources);
726 
727  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
728  // CheckSources may have removed a source
729  if (activeSources.size() == 1)
730  {
731  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
732  activeSources[0]->handle(c_ptr);
733  return c_ptr->get_future();
734  }
735 
736  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
737  std::future<IOSize> future1, future2;
738  if (req1->size())
739  {
740  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
741  activeSources[0]->handle(c_ptr1);
742  future1 = c_ptr1->get_future();
743  }
744  if (req2->size())
745  {
746  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
747  activeSources[1]->handle(c_ptr2);
748  future2 = c_ptr2->get_future();
749  }
750  if (req1->size() && req2->size())
751  {
752  std::future<IOSize> task = std::async(std::launch::deferred,
753  [](std::future<IOSize> a, std::future<IOSize> b){
754  // Wait until *both* results are available. This is essential
755  // as the callback may try referencing the RequestManager. If one
756  // throws an exception (causing the RequestManager to be destroyed by
757  // XrdFile) and the other has a failure, then the recovery code will
758  // reference the destroyed RequestManager.
759  //
760  // Unlike other places where we use shared/weak ptrs to maintain object
761  // lifetime and destruction asynchronously, we *cannot* destroy the request
762  // asynchronously as it is associated with a ROOT buffer. We must wait until we
763  // are guaranteed that XrdCl will not write into the ROOT buffer before we
764  // can return.
765  b.wait(); a.wait();
766  return b.get() + a.get();
767  },
768  std::move(future1),
769  std::move(future2));
770  timer.stop();
771  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
772  return task;
773  }
774  else if (req1->size()) { return future1; }
775  else if (req2->size()) { return future2; }
776  else
777  { // Degenerate case - no bytes to read.
778  std::promise<IOSize> p; p.set_value(0);
779  return p.get_future();
780  }
781 }
782 
783 void
784 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
785 {
786  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
787 
788  // Fail early for invalid responses - XrdFile has a separate path for handling this.
789  if (c_status.code == XrdCl::errInvalidResponse)
790  {
791  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
793  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
794  << "', flags=0x" << std::hex << m_flags
795  << ", permissions=0" << std::oct << m_perms << std::dec
796  << ", old source=" << source_ptr->PrettyID()
797  << ") => Invalid ReadV response from server";
798  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
799  addConnections(ex);
800  throw ex;
801  }
802  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
803 
804  // Note that we do not delete the Source itself. That is because this
805  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
806  // In such a case, if you close a file in the handler, it will deadlock
807  m_disabledSourceStrings.insert(source_ptr->ID());
808  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
809  m_disabledSources.insert(source_ptr);
810 
811  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
812  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
813  {
814  auto oldSources = m_activeSources;
815  m_activeSources.erase(m_activeSources.begin());
816  reportSiteChange(oldSources, m_activeSources);
817  }
818  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
819  {
820  auto oldSources = m_activeSources;
821  m_activeSources.erase(m_activeSources.begin()+1);
822  reportSiteChange(oldSources, m_activeSources);
823  }
824  std::shared_ptr<Source> new_source;
825  if (m_activeSources.size() == 0)
826  {
827  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
828  timespec now;
829  GET_CLOCK_MONOTONIC(now);
831  // Note we only wait for 180 seconds here. This is because we've already failed
832  // once and the likelihood the program has some inconsistent state is decent.
833  // We'd much rather fail hard than deadlock!
834  sentry.unlock();
835  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
836  if (status == std::future_status::timeout)
837  {
839  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
840  << "', flags=0x" << std::hex << m_flags
841  << ", permissions=0" << std::oct << m_perms << std::dec
842  << ", old source=" << source_ptr->PrettyID()
843  << ") => timeout when waiting for file open";
844  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
845  addConnections(ex);
846  throw ex;
847  }
848  else
849  {
850  try
851  {
852  new_source = future.get();
853  }
854  catch (edm::Exception &ex)
855  {
856  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
857  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
858  throw;
859  }
860  }
861 
862  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
863  {
864  // The server gave us back a data node we requested excluded. Fatal!
866  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
867  << "', flags=0x" << std::hex << m_flags
868  << ", permissions=0" << std::oct << m_perms << std::dec
869  << ", old source=" << source_ptr->PrettyID()
870  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
871  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
872  addConnections(ex);
873  throw ex;
874  }
875  sentry.lock();
876 
877  auto oldSources = m_activeSources;
878  m_activeSources.push_back(new_source);
879  reportSiteChange(oldSources,m_activeSources);
880  }
881  else
882  {
883  new_source = m_activeSources[0];
884  }
885  new_source->handle(c_ptr);
886 }
887 
888 static void
889 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
890 {
891  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
892  {
893  IOPosBuffer &io = input[front];
894  IOPosBuffer &outio = output.back();
895  if (io.size() > chunksize)
896  {
897  IOSize consumed;
898  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
899  {
900  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
901  {
902  consumed = (XRD_CL_MAX_CHUNK - outio.size());
903  outio.set_size(XRD_CL_MAX_CHUNK);
904  }
905  else
906  {
907  consumed = chunksize;
908  outio.set_size(outio.size() + consumed);
909  }
910  }
911  else
912  {
913  consumed = chunksize;
914  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
915  }
916  chunksize -= consumed;
917  IOSize newsize = io.size() - consumed;
918  IOOffset newoffset = io.offset() + consumed;
919  void* newdata = static_cast<char*>(io.data()) + consumed;
920  io.set_offset(newoffset);
921  io.set_data(newdata);
922  io.set_size(newsize);
923  }
924  else if (io.size() == 0)
925  {
926  front++;
927  }
928  else
929  {
930  output.push_back(io);
931  chunksize -= io.size();
932  front++;
933  }
934  }
935 }
936 
937 static void
938 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
939 {
940  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
941  {
942  IOPosBuffer &io = input.back();
943  IOPosBuffer &outio = output.back();
944  if (io.size() > chunksize)
945  {
946  IOSize consumed;
947  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
948  {
949  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
950  {
951  consumed = (XRD_CL_MAX_CHUNK - outio.size());
952  outio.set_size(XRD_CL_MAX_CHUNK);
953  }
954  else
955  {
956  consumed = chunksize;
957  outio.set_size(outio.size() + consumed);
958  }
959  }
960  else
961  {
962  consumed = chunksize;
963  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
964  }
965  chunksize -= consumed;
966  IOSize newsize = io.size() - consumed;
967  IOOffset newoffset = io.offset() + consumed;
968  void* newdata = static_cast<char*>(io.data()) + consumed;
969  io.set_offset(newoffset);
970  io.set_data(newdata);
971  io.set_size(newsize);
972  }
973  else if (io.size() == 0)
974  {
975  input.pop_back();
976  }
977  else
978  {
979  output.push_back(io);
980  chunksize -= io.size();
981  input.pop_back();
982  }
983  }
984 }
985 
986 static IOSize validateList(const std::vector<IOPosBuffer> req)
987 {
988  IOSize total = 0;
989  off_t last_offset = -1;
990  for (const auto & it : req)
991  {
992  total += it.size();
993  assert(it.offset() > last_offset);
994  last_offset = it.offset();
995  assert(it.size() <= XRD_CL_MAX_CHUNK);
996  assert(it.offset() < 0x1ffffffffff);
997  }
998  assert(req.size() <= 1024);
999  return total;
1000 }
1001 
1002 void
1003 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2, std::vector<std::shared_ptr<Source>> const& activeSources) const
1004 {
1005  if (iolist.size() == 0) return;
1006  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1007  req1.reserve(iolist.size()/2+1);
1008  req2.reserve(iolist.size()/2+1);
1009  size_t front=0;
1010 
1011  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1012  float q1 = static_cast<float>(activeSources[0]->getQuality())+5;
1013  float q2 = static_cast<float>(activeSources[1]->getQuality())+5;
1014  IOSize chunk1, chunk2;
1015  // Make sure the chunk size is at least 1024; little point to reads less than that size.
1016  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1017  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1018 
1019  IOSize size_orig = 0;
1020  for (const auto & it : iolist) size_orig += it.size();
1021 
1022  while (tmp_iolist.size()-front > 0)
1023  {
1024  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
1025  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1026  // are passed to the request manager. However, because we have a max chunk size, we increase
1027  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1028  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1029  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1031  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
1032  << "', flags=0x" << std::hex << m_flags
1033  << ", permissions=0" << std::oct << m_perms << std::dec
1034  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
1035  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1036  addConnections(ex);
1037  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1038  ex.addAdditionalInfo(ss.str());
1039  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
1040  ex.addAdditionalInfo(ss2.str());
1041  throw ex;
1042  }
1043  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
1044  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
1045  }
1046  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1047  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1048 
1049  IOSize size1 = validateList(req1);
1050  IOSize size2 = validateList(req2);
1051 
1052  assert(size_orig == size1 + size2);
1053 
1054  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1055 }
1056 
1057 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
1058  : m_manager(manager)
1059 {
1060 }
1061 
1062 
1063  // Cannot use ~OpenHandler=default as XrdCl::File is not fully
1064  // defined in the header.
1066 {
1067 }
1068 
1069 
1070 void
1071 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
1072 {
1073  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1074  // Make sure that we set m_outstanding_open to false on exit from this function.
1075  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
1076  std::unique_ptr<OpenHandler, std::function<void(OpenHandler*)>> outstanding_guard(this, [&](OpenHandler*){m_outstanding_open=false;});
1077 
1078  std::shared_ptr<Source> source;
1079  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1080  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1081 
1082  // Make sure we get rid of the strong self-reference when the callback finishes.
1083  std::shared_ptr<OpenHandler> self = m_self;
1084  m_self.reset();
1085 
1086  auto manager = m_manager.lock();
1087  // Manager object has already been deleted. Cleanup the
1088  // response objects, remove our self-reference, and ignore the response.
1089  if (!manager)
1090  {
1091  return;
1092  }
1093  //if we need to delete the File object we must do it outside
1094  // of the lock to avoid a potential deadlock
1095  std::unique_ptr<XrdCl::File> releaseFile;
1096  {
1097  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1098 
1099  if (status->IsOK())
1100  {
1102  timespec now;
1103  GET_CLOCK_MONOTONIC(now);
1104 
1105  std::string excludeString;
1106  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1107 
1108  source.reset(new Source(now, std::move(m_file), excludeString));
1109  m_promise.set_value(source);
1110  }
1111  else
1112  {
1113  releaseFile = std::move(m_file);
1115  ex << "XrdCl::File::Open(name='" << manager->m_name
1116  << "', flags=0x" << std::hex << manager->m_flags
1117  << ", permissions=0" << std::oct << manager->m_perms << std::dec
1118  << ") => error '" << status->ToStr()
1119  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1120  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1121  manager->addConnections(ex);
1122 
1123  m_promise.set_exception(std::make_exception_ptr(ex));
1124  }
1125  }
1126  manager->handleOpen(*status, source);
1127 }
1128 
1131 {
1132  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1133 
1134  if (!m_file.get())
1135  {
1136  return "(no open in progress)";
1137  }
1138  std::string dataServer;
1139  m_file->GetProperty("DataServer", dataServer);
1140  if (!dataServer.size()) { return "(unknown source)"; }
1141  return dataServer;
1142 }
1143 
1144 std::shared_future<std::shared_ptr<Source> >
1146 {
1147  auto manager_ptr = m_manager.lock();
1148  if (!manager_ptr)
1149  {
1151  ex << "XrdCl::File::Open() =>"
1152  << " error: OpenHandler called within an invalid RequestManager context."
1153  << " This is a logic error and should be reported to the CMSSW developers.";
1154  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1155  throw ex;
1156  }
1157  RequestManager &manager = *manager_ptr;
1158  auto self_ptr = m_self_weak.lock();
1159  if (!self_ptr)
1160  {
1162  ex << "XrdCl::File::Open() => error: "
1163  << "OpenHandler called after it was deleted. This is a logic error "
1164  << "and should be reported to the CMSSW developers.";
1165  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1166  throw ex;
1167  }
1168 
1169  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1170  // is not thread-safe; the caller is responsible to verify it is not called from
1171  // multiple threads simultaneously.
1172  //
1173  // This is done because ::open may be called from a Xrootd callback; if we
1174  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1175  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1176  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1177  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1178  if (m_outstanding_open)
1179  {
1180  return m_shared_future;
1181  }
1182  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1183  std::promise<std::shared_ptr<Source> > new_promise;
1184  m_promise.swap(new_promise);
1185  m_shared_future = m_promise.get_future().share();
1186 
1187  auto opaque = manager.prepareOpaqueString();
1188  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1189  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1190  m_file.reset(new XrdCl::File());
1191  m_file->SetProperty("ReadRecovery", "false");
1192  m_outstanding_open = true;
1193 
1194  // Always make sure we release m_file and set m_outstanding_open to false on error.
1195  std::unique_ptr<OpenHandler, std::function<void(OpenHandler*)>> exit_guard(this, [&](OpenHandler*){m_outstanding_open = false; m_file.reset();});
1196 
1197  XrdCl::XRootDStatus status;
1198  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1199  {
1201  ex << "XrdCl::File::Open(name='" << new_name
1202  << "', flags=0x" << std::hex << manager.m_flags
1203  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1204  << ") => error '" << status.ToStr()
1205  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1206  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1207  manager.addConnections(ex);
1208  throw ex;
1209  }
1210  exit_guard.release();
1211  // Have a strong self-reference for as long as the callback is in-progress.
1212  m_self = self_ptr;
1213  return m_shared_future;
1214 }
1215 
void setCurrentServer(const std::string &servername)
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define XRD_CL_MAX_CHUNK
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
#define nullptr
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:51
double q2[4]
Definition: TauolaWrapper.h:88
std::weak_ptr< OpenHandler > m_self_weak
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:44
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
#define likely(x)
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:266
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:94
void clearMessage()
Definition: Exception.cc:215
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
static IOSize validateList(const std::vector< IOPosBuffer > req)
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
void clearAdditionalInfo()
Definition: Exception.cc:223
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)