CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <cassert>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
18 
19 #include "XrdStatistics.h"
21 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
22 
// Tuning constants for the XrdAdaptor request manager.
// Every value that is an expression is parenthesized so that the macro expands
// to a single operand regardless of the operators surrounding the use site
// (e.g. `x / XRD_CL_MAX_CHUNK` or `y % XRD_ADAPTOR_LONG_OPEN_DELAY`).

// Upper bound, in bytes, applied to the size of a single chunk when requests are split.
#define XRD_CL_MAX_CHUNK (512*1024)

// Delay (seconds) added to the current time when scheduling the next source check
// while fewer than two sources are active.
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
// Test configuration: selected by defining XRD_FAKE_OPEN_PROBE.
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
// Long delay, in seconds (two minutes).
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2*60)
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// The chunk-consuming helpers stop adding entries to an output list once it
// holds more than this many entries.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
39 
40 
// GET_CLOCK_MONOTONIC(ts): store the current monotonic-clock reading in the
// timespec lvalue `ts`.
//
// Both variants are wrapped in do { ... } while (0) so the macro expands to
// exactly one statement that needs the caller's trailing semicolon. This keeps
// `if (cond) GET_CLOCK_MONOTONIC(ts); else ...` well-formed; a bare `{...}`
// block or an expansion that already ends in `;` would detach the `else`.
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#define GET_CLOCK_MONOTONIC(ts) \
do { \
  clock_serv_t cclock; \
  mach_timespec_t mts; \
  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
  clock_get_time(cclock, &mts); \
  mach_port_deallocate(mach_task_self(), cclock); \
  (ts).tv_sec = mts.tv_sec; \
  (ts).tv_nsec = mts.tv_nsec; \
} while (0)
#else
#define GET_CLOCK_MONOTONIC(ts) \
do { \
  clock_gettime(CLOCK_MONOTONIC, &(ts)); \
} while (0)
#endif
58 
59 using namespace XrdAdaptor;
60 
/*
 * Returns a - b in milliseconds.
 *
 * The whole-second part is scaled to milliseconds, the nanosecond part is
 * converted with a floating-point division, and the sum is truncated toward
 * zero when converted back to an integer.
 */
long long timeDiffMS(const timespec &a, const timespec &b)
{
  const long long wholeSecondsMs = (a.tv_sec - b.tv_sec) * 1000;
  const double nanosAsMs = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(wholeSecondsMs + nanosAsMs);
}
67 
68 /*
69  * We do not care about the response of sending the monitoring information;
70  * this handler class simply frees any returned buffer to prevent memory leaks.
71  */
72 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
73 {
74  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
75  {
76  if (response)
77  {
78  XrdCl::Buffer *buffer = nullptr;
79  response->Get(buffer);
80  response->Set(static_cast<int*>(nullptr));
81  delete buffer;
82  }
83  // Send Info has a response object; we must delete it.
84  delete response;
85  delete status;
86  }
87 };
88 
90 
91 
// Best-effort: send this job's monitoring ID to the server named by the
// "LastURL" property of `file`. Nothing is sent for dCache pools, or when
// either `jobId` or the "LastURL" property is unavailable. The reply is
// handed to `nullHandler`.
// NOTE(review): this listing has dropped source line 93 (the function name and
// parameter list) and line 101 (the declaration of `jobId`); restore them from
// the upstream file before compiling.
92 static void
94 {
95  // Do not send this to a dCache data server as they return an error.
96  // In some versions of dCache, sending the monitoring information causes
97  // the server to close the connection - resulting in failures.
98  if (Source::isDCachePool(file)) {return;}
99 
100  // Send the monitoring info, if available.
102  std::string lastUrl;
103  file.GetProperty("LastURL", lastUrl);
104  if (jobId && !lastUrl.empty())
105  {
106  XrdCl::URL url(lastUrl);
107  XrdCl::FileSystem fs(url);
 // The trailing 30 is presumably a timeout in seconds - confirm against the XrdCl API.
108  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK()))
109  {
110  edm::LogWarning("XrdAdaptorInternal") << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
111  }
 // NOTE(review): this message is emitted even when SendInfo reported a failure above.
112  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
113  }
114 }
115 
116 
// RequestManager constructor: stores the file name, open flags and permissions
// and puts the remaining members into their initial state. No I/O is performed
// in the visible part of the constructor.
// NOTE(review): the signature line (source line 117) is missing from this
// listing; the initializer list uses `filename`, `flags` and `perms`.
118  : m_serverToAdvertise(nullptr),
 // Default timeout; initialize() may overwrite it from the "StreamErrorWindow" setting.
119  m_timeout(XRD_DEFAULT_TIMEOUT),
120  m_nextInitialSourceToggle(false),
121  m_name(filename),
122  m_flags(flags),
123  m_perms(perms),
 // Distribution over 0..100, sampled together with m_generator by the source-check code.
124  m_distribution(0,100),
125  m_excluded_active_count(0)
126 {
127 }
128 
129 
// Performs the initial, synchronous open of m_name and installs the resulting
// Source as the first active source.
//
// Up to `retries` (5) open attempts are made. Each attempt uses a fresh
// XrdCl::File and appends the string from prepareOpaqueString() to the URL.
// A failed attempt records the data server as disabled and retries; the
// function throws `ex` when the open call fails immediately, when a disabled
// data server fails again, when the last URL tried equals the URL requested,
// or when no attempt succeeded.
//
// NOTE(review): this listing has dropped source lines 133, 149, 236, 243,
// 245-246 and 250. Among them are the declaration of `ex` and the statement
// that fills `ts`; both are used below. Restore from the upstream file.
130 void
131 RequestManager::initialize(std::weak_ptr<RequestManager> self)
132 {
134 
 // Let the client environment's "StreamErrorWindow" value override the default timeout.
135  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
136  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
137 
 // Determine the site of the original URL; when the lookup fails and the
 // value has no '.', fall back to the domain of the resolved hostname.
138  std::string orig_site;
139  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
140  {
141  std::string hostname;
142  if (Source::getHostname(orig_site, hostname))
143  {
144  Source::getDomain(hostname, orig_site);
145  }
146  }
147 
148  std::unique_ptr<XrdCl::File> file;
150  bool validFile = false;
151  const int retries = 5;
152  std::string excludeString;
153  for (int idx=0; idx<retries; idx++)
154  {
155  file.reset(new XrdCl::File());
 // Append the opaque string with '?' or '&', depending on whether the name already has a query part.
156  auto opaque = prepareOpaqueString();
157  std::string new_filename = m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
158  SyncHostResponseHandler handler;
159  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
160  if (!openStatus.IsOK())
161  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
162  // server and it was marked bad - xrootd couldn't even queue up the request internally!
163  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
164  // redirector being down or authentication failures.
165  ex.clearMessage();
166  ex.clearContext();
167  ex.clearAdditionalInfo();
168  ex << "XrdCl::File::Open(name='" << m_name
169  << "', flags=0x" << std::hex << m_flags
170  << ", permissions=0" << std::oct << m_perms << std::dec
171  << ") => error '" << openStatus.ToStr()
172  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
173  ex.addContext("Calling XrdFile::open()");
174  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
175  throw ex;
176  }
 // The open was queued; block until the handler has received the reply.
177  handler.WaitForResponse();
178  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
179  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
180  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
181  assert(status);
182  if (status->IsOK())
183  {
184  validFile = true;
185  break;
186  }
187  else
188  {
 // Rebuild the exception for this attempt; it is thrown below or after the loop.
189  ex.clearMessage();
190  ex.clearContext();
191  ex.clearAdditionalInfo();
192  ex << "XrdCl::File::Open(name='" << m_name
193  << "', flags=0x" << std::hex << m_flags
194  << ", permissions=0" << std::oct << m_perms << std::dec
195  << ") => error '" << status->ToStr()
196  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
197  ex.addContext("Calling XrdFile::open()");
198  addConnections(ex);
199  std::string dataServer, lastUrl;
200  file->GetProperty("DataServer", dataServer);
201  file->GetProperty("LastURL", lastUrl);
202  if (!dataServer.empty())
203  {
204  ex.addAdditionalInfo("Problematic data server: " + dataServer);
205  }
206  if (!lastUrl.empty())
207  {
208  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
209  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
210  }
 // A data server that is already disabled failed again: give up.
211  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
212  {
213  ex << ". No additional data servers were found.";
214  throw ex;
215  }
 // Otherwise record the failing server; the next attempt's opaque string
 // is built by prepareOpaqueString(), which reads m_disabledExcludeStrings.
216  if (!dataServer.empty())
217  {
218  m_disabledSourceStrings.insert(dataServer);
219  m_disabledExcludeStrings.insert(excludeString);
220  }
221  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
222  if (lastUrl == new_filename)
223  {
224  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
225  throw ex;
226  }
227  }
228  }
 // All attempts failed: rethrow the exception built by the last attempt.
229  if (!validFile)
230  {
231  throw ex;
232  }
233  SendMonitoringInfo(*file);
234 
 // NOTE(review): `ts` is consumed below, but the statement that sets it
 // (source line 236) is missing from this listing.
235  timespec ts;
237 
238  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
239  {
240  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
241  auto oldList = m_activeSources;
242  m_activeSources.push_back(source);
244  }
247 
248  m_lastSourceCheck = ts;
249  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
251 }
252 
// Publishes a queued server hostname, if any, to the statistics service.
// The pending hostname is taken out of m_serverToAdvertise with an atomic
// exchange and freed when this function returns.
// NOTE(review): the function-name line (source line 261) and the declaration
// of `statsService` (source line 271) are missing from this listing; the name
// used for this block is inferred from the setCurrentServer() call.
260 void
262 {
263  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
264  // a pending update. *However*, since we call this for every read, we'll get it
265  // eventually.
266  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
267  std::string *hostname_ptr;
268  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
269  {
 // Take ownership so the string is released on every path out of this scope.
270  std::unique_ptr<std::string> hostname(hostname_ptr);
272  if (statsService.isAvailable()) {
273  statsService->setCurrentServer(*hostname_ptr);
274  }
275  }
276 }
277 
278 
// Queues the hostname resolved from `id` in m_serverToAdvertise.
// The slot is only filled when it is currently null; if a value is already
// pending, or the hostname lookup fails, the new string is discarded.
// NOTE(review): the function-name line (source line 280) is missing from this
// listing; the body reads a parameter called `id`.
279 void
281 {
282  auto hostname = std::make_unique<std::string>(id);
283  if (Source::getHostname(id, *hostname))
284  {
285  std::string *null_hostname = nullptr;
286  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
287  {
 // The atomic slot now holds the raw pointer; give up ownership here.
288  hostname.release();
289  }
290  }
291 }
292 
293 namespace {
294  std::string formatSites(std::vector<std::shared_ptr<Source> > const& iSources) {
295  std::string siteA, siteB;
296  if (!iSources.empty()) {siteA = iSources[0]->Site();}
297  if (iSources.size() == 2) {siteB = iSources[1]->Site();}
298  std::string siteList = siteA;
299  if (!siteB.empty() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
300  return siteList;
301  }
302 }
303 
304 void
305 RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source> > const& iOld,
306  std::vector<std::shared_ptr<Source> > const& iNew,
307  std::string orig_site) const
308 {
309  auto siteList = formatSites(iNew);
310  if (!orig_site.empty() && (orig_site != siteList))
311  {
312  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
313  }
314  else {
315  auto oldSites = formatSites(iOld);
316  if (orig_site.empty() && (siteList != oldSites))
317  {
318  if (!oldSites.empty() )
319  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
320  }
321  }
322 }
323 
324 
325 void
326 RequestManager::checkSources(timespec &now, IOSize requestSize,
327  std::vector<std::shared_ptr<Source>>& activeSources,
328  std::vector<std::shared_ptr<Source>>& inactiveSources)
329 {
330  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
331  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
332  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
333  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
334  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
335  {
336  { // Be more aggressive about getting rid of very bad sources.
337  compareSources(now, 0, 1, activeSources, inactiveSources);
338  compareSources(now, 1, 0, activeSources, inactiveSources);
339  }
340  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
341  {
342  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
343  }
344  }
345 }
346 
347 
348 bool
349 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b,
350  std::vector<std::shared_ptr<Source>>& activeSources,
351  std::vector<std::shared_ptr<Source>>& inactiveSources) const
352 {
353  if (activeSources.size() < std::max(a, b)+1) {return false;}
354 
355  bool findNewSource = false;
356  if ((activeSources[a]->getQuality() > 5130) ||
357  ((activeSources[a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[a]->getQuality())))
358  {
359  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
360  << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
361  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
362  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
363  activeSources[a]->setLastDowngrade(now);
364  inactiveSources.emplace_back(activeSources[a]);
365  auto oldSources = activeSources;
366  activeSources.erase(activeSources.begin()+a);
367  reportSiteChange(oldSources,activeSources);
368  }
369  return findNewSource;
370 }
371 
// Full source re-evaluation: decides whether a new source should be opened and
// swaps poorly performing active sources with better inactive ones. At the
// end, `now` is advanced by the short delay when fewer than two sources are
// active.
// NOTE(review): this listing has dropped source line 373 (the function name
// and first parameter; checkSources() calls it as checkSourcesImpl(now, ...))
// as well as lines 440, 449, 455 and 461. Restore them from the upstream file.
372 void
374  IOSize requestSize,
375  std::vector<std::shared_ptr<Source>>& activeSources,
376  std::vector<std::shared_ptr<Source>>& inactiveSources)
377 {
378 
379  bool findNewSource = false;
 // With zero or one active source we always want to look for another one.
380  if (activeSources.size() <= 1)
381  {
382  findNewSource = true;
383  }
384  else if (activeSources.size() > 1)
385  {
386  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality() << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
387  findNewSource |= compareSources(now, 0, 1, activeSources,inactiveSources);
388  findNewSource |= compareSources(now, 1, 0,activeSources, inactiveSources);
389 
390  // NOTE: We could probably replace the copy with a better sort function.
391  // However, there are typically very few sources and the correctness is more obvious right now.
 // Only inactive sources whose last downgrade is older than (short delay - 1) seconds are candidates.
392  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
393  for (const auto & source : inactiveSources)
394  {
395  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
396  }
 // Both lambdas order by getQuality(): the "best" source is the minimum, the "worst" the maximum.
397  auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
398  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
399  auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
400  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
401  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
402  {
403  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
404  << ", quality " << (*bestInactiveSource)->getQuality();
405  }
406  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
407  << ", quality " << (*worstActiveSource)->getQuality();
408  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
409  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
410  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
411  {
412  auto oldSources = activeSources;
413  activeSources.push_back(*bestInactiveSource);
414  reportSiteChange(oldSources, activeSources);
 // Remove the promoted source from the inactive list (matched by pointer identity).
415  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
416  }
 // Otherwise keep swapping while the worst active source is worse than the
 // best eligible inactive one by more than the fudge factor.
417  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
418  {
419  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
420  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
421  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
422  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
423  (*worstActiveSource)->setLastDowngrade(now);
424  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
425  inactiveSources.emplace_back(std::move(*worstActiveSource));
426  auto oldSources = activeSources;
427  activeSources.erase(worstActiveSource);
428  activeSources.emplace_back(std::move(*bestInactiveSource));
429  reportSiteChange(oldSources, activeSources);
 // Recompute the candidates. NOTE(review): this pass filters with the LONG
 // delay, whereas the first pass above used the SHORT delay - confirm intended.
430  eligibleInactiveSources.clear();
431  for (const auto & source : inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
432  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
433  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
434  worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
435  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
436  }
 // Random probing once the last check is older than the long delay.
 // NOTE(review): the condition that uses `r` (source line 440) is missing from this listing.
437  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
438  {
439  float r = m_distribution(m_generator);
441  {
442  findNewSource = true;
443  }
444  }
445  }
446  if (findNewSource)
447  {
448  m_open_handler->open();
450  }
451 
452  // Only aggressively look for new sources if we don't have two.
453  if (activeSources.size() == 2)
454  {
456  }
457  else
458  {
459  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
460  }
462 }
463 
// Returns the file handle of the first active source, under m_source_mutex.
// Throws `ex` when there is no active source.
// NOTE(review): the function-name line (source line 465) and the declaration
// of `ex` (source line 470) are missing from this listing; the name
// getActiveFile is taken from the exception text below.
464 std::shared_ptr<XrdCl::File>
466 {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  if (m_activeSources.empty())
469  {
471  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
472  << "', flags=0x" << std::hex << m_flags
473  << ", permissions=0" << std::oct << m_perms << std::dec
474  << ") => Source used after fatal exception.";
475  ex.addContext("In XrdAdaptor::RequestManager::handle()");
476  addConnections(ex);
477  throw ex;
478  }
479  return m_activeSources[0]->getFileHandle();
480 }
481 
482 void
483 RequestManager::getActiveSourceNames(std::vector<std::string> & sources) const
484 {
485  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
486  sources.reserve(m_activeSources.size());
487  for (auto const& source : m_activeSources) {
488  sources.push_back(source->ID());
489  }
490 }
491 
492 void
493 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources) const
494 {
495  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
496  sources.reserve(m_activeSources.size());
497  for (auto const& source : m_activeSources) {
498  sources.push_back(source->PrettyID());
499  }
500 }
501 
502 void
503 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources) const
504 {
505  sources.reserve(m_disabledSourceStrings.size());
506  for (auto const& source : m_disabledSourceStrings) {
507  sources.push_back(source);
508  }
509 }
510 
// Adds one "Active source: ..." / "Disabled source: ..." line of additional
// information to `ex` for each entry of the local `sources` list.
// NOTE(review): the function-name line (source line 512) and the statement
// that fills `sources` before the first loop (source line 515) are missing
// from this listing; as shown, the first loop iterates an empty vector.
511 void
513 {
514  std::vector<std::string> sources;
516  for (auto const& source : sources)
517  {
518  ex.addAdditionalInfo("Active source: " + source);
519  }
 // Reuse the vector for the disabled sources.
520  sources.clear();
521  getDisabledSourceNames(sources);
522  for (auto const& source : sources)
523  {
524  ex.addAdditionalInfo("Disabled source: " + source);
525  }
526 }
527 
// Chooses the active source that serves a single read, under m_source_mutex.
// With two active sources either the first or the second is returned; with one
// that source is returned; with none `ex` is thrown.
// NOTE(review): this listing has dropped source line 529 (the function name;
// handle() calls it as pickSingleSource()), line 536 (the condition choosing
// between the two sources), lines 539 and 544, and line 549 (the declaration
// of `ex`).
528 std::shared_ptr<Source>
530 {
531  std::shared_ptr<Source> source = nullptr;
532  {
533  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
534  if (m_activeSources.size() == 2)
535  {
537  {
538  source = m_activeSources[0];
540  }
541  else
542  {
543  source = m_activeSources[1];
545  }
546  }
547  else if (m_activeSources.empty())
548  {
550  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
551  << "', flags=0x" << std::hex << m_flags
552  << ", permissions=0" << std::oct << m_perms << std::dec
553  << ") => Source used after fatal exception.";
554  ex.addContext("In XrdAdaptor::RequestManager::handle()");
555  addConnections(ex);
556  throw ex;
557  }
558  else
559  {
 // Any other size: use the first source.
560  source = m_activeSources[0];
561  }
562  }
563  return source;
564 }
565 
566 std::future<IOSize>
567 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
568 {
569  assert(c_ptr.get());
570  timespec now;
571  GET_CLOCK_MONOTONIC(now);
572  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
573  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
574  {
575  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
576  activeSources = m_activeSources;
577  inactiveSources = m_inactiveSources;
578  }
579  {
580  //make sure we update values before calling pickSingelSource
581  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
582  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
583  m_activeSources = std::move(activeSources);
584  m_inactiveSources = std::move(inactiveSources);
585  });
586 
587  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
588  }
589 
590  std::shared_ptr<Source> source = pickSingleSource();
591  source->handle(c_ptr);
592  return c_ptr->get_future();
593 }
594 
// Builds the "tried=host1,host2,..." string from the exclude IDs of all
// active, inactive and disabled sources. Each ID is cut at its first ':'.
// Returns an empty string when there are no sources at all.
// NOTE(review): the signature lines (source lines 595-596) are missing from
// this listing; the name is inferred from the prepareOpaqueString() call in
// initialize().
597 {
598  std::stringstream ss;
599  ss << "tried=";
600  size_t count = 0;
601  {
 // The active and inactive lists are read under the lock; the disabled set below is not.
602  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
603 
604  for ( const auto & it : m_activeSources )
605  {
606  count++;
607  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
608  }
609  for ( const auto & it : m_inactiveSources )
610  {
611  count++;
612  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
613  }
614  }
615  for ( const auto & it : m_disabledExcludeStrings )
616  {
617  count++;
618  ss << it.substr(0, it.find(":")) << ",";
619  }
620  if (count)
621  {
 // Drop the trailing comma.
622  std::string tmp_str = ss.str();
623  return tmp_str.substr(0, tmp_str.size()-1);
624  }
625  return "";
626 }
627 
// Callback for the result of an asynchronous open of an additional source.
// On success the new source is ignored if its ID matches an existing active or
// inactive source; otherwise it becomes active when fewer than two sources are
// active, and inactive when two already are. On failure only a log message is
// visible here. m_source_mutex is held throughout.
// NOTE(review): this listing has dropped source lines 642-643 (which
// presumably use `returned_count`), 653 and 672.
628 void
629 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
630 {
631  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
632  if (status.IsOK())
633  {
634  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
 // The server handed back a source we already use.
635  for (const auto & s : m_activeSources)
636  {
637  if (source->ID() == s->ID())
638  {
639  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
640  << "; ignoring" << std::endl;
641  unsigned returned_count = ++m_excluded_active_count;
644  return;
645  }
646  }
 // The server handed back a source we already know but have demoted.
647  for (const auto & s : m_inactiveSources)
648  {
649  if (source->ID() == s->ID())
650  {
651  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
652  << "; ignoring" << std::endl;
654  return;
655  }
656  }
657  if (m_activeSources.size() < 2)
658  {
659  auto oldSources = m_activeSources;
660  m_activeSources.push_back(source);
661  reportSiteChange(oldSources, m_activeSources);
662  queueUpdateCurrentServer(source->ID());
663  }
664  else
665  {
666  m_inactiveSources.push_back(source);
667  }
668  }
669  else
670  { // File-open failure - wait at least 120s before next attempt.
671  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
673  }
674 }
675 
// Dispatches a vectored read described by `iolist`.
// With one active source the whole list goes to it. With more, the list is
// split by splitClientRequest() into two sub-requests sent to the first and
// second active source, and the returned future yields the sum of both
// results. Throws `ex` when there is no active source.
// NOTE(review): this listing has dropped source line 695 and line 713 (the
// declaration of `ex`).
676 std::future<IOSize>
677 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
678 {
679  //Use a copy of m_activeSources and m_inactiveSources throughout this function
680  // in order to avoid holding the lock a long time and causing a deadlock.
681  // When the function is over we will update the values of the containers
682  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
683  {
684  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
685  activeSources = m_activeSources;
686  inactiveSources = m_inactiveSources;
687  }
688  //Make sure we update changes when we leave the function
 // The deleter of this empty shared_ptr runs when the function is left, by
 // return or by exception, and writes the working copies back under the lock.
689  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
690  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
691  m_activeSources = std::move(activeSources);
692  m_inactiveSources = std::move(inactiveSources);
693  });
694 
696 
697  timespec now;
698  GET_CLOCK_MONOTONIC(now);
699 
 // NOTE(review): the timer is only stopped on the two-sub-request path below,
 // and its value is used only in a commented-out log statement.
700  edm::CPUTimer timer;
701  timer.start();
702 
703  if (activeSources.size() == 1)
704  {
705  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
706  checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
707  activeSources[0]->handle(c_ptr);
708  return c_ptr->get_future();
709  }
710  // Make sure active
711  else if (activeSources.empty())
712  {
714  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
715  << "', flags=0x" << std::hex << m_flags
716  << ", permissions=0" << std::oct << m_perms << std::dec
717  << ") => Source used after fatal exception.";
718  ex.addContext("In XrdAdaptor::RequestManager::handle()");
719  addConnections(ex);
720  throw ex;
721  }
722 
723  assert(iolist.get());
724  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
725  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
726  splitClientRequest(*iolist, *req1, *req2, activeSources);
727 
 // NOTE(review): the size passed here is the number of list entries in both
 // halves, while the single-source path passes c_ptr->getSize() - confirm
 // which unit checkSources() expects.
728  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
729  // CheckSources may have removed a source
730  if (activeSources.size() == 1)
731  {
732  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
733  activeSources[0]->handle(c_ptr);
734  return c_ptr->get_future();
735  }
736 
737  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
738  std::future<IOSize> future1, future2;
739  if (!req1->empty())
740  {
741  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
742  activeSources[0]->handle(c_ptr1);
743  future1 = c_ptr1->get_future();
744  }
745  if (!req2->empty())
746  {
747  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
748  activeSources[1]->handle(c_ptr2);
749  future2 = c_ptr2->get_future();
750  }
751  if (!req1->empty() && !req2->empty())
752  {
 // Deferred launch: the lambda runs in the thread that first waits on or gets `task`.
753  std::future<IOSize> task = std::async(std::launch::deferred,
754  [](std::future<IOSize> a, std::future<IOSize> b){
755  // Wait until *both* results are available. This is essential
756  // as the callback may try referencing the RequestManager. If one
757  // throws an exception (causing the RequestManager to be destroyed by
758  // XrdFile) and the other has a failure, then the recovery code will
759  // reference the destroyed RequestManager.
760  //
761  // Unlike other places where we use shared/weak ptrs to maintain object
762  // lifetime and destruction asynchronously, we *cannot* destroy the request
763  // asynchronously as it is associated with a ROOT buffer. We must wait until we
764  // are guaranteed that XrdCl will not write into the ROOT buffer before we
765  // can return.
766  b.wait(); a.wait();
767  return b.get() + a.get();
768  },
769  std::move(future1),
770  std::move(future2));
771  timer.stop();
772  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
773  return task;
774  }
775  else if (!req1->empty()) { return future1; }
776  else if (!req2->empty()) { return future2; }
777  else
778  { // Degenerate case - no bytes to read.
779  std::promise<IOSize> p; p.set_value(0);
780  return p.get_future();
781  }
782 }
783 
// Recovery path for a request that failed on its current source.
// An invalid-response status is rethrown immediately. Otherwise the failing
// source is recorded as disabled and removed from the active list, and the
// request is re-issued on the remaining active source or, when none is left,
// on a newly opened one. Throws `ex` if the new open times out or returns a
// source that is already disabled.
// NOTE(review): this listing has dropped source lines 793, 839 and 866 (the
// declarations of `ex`) and line 831.
784 void
785 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
786 {
787  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
788 
789  // Fail early for invalid responses - XrdFile has a separate path for handling this.
790  if (c_status.code == XrdCl::errInvalidResponse)
791  {
792  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
794  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
795  << "', flags=0x" << std::hex << m_flags
796  << ", permissions=0" << std::oct << m_perms << std::dec
797  << ", old source=" << source_ptr->PrettyID()
798  << ") => Invalid ReadV response from server";
799  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
800  addConnections(ex);
801  throw ex;
802  }
803  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
804 
805  // Note that we do not delete the Source itself. That is because this
806  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
807  // In such a case, if you close a file in the handler, it will deadlock
808  m_disabledSourceStrings.insert(source_ptr->ID());
809  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
810  m_disabledSources.insert(source_ptr);
811 
 // Remove the failing source if it is one of the first two active entries.
812  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
813  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get()))
814  {
815  auto oldSources = m_activeSources;
816  m_activeSources.erase(m_activeSources.begin());
817  reportSiteChange(oldSources, m_activeSources);
818  }
819  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
820  {
821  auto oldSources = m_activeSources;
822  m_activeSources.erase(m_activeSources.begin()+1);
823  reportSiteChange(oldSources, m_activeSources);
824  }
825  std::shared_ptr<Source> new_source;
826  if (m_activeSources.empty())
827  {
828  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
829  timespec now;
830  GET_CLOCK_MONOTONIC(now);
832  // Note we only wait for 180 seconds here. This is because we've already failed
833  // once and the likelihood the program has some inconsistent state is decent.
834  // We'd much rather fail hard than deadlock!
 // NOTE(review): the wait below is m_timeout+10 seconds, which matches the
 // "180 seconds" above only for one particular m_timeout value - confirm.
 // The lock is released while waiting and re-acquired further down.
835  sentry.unlock();
836  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
837  if (status == std::future_status::timeout)
838  {
840  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
841  << "', flags=0x" << std::hex << m_flags
842  << ", permissions=0" << std::oct << m_perms << std::dec
843  << ", old source=" << source_ptr->PrettyID()
844  << ") => timeout when waiting for file open";
845  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
846  addConnections(ex);
847  throw ex;
848  }
849  else
850  {
851  try
852  {
853  new_source = future.get();
854  }
855  catch (edm::Exception &ex)
856  {
 // Annotate the open failure with the source that failed originally and rethrow.
857  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
858  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
859  throw;
860  }
861  }
862 
863  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
864  {
865  // The server gave us back a data node we requested excluded. Fatal!
867  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
868  << "', flags=0x" << std::hex << m_flags
869  << ", permissions=0" << std::oct << m_perms << std::dec
870  << ", old source=" << source_ptr->PrettyID()
871  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
872  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
873  addConnections(ex);
874  throw ex;
875  }
876  sentry.lock();
877 
878  auto oldSources = m_activeSources;
879  m_activeSources.push_back(new_source);
880  reportSiteChange(oldSources,m_activeSources);
881  }
882  else
883  {
 // Another active source is still available: retry there.
884  new_source = m_activeSources[0];
885  }
886  new_source->handle(c_ptr);
887 }
888 
889 static void
890 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
891 {
892  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
893  {
894  IOPosBuffer &io = input[front];
895  IOPosBuffer &outio = output.back();
896  if (io.size() > chunksize)
897  {
898  IOSize consumed;
899  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
900  {
901  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
902  {
903  consumed = (XRD_CL_MAX_CHUNK - outio.size());
904  outio.set_size(XRD_CL_MAX_CHUNK);
905  }
906  else
907  {
908  consumed = chunksize;
909  outio.set_size(outio.size() + consumed);
910  }
911  }
912  else
913  {
914  consumed = chunksize;
915  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
916  }
917  chunksize -= consumed;
918  IOSize newsize = io.size() - consumed;
919  IOOffset newoffset = io.offset() + consumed;
920  void* newdata = static_cast<char*>(io.data()) + consumed;
921  io.set_offset(newoffset);
922  io.set_data(newdata);
923  io.set_size(newsize);
924  }
925  else if (io.size() == 0)
926  {
927  front++;
928  }
929  else
930  {
931  output.push_back(io);
932  chunksize -= io.size();
933  front++;
934  }
935  }
936 }
937 
938 static void
939 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
940 {
941  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
942  {
943  IOPosBuffer &io = input.back();
944  IOPosBuffer &outio = output.back();
945  if (io.size() > chunksize)
946  {
947  IOSize consumed;
948  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
949  {
950  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
951  {
952  consumed = (XRD_CL_MAX_CHUNK - outio.size());
953  outio.set_size(XRD_CL_MAX_CHUNK);
954  }
955  else
956  {
957  consumed = chunksize;
958  outio.set_size(outio.size() + consumed);
959  }
960  }
961  else
962  {
963  consumed = chunksize;
964  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
965  }
966  chunksize -= consumed;
967  IOSize newsize = io.size() - consumed;
968  IOOffset newoffset = io.offset() + consumed;
969  void* newdata = static_cast<char*>(io.data()) + consumed;
970  io.set_offset(newoffset);
971  io.set_data(newdata);
972  io.set_size(newsize);
973  }
974  else if (io.size() == 0)
975  {
976  input.pop_back();
977  }
978  else
979  {
980  output.push_back(io);
981  chunksize -= io.size();
982  input.pop_back();
983  }
984  }
985 }
986 
987 static IOSize validateList(const std::vector<IOPosBuffer> req)
988 {
989  IOSize total = 0;
990  off_t last_offset = -1;
991  for (const auto & it : req)
992  {
993  total += it.size();
994  assert(it.offset() > last_offset);
995  last_offset = it.offset();
996  assert(it.size() <= XRD_CL_MAX_CHUNK);
997  assert(it.offset() < 0x1ffffffffff);
998  }
999  assert(req.size() <= 1024);
1000  return total;
1001 }
1002 
// Split the client request `iolist` into two request lists, `req1` and `req2`.
//
// A working copy of `iolist` is consumed from both ends: consumeChunkFront()
// fills `req1` from the front and consumeChunkBack() fills `req2` from the
// back, alternating until the copy is exhausted.  Both outputs are then sorted
// by offset and checked with validateList().
//
// The per-round byte budget of each list is derived from getQuality() of
// activeSources[0] and activeSources[1]; note that chunk1 is weighted by q2^2
// and chunk2 by q1^2 (presumably a lower quality value means a better
// source -- confirm against Source::getQuality()).
//
// Throws when both lists reach XRD_ADAPTOR_CHUNK_THRESHOLD entries while
// input remains.  Returns immediately, leaving req1/req2 untouched, when
// `iolist` is empty.
//
// NOTE(review): activeSources is indexed at [0] and [1] without a size check;
// callers must pass at least two sources.
1003 void
1004 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2, std::vector<std::shared_ptr<Source>> const& activeSources) const
1005 {
1006  if (iolist.empty()) return;
      // Work on a copy: the consume helpers shrink and pop entries in place.
1007  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1008  req1.reserve(iolist.size()/2+1);
1009  req2.reserve(iolist.size()/2+1);
      // Index of the first entry of tmp_iolist not yet consumed from the front.
1010  size_t front=0;
1011 
1012  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1013  float q1 = static_cast<float>(activeSources[0]->getQuality())+5;
1014  float q2 = static_cast<float>(activeSources[1]->getQuality())+5;
1015  IOSize chunk1, chunk2;
1016  // Make sure the chunk size is at least 1024; little point to reads less than that size.
1017  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1018  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1019 
      // Total bytes requested; used for the error message and the final consistency assert.
1020  IOSize size_orig = 0;
1021  for (const auto & it : iolist) size_orig += it.size();
1022 
1023  while (tmp_iolist.size()-front > 0)
1024  {
1025  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
1026  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1027  // are passed to the request manager. However, because we have a max chunk size, we increase
1028  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1029  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1030  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
      // NOTE(review): the declaration of `ex` is not visible in this listing
      // (the line between 1030 and 1032 appears to have been dropped).
1032  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
1033  << "', flags=0x" << std::hex << m_flags
1034  << ", permissions=0" << std::oct << m_perms << std::dec
1035  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
      // NOTE(review): the context string below names requestFailure(), but this
      // is splitClientRequest() -- looks like a copy/paste slip; confirm and fix.
1036  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1037  addConnections(ex);
1038  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1039  ex.addAdditionalInfo(ss.str());
      // Report the qualities without the +5 offset added above.
1040  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
1041  ex.addAdditionalInfo(ss2.str());
1042  throw ex;
1043  }
      // Alternate between the two ends; a list that has reached the threshold is skipped.
1044  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
1045  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
1046  }
      // validateList() asserts strictly increasing offsets, so sort both lists first.
1047  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1048  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1049 
1050  IOSize size1 = validateList(req1);
1051  IOSize size2 = validateList(req2);
1052 
      // No byte of the original request may be lost or duplicated by the split.
1053  assert(size_orig == size1 + size2);
1054 
1055  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1056 }
1057 
1058 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
1059  : m_manager(manager)
1060 {
1061 }
1062 
1063 
1064  // Cannot use ~OpenHandler=default as XrdCl::File is not fully
1065  // defined in the header.
      // NOTE(review): the destructor's signature line is not visible in this
      // listing (the line between 1065 and 1067 appears to have been dropped);
      // the empty body below is presumably the out-of-line destructor definition.
1067 {
1068 }
1069 
1070 
// Callback for the asynchronous open started by this handler.
//
// Takes ownership of `status_ptr` and `hostList_ptr` (both are wrapped in
// unique_ptr below); the response object is ignored.
//
// On success a new Source is built around m_file and delivered through
// m_promise; on failure m_file is released and an exception is stored in
// m_promise instead.  In both cases manager->handleOpen() is called afterwards,
// outside m_mutex.  If the RequestManager is already gone, the response is
// dropped.  m_outstanding_open is cleared on every exit path.
1071 void
1072 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
1073 {
1074  // Make sure we get rid of the strong self-reference when the callback finishes.
1075  std::shared_ptr<OpenHandler> self = m_self;
1076  m_self.reset();
1077 
1078  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1079  // Make sure that we set m_outstanding_open to false on exit from this function.
1080  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
1081  std::unique_ptr<OpenHandler, std::function<void(OpenHandler*)>> outstanding_guard(this, [&](OpenHandler*){m_outstanding_open=false;});
1082 
      // Stays null on failure; passed to handleOpen() either way.
1083  std::shared_ptr<Source> source;
1084  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1085  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1086 
1087  auto manager = m_manager.lock();
1088  // Manager object has already been deleted. Cleanup the
1089  // response objects, remove our self-reference, and ignore the response.
1090  if (!manager)
1091  {
1092  return;
1093  }
1094  //if we need to delete the File object we must do it outside
1095  // of the lock to avoid a potential deadlock
1096  std::unique_ptr<XrdCl::File> releaseFile;
1097  {
1098  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1099 
1100  if (status->IsOK())
1101  {
      // NOTE(review): a statement between lines 1101 and 1103 is not visible in
      // this listing (it appears to have been dropped).
1103  timespec now;
1104  GET_CLOCK_MONOTONIC(now);
1105 
1106  std::string excludeString;
1107  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1108 
      // Ownership of m_file moves into the new Source; m_file is empty afterwards.
1109  source.reset(new Source(now, std::move(m_file), excludeString));
1110  m_promise.set_value(source);
1111  }
1112  else
1113  {
      // Hand the file to releaseFile so it is destroyed after `sentry` is released.
1114  releaseFile = std::move(m_file);
      // NOTE(review): the declaration of `ex` is not visible in this listing
      // (the line between 1114 and 1116 appears to have been dropped).
1116  ex << "XrdCl::File::Open(name='" << manager->m_name
1117  << "', flags=0x" << std::hex << manager->m_flags
1118  << ", permissions=0" << std::oct << manager->m_perms << std::dec
1119  << ") => error '" << status->ToStr()
1120  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1121  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1122  manager->addConnections(ex);
1123 
      // Waiters on the shared future receive the failure as an exception.
1124  m_promise.set_exception(std::make_exception_ptr(ex));
1125  }
1126  }
      // Notify the manager without holding m_mutex.
1127  manager->handleOpen(*status, source);
1128 }
1129 
// NOTE(review): the signature of this function is not visible in this listing
// (the lines before 1132 appear to have been dropped); judging by the return
// statements it returns a string.
//
// Describes the server of the open currently in progress:
//   - "(no open in progress)" when m_file is not set,
//   - "(unknown source)" when the file's "DataServer" property is empty,
//   - otherwise the value of the "DataServer" property.
// m_mutex is held for the duration of the call.
1132 {
1133  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1134 
1135  if (!m_file.get())
1136  {
1137  return "(no open in progress)";
1138  }
1139  std::string dataServer;
      // The return value of GetProperty() is not checked; an unset property
      // leaves dataServer empty and is handled below.
1140  m_file->GetProperty("DataServer", dataServer);
1141  if (dataServer.empty()) { return "(unknown source)"; }
1142  return dataServer;
1143 }
1144 
// Start an asynchronous open of an additional source and return a shared
// future that will receive the resulting Source (or an exception).
//
// If an open is already outstanding, the existing future is returned and no
// new open is started.  Otherwise a fresh promise/future pair is installed, a
// new XrdCl::File is created and File::Open() is issued with this object as
// the response handler.
//
// Throws when the owning RequestManager or this handler's own shared_ptr is no
// longer alive, and when File::Open() reports an immediate failure (in which
// case m_file and m_outstanding_open are reset by the exit guard).
//
// Not thread-safe; see the comment on m_outstanding_open below.
//
// NOTE(review): the line carrying the function name (between 1145 and 1147) and
// the declarations of `ex` (before 1152, 1163 and 1201) are not visible in this
// listing; they appear to have been dropped.
1145 std::shared_future<std::shared_ptr<Source> >
1147 {
1148  auto manager_ptr = m_manager.lock();
1149  if (!manager_ptr)
1150  {
1152  ex << "XrdCl::File::Open() =>"
1153  << " error: OpenHandler called within an invalid RequestManager context."
1154  << " This is a logic error and should be reported to the CMSSW developers.";
1155  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1156  throw ex;
1157  }
1158  RequestManager &manager = *manager_ptr;
1159  auto self_ptr = m_self_weak.lock();
1160  if (!self_ptr)
1161  {
1163  ex << "XrdCl::File::Open() => error: "
1164  << "OpenHandler called after it was deleted. This is a logic error "
1165  << "and should be reported to the CMSSW developers.";
      // NOTE(review): "XrdAdapter" below is spelled differently from the
      // "XrdAdaptor" used in the other context strings -- likely a typo.
1166  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1167  throw ex;
1168  }
1169 
1170  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1171  // is not thread-safe; the caller is responsible to verify it is not called from
1172  // multiple threads simultaneously.
1173  //
1174  // This is done because ::open may be called from a Xrootd callback; if we
1175  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1176  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1177  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1178  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1179  if (m_outstanding_open)
1180  {
1181  return m_shared_future;
1182  }
1183  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
      // Replace the promise from any previous open with a fresh one and publish its future.
1184  std::promise<std::shared_ptr<Source> > new_promise;
1185  m_promise.swap(new_promise);
1186  m_shared_future = m_promise.get_future().share();
1187 
      // Append the opaque string as a CGI parameter: '?' if the name has none yet, '&' otherwise.
1188  auto opaque = manager.prepareOpaqueString();
1189  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1190  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1191  m_file.reset(new XrdCl::File());
1192  m_outstanding_open = true;
1193 
1194  // Always make sure we release m_file and set m_outstanding_open to false on error.
1195  std::unique_ptr<OpenHandler, std::function<void(OpenHandler*)>> exit_guard(this, [&](OpenHandler*){m_outstanding_open = false; m_file.reset();});
1196 
1197  XrdCl::XRootDStatus status;
1198  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1199  {
1201  ex << "XrdCl::File::Open(name='" << new_name
1202  << "', flags=0x" << std::hex << manager.m_flags
1203  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1204  << ") => error '" << status.ToStr()
1205  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1206  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1207  manager.addConnections(ex);
1208  throw ex;
1209  }
      // The open was accepted: disarm the guard so m_file and m_outstanding_open stay set.
1210  exit_guard.release();
1211  // Have a strong self-reference for as long as the callback is in-progress.
1212  m_self = self_ptr;
1213  return m_shared_future;
1214 }
1215 
void setCurrentServer(const std::string &servername)
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define XRD_CL_MAX_CHUNK
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
#define nullptr
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:51
double q2[4]
Definition: TauolaWrapper.h:88
std::weak_ptr< OpenHandler > m_self_weak
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:44
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:266
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
#define CMS_THREAD_SAFE
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:94
void clearMessage()
Definition: Exception.cc:215
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
static IOSize validateList(const std::vector< IOPosBuffer > req)
#define LIKELY(x)
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
void clearAdditionalInfo()
Definition: Exception.cc:223
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)