CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
16 
17 #include "XrdStatistics.h"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
20 
// Tuning constants for the request manager.
// Every multi-token value is wrapped in parentheses so that the macro expands
// to a single operand regardless of the surrounding expression
// (e.g. `x / XRD_CL_MAX_CHUNK` or `t % XRD_ADAPTOR_LONG_OPEN_DELAY`).

// Upper bound on the size of a single merged output chunk when splitting requests.
#define XRD_CL_MAX_CHUNK (512*1024)

// Short delay, in seconds, added to timespec::tv_sec when scheduling the next source check.
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
// NOTE(review): presumably the percentage of checks that probe for a new source - confirm at the use site.
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
// Long delay, in seconds, between attempts to open additional sources.
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2*60)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// Upper bound on the number of entries accumulated in an output request list.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
37 
38 
39 #ifdef __MACH__
40 #include <mach/clock.h>
41 #include <mach/mach.h>
42 #define GET_CLOCK_MONOTONIC(ts) \
43 { \
44  clock_serv_t cclock; \
45  mach_timespec_t mts; \
46  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
47  clock_get_time(cclock, &mts); \
48  mach_port_deallocate(mach_task_self(), cclock); \
49  ts.tv_sec = mts.tv_sec; \
50  ts.tv_nsec = mts.tv_nsec; \
51 }
52 #else
53 #define GET_CLOCK_MONOTONIC(ts) \
54  clock_gettime(CLOCK_MONOTONIC, &ts);
55 #endif
56 
57 using namespace XrdAdaptor;
58 
/**
 * Returns the signed difference (a - b) in milliseconds.
 *
 * The whole-second part is computed with integer arithmetic; the nanosecond
 * part is added as a floating-point millisecond value and the sum is then
 * truncated toward zero when converted back to an integer.
 */
long long timeDiffMS(const timespec &a, const timespec &b)
{
  const long long secondsPartMS = (a.tv_sec - b.tv_sec) * 1000;
  const double nanosPartMS = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(secondsPartMS + nanosPartMS);
}
65 
66 /*
67  * We do not care about the response of sending the monitoring information;
68  * this handler class simply frees any returned buffer to prevent memory leaks.
69  */
70 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
71 {
72  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
73  {
74  if (response)
75  {
76  XrdCl::Buffer *buffer = nullptr;
77  response->Get(buffer);
78  response->Set(static_cast<int*>(nullptr));
79  delete buffer;
80  }
81  // Send Info has a response object; we must delete it.
82  delete response;
83  delete status;
84  }
85 };
86 
88 
89 
90 static void
92 {
93  // Do not send this to a dCache data server as they return an error.
94  // In some versions of dCache, sending the monitoring information causes
95  // the server to close the connection - resulting in failures.
96  if (Source::isDCachePool(file)) {return;}
97 
98  // Send the monitoring info, if available.
100  std::string lastUrl;
101  file.GetProperty("LastURL", lastUrl);
102  if (jobId && lastUrl.size())
103  {
104  XrdCl::URL url(lastUrl);
105  XrdCl::FileSystem fs(url);
106  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK()))
107  {
108  edm::LogWarning("XrdAdaptorInternal") << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
109  }
110  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
111  }
112 }
113 
114 
116  : m_serverToAdvertise(nullptr),
117  m_timeout(XRD_DEFAULT_TIMEOUT),
118  m_nextInitialSourceToggle(false),
119  m_name(filename),
120  m_flags(flags),
121  m_perms(perms),
122  m_distribution(0,100),
123  m_excluded_active_count(0)
124 {
125 }
126 
127 
128 void
129 RequestManager::initialize(std::weak_ptr<RequestManager> self)
130 {
132 
133  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
134  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
135 
136  std::string orig_site;
137  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
138  {
139  std::string hostname;
140  if (Source::getHostname(orig_site, hostname))
141  {
142  Source::getDomain(hostname, orig_site);
143  }
144  }
145 
146  std::unique_ptr<XrdCl::File> file;
148  bool validFile = false;
149  const int retries = 5;
150  std::string excludeString;
151  for (int idx=0; idx<retries; idx++)
152  {
153  file.reset(new XrdCl::File());
154  auto opaque = prepareOpaqueString();
155  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
156  SyncHostResponseHandler handler;
157  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
158  if (!openStatus.IsOK())
159  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
160  // server and it was marked bad - xrootd couldn't even queue up the request internally!
161 // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
162  // redirector being down or authentication failures.
163  ex.clearMessage();
164  ex.clearContext();
165  ex.clearAdditionalInfo();
166  ex << "XrdCl::File::Open(name='" << m_name
167  << "', flags=0x" << std::hex << m_flags
168  << ", permissions=0" << std::oct << m_perms << std::dec
169  << ") => error '" << openStatus.ToStr()
170  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
171  ex.addContext("Calling XrdFile::open()");
172  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
173  throw ex;
174  }
175  handler.WaitForResponse();
176  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
177  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
178  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
179  assert(status);
180  if (status->IsOK())
181  {
182  validFile = true;
183  break;
184  }
185  else
186  {
187  ex.clearMessage();
188  ex.clearContext();
189  ex.clearAdditionalInfo();
190  ex << "XrdCl::File::Open(name='" << m_name
191  << "', flags=0x" << std::hex << m_flags
192  << ", permissions=0" << std::oct << m_perms << std::dec
193  << ") => error '" << status->ToStr()
194  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
195  ex.addContext("Calling XrdFile::open()");
196  addConnections(ex);
197  std::string dataServer, lastUrl;
198  file->GetProperty("DataServer", dataServer);
199  file->GetProperty("LastURL", lastUrl);
200  if (dataServer.size())
201  {
202  ex.addAdditionalInfo("Problematic data server: " + dataServer);
203  }
204  if (lastUrl.size())
205  {
206  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
207  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
208  }
209  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
210  {
211  ex << ". No additional data servers were found.";
212  throw ex;
213  }
214  if (dataServer.size())
215  {
216  m_disabledSourceStrings.insert(dataServer);
217  m_disabledExcludeStrings.insert(excludeString);
218  }
219  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
220  if (lastUrl == new_filename)
221  {
222  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
223  throw ex;
224  }
225  }
226  }
227  if (!validFile)
228  {
229  throw ex;
230  }
231  SendMonitoringInfo(*file);
232 
233  timespec ts;
235 
236  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
237  {
238  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
239  auto oldList = m_activeSources;
240  m_activeSources.push_back(source);
242  }
243  queueUpdateCurrentServer(source->ID());
245 
246  m_lastSourceCheck = ts;
247  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
249 }
250 
258 void
260 {
261  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
262  // a pending update. *However*, since we call this for every read, we'll get it
263  // eventually.
264  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
265  std::string *hostname_ptr;
266  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
267  {
268  std::unique_ptr<std::string> hostname(hostname_ptr);
270  if (statsService.isAvailable()) {
271  statsService->setCurrentServer(*hostname_ptr);
272  }
273  }
274 }
275 
276 
277 void
279 {
280  std::unique_ptr<std::string> hostname(new std::string(id));
281  if (Source::getHostname(id, *hostname))
282  {
283  std::string *null_hostname = nullptr;
284  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
285  {
286  hostname.release();
287  }
288  }
289 }
290 
291 namespace {
292  std::string formatSites(std::vector<std::shared_ptr<Source> > const& iSources) {
293  std::string siteA, siteB;
294  if (iSources.size()) {siteA = iSources[0]->Site();}
295  if (iSources.size() == 2) {siteB = iSources[1]->Site();}
296  std::string siteList = siteA;
297  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
298  return siteList;
299  }
300 }
301 
302 void
303 RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source> > const& iOld,
304  std::vector<std::shared_ptr<Source> > const& iNew,
305  std::string orig_site) const
306 {
307  auto siteList = formatSites(iNew);
308  if (orig_site.size() && (orig_site != siteList))
309  {
310  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
311  }
312  else {
313  auto oldSites = formatSites(iOld);
314  if (!orig_site.size() && (siteList != oldSites))
315  {
316  if (oldSites.size() >0 )
317  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
318  }
319  }
320 }
321 
322 
323 void
324 RequestManager::checkSources(timespec &now, IOSize requestSize,
325  std::vector<std::shared_ptr<Source>>& activeSources,
326  std::vector<std::shared_ptr<Source>>& inactiveSources)
327 {
328  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
329  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
330  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
331  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
332  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
333  {
334  { // Be more aggressive about getting rid of very bad sources.
335  compareSources(now, 0, 1, activeSources, inactiveSources);
336  compareSources(now, 1, 0, activeSources, inactiveSources);
337  }
338  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
339  {
340  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
341  }
342  }
343 }
344 
345 
346 bool
347 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b,
348  std::vector<std::shared_ptr<Source>>& activeSources,
349  std::vector<std::shared_ptr<Source>>& inactiveSources) const
350 {
351  if (activeSources.size() < std::max(a, b)+1) {return false;}
352 
353  bool findNewSource = false;
354  if ((activeSources[a]->getQuality() > 5130) ||
355  ((activeSources[a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[a]->getQuality())))
356  {
357  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
358  << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
359  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
360  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
361  activeSources[a]->setLastDowngrade(now);
362  inactiveSources.emplace_back(activeSources[a]);
363  auto oldSources = activeSources;
364  activeSources.erase(activeSources.begin()+a);
365  reportSiteChange(oldSources,activeSources);
366  }
367  return findNewSource;
368 }
369 
370 void
372  IOSize requestSize,
373  std::vector<std::shared_ptr<Source>>& activeSources,
374  std::vector<std::shared_ptr<Source>>& inactiveSources)
375 {
376 
377  bool findNewSource = false;
378  if (activeSources.size() <= 1)
379  {
380  findNewSource = true;
381  }
382  else if (activeSources.size() > 1)
383  {
384  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality() << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
385  findNewSource |= compareSources(now, 0, 1, activeSources,inactiveSources);
386  findNewSource |= compareSources(now, 1, 0,activeSources, inactiveSources);
387 
388  // NOTE: We could probably replace the copy with a better sort function.
389  // However, there are typically very few sources and the correctness is more obvious right now.
390  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
391  for (const auto & source : inactiveSources)
392  {
393  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
394  }
395  auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
396  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
397  auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
398  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
399  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
400  {
401  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
402  << ", quality " << (*bestInactiveSource)->getQuality();
403  }
404  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
405  << ", quality " << (*worstActiveSource)->getQuality();
406  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
407  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
408  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
409  {
410  auto oldSources = activeSources;
411  activeSources.push_back(*bestInactiveSource);
412  reportSiteChange(oldSources, activeSources);
413  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
414  }
415  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
416  {
417  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
418  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
419  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
420  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
421  (*worstActiveSource)->setLastDowngrade(now);
422  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
423  inactiveSources.emplace_back(std::move(*worstActiveSource));
424  auto oldSources = activeSources;
425  activeSources.erase(worstActiveSource);
426  activeSources.emplace_back(std::move(*bestInactiveSource));
427  reportSiteChange(oldSources, activeSources);
428  eligibleInactiveSources.clear();
429  for (const auto & source : inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
430  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
431  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
432  worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
433  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
434  }
435  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
436  {
437  float r = m_distribution(m_generator);
439  {
440  findNewSource = true;
441  }
442  }
443  }
444  if (findNewSource)
445  {
446  m_open_handler->open();
448  }
449 
450  // Only aggressively look for new sources if we don't have two.
451  if (activeSources.size() == 2)
452  {
454  }
455  else
456  {
457  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
458  }
460 }
461 
462 std::shared_ptr<XrdCl::File>
464 {
465  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
466  if (m_activeSources.empty())
467  {
469  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
470  << "', flags=0x" << std::hex << m_flags
471  << ", permissions=0" << std::oct << m_perms << std::dec
472  << ") => Source used after fatal exception.";
473  ex.addContext("In XrdAdaptor::RequestManager::handle()");
474  addConnections(ex);
475  throw ex;
476  }
477  return m_activeSources[0]->getFileHandle();
478 }
479 
480 void
481 RequestManager::getActiveSourceNames(std::vector<std::string> & sources) const
482 {
483  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
484  sources.reserve(m_activeSources.size());
485  for (auto const& source : m_activeSources) {
486  sources.push_back(source->ID());
487  }
488 }
489 
490 void
491 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources) const
492 {
493  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
494  sources.reserve(m_activeSources.size());
495  for (auto const& source : m_activeSources) {
496  sources.push_back(source->PrettyID());
497  }
498 }
499 
500 void
501 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources) const
502 {
503  sources.reserve(m_disabledSourceStrings.size());
504  for (auto const& source : m_disabledSourceStrings) {
505  sources.push_back(source);
506  }
507 }
508 
509 void
511 {
512  std::vector<std::string> sources;
514  for (auto const& source : sources)
515  {
516  ex.addAdditionalInfo("Active source: " + source);
517  }
518  sources.clear();
519  getDisabledSourceNames(sources);
520  for (auto const& source : sources)
521  {
522  ex.addAdditionalInfo("Disabled source: " + source);
523  }
524 }
525 
526 std::shared_ptr<Source>
528 {
529  std::shared_ptr<Source> source = nullptr;
530  {
531  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
532  if (m_activeSources.size() == 2)
533  {
535  {
536  source = m_activeSources[0];
538  }
539  else
540  {
541  source = m_activeSources[1];
543  }
544  }
545  else if (m_activeSources.empty())
546  {
548  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
549  << "', flags=0x" << std::hex << m_flags
550  << ", permissions=0" << std::oct << m_perms << std::dec
551  << ") => Source used after fatal exception.";
552  ex.addContext("In XrdAdaptor::RequestManager::handle()");
553  addConnections(ex);
554  throw ex;
555  }
556  else
557  {
558  source = m_activeSources[0];
559  }
560  }
561  return source;
562 }
563 
564 std::future<IOSize>
565 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
566 {
567  assert(c_ptr.get());
568  timespec now;
569  GET_CLOCK_MONOTONIC(now);
570  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
571  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
572  {
573  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
574  activeSources = m_activeSources;
575  inactiveSources = m_inactiveSources;
576  }
577  {
578  //make sure we update values before calling pickSingelSource
579  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
580  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
581  m_activeSources = std::move(activeSources);
582  m_inactiveSources = std::move(inactiveSources);
583  });
584 
585  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
586  }
587 
588  std::shared_ptr<Source> source = pickSingleSource();
589  source->handle(c_ptr);
590  return c_ptr->get_future();
591 }
592 
595 {
596  std::stringstream ss;
597  ss << "tried=";
598  size_t count = 0;
599  {
600  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
601 
602  for ( const auto & it : m_activeSources )
603  {
604  count++;
605  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
606  }
607  for ( const auto & it : m_inactiveSources )
608  {
609  count++;
610  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
611  }
612  }
613  for ( const auto & it : m_disabledExcludeStrings )
614  {
615  count++;
616  ss << it.substr(0, it.find(":")) << ",";
617  }
618  if (count)
619  {
620  std::string tmp_str = ss.str();
621  return tmp_str.substr(0, tmp_str.size()-1);
622  }
623  return "";
624 }
625 
626 void
627 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
628 {
629  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
630  if (status.IsOK())
631  {
632  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
633  for (const auto & s : m_activeSources)
634  {
635  if (source->ID() == s->ID())
636  {
637  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
638  << "; ignoring" << std::endl;
639  unsigned returned_count = ++m_excluded_active_count;
640  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
641  if (returned_count >= 3) {m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2*XRD_ADAPTOR_SHORT_OPEN_DELAY;}
642  return;
643  }
644  }
645  for (const auto & s : m_inactiveSources)
646  {
647  if (source->ID() == s->ID())
648  {
649  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
650  << "; ignoring" << std::endl;
651  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
652  return;
653  }
654  }
655  if (m_activeSources.size() < 2)
656  {
657  auto oldSources = m_activeSources;
658  m_activeSources.push_back(source);
659  reportSiteChange(oldSources, m_activeSources);
660  queueUpdateCurrentServer(source->ID());
661  }
662  else
663  {
664  m_inactiveSources.push_back(source);
665  }
666  }
667  else
668  { // File-open failure - wait at least 120s before next attempt.
669  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
670  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
671  }
672 }
673 
674 std::future<IOSize>
675 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
676 {
677  //Use a copy of m_activeSources and m_inactiveSources throughout this function
678  // in order to avoid holding the lock a long time and causing a deadlock.
679  // When the function is over we will update the values of the containers
680  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
681  {
682  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
683  activeSources = m_activeSources;
684  inactiveSources = m_inactiveSources;
685  }
686  //Make sure we update changes when we leave the function
687  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
688  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
689  m_activeSources = std::move(activeSources);
690  m_inactiveSources = std::move(inactiveSources);
691  });
692 
693  updateCurrentServer();
694 
695  timespec now;
696  GET_CLOCK_MONOTONIC(now);
697 
698  edm::CPUTimer timer;
699  timer.start();
700 
701  if (activeSources.size() == 1)
702  {
703  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
704  checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
705  activeSources[0]->handle(c_ptr);
706  return c_ptr->get_future();
707  }
708  // Make sure active
709  else if (activeSources.empty())
710  {
712  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
713  << "', flags=0x" << std::hex << m_flags
714  << ", permissions=0" << std::oct << m_perms << std::dec
715  << ") => Source used after fatal exception.";
716  ex.addContext("In XrdAdaptor::RequestManager::handle()");
717  addConnections(ex);
718  throw ex;
719  }
720 
721  assert(iolist.get());
722  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
723  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
724  splitClientRequest(*iolist, *req1, *req2, activeSources);
725 
726  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
727  // CheckSources may have removed a source
728  if (activeSources.size() == 1)
729  {
730  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
731  activeSources[0]->handle(c_ptr);
732  return c_ptr->get_future();
733  }
734 
735  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
736  std::future<IOSize> future1, future2;
737  if (req1->size())
738  {
739  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
740  activeSources[0]->handle(c_ptr1);
741  future1 = c_ptr1->get_future();
742  }
743  if (req2->size())
744  {
745  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
746  activeSources[1]->handle(c_ptr2);
747  future2 = c_ptr2->get_future();
748  }
749  if (req1->size() && req2->size())
750  {
751  std::future<IOSize> task = std::async(std::launch::deferred,
752  [](std::future<IOSize> a, std::future<IOSize> b){
753  // Wait until *both* results are available. This is essential
754  // as the callback may try referencing the RequestManager. If one
755  // throws an exception (causing the RequestManager to be destroyed by
756  // XrdFile) and the other has a failure, then the recovery code will
757  // reference the destroyed RequestManager.
758  //
759  // Unlike other places where we use shared/weak ptrs to maintain object
760  // lifetime and destruction asynchronously, we *cannot* destroy the request
761  // asynchronously as it is associated with a ROOT buffer. We must wait until we
762  // are guaranteed that XrdCl will not write into the ROOT buffer before we
763  // can return.
764  b.wait(); a.wait();
765  return b.get() + a.get();
766  },
767  std::move(future1),
768  std::move(future2));
769  timer.stop();
770  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
771  return task;
772  }
773  else if (req1->size()) { return future1; }
774  else if (req2->size()) { return future2; }
775  else
776  { // Degenerate case - no bytes to read.
777  std::promise<IOSize> p; p.set_value(0);
778  return p.get_future();
779  }
780 }
781 
782 void
783 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
784 {
785  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
786 
787  // Fail early for invalid responses - XrdFile has a separate path for handling this.
788  if (c_status.code == XrdCl::errInvalidResponse)
789  {
790  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
792  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
793  << "', flags=0x" << std::hex << m_flags
794  << ", permissions=0" << std::oct << m_perms << std::dec
795  << ", old source=" << source_ptr->PrettyID()
796  << ") => Invalid ReadV response from server";
797  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
798  addConnections(ex);
799  throw ex;
800  }
801  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
802 
803  // Note that we do not delete the Source itself. That is because this
804  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
805  // In such a case, if you close a file in the handler, it will deadlock
806  m_disabledSourceStrings.insert(source_ptr->ID());
807  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
808  m_disabledSources.insert(source_ptr);
809 
810  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
811  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
812  {
813  auto oldSources = m_activeSources;
814  m_activeSources.erase(m_activeSources.begin());
815  reportSiteChange(oldSources, m_activeSources);
816  }
817  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
818  {
819  auto oldSources = m_activeSources;
820  m_activeSources.erase(m_activeSources.begin()+1);
821  reportSiteChange(oldSources, m_activeSources);
822  }
823  std::shared_ptr<Source> new_source;
824  if (m_activeSources.size() == 0)
825  {
826  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
827  timespec now;
828  GET_CLOCK_MONOTONIC(now);
830  // Note we only wait for 180 seconds here. This is because we've already failed
831  // once and the likelihood the program has some inconsistent state is decent.
832  // We'd much rather fail hard than deadlock!
833  sentry.unlock();
834  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
835  if (status == std::future_status::timeout)
836  {
838  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
839  << "', flags=0x" << std::hex << m_flags
840  << ", permissions=0" << std::oct << m_perms << std::dec
841  << ", old source=" << source_ptr->PrettyID()
842  << ") => timeout when waiting for file open";
843  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
844  addConnections(ex);
845  throw ex;
846  }
847  else
848  {
849  try
850  {
851  new_source = future.get();
852  }
853  catch (edm::Exception &ex)
854  {
855  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
856  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
857  throw;
858  }
859  }
860 
861  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
862  {
863  // The server gave us back a data node we requested excluded. Fatal!
865  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
866  << "', flags=0x" << std::hex << m_flags
867  << ", permissions=0" << std::oct << m_perms << std::dec
868  << ", old source=" << source_ptr->PrettyID()
869  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
870  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
871  addConnections(ex);
872  throw ex;
873  }
874  sentry.lock();
875 
876  auto oldSources = m_activeSources;
877  m_activeSources.push_back(new_source);
878  reportSiteChange(oldSources,m_activeSources);
879  }
880  else
881  {
882  new_source = m_activeSources[0];
883  }
884  new_source->handle(c_ptr);
885 }
886 
887 static void
888 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
889 {
890  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
891  {
892  IOPosBuffer &io = input[front];
893  IOPosBuffer &outio = output.back();
894  if (io.size() > chunksize)
895  {
896  IOSize consumed;
897  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
898  {
899  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
900  {
901  consumed = (XRD_CL_MAX_CHUNK - outio.size());
902  outio.set_size(XRD_CL_MAX_CHUNK);
903  }
904  else
905  {
906  consumed = chunksize;
907  outio.set_size(outio.size() + consumed);
908  }
909  }
910  else
911  {
912  consumed = chunksize;
913  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
914  }
915  chunksize -= consumed;
916  IOSize newsize = io.size() - consumed;
917  IOOffset newoffset = io.offset() + consumed;
918  void* newdata = static_cast<char*>(io.data()) + consumed;
919  io.set_offset(newoffset);
920  io.set_data(newdata);
921  io.set_size(newsize);
922  }
923  else if (io.size() == 0)
924  {
925  front++;
926  }
927  else
928  {
929  output.push_back(io);
930  chunksize -= io.size();
931  front++;
932  }
933  }
934 }
935 
936 static void
937 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
938 {
939  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
940  {
941  IOPosBuffer &io = input.back();
942  IOPosBuffer &outio = output.back();
943  if (io.size() > chunksize)
944  {
945  IOSize consumed;
946  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
947  {
948  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
949  {
950  consumed = (XRD_CL_MAX_CHUNK - outio.size());
951  outio.set_size(XRD_CL_MAX_CHUNK);
952  }
953  else
954  {
955  consumed = chunksize;
956  outio.set_size(outio.size() + consumed);
957  }
958  }
959  else
960  {
961  consumed = chunksize;
962  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
963  }
964  chunksize -= consumed;
965  IOSize newsize = io.size() - consumed;
966  IOOffset newoffset = io.offset() + consumed;
967  void* newdata = static_cast<char*>(io.data()) + consumed;
968  io.set_offset(newoffset);
969  io.set_data(newdata);
970  io.set_size(newsize);
971  }
972  else if (io.size() == 0)
973  {
974  input.pop_back();
975  }
976  else
977  {
978  output.push_back(io);
979  chunksize -= io.size();
980  input.pop_back();
981  }
982  }
983 }
984 
985 static IOSize validateList(const std::vector<IOPosBuffer> req)
986 {
987  IOSize total = 0;
988  off_t last_offset = -1;
989  for (const auto & it : req)
990  {
991  total += it.size();
992  assert(it.offset() > last_offset);
993  last_offset = it.offset();
994  assert(it.size() <= XRD_CL_MAX_CHUNK);
995  assert(it.offset() < 0x1ffffffffff);
996  }
997  assert(req.size() <= 1024);
998  return total;
999 }
1000 
/*
 * Split the client request `iolist` into two request lists, `req1` and `req2`,
 * one per entry of `activeSources`.
 *
 * A working copy of `iolist` is consumed alternately from its front (into
 * `req1`, `chunk1` bytes at a time) and from its back (into `req2`, `chunk2`
 * bytes at a time) until nothing is left.  Both outputs are then sorted by
 * offset and checked with validateList().
 *
 * Reads activeSources[0] and activeSources[1] without a size check, so the
 * caller must pass at least two sources.
 *
 * Throws when both outputs have reached XRD_ADAPTOR_CHUNK_THRESHOLD entries
 * while input remains.
 */
1001 void
1002 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2, std::vector<std::shared_ptr<Source>> const& activeSources) const
1003 {
1004  if (iolist.size() == 0) return;
1005  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1006  req1.reserve(iolist.size()/2+1);
1007  req2.reserve(iolist.size()/2+1);
1008  size_t front=0;
1009 
1010  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1011  float q1 = static_cast<float>(activeSources[0]->getQuality())+5;
1012  float q2 = static_cast<float>(activeSources[1]->getQuality())+5;
1013  IOSize chunk1, chunk2;
1014  // Make sure the chunk size is at least 1024; little point to reads less than that size.
 // chunk1 is weighted by q2^2 and chunk2 by q1^2, so the source with the larger
 // quality value gets the smaller chunk (presumably a lower value means a
 // better source -- TODO confirm against Source::getQuality()).
1015  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1016  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1017 
 // Total byte count of the original request; used for the final consistency check.
1018  IOSize size_orig = 0;
1019  for (const auto & it : iolist) size_orig += it.size();
1020 
 // Loop until every buffer of the working copy at or after `front` has been consumed.
1021  while (tmp_iolist.size()-front > 0)
1022  {
1023  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
1024  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1025  // are passed to the request manager. However, because we have a max chunk size, we increase
1026  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1027  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1028  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1030  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
1031  << "', flags=0x" << std::hex << m_flags
1032  << ", permissions=0" << std::oct << m_perms << std::dec
1033  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
 // NOTE(review): the context string names requestFailure(), but this is
 // splitClientRequest() -- looks like a copy/paste slip; confirm and correct.
1034  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1035  addConnections(ex);
1036  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1037  ex.addAdditionalInfo(ss.str());
 // Subtract the +5 offset again so the reported qualities are the raw values.
1038  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
1039  ex.addAdditionalInfo(ss2.str());
1040  throw ex;
1041  }
 // Alternate between the two outputs, skipping one that is already full.
1042  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
1043  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
1044  }
 // validateList() asserts strictly increasing offsets, so order both outputs by offset first.
1045  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1046  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1047 
1048  IOSize size1 = validateList(req1);
1049  IOSize size2 = validateList(req2);
1050 
 // No byte of the original request may be lost or duplicated by the split.
1051  assert(size_orig == size1 + size2);
1052 
1053  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1054 }
1055 
1056 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
1057  : m_manager(manager)
1058 {
1059 }
1060 
1061 
1062  // Cannot use ~OpenHandler=default as XrdCl::File is not fully
1063  // defined in the header.
1065 {
1066 }
1067 
1068 
/*
 * Callback for the asynchronous open started by this handler.
 *
 * Takes ownership of `status_ptr` and `hostList_ptr`, drops the strong
 * self-reference held in m_self, and returns early if the owning
 * RequestManager no longer exists.  On success a new Source is built from
 * m_file and delivered through m_promise; on failure an exception is stored
 * in m_promise instead and m_file is destroyed after m_mutex is released.
 * In both cases manager->handleOpen() is then called outside the lock;
 * `source` is still empty on the failure path.
 */
1069 void
1070 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
1071 {
1072  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1073  // Make sure that we set m_outstanding_open to false on exit from this function.
 // NOTE(review): the guard holds a null pointer, and std::unique_ptr only
 // invokes its deleter for a non-null pointer, so this lambda never runs and
 // m_outstanding_open is not cleared here.  A fix needs a non-null pointer
 // (e.g. `this`).  It also needs the guard to be destroyed before `self`
 // below: locals are destroyed in reverse declaration order, so as declared
 // here the lambda would run after `self` has been released.
1074  std::unique_ptr<char, std::function<void(char*)>> outstanding_guard(nullptr, [&](char*){m_outstanding_open=false;});
1075 
1076  std::shared_ptr<Source> source;
 // Take ownership of the raw pointers handed over by the caller so that they
 // are freed on every exit path.
1077  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1078  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1079 
1080  // Make sure we get rid of the strong self-reference when the callback finishes.
1081  std::shared_ptr<OpenHandler> self = m_self;
1082  m_self.reset();
1083 
1084  auto manager = m_manager.lock();
1085  // Manager object has already been deleted. Cleanup the
1086  // response objects, remove our self-reference, and ignore the response.
1087  if (!manager)
1088  {
1089  return;
1090  }
1091  //if we need to delete the File object we must do it outside
1092  // of the lock to avoid a potential deadlock
1093  std::unique_ptr<XrdCl::File> releaseFile;
1094  {
1095  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1096 
1097  if (status->IsOK())
1098  {
1099  SendMonitoringInfo(*m_file);
1100  timespec now;
1101  GET_CLOCK_MONOTONIC(now);
1102 
1103  std::string excludeString;
1104  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1105 
 // The new Source takes over m_file; waiters on the shared future are released.
1106  source.reset(new Source(now, std::move(m_file), excludeString));
1107  m_promise.set_value(source);
1108  }
1109  else
1110  {
 // Park the file object so that it is destroyed only after `sentry` is released.
1111  releaseFile = std::move(m_file);
1113  ex << "XrdCl::File::Open(name='" << manager->m_name
1114  << "', flags=0x" << std::hex << manager->m_flags
1115  << ", permissions=0" << std::oct << manager->m_perms << std::dec
1116  << ") => error '" << status->ToStr()
1117  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1118  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1119  manager->addConnections(ex);
1120 
 // Deliver the failure to whoever waits on the shared future.
1121  m_promise.set_exception(std::make_exception_ptr(ex));
1122  }
1123  }
1124  manager->handleOpen(*status, source);
1125 }
1126 
1129 {
 // Returns a printable name for the server involved in the open held by this
 // handler.  m_file is read under m_mutex.
1130  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1131 
 // No file object means no open is currently tracked by this handler.
1132  if (!m_file.get())
1133  {
1134  return "(no open in progress)";
1135  }
 // Ask the XrdCl file object for its "DataServer" property; the string
 // stays empty when the property yields nothing.
1136  std::string dataServer;
1137  m_file->GetProperty("DataServer", dataServer);
1138  if (!dataServer.size()) { return "(unknown source)"; }
1139  return dataServer;
1140 }
1141 
/*
 * Start an asynchronous open of the manager's file with this object as the
 * response handler, and return a shared future for the resulting Source.
 *
 * If an open is already outstanding, the future of that open is returned and
 * no new request is issued.  Otherwise a fresh promise/future pair is
 * installed, the manager's opaque string is appended to the file name, and
 * XrdCl::File::Open() is called.  On success a strong self-reference is kept
 * in m_self.
 *
 * Throws if the RequestManager or this handler's own weak self-reference has
 * expired, or if XrdCl::File::Open() returns a non-OK status.
 */
1142 std::shared_future<std::shared_ptr<Source> >
1144 {
1145  auto manager_ptr = m_manager.lock();
1146  if (!manager_ptr)
1147  {
1149  ex << "XrdCl::File::Open() =>"
1150  << " error: OpenHandler called within an invalid RequestManager context."
1151  << " This is a logic error and should be reported to the CMSSW developers.";
1152  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1153  throw ex;
1154  }
1155  RequestManager &manager = *manager_ptr;
1156  auto self_ptr = m_self_weak.lock();
1157  if (!self_ptr)
1158  {
1160  ex << "XrdCl::File::Open() => error: "
1161  << "OpenHandler called after it was deleted. This is a logic error "
1162  << "and should be reported to the CMSSW developers.";
 // NOTE(review): "XrdAdapter" below is spelled differently from the
 // "XrdAdaptor" namespace used everywhere else -- probably a typo; confirm.
1163  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1164  throw ex;
1165  }
1166 
1167  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1168  // is not thread-safe; the caller is responsible to verify it is not called from
1169  // multiple threads simultaneously.
1170  //
1171  // This is done because ::open may be called from a Xrootd callback; if we
1172  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1173  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1174  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1175  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1176  if (m_outstanding_open)
1177  {
1178  return m_shared_future;
1179  }
1180  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
 // Install a fresh promise; the previous one (if any) is destroyed with new_promise.
1181  std::promise<std::shared_ptr<Source> > new_promise;
1182  m_promise.swap(new_promise);
1183  m_shared_future = m_promise.get_future().share();
1184 
 // Append the opaque string with '?' when the name has no query part yet, '&' otherwise.
1185  auto opaque = manager.prepareOpaqueString();
1186  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1187  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1188  m_file.reset(new XrdCl::File());
1189  m_outstanding_open = true;
1190 
1191  // Always make sure we release m_file and set m_outstanding_open to false on error.
 // NOTE(review): the guard holds a null pointer, and std::unique_ptr only
 // invokes its deleter for a non-null pointer, so this cleanup never runs:
 // when the throw below is taken, m_outstanding_open stays true and m_file
 // stays set.  A fix needs a non-null pointer (e.g. `this`); release() further
 // down would then disarm the guard on the success path.
1192  std::unique_ptr<char, std::function<void(char*)>> exit_guard(nullptr, [&](char*){m_outstanding_open = false; m_file.reset();});
1193 
1194  XrdCl::XRootDStatus status;
1195  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1196  {
1198  ex << "XrdCl::File::Open(name='" << new_name
1199  << "', flags=0x" << std::hex << manager.m_flags
1200  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1201  << ") => error '" << status.ToStr()
1202  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1203  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1204  manager.addConnections(ex);
1205  throw ex;
1206  }
1207  exit_guard.release();
1208  // Have a strong self-reference for as long as the callback is in-progress.
1209  m_self = self_ptr;
1210  return m_shared_future;
1211 }
1212 
void setCurrentServer(const std::string &servername)
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define XRD_CL_MAX_CHUNK
OpenHandler(std::weak_ptr< RequestManager > manager)
#define nullptr
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
int timeout
Definition: mps_check.py:51
list diff
Definition: mps_update.py:85
double q2[4]
Definition: TauolaWrapper.h:88
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:44
tuple s2
Definition: indexGen.py:106
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
#define likely(x)
void queueUpdateCurrentServer(const std::string &)
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
string URL
Definition: rrClient.py:4
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:266
std::atomic< std::string * > m_serverToAdvertise
def move
Definition: eostools.py:510
bool isAvailable() const
Definition: Service.h:46
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:219
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Times stop()
Definition: CPUTimer.cc:94
void clearMessage()
Definition: Exception.cc:215
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
void clearAdditionalInfo()
Definition: Exception.cc:223
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)
tuple status
Definition: mps_update.py:57
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)