test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 #include "XrdCl/XrdClFileSystem.hh"
10 
16 
17 #include "XrdStatistics.h"
19 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
20 
// Tuning constants for the request manager.
// Every macro whose value is an expression is parenthesized so that it expands
// correctly next to operators of higher precedence (for example `x / XRD_CL_MAX_CHUNK`).

// Upper bound, in bytes, on the size of a single chunk built when splitting a request.
#define XRD_CL_MAX_CHUNK (512*1024)

// Short delay, in seconds, added to timespec::tv_sec when scheduling the next source check.
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2*60)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// Upper bound on the number of entries placed in one split request list.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
37 
38 
// GET_CLOCK_MONOTONIC(ts): fill the timespec lvalue `ts` with the current clock reading.
// Both variants are wrapped in do { ... } while (0) so that `GET_CLOCK_MONOTONIC(ts);`
// is exactly one statement and can be used safely inside an unbraced if/else.
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
// Mach variant: query the SYSTEM_CLOCK service and copy seconds/nanoseconds into `ts`.
#define GET_CLOCK_MONOTONIC(ts) \
do { \
    clock_serv_t cclock; \
    mach_timespec_t mts; \
    host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
    clock_get_time(cclock, &mts); \
    mach_port_deallocate(mach_task_self(), cclock); \
    (ts).tv_sec = mts.tv_sec; \
    (ts).tv_nsec = mts.tv_nsec; \
} while (0)
#else
// POSIX variant: read CLOCK_MONOTONIC directly into `ts`.
#define GET_CLOCK_MONOTONIC(ts) \
do { \
    clock_gettime(CLOCK_MONOTONIC, &(ts)); \
} while (0)
#endif
56 
57 using namespace XrdAdaptor;
58 
/*
 * Return the time elapsed from `b` to `a` (that is, a - b) in whole milliseconds.
 *
 * The result is negative when `a` is earlier than `b`; any fraction of a
 * millisecond is truncated toward zero.  The computation is carried out
 * entirely in 64-bit integers: the seconds difference is widened before it is
 * scaled, so it cannot overflow a narrower time_t, and no floating-point
 * round-trip is involved.
 */
long long timeDiffMS(const timespec &a, const timespec &b)
{
    const long long seconds = static_cast<long long>(a.tv_sec) - static_cast<long long>(b.tv_sec);
    const long long nanoseconds = static_cast<long long>(a.tv_nsec) - static_cast<long long>(b.tv_nsec);
    // Combine in nanoseconds first so the truncation applies to the total, not to each part.
    return (seconds * 1000000000LL + nanoseconds) / 1000000LL;
}
65 
66 /*
67  * We do not care about the response of sending the monitoring information;
68  * this handler class simply frees any returned buffer to prevent memory leaks.
69  */
70 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
71 {
72  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
73  {
74  if (response)
75  {
76  XrdCl::Buffer *buffer = nullptr;
77  response->Get(buffer);
78  delete buffer;
79  }
80  }
81 };
82 
84 
85 
86 static void
88 {
89  // Do not send this to a dCache data server as they return an error.
90  // In some versions of dCache, sending the monitoring information causes
91  // the server to close the connection - resulting in failures.
92  if (Source::isDCachePool(file)) {return;}
93 
94  // Send the monitoring info, if available.
96  std::string lastUrl;
97  file.GetProperty("LastURL", lastUrl);
98  if (jobId && lastUrl.size())
99  {
100  XrdCl::URL url(lastUrl);
101  XrdCl::FileSystem fs(url);
102  fs.SendInfo(jobId, &nullHandler, 30);
103  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
104  }
105 }
106 
107 
109  : m_serverToAdvertise(nullptr),
110  m_timeout(XRD_DEFAULT_TIMEOUT),
111  m_nextInitialSourceToggle(false),
112  m_name(filename),
113  m_flags(flags),
114  m_perms(perms),
115  m_distribution(0,100),
116  m_excluded_active_count(0)
117 {
118 }
119 
120 
121 void
122 RequestManager::initialize(std::weak_ptr<RequestManager> self)
123 {
125 
126  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
127  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
128 
129  std::string orig_site;
130  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
131  {
132  std::string hostname;
133  if (Source::getHostname(orig_site, hostname))
134  {
135  Source::getDomain(hostname, orig_site);
136  }
137  }
138 
139  std::unique_ptr<XrdCl::File> file;
141  bool validFile = false;
142  const int retries = 5;
143  std::string excludeString;
144  for (int idx=0; idx<retries; idx++)
145  {
146  file.reset(new XrdCl::File());
147  auto opaque = prepareOpaqueString();
148  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
149  SyncHostResponseHandler handler;
150  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
151  if (!openStatus.IsOK())
152  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
153  // server and it was marked bad - xrootd couldn't even queue up the request internally!
154  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
155  // redirector being down or authentication failures.
156  ex.clearMessage();
157  ex.clearContext();
158  ex.clearAdditionalInfo();
159  ex << "XrdCl::File::Open(name='" << m_name
160  << "', flags=0x" << std::hex << m_flags
161  << ", permissions=0" << std::oct << m_perms << std::dec
162  << ") => error '" << openStatus.ToStr()
163  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
164  ex.addContext("Calling XrdFile::open()");
165  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
166  throw ex;
167  }
168  handler.WaitForResponse();
169  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
170  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
171  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
172  assert(status);
173  if (status->IsOK())
174  {
175  validFile = true;
176  break;
177  }
178  else
179  {
180  ex.clearMessage();
181  ex.clearContext();
182  ex.clearAdditionalInfo();
183  ex << "XrdCl::File::Open(name='" << m_name
184  << "', flags=0x" << std::hex << m_flags
185  << ", permissions=0" << std::oct << m_perms << std::dec
186  << ") => error '" << status->ToStr()
187  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
188  ex.addContext("Calling XrdFile::open()");
189  addConnections(ex);
190  std::string dataServer, lastUrl;
191  file->GetProperty("DataServer", dataServer);
192  file->GetProperty("LastURL", lastUrl);
193  if (dataServer.size())
194  {
195  ex.addAdditionalInfo("Problematic data server: " + dataServer);
196  }
197  if (lastUrl.size())
198  {
199  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
200  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
201  }
202  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
203  {
204  ex << ". No additional data servers were found.";
205  throw ex;
206  }
207  if (dataServer.size())
208  {
209  m_disabledSourceStrings.insert(dataServer);
210  m_disabledExcludeStrings.insert(excludeString);
211  }
212  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
213  if (lastUrl == new_filename)
214  {
215  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
216  throw ex;
217  }
218  }
219  }
220  if (!validFile)
221  {
222  throw ex;
223  }
224  SendMonitoringInfo(*file);
225 
226  timespec ts;
228 
229  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
230  {
231  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
232  m_activeSources.push_back(source);
233  updateSiteInfo(orig_site);
234  }
235  queueUpdateCurrentServer(source->ID());
237 
238  m_lastSourceCheck = ts;
239  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
241 }
242 
250 void
252 {
253  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
254  // a pending update. *However*, since we call this for every read, we'll get it
255  // eventually.
256  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
257  std::string *hostname_ptr;
258  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
259  {
260  std::unique_ptr<std::string> hostname(hostname_ptr);
262  if (statsService.isAvailable()) {
263  statsService->setCurrentServer(*hostname_ptr);
264  }
265  }
266 }
267 
268 
269 void
271 {
272  std::unique_ptr<std::string> hostname(new std::string(id));
273  if (Source::getHostname(id, *hostname))
274  {
275  std::string *null_hostname = nullptr;
276  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
277  {
278  hostname.release();
279  }
280  }
281 }
282 
283 void
285 {
286  std::string siteA, siteB, siteList;
287  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
288  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
289  siteList = siteA;
290  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
291  if (orig_site.size() && (orig_site != siteList))
292  {
293  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
294  m_activeSites = siteList;
295  }
296  else if (!orig_site.size() && (siteList != m_activeSites))
297  {
298  if (m_activeSites.size() >0 )
299  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
300  m_activeSites = siteList;
301  }
302 }
303 
304 
305 void
306 RequestManager::checkSources(timespec &now, IOSize requestSize)
307 {
308  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
309  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
310  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
311  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
312  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
313  {
314  { // Be more aggressive about getting rid of very bad sources.
315  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
316  compareSources(now, 0, 1);
317  compareSources(now, 1, 0);
318  }
319  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
320  {
321  checkSourcesImpl(now, requestSize);
322  }
323  }
324 }
325 
326 
327 bool
328 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b)
329 {
330  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
331 
332  bool findNewSource = false;
333  if ((m_activeSources[a]->getQuality() > 5130) ||
334  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
335  {
336  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
337  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
338  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
339  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
340  m_activeSources[a]->setLastDowngrade(now);
341  m_inactiveSources.emplace_back(m_activeSources[a]);
342  m_activeSources.erase(m_activeSources.begin()+a);
343  updateSiteInfo();
344  }
345  return findNewSource;
346 }
347 
348 void
350 {
351  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
352 
353  bool findNewSource = false;
354  if (m_activeSources.size() <= 1)
355  {
356  findNewSource = true;
357  }
358  else if (m_activeSources.size() > 1)
359  {
360  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
361  findNewSource |= compareSources(now, 0, 1);
362  findNewSource |= compareSources(now, 1, 0);
363 
364  // NOTE: We could probably replace the copy with a better sort function.
365  // However, there are typically very few sources and the correctness is more obvious right now.
366  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
367  for (const auto & source : m_inactiveSources)
368  {
369  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
370  }
371  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
372  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
373  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
374  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
375  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
376  {
377  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
378  << ", quality " << (*bestInactiveSource)->getQuality();
379  }
380  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
381  << ", quality " << (*worstActiveSource)->getQuality();
382  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
383  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
384  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
385  {
386  m_activeSources.push_back(*bestInactiveSource);
387  updateSiteInfo();
388  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
389  }
390  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
391  {
392  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
393  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
394  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
395  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
396  (*worstActiveSource)->setLastDowngrade(now);
397  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
398  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
399  m_activeSources.erase(worstActiveSource);
400  m_activeSources.emplace_back(std::move(*bestInactiveSource));
401  updateSiteInfo();
402  eligibleInactiveSources.clear();
403  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
404  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
405  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
406  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
407  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
408  }
409  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
410  {
411  float r = m_distribution(m_generator);
413  {
414  findNewSource = true;
415  }
416  }
417  }
418  if (findNewSource)
419  {
420  m_open_handler->open();
422  }
423 
424  // Only aggressively look for new sources if we don't have two.
425  if (m_activeSources.size() == 2)
426  {
428  }
429  else
430  {
431  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
432  }
434 }
435 
436 std::shared_ptr<XrdCl::File>
438 {
439  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
440  if (m_activeSources.empty())
441  {
443  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
444  << "', flags=0x" << std::hex << m_flags
445  << ", permissions=0" << std::oct << m_perms << std::dec
446  << ") => Source used after fatal exception.";
447  ex.addContext("In XrdAdaptor::RequestManager::handle()");
448  addConnections(ex);
449  throw ex;
450  }
451  return m_activeSources[0]->getFileHandle();
452 }
453 
454 void
455 RequestManager::getActiveSourceNames(std::vector<std::string> & sources)
456 {
457  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
458  sources.reserve(m_activeSources.size());
459  for (auto const& source : m_activeSources) {
460  sources.push_back(source->ID());
461  }
462 }
463 
464 void
465 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources)
466 {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  sources.reserve(m_activeSources.size());
469  for (auto const& source : m_activeSources) {
470  sources.push_back(source->PrettyID());
471  }
472 }
473 
474 void
475 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources)
476 {
477  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
478  sources.reserve(m_disabledSourceStrings.size());
479  for (auto const& source : m_disabledSourceStrings) {
480  sources.push_back(source);
481  }
482 }
483 
484 void
486 {
487  std::vector<std::string> sources;
489  for (auto const& source : sources)
490  {
491  ex.addAdditionalInfo("Active source: " + source);
492  }
493  sources.clear();
494  getDisabledSourceNames(sources);
495  for (auto const& source : sources)
496  {
497  ex.addAdditionalInfo("Disabled source: " + source);
498  }
499 }
500 
501 std::shared_ptr<Source>
503 {
504  std::shared_ptr<Source> source = nullptr;
505  {
506  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
507  if (m_activeSources.size() == 2)
508  {
510  {
511  source = m_activeSources[0];
513  }
514  else
515  {
516  source = m_activeSources[1];
518  }
519  }
520  else if (m_activeSources.empty())
521  {
523  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
524  << "', flags=0x" << std::hex << m_flags
525  << ", permissions=0" << std::oct << m_perms << std::dec
526  << ") => Source used after fatal exception.";
527  ex.addContext("In XrdAdaptor::RequestManager::handle()");
528  addConnections(ex);
529  throw ex;
530  }
531  else
532  {
533  source = m_activeSources[0];
534  }
535  }
536  return source;
537 }
538 
539 std::future<IOSize>
540 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
541 {
542  assert(c_ptr.get());
543  timespec now;
544  GET_CLOCK_MONOTONIC(now);
545  checkSources(now, c_ptr->getSize());
546 
547  std::shared_ptr<Source> source = pickSingleSource();
548  source->handle(c_ptr);
549  return c_ptr->get_future();
550 }
551 
554 {
555  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
556  std::stringstream ss;
557  ss << "tried=";
558  size_t count = 0;
559  for ( const auto & it : m_activeSources )
560  {
561  count++;
562  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
563  }
564  for ( const auto & it : m_inactiveSources )
565  {
566  count++;
567  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
568  }
569  for ( const auto & it : m_disabledExcludeStrings )
570  {
571  count++;
572  ss << it.substr(0, it.find(":")) << ",";
573  }
574  if (count)
575  {
576  std::string tmp_str = ss.str();
577  return tmp_str.substr(0, tmp_str.size()-1);
578  }
579  return "";
580 }
581 
582 void
583 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
584 {
585  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
586  if (status.IsOK())
587  {
588  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
589  for (const auto & s : m_activeSources)
590  {
591  if (source->ID() == s->ID())
592  {
593  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
594  << "; ignoring" << std::endl;
595  unsigned returned_count = ++m_excluded_active_count;
596  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
597  if (returned_count >= 3) {m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2*XRD_ADAPTOR_SHORT_OPEN_DELAY;}
598  return;
599  }
600  }
601  for (const auto & s : m_inactiveSources)
602  {
603  if (source->ID() == s->ID())
604  {
605  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
606  << "; ignoring" << std::endl;
607  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
608  return;
609  }
610  }
611  if (m_activeSources.size() < 2)
612  {
613  m_activeSources.push_back(source);
614  updateSiteInfo();
615  queueUpdateCurrentServer(source->ID());
616  }
617  else
618  {
619  m_inactiveSources.push_back(source);
620  }
621  }
622  else
623  { // File-open failure - wait at least 120s before next attempt.
624  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
625  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
626  }
627 }
628 
629 std::future<IOSize>
630 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
631 {
632  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
633  updateCurrentServer();
634 
635  timespec now;
636  GET_CLOCK_MONOTONIC(now);
637 
638  edm::CPUTimer timer;
639  timer.start();
640 
641  if (m_activeSources.size() == 1)
642  {
643  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
644  checkSources(now, c_ptr->getSize());
645  m_activeSources[0]->handle(c_ptr);
646  return c_ptr->get_future();
647  }
648  // Make sure active
649  else if (m_activeSources.empty())
650  {
652  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
653  << "', flags=0x" << std::hex << m_flags
654  << ", permissions=0" << std::oct << m_perms << std::dec
655  << ") => Source used after fatal exception.";
656  ex.addContext("In XrdAdaptor::RequestManager::handle()");
657  addConnections(ex);
658  throw ex;
659  }
660 
661  assert(iolist.get());
662  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
663  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
664  splitClientRequest(*iolist, *req1, *req2);
665 
666  checkSources(now, req1->size() + req2->size());
667  // CheckSources may have removed a source
668  if (m_activeSources.size() == 1)
669  {
670  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
671  m_activeSources[0]->handle(c_ptr);
672  return c_ptr->get_future();
673  }
674 
675  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
676  std::future<IOSize> future1, future2;
677  if (req1->size())
678  {
679  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
680  m_activeSources[0]->handle(c_ptr1);
681  future1 = c_ptr1->get_future();
682  }
683  if (req2->size())
684  {
685  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
686  m_activeSources[1]->handle(c_ptr2);
687  future2 = c_ptr2->get_future();
688  }
689  if (req1->size() && req2->size())
690  {
691  std::future<IOSize> task = std::async(std::launch::deferred,
692  [](std::future<IOSize> a, std::future<IOSize> b){
693  // Wait until *both* results are available. This is essential
694  // as the callback may try referencing the RequestManager. If one
695  // throws an exception (causing the RequestManager to be destroyed by
696  // XrdFile) and the other has a failure, then the recovery code will
697  // reference the destroyed RequestManager.
698  //
699  // Unlike other places where we use shared/weak ptrs to maintain object
700  // lifetime and destruction asynchronously, we *cannot* destroy the request
701  // asynchronously as it is associated with a ROOT buffer. We must wait until we
702  // are guaranteed that XrdCl will not write into the ROOT buffer before we
703  // can return.
704  b.wait(); a.wait();
705  return b.get() + a.get();
706  },
707  std::move(future1),
708  std::move(future2));
709  timer.stop();
710  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
711  return task;
712  }
713  else if (req1->size()) { return future1; }
714  else if (req2->size()) { return future2; }
715  else
716  { // Degenerate case - no bytes to read.
717  std::promise<IOSize> p; p.set_value(0);
718  return p.get_future();
719  }
720 }
721 
722 void
723 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
724 {
725  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
726  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
727 
728  // Fail early for invalid responses - XrdFile has a separate path for handling this.
729  if (c_status.code == XrdCl::errInvalidResponse)
730  {
731  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
733  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
734  << "', flags=0x" << std::hex << m_flags
735  << ", permissions=0" << std::oct << m_perms << std::dec
736  << ", old source=" << source_ptr->PrettyID()
737  << ") => Invalid ReadV response from server";
738  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
739  addConnections(ex);
740  throw ex;
741  }
742  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
743 
744  // Note that we do not delete the Source itself. That is because this
745  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
746  // In such a case, if you close a file in the handler, it will deadlock
747  m_disabledSourceStrings.insert(source_ptr->ID());
748  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
749  m_disabledSources.insert(source_ptr);
750 
751  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
752  {
753  m_activeSources.erase(m_activeSources.begin());
754  updateSiteInfo();
755  }
756  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
757  {
758  m_activeSources.erase(m_activeSources.begin()+1);
759  updateSiteInfo();
760  }
761  std::shared_ptr<Source> new_source;
762  if (m_activeSources.size() == 0)
763  {
764  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
765  timespec now;
766  GET_CLOCK_MONOTONIC(now);
768  // Note we only wait for 180 seconds here. This is because we've already failed
769  // once and the likelihood the program has some inconsistent state is decent.
770  // We'd much rather fail hard than deadlock!
771  sentry.unlock();
772  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
773  if (status == std::future_status::timeout)
774  {
776  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
777  << "', flags=0x" << std::hex << m_flags
778  << ", permissions=0" << std::oct << m_perms << std::dec
779  << ", old source=" << source_ptr->PrettyID()
780  << ") => timeout when waiting for file open";
781  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
782  addConnections(ex);
783  throw ex;
784  }
785  else
786  {
787  try
788  {
789  new_source = future.get();
790  }
791  catch (edm::Exception &ex)
792  {
793  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
794  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
795  throw;
796  }
797  }
798  sentry.lock();
799 
800  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
801  {
802  // The server gave us back a data node we requested excluded. Fatal!
804  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
805  << "', flags=0x" << std::hex << m_flags
806  << ", permissions=0" << std::oct << m_perms << std::dec
807  << ", old source=" << source_ptr->PrettyID()
808  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
809  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
810  addConnections(ex);
811  throw ex;
812  }
813  m_activeSources.push_back(new_source);
814  updateSiteInfo();
815  }
816  else
817  {
818  new_source = m_activeSources[0];
819  }
820  new_source->handle(c_ptr);
821 }
822 
823 static void
824 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
825 {
826  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
827  {
828  IOPosBuffer &io = input[front];
829  IOPosBuffer &outio = output.back();
830  if (io.size() > chunksize)
831  {
832  IOSize consumed;
833  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
834  {
835  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
836  {
837  consumed = (XRD_CL_MAX_CHUNK - outio.size());
838  outio.set_size(XRD_CL_MAX_CHUNK);
839  }
840  else
841  {
842  consumed = chunksize;
843  outio.set_size(outio.size() + consumed);
844  }
845  }
846  else
847  {
848  consumed = chunksize;
849  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
850  }
851  chunksize -= consumed;
852  IOSize newsize = io.size() - consumed;
853  IOOffset newoffset = io.offset() + consumed;
854  void* newdata = static_cast<char*>(io.data()) + consumed;
855  io.set_offset(newoffset);
856  io.set_data(newdata);
857  io.set_size(newsize);
858  }
859  else if (io.size() == 0)
860  {
861  front++;
862  }
863  else
864  {
865  output.push_back(io);
866  chunksize -= io.size();
867  front++;
868  }
869  }
870 }
871 
872 static void
873 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
874 {
875  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
876  {
877  IOPosBuffer &io = input.back();
878  IOPosBuffer &outio = output.back();
879  if (io.size() > chunksize)
880  {
881  IOSize consumed;
882  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
883  {
884  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
885  {
886  consumed = (XRD_CL_MAX_CHUNK - outio.size());
887  outio.set_size(XRD_CL_MAX_CHUNK);
888  }
889  else
890  {
891  consumed = chunksize;
892  outio.set_size(outio.size() + consumed);
893  }
894  }
895  else
896  {
897  consumed = chunksize;
898  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
899  }
900  chunksize -= consumed;
901  IOSize newsize = io.size() - consumed;
902  IOOffset newoffset = io.offset() + consumed;
903  void* newdata = static_cast<char*>(io.data()) + consumed;
904  io.set_offset(newoffset);
905  io.set_data(newdata);
906  io.set_size(newsize);
907  }
908  else if (io.size() == 0)
909  {
910  input.pop_back();
911  }
912  else
913  {
914  output.push_back(io);
915  chunksize -= io.size();
916  input.pop_back();
917  }
918  }
919 }
920 
921 static IOSize validateList(const std::vector<IOPosBuffer> req)
922 {
923  IOSize total = 0;
924  off_t last_offset = -1;
925  for (const auto & it : req)
926  {
927  total += it.size();
928  assert(it.offset() > last_offset);
929  last_offset = it.offset();
930  assert(it.size() <= XRD_CL_MAX_CHUNK);
931  assert(it.offset() < 0x1ffffffffff);
932  }
933  assert(req.size() <= 1024);
934  return total;
935 }
936 
// Split the chunks of `iolist` into two request lists, `req1` and `req2`,
// one per entry of m_activeSources[0] and m_activeSources[1].
//
// Chunks are taken alternately from the front of a working copy (into req1,
// at most `chunk1` bytes per pass) and from the back (into req2, at most
// `chunk2` bytes per pass) until the working copy is exhausted. Both outputs
// are then sorted by offset and checked with validateList().
//
// An empty `iolist` returns immediately and leaves req1/req2 untouched.
// Throws `ex` when both outputs have reached XRD_ADAPTOR_CHUNK_THRESHOLD
// entries while input remains.
//
// NOTE(review): m_activeSources is indexed at [0] and [1] without a size
// check; presumably the caller guarantees two active sources -- confirm.
// NOTE(review): the declaration of `ex` is not visible in this listing.
937 void
938 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2)
939 {
940  if (iolist.size() == 0) return;
 // Work on a copy: the consume helpers shrink/modify the list they are given.
941  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
942  req1.reserve(iolist.size()/2+1);
943  req2.reserve(iolist.size()/2+1);
 // Index of the first not-yet-consumed entry at the front of tmp_iolist.
944  size_t front=0;
945 
946  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
947  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
948  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
949  IOSize chunk1, chunk2;
950  // Make sure the chunk size is at least 1024; little point to reads less than that size.
 // chunk1 is weighted by q2^2 and chunk2 by q1^2, so the source with the
 // larger quality value is given the smaller share of XRD_CL_MAX_CHUNK.
951  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
952  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
953 
 // Total bytes requested; compared against the split result further down.
954  IOSize size_orig = 0;
955  for (const auto & it : iolist) size_orig += it.size();
956 
 // Loop until every entry between `front` and the end has been consumed.
957  while (tmp_iolist.size()-front > 0)
958  {
959  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
960  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
961  // are passed to the request manager. However, because we have a max chunk size, we increase
962  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
963  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
964  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
966  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
967  << "', flags=0x" << std::hex << m_flags
968  << ", permissions=0" << std::oct << m_perms << std::dec
969  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
 // NOTE(review): the context string below names requestFailure(), but this
 // code is in splitClientRequest() -- looks like a copy/paste slip; confirm.
970  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
971  addConnections(ex);
972  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
973  ex.addAdditionalInfo(ss.str());
 // Subtract the +5 offset again so the reported qualities are the raw values.
974  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
975  ex.addAdditionalInfo(ss2.str());
976  throw ex;
977  }
 // Only feed an output list that is still below the chunk-count threshold.
978  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
979  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
980  }
 // validateList() asserts strictly increasing offsets, so sort both outputs by offset first.
981  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
982  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
983 
984  IOSize size1 = validateList(req1);
985  IOSize size2 = validateList(req2);
986 
 // No bytes may be lost or duplicated by the split.
987  assert(size_orig == size1 + size2);
988 
989  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
990 }
991 
992 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
993  : m_manager(manager)
994 {
995 }
996 
997 
998  // Cannot use ~OpenHandler=default as XrdCl::File is not fully
999  // defined in the header.
1001 {
1002 }
1003 
1004 
// Callback for the asynchronous open started with `this` as the handler.
//
// Takes ownership of `status_ptr` and `hostList_ptr` (both are wrapped in
// unique_ptr and freed on return); the XrdCl::AnyObject argument is unused.
// On success a new Source is built from m_file and published through
// m_promise; on failure an exception is stored in m_promise instead. In both
// cases the manager's handleOpen() is then called, outside of m_mutex.
//
// NOTE(review): if the manager has already expired, the function returns
// without setting a value or exception on m_promise -- confirm no caller can
// still be waiting on the future in that situation.
// NOTE(review): the declaration of `ex` is not visible in this listing.
1005 void
1006 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
1007 {
1008  std::shared_ptr<Source> source;
1009  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1010  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1011 
1012  // Make sure we get rid of the strong self-reference when the callback finishes.
 // The local copy keeps this object alive until the function returns.
1013  std::shared_ptr<OpenHandler> self = m_self;
1014  m_self.reset();
1015 
1016  auto manager = m_manager.lock();
1017  // Manager object has already been deleted. Cleanup the
1018  // response objects, remove our self-reference, and ignore the response.
1019  if (!manager)
1020  {
1021  return;
1022  }
1023  //if we need to delete the File object we must do it outside
1024  // of the lock to avoid a potential deadlock
1025  std::unique_ptr<XrdCl::File> releaseFile;
1026  {
1027  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1028 
1029  if (status->IsOK())
1030  {
1031  SendMonitoringInfo(*m_file);
1032  timespec now;
1033  GET_CLOCK_MONOTONIC(now);
1034 
1035  std::string excludeString;
1036  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1037 
 // Ownership of the file moves into the Source; m_file is empty afterwards.
1038  source.reset(new Source(now, std::move(m_file), excludeString));
1039  m_promise.set_value(source);
1040  }
1041  else
1042  {
 // Park the file in releaseFile so it is destroyed only after the lock scope ends.
1043  releaseFile = std::move(m_file);
1045  ex << "XrdCl::File::Open(name='" << manager->m_name
1046  << "', flags=0x" << std::hex << manager->m_flags
1047  << ", permissions=0" << std::oct << manager->m_perms << std::dec
1048  << ") => error '" << status->ToStr()
1049  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1050  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1051  manager->addConnections(ex);
1052 
1053  m_promise.set_exception(std::make_exception_ptr(ex));
1054  }
1055  }
 // Called without m_mutex held; `source` is still null on the failure path.
1056  manager->handleOpen(*status, source);
1057 }
1058 
1061 {
1062  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1063 
1064  if (!m_file.get())
1065  {
1066  return "(no open in progress)";
1067  }
1068  std::string dataServer;
1069  m_file->GetProperty("DataServer", dataServer);
1070  if (!dataServer.size()) { return "(unknown source)"; }
1071  return dataServer;
1072 }
1073 
// Start an asynchronous open of the manager's file and return a shared
// future for the resulting Source.
//
// If an open is already in progress (m_file is set) the existing
// m_shared_future is returned and no new open is started. Otherwise a fresh
// promise/future pair is installed, the URL is built from the manager's name
// plus its opaque string, and XrdCl::File::Open() is issued with `this` as
// the response handler.
//
// Throws `ex` when the RequestManager or this handler has already expired,
// or when XrdCl::File::Open() reports an immediate failure.
//
// NOTE(review): the function signature line and the declarations of `ex`
// are not visible in this listing.
1074 std::shared_future<std::shared_ptr<Source> >
1076 {
1077  auto manager_ptr = m_manager.lock();
1078  if (!manager_ptr)
1079  {
1081  ex << "XrdCl::File::Open() =>"
1082  << " error: OpenHandler called within an invalid RequestManager context."
1083  << " This is a logic error and should be reported to the CMSSW developers.";
1084  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1085  throw ex;
1086  }
1087  RequestManager &manager = *manager_ptr;
 // A strong reference to ourselves is required below to stay alive during the callback.
1088  auto self_ptr = m_self_weak.lock();
1089  if (!self_ptr)
1090  {
1092  ex << "XrdCl::File::Open() => error: "
1093  << "OpenHandler called after it was deleted. This is a logic error "
1094  << "and should be reported to the CMSSW developers.";
 // NOTE(review): "XrdAdapter" below differs from the "XrdAdaptor" spelling
 // used in the other context strings -- likely a typo in the message.
1095  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1096  throw ex;
1097  }
1098 
1099  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1100  // is not thread-safe; the caller is responsible to verify it is not called from
1101  // multiple threads simultaneously.
1102  //
1103  // This is done because ::open may be called from a Xrootd callback; if we
1104  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1105  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1106  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1107  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1108  if (m_file.get())
1109  {
1110  return m_shared_future;
1111  }
1112  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
 // Replace the promise with a fresh one and derive the new shared future from it.
1113  std::promise<std::shared_ptr<Source> > new_promise;
1114  m_promise.swap(new_promise);
1115  m_shared_future = m_promise.get_future().share();
1116 
 // Append the opaque string with '?' if the name has no query part yet, else with '&'.
1117  auto opaque = manager.prepareOpaqueString();
1118  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1119  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1120  m_file.reset(new XrdCl::File());
1121  XrdCl::XRootDStatus status;
1122  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1123  {
 // NOTE(review): m_file is left set on this path, so a later open() would
 // take the early-return branch above and hand back m_shared_future --
 // confirm this is intended.
1125  ex << "XrdCl::File::Open(name='" << new_name
1126  << "', flags=0x" << std::hex << manager.m_flags
1127  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1128  << ") => error '" << status.ToStr()
1129  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1130  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1131  manager.addConnections(ex);
1132  throw ex;
1133  }
1134  // Have a strong self-reference for as long as the callback is in-progress.
1135  m_self = self_ptr;
1136  return m_shared_future;
1137 }
1138 
void setCurrentServer(const std::string &servername)
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void updateSiteInfo(std::string orig_site="")
#define XRD_CL_MAX_CHUNK
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
#define nullptr
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
double q2[4]
Definition: TauolaWrapper.h:88
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:43
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
#define likely(x)
void queueUpdateCurrentServer(const std::string &)
void addConnections(cms::Exception &)
long long timeDiffMS(const timespec &a, const timespec &b)
std::set< std::shared_ptr< Source > > m_disabledSources
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:155
std::atomic< std::string * > m_serverToAdvertise
def move
Definition: eostools.py:510
bool isAvailable() const
Definition: Service.h:46
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:108
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
Times stop()
Definition: CPUTimer.cc:94
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
void clearMessage()
Definition: Exception.cc:215
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
void checkSourcesImpl(timespec &now, IOSize requestSize)
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
void getDisabledSourceNames(std::vector< std::string > &sources)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
tuple status
Definition: ntuplemaker.py:245
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
void clearAdditionalInfo()
Definition: Exception.cc:223
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)