CMS 3D CMS Logo

XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <cassert>
4 #include <iostream>
5 #include <memory>
6 
7 #include <netdb.h>
8 
9 #include "XrdCl/XrdClFile.hh"
10 #include "XrdCl/XrdClDefaultEnv.hh"
11 #include "XrdCl/XrdClFileSystem.hh"
12 
20 
21 #include "XrdStatistics.h"
23 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
24 
// Tuning constants for the request manager.
//
// Every macro whose replacement text is an arithmetic expression is fully
// parenthesized so that it expands correctly inside a larger expression
// (for example `x / XRD_CL_MAX_CHUNK` or `x % XRD_ADAPTOR_LONG_OPEN_DELAY`).

// Upper bound on the size of a single entry in a split request list.
#define XRD_CL_MAX_CHUNK (512 * 1024)

// Short delay, in seconds (it is added to timespec::tv_sec values).
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
// Values used when XRD_FAKE_OPEN_PROBE is defined.
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
// Long delay, in seconds (it is added to timespec::tv_sec values).
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2 * 60)
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// Once an output request list holds more entries than this, no further
// entries are moved into it by the chunk-splitting helpers.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
41 
// GET_CLOCK_MONOTONIC(ts): store the current clock reading in `ts`, which must
// be an lvalue with `tv_sec` and `tv_nsec` members (e.g. a timespec).
//
// On Mach-based platforms the reading comes from the host's SYSTEM_CLOCK
// service via clock_get_time(); the clock service port is released with
// mach_port_deallocate() before the fields are copied into `ts`. The return
// values of the Mach calls are not checked. The expansion is a bare brace
// block, not a `do { } while (0)` statement.
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#define GET_CLOCK_MONOTONIC(ts) \
 { \
 clock_serv_t cclock; \
 mach_timespec_t mts; \
 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
 clock_get_time(cclock, &mts); \
 mach_port_deallocate(mach_task_self(), cclock); \
 ts.tv_sec = mts.tv_sec; \
 ts.tv_nsec = mts.tv_nsec; \
 }
#else
// Elsewhere this is a direct clock_gettime(CLOCK_MONOTONIC, &ts) call whose
// return value is ignored. Note that the expansion already ends in a semicolon.
#define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
#endif
58 
59 using namespace XrdAdaptor;
60 
/**
 * Return the signed difference `a - b` in whole milliseconds.
 *
 * The result is positive when `a` is later than `b`. Any sub-millisecond
 * remainder of the total difference is truncated toward zero.
 *
 * The computation is carried out entirely in 64-bit integer arithmetic: the
 * operands are widened to `long long` before subtracting and scaling, so the
 * result neither depends on the width of `time_t` nor goes through a
 * floating-point round trip.
 *
 * @param a the later (minuend) time stamp
 * @param b the earlier (subtrahend) time stamp
 * @return `a - b` in milliseconds
 */
long long timeDiffMS(const timespec &a, const timespec &b) {
  constexpr long long kNanosPerSecond = 1000000000LL;
  constexpr long long kNanosPerMilli = 1000000LL;

  const long long seconds = static_cast<long long>(a.tv_sec) - static_cast<long long>(b.tv_sec);
  const long long nanos = static_cast<long long>(a.tv_nsec) - static_cast<long long>(b.tv_nsec);

  // Combine first, divide last: truncation then applies to the total
  // difference rather than to the nanosecond part alone.
  return (seconds * kNanosPerSecond + nanos) / kNanosPerMilli;
}
66 
67 /*
68  * We do not care about the response of sending the monitoring information;
69  * this handler class simply frees any returned buffer to prevent memory leaks.
70  */
71 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
72  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
73  if (response) {
74  XrdCl::Buffer *buffer = nullptr;
75  response->Get(buffer);
76  response->Set(static_cast<int *>(nullptr));
77  delete buffer;
78  }
79  // Send Info has a response object; we must delete it.
80  delete response;
81  delete status;
82  }
83 
84 public:
86  SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
87  SendMonitoringInfoHandler() = default;
88 };
89 
91 
93  // Do not send this to a dCache data server as they return an error.
94  // In some versions of dCache, sending the monitoring information causes
95  // the server to close the connection - resulting in failures.
97  return;
98  }
99 
100  // Send the monitoring info, if available.
102  std::string lastUrl;
103  file.GetProperty("LastURL", lastUrl);
104  if (jobId && !lastUrl.empty()) {
105  XrdCl::URL url(lastUrl);
106  XrdCl::FileSystem fs(url);
107  if (!(fs.SendInfo(jobId, &nullHandler, 30).IsOK())) {
108  edm::LogWarning("XrdAdaptorInternal")
109  << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
110  }
111  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
112  }
113 }
114 
116  : m_serverToAdvertise(nullptr),
117  m_timeout(XRD_DEFAULT_TIMEOUT),
118  m_nextInitialSourceToggle(false),
119  m_name(filename),
120  m_flags(flags),
121  m_perms(perms),
122  m_distribution(0, 100),
123  m_excluded_active_count(0) {}
124 
125 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
127 
128  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
129  if (env) {
130  env->GetInt("StreamErrorWindow", m_timeout);
131  }
132 
133  std::string orig_site;
134  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
135  std::string hostname;
136  if (Source::getHostname(orig_site, hostname)) {
137  Source::getDomain(hostname, orig_site);
138  }
139  }
140 
141  std::unique_ptr<XrdCl::File> file;
143  bool validFile = false;
144  const int retries = 5;
145  std::string excludeString;
146  for (int idx = 0; idx < retries; idx++) {
147  file = std::make_unique<XrdCl::File>();
148  auto opaque = prepareOpaqueString();
149  std::string new_filename =
150  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
151  SyncHostResponseHandler handler;
152  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
153  if (!openStatus
154  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
155  // server and it was marked bad - xrootd couldn't even queue up the request internally!
156  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
157  // redirector being down or authentication failures.
158  ex.clearMessage();
159  ex.clearContext();
160  ex.clearAdditionalInfo();
161  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
162  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
163  << ", code=" << openStatus.code << ")";
164  ex.addContext("Calling XrdFile::open()");
165  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
166  throw ex;
167  }
168  handler.WaitForResponse();
169  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
170  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
171  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
172  assert(status);
173  if (status->IsOK()) {
174  validFile = true;
175  break;
176  } else {
177  ex.clearMessage();
178  ex.clearContext();
179  ex.clearAdditionalInfo();
180  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
181  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
182  << ", code=" << status->code << ")";
183  ex.addContext("Calling XrdFile::open()");
184  addConnections(ex);
185  std::string dataServer, lastUrl;
186  file->GetProperty("DataServer", dataServer);
187  file->GetProperty("LastURL", lastUrl);
188  if (!dataServer.empty()) {
189  ex.addAdditionalInfo("Problematic data server: " + dataServer);
190  }
191  if (!lastUrl.empty()) {
192  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
193  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
194  }
195  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
196  m_disabledSourceStrings.end()) {
197  ex << ". No additional data servers were found.";
198  throw ex;
199  }
200  if (!dataServer.empty()) {
201  m_disabledSourceStrings.insert(dataServer);
202  m_disabledExcludeStrings.insert(excludeString);
203  }
204  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
205  if (lastUrl == new_filename) {
206  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
207  throw ex;
208  }
209  }
210  }
211  if (!validFile) {
212  throw ex;
213  }
215 
216  timespec ts;
218 
219  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
220  {
221  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
222  auto oldList = m_activeSources;
223  m_activeSources.push_back(source);
225  }
228 
229  m_lastSourceCheck = ts;
230  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
232 }
233 
242  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
243  // a pending update. *However*, since we call this for every read, we'll get it
244  // eventually.
245  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
246  return;
247  }
248  std::string *hostname_ptr;
249  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
250  std::unique_ptr<std::string> hostname(hostname_ptr);
252  if (statsService.isAvailable()) {
253  statsService->setCurrentServer(*hostname_ptr);
254  }
255  }
256 }
257 
259  auto hostname = std::make_unique<std::string>(id);
260  if (Source::getHostname(id, *hostname)) {
261  std::string *null_hostname = nullptr;
262  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
263  hostname.release();
264  }
265  }
266 }
267 
268 namespace {
269  std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
270  std::string siteA, siteB;
271  if (!iSources.empty()) {
272  siteA = iSources[0]->Site();
273  }
274  if (iSources.size() == 2) {
275  siteB = iSources[1]->Site();
276  }
277  std::string siteList = siteA;
278  if (!siteB.empty() && (siteB != siteA)) {
279  siteList = siteA + ", " + siteB;
280  }
281  return siteList;
282  }
283 } // namespace
284 
285 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
286  std::vector<std::shared_ptr<Source>> const &iNew,
287  std::string orig_site) const {
288  auto siteList = formatSites(iNew);
289  if (!orig_site.empty() && (orig_site != siteList)) {
290  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
291  } else {
292  auto oldSites = formatSites(iOld);
293  if (orig_site.empty() && (siteList != oldSites)) {
294  if (!oldSites.empty())
295  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
296  }
297  }
298 }
299 
301  IOSize requestSize,
302  std::vector<std::shared_ptr<Source>> &activeSources,
303  std::vector<std::shared_ptr<Source>> &inactiveSources) {
304  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
305  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
306  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
307  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
308  { // Be more aggressive about getting rid of very bad sources.
309  compareSources(now, 0, 1, activeSources, inactiveSources);
310  compareSources(now, 1, 0, activeSources, inactiveSources);
311  }
313  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
314  }
315  }
316 }
317 
318 bool RequestManager::compareSources(const timespec &now,
319  unsigned a,
320  unsigned b,
321  std::vector<std::shared_ptr<Source>> &activeSources,
322  std::vector<std::shared_ptr<Source>> &inactiveSources) const {
323  if (activeSources.size() < std::max(a, b) + 1) {
324  return false;
325  }
326 
327  bool findNewSource = false;
328  if ((activeSources[a]->getQuality() > 5130) ||
329  ((activeSources[a]->getQuality() > 260) &&
330  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
331  edm::LogVerbatim("XrdAdaptorInternal")
332  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
333  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
334  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
335  findNewSource = true;
336  }
337  activeSources[a]->setLastDowngrade(now);
338  inactiveSources.emplace_back(activeSources[a]);
339  auto oldSources = activeSources;
340  activeSources.erase(activeSources.begin() + a);
341  reportSiteChange(oldSources, activeSources);
342  }
343  return findNewSource;
344 }
345 
347  IOSize requestSize,
348  std::vector<std::shared_ptr<Source>> &activeSources,
349  std::vector<std::shared_ptr<Source>> &inactiveSources) {
350  bool findNewSource = false;
351  if (activeSources.size() <= 1) {
352  findNewSource = true;
353  } else if (activeSources.size() > 1) {
354  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
355  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
356  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
357  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
358 
359  // NOTE: We could probably replace the copy with a better sort function.
360  // However, there are typically very few sources and the correctness is more obvious right now.
361  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
362  eligibleInactiveSources.reserve(inactiveSources.size());
363  for (const auto &source : inactiveSources) {
364  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
365  eligibleInactiveSources.push_back(source);
366  }
367  }
368  auto bestInactiveSource =
369  std::min_element(eligibleInactiveSources.begin(),
370  eligibleInactiveSources.end(),
371  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
372  return s1->getQuality() < s2->getQuality();
373  });
374  auto worstActiveSource = std::max_element(activeSources.cbegin(),
375  activeSources.cend(),
376  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
377  return s1->getQuality() < s2->getQuality();
378  });
379  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
380  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
381  << ", quality " << (*bestInactiveSource)->getQuality();
382  }
383  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
384  << ", quality " << (*worstActiveSource)->getQuality();
385  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
386  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
387  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
388  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
389  auto oldSources = activeSources;
390  activeSources.push_back(*bestInactiveSource);
391  reportSiteChange(oldSources, activeSources);
392  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
393  if (it->get() == bestInactiveSource->get()) {
394  inactiveSources.erase(it);
395  break;
396  }
397  } else
398  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
399  (*worstActiveSource)->getQuality() >
400  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
401  edm::LogVerbatim("XrdAdaptorInternal")
402  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
403  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
404  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
405  (*worstActiveSource)->setLastDowngrade(now);
406  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
407  if (it->get() == bestInactiveSource->get()) {
408  inactiveSources.erase(it);
409  break;
410  }
411  inactiveSources.emplace_back(*worstActiveSource);
412  auto oldSources = activeSources;
413  activeSources.erase(worstActiveSource);
414  activeSources.emplace_back(std::move(*bestInactiveSource));
415  reportSiteChange(oldSources, activeSources);
416  eligibleInactiveSources.clear();
417  for (const auto &source : inactiveSources)
418  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
419  eligibleInactiveSources.push_back(source);
420  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
421  eligibleInactiveSources.end(),
422  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
423  return s1->getQuality() < s2->getQuality();
424  });
425  worstActiveSource = std::max_element(activeSources.begin(),
426  activeSources.end(),
427  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
428  return s1->getQuality() < s2->getQuality();
429  });
430  }
431  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
432  float r = m_distribution(m_generator);
434  findNewSource = true;
435  }
436  }
437  }
438  if (findNewSource) {
439  m_open_handler->open();
441  }
442 
443  // Only aggressively look for new sources if we don't have two.
444  if (activeSources.size() == 2) {
446  } else {
448  }
450 }
451 
452 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
453  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
454  if (m_activeSources.empty()) {
456  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
457  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
458  ex.addContext("In XrdAdaptor::RequestManager::handle()");
459  addConnections(ex);
460  throw ex;
461  }
462  return m_activeSources[0]->getFileHandle();
463 }
464 
465 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
466  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
467  sources.reserve(m_activeSources.size());
468  for (auto const &source : m_activeSources) {
469  sources.push_back(source->ID());
470  }
471 }
472 
473 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
474  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
475  sources.reserve(m_activeSources.size());
476  for (auto const &source : m_activeSources) {
477  sources.push_back(source->PrettyID());
478  }
479 }
480 
481 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
482  sources.reserve(m_disabledSourceStrings.size());
483  for (auto const &source : m_disabledSourceStrings) {
484  sources.push_back(source);
485  }
486 }
487 
489  std::vector<std::string> sources;
491  for (auto const &source : sources) {
492  ex.addAdditionalInfo("Active source: " + source);
493  }
494  sources.clear();
496  for (auto const &source : sources) {
497  ex.addAdditionalInfo("Disabled source: " + source);
498  }
499 }
500 
501 std::shared_ptr<Source> RequestManager::pickSingleSource() {
502  std::shared_ptr<Source> source = nullptr;
503  {
504  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
505  if (m_activeSources.size() == 2) {
507  source = m_activeSources[0];
509  } else {
510  source = m_activeSources[1];
512  }
513  } else if (m_activeSources.empty()) {
515  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
516  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
517  ex.addContext("In XrdAdaptor::RequestManager::handle()");
518  addConnections(ex);
519  throw ex;
520  } else {
521  source = m_activeSources[0];
522  }
523  }
524  return source;
525 }
526 
527 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
528  assert(c_ptr.get());
529  timespec now;
531  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
532  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
533  {
534  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
536  inactiveSources = m_inactiveSources;
537  }
538  {
539  //make sure we update values before calling pickSingleSource
540  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
541  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
543  m_inactiveSources = std::move(inactiveSources);
544  });
545 
546  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
547  }
548 
549  std::shared_ptr<Source> source = pickSingleSource();
550  source->handle(c_ptr);
551  return c_ptr->get_future();
552 }
553 
555  std::stringstream ss;
556  ss << "tried=";
557  size_t count = 0;
558  {
559  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
560 
561  for (const auto &it : m_activeSources) {
562  count++;
563  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
564  }
565  for (const auto &it : m_inactiveSources) {
566  count++;
567  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
568  }
569  }
570  for (const auto &it : m_disabledExcludeStrings) {
571  count++;
572  ss << it.substr(0, it.find(':')) << ",";
573  }
574  if (count) {
575  std::string tmp_str = ss.str();
576  return tmp_str.substr(0, tmp_str.size() - 1);
577  }
578  return "";
579 }
580 
581 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
582  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
583  if (status.IsOK()) {
584  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
585  for (const auto &s : m_activeSources) {
586  if (source->ID() == s->ID()) {
587  edm::LogVerbatim("XrdAdaptorInternal")
588  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
589  unsigned returned_count = ++m_excluded_active_count;
590  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
591  if (returned_count >= 3) {
592  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
593  }
594  return;
595  }
596  }
597  for (const auto &s : m_inactiveSources) {
598  if (source->ID() == s->ID()) {
599  edm::LogVerbatim("XrdAdaptorInternal")
600  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
601  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
602  return;
603  }
604  }
605  if (m_activeSources.size() < 2) {
606  auto oldSources = m_activeSources;
607  m_activeSources.push_back(source);
608  reportSiteChange(oldSources, m_activeSources);
609  queueUpdateCurrentServer(source->ID());
610  } else {
611  m_inactiveSources.push_back(source);
612  }
613  } else { // File-open failure - wait at least 120s before next attempt.
614  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
615  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
616  }
617 }
618 
619 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
620  //Use a copy of m_activeSources and m_inactiveSources throughout this function
621  // in order to avoid holding the lock a long time and causing a deadlock.
622  // When the function is over we will update the values of the containers
623  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
624  {
625  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
626  activeSources = m_activeSources;
627  inactiveSources = m_inactiveSources;
628  }
629  //Make sure we update changes when we leave the function
630  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
631  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
632  m_activeSources = std::move(activeSources);
633  m_inactiveSources = std::move(inactiveSources);
634  });
635 
636  updateCurrentServer();
637 
638  timespec now;
640 
641  edm::CPUTimer timer;
642  timer.start();
643 
644  if (activeSources.size() == 1) {
645  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
646  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
647  activeSources[0]->handle(c_ptr);
648  return c_ptr->get_future();
649  }
650  // Make sure active
651  else if (activeSources.empty()) {
653  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
654  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
655  ex.addContext("In XrdAdaptor::RequestManager::handle()");
656  addConnections(ex);
657  throw ex;
658  }
659 
660  assert(iolist.get());
661  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
662  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
663  splitClientRequest(*iolist, *req1, *req2, activeSources);
664 
665  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
666  // CheckSources may have removed a source
667  if (activeSources.size() == 1) {
668  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
669  activeSources[0]->handle(c_ptr);
670  return c_ptr->get_future();
671  }
672 
673  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
674  std::future<IOSize> future1, future2;
675  if (!req1->empty()) {
676  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
677  activeSources[0]->handle(c_ptr1);
678  future1 = c_ptr1->get_future();
679  }
680  if (!req2->empty()) {
681  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
682  activeSources[1]->handle(c_ptr2);
683  future2 = c_ptr2->get_future();
684  }
685  if (!req1->empty() && !req2->empty()) {
686  std::future<IOSize> task = std::async(
687  std::launch::deferred,
688  [](std::future<IOSize> a, std::future<IOSize> b) {
689  // Wait until *both* results are available. This is essential
690  // as the callback may try referencing the RequestManager. If one
691  // throws an exception (causing the RequestManager to be destroyed by
692  // XrdFile) and the other has a failure, then the recovery code will
693  // reference the destroyed RequestManager.
694  //
695  // Unlike other places where we use shared/weak ptrs to maintain object
696  // lifetime and destruction asynchronously, we *cannot* destroy the request
697  // asynchronously as it is associated with a ROOT buffer. We must wait until we
698  // are guaranteed that XrdCl will not write into the ROOT buffer before we
699  // can return.
700  b.wait();
701  a.wait();
702  return b.get() + a.get();
703  },
704  std::move(future1),
705  std::move(future2));
706  timer.stop();
707  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
708  return task;
709  } else if (!req1->empty()) {
710  return future1;
711  } else if (!req2->empty()) {
712  return future2;
713  } else { // Degenerate case - no bytes to read.
714  std::promise<IOSize> p;
715  p.set_value(0);
716  return p.get_future();
717  }
718 }
719 
720 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
721  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
722 
723  // Fail early for invalid responses - XrdFile has a separate path for handling this.
724  if (c_status.code == XrdCl::errInvalidResponse) {
725  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
727  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
728  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
729  << ") => Invalid ReadV response from server";
730  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
731  addConnections(ex);
732  throw ex;
733  }
734  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
735 
736  // Note that we do not delete the Source itself. That is because this
737  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
738  // In such a case, if you close a file in the handler, it will deadlock
739  m_disabledSourceStrings.insert(source_ptr->ID());
740  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
741  m_disabledSources.insert(source_ptr);
742 
743  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
744  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
745  auto oldSources = m_activeSources;
746  m_activeSources.erase(m_activeSources.begin());
747  reportSiteChange(oldSources, m_activeSources);
748  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
749  auto oldSources = m_activeSources;
750  m_activeSources.erase(m_activeSources.begin() + 1);
751  reportSiteChange(oldSources, m_activeSources);
752  }
753  std::shared_ptr<Source> new_source;
754  if (m_activeSources.empty()) {
755  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
756  timespec now;
759  // Note we only wait for 180 seconds here. This is because we've already failed
760  // once and the likelihood the program has some inconsistent state is decent.
761  // We'd much rather fail hard than deadlock!
762  sentry.unlock();
763  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
766  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
767  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
768  << ") => timeout when waiting for file open";
769  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
770  addConnections(ex);
771  throw ex;
772  } else {
773  try {
774  new_source = future.get();
775  } catch (edm::Exception &ex) {
776  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
777  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
778  throw;
779  }
780  }
781 
782  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
783  m_disabledSourceStrings.end()) {
784  // The server gave us back a data node we requested excluded. Fatal!
786  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
787  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
788  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
789  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
790  addConnections(ex);
791  throw ex;
792  }
793  sentry.lock();
794 
795  auto oldSources = m_activeSources;
796  m_activeSources.push_back(new_source);
797  reportSiteChange(oldSources, m_activeSources);
798  } else {
799  new_source = m_activeSources[0];
800  }
801  new_source->handle(c_ptr);
802 }
803 
804 static void consumeChunkFront(size_t &front,
805  std::vector<IOPosBuffer> &input,
806  std::vector<IOPosBuffer> &output,
807  IOSize chunksize) {
808  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
809  IOPosBuffer &io = input[front];
810  IOPosBuffer &outio = output.back();
811  if (io.size() > chunksize) {
812  IOSize consumed;
813  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
814  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
815  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
816  consumed = (XRD_CL_MAX_CHUNK - outio.size());
817  outio.set_size(XRD_CL_MAX_CHUNK);
818  } else {
819  consumed = chunksize;
820  outio.set_size(outio.size() + consumed);
821  }
822  } else {
823  consumed = chunksize;
824  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
825  }
826  chunksize -= consumed;
827  IOSize newsize = io.size() - consumed;
828  IOOffset newoffset = io.offset() + consumed;
829  void *newdata = static_cast<char *>(io.data()) + consumed;
830  io.set_offset(newoffset);
831  io.set_data(newdata);
832  io.set_size(newsize);
833  } else if (io.size() == 0) {
834  front++;
835  } else {
836  output.push_back(io);
837  chunksize -= io.size();
838  front++;
839  }
840  }
841 }
842 
843 static void consumeChunkBack(size_t front,
844  std::vector<IOPosBuffer> &input,
845  std::vector<IOPosBuffer> &output,
846  IOSize chunksize) {
847  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
848  IOPosBuffer &io = input.back();
849  IOPosBuffer &outio = output.back();
850  if (io.size() > chunksize) {
851  IOSize consumed;
852  if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
853  (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
854  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
855  consumed = (XRD_CL_MAX_CHUNK - outio.size());
856  outio.set_size(XRD_CL_MAX_CHUNK);
857  } else {
858  consumed = chunksize;
859  outio.set_size(outio.size() + consumed);
860  }
861  } else {
862  consumed = chunksize;
863  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
864  }
865  chunksize -= consumed;
866  IOSize newsize = io.size() - consumed;
867  IOOffset newoffset = io.offset() + consumed;
868  void *newdata = static_cast<char *>(io.data()) + consumed;
869  io.set_offset(newoffset);
870  io.set_data(newdata);
871  io.set_size(newsize);
872  } else if (io.size() == 0) {
873  input.pop_back();
874  } else {
875  output.push_back(io);
876  chunksize -= io.size();
877  input.pop_back();
878  }
879  }
880 }
881 
882 static IOSize validateList(const std::vector<IOPosBuffer> req) {
883  IOSize total = 0;
884  off_t last_offset = -1;
885  for (const auto &it : req) {
886  total += it.size();
887  assert(it.offset() > last_offset);
888  last_offset = it.offset();
889  assert(it.size() <= XRD_CL_MAX_CHUNK);
890  assert(it.offset() < 0x1ffffffffff);
891  }
892  assert(req.size() <= 1024);
893  return total;
894 }
895 
// Split the vectored read `iolist` into two request lists, `req1` and `req2`.
//
// A mutable copy of `iolist` is consumed alternately from the front (into
// `req1`, `chunk1` bytes at a time) and from the back (into `req2`, `chunk2`
// bytes at a time) until nothing is left. The chunk sizes are derived from the
// squared quality values of activeSources[0] and activeSources[1], with a
// floor of 1024 bytes. Both outputs are then sorted by offset and validated.
//
// Does nothing when `iolist` is empty. Throws `ex` when both outputs have
// reached XRD_ADAPTOR_CHUNK_THRESHOLD entries while input still remains.
//
// NOTE(review): activeSources[0] and activeSources[1] are indexed without a
// size check; callers presumably guarantee at least two sources - confirm.
896 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
897  std::vector<IOPosBuffer> &req1,
898  std::vector<IOPosBuffer> &req2,
899  std::vector<std::shared_ptr<Source>> const &activeSources) const {
900  if (iolist.empty())
901  return;
     // Work on a copy: the consume helpers shrink and pop entries in place.
902  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
903  req1.reserve(iolist.size() / 2 + 1);
904  req2.reserve(iolist.size() / 2 + 1);
905  size_t front = 0;
906 
907  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
908  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
909  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
910  IOSize chunk1, chunk2;
911  // Make sure the chunk size is at least 1024; little point to reads less than that size.
     // chunk1 is weighted by q2^2 and chunk2 by q1^2, so the list paired with the
     // source whose quality value is smaller gets the larger chunk (presumably a
     // lower quality value means a better source - confirm against QualityMetric).
912  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
913  static_cast<IOSize>(1024));
914  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
915  static_cast<IOSize>(1024));
916 
     // Total byte count of the original request; used for the error report and
     // for the consistency assert at the end.
917  IOSize size_orig = 0;
918  for (const auto &it : iolist)
919  size_orig += it.size();
920 
     // `front` is advanced by consumeChunkFront and tmp_iolist shrinks from the
     // back in consumeChunkBack, so this loop ends when the two meet.
921  while (tmp_iolist.size() - front > 0) {
922  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
923  (req2.size() >=
924  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
925  // are passed to the request manager. However, because we have a max chunk size, we increase
926  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
927  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
928  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
     // NOTE(review): the declaration of `ex` is not visible in this listing (the
     // embedded numbering jumps from 928 to 930); confirm its exception type.
930  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
931  << ", permissions=0" << std::oct << m_perms << std::dec
932  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
933  "reported to CMSSW developers.";
     // NOTE(review): the context string below names requestFailure() although
     // this is splitClientRequest(); looks like a copy-paste slip.
934  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
935  addConnections(ex);
936  std::stringstream ss;
937  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
938  ex.addAdditionalInfo(ss.str());
939  std::stringstream ss2;
     // Subtract the +5 offset again so the raw quality values are reported.
940  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
941  ex.addAdditionalInfo(ss2.str());
942  throw ex;
943  }
944  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
945  consumeChunkFront(front, tmp_iolist, req1, chunk1);
946  }
947  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
948  consumeChunkBack(front, tmp_iolist, req2, chunk2);
949  }
950  }
     // Order both lists by file offset before validating them; validateList
     // asserts that offsets are strictly increasing.
951  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
952  return left.offset() < right.offset();
953  });
954  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
955  return left.offset() < right.offset();
956  });
957 
958  IOSize size1 = validateList(req1);
959  IOSize size2 = validateList(req2);
960 
     // No byte of the original request may be lost or duplicated by the split.
961  assert(size_orig == size1 + size2);
962 
963  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
964  << " bytes) split into requests size " << req1.size() << " (" << size1
965  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
966 }
967 
968 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
969 
970 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
971 // defined in the header.
973 
975  XrdCl::AnyObject *,
976  XrdCl::HostList *hostList_ptr) {
     // Callback for the open started in OpenHandler::open(). On success it wraps
     // m_file in a new Source and fulfils m_promise; on failure it stores an
     // exception in m_promise. Either way the manager's handleOpen() is called.
     // NOTE(review): the first line of this signature is not visible in this
     // listing (the embedded numbering skips 974).
977  // Make sure we get rid of the strong self-reference when the callback finishes.
978  std::shared_ptr<OpenHandler> self = m_self;
979  m_self.reset();
980 
981  // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
982  // Make sure that we set m_outstanding_open to false on exit from this function.
983  // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
984  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
985  this, [&](OpenHandler *) { m_outstanding_open = false; });
986 
987  std::shared_ptr<Source> source;
     // Take ownership of the raw pointers so they are freed on every return path.
988  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
989  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
990 
991  auto manager = m_manager.lock();
992  // Manager object has already been deleted. Cleanup the
993  // response objects, remove our self-reference, and ignore the response.
994  if (!manager) {
995  return;
996  }
997  //if we need to delete the File object we must do it outside
998  // of the lock to avoid a potential deadlock
999  std::unique_ptr<XrdCl::File> releaseFile;
1000  {
1001  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1002 
1003  if (status->IsOK()) {
1004  SendMonitoringInfo(*m_file);
1005  timespec now;
      // NOTE(review): `now` is not visibly assigned before use in this listing
      // (the embedded numbering skips 1006); presumably it is filled there via
      // GET_CLOCK_MONOTONIC - confirm.
1007 
1008  std::string excludeString;
1009  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1010 
      // The new Source takes over m_file; waiters on the future receive it.
1011  source.reset(new Source(now, std::move(m_file), excludeString));
1012  m_promise.set_value(source);
1013  } else {
      // Hand the file to releaseFile so it is destroyed after the lock is dropped.
1014  releaseFile = std::move(m_file);
      // NOTE(review): the declaration of `ex` is not visible in this listing
      // (the embedded numbering skips 1015); confirm its exception type.
1016  ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1017  << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1018  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1019  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1020  manager->addConnections(ex);
1021 
1022  m_promise.set_exception(std::make_exception_ptr(ex));
1023  }
1024  }
     // Called with m_mutex released; `source` is null when the open failed.
1025  manager->handleOpen(*status, source);
1026 }
1027 
// Report the "DataServer" property of the file currently being opened.
// Returns "(no open in progress)" when there is no m_file and
// "(unknown source)" when the property is empty. Runs under m_mutex.
// NOTE(review): the signature line of this function is not visible in this
// listing (the embedded numbering skips 1028).
1029  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1030 
1031  if (!m_file.get()) {
1032  return "(no open in progress)";
1033  }
1034  std::string dataServer;
      // The return value of GetProperty is not checked; an empty string is
      // treated as "unknown" below.
1035  m_file->GetProperty("DataServer", dataServer);
1036  if (dataServer.empty()) {
1037  return "(unknown source)";
1038  }
1039  return dataServer;
1040 }
1041 
// Start an asynchronous open of the manager's file and return a shared future
// that will hold the resulting Source (or an exception).
//
// If an open is already outstanding, the existing future is returned and no
// new request is issued. Otherwise a fresh promise/future pair is created, the
// URL is built from the manager's name plus its opaque string, and
// XrdCl::File::Open is called with this object as the response handler.
//
// Throws `ex` when the owning RequestManager or this handler's own shared
// owner is gone, or when XrdCl::File::Open reports an error.
// NOTE(review): the three declarations of `ex` are not visible in this listing
// (the embedded numbering skips 1045, 1055 and 1094); confirm their types.
1042 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1043  auto manager_ptr = m_manager.lock();
1044  if (!manager_ptr) {
1046  ex << "XrdCl::File::Open() =>"
1047  << " error: OpenHandler called within an invalid RequestManager context."
1048  << " This is a logic error and should be reported to the CMSSW developers.";
1049  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1050  throw ex;
1051  }
1052  RequestManager &manager = *manager_ptr;
1053  auto self_ptr = m_self_weak.lock();
1054  if (!self_ptr) {
1056  ex << "XrdCl::File::Open() => error: "
1057  << "OpenHandler called after it was deleted. This is a logic error "
1058  << "and should be reported to the CMSSW developers.";
      // NOTE(review): "XrdAdapter" below is spelled differently from the
      // "XrdAdaptor" used in the other context strings; looks like a typo.
1059  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1060  throw ex;
1061  }
1062 
1063  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1064  // is not thread-safe; the caller is responsible to verify it is not called from
1065  // multiple threads simultaneously.
1066  //
1067  // This is done because ::open may be called from a Xrootd callback; if we
1068  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1069  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1070  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1071  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1072  if (m_outstanding_open) {
1073  return m_shared_future;
1074  }
1075  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
      // Replace the promise with a fresh one so a new future can be obtained.
1076  std::promise<std::shared_ptr<Source>> new_promise;
1077  m_promise.swap(new_promise);
1078  m_shared_future = m_promise.get_future().share();
1079 
1080  auto opaque = manager.prepareOpaqueString();
      // Append the opaque string with '?' if the name has no query part yet,
      // otherwise with '&'.
1081  std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1082  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1083  m_file = std::make_unique<XrdCl::File>();
1084  m_outstanding_open = true;
1085 
1086  // Always make sure we release m_file and set m_outstanding_open to false on error.
1087  std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1088  m_outstanding_open = false;
1089  m_file.reset();
1090  });
1091 
1092  XrdCl::XRootDStatus status;
1093  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1095  ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1096  << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1097  << ", code=" << status.code << ")";
1098  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1099  manager.addConnections(ex);
1100  throw ex;
1101  }
      // The request was issued: disarm the guard so m_file and
      // m_outstanding_open stay set.
1102  exit_guard.release();
1103  // Have a strong self-reference for as long as the callback is in-progress.
1104  m_self = self_ptr;
1105  return m_shared_future;
1106 }
IOPosBuffer::set_offset
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:48
Likely.h
change_name.diff
diff
Definition: change_name.py:13
XRD_ADAPTOR_SHORT_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
Definition: XrdRequestManager.cc:27
XrdAdaptor::RequestManager::m_flags
XrdCl::OpenFlags::Flags m_flags
Definition: XrdRequestManager.h:228
XrdAdaptor::Source::determineHostExcludeString
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:288
XrdAdaptor::RequestManager::getActiveFile
std::shared_ptr< XrdCl::File > getActiveFile() const
Definition: XrdRequestManager.cc:452
XrdAdaptor::RequestManager::queueUpdateCurrentServer
void queueUpdateCurrentServer(const std::string &)
Definition: XrdRequestManager.cc:258
relmon_authenticated_wget.url
url
Definition: relmon_authenticated_wget.py:22
input
static const std::string input
Definition: EdmProvDump.cc:48
MessageLogger.h
XrdAdaptor::RequestManager::m_generator
std::mt19937 m_generator
Definition: XrdRequestManager.h:232
XrdStatistics.h
funct::false
false
Definition: Factorize.h:29
HcalTopologyMode::Mode
Mode
Definition: HcalTopologyMode.h:26
XrdAdaptor::RequestManager::m_nextActiveSourceCheck
timespec m_nextActiveSourceCheck
Definition: XrdRequestManager.h:224
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
XrdAdaptor::RequestManager::addConnections
void addConnections(cms::Exception &) const
Definition: XrdRequestManager.cc:488
XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
Definition: XrdRequestManager.cc:37
XrdAdaptor::RequestManager::m_disabledExcludeStrings
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
Definition: XrdRequestManager.h:212
edm::errors::LogicError
Definition: EDMException.h:37
convertSQLitetoXML_cfg.output
output
Definition: convertSQLitetoXML_cfg.py:72
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
mps_update.status
status
Definition: mps_update.py:69
XrdAdaptor::RequestManager::m_inactiveSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
Definition: XrdRequestManager.h:209
XrdAdaptor::RequestManager::OpenHandler
Definition: XrdRequestManager.h:237
XrdAdaptor::RequestManager::m_open_handler
std::shared_ptr< OpenHandler > m_open_handler
Definition: XrdRequestManager.h:294
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
XrdAdaptor::RequestManager::getDisabledSourceNames
void getDisabledSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:481
edm::CPUTimer
Definition: CPUTimer.h:37
XrdAdaptor::RequestManager::m_activeSources
std::vector< std::shared_ptr< Source > > m_activeSources
Definition: XrdRequestManager.h:208
XrdAdaptor::RequestManager::OpenHandler::open
std::shared_future< std::shared_ptr< Source > > open()
Definition: XrdRequestManager.cc:1042
XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_CHUNK_THRESHOLD
Definition: XrdRequestManager.cc:40
XrdAdaptor::RequestManager::m_perms
XrdCl::Access::Mode m_perms
Definition: XrdRequestManager.h:229
pileupDistInMC.oldList
oldList
Definition: pileupDistInMC.py:39
IOPosBuffer::set_data
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:51
cms::cuda::assert
assert(be >=bs)
CalibrationSummaryClient_cfi.sources
sources
Definition: CalibrationSummaryClient_cfi.py:23
XrdAdaptor::RequestManager::m_disabledSources
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Definition: XrdRequestManager.h:213
indexGen.s2
s2
Definition: indexGen.py:107
XrdAdaptor::RequestManager::m_serverToAdvertise
std::atomic< std::string * > m_serverToAdvertise
Definition: XrdRequestManager.h:217
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
spr::find
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
btagGenBb_cfi.Status
Status
Definition: btagGenBb_cfi.py:4
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
XrdAdaptor::RequestManager::updateCurrentServer
void updateCurrentServer()
Definition: XrdRequestManager.cc:241
edm::errors::FileOpenError
Definition: EDMException.h:49
GET_CLOCK_MONOTONIC
#define GET_CLOCK_MONOTONIC(ts)
Definition: XrdRequestManager.cc:56
XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Definition: XrdRequestManager.cc:974
heavyIonCSV_trainingSettings.idx
idx
Definition: heavyIonCSV_trainingSettings.py:5
XrdAdaptor::RequestManager::RequestManager
RequestManager(const RequestManager &)=delete
edm::Exception
Definition: EDMException.h:77
edmScanValgrind.buffer
buffer
Definition: edmScanValgrind.py:171
EDMException.h
SendMonitoringInfoHandler
Definition: XrdRequestManager.cc:71
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
consumeChunkBack
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
Definition: XrdRequestManager.cc:843
XrdAdaptor::RequestManager::getPrettyActiveSourceNames
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:473
XrdAdaptor::RequestManager::m_lastSourceCheck
timespec m_lastSourceCheck
Definition: XrdRequestManager.h:219
alignCSCRings.s
s
Definition: alignCSCRings.py:92
XrdAdaptor::RequestManager::pickSingleSource
std::shared_ptr< Source > pickSingleSource()
Definition: XrdRequestManager.cc:501
XrdAdaptor::RequestManager::compareSources
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
Definition: XrdRequestManager.cc:318
XrdAdaptor::RequestManager::reportSiteChange
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
Definition: XrdRequestManager.cc:285
Service.h
seconds
double seconds()
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
source
static const std::string source
Definition: EdmProvDump.cc:47
XrdAdaptor::RequestManager::initialize
void initialize(std::weak_ptr< RequestManager > selfref)
Definition: XrdRequestManager.cc:125
IOPosBuffer::data
void * data(void) const
Definition: IOPosBuffer.h:42
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
XrdAdaptor::Source::getDomain
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:246
XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_LONG_OPEN_DELAY
Definition: XrdRequestManager.cc:36
edm::CPUTimer::start
void start()
Definition: CPUTimer.cc:68
CMS_THREAD_SAFE
#define CMS_THREAD_SAFE
Definition: thread_safety_macros.h:4
XrdRequestManager.h
b
double b
Definition: hdecay.h:118
q2
double q2[4]
Definition: TauolaWrapper.h:88
validateList
static IOSize validateList(const std::vector< IOPosBuffer > req)
Definition: XrdRequestManager.cc:882
XRD_CL_MAX_CHUNK
#define XRD_CL_MAX_CHUNK
Definition: XrdRequestManager.cc:25
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
XrdAdaptor::RequestManager::getActiveSourceNames
void getActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:465
XrdAdaptor::RequestManager::m_name
const std::string m_name
Definition: XrdRequestManager.h:227
web.browse_db.env
env
Definition: browse_db.py:18
XrdAdaptor::RequestManager::m_nextInitialSourceToggle
bool m_nextInitialSourceToggle
Definition: XrdRequestManager.h:222
IOPosBuffer::set_size
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:54
XrdAdaptor::Source::isDCachePool
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:256
q1
double q1[4]
Definition: TauolaWrapper.h:87
a
double a
Definition: hdecay.h:119
XrdAdaptor::RequestManager::splitClientRequest
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Definition: XrdRequestManager.cc:896
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
cms::Exception::addAdditionalInfo
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
XrdAdaptor::RequestManager::handle
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
Definition: XrdRequestManager.h:56
mps_check.timeout
int timeout
Definition: mps_check.py:53
XrdAdaptor::Source::getXrootdSiteFromURL
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:326
IOOffset
int64_t IOOffset
Definition: IOTypes.h:19
thread_safety_macros.h
XrdAdaptor::RequestManager::m_distribution
std::uniform_real_distribution< float > m_distribution
Definition: XrdRequestManager.h:233
edm::Service
Definition: Service.h:30
XrdAdaptor::RequestManager::m_source_mutex
std::recursive_mutex m_source_mutex
Definition: XrdRequestManager.h:230
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
CalibrationSummaryClient_cfi.activeSources
activeSources
Definition: CalibrationSummaryClient_cfi.py:11
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
StatisticsSenderService.h
edm::CPUTimer::stop
Times stop()
Definition: CPUTimer.cc:87
XrdAdaptor::RequestManager::checkSources
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:300
get
#define get
XrdAdaptor::RequestManager::OpenHandler::OpenHandler
OpenHandler(const OpenHandler &)=delete
XRD_ADAPTOR_OPEN_PROBE_PERCENT
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
Definition: XrdRequestManager.cc:35
nullHandler
SendMonitoringInfoHandler nullHandler
Definition: XrdRequestManager.cc:90
alignCSCRings.r
r
Definition: alignCSCRings.py:93
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
VtxSmearedBeamProfile_cfi.File
File
Definition: VtxSmearedBeamProfile_cfi.py:30
eostools.move
def move(src, dest)
Definition: eostools.py:511
XrdAdaptor::RequestManager::handleOpen
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
Definition: XrdRequestManager.cc:581
IOPosBuffer::size
IOSize size(void) const
Definition: IOPosBuffer.h:45
edm::storage::StatisticsSenderService::getJobID
static const char * getJobID()
Definition: StatisticsSenderService.cc:145
cms::Exception::clearContext
void clearContext()
Definition: Exception.cc:161
XrdAdaptor::RequestManager::OpenHandler::getInstance
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
Definition: XrdRequestManager.h:242
IOPosBuffer::offset
IOOffset offset(void) const
Definition: IOPosBuffer.h:39
edm::LogVerbatim
Log< level::Info, true > LogVerbatim
Definition: MessageLogger.h:128
LIKELY
#define LIKELY(x)
Definition: Likely.h:20
XrdAdaptor::RequestManager::OpenHandler::~OpenHandler
~OpenHandler() override
Definition: XrdRequestManager.cc:972
SendMonitoringInfo
static void SendMonitoringInfo(XrdCl::File &file)
Definition: XrdRequestManager.cc:92
HiBiasedCentrality_cfi.function
function
Definition: HiBiasedCentrality_cfi.py:4
funct::void
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
XrdAdaptor::RequestManager::prepareOpaqueString
std::string prepareOpaqueString() const
Definition: XrdRequestManager.cc:554
dqmMemoryStats.total
total
Definition: dqmMemoryStats.py:152
IOPosBuffer
Definition: IOPosBuffer.h:7
consumeChunkFront
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
Definition: XrdRequestManager.cc:804
XrdAdaptor::XrootdException
Definition: XrdRequestManager.h:31
XrdAdaptor::RequestManager::m_disabledSourceStrings
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
Definition: XrdRequestManager.h:211
hcal_runs.URL
URL
Definition: hcal_runs.py:4
cms::Exception
Definition: Exception.h:70
CPUTimer.h
XrdAdaptor::RequestManager::m_timeout
int m_timeout
Definition: XrdRequestManager.h:220
XrdAdaptor::RequestManager::OpenHandler::current_source
std::string current_source()
Definition: XrdRequestManager.cc:1028
XrdAdaptor::RequestManager
Definition: XrdRequestManager.h:44
IOSize
size_t IOSize
Definition: IOTypes.h:14
HLT_FULL_cff.flags
flags
Definition: HLT_FULL_cff.py:13150
edm::Log
Definition: MessageLogger.h:70
cms::Exception::clearMessage
void clearMessage()
Definition: Exception.cc:159
edm::errors::FileReadError
Definition: EDMException.h:50
XrdAdaptor::Source::getHostname
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:219
TauDecayModes.dec
dec
Definition: TauDecayModes.py:143
edm::storage::StatisticsSenderService::setCurrentServer
void setCurrentServer(const std::string &servername)
Definition: StatisticsSenderService.cc:151
SendMonitoringInfoHandler::HandleResponse
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
Definition: XrdRequestManager.cc:72
XrdAdaptor
Definition: QualityMetric.h:14
cms::Exception::clearAdditionalInfo
void clearAdditionalInfo()
Definition: Exception.cc:163
XrdAdaptor::RequestManager::checkSourcesImpl
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:346
timeDiffMS
long long timeDiffMS(const timespec &a, const timespec &b)
Definition: XrdRequestManager.cc:61
XrdAdaptor::RequestManager::requestFailure
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
Definition: XrdRequestManager.cc:720
XrdAdaptor::ClientRequest
Definition: XrdRequest.h:22
CollectionTags_cfi.Source
Source
Definition: CollectionTags_cfi.py:11