CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 
15 
16 #include "XrdStatistics.h"
18 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
19 
// Maximum size, in bytes, of a single chunk handed to XrdCl in a vector read.
// Macro values are parenthesized so they expand safely inside larger
// expressions (e.g. `n / XRD_CL_MAX_CHUNK`).
#define XRD_CL_MAX_CHUNK (512*1024)

// Short back-off, in seconds, before re-evaluating the active sources.
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
// Long back-off, in seconds, between attempts to find a new source.
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
// Long back-off, in seconds, between attempts to find a new source.
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2*60)
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif
34 
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
// Read the monotonic system clock into the timespec `ts`.
// Wrapped in do { } while (0) so the macro behaves as a single statement:
// it needs exactly one trailing semicolon and is safe in an unbraced if/else.
#define GET_CLOCK_MONOTONIC(ts) \
  do { \
    clock_serv_t cclock; \
    mach_timespec_t mts; \
    host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
    clock_get_time(cclock, &mts); \
    mach_port_deallocate(mach_task_self(), cclock); \
    (ts).tv_sec = mts.tv_sec; \
    (ts).tv_nsec = mts.tv_nsec; \
  } while (0)
#else
// Read the monotonic system clock into the timespec `ts`.
// Wrapped in do { } while (0) so `GET_CLOCK_MONOTONIC(ts);` is one statement
// (the previous form ended in ';' and produced ';;', breaking if/else).
#define GET_CLOCK_MONOTONIC(ts) \
  do { clock_gettime(CLOCK_MONOTONIC, &(ts)); } while (0)
#endif
52 
53 using namespace XrdAdaptor;
54 
// Return (a - b) in milliseconds.
// The whole-second part is scaled exactly; the nanosecond part is converted
// through double precision, and the sum is truncated toward zero when it is
// converted back to an integer.
long long timeDiffMS(const timespec &a, const timespec &b)
{
  const long long wholeSecondsMS = (a.tv_sec - b.tv_sec) * 1000;
  const double fractionalMS = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(wholeSecondsMS + fractionalMS);
}
61 
62 /*
63  * We do not care about the response of sending the monitoring information;
64  * this handler class simply frees any returned buffer to prevent memory leaks.
65  */
66 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
67 {
68  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
69  {
70  if (response)
71  {
72  XrdCl::Buffer *buffer = nullptr;
73  response->Get(buffer);
74  delete buffer;
75  }
76  }
77 };
78 
80 
81 
82 static void
84 {
85  // Do not send this to a dCache data server as they return an error.
86  // In some versions of dCache, sending the monitoring information causes
87  // the server to close the connection - resulting in failures.
88  if (Source::isDCachePool(file)) {return;}
89 
90  // Send the monitoring info, if available.
92  std::string lastUrl;
93  file.GetProperty("LastURL", lastUrl);
94  if (jobId && lastUrl.size())
95  {
96  XrdCl::FileSystem fs = XrdCl::FileSystem(XrdCl::URL(lastUrl));
97  fs.SendInfo(jobId, &nullHandler, 30);
98  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
99  }
100 }
101 
102 
104  : m_timeout(XRD_DEFAULT_TIMEOUT),
105  m_nextInitialSourceToggle(false),
106  m_name(filename),
107  m_flags(flags),
108  m_perms(perms),
109  m_distribution(0,100),
110  m_excluded_active_count(0)
111 {
112 }
113 
114 
115 void
116 RequestManager::initialize(std::weak_ptr<RequestManager> self)
117 {
119 
120  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
121  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
122 
123  std::string orig_site;
124  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
125  {
126  std::string hostname;
127  if (Source::getHostname(orig_site, hostname))
128  {
129  Source::getDomain(hostname, orig_site);
130  }
131  }
132 
133  std::unique_ptr<XrdCl::File> file;
135  bool validFile = false;
136  const int retries = 5;
137  std::string excludeString;
138  for (int idx=0; idx<retries; idx++)
139  {
140  file.reset(new XrdCl::File());
141  auto opaque = prepareOpaqueString();
142  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
143  SyncHostResponseHandler handler;
144  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
145  if (!openStatus.IsOK())
146  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
147  // server and it was marked bad - xrootd couldn't even queue up the request internally!
148  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
149  // redirector being down or authentication failures.
150  ex.clearMessage();
151  ex.clearContext();
152  ex.clearAdditionalInfo();
153  ex << "XrdCl::File::Open(name='" << m_name
154  << "', flags=0x" << std::hex << m_flags
155  << ", permissions=0" << std::oct << m_perms << std::dec
156  << ") => error '" << openStatus.ToStr()
157  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
158  ex.addContext("Calling XrdFile::open()");
159  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
160  throw ex;
161  }
162  handler.WaitForResponse();
163  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
164  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
165  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
166  assert(status);
167  if (status->IsOK())
168  {
169  validFile = true;
170  break;
171  }
172  else
173  {
174  ex.clearMessage();
175  ex.clearContext();
176  ex.clearAdditionalInfo();
177  ex << "XrdCl::File::Open(name='" << m_name
178  << "', flags=0x" << std::hex << m_flags
179  << ", permissions=0" << std::oct << m_perms << std::dec
180  << ") => error '" << status->ToStr()
181  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
182  ex.addContext("Calling XrdFile::open()");
183  addConnections(ex);
184  std::string dataServer, lastUrl;
185  file->GetProperty("DataServer", dataServer);
186  file->GetProperty("LastURL", lastUrl);
187  if (dataServer.size())
188  {
189  ex.addAdditionalInfo("Problematic data server: " + dataServer);
190  }
191  if (lastUrl.size())
192  {
193  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
194  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
195  }
196  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
197  {
198  ex << ". No additional data servers were found.";
199  throw ex;
200  }
201  if (dataServer.size())
202  {
203  m_disabledSourceStrings.insert(dataServer);
204  m_disabledExcludeStrings.insert(excludeString);
205  }
206  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
207  if (lastUrl == new_filename)
208  {
209  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
210  throw ex;
211  }
212  }
213  }
214  if (!validFile)
215  {
216  throw ex;
217  }
218  SendMonitoringInfo(*file);
219 
220  timespec ts;
222 
223  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
224  {
225  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
226  m_activeSources.push_back(source);
227  updateSiteInfo(orig_site);
228  }
229 
230  m_lastSourceCheck = ts;
231  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
233 }
234 
235 
236 void
238 {
239  std::string siteA, siteB, siteList;
240  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
241  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
242  siteList = siteA;
243  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
244  if (orig_site.size() && (orig_site != siteList))
245  {
246  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
247  m_activeSites = siteList;
248  }
249  else if (!orig_site.size() && (siteList != m_activeSites))
250  {
251  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
252  m_activeSites = siteList;
253  }
254 }
255 
256 
257 void
258 RequestManager::checkSources(timespec &now, IOSize requestSize)
259 {
260  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
261  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
262  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
263  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
264  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
265  {
266  { // Be more aggressive about getting rid of very bad sources.
267  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
268  compareSources(now, 0, 1);
269  compareSources(now, 1, 0);
270  }
271  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
272  {
273  checkSourcesImpl(now, requestSize);
274  }
275  }
276 }
277 
278 
279 bool
280 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b)
281 {
282  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
283 
284  bool findNewSource = false;
285  if ((m_activeSources[a]->getQuality() > 5130) ||
286  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
287  {
288  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
289  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
290  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
291  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
292  m_activeSources[a]->setLastDowngrade(now);
293  m_inactiveSources.emplace_back(m_activeSources[a]);
294  m_activeSources.erase(m_activeSources.begin()+a);
295  updateSiteInfo();
296  }
297  return findNewSource;
298 }
299 
300 void
302 {
303  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
304 
305  bool findNewSource = false;
306  if (m_activeSources.size() <= 1)
307  {
308  findNewSource = true;
309  }
310  else if (m_activeSources.size() > 1)
311  {
312  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
313  findNewSource |= compareSources(now, 0, 1);
314  findNewSource |= compareSources(now, 1, 0);
315 
316  // NOTE: We could probably replace the copy with a better sort function.
317  // However, there are typically very few sources and the correctness is more obvious right now.
318  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
319  for (const auto & source : m_inactiveSources)
320  {
321  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
322  }
323  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
324  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
325  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
326  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
327  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
328  {
329  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
330  << ", quality " << (*bestInactiveSource)->getQuality();
331  }
332  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
333  << ", quality " << (*worstActiveSource)->getQuality();
334  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
335  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
336  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
337  {
338  m_activeSources.push_back(*bestInactiveSource);
339  updateSiteInfo();
340  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
341  }
342  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
343  {
344  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
345  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
346  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
347  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
348  (*worstActiveSource)->setLastDowngrade(now);
349  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
350  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
351  m_activeSources.erase(worstActiveSource);
352  m_activeSources.emplace_back(std::move(*bestInactiveSource));
353  updateSiteInfo();
354  eligibleInactiveSources.clear();
355  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
356  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
357  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
358  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
359  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
360  }
361  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
362  {
363  float r = m_distribution(m_generator);
365  {
366  findNewSource = true;
367  }
368  }
369  }
370  if (findNewSource)
371  {
372  m_open_handler->open();
374  }
375 
376  // Only aggressively look for new sources if we don't have two.
377  if (m_activeSources.size() == 2)
378  {
380  }
381  else
382  {
383  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
384  }
386 }
387 
388 std::shared_ptr<XrdCl::File>
390 {
391  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
392  return m_activeSources[0]->getFileHandle();
393 }
394 
395 void
396 RequestManager::getActiveSourceNames(std::vector<std::string> & sources)
397 {
398  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
399  sources.reserve(m_activeSources.size());
400  for (auto const& source : m_activeSources) {
401  sources.push_back(source->ID());
402  }
403 }
404 
405 void
406 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources)
407 {
408  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
409  sources.reserve(m_activeSources.size());
410  for (auto const& source : m_activeSources) {
411  sources.push_back(source->PrettyID());
412  }
413 }
414 
415 void
416 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources)
417 {
418  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
419  sources.reserve(m_disabledSourceStrings.size());
420  for (auto const& source : m_disabledSourceStrings) {
421  sources.push_back(source);
422  }
423 }
424 
425 void
427 {
428  std::vector<std::string> sources;
430  for (auto const& source : sources)
431  {
432  ex.addAdditionalInfo("Active source: " + source);
433  }
434  sources.clear();
435  getDisabledSourceNames(sources);
436  for (auto const& source : sources)
437  {
438  ex.addAdditionalInfo("Disabled source: " + source);
439  }
440 }
441 
442 std::shared_ptr<Source>
444 {
445  std::shared_ptr<Source> source = nullptr;
446  {
447  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
448  if (m_activeSources.size() == 2)
449  {
451  {
452  source = m_activeSources[0];
454  }
455  else
456  {
457  source = m_activeSources[1];
459  }
460  }
461  else
462  {
463  source = m_activeSources[0];
464  }
465  }
466  return source;
467 }
468 
469 std::future<IOSize>
470 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
471 {
472  assert(c_ptr.get());
473  timespec now;
474  GET_CLOCK_MONOTONIC(now);
475  checkSources(now, c_ptr->getSize());
476 
477  std::shared_ptr<Source> source = pickSingleSource();
478  source->handle(c_ptr);
479  return c_ptr->get_future();
480 }
481 
484 {
485  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
486  std::stringstream ss;
487  ss << "tried=";
488  size_t count = 0;
489  for ( const auto & it : m_activeSources )
490  {
491  count++;
492  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
493  }
494  for ( const auto & it : m_inactiveSources )
495  {
496  count++;
497  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
498  }
499  for ( const auto & it : m_disabledExcludeStrings )
500  {
501  count++;
502  ss << it.substr(0, it.find(":")) << ",";
503  }
504  if (count)
505  {
506  std::string tmp_str = ss.str();
507  return tmp_str.substr(0, tmp_str.size()-1);
508  }
509  return "";
510 }
511 
512 void
513 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
514 {
515  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
516  if (status.IsOK())
517  {
518  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
519  for (const auto & s : m_activeSources)
520  {
521  if (source->ID() == s->ID())
522  {
523  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
524  << "; ignoring" << std::endl;
525  unsigned returned_count = ++m_excluded_active_count;
526  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
527  if (returned_count >= 3) {m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2*XRD_ADAPTOR_SHORT_OPEN_DELAY;}
528  return;
529  }
530  }
531  for (const auto & s : m_inactiveSources)
532  {
533  if (source->ID() == s->ID())
534  {
535  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
536  << "; ignoring" << std::endl;
537  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
538  return;
539  }
540  }
541  if (m_activeSources.size() < 2)
542  {
543  m_activeSources.push_back(source);
544  updateSiteInfo();
545  }
546  else
547  {
548  m_inactiveSources.push_back(source);
549  }
550  }
551  else
552  { // File-open failure - wait at least 120s before next attempt.
553  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
554  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
555  }
556 }
557 
558 std::future<IOSize>
559 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
560 {
561  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
562 
563  timespec now;
564  GET_CLOCK_MONOTONIC(now);
565 
566  edm::CPUTimer timer;
567  timer.start();
568 
569  assert(m_activeSources.size());
570  if (m_activeSources.size() == 1)
571  {
572  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
573  checkSources(now, c_ptr->getSize());
574  m_activeSources[0]->handle(c_ptr);
575  return c_ptr->get_future();
576  }
577 
578  assert(iolist.get());
579  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
580  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
581  splitClientRequest(*iolist, *req1, *req2);
582 
583  checkSources(now, req1->size() + req2->size());
584  // CheckSources may have removed a source
585  if (m_activeSources.size() == 1)
586  {
587  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
588  m_activeSources[0]->handle(c_ptr);
589  return c_ptr->get_future();
590  }
591 
592  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
593  std::future<IOSize> future1, future2;
594  if (req1->size())
595  {
596  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
597  m_activeSources[0]->handle(c_ptr1);
598  future1 = c_ptr1->get_future();
599  }
600  if (req2->size())
601  {
602  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
603  m_activeSources[1]->handle(c_ptr2);
604  future2 = c_ptr2->get_future();
605  }
606  if (req1->size() && req2->size())
607  {
608  std::future<IOSize> task = std::async(std::launch::deferred,
609  [](std::future<IOSize> a, std::future<IOSize> b){
610  // Wait until *both* results are available. This is essential
611  // as the callback may try referencing the RequestManager. If one
612  // throws an exception (causing the RequestManager to be destroyed by
613  // XrdFile) and the other has a failure, then the recovery code will
614  // reference the destroyed RequestManager.
615  //
616  // Unlike other places where we use shared/weak ptrs to maintain object
617  // lifetime and destruction asynchronously, we *cannot* destroy the request
618  // asynchronously as it is associated with a ROOT buffer. We must wait until we
619  // are guaranteed that XrdCl will not write into the ROOT buffer before we
620  // can return.
621  b.wait(); a.wait();
622  return b.get() + a.get();
623  },
624  std::move(future1),
625  std::move(future2));
626  timer.stop();
627  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
628  return task;
629  }
630  else if (req1->size()) { return future1; }
631  else if (req2->size()) { return future2; }
632  else
633  { // Degenerate case - no bytes to read.
634  std::promise<IOSize> p; p.set_value(0);
635  return p.get_future();
636  }
637 }
638 
639 void
640 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
641 {
642  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
643  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
644 
645  // Fail early for invalid responses - XrdFile has a separate path for handling this.
646  if (c_status.code == XrdCl::errInvalidResponse)
647  {
648  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
650  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
651  << "', flags=0x" << std::hex << m_flags
652  << ", permissions=0" << std::oct << m_perms << std::dec
653  << ", old source=" << source_ptr->PrettyID()
654  << ") => Invalid ReadV response from server";
655  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
656  addConnections(ex);
657  throw ex;
658  }
659  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
660 
661  // Note that we do not delete the Source itself. That is because this
662  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
663  // In such a case, if you close a file in the handler, it will deadlock
664  m_disabledSourceStrings.insert(source_ptr->ID());
665  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
666  m_disabledSources.insert(source_ptr);
667 
668  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
669  {
670  m_activeSources.erase(m_activeSources.begin());
671  updateSiteInfo();
672  }
673  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
674  {
675  m_activeSources.erase(m_activeSources.begin()+1);
676  updateSiteInfo();
677  }
678  std::shared_ptr<Source> new_source;
679  if (m_activeSources.size() == 0)
680  {
681  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
682  timespec now;
683  GET_CLOCK_MONOTONIC(now);
685  // Note we only wait for 180 seconds here. This is because we've already failed
686  // once and the likelihood the program has some inconsistent state is decent.
687  // We'd much rather fail hard than deadlock!
688  sentry.unlock();
689  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
690  if (status == std::future_status::timeout)
691  {
693  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
694  << "', flags=0x" << std::hex << m_flags
695  << ", permissions=0" << std::oct << m_perms << std::dec
696  << ", old source=" << source_ptr->PrettyID()
697  << ", current server=" << m_open_handler->current_source()
698  << ") => timeout when waiting for file open";
699  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
700  addConnections(ex);
701  throw ex;
702  }
703  else
704  {
705  try
706  {
707  new_source = future.get();
708  }
709  catch (edm::Exception &ex)
710  {
711  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
712  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
713  throw;
714  }
715  }
716  sentry.lock();
717 
718  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
719  {
720  // The server gave us back a data node we requested excluded. Fatal!
722  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
723  << "', flags=0x" << std::hex << m_flags
724  << ", permissions=0" << std::oct << m_perms << std::dec
725  << ", old source=" << source_ptr->PrettyID()
726  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
727  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
728  addConnections(ex);
729  throw ex;
730  }
731  m_activeSources.push_back(new_source);
732  updateSiteInfo();
733  }
734  else
735  {
736  new_source = m_activeSources[0];
737  }
738  new_source->handle(c_ptr);
739 }
740 
741 static void
742 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
743 {
744  while ((chunksize > 0) && (front < input.size()))
745  {
746  IOPosBuffer &io = input[front];
747  IOPosBuffer &outio = output.back();
748  if (io.size() > chunksize)
749  {
750  IOSize consumed;
751  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
752  {
753  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
754  {
755  consumed = (XRD_CL_MAX_CHUNK - outio.size());
756  outio.set_size(XRD_CL_MAX_CHUNK);
757  }
758  else
759  {
760  consumed = chunksize;
761  outio.set_size(outio.size() + consumed);
762  }
763  }
764  else
765  {
766  consumed = chunksize;
767  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
768  }
769  chunksize -= consumed;
770  IOSize newsize = io.size() - consumed;
771  IOOffset newoffset = io.offset() + consumed;
772  void* newdata = static_cast<char*>(io.data()) + consumed;
773  io.set_offset(newoffset);
774  io.set_data(newdata);
775  io.set_size(newsize);
776  }
777  else if (io.size() == 0)
778  {
779  front++;
780  }
781  else
782  {
783  output.push_back(io);
784  chunksize -= io.size();
785  front++;
786  }
787  }
788 }
789 
790 static void
791 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
792 {
793  while ((chunksize > 0) && (front < input.size()))
794  {
795  IOPosBuffer &io = input.back();
796  IOPosBuffer &outio = output.back();
797  if (io.size() > chunksize)
798  {
799  IOSize consumed;
800  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
801  {
802  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
803  {
804  consumed = (XRD_CL_MAX_CHUNK - outio.size());
805  outio.set_size(XRD_CL_MAX_CHUNK);
806  }
807  else
808  {
809  consumed = chunksize;
810  outio.set_size(outio.size() + consumed);
811  }
812  }
813  else
814  {
815  consumed = chunksize;
816  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
817  }
818  chunksize -= consumed;
819  IOSize newsize = io.size() - consumed;
820  IOOffset newoffset = io.offset() + consumed;
821  void* newdata = static_cast<char*>(io.data()) + consumed;
822  io.set_offset(newoffset);
823  io.set_data(newdata);
824  io.set_size(newsize);
825  }
826  else if (io.size() == 0)
827  {
828  input.pop_back();
829  }
830  else
831  {
832  output.push_back(io);
833  chunksize -= io.size();
834  input.pop_back();
835  }
836  }
837 }
838 
839 static IOSize validateList(const std::vector<IOPosBuffer> req)
840 {
841  IOSize total = 0;
842  off_t last_offset = -1;
843  for (const auto & it : req)
844  {
845  total += it.size();
846  assert(it.offset() > last_offset);
847  last_offset = it.offset();
848  assert(it.size() <= XRD_CL_MAX_CHUNK);
849  assert(it.offset() < 0x1ffffffffff);
850  }
851  assert(req.size() <= 1024);
852  return total;
853 }
854 
855 void
856 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2)
857 {
858  if (iolist.size() == 0) return;
859  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
860  req1.reserve(iolist.size()/2+1);
861  req2.reserve(iolist.size()/2+1);
862  size_t front=0;
863 
864  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
865  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
866  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
867  IOSize chunk1, chunk2;
868  chunk1 = static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2));
869  chunk2 = static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2));
870 
871  IOSize size_orig = 0;
872  for (const auto & it : iolist) size_orig += it.size();
873 
874  while (tmp_iolist.size()-front > 0)
875  {
876  if ((req1.size() >= 1000) && (req2.size() >= 1000))
877  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
878  // are passed to the request manager. However, because we have a max chunk size, we increase
879  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
880  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
881  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
883  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
884  << "', flags=0x" << std::hex << m_flags
885  << ", permissions=0" << std::oct << m_perms << std::dec
886  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
887  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
888  addConnections(ex);
889  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
890  ex.addAdditionalInfo(ss.str());
891  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
892  ex.addAdditionalInfo(ss2.str());
893  throw ex;
894  }
895  if (req1.size() < 1000) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
896  if (req2.size() < 1000) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
897  }
898  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
899  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
900 
901  IOSize size1 = validateList(req1);
902  IOSize size2 = validateList(req2);
903 
904  assert(size_orig == size1 + size2);
905 
906  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
907 }
908 
909 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
910  : m_manager(manager)
911 {
912 }
913 
914 
915  // Cannot use ~OpenHandler=default as XrdCl::File is not fully
916  // defined in the header.
918 {
919 }
920 
921 
// XrdCl completion callback for the asynchronous open issued by
// OpenHandler::open() (which passes `this` as the response handler).
//
// Takes ownership of `status_ptr` and `hostList_ptr` (wrapped in unique_ptr);
// the response AnyObject is unused.
// - On success: sends monitoring info for the opened file, computes the host
//   exclude string, wraps the opened XrdCl::File in a new Source, and fulfils
//   m_promise with it.
// - On failure: builds an exception describing the failed open and stores it
//   in m_promise; `source` stays null.
// In both cases the manager is then notified through handleOpen(), outside
// m_mutex. If the manager no longer exists, the response is dropped.
922 void
923 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
924 {
925  std::shared_ptr<Source> source;
926  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
927  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
928 
929  // Make sure we get rid of the strong self-reference when the callback finishes.
// The local `self` keeps this handler alive until the function returns, even
// though the member self-reference is cleared right away.
// NOTE(review): m_self is read and reset here before m_mutex is taken, while
// open() assigns m_self (under m_mutex) only after m_file->Open() has been
// issued. If this callback can run before that assignment, the
// self-reference would be left set after the callback finished. Confirm the
// ordering guarantees.
930  std::shared_ptr<OpenHandler> self = m_self;
931  m_self.reset();
932 
933  auto manager = m_manager.lock();
934  // Manager object has already been deleted. Cleanup the
935  // response objects, remove our self-reference, and ignore the response.
936  if (!manager)
937  {
938  return;
939  }
940  //if we need to delete the File object we must do it outside
941  // of the lock to avoid a potential deadlock
// (declared outside the locked scope so its destructor runs after the lock
// is released, at function exit)
942  std::unique_ptr<XrdCl::File> releaseFile;
943  {
944  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
945 
946  if (status->IsOK())
947  {
948  SendMonitoringInfo(*m_file);
949  timespec now;
950  GET_CLOCK_MONOTONIC(now);
951 
952  std::string excludeString;
953  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
954 
// Ownership of the opened file moves into the Source; m_file is empty afterwards.
955  source.reset(new Source(now, std::move(m_file), excludeString));
956  m_promise.set_value(source);
957  }
958  else
959  {
// Take the failed file out of m_file so it is destroyed outside the lock.
960  releaseFile = std::move(m_file);
962  ex << "XrdCl::File::Open(name='" << manager->m_name
963  << "', flags=0x" << std::hex << manager->m_flags
964  << ", permissions=0" << std::oct << manager->m_perms << std::dec
965  << ") => error '" << status->ToStr()
966  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
967  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
968  manager->addConnections(ex);
969 
// Waiters on the shared future receive this exception.
970  m_promise.set_exception(std::make_exception_ptr(ex));
971  }
972  }
// Notify the manager without holding m_mutex; `source` is null on failure.
973  manager->handleOpen(*status, source);
974 }
975 
978 {
979  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
980 
981  if (!m_file.get())
982  {
983  return "(no open in progress)";
984  }
985  std::string dataServer;
986  m_file->GetProperty("DataServer", dataServer);
987  if (!dataServer.size()) { return "(unknown source)"; }
988  return dataServer;
989 }
990 
// Start (or join) an asynchronous open of the manager's file.
//
// The URL opened is the manager's m_name with manager.prepareOpaqueString()
// appended, and this handler is registered as the XrdCl callback.
//
// Returns a shared_future that HandleResponseWithHosts() later fulfils with
// the new Source, or with the open error. If an open is already in progress
// (m_file is non-null), the existing future is returned and no new open is
// started.
//
// Throws an exception if the owning RequestManager is gone, if this handler's
// own weak self-reference can no longer be locked, or if
// XrdCl::File::Open() rejects the request immediately.
// Runs entirely under m_mutex.
991 std::shared_future<std::shared_ptr<Source> >
993 {
994  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
995  auto manager_ptr = m_manager.lock();
996  if (!manager_ptr)
997  {
999  ex << "XrdCl::File::Open() =>"
1000  << " error: OpenHandler called within an invalid RequestManager context."
1001  << " This is a logic error and should be reported to the CMSSW developers.";
1002  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1003  throw ex;
1004  }
1005  RequestManager &manager = *manager_ptr;
1006  auto self_ptr = m_self_weak.lock();
1007  if (!self_ptr)
1008  {
1010  ex << "XrdCl::File::Open() => error: "
1011  << "OpenHandler called after it was deleted. This is a logic error "
1012  << "and should be reported to the CMSSW developers.";
// NOTE(review): this context string spells "XrdAdapter" while every other
// message in this file uses "XrdAdaptor". It looks like a typo; consider
// correcting it.
1013  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1014  throw ex;
1015  }
1016 
// An open is already in flight: hand out the same future.
1017  if (m_file.get())
1018  {
1019  return m_shared_future;
1020  }
// Fresh promise/future pair for this open attempt.
1021  std::promise<std::shared_ptr<Source> > new_promise;
1022  m_promise.swap(new_promise);
1023  m_shared_future = m_promise.get_future().share();
1024 
1025  auto opaque = manager.prepareOpaqueString();
// Append the opaque data with '?' if the URL has no query part yet, else with '&'.
1026  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1027  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1028  m_file.reset(new XrdCl::File());
1029  XrdCl::XRootDStatus status;
// `this` is registered as the asynchronous response handler.
// NOTE(review): on this failure path m_file is left set before throwing. A
// later open() call would therefore take the early return above and receive
// this attempt's future, which no code path visible here fulfils. Confirm
// whether m_file should be reset before the throw.
1030  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1031  {
1033  ex << "XrdCl::File::Open(name='" << new_name
1034  << "', flags=0x" << std::hex << manager.m_flags
1035  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1036  << ") => error '" << status.ToStr()
1037  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1038  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1039  manager.addConnections(ex);
1040  throw ex;
1041  }
1042  // Have a strong self-reference for as long as the callback is in-progress.
// NOTE(review): the self-reference is set only after the callback has been
// registered. HandleResponseWithHosts() reads and resets m_self without
// taking m_mutex. If the callback can run before this line, the reference
// set here would outlive the callback. Consider assigning it before Open().
1043  m_self = self_ptr;
1044  return m_shared_future;
1045 }
1046 
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void updateSiteInfo(std::string orig_site="")
#define XRD_CL_MAX_CHUNK
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
double q2[4]
Definition: TauolaWrapper.h:88
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:43
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void swap(Exception &other)
Definition: EDMException.h:87
void addConnections(cms::Exception &)
long long timeDiffMS(const timespec &a, const timespec &b)
std::set< std::shared_ptr< Source > > m_disabledSources
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:155
def move
Definition: eostools.py:508
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
Times stop()
Definition: CPUTimer.cc:94
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
void clearMessage()
Definition: Exception.cc:215
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
void checkSourcesImpl(timespec &now, IOSize requestSize)
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
void getDisabledSourceNames(std::vector< std::string > &sources)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
tuple status
Definition: ntuplemaker.py:245
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
void clearAdditionalInfo()
Definition: Exception.cc:223
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)