CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 
15 
16 #include "XrdStatistics.h"
18 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
19 
// Tuning constants for the request manager. Multi-token values are
// parenthesized so the macros expand safely inside larger expressions
// (unparenthesized, `n / XRD_CL_MAX_CHUNK` would parse as `n / 512 * 1024`).

// Largest chunk size (bytes) placed into a single element of a vectored read.
#define XRD_CL_MAX_CHUNK (512*1024)

// Seconds added to the next source-check time for a "re-evaluate soon" check.
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
// Presumably the percent chance of probing for a new source on a periodic
// check (its use is not visible in this file section) - TODO confirm.
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
// Seconds to back off after a failed / redundant attempt to open a new source.
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2*60)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// Maximum number of chunks a single sub-request may accumulate when a
// vectored read is split between two sources.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
36 
37 
/*
 * GET_CLOCK_MONOTONIC(ts): store the current monotonic time into the timespec
 * lvalue `ts` (Mach SYSTEM_CLOCK on macOS, CLOCK_MONOTONIC elsewhere).
 *
 * Both variants expand to exactly one statement wrapped in do { ... } while (0).
 * The macro therefore behaves like a function call (`GET_CLOCK_MONOTONIC(now);`),
 * including as the body of an unbraced if/else, and leaves no stray empty statement.
 */
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#define GET_CLOCK_MONOTONIC(ts) \
  do { \
    clock_serv_t cclock; \
    mach_timespec_t mts; \
    host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
    clock_get_time(cclock, &mts); \
    mach_port_deallocate(mach_task_self(), cclock); \
    (ts).tv_sec = mts.tv_sec; \
    (ts).tv_nsec = mts.tv_nsec; \
  } while (0)
#else
#define GET_CLOCK_MONOTONIC(ts) \
  do { clock_gettime(CLOCK_MONOTONIC, &(ts)); } while (0)
#endif
55 
56 using namespace XrdAdaptor;
57 
/*
 * Signed difference a - b between two timespecs, in whole milliseconds,
 * truncated toward zero.
 *
 * The computation uses only 64-bit integer arithmetic. Each field is widened to
 * long long before scaling, so the seconds term cannot overflow where time_t is
 * 32 bits, and there is no floating-point rounding.
 */
long long timeDiffMS(const timespec &a, const timespec &b)
{
  const long long secDiff  = static_cast<long long>(a.tv_sec) - static_cast<long long>(b.tv_sec);
  const long long nsecDiff = static_cast<long long>(a.tv_nsec) - static_cast<long long>(b.tv_nsec);
  // Combine before dividing so the truncation applies to the total
  // (e.g. 1 s minus 999999999 ns is 0 ms, not 1 ms).
  return (secDiff * 1000000000LL + nsecDiff) / 1000000LL;
}
64 
65 /*
66  * We do not care about the response of sending the monitoring information;
67  * this handler class simply frees any returned buffer to prevent memory leaks.
68  */
69 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
70 {
71  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
72  {
73  if (response)
74  {
75  XrdCl::Buffer *buffer = nullptr;
76  response->Get(buffer);
77  delete buffer;
78  }
79  }
80 };
81 
83 
84 
// Best-effort: send the job's monitoring ID to the server that `file` was last
// redirected to (its "LastURL" property), unless that server is a dCache pool.
// The signature line and the declarations of `jobId` / `nullHandler` are not
// visible in this view (presumably `jobId` is a C string that may be null, and
// `nullHandler` is a SendMonitoringInfoHandler instance - TODO confirm).
85 static void
87 {
88  // Do not send this to a dCache data server as they return an error.
89  // In some versions of dCache, sending the monitoring information causes
90  // the server to close the connection - resulting in failures.
91  if (Source::isDCachePool(file)) {return;}
92 
93  // Send the monitoring info, if available.
95  std::string lastUrl;
96  file.GetProperty("LastURL", lastUrl);
97  if (jobId && lastUrl.size())
98  {
// The request is asynchronous and its response is discarded by nullHandler.
// The status returned by SendInfo() is ignored. The last argument is
// presumably the XrdCl timeout in seconds - TODO confirm.
99  XrdCl::FileSystem fs = XrdCl::FileSystem(XrdCl::URL(lastUrl));
100  fs.SendInfo(jobId, &nullHandler, 30);
101  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
102  }
103 }
104 
105 
// RequestManager constructor (its signature line is not visible in this view).
// Records the file name, open flags and permissions. m_timeout starts at
// XRD_DEFAULT_TIMEOUT and may be overridden in initialize(). m_distribution is
// constructed with bounds (0, 100); checkSourcesImpl() draws a float from it,
// presumably for the random open-probe check - TODO confirm.
107  : m_timeout(XRD_DEFAULT_TIMEOUT),
108  m_nextInitialSourceToggle(false),
109  m_name(filename),
110  m_flags(flags),
111  m_perms(perms),
112  m_distribution(0,100),
113  m_excluded_active_count(0)
114 {
115 }
116 
117 
// RequestManager::initialize: open the file for the first time and install the
// resulting Source as the first active source.
//
// Up to `retries` (5) open attempts are made. Each attempt appends the opaque
// exclusion string from prepareOpaqueString() to the URL, so data servers that
// already failed can be avoided. `ex` is declared on a line not visible in this
// view (presumably an edm::Exception - TODO confirm). It is filled in and thrown when:
//  - File::Open fails synchronously (the request could not even be queued),
//  - the data server that failed had already been disabled by an earlier attempt,
//  - the last URL equals the requested URL (we never left the redirector), or
//  - every retry failed.
// On success it:
//  - sends the monitoring info,
//  - stores the new Source in m_activeSources,
//  - calls updateSiteInfo(orig_site), and
//  - sets m_lastSourceCheck.
// NOTE(review): `self` is not used on any visible line. The line(s) not shown
// near the top presumably use it - TODO confirm.
118 void
119 RequestManager::initialize(std::weak_ptr<RequestManager> self)
120 {
122 
// If an XrdCl environment exists, its "StreamErrorWindow" setting overrides
// the default m_timeout.
123  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
124  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
125 
// Determine the site the file was originally requested from. Fallback case:
// getXrootdSiteFromURL() returns false and leaves a value containing no '.'.
// That value is then resolved with getHostname() and replaced by the host's
// domain.
126  std::string orig_site;
127  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
128  {
129  std::string hostname;
130  if (Source::getHostname(orig_site, hostname))
131  {
132  Source::getDomain(hostname, orig_site);
133  }
134  }
135 
136  std::unique_ptr<XrdCl::File> file;
138  bool validFile = false;
139  const int retries = 5;
140  std::string excludeString;
141  for (int idx=0; idx<retries; idx++)
142  {
143  file.reset(new XrdCl::File());
// Append the opaque data with '?' when the URL has no query yet, '&' otherwise.
144  auto opaque = prepareOpaqueString();
145  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
146  SyncHostResponseHandler handler;
147  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
148  if (!openStatus.IsOK())
149  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
150  // server and it was marked bad - xrootd couldn't even queue up the request internally!
151  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
152  // redirector being down or authentication failures.
153  ex.clearMessage();
154  ex.clearContext();
155  ex.clearAdditionalInfo();
156  ex << "XrdCl::File::Open(name='" << m_name
157  << "', flags=0x" << std::hex << m_flags
158  << ", permissions=0" << std::oct << m_perms << std::dec
159  << ") => error '" << openStatus.ToStr()
160  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
161  ex.addContext("Calling XrdFile::open()");
162  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
163  throw ex;
164  }
// Block until the asynchronous open completes.
165  handler.WaitForResponse();
166  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
167  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
// excludeString describes the host(s) this attempt reached. On success it is
// passed to the new Source; on failure it is recorded as disabled.
168  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
169  assert(status);
170  if (status->IsOK())
171  {
172  validFile = true;
173  break;
174  }
175  else
176  {
177  ex.clearMessage();
178  ex.clearContext();
179  ex.clearAdditionalInfo();
180  ex << "XrdCl::File::Open(name='" << m_name
181  << "', flags=0x" << std::hex << m_flags
182  << ", permissions=0" << std::oct << m_perms << std::dec
183  << ") => error '" << status->ToStr()
184  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
185  ex.addContext("Calling XrdFile::open()");
186  addConnections(ex);
187  std::string dataServer, lastUrl;
188  file->GetProperty("DataServer", dataServer);
189  file->GetProperty("LastURL", lastUrl);
190  if (dataServer.size())
191  {
192  ex.addAdditionalInfo("Problematic data server: " + dataServer);
193  }
194  if (lastUrl.size())
195  {
196  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
197  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
198  }
// The same (already disabled) data server failed again: retrying is pointless.
199  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
200  {
201  ex << ". No additional data servers were found.";
202  throw ex;
203  }
// Disable this data server so the next attempt's opaque string excludes it.
204  if (dataServer.size())
205  {
206  m_disabledSourceStrings.insert(dataServer);
207  m_disabledExcludeStrings.insert(excludeString);
208  }
209  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
210  if (lastUrl == new_filename)
211  {
212  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
213  throw ex;
214  }
215  }
216  }
217  if (!validFile)
218  {
219  throw ex;
220  }
221  SendMonitoringInfo(*file);
222 
// NOTE(review): `ts` is presumably filled with the current monotonic time on the
// line not visible here - TODO confirm.
223  timespec ts;
225 
226  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
227  {
228  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
229  m_activeSources.push_back(source);
230  updateSiteInfo(orig_site);
231  }
232 
// The following (not visible) line presumably stores ts + XRD_ADAPTOR_SHORT_OPEN_DELAY
// into m_nextActiveSourceCheck - TODO confirm.
233  m_lastSourceCheck = ts;
234  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
236 }
237 
238 
// Recompute the site list served by the (up to two) active sources. Two sites
// are joined as "A, B" only when they differ. Then warn:
//  - when `orig_site` is non-empty and differs from the list, or
//  - when `orig_site` is empty and the list differs from m_activeSites.
// In both cases m_activeSites is updated.
// The signature line is not visible here. Code elsewhere calls both
// updateSiteInfo(orig_site) and updateSiteInfo(), so `orig_site` presumably
// defaults to an empty string - TODO confirm.
// No lock is taken here; every visible caller already holds m_source_mutex.
// NOTE(review): when `orig_site` is non-empty and equals the list, m_activeSites
// is left unchanged (possibly still empty). A later updateSiteInfo() call can
// then warn about a change from an empty "previous" site list.
239 void
241 {
242  std::string siteA, siteB, siteList;
243  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
244  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
245  siteList = siteA;
246  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
247  if (orig_site.size() && (orig_site != siteList))
248  {
249  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
250  m_activeSites = siteList;
251  }
252  else if (!orig_site.size() && (siteList != m_activeSites))
253  {
254  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
255  m_activeSites = siteList;
256  }
257 }
258 
259 
260 void
261 RequestManager::checkSources(timespec &now, IOSize requestSize)
262 {
263  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
264  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
265  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
266  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
267  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
268  {
269  { // Be more aggressive about getting rid of very bad sources.
270  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
271  compareSources(now, 0, 1);
272  compareSources(now, 1, 0);
273  }
274  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
275  {
276  checkSourcesImpl(now, requestSize);
277  }
278  }
279 }
280 
281 
282 bool
283 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b)
284 {
285  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
286 
287  bool findNewSource = false;
288  if ((m_activeSources[a]->getQuality() > 5130) ||
289  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
290  {
291  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
292  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
293  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
294  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
295  m_activeSources[a]->setLastDowngrade(now);
296  m_inactiveSources.emplace_back(m_activeSources[a]);
297  m_activeSources.erase(m_activeSources.begin()+a);
298  updateSiteInfo();
299  }
300  return findNewSource;
301 }
302 
// Full re-evaluation of active vs. inactive sources. The signature line is not
// visible in this view; checkSources() calls it as checkSourcesImpl(now, requestSize).
//  - With at most one active source, a new source is always requested.
//  - Otherwise, very poor active sources are first demoted (compareSources).
//    Then the best eligible inactive source (lowest getQuality()) is promoted:
//      * added directly when only one source is active and it is less than 4x
//        worse, or
//      * swapped with the worst active source while it is better by more than
//        XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.
//  - After more than XRD_ADAPTOR_LONG_OPEN_DELAY seconds since the last check, a
//    random draw (its condition is on a line not visible here) may also trigger
//    a search.
// Finally `now` is advanced to schedule the next check. Part of that logic is on
// lines not visible here.
303 void
305 {
306  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
307 
308  bool findNewSource = false;
309  if (m_activeSources.size() <= 1)
310  {
311  findNewSource = true;
312  }
313  else if (m_activeSources.size() > 1)
314  {
315  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
// At most one source can be demoted here: once one is removed, the second call
// fails compareSources' size check. So at least one active source remains and
// worstActiveSource below is dereferenceable.
316  findNewSource |= compareSources(now, 0, 1);
317  findNewSource |= compareSources(now, 1, 0);
318 
319  // NOTE: We could probably replace the copy with a better sort function.
320  // However, there are typically very few sources and the correctness is more obvious right now.
321  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
322  for (const auto & source : m_inactiveSources)
323  {
324  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
325  }
326  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
327  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
328  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
329  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
330  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
331  {
332  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
333  << ", quality " << (*bestInactiveSource)->getQuality();
334  }
335  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
336  << ", quality " << (*worstActiveSource)->getQuality();
337  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
338  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
339  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
340  {
341  m_activeSources.push_back(*bestInactiveSource);
342  updateSiteInfo();
343  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
344  }
// Swap loop: demote the worst active source and promote the best eligible
// inactive one, then recompute both and repeat while the swap still pays off.
345  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
346  {
347  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
348  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
349  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
350  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
351  (*worstActiveSource)->setLastDowngrade(now);
352  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
353  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
354  m_activeSources.erase(worstActiveSource);
355  m_activeSources.emplace_back(std::move(*bestInactiveSource));
356  updateSiteInfo();
357  eligibleInactiveSources.clear();
// NOTE(review): eligibility here uses XRD_ADAPTOR_LONG_OPEN_DELAY, whereas the
// initial selection above uses XRD_ADAPTOR_SHORT_OPEN_DELAY. Confirm this
// asymmetry is intended.
358  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
359  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
360  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
361  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
362  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
363  }
// Occasionally probe for a new source even when nothing is wrong. The
// condition on `r` is on a line not visible here.
364  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
365  {
366  float r = m_distribution(m_generator);
368  {
369  findNewSource = true;
370  }
371  }
372  }
// Start an asynchronous open of an additional source. handleOpen() is
// presumably invoked with the result - TODO confirm.
373  if (findNewSource)
374  {
375  m_open_handler->open();
377  }
378 
379  // Only aggressively look for new sources if we don't have two.
380  if (m_activeSources.size() == 2)
381  {
383  }
384  else
385  {
386  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
387  }
389 }
390 
// Return the XrdCl file handle of the first active source, read under
// m_source_mutex. (The signature line is not visible in this view.)
// NOTE(review): m_activeSources[0] is accessed without an emptiness check;
// callers presumably guarantee at least one active source.
391 std::shared_ptr<XrdCl::File>
393 {
394  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
395  return m_activeSources[0]->getFileHandle();
396 }
397 
398 void
399 RequestManager::getActiveSourceNames(std::vector<std::string> & sources)
400 {
401  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
402  sources.reserve(m_activeSources.size());
403  for (auto const& source : m_activeSources) {
404  sources.push_back(source->ID());
405  }
406 }
407 
408 void
409 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources)
410 {
411  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
412  sources.reserve(m_activeSources.size());
413  for (auto const& source : m_activeSources) {
414  sources.push_back(source->PrettyID());
415  }
416 }
417 
418 void
419 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources)
420 {
421  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
422  sources.reserve(m_disabledSourceStrings.size());
423  for (auto const& source : m_disabledSourceStrings) {
424  sources.push_back(source);
425  }
426 }
427 
// Attach the current source lists to exception `ex` as additional info:
//  - one "Active source: ..." entry per active source; the vector is filled by a
//    call on a line not visible in this view (presumably
//    getPrettyActiveSourceNames - TODO confirm);
//  - then one "Disabled source: ..." entry per disabled source.
// getDisabledSourceNames() takes m_source_mutex itself.
428 void
430 {
431  std::vector<std::string> sources;
433  for (auto const& source : sources)
434  {
435  ex.addAdditionalInfo("Active source: " + source);
436  }
// Reuse the vector for the disabled list; the getters append rather than overwrite.
437  sources.clear();
438  getDisabledSourceNames(sources);
439  for (auto const& source : sources)
440  {
441  ex.addAdditionalInfo("Disabled source: " + source);
442  }
443 }
444 
// Choose the active source that will serve the next single (non-vectored)
// request. The shared_ptr is copied under m_source_mutex and returned after the
// lock is released.
// With two active sources, the condition choosing between them is on a line not
// visible in this view (presumably alternating via m_nextInitialSourceToggle -
// TODO confirm). Otherwise the first active source is used.
// NOTE(review): m_activeSources[0] is read without an emptiness check; callers
// presumably guarantee at least one active source.
445 std::shared_ptr<Source>
447 {
448  std::shared_ptr<Source> source = nullptr;
449  {
450  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
451  if (m_activeSources.size() == 2)
452  {
454  {
455  source = m_activeSources[0];
457  }
458  else
459  {
460  source = m_activeSources[1];
462  }
463  }
464  else
465  {
466  source = m_activeSources[0];
467  }
468  }
469  return source;
470 }
471 
472 std::future<IOSize>
473 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
474 {
475  assert(c_ptr.get());
476  timespec now;
477  GET_CLOCK_MONOTONIC(now);
478  checkSources(now, c_ptr->getSize());
479 
480  std::shared_ptr<Source> source = pickSingleSource();
481  source->handle(c_ptr);
482  return c_ptr->get_future();
483 }
484 
487 {
// (Signature lines not visible in this view.) initialize() appends the result to
// the URL as opaque data.
// Builds "tried=<h1>,<h2>,..." from the exclude IDs of all active, inactive and
// disabled sources. Each ID is truncated at its first ':' (so "host:port" becomes
// "host"). Returns "" when there are no entries at all; otherwise the trailing
// ',' is stripped. Presumably this is the xrootd `tried=` opaque parameter
// asking the redirector to avoid those hosts - TODO confirm.
// NOTE(review): duplicates are not removed, and an empty string in
// m_disabledExcludeStrings would add an empty entry.
488  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
489  std::stringstream ss;
490  ss << "tried=";
491  size_t count = 0;
492  for ( const auto & it : m_activeSources )
493  {
494  count++;
495  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
496  }
497  for ( const auto & it : m_inactiveSources )
498  {
499  count++;
500  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
501  }
502  for ( const auto & it : m_disabledExcludeStrings )
503  {
504  count++;
505  ss << it.substr(0, it.find(":")) << ",";
506  }
507  if (count)
508  {
509  std::string tmp_str = ss.str();
510  return tmp_str.substr(0, tmp_str.size()-1);
511  }
512  return "";
513 }
514 
515 void
516 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
517 {
518  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
519  if (status.IsOK())
520  {
521  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
522  for (const auto & s : m_activeSources)
523  {
524  if (source->ID() == s->ID())
525  {
526  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
527  << "; ignoring" << std::endl;
528  unsigned returned_count = ++m_excluded_active_count;
529  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
530  if (returned_count >= 3) {m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2*XRD_ADAPTOR_SHORT_OPEN_DELAY;}
531  return;
532  }
533  }
534  for (const auto & s : m_inactiveSources)
535  {
536  if (source->ID() == s->ID())
537  {
538  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
539  << "; ignoring" << std::endl;
540  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
541  return;
542  }
543  }
544  if (m_activeSources.size() < 2)
545  {
546  m_activeSources.push_back(source);
547  updateSiteInfo();
548  }
549  else
550  {
551  m_inactiveSources.push_back(source);
552  }
553  }
554  else
555  { // File-open failure - wait at least 120s before next attempt.
556  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
557  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
558  }
559 }
560 
561 std::future<IOSize>
562 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
563 {
564  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
565 
566  timespec now;
567  GET_CLOCK_MONOTONIC(now);
568 
569  edm::CPUTimer timer;
570  timer.start();
571 
572  assert(m_activeSources.size());
573  if (m_activeSources.size() == 1)
574  {
575  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
576  checkSources(now, c_ptr->getSize());
577  m_activeSources[0]->handle(c_ptr);
578  return c_ptr->get_future();
579  }
580 
581  assert(iolist.get());
582  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
583  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
584  splitClientRequest(*iolist, *req1, *req2);
585 
586  checkSources(now, req1->size() + req2->size());
587  // CheckSources may have removed a source
588  if (m_activeSources.size() == 1)
589  {
590  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
591  m_activeSources[0]->handle(c_ptr);
592  return c_ptr->get_future();
593  }
594 
595  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
596  std::future<IOSize> future1, future2;
597  if (req1->size())
598  {
599  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
600  m_activeSources[0]->handle(c_ptr1);
601  future1 = c_ptr1->get_future();
602  }
603  if (req2->size())
604  {
605  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
606  m_activeSources[1]->handle(c_ptr2);
607  future2 = c_ptr2->get_future();
608  }
609  if (req1->size() && req2->size())
610  {
611  std::future<IOSize> task = std::async(std::launch::deferred,
612  [](std::future<IOSize> a, std::future<IOSize> b){
613  // Wait until *both* results are available. This is essential
614  // as the callback may try referencing the RequestManager. If one
615  // throws an exception (causing the RequestManager to be destroyed by
616  // XrdFile) and the other has a failure, then the recovery code will
617  // reference the destroyed RequestManager.
618  //
619  // Unlike other places where we use shared/weak ptrs to maintain object
620  // lifetime and destruction asynchronously, we *cannot* destroy the request
621  // asynchronously as it is associated with a ROOT buffer. We must wait until we
622  // are guaranteed that XrdCl will not write into the ROOT buffer before we
623  // can return.
624  b.wait(); a.wait();
625  return b.get() + a.get();
626  },
627  std::move(future1),
628  std::move(future2));
629  timer.stop();
630  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
631  return task;
632  }
633  else if (req1->size()) { return future1; }
634  else if (req2->size()) { return future2; }
635  else
636  { // Degenerate case - no bytes to read.
637  std::promise<IOSize> p; p.set_value(0);
638  return p.get_future();
639  }
640 }
641 
// Handle a failed read on c_ptr's current source:
//  1. disable that source and remove it from the active list;
//  2. re-dispatch c_ptr to the remaining active source, or, if none is left,
//     to a newly opened one.
// Throws (via `ex` objects declared on lines not visible in this view) when:
//  - the server response was invalid,
//  - opening a replacement source times out, or
//  - the replacement is a source that was already disabled.
// edm::Exception from the open future is annotated with context and rethrown.
// m_source_mutex is released while waiting for the replacement open.
642 void
643 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
644 {
645  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
646  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
647 
648  // Fail early for invalid responses - XrdFile has a separate path for handling this.
649  if (c_status.code == XrdCl::errInvalidResponse)
650  {
651  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
653  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
654  << "', flags=0x" << std::hex << m_flags
655  << ", permissions=0" << std::oct << m_perms << std::dec
656  << ", old source=" << source_ptr->PrettyID()
657  << ") => Invalid ReadV response from server";
658  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
659  addConnections(ex);
660  throw ex;
661  }
662  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
663 
664  // Note that we do not delete the Source itself. That is because this
665  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
666  // In such a case, if you close a file in the handler, it will deadlock
667  m_disabledSourceStrings.insert(source_ptr->ID());
668  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
669  m_disabledSources.insert(source_ptr);
670 
671  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
672  {
673  m_activeSources.erase(m_activeSources.begin());
674  updateSiteInfo();
675  }
676  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
677  {
678  m_activeSources.erase(m_activeSources.begin()+1);
679  updateSiteInfo();
680  }
681  std::shared_ptr<Source> new_source;
682  if (m_activeSources.size() == 0)
683  {
684  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
// `now` is not used on any visible line. The line not shown below presumably
// uses it - TODO confirm.
685  timespec now;
686  GET_CLOCK_MONOTONIC(now);
688  // Note we only wait for m_timeout+10 seconds here. This is because we've already failed
689  // once and the likelihood the program has some inconsistent state is decent.
690  // We'd much rather fail hard than deadlock!
// The lock is dropped while waiting so the open callback can take it.
691  sentry.unlock();
692  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
693  if (status == std::future_status::timeout)
694  {
696  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
697  << "', flags=0x" << std::hex << m_flags
698  << ", permissions=0" << std::oct << m_perms << std::dec
699  << ", old source=" << source_ptr->PrettyID()
700  << ") => timeout when waiting for file open";
701  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
702  addConnections(ex);
703  throw ex;
704  }
705  else
706  {
707  try
708  {
709  new_source = future.get();
710  }
711  catch (edm::Exception &ex)
712  {
713  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
714  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
715  throw;
716  }
717  }
718  sentry.lock();
719 
// NOTE(review): std::find performs a linear scan. If m_disabledSourceStrings is a
// std::set, its member find() would be logarithmic.
720  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
721  {
722  // The server gave us back a data node we requested excluded. Fatal!
724  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
725  << "', flags=0x" << std::hex << m_flags
726  << ", permissions=0" << std::oct << m_perms << std::dec
727  << ", old source=" << source_ptr->PrettyID()
728  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
729  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
730  addConnections(ex);
731  throw ex;
732  }
733  m_activeSources.push_back(new_source);
734  updateSiteInfo();
735  }
736  else
737  {
738  new_source = m_activeSources[0];
739  }
// Retry the failed request on the chosen source.
740  new_source->handle(c_ptr);
741 }
742 
743 static void
744 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
745 {
746  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
747  {
748  IOPosBuffer &io = input[front];
749  IOPosBuffer &outio = output.back();
750  if (io.size() > chunksize)
751  {
752  IOSize consumed;
753  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
754  {
755  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
756  {
757  consumed = (XRD_CL_MAX_CHUNK - outio.size());
758  outio.set_size(XRD_CL_MAX_CHUNK);
759  }
760  else
761  {
762  consumed = chunksize;
763  outio.set_size(outio.size() + consumed);
764  }
765  }
766  else
767  {
768  consumed = chunksize;
769  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
770  }
771  chunksize -= consumed;
772  IOSize newsize = io.size() - consumed;
773  IOOffset newoffset = io.offset() + consumed;
774  void* newdata = static_cast<char*>(io.data()) + consumed;
775  io.set_offset(newoffset);
776  io.set_data(newdata);
777  io.set_size(newsize);
778  }
779  else if (io.size() == 0)
780  {
781  front++;
782  }
783  else
784  {
785  output.push_back(io);
786  chunksize -= io.size();
787  front++;
788  }
789  }
790 }
791 
792 static void
793 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
794 {
795  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
796  {
797  IOPosBuffer &io = input.back();
798  IOPosBuffer &outio = output.back();
799  if (io.size() > chunksize)
800  {
801  IOSize consumed;
802  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
803  {
804  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
805  {
806  consumed = (XRD_CL_MAX_CHUNK - outio.size());
807  outio.set_size(XRD_CL_MAX_CHUNK);
808  }
809  else
810  {
811  consumed = chunksize;
812  outio.set_size(outio.size() + consumed);
813  }
814  }
815  else
816  {
817  consumed = chunksize;
818  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
819  }
820  chunksize -= consumed;
821  IOSize newsize = io.size() - consumed;
822  IOOffset newoffset = io.offset() + consumed;
823  void* newdata = static_cast<char*>(io.data()) + consumed;
824  io.set_offset(newoffset);
825  io.set_data(newdata);
826  io.set_size(newsize);
827  }
828  else if (io.size() == 0)
829  {
830  input.pop_back();
831  }
832  else
833  {
834  output.push_back(io);
835  chunksize -= io.size();
836  input.pop_back();
837  }
838  }
839 }
840 
841 static IOSize validateList(const std::vector<IOPosBuffer> req)
842 {
843  IOSize total = 0;
844  off_t last_offset = -1;
845  for (const auto & it : req)
846  {
847  total += it.size();
848  assert(it.offset() > last_offset);
849  last_offset = it.offset();
850  assert(it.size() <= XRD_CL_MAX_CHUNK);
851  assert(it.offset() < 0x1ffffffffff);
852  }
853  assert(req.size() <= 1024);
854  return total;
855 }
856 
// Split the vectored read `iolist` into `req1` (for active source 0) and `req2`
// (for active source 1). Both m_activeSources[0] and [1] are indexed directly,
// so two active sources are required.
//
// On each round:
//  - req1 takes up to chunk1 bytes from the front of a working copy of the list
//    (consumeChunkFront);
//  - req2 takes up to chunk2 bytes from its back (consumeChunkBack);
// until the copy is exhausted.
// The chunk sizes are weighted by squared quality values: chunk1 is proportional
// to q2^2 and chunk2 to q1^2. The source with the larger getQuality() value
// (which compareSources() treats as worse) therefore gets the smaller share.
// Afterwards both lists are sorted by offset and checked with validateList(),
// and their byte totals must add up to the original request.
// Throws `ex` (declared on a line not visible in this view) if both sub-requests
// reach XRD_ADAPTOR_CHUNK_THRESHOLD entries.
857 void
858 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2)
859 {
860  if (iolist.size() == 0) return;
861  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
862  req1.reserve(iolist.size()/2+1);
863  req2.reserve(iolist.size()/2+1);
864  size_t front=0;
865 
866  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
867  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
868  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
869  IOSize chunk1, chunk2;
870  // Make sure the chunk size is at least 1024; little point to reads less than that size.
871  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
872  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
873 
874  IOSize size_orig = 0;
875  for (const auto & it : iolist) size_orig += it.size();
876 
// consumeChunkBack only pops while front < size, so size >= front always holds
// and this unsigned difference cannot wrap.
877  while (tmp_iolist.size()-front > 0)
878  {
879  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
880  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
881  // are passed to the request manager. However, because we have a max chunk size, we increase
882  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
883  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
884  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
886  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
887  << "', flags=0x" << std::hex << m_flags
888  << ", permissions=0" << std::oct << m_perms << std::dec
889  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
// NOTE(review): this context string names requestFailure() although we are in
// splitClientRequest(). It is left unchanged here because it is a runtime string.
890  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
891  addConnections(ex);
892  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
893  ex.addAdditionalInfo(ss.str());
894  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
895  ex.addAdditionalInfo(ss2.str());
896  throw ex;
897  }
898  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
899  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
900  }
901  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
902  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
903 
904  IOSize size1 = validateList(req1);
905  IOSize size2 = validateList(req2);
906 
907  assert(size_orig == size1 + size2);
908 
909  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
910 }
911 
912 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
913  : m_manager(manager)
914 {
915 }
916 
917 
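  // This destructor is defined out-of-line here rather than declared as
  // `~OpenHandler() = default;` in the header. XrdCl::File is an incomplete
  // type in the header, and destroying the std::unique_ptr<XrdCl::File> member
  // requires the complete type.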
921 {
922 }
923 
924 
// XrdCl completion callback for the asynchronous open started by
// OpenHandler::open().
//
// Ownership and outcomes:
//   - Ownership of status_ptr and hostList_ptr is taken by wrapping them in
//     unique_ptrs. The AnyObject response argument is ignored.
//   - The strong self-reference m_self is dropped up front. A local copy keeps
//     this handler alive until the function returns.
//   - If the owning RequestManager has expired, the function returns right
//     away and m_promise and m_file are left untouched.
//   - On success, under m_mutex: monitoring info is sent, the exclude string is
//     computed from the host list, and m_file is moved into a new Source, which
//     fulfills m_promise.
//   - On failure, under m_mutex: m_promise receives an exception describing the
//     error. m_file is moved into a local so that the File is destroyed only
//     after m_mutex has been released.
//   - In both the success and failure cases, the manager is then notified via
//     handleOpen() outside the lock. On failure the Source passed is null.
//
// NOTE(review): m_self is read and reset here before m_mutex is taken. open()
// assigns m_self (under m_mutex) only after XrdCl::File::Open() has been issued.
// If this callback can run before that assignment, `self` is empty here and the
// strong self-reference installed afterwards by open() is never cleared.
// Confirm the XrdCl callback-threading guarantees.
// NOTE(review): on the expired-manager path m_promise is never fulfilled, so any
// holder of the shared future returned by open() would not be woken.
925 void
926 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
927 {
928  std::shared_ptr<Source> source;
929  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
930  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
931 
932  // Make sure we get rid of the strong self-reference when the callback finishes.
933  std::shared_ptr<OpenHandler> self = m_self;
934  m_self.reset();
935 
936  auto manager = m_manager.lock();
937  // Manager object has already been deleted. Cleanup the
938  // response objects, remove our self-reference, and ignore the response.
939  if (!manager)
940  {
941  return;
942  }
943  //if we need to delete the File object we must do it outside
944  // of the lock to avoid a potential deadlock
  // (releaseFile is declared outside the locked scope, so it is destroyed at
  // function exit, after the lock is released and after handleOpen() returns.)
945  std::unique_ptr<XrdCl::File> releaseFile;
946  {
947  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
948 
949  if (status->IsOK())
950  {
951  SendMonitoringInfo(*m_file);
  // Monotonic timestamp of the successful open, passed to the Source constructor.
952  timespec now;
953  GET_CLOCK_MONOTONIC(now);
954 
955  std::string excludeString;
956  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
957 
  // The Source takes over m_file; waiters on the shared future receive it.
958  source.reset(new Source(now, std::move(m_file), excludeString));
959  m_promise.set_value(source);
960  }
961  else
962  {
  // Defer destruction of the failed File until after the lock is released.
963  releaseFile = std::move(m_file);
965  ex << "XrdCl::File::Open(name='" << manager->m_name
966  << "', flags=0x" << std::hex << manager->m_flags
967  << ", permissions=0" << std::oct << manager->m_perms << std::dec
968  << ") => error '" << status->ToStr()
969  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
970  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
971  manager->addConnections(ex);
972 
  // Waiters on the shared future will see this exception rethrown from get().
973  m_promise.set_exception(std::make_exception_ptr(ex));
974  }
975  }
  // Called outside m_mutex; `source` is still null if the open failed.
976  manager->handleOpen(*status, source);
977 }
978 
981 {
  // Describe the data server of the open currently in flight, holding m_mutex
  // while m_file is inspected. Possible results:
  //   - "(no open in progress)" when m_file is null;
  //   - the value of the file's "DataServer" property;
  //   - "(unknown source)" when that property comes back empty.
  // GetProperty()'s return value is not checked. A failed lookup leaves
  // dataServer empty and therefore also yields "(unknown source)".
982  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
983 
984  if (!m_file.get())
985  {
986  return "(no open in progress)";
987  }
988  std::string dataServer;
989  m_file->GetProperty("DataServer", dataServer);
990  if (!dataServer.size()) { return "(unknown source)"; }
991  return dataServer;
992 }
993 
// Start an asynchronous XrdCl open of the manager's file. Returns a shared
// future that HandleResponseWithHosts() fulfills with the resulting Source, or
// with an exception if the open fails.
//
// Behavior:
//   - The URL opened is manager.m_name with manager.prepareOpaqueString()
//     appended, joined by '?' if the name has no '?' yet and by '&' otherwise.
//   - m_file is non-null only between the Open() call below and the callback,
//     which moves it out. If it is already set, the existing future is returned
//     and no new open is started.
//   - Throws if the owning RequestManager has expired, if this handler's weak
//     self-reference has expired, or if XrdCl::File::Open() rejects the request
//     synchronously.
//
// NOTE(review): when Open() fails synchronously, the exception is thrown with
// m_file still set and the fresh m_promise never fulfilled. XrdCl presumably
// does not invoke the handler in that case, so later open() calls would take
// the early return below and hand back a future that never becomes ready.
// Consider m_file.reset() before throwing; confirm against XrdCl semantics.
994 std::shared_future<std::shared_ptr<Source> >
996 {
997  auto manager_ptr = m_manager.lock();
998  if (!manager_ptr)
999  {
1001  ex << "XrdCl::File::Open() =>"
1002  << " error: OpenHandler called within an invalid RequestManager context."
1003  << " This is a logic error and should be reported to the CMSSW developers.";
1004  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1005  throw ex;
1006  }
1007  RequestManager &manager = *manager_ptr;
1008  auto self_ptr = m_self_weak.lock();
1009  if (!self_ptr)
1010  {
1012  ex << "XrdCl::File::Open() => error: "
1013  << "OpenHandler called after it was deleted. This is a logic error "
1014  << "and should be reported to the CMSSW developers.";
  // NOTE(review): the context string below spells "XrdAdapter" rather than
  // "XrdAdaptor", unlike the other context strings in this function.
1015  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1016  throw ex;
1017  }
1018 
1019  // NOTE NOTE: we look at this variable *without* the lock. This means the method
1020  // is not thread-safe; the caller is responsible to verify it is not called from
1021  // multiple threads simultaneously.
1022  //
1023  // This is done because ::open may be called from a Xrootd callback; if we
1024  // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1025  // and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
1026  // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1027  // an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
1028  if (m_file.get())
1029  {
1030  return m_shared_future;
1031  }
1032  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
  // Install a fresh promise/future pair for this attempt; the previous promise
  // ends up in new_promise and is destroyed when this function returns.
1033  std::promise<std::shared_ptr<Source> > new_promise;
1034  m_promise.swap(new_promise);
1035  m_shared_future = m_promise.get_future().share();
1036 
1037  auto opaque = manager.prepareOpaqueString();
1038  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1039  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
  // A new File per attempt; `this` is registered as the completion handler.
1040  m_file.reset(new XrdCl::File());
1041  XrdCl::XRootDStatus status;
1042  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1043  {
1045  ex << "XrdCl::File::Open(name='" << new_name
1046  << "', flags=0x" << std::hex << manager.m_flags
1047  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1048  << ") => error '" << status.ToStr()
1049  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1050  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1051  manager.addConnections(ex);
1052  throw ex;
1053  }
1054  // Have a strong self-reference for as long as the callback is in-progress.
  // NOTE(review): this is assigned only after Open() has been issued. A callback
  // that runs before this line would neither see nor clear this reference.
1055  m_self = self_ptr;
1056  return m_shared_future;
1057 }
1058 
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void updateSiteInfo(std::string orig_site="")
#define XRD_CL_MAX_CHUNK
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
double q2[4]
Definition: TauolaWrapper.h:88
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:43
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void addConnections(cms::Exception &)
long long timeDiffMS(const timespec &a, const timespec &b)
std::set< std::shared_ptr< Source > > m_disabledSources
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:155
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
Times stop()
Definition: CPUTimer.cc:94
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
void clearMessage()
Definition: Exception.cc:215
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
void checkSourcesImpl(timespec &now, IOSize requestSize)
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
void getDisabledSourceNames(std::vector< std::string > &sources)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
tuple status
Definition: ntuplemaker.py:245
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
void clearAdditionalInfo()
Definition: Exception.cc:223
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)