CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <iostream>
4 #include <algorithm>
5 #include <netdb.h>
6 
7 #include "XrdCl/XrdClFile.hh"
8 #include "XrdCl/XrdClDefaultEnv.hh"
9 
15 
16 #include "XrdStatistics.h"
18 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
19 
// Tuning constants for the XrdAdaptor request manager.  Compound expansions
// are parenthesized so each macro behaves as a single value inside any
// surrounding expression (e.g. `x / XRD_CL_MAX_CHUNK`).

// Maximum size, in bytes, of one chunk of a vectored read: validateList()
// asserts it and the chunk splitters never build a larger chunk.
#define XRD_CL_MAX_CHUNK (512*1024)

// Short back-off, in seconds, added to the next source-check time.
#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5

#ifdef XRD_FAKE_OPEN_PROBE
// Alternative settings selected by defining XRD_FAKE_OPEN_PROBE: always
// probe, use a shorter long delay and require no quality margin.
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100
#define XRD_ADAPTOR_LONG_OPEN_DELAY 20
// This is the minimal difference in quality required to swap an active and inactive source
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0
#else
// Presumably the percentage chance (against a 0-100 random draw) of probing
// for a new source at a long-interval check -- confirm.
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10
// Long back-off, in seconds, between source re-evaluations / open attempts.
#define XRD_ADAPTOR_LONG_OPEN_DELAY (2*60)
// Minimal difference in quality required to swap an active and inactive source.
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

// Chunk-count threshold at which the request splitter stops adding chunks
// to one half of a split vectored read.
#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000
36 
37 
// GET_CLOCK_MONOTONIC(ts): store the current monotonic-clock time in the
// timespec `ts` (named directly, not passed by pointer).  On Mach (macOS) the
// time comes from the Mach SYSTEM_CLOCK clock service; elsewhere from
// clock_gettime(CLOCK_MONOTONIC, ...).
// NOTE(review): the two expansions differ in shape -- the Mach one is a brace
// block, the other ends with ';'.  `GET_CLOCK_MONOTONIC(ts);` as the only body
// of an unbraced `if` followed by `else` would not compile on Mach; a
// do { ... } while (0) form would be more robust.
38 #ifdef __MACH__
39 #include <mach/clock.h>
40 #include <mach/mach.h>
41 #define GET_CLOCK_MONOTONIC(ts) \
42 { \
43  clock_serv_t cclock; \
44  mach_timespec_t mts; \
45  host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
46  clock_get_time(cclock, &mts); \
47  mach_port_deallocate(mach_task_self(), cclock); \
48  ts.tv_sec = mts.tv_sec; \
49  ts.tv_nsec = mts.tv_nsec; \
50 }
51 #else
52 #define GET_CLOCK_MONOTONIC(ts) \
53  clock_gettime(CLOCK_MONOTONIC, &ts);
54 #endif
55 
56 using namespace XrdAdaptor;
57 
/*
 * Return a - b in whole milliseconds, truncated toward zero (negative when
 * `a` precedes `b`).
 *
 * The arithmetic is done purely in integers: seconds and nanoseconds are first
 * given the same sign (borrowing one second if needed) so that integer
 * division truncates the *total* difference rather than each part.  This
 * avoids the floating-point division and the implicit double -> long long
 * narrowing of a `diff += nsec / 1e6` formulation.  Assumes normalized
 * timespecs (tv_nsec in [0, 1e9)).
 */
long long timeDiffMS(const timespec &a, const timespec &b)
{
  long long sec = static_cast<long long>(a.tv_sec) - static_cast<long long>(b.tv_sec);
  long long nsec = static_cast<long long>(a.tv_nsec) - static_cast<long long>(b.tv_nsec);
  if (sec > 0 && nsec < 0)
  {
    --sec;
    nsec += 1000000000LL;
  }
  else if (sec < 0 && nsec > 0)
  {
    ++sec;
    nsec -= 1000000000LL;
  }
  return sec * 1000 + nsec / 1000000;
}
65 /*
66  * We do not care about the response of sending the monitoring information;
67  * this handler class simply frees any returned buffer to prevent memory leaks.
68  */
69 class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHandler
70 {
71  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
72  {
73  if (response)
74  {
75  XrdCl::Buffer *buffer = nullptr;
76  response->Get(buffer);
77  delete buffer;
78  }
79  }
80 };
81 
83 
84 
// SendMonitoringInfo (signature not visible in this view; operates on an
// XrdCl::File `file`): send the job's monitoring ID to the server at the
// file's "LastURL" property via XrdCl::FileSystem::SendInfo.  The response is
// routed to `nullHandler`, declared outside this view -- presumably a
// SendMonitoringInfoHandler that discards it.  Nothing is sent to dCache
// pools, or when there is no job ID or no last URL.
// NOTE(review): `jobId` is declared on a line not visible here; it is
// presumably a C string that may be null -- confirm.
85 static void
87 {
88  // Do not send this to a dCache data server as they return an error.
89  // In some versions of dCache, sending the monitoring information causes
90  // the server to close the connection - resulting in failures.
91  if (Source::isDCachePool(file)) {return;}
92
93  // Send the monitoring info, if available.
95  std::string lastUrl;
96  file.GetProperty("LastURL", lastUrl);
97  if (jobId && lastUrl.size())
98  {
  // NOTE(review): `fs` is destroyed when this function returns, possibly while
  // the SendInfo request is still outstanding -- confirm XrdCl allows this.
99  XrdCl::FileSystem fs = XrdCl::FileSystem(XrdCl::URL(lastUrl));
  // The trailing 30 is presumably a timeout in seconds -- confirm.
100  fs.SendInfo(jobId, &nullHandler, 30);
101  edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
102  }
103 }
104 
105 
// RequestManager constructor (signature not visible in this view): records
// the file name, open flags and permissions.  m_timeout starts at
// XRD_DEFAULT_TIMEOUT and may be overridden in initialize() from the XrdCl
// "StreamErrorWindow" setting.  m_distribution(0,100) is the range of the
// random draw used when deciding whether to probe for a new source.
107  : m_timeout(XRD_DEFAULT_TIMEOUT),
108  m_nextInitialSourceToggle(false),
109  m_name(filename),
110  m_flags(flags),
111  m_perms(perms),
112  m_distribution(0,100),
113  m_excluded_active_count(0)
114 {
115 }
116 
117 
// initialize(): open m_name, with up to `retries` attempts.  Each attempt
// appends the "tried=..." opaque string from prepareOpaqueString() to the URL
// so previously failed data servers are excluded.  On success the opened file
// becomes the first active Source, monitoring info is sent, and site info is
// reported against the site derived from the original URL.
// Throws `ex` (declared on a line not visible in this view; presumably an
// edm::Exception) when the open cannot be completed.
// NOTE(review): `self` is not referenced in the visible lines; it is
// presumably used on a line not shown here -- confirm.
118 void
119 RequestManager::initialize(std::weak_ptr<RequestManager> self)
120 {
122
123  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
  // Use XrdCl's "StreamErrorWindow" setting as our timeout when available.
124  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
125
  // Work out the site serving the original URL.  If the site lookup fails and
  // the value has no '.', resolve it as a host name and use its domain.
126  std::string orig_site;
127  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
128  {
129  std::string hostname;
130  if (Source::getHostname(orig_site, hostname))
131  {
132  Source::getDomain(hostname, orig_site);
133  }
134  }
135
136  std::unique_ptr<XrdCl::File> file;
138  bool validFile = false;
139  const int retries = 5;
140  std::string excludeString;
141  for (int idx=0; idx<retries; idx++)
142  {
  // Fresh file object per attempt; the URL carries the current exclusion list.
143  file.reset(new XrdCl::File());
144  auto opaque = prepareOpaqueString();
145  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
146  SyncHostResponseHandler handler;
147  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
148  if (!openStatus.IsOK())
149  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
150  // server and it was marked bad - xrootd couldn't even queue up the request internally!
151  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
152  // redirector being down or authentication failures.
153  ex.clearMessage();
154  ex.clearContext();
155  ex.clearAdditionalInfo();
156  ex << "XrdCl::File::Open(name='" << m_name
157  << "', flags=0x" << std::hex << m_flags
158  << ", permissions=0" << std::oct << m_perms << std::dec
159  << ") => error '" << openStatus.ToStr()
160  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
161  ex.addContext("Calling XrdFile::open()");
162  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
163  throw ex;
164  }
  // Block until the asynchronous open completes, then record which host to
  // exclude should this data server need to be avoided later.
165  handler.WaitForResponse();
166  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
167  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
168  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
169  assert(status);
170  if (status->IsOK())
171  {
172  validFile = true;
173  break;
174  }
175  else
176  {
177  ex.clearMessage();
178  ex.clearContext();
179  ex.clearAdditionalInfo();
180  ex << "XrdCl::File::Open(name='" << m_name
181  << "', flags=0x" << std::hex << m_flags
182  << ", permissions=0" << std::oct << m_perms << std::dec
183  << ") => error '" << status->ToStr()
184  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
185  ex.addContext("Calling XrdFile::open()");
186  addConnections(ex);
187  std::string dataServer, lastUrl;
188  file->GetProperty("DataServer", dataServer);
189  file->GetProperty("LastURL", lastUrl);
190  if (dataServer.size())
191  {
192  ex.addAdditionalInfo("Problematic data server: " + dataServer);
193  }
194  if (lastUrl.size())
195  {
196  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
197  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
198  }
  // We were sent to a data server that already failed: there is nowhere
  // else to go, so give up.
  // NOTE(review): m_disabledSourceStrings looks like a set (insert() is used
  // on it); its own find() would avoid this linear std::find.
199  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
200  {
201  ex << ". No additional data servers were found.";
202  throw ex;
203  }
  // Remember the failed data server so the next attempt excludes it.
204  if (dataServer.size())
205  {
206  m_disabledSourceStrings.insert(dataServer);
207  m_disabledExcludeStrings.insert(excludeString);
208  }
209  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
210  if (lastUrl == new_filename)
211  {
212  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
213  throw ex;
214  }
215  }
216  }
  // All retries were used up without a successful open.
217  if (!validFile)
218  {
219  throw ex;
220  }
221  SendMonitoringInfo(*file);
222
  // NOTE(review): `ts` is presumably filled by GET_CLOCK_MONOTONIC on a line
  // not visible in this view -- confirm it is initialized before use.
223  timespec ts;
225
226  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
227  {
228  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
229  m_activeSources.push_back(source);
230  updateSiteInfo(orig_site);
231  }
232
  // Schedule the first source re-evaluation a short delay from now;
  // presumably `ts` is stored as m_nextActiveSourceCheck on a line not shown.
233  m_lastSourceCheck = ts;
234  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
236 }
237 
238 
// updateSiteInfo(orig_site) (signature not visible in this view; it is called
// both with an argument and with none, so orig_site presumably defaults to
// empty): build "siteA" or "siteA, siteB" from the (at most two) active
// sources and log a warning when data is served from a site other than
// orig_site or, with no orig_site, when the serving sites changed.  The
// callers visible in this file hold m_source_mutex.
// NOTE(review): when orig_site is non-empty and equals siteList,
// m_activeSites is not updated.  A later call without orig_site can then
// warn "Data is now served from X instead of previous " even though nothing
// changed; consider setting m_activeSites in that case and skipping the
// warning while m_activeSites is empty.
239 void
241 {
242  std::string siteA, siteB, siteList;
243  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
244  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
245  siteList = siteA;
246  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
247  if (orig_site.size() && (orig_site != siteList))
248  {
249  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
250  m_activeSites = siteList;
251  }
252  else if (!orig_site.size() && (siteList != m_activeSites))
253  {
254  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
255  m_activeSites = siteList;
256  }
257 }
258 
259 
260 void
261 RequestManager::checkSources(timespec &now, IOSize requestSize)
262 {
263  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
264  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
265  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
266  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
267  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
268  {
269  { // Be more aggressive about getting rid of very bad sources.
270  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
271  compareSources(now, 0, 1);
272  compareSources(now, 1, 0);
273  }
274  if (timeDiffMS(now, m_nextActiveSourceCheck) > 0)
275  {
276  checkSourcesImpl(now, requestSize);
277  }
278  }
279 }
280 
281 
282 bool
283 RequestManager::compareSources(const timespec &now, unsigned a, unsigned b)
284 {
285  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
286 
287  bool findNewSource = false;
288  if ((m_activeSources[a]->getQuality() > 5130) ||
289  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
290  {
291  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
292  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
293  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
294  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
295  m_activeSources[a]->setLastDowngrade(now);
296  m_inactiveSources.emplace_back(m_activeSources[a]);
297  m_activeSources.erase(m_activeSources.begin()+a);
298  updateSiteInfo();
299  }
300  return findNewSource;
301 }
302 
// checkSourcesImpl(now, ...) (signature not visible in this view): full
// re-evaluation of the active and inactive source lists, run while holding
// m_source_mutex.
//  * With at most one active source, always look for a new source.
//  * With two, drop very poor sources (compareSources).  Then either add the
//    best eligible inactive source (when only one active source remains), or
//    swap the worst active source for a better inactive one (better by more
//    than XRD_ADAPTOR_SOURCE_QUALITY_FUDGE).  After XRD_ADAPTOR_LONG_OPEN_DELAY
//    since the last check, a random draw may also request a new source.
//  * If a new source is wanted, start an asynchronous open via m_open_handler.
// Finally `now.tv_sec` is advanced by a delay (XRD_ADAPTOR_SHORT_OPEN_DELAY
// unless two sources are active); the lines that use it afterwards are not
// visible in this view.
303 void
305 {
306  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
307
308  bool findNewSource = false;
309  if (m_activeSources.size() <= 1)
310  {
311  findNewSource = true;
312  }
313  else if (m_activeSources.size() > 1)
314  {
315  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
316  findNewSource |= compareSources(now, 0, 1);
317  findNewSource |= compareSources(now, 1, 0);
318
319  // NOTE: We could probably replace the copy with a better sort function.
320  // However, there are typically very few sources and the correctness is more obvious right now.
  // Inactive sources become eligible again a short delay after their last downgrade.
321  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
322  for (const auto & source : m_inactiveSources)
323  {
324  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
325  }
  // Lower quality is better: best = minimum, worst = maximum.
  // NOTE(review): worstActiveSource is dereferenced below without an emptiness
  // check.  This is safe only because compareSources removes at most one of
  // the two active sources, so at least one remains.
326  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
327  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
328  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
329  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
330  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
331  {
332  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
333  << ", quality " << (*bestInactiveSource)->getQuality();
334  }
335  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
336  << ", quality " << (*worstActiveSource)->getQuality();
337  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
338  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
339  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
340  {
341  m_activeSources.push_back(*bestInactiveSource);
342  updateSiteInfo();
343  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
344  }
  // Otherwise (note: `else while`), keep swapping the worst active source for
  // the best eligible inactive one while the inactive one is better by more
  // than the fudge margin.  The swapped-out source is marked as downgraded now.
  // NOTE(review): eligibility is recomputed below with
  // XRD_ADAPTOR_LONG_OPEN_DELAY, whereas the initial list used
  // XRD_ADAPTOR_SHORT_OPEN_DELAY -- confirm the asymmetry is intended.
345  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
346  {
347  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
348  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
349  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
350  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
351  (*worstActiveSource)->setLastDowngrade(now);
352  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
353  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
354  m_activeSources.erase(worstActiveSource);
355  m_activeSources.emplace_back(std::move(*bestInactiveSource));
356  updateSiteInfo();
357  eligibleInactiveSources.clear();
358  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
359  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
360  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
361  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
362  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
363  }
  // After a long interval, a random draw in [0,100) presumably decides whether
  // to probe for a new source (the comparison line is not visible here).
364  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
365  {
366  float r = m_distribution(m_generator);
368  {
369  findNewSource = true;
370  }
371  }
372  }
373  if (findNewSource)
374  {
375  m_open_handler->open();
377  }
378
379  // Only aggressively look for new sources if we don't have two.
380  if (m_activeSources.size() == 2)
381  {
383  }
384  else
385  {
386  now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
387  }
389 }
390 
// Return the XrdCl file handle of the first active source (the function name
// is on a line not visible in this view).
// NOTE(review): indexes m_activeSources[0] without checking that the list is
// non-empty; callers must ensure at least one active source exists.
391 std::shared_ptr<XrdCl::File>
393 {
394  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
395  return m_activeSources[0]->getFileHandle();
396 }
397 
398 void
399 RequestManager::getActiveSourceNames(std::vector<std::string> & sources)
400 {
401  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
402  sources.reserve(m_activeSources.size());
403  for (auto const& source : m_activeSources) {
404  sources.push_back(source->ID());
405  }
406 }
407 
408 void
409 RequestManager::getPrettyActiveSourceNames(std::vector<std::string> & sources)
410 {
411  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
412  sources.reserve(m_activeSources.size());
413  for (auto const& source : m_activeSources) {
414  sources.push_back(source->PrettyID());
415  }
416 }
417 
418 void
419 RequestManager::getDisabledSourceNames(std::vector<std::string> & sources)
420 {
421  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
422  sources.reserve(m_disabledSourceStrings.size());
423  for (auto const& source : m_disabledSourceStrings) {
424  sources.push_back(source);
425  }
426 }
427 
// addConnections(ex) (signature not visible in this view): attach the active
// and disabled source names to exception `ex` as "Active source: ..." and
// "Disabled source: ..." additional-info lines.
// NOTE(review): the line that fills `sources` with the active source names is
// not visible here; presumably it calls getPrettyActiveSourceNames -- confirm.
428 void
430 {
431  std::vector<std::string> sources;
433  for (auto const& source : sources)
434  {
435  ex.addAdditionalInfo("Active source: " + source);
436  }
437  sources.clear();
438  getDisabledSourceNames(sources);
439  for (auto const& source : sources)
440  {
441  ex.addAdditionalInfo("Disabled source: " + source);
442  }
443 }
444 
// pickSingleSource() (signature not visible in this view): choose one active
// source for a request while holding m_source_mutex.  With two active sources
// the choice depends on a condition not visible here -- presumably it
// alternates using m_nextInitialSourceToggle; confirm.  Otherwise the first
// active source is used.
// NOTE(review): indexes m_activeSources[0] without checking that the list is
// non-empty.
445 std::shared_ptr<Source>
447 {
448  std::shared_ptr<Source> source = nullptr;
449  {
450  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
451  if (m_activeSources.size() == 2)
452  {
454  {
455  source = m_activeSources[0];
457  }
458  else
459  {
460  source = m_activeSources[1];
462  }
463  }
464  else
465  {
466  source = m_activeSources[0];
467  }
468  }
469  return source;
470 }
471 
472 std::future<IOSize>
473 RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr)
474 {
475  assert(c_ptr.get());
476  timespec now;
477  GET_CLOCK_MONOTONIC(now);
478  checkSources(now, c_ptr->getSize());
479 
480  std::shared_ptr<Source> source = pickSingleSource();
481  source->handle(c_ptr);
482  return c_ptr->get_future();
483 }
484 
// prepareOpaqueString() (signature not visible in this view): build the
// opaque "tried=host1,host2,..." string that initialize() appends to the open
// URL.  It lists the part before the first ':' of the ExcludeID() of every
// active and inactive source, followed by each disabled exclude string, with
// the trailing comma removed.  Returns "" when there is nothing to list.
// The source lists are read while holding m_source_mutex.
487 {
488  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
489  std::stringstream ss;
490  ss << "tried=";
491  size_t count = 0;
492  for ( const auto & it : m_activeSources )
493  {
494  count++;
495  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
496  }
497  for ( const auto & it : m_inactiveSources )
498  {
499  count++;
500  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
501  }
502  for ( const auto & it : m_disabledExcludeStrings )
503  {
504  count++;
505  ss << it.substr(0, it.find(":")) << ",";
506  }
507  if (count)
508  {
  // Drop the trailing ','.
509  std::string tmp_str = ss.str();
510  return tmp_str.substr(0, tmp_str.size()-1);
511  }
512  return "";
513 }
514 
515 void
516 XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source)
517 {
518  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
519  if (status.IsOK())
520  {
521  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
522  for (const auto & s : m_activeSources)
523  {
524  if (source->ID() == s->ID())
525  {
526  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
527  << "; ignoring" << std::endl;
528  unsigned returned_count = ++m_excluded_active_count;
529  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
530  if (returned_count >= 3) {m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2*XRD_ADAPTOR_SHORT_OPEN_DELAY;}
531  return;
532  }
533  }
534  for (const auto & s : m_inactiveSources)
535  {
536  if (source->ID() == s->ID())
537  {
538  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
539  << "; ignoring" << std::endl;
540  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
541  return;
542  }
543  }
544  if (m_activeSources.size() < 2)
545  {
546  m_activeSources.push_back(source);
547  updateSiteInfo();
548  }
549  else
550  {
551  m_inactiveSources.push_back(source);
552  }
553  }
554  else
555  { // File-open failure - wait at least 120s before next attempt.
556  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
557  m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
558  }
559 }
560 
561 std::future<IOSize>
562 XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist)
563 {
564  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
565 
566  timespec now;
567  GET_CLOCK_MONOTONIC(now);
568 
569  edm::CPUTimer timer;
570  timer.start();
571 
572  assert(m_activeSources.size());
573  if (m_activeSources.size() == 1)
574  {
575  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
576  checkSources(now, c_ptr->getSize());
577  m_activeSources[0]->handle(c_ptr);
578  return c_ptr->get_future();
579  }
580 
581  assert(iolist.get());
582  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
583  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
584  splitClientRequest(*iolist, *req1, *req2);
585 
586  checkSources(now, req1->size() + req2->size());
587  // CheckSources may have removed a source
588  if (m_activeSources.size() == 1)
589  {
590  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
591  m_activeSources[0]->handle(c_ptr);
592  return c_ptr->get_future();
593  }
594 
595  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
596  std::future<IOSize> future1, future2;
597  if (req1->size())
598  {
599  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
600  m_activeSources[0]->handle(c_ptr1);
601  future1 = c_ptr1->get_future();
602  }
603  if (req2->size())
604  {
605  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
606  m_activeSources[1]->handle(c_ptr2);
607  future2 = c_ptr2->get_future();
608  }
609  if (req1->size() && req2->size())
610  {
611  std::future<IOSize> task = std::async(std::launch::deferred,
612  [](std::future<IOSize> a, std::future<IOSize> b){
613  // Wait until *both* results are available. This is essential
614  // as the callback may try referencing the RequestManager. If one
615  // throws an exception (causing the RequestManager to be destroyed by
616  // XrdFile) and the other has a failure, then the recovery code will
617  // reference the destroyed RequestManager.
618  //
619  // Unlike other places where we use shared/weak ptrs to maintain object
620  // lifetime and destruction asynchronously, we *cannot* destroy the request
621  // asynchronously as it is associated with a ROOT buffer. We must wait until we
622  // are guaranteed that XrdCl will not write into the ROOT buffer before we
623  // can return.
624  b.wait(); a.wait();
625  return b.get() + a.get();
626  },
627  std::move(future1),
628  std::move(future2));
629  timer.stop();
630  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
631  return task;
632  }
633  else if (req1->size()) { return future1; }
634  else if (req2->size()) { return future2; }
635  else
636  { // Degenerate case - no bytes to read.
637  std::promise<IOSize> p; p.set_value(0);
638  return p.get_future();
639  }
640 }
641 
// requestFailure(c_ptr, c_status): called when a request on a source fails.
// An invalid response (errInvalidResponse) is turned into an exception right
// away.  Otherwise the failing source is disabled (its ID and exclude ID are
// recorded, and the Source is kept alive in m_disabledSources) and removed
// from the active list.  The request is then re-issued on the remaining
// active source, or, if none remain, on a newly opened source.  While waiting
// for that open, m_source_mutex is released.
// Throws `ex` (declared on lines not visible in this view; presumably an
// edm::Exception) on an invalid response, an open timeout, or when the server
// returns an already-disabled source.
642 void
643 RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status)
644 {
645  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
646  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
647
648  // Fail early for invalid responses - XrdFile has a separate path for handling this.
649  if (c_status.code == XrdCl::errInvalidResponse)
650  {
651  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
653  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
654  << "', flags=0x" << std::hex << m_flags
655  << ", permissions=0" << std::oct << m_perms << std::dec
656  << ", old source=" << source_ptr->PrettyID()
657  << ") => Invalid ReadV response from server";
658  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
659  addConnections(ex);
660  throw ex;
661  }
662  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
663
664  // Note that we do not delete the Source itself. That is because this
665  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
666  // In such a case, if you close a file in the handler, it will deadlock
667  m_disabledSourceStrings.insert(source_ptr->ID());
668  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
669  m_disabledSources.insert(source_ptr);
670
  // Remove the failing source from whichever active slot it occupies.
671  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
672  {
673  m_activeSources.erase(m_activeSources.begin());
674  updateSiteInfo();
675  }
676  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
677  {
678  m_activeSources.erase(m_activeSources.begin()+1);
679  updateSiteInfo();
680  }
681  std::shared_ptr<Source> new_source;
682  if (m_activeSources.size() == 0)
683  {
684  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
  // NOTE(review): `now` is only used on a line not visible in this view.
685  timespec now;
686  GET_CLOCK_MONOTONIC(now);
688  // Note we only wait for m_timeout+10 seconds here. This is because we've already failed
689  // once and the likelihood the program has some inconsistent state is decent.
690  // We'd much rather fail hard than deadlock!
  // NOTE(review): while unlocked, another failing request may take this same
  // path; after re-locking, new_source is appended without re-checking
  // m_activeSources, so a duplicate entry is possible -- confirm.
691  sentry.unlock();
692  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
693  if (status == std::future_status::timeout)
694  {
696  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
697  << "', flags=0x" << std::hex << m_flags
698  << ", permissions=0" << std::oct << m_perms << std::dec
699  << ", old source=" << source_ptr->PrettyID()
700  << ", current server=" << m_open_handler->current_source()
701  << ") => timeout when waiting for file open";
702  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
703  addConnections(ex);
704  throw ex;
705  }
706  else
707  {
708  try
709  {
710  new_source = future.get();
711  }
712  catch (edm::Exception &ex)
713  {
714  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
715  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
716  throw;
717  }
718  }
719  sentry.lock();
720
  // NOTE(review): new_source is dereferenced without a null check; presumably
  // the open future never yields a null Source -- confirm.
721  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
722  {
723  // The server gave us back a data node we requested excluded. Fatal!
725  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
726  << "', flags=0x" << std::hex << m_flags
727  << ", permissions=0" << std::oct << m_perms << std::dec
728  << ", old source=" << source_ptr->PrettyID()
729  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
730  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
731  addConnections(ex);
732  throw ex;
733  }
734  m_activeSources.push_back(new_source);
735  updateSiteInfo();
736  }
737  else
738  {
739  new_source = m_activeSources[0];
740  }
  // Re-issue the failed request on the surviving or newly opened source.
741  new_source->handle(c_ptr);
742 }
743 
744 static void
745 consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
746 {
747  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
748  {
749  IOPosBuffer &io = input[front];
750  IOPosBuffer &outio = output.back();
751  if (io.size() > chunksize)
752  {
753  IOSize consumed;
754  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
755  {
756  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
757  {
758  consumed = (XRD_CL_MAX_CHUNK - outio.size());
759  outio.set_size(XRD_CL_MAX_CHUNK);
760  }
761  else
762  {
763  consumed = chunksize;
764  outio.set_size(outio.size() + consumed);
765  }
766  }
767  else
768  {
769  consumed = chunksize;
770  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
771  }
772  chunksize -= consumed;
773  IOSize newsize = io.size() - consumed;
774  IOOffset newoffset = io.offset() + consumed;
775  void* newdata = static_cast<char*>(io.data()) + consumed;
776  io.set_offset(newoffset);
777  io.set_data(newdata);
778  io.set_size(newsize);
779  }
780  else if (io.size() == 0)
781  {
782  front++;
783  }
784  else
785  {
786  output.push_back(io);
787  chunksize -= io.size();
788  front++;
789  }
790  }
791 }
792 
793 static void
794 consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
795 {
796  while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
797  {
798  IOPosBuffer &io = input.back();
799  IOPosBuffer &outio = output.back();
800  if (io.size() > chunksize)
801  {
802  IOSize consumed;
803  if (output.size() && (outio.size() < XRD_CL_MAX_CHUNK) && (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset()))
804  {
805  if (outio.size() + chunksize > XRD_CL_MAX_CHUNK)
806  {
807  consumed = (XRD_CL_MAX_CHUNK - outio.size());
808  outio.set_size(XRD_CL_MAX_CHUNK);
809  }
810  else
811  {
812  consumed = chunksize;
813  outio.set_size(outio.size() + consumed);
814  }
815  }
816  else
817  {
818  consumed = chunksize;
819  output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
820  }
821  chunksize -= consumed;
822  IOSize newsize = io.size() - consumed;
823  IOOffset newoffset = io.offset() + consumed;
824  void* newdata = static_cast<char*>(io.data()) + consumed;
825  io.set_offset(newoffset);
826  io.set_data(newdata);
827  io.set_size(newsize);
828  }
829  else if (io.size() == 0)
830  {
831  input.pop_back();
832  }
833  else
834  {
835  output.push_back(io);
836  chunksize -= io.size();
837  input.pop_back();
838  }
839  }
840 }
841 
842 static IOSize validateList(const std::vector<IOPosBuffer> req)
843 {
844  IOSize total = 0;
845  off_t last_offset = -1;
846  for (const auto & it : req)
847  {
848  total += it.size();
849  assert(it.offset() > last_offset);
850  last_offset = it.offset();
851  assert(it.size() <= XRD_CL_MAX_CHUNK);
852  assert(it.offset() < 0x1ffffffffff);
853  }
854  assert(req.size() <= 1024);
855  return total;
856 }
857 
// splitClientRequest(iolist, req1, req2): divide a vectored read between the
// two active sources (indexes m_activeSources[0] and [1], so two active
// sources are required).  req1 is filled from the front (lowest offsets) of a
// copy of `iolist` and req2 from the back, alternating.  The per-round byte
// budget favors the better source (lower quality value): chunk1 is
// XRD_CL_MAX_CHUNK * q2^2 / (q1^2 + q2^2), chunk2 is the same with q1 and q2
// swapped, each at least 1024.  Both halves are then sorted by offset and
// validated, and their sizes must add up to the original.
// Throws `ex` (declared on a line not visible in this view; presumably an
// edm::Exception) if both halves hit XRD_ADAPTOR_CHUNK_THRESHOLD before the
// input is consumed.
858 void
859 XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2)
860 {
861  if (iolist.size() == 0) return;
862  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
863  req1.reserve(iolist.size()/2+1);
864  req2.reserve(iolist.size()/2+1);
865  size_t front=0;
866
867  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
868  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
869  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
870  IOSize chunk1, chunk2;
871  // Make sure the chunk size is at least 1024; little point to reads less than that size.
872  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
873  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
874
875  IOSize size_orig = 0;
876  for (const auto & it : iolist) size_orig += it.size();
877
  // Alternate front/back consumption until every input element is used up.
878  while (tmp_iolist.size()-front > 0)
879  {
880  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
881  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
882  // are passed to the request manager. However, because we have a max chunk size, we increase
883  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
884  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
885  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
887  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
888  << "', flags=0x" << std::hex << m_flags
889  << ", permissions=0" << std::oct << m_perms << std::dec
890  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
  // NOTE(review): this context names requestFailure() although the exception
  // is raised in splitClientRequest(); likely a copy/paste slip.
891  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
892  addConnections(ex);
893  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
894  ex.addAdditionalInfo(ss.str());
895  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
896  ex.addAdditionalInfo(ss2.str());
897  throw ex;
898  }
899  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
900  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
901  }
902  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
903  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
904
905  IOSize size1 = validateList(req1);
906  IOSize size2 = validateList(req2);
907
908  assert(size_orig == size1 + size2);
909
910  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
911 }
912 
913 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager)
914  : m_manager(manager)
915 {
916 }
917 
918 
919  // Cannot use ~OpenHandler=default as XrdCl::File is not fully
920  // defined in the header.
// The m_file member is handed off elsewhere in this file as a
// std::unique_ptr<XrdCl::File>. Destroying such a smart pointer requires the
// complete XrdCl::File type, which this translation unit gets from
// XrdCl/XrdClFile.hh. Hence the (empty) destructor body lives here instead of
// being defaulted in the header.
// NOTE(review): the destructor's declarator line (source line 921) is missing
// from this rendering.
922 {
923 }
924 
925 
// Completion callback for the asynchronous XrdCl::File::Open() started by
// OpenHandler::open(), which passes `this` as the response handler.
//
// Takes ownership of status_ptr and hostList_ptr: both are wrapped in
// std::unique_ptr and freed on return. The response object is ignored.
// Drops the handler's strong self-reference (m_self), keeping a local copy
// (`self`) for the duration of the call.
//  - If the owning RequestManager no longer exists, returns immediately;
//    m_promise is left unsatisfied on that path.
//  - On success: sends monitoring info, builds a Source that takes ownership
//    of m_file, and fulfils m_promise with it.
//  - On failure: moves m_file into a local so that it is destroyed only after
//    m_mutex has been released, and stores an open-failure exception in
//    m_promise.
// In both of the latter cases, manager->handleOpen() is then called outside
// the lock, with a null `source` on failure.
926 void
927 XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
928 {
929  std::shared_ptr<Source> source;
930  std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
931  std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
932 
933  // Make sure we get rid of the strong self-reference when the callback finishes.
934  std::shared_ptr<OpenHandler> self = m_self;
935  m_self.reset();
 // NOTE(review): m_self is read and reset here without holding m_mutex.
 // open() assigns m_self under m_mutex, but only *after* m_file->Open() has
 // returned. If XrdCl can invoke this callback before that assignment, two
 // problems follow:
 //   - this is an unsynchronized access to the same shared_ptr object;
 //   - the later assignment in open() leaves a self-reference that nothing
 //     clears.
 // Consider reading/resetting m_self under the lock, while keeping `self`
 // declared at this scope so it outlives the lock_guard.
936 
937  auto manager = m_manager.lock();
938  // Manager object has already been deleted. Cleanup the
939  // response objects, remove our self-reference, and ignore the response.
940  if (!manager)
941  {
942  return;
943  }
944  //if we need to delete the File object we must do it outside
945  // of the lock to avoid a potential deadlock
946  std::unique_ptr<XrdCl::File> releaseFile;
947  {
948  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
949 
950  if (status->IsOK())
951  {
952  SendMonitoringInfo(*m_file);
953  timespec now;
954  GET_CLOCK_MONOTONIC(now);
955 
956  std::string excludeString;
957  Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
958 
 // Ownership of the opened file passes to the new Source; m_file is null afterwards.
959  source.reset(new Source(now, std::move(m_file), excludeString));
960  m_promise.set_value(source);
961  }
962  else
963  {
964  releaseFile = std::move(m_file);
 // NOTE(review): the declaration of `ex` (source line 965) is missing from
 // this rendering. Because `ex` is passed to addConnections(cms::Exception &),
 // it is a cms::Exception or a subclass of it.
966  ex << "XrdCl::File::Open(name='" << manager->m_name
967  << "', flags=0x" << std::hex << manager->m_flags
968  << ", permissions=0" << std::oct << manager->m_perms << std::dec
969  << ") => error '" << status->ToStr()
970  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
971  ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
972  manager->addConnections(ex);
973 
974  m_promise.set_exception(std::make_exception_ptr(ex));
975  }
976  }
 // Lock released here; releaseFile (if set) is destroyed when the function returns.
977  manager->handleOpen(*status, source);
978 }
979 
// Describes the data server of the open that is currently in progress.
// Reads m_file while holding m_mutex.
//  - Returns "(no open in progress)" when no XrdCl::File is held. This is also
//    the case once HandleResponseWithHosts() has run, because it moves m_file
//    into a Source (success) or into a local that is then destroyed (failure).
//  - Otherwise returns the file's "DataServer" property, or "(unknown source)"
//    if that property comes back empty.
// NOTE(review): the declarator line(s) (source lines 980-981) are missing from
// this rendering; the function's name and return type cannot be confirmed here.
982 {
983  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
984 
985  if (!m_file.get())
986  {
987  return "(no open in progress)";
988  }
989  std::string dataServer;
990  m_file->GetProperty("DataServer", dataServer);
 // GetProperty()'s return value is not checked; an empty string is treated as
 // "unknown". This presumably covers a failed lookup too -- TODO confirm.
991  if (!dataServer.size()) { return "(unknown source)"; }
992  return dataServer;
993 }
994 
// Starts an asynchronous XrdCl::File::Open() of the manager's file, or joins one
// already in progress. Returns a shared future that HandleResponseWithHosts()
// fulfils with the opened Source, or with an open-failure exception.
//
// Holds m_mutex for the whole call.
// Throws if:
//  - the owning RequestManager has been destroyed;
//  - this handler is no longer owned by any shared_ptr (m_self_weak expired);
//  - Open() reports an error immediately.
// If m_file is already set, the existing m_shared_future is returned and no new
// open is started. Otherwise:
//  - a fresh promise/future pair replaces the previous one;
//  - the manager's opaque string is appended to the file name, joined with '?'
//    or '&' depending on whether the name already contains '?';
//  - a strong self-reference (m_self) is stored to keep the handler alive while
//    the callback is pending.
// NOTE(review): this rendering is missing the declarator line (source line 996)
// and the declarations of `ex` at source lines 1002, 1013 and 1036.
995 std::shared_future<std::shared_ptr<Source> >
997 {
998  std::lock_guard<std::recursive_mutex> sentry(m_mutex);
999  auto manager_ptr = m_manager.lock();
1000  if (!manager_ptr)
1001  {
1003  ex << "XrdCl::File::Open() =>"
1004  << " error: OpenHandler called within an invalid RequestManager context."
1005  << " This is a logic error and should be reported to the CMSSW developers.";
1006  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1007  throw ex;
1008  }
1009  RequestManager &manager = *manager_ptr;
1010  auto self_ptr = m_self_weak.lock();
1011  if (!self_ptr)
1012  {
1014  ex << "XrdCl::File::Open() => error: "
1015  << "OpenHandler called after it was deleted. This is a logic error "
1016  << "and should be reported to the CMSSW developers.";
 // NOTE(review): "XrdAdapter" below looks like a typo for "XrdAdaptor", which
 // every other context string in this file uses. Left unchanged because it
 // is runtime text.
1017  ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1018  throw ex;
1019  }
1020 
 // An open is already in flight (or failed synchronously, see below): share its future.
1021  if (m_file.get())
1022  {
1023  return m_shared_future;
1024  }
1025  std::promise<std::shared_ptr<Source> > new_promise;
1026  m_promise.swap(new_promise);
1027  m_shared_future = m_promise.get_future().share();
1028 
1029  auto opaque = manager.prepareOpaqueString();
1030  std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
1031  edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1032  m_file.reset(new XrdCl::File());
1033  XrdCl::XRootDStatus status;
 // NOTE(review): when Open() fails immediately, the exception below is thrown
 // without resetting m_file, and m_promise is never satisfied. Assuming XrdCl
 // does not invoke the handler after a synchronous Open() error, any later
 // call would then return this same never-ready future from the m_file check
 // above. Consider m_file.reset() before throwing.
1034  if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
1035  {
1037  ex << "XrdCl::File::Open(name='" << new_name
1038  << "', flags=0x" << std::hex << manager.m_flags
1039  << ", permissions=0" << std::oct << manager.m_perms << std::dec
1040  << ") => error '" << status.ToStr()
1041  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
1042  ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1043  manager.addConnections(ex);
1044  throw ex;
1045  }
1046  // Have a strong self-reference for as long as the callback is in-progress.
 // NOTE(review): this is set only after Open() has returned, while
 // HandleResponseWithHosts() reads/resets m_self without taking m_mutex. If the
 // callback can run before this line, this is a race and the self-reference
 // may never be released.
1047  m_self = self_ptr;
1048  return m_shared_future;
1049 }
1050 
std::shared_future< std::shared_ptr< Source > > open()
double seconds()
void start()
Definition: CPUTimer.cc:74
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
#define GET_CLOCK_MONOTONIC(ts)
std::uniform_real_distribution< float > m_distribution
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void updateSiteInfo(std::string orig_site="")
#define XRD_CL_MAX_CHUNK
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
double q2[4]
Definition: TauolaWrapper.h:88
SendMonitoringInfoHandler nullHandler
static std::string const input
Definition: EdmProvDump.cc:43
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static void SendMonitoringInfo(XrdCl::File &file)
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void swap(Exception &other)
Definition: EDMException.h:87
void addConnections(cms::Exception &)
long long timeDiffMS(const timespec &a, const timespec &b)
std::set< std::shared_ptr< Source > > m_disabledSources
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
std::vector< std::shared_ptr< Source > > m_inactiveSources
static bool isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList=nullptr)
Definition: XrdSource.cc:155
def move
Definition: eostools.py:508
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
Times stop()
Definition: CPUTimer.cc:94
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
void clearMessage()
Definition: Exception.cc:215
std::shared_ptr< OpenHandler > m_open_handler
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void clearContext()
Definition: Exception.cc:219
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
IOSize size(void) const
Definition: IOPosBuffer.h:64
XrdCl::OpenFlags::Flags m_flags
virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
double q1[4]
Definition: TauolaWrapper.h:87
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
void checkSourcesImpl(timespec &now, IOSize requestSize)
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
void getDisabledSourceNames(std::vector< std::string > &sources)
static IOSize validateList(const std::vector< IOPosBuffer > req)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
size_t IOSize
Definition: IOTypes.h:14
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
volatile std::atomic< bool > shutdown_flag false
tuple status
Definition: ntuplemaker.py:245
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
void clearAdditionalInfo()
Definition: Exception.cc:223
std::recursive_mutex m_source_mutex
void initialize(std::weak_ptr< RequestManager > selfref)