CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer >> iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 45 of file XrdRequestManager.h.

Constructor & Destructor Documentation

◆ ~RequestManager()

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default

◆ RequestManager()

RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 108 of file XrdRequestManager.cc.

109  : m_serverToAdvertise(nullptr),
112  m_name(filename),
113  m_flags(flags),
114  m_perms(perms),
115  m_distribution(0, 100),

Referenced by getInstance().

Member Function Documentation

◆ addConnections()

void RequestManager::addConnections ( cms::Exception &  ex) const

Add the list of active connections to the exception extra info.

Definition at line 481 of file XrdRequestManager.cc.

481  {
482  std::vector<std::string> sources;
484  for (auto const &source : sources) {
485  ex.addAdditionalInfo("Active source: " + source);
486  }
487  sources.clear();
489  for (auto const &source : sources) {
490  ex.addAdditionalInfo("Disabled source: " + source);
491  }
492 }

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), source, and CalibrationSummaryClient_cfi::sources.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

◆ broadcastRequest()

void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

◆ checkSources()

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 293 of file XrdRequestManager.cc.

296  {
297  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
298  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
299  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
300  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
301  { // Be more aggressive about getting rid of very bad sources.
302  compareSources(now, 0, 1, activeSources, inactiveSources);
303  compareSources(now, 1, 0, activeSources, inactiveSources);
304  }
306  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
307  }
308  }
309 }

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, fileCollector::now, and timeDiffMS().

Referenced by handle().

◆ checkSourcesImpl()

void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 339 of file XrdRequestManager.cc.

342  {
343  bool findNewSource = false;
344  if (activeSources.size() <= 1) {
345  findNewSource = true;
346  } else if (activeSources.size() > 1) {
347  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
348  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
349  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
350  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
351 
352  // NOTE: We could probably replace the copy with a better sort function.
353  // However, there are typically very few sources and the correctness is more obvious right now.
354  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
355  eligibleInactiveSources.reserve(inactiveSources.size());
356  for (const auto &source : inactiveSources) {
357  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
358  eligibleInactiveSources.push_back(source);
359  }
360  }
361  auto bestInactiveSource =
362  std::min_element(eligibleInactiveSources.begin(),
363  eligibleInactiveSources.end(),
364  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
365  return s1->getQuality() < s2->getQuality();
366  });
367  auto worstActiveSource = std::max_element(activeSources.cbegin(),
368  activeSources.cend(),
369  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
370  return s1->getQuality() < s2->getQuality();
371  });
372  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
373  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
374  << ", quality " << (*bestInactiveSource)->getQuality();
375  }
376  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
377  << ", quality " << (*worstActiveSource)->getQuality();
378  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
379  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
380  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
381  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
382  auto oldSources = activeSources;
383  activeSources.push_back(*bestInactiveSource);
384  reportSiteChange(oldSources, activeSources);
385  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
386  if (it->get() == bestInactiveSource->get()) {
387  inactiveSources.erase(it);
388  break;
389  }
390  } else
391  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
392  (*worstActiveSource)->getQuality() >
393  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
394  edm::LogVerbatim("XrdAdaptorInternal")
395  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
396  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
397  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
398  (*worstActiveSource)->setLastDowngrade(now);
399  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
400  if (it->get() == bestInactiveSource->get()) {
401  inactiveSources.erase(it);
402  break;
403  }
404  inactiveSources.emplace_back(std::move(*worstActiveSource));
405  auto oldSources = activeSources;
406  activeSources.erase(worstActiveSource);
407  activeSources.emplace_back(std::move(*bestInactiveSource));
408  reportSiteChange(oldSources, activeSources);
409  eligibleInactiveSources.clear();
410  for (const auto &source : inactiveSources)
411  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
412  eligibleInactiveSources.push_back(source);
413  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
414  eligibleInactiveSources.end(),
415  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
416  return s1->getQuality() < s2->getQuality();
417  });
418  worstActiveSource = std::max_element(activeSources.begin(),
419  activeSources.end(),
420  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
421  return s1->getQuality() < s2->getQuality();
422  });
423  }
424  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
425  float r = m_distribution(m_generator);
427  findNewSource = true;
428  }
429  }
430  }
431  if (findNewSource) {
432  m_open_handler->open();
434  }
435 
436  // Only aggressively look for new sources if we don't have two.
437  if (activeSources.size() == 2) {
439  } else {
441  }
443 }

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), fileCollector::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

◆ compareSources()

bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 311 of file XrdRequestManager.cc.

315  {
316  if (activeSources.size() < std::max(a, b) + 1) {
317  return false;
318  }
319 
320  bool findNewSource = false;
321  if ((activeSources[a]->getQuality() > 5130) ||
322  ((activeSources[a]->getQuality() > 260) &&
323  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
324  edm::LogVerbatim("XrdAdaptorInternal")
325  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
326  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
327  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
328  findNewSource = true;
329  }
330  activeSources[a]->setLastDowngrade(now);
331  inactiveSources.emplace_back(activeSources[a]);
332  auto oldSources = activeSources;
333  activeSources.erase(activeSources.begin() + a);
334  reportSiteChange(oldSources, activeSources);
335  }
336  return findNewSource;
337 }

References a, CalibrationSummaryClient_cfi::activeSources, b, SiStripPI::max, fileCollector::now, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

◆ getActiveFile()

std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 445 of file XrdRequestManager.cc.

445  {
446  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
447  if (m_activeSources.empty()) {
449  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
450  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
451  ex.addContext("In XrdAdaptor::RequestManager::handle()");
452  addConnections(ex);
453  throw ex;
454  }
455  return m_activeSources[0]->getFileHandle();
456 }

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

◆ getActiveSourceNames()

void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 458 of file XrdRequestManager.cc.

458  {
459  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
460  sources.reserve(m_activeSources.size());
461  for (auto const &source : m_activeSources) {
462  sources.push_back(source->ID());
463  }
464 }

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

◆ getDisabledSourceNames()

void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 474 of file XrdRequestManager.cc.

474  {
475  sources.reserve(m_disabledSourceStrings.size());
476  for (auto const &source : m_disabledSourceStrings) {
477  sources.push_back(source);
478  }
479 }

References m_disabledSourceStrings, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

◆ getFilename()

const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 101 of file XrdRequestManager.h.

101 { return m_name; }

References m_name.

◆ getInstance()

static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 110 of file XrdRequestManager.h.

112  {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }

References corrVsCorr::filename, HLT_2018_cff::flags, instance, and RequestManager().

◆ getPrettyActiveSourceNames()

void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 466 of file XrdRequestManager.cc.

466  {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  sources.reserve(m_activeSources.size());
469  for (auto const &source : m_activeSources) {
470  sources.push_back(source->PrettyID());
471  }
472 }

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

◆ handle() [1/3]

std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer >>  iolist)

Definition at line 612 of file XrdRequestManager.cc.

612  {
613  //Use a copy of m_activeSources and m_inactiveSources throughout this function
614  // in order to avoid holding the lock a long time and causing a deadlock.
615  // When the function is over we will update the values of the containers
616  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
617  {
618  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
620  inactiveSources = m_inactiveSources;
621  }
622  //Make sure we update changes when we leave the function
623  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
624  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
626  m_inactiveSources = std::move(inactiveSources);
627  });
628 
630 
631  timespec now;
633 
634  edm::CPUTimer timer;
635  timer.start();
636 
637  if (activeSources.size() == 1) {
638  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
639  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
640  activeSources[0]->handle(c_ptr);
641  return c_ptr->get_future();
642  }
643  // Make sure active
644  else if (activeSources.empty()) {
646  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
647  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
648  ex.addContext("In XrdAdaptor::RequestManager::handle()");
649  addConnections(ex);
650  throw ex;
651  }
652 
653  assert(iolist.get());
654  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
655  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
656  splitClientRequest(*iolist, *req1, *req2, activeSources);
657 
658  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
659  // CheckSources may have removed a source
660  if (activeSources.size() == 1) {
661  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
662  activeSources[0]->handle(c_ptr);
663  return c_ptr->get_future();
664  }
665 
666  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
667  std::future<IOSize> future1, future2;
668  if (!req1->empty()) {
669  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
670  activeSources[0]->handle(c_ptr1);
671  future1 = c_ptr1->get_future();
672  }
673  if (!req2->empty()) {
674  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
675  activeSources[1]->handle(c_ptr2);
676  future2 = c_ptr2->get_future();
677  }
678  if (!req1->empty() && !req2->empty()) {
679  std::future<IOSize> task = std::async(
680  std::launch::deferred,
681  [](std::future<IOSize> a, std::future<IOSize> b) {
682  // Wait until *both* results are available. This is essential
683  // as the callback may try referencing the RequestManager. If one
684  // throws an exception (causing the RequestManager to be destroyed by
685  // XrdFile) and the other has a failure, then the recovery code will
686  // reference the destroyed RequestManager.
687  //
688  // Unlike other places where we use shared/weak ptrs to maintain object
689  // lifetime and destruction asynchronously, we *cannot* destroy the request
690  // asynchronously as it is associated with a ROOT buffer. We must wait until we
691  // are guaranteed that XrdCl will not write into the ROOT buffer before we
692  // can return.
693  b.wait();
694  a.wait();
695  return b.get() + a.get();
696  },
697  std::move(future1),
698  std::move(future2));
699  timer.stop();
700  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
701  return task;
702  } else if (!req1->empty()) {
703  return future1;
704  } else if (!req2->empty()) {
705  return future2;
706  } else { // Degenerate case - no bytes to read.
707  std::promise<IOSize> p;
708  p.set_value(0);
709  return p.get_future();
710  }
711 }

References a, CalibrationSummaryClient_cfi::activeSources, cms::Exception::addContext(), cms::cuda::assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, eostools::move(), fileCollector::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), edm::CPUTimer::stop(), and TrackValidation_cff::task.

◆ handle() [2/3]

std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest >  c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful one. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 520 of file XrdRequestManager.cc.

520  {
521  assert(c_ptr.get());
522  timespec now;
524  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
525  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
526  {
527  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
529  inactiveSources = m_inactiveSources;
530  }
531  {
532  //make sure we update values before calling pickSingelSource
533  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
534  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
536  m_inactiveSources = std::move(inactiveSources);
537  });
538 
539  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
540  }
541 
542  std::shared_ptr<Source> source = pickSingleSource();
543  source->handle(c_ptr);
544  return c_ptr->get_future();
545 }

References CalibrationSummaryClient_cfi::activeSources, cms::cuda::assert(), checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), fileCollector::now, pickSingleSource(), and source.

◆ handle() [3/3]

std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 54 of file XrdRequestManager.h.

54  {
55  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
56  return handle(c_ptr);
57  }

References findQualityFiles::size.

◆ handleOpen()

void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source >  source 
)
private virtual

Handle the file-open response

Definition at line 574 of file XrdRequestManager.cc.

574  {
575  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
576  if (status.IsOK()) {
577  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
578  for (const auto &s : m_activeSources) {
579  if (source->ID() == s->ID()) {
580  edm::LogVerbatim("XrdAdaptorInternal")
581  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
582  unsigned returned_count = ++m_excluded_active_count;
584  if (returned_count >= 3) {
586  }
587  return;
588  }
589  }
590  for (const auto &s : m_inactiveSources) {
591  if (source->ID() == s->ID()) {
592  edm::LogVerbatim("XrdAdaptorInternal")
593  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
595  return;
596  }
597  }
598  if (m_activeSources.size() < 2) {
599  auto oldSources = m_activeSources;
600  m_activeSources.push_back(source);
601  reportSiteChange(oldSources, m_activeSources);
603  } else {
604  m_inactiveSources.push_back(source);
605  }
606  } else { // File-open failure - wait at least 120s before next attempt.
607  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
609  }
610 }

References alignCSCRings::s, source, mps_update::status, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

◆ initialize()

void RequestManager::initialize ( std::weak_ptr< RequestManager >  selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows not to reference the RequestManager.

Definition at line 118 of file XrdRequestManager.cc.

118  {
120 
121  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
122  if (env) {
123  env->GetInt("StreamErrorWindow", m_timeout);
124  }
125 
126  std::string orig_site;
127  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
128  std::string hostname;
129  if (Source::getHostname(orig_site, hostname)) {
130  Source::getDomain(hostname, orig_site);
131  }
132  }
133 
134  std::unique_ptr<XrdCl::File> file;
136  bool validFile = false;
137  const int retries = 5;
138  std::string excludeString;
139  for (int idx = 0; idx < retries; idx++) {
140  file.reset(new XrdCl::File());
141  auto opaque = prepareOpaqueString();
142  std::string new_filename =
143  m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
144  SyncHostResponseHandler handler;
145  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
146  if (!openStatus
147  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
148  // server and it was marked bad - xrootd couldn't even queue up the request internally!
149  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
150  // redirector being down or authentication failures.
151  ex.clearMessage();
152  ex.clearContext();
153  ex.clearAdditionalInfo();
154  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
155  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
156  << ", code=" << openStatus.code << ")";
157  ex.addContext("Calling XrdFile::open()");
158  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
159  throw ex;
160  }
161  handler.WaitForResponse();
162  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
163  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
164  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
165  assert(status);
166  if (status->IsOK()) {
167  validFile = true;
168  break;
169  } else {
170  ex.clearMessage();
171  ex.clearContext();
172  ex.clearAdditionalInfo();
173  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
174  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
175  << ", code=" << status->code << ")";
176  ex.addContext("Calling XrdFile::open()");
177  addConnections(ex);
178  std::string dataServer, lastUrl;
179  file->GetProperty("DataServer", dataServer);
180  file->GetProperty("LastURL", lastUrl);
181  if (!dataServer.empty()) {
182  ex.addAdditionalInfo("Problematic data server: " + dataServer);
183  }
184  if (!lastUrl.empty()) {
185  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
186  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
187  }
188  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
189  m_disabledSourceStrings.end()) {
190  ex << ". No additional data servers were found.";
191  throw ex;
192  }
193  if (!dataServer.empty()) {
194  m_disabledSourceStrings.insert(dataServer);
195  m_disabledExcludeStrings.insert(excludeString);
196  }
197  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
198  if (lastUrl == new_filename) {
199  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
200  throw ex;
201  }
202  }
203  }
204  if (!validFile) {
205  throw ex;
206  }
208 
209  timespec ts;
211 
212  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
213  {
214  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
215  auto oldList = m_activeSources;
216  m_activeSources.push_back(source);
218  }
221 
222  m_lastSourceCheck = ts;
223  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
225 }

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, VtxSmearedBeamProfile_cfi::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

◆ pickSingleSource()

std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 494 of file XrdRequestManager.cc.

494  {
495  std::shared_ptr<Source> source = nullptr;
496  {
497  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
498  if (m_activeSources.size() == 2) {
500  source = m_activeSources[0];
502  } else {
503  source = m_activeSources[1];
505  }
506  } else if (m_activeSources.empty()) {
508  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
509  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
510  ex.addContext("In XrdAdaptor::RequestManager::handle()");
511  addConnections(ex);
512  throw ex;
513  } else {
514  source = m_activeSources[0];
515  }
516  }
517  return source;
518 }

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

◆ prepareOpaqueString()

std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 547 of file XrdRequestManager.cc.

547  {
548  std::stringstream ss;
549  ss << "tried=";
550  size_t count = 0;
551  {
552  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
553 
554  for (const auto &it : m_activeSources) {
555  count++;
556  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
557  }
558  for (const auto &it : m_inactiveSources) {
559  count++;
560  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
561  }
562  }
563  for (const auto &it : m_disabledExcludeStrings) {
564  count++;
565  ss << it.substr(0, it.find(":")) << ",";
566  }
567  if (count) {
568  std::string tmp_str = ss.str();
569  return tmp_str.substr(0, tmp_str.size() - 1);
570  }
571  return "";
572 }

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

◆ queueUpdateCurrentServer()

void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 251 of file XrdRequestManager.cc.

251  {
252  auto hostname = std::make_unique<std::string>(id);
253  if (Source::getHostname(id, *hostname)) {
254  std::string *null_hostname = nullptr;
255  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
256  hostname.release();
257  }
258  }
259 }

References XrdAdaptor::Source::getHostname(), m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

◆ reportSiteChange()

void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source >> const &  iOld,
std::vector< std::shared_ptr< Source >> const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 278 of file XrdRequestManager.cc.

280  {
281  auto siteList = formatSites(iNew);
282  if (!orig_site.empty() && (orig_site != siteList)) {
283  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
284  } else {
285  auto oldSites = formatSites(iOld);
286  if (orig_site.empty() && (siteList != oldSites)) {
287  if (!oldSites.empty())
288  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
289  }
290  }
291 }

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

◆ requestFailure()

void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest >  c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 713 of file XrdRequestManager.cc.

713  {
714  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
715 
716  // Fail early for invalid responses - XrdFile has a separate path for handling this.
717  if (c_status.code == XrdCl::errInvalidResponse) {
718  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
720  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
721  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
722  << ") => Invalid ReadV response from server";
723  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
724  addConnections(ex);
725  throw ex;
726  }
727  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
728 
729  // Note that we do not delete the Source itself. That is because this
730  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
731  // In such a case, if you close a file in the handler, it will deadlock
732  m_disabledSourceStrings.insert(source_ptr->ID());
733  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
734  m_disabledSources.insert(source_ptr);
735 
736  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
737  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
738  auto oldSources = m_activeSources;
739  m_activeSources.erase(m_activeSources.begin());
740  reportSiteChange(oldSources, m_activeSources);
741  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
742  auto oldSources = m_activeSources;
743  m_activeSources.erase(m_activeSources.begin() + 1);
744  reportSiteChange(oldSources, m_activeSources);
745  }
746  std::shared_ptr<Source> new_source;
747  if (m_activeSources.empty()) {
748  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
749  timespec now;
752  // Note we only wait for 180 seconds here. This is because we've already failed
753  // once and the likelihood the program has some inconsistent state is decent.
754  // We'd much rather fail hard than deadlock!
755  sentry.unlock();
756  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
759  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
760  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
761  << ") => timeout when waiting for file open";
762  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
763  addConnections(ex);
764  throw ex;
765  } else {
766  try {
767  new_source = future.get();
768  } catch (edm::Exception &ex) {
769  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
770  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
771  throw;
772  }
773  }
774 
775  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
776  m_disabledSourceStrings.end()) {
777  // The server gave us back a data node we requested excluded. Fatal!
779  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
780  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
781  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
782  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
783  addConnections(ex);
784  throw ex;
785  }
786  sentry.lock();
787 
788  auto oldSources = m_activeSources;
789  m_activeSources.push_back(new_source);
790  reportSiteChange(oldSources, m_activeSources);
791  } else {
792  new_source = m_activeSources[0];
793  }
794  new_source->handle(c_ptr);
795 }

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), get, GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, fileCollector::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

◆ splitClientRequest()

void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two request lists.

Definition at line 889 of file XrdRequestManager.cc.

892  {
893  if (iolist.empty())
894  return;
895  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
896  req1.reserve(iolist.size() / 2 + 1);
897  req2.reserve(iolist.size() / 2 + 1);
898  size_t front = 0;
899 
900  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
901  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
902  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
903  IOSize chunk1, chunk2;
904  // Make sure the chunk size is at least 1024; little point to reads less than that size.
905  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
906  static_cast<IOSize>(1024));
907  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
908  static_cast<IOSize>(1024));
909 
910  IOSize size_orig = 0;
911  for (const auto &it : iolist)
912  size_orig += it.size();
913 
914  while (tmp_iolist.size() - front > 0) {
915  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
916  (req2.size() >=
917  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
918  // are passed to the request manager. However, because we have a max chunk size, we increase
919  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
920  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
921  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
923  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
924  << ", permissions=0" << std::oct << m_perms << std::dec
925  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
926  "reported to CMSSW developers.";
927  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
928  addConnections(ex);
929  std::stringstream ss;
930  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
931  ex.addAdditionalInfo(ss.str());
932  std::stringstream ss2;
933  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
934  ex.addAdditionalInfo(ss2.str());
935  throw ex;
936  }
937  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
938  consumeChunkFront(front, tmp_iolist, req1, chunk1);
939  }
940  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
941  consumeChunkBack(front, tmp_iolist, req2, chunk2);
942  }
943  }
944  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
945  return left.offset() < right.offset();
946  });
947  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
948  return left.offset() < right.offset();
949  });
950 
951  IOSize size1 = validateList(req1);
952  IOSize size2 = validateList(req2);
953 
954  assert(size_orig == size1 + size2);
955 
956  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
957  << " bytes) split into requests size " << req1.size() << " (" << size1
958  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
959 }

References CalibrationSummaryClient_cfi::activeSources, cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), cms::cuda::assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, SiStripPI::max, IOPosBuffer::offset(), q1, q2, contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

◆ updateCurrentServer()

void RequestManager::updateCurrentServer ( )
inline private

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 234 of file XrdRequestManager.cc.

234  {
235  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
236  // a pending update. *However*, since we call this for every read, we'll get it
237  // eventually.
238  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
239  return;
240  }
241  std::string *hostname_ptr;
242  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
243  std::unique_ptr<std::string> hostname(hostname_ptr);
245  if (statsService.isAvailable()) {
246  statsService->setCurrentServer(*hostname_ptr);
247  }
248  }
249 }

References edm::Service< T >::isAvailable(), LIKELY, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

Member Data Documentation

◆ m_activeSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 206 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

◆ m_disabledExcludeStrings

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 210 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

◆ m_disabledSources

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 211 of file XrdRequestManager.h.

Referenced by requestFailure().

◆ m_disabledSourceStrings

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 209 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

◆ m_distribution

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 231 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_excluded_active_count

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 233 of file XrdRequestManager.h.

◆ m_flags

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private

◆ m_generator

std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 230 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_inactiveSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 207 of file XrdRequestManager.h.

Referenced by handle(), and prepareOpaqueString().

◆ m_lastSourceCheck

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

◆ m_name

const std::string XrdAdaptor::RequestManager::m_name
private

◆ m_nextActiveSourceCheck

timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 222 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

◆ m_nextInitialSourceToggle

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 220 of file XrdRequestManager.h.

Referenced by pickSingleSource().

◆ m_open_handler

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 289 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

◆ m_perms

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private

◆ m_serverToAdvertise

std::atomic<std::string *> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

◆ m_source_mutex

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutable private

◆ m_timeout

int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 218 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

◆ searchMode

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 223 of file XrdRequestManager.h.

◆ XRD_DEFAULT_TIMEOUT

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3 * 60
static

Definition at line 47 of file XrdRequestManager.h.

XRD_ADAPTOR_SHORT_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
Definition: XrdRequestManager.cc:25
XrdAdaptor::RequestManager::m_flags
XrdCl::OpenFlags::Flags m_flags
Definition: XrdRequestManager.h:226
XrdAdaptor::Source::determineHostExcludeString
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:283
XrdAdaptor::RequestManager::queueUpdateCurrentServer
void queueUpdateCurrentServer(const std::string &)
Definition: XrdRequestManager.cc:251
XrdAdaptor::RequestManager::m_generator
std::mt19937 m_generator
Definition: XrdRequestManager.h:230
XrdAdaptor::RequestManager::m_nextActiveSourceCheck
timespec m_nextActiveSourceCheck
Definition: XrdRequestManager.h:222
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
XrdAdaptor::RequestManager::addConnections
void addConnections(cms::Exception &) const
Definition: XrdRequestManager.cc:481
XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
Definition: XrdRequestManager.cc:35
XrdAdaptor::RequestManager::m_disabledExcludeStrings
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
Definition: XrdRequestManager.h:210
mps_update.status
status
Definition: mps_update.py:69
XrdAdaptor::RequestManager::m_inactiveSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
Definition: XrdRequestManager.h:207
XrdAdaptor::RequestManager::m_open_handler
std::shared_ptr< OpenHandler > m_open_handler
Definition: XrdRequestManager.h:289
XrdAdaptor::RequestManager::m_excluded_active_count
std::atomic< unsigned > m_excluded_active_count
Definition: XrdRequestManager.h:233
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
XrdAdaptor::RequestManager::getDisabledSourceNames
void getDisabledSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:474
edm::CPUTimer
Definition: CPUTimer.h:37
XrdAdaptor::RequestManager::m_activeSources
std::vector< std::shared_ptr< Source > > m_activeSources
Definition: XrdRequestManager.h:206
XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_CHUNK_THRESHOLD
Definition: XrdRequestManager.cc:38
XrdAdaptor::RequestManager::m_perms
XrdCl::Access::Mode m_perms
Definition: XrdRequestManager.h:227
pileupDistInMC.oldList
oldList
Definition: pileupDistInMC.py:39
cms::cuda::assert
assert(be >=bs)
CalibrationSummaryClient_cfi.sources
sources
Definition: CalibrationSummaryClient_cfi.py:23
XrdAdaptor::RequestManager::m_disabledSources
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Definition: XrdRequestManager.h:211
indexGen.s2
s2
Definition: indexGen.py:107
XrdAdaptor::RequestManager::m_serverToAdvertise
std::atomic< std::string * > m_serverToAdvertise
Definition: XrdRequestManager.h:215
spr::find
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
training_settings.idx
idx
Definition: training_settings.py:16
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
XrdAdaptor::RequestManager::updateCurrentServer
void updateCurrentServer()
Definition: XrdRequestManager.cc:234
edm::errors::FileOpenError
Definition: EDMException.h:49
GET_CLOCK_MONOTONIC
#define GET_CLOCK_MONOTONIC(ts)
Definition: XrdRequestManager.cc:54
edm::Exception
Definition: EDMException.h:77
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
fileCollector.now
now
Definition: fileCollector.py:207
consumeChunkBack
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
Definition: XrdRequestManager.cc:836
XrdAdaptor::RequestManager::m_lastSourceCheck
timespec m_lastSourceCheck
Definition: XrdRequestManager.h:217
XrdAdaptor::RequestManager::getPrettyActiveSourceNames
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:466
alignCSCRings.s
s
Definition: alignCSCRings.py:92
XrdAdaptor::RequestManager::pickSingleSource
std::shared_ptr< Source > pickSingleSource()
Definition: XrdRequestManager.cc:494
XrdAdaptor::RequestManager::compareSources
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
Definition: XrdRequestManager.cc:311
XrdAdaptor::RequestManager::reportSiteChange
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
Definition: XrdRequestManager.cc:278
seconds
double seconds()
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:252
source
static const std::string source
Definition: EdmProvDump.cc:47
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
XrdAdaptor::Source::getDomain
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:241
XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_LONG_OPEN_DELAY
Definition: XrdRequestManager.cc:34
edm::CPUTimer::start
void start()
Definition: CPUTimer.cc:68
b
double b
Definition: hdecay.h:118
q2
double q2[4]
Definition: TauolaWrapper.h:88
validateList
static IOSize validateList(const std::vector< IOPosBuffer > req)
Definition: XrdRequestManager.cc:875
XRD_CL_MAX_CHUNK
#define XRD_CL_MAX_CHUNK
Definition: XrdRequestManager.cc:23
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
XrdAdaptor::RequestManager::m_name
const std::string m_name
Definition: XrdRequestManager.h:225
web.browse_db.env
env
Definition: browse_db.py:18
edm::LogWarning
Definition: MessageLogger.h:141
XrdAdaptor::RequestManager::m_nextInitialSourceToggle
bool m_nextInitialSourceToggle
Definition: XrdRequestManager.h:220
q1
double q1[4]
Definition: TauolaWrapper.h:87
a
double a
Definition: hdecay.h:119
XrdAdaptor::RequestManager::splitClientRequest
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Definition: XrdRequestManager.cc:889
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
cms::Exception::addAdditionalInfo
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
XrdAdaptor::RequestManager::handle
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
Definition: XrdRequestManager.h:54
mps_check.timeout
int timeout
Definition: mps_check.py:53
KineDebug3::count
void count()
Definition: KinematicConstrainedVertexUpdatorT.h:21
XrdAdaptor::Source::getXrootdSiteFromURL
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:321
XrdAdaptor::RequestManager::m_distribution
std::uniform_real_distribution< float > m_distribution
Definition: XrdRequestManager.h:231
edm::Service
Definition: Service.h:30
XrdAdaptor::RequestManager::m_source_mutex
std::recursive_mutex m_source_mutex
Definition: XrdRequestManager.h:228
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
CalibrationSummaryClient_cfi.activeSources
activeSources
Definition: CalibrationSummaryClient_cfi.py:11
edm::LogVerbatim
Definition: MessageLogger.h:297
edm::CPUTimer::stop
Times stop()
Definition: CPUTimer.cc:87
XrdAdaptor::RequestManager::checkSources
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:293
get
#define get
XRD_ADAPTOR_OPEN_PROBE_PERCENT
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
Definition: XrdRequestManager.cc:33
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
alignCSCRings.r
r
Definition: alignCSCRings.py:93
VtxSmearedBeamProfile_cfi.File
File
Definition: VtxSmearedBeamProfile_cfi.py:30
eostools.move
def move(src, dest)
Definition: eostools.py:511
XrdAdaptor::RequestManager::OpenHandler::getInstance
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
Definition: XrdRequestManager.h:237
IOPosBuffer::offset
IOOffset offset(void) const
Definition: IOPosBuffer.h:39
XrdAdaptor::RequestManager::RequestManager
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
Definition: XrdRequestManager.cc:108
LIKELY
#define LIKELY(x)
Definition: Likely.h:20
SendMonitoringInfo
static void SendMonitoringInfo(XrdCl::File &file)
Definition: XrdRequestManager.cc:85
XrdAdaptor::RequestManager::prepareOpaqueString
std::string prepareOpaqueString() const
Definition: XrdRequestManager.cc:547
IOPosBuffer
Definition: IOPosBuffer.h:7
consumeChunkFront
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
Definition: XrdRequestManager.cc:797
XrdAdaptor::XrootdException
Definition: XrdRequestManager.h:32
XrdAdaptor::RequestManager::m_disabledSourceStrings
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
Definition: XrdRequestManager.h:209
XrdAdaptor::RequestManager::m_timeout
int m_timeout
Definition: XrdRequestManager.h:218
XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT
static const unsigned int XRD_DEFAULT_TIMEOUT
Definition: XrdRequestManager.h:47
IOSize
size_t IOSize
Definition: IOTypes.h:14
edm::errors::FileReadError
Definition: EDMException.h:50
XrdAdaptor::Source::getHostname
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:214
HLT_2018_cff.flags
flags
Definition: HLT_2018_cff.py:11758
TauDecayModes.dec
dec
Definition: TauDecayModes.py:143
edm::storage::StatisticsSenderService::setCurrentServer
void setCurrentServer(const std::string &servername)
Definition: StatisticsSenderService.cc:151
XrdAdaptor::RequestManager::checkSourcesImpl
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:339
timeDiffMS
long long timeDiffMS(const timespec &a, const timespec &b)
Definition: XrdRequestManager.cc:59
XrdAdaptor::ClientRequest
Definition: XrdRequest.h:23
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443