CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Classes

class  OpenHandler
 

Public Types

using IOOffset = edm::storage::IOOffset
 
using IOPosBuffer = edm::storage::IOPosBuffer
 
using IOSize = edm::storage::IOSize
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSizehandle (void *into, IOSize size, IOOffset off)
 
std::future< IOSizehandle (std::shared_ptr< std::vector< IOPosBuffer >> iolist)
 
std::future< IOSizehandle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
RequestManageroperator= (const RequestManager &)=delete
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 RequestManager (const RequestManager &)=delete
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManagergetInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< SourcepickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHashm_disabledSources
 
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandlerm_open_handler
 
XrdCl::Access::Mode m_perms
 
int m_redirectLimitDelayScale
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 44 of file XrdRequestManager.h.

Member Typedef Documentation

◆ IOOffset

Definition at line 48 of file XrdRequestManager.h.

◆ IOPosBuffer

Definition at line 47 of file XrdRequestManager.h.

◆ IOSize

Definition at line 46 of file XrdRequestManager.h.

Constructor & Destructor Documentation

◆ RequestManager() [1/2]

XrdAdaptor::RequestManager::RequestManager ( const RequestManager )
delete

Referenced by getInstance().

◆ ~RequestManager()

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtualdefault

◆ RequestManager() [2/2]

RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 169 of file XrdRequestManager.cc.

170  : m_serverToAdvertise(nullptr),
174  m_name(filename),
175  m_flags(flags),
176  m_perms(perms),
177  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

◆ addConnections()

void RequestManager::addConnections ( cms::Exception ex) const

Add the list of active connections to the exception extra info.

Definition at line 573 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), source, and CalibrationSummaryClient_cfi::sources.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

573  {
574  std::vector<std::string> sources;
576  for (auto const &source : sources) {
577  ex.addAdditionalInfo("Active source: " + source);
578  }
579  sources.clear();
581  for (auto const &source : sources) {
582  ex.addAdditionalInfo("Disabled source: " + source);
583  }
584 }
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:173
void getDisabledSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:49
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const

◆ broadcastRequest()

void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

◆ checkSources()

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 371 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, submitPVValidationJobs::now, and timeDiffMS().

Referenced by handle().

374  {
375  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
376  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
377  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
378  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
379  { // Be more aggressive about getting rid of very bad sources.
380  compareSources(now, 0, 1, activeSources, inactiveSources);
381  compareSources(now, 1, 0, activeSources, inactiveSources);
382  }
384  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
385  }
386  }
387 }
Log< level::Info, true > LogVerbatim
long long timeDiffMS(const timespec &a, const timespec &b)
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const

◆ checkSourcesImpl()

void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 429 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), submitPVValidationJobs::now, reportSiteChange(), source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

432  {
433  bool findNewSource = false;
434  if (activeSources.size() <= 1) {
435  findNewSource = true;
436  edm::LogInfo("XrdAdaptorLvl3")
437  << "Looking for an additional source because the number of active sources is smaller than 2";
438  } else if (activeSources.size() > 1) {
439  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
440  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
441  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
442  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
443 
444  // NOTE: We could probably replace the copy with a better sort function.
445  // However, there are typically very few sources and the correctness is more obvious right now.
446  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
447  eligibleInactiveSources.reserve(inactiveSources.size());
448  for (const auto &source : inactiveSources) {
449  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
450  eligibleInactiveSources.push_back(source);
451  }
452  }
453  auto bestInactiveSource =
454  std::min_element(eligibleInactiveSources.begin(),
455  eligibleInactiveSources.end(),
456  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
457  return s1->getQuality() < s2->getQuality();
458  });
459  auto worstActiveSource = std::max_element(activeSources.cbegin(),
460  activeSources.cend(),
461  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
462  return s1->getQuality() < s2->getQuality();
463  });
464  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
465  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
466  << ", quality " << (*bestInactiveSource)->getQuality();
467  }
468  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
469  << ", quality " << (*worstActiveSource)->getQuality();
470  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
471  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
472  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
473  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
474  auto oldSources = activeSources;
475  activeSources.push_back(*bestInactiveSource);
476  reportSiteChange(oldSources, activeSources);
477  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
478  if (it->get() == bestInactiveSource->get()) {
479  inactiveSources.erase(it);
480  break;
481  }
482  } else
483  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
484  (*worstActiveSource)->getQuality() >
485  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
486  edm::LogVerbatim("XrdAdaptorInternal")
487  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
488  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
489  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
490  (*worstActiveSource)->setLastDowngrade(now);
491  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
492  if (it->get() == bestInactiveSource->get()) {
493  inactiveSources.erase(it);
494  break;
495  }
496  inactiveSources.emplace_back(*worstActiveSource);
497  auto oldSources = activeSources;
498  activeSources.erase(worstActiveSource);
499  activeSources.emplace_back(std::move(*bestInactiveSource));
500  reportSiteChange(oldSources, activeSources);
501  eligibleInactiveSources.clear();
502  for (const auto &source : inactiveSources)
503  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
504  eligibleInactiveSources.push_back(source);
505  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
506  eligibleInactiveSources.end(),
507  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
508  return s1->getQuality() < s2->getQuality();
509  });
510  worstActiveSource = std::max_element(activeSources.begin(),
511  activeSources.end(),
512  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
513  return s1->getQuality() < s2->getQuality();
514  });
515  }
516  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
517  float r = m_distribution(m_generator);
519  findNewSource = true;
520  }
521  }
522  }
523  if (findNewSource) {
524  m_open_handler->open();
526  }
527 
528  // Only aggressively look for new sources if we don't have two.
529  if (activeSources.size() == 2) {
531  } else {
533  }
535 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::uniform_real_distribution< float > m_distribution
long long timeDiffMS(const timespec &a, const timespec &b)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
std::shared_ptr< OpenHandler > m_open_handler
Log< level::Info, false > LogInfo
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static std::string const source
Definition: EdmProvDump.cc:49
def move(src, dest)
Definition: eostools.py:511

◆ compareSources()

bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 389 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, b, XrdAdaptor::Source::getHostname(), SiStripPI::max, submitPVValidationJobs::now, reportSiteChange(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by checkSources(), and checkSourcesImpl().

393  {
394  if (activeSources.size() < std::max(a, b) + 1) {
395  return false;
396  }
397  unsigned quality_a = activeSources[a]->getQuality();
398  unsigned quality_b = activeSources[b]->getQuality();
399  bool findNewSource = false;
400  if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
401  std::string hostname_a;
402  Source::getHostname(activeSources[a]->ID(), hostname_a);
403  if (quality_a > 5130) {
404  edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because the quality ("
405  << quality_a << ") is above 5130 and it is not the only active server";
406  }
407  if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
408  std::string hostname_b;
409  Source::getHostname(activeSources[b]->ID(), hostname_b);
410  edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because its quality ("
411  << quality_a
412  << ") is higher than 260 and 4 times larger than the other active server "
413  << hostname_b << " (" << quality_b << ") ";
414  }
415  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << hostname_a << " from active sources due to poor quality ("
416  << quality_a << " vs " << quality_b << ")" << std::endl;
417  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
418  findNewSource = true;
419  }
420  activeSources[a]->setLastDowngrade(now);
421  inactiveSources.emplace_back(activeSources[a]);
422  auto oldSources = activeSources;
423  activeSources.erase(activeSources.begin() + a);
424  reportSiteChange(oldSources, activeSources);
425  }
426  return findNewSource;
427 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
uint32_t ID
Definition: Definitions.h:24
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
Log< level::Warning, false > LogWarning
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221

◆ getActiveFile()

std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 537 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

537  {
538  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
539  if (m_activeSources.empty()) {
541  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
542  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
543  ex.addContext("In XrdAdaptor::RequestManager::handle()");
544  addConnections(ex);
545  throw ex;
546  }
547  return m_activeSources[0]->getFileHandle();
548 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex

◆ getActiveSourceNames()

void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 550 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

550  {
551  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
552  sources.reserve(m_activeSources.size());
553  for (auto const &source : m_activeSources) {
554  sources.push_back(source->ID());
555  }
556 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ getDisabledSourceNames()

void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 566 of file XrdRequestManager.cc.

References m_disabledSourceStrings, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

566  {
567  sources.reserve(m_disabledSourceStrings.size());
568  for (auto const &source : m_disabledSourceStrings) {
569  sources.push_back(source);
570  }
571 }
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:49

◆ getFilename()

const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 107 of file XrdRequestManager.h.

References m_name.

107 { return m_name; }

◆ getInstance()

static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 116 of file XrdRequestManager.h.

References corrVsCorr::filename, HLT_2023v12_cff::flags, instance, and RequestManager().

118  {
119  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
120  instance->initialize(instance);
121  return instance;
122  }
static PFTauRenderPlugin instance
RequestManager(const RequestManager &)=delete

◆ getPrettyActiveSourceNames()

void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 558 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

558  {
559  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
560  sources.reserve(m_activeSources.size());
561  for (auto const &source : m_activeSources) {
562  sources.push_back(source->PrettyID());
563  }
564 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ handle() [1/3]

std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 60 of file XrdRequestManager.h.

60  {
61  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
62  return handle(c_ptr);
63  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)

◆ handle() [2/3]

std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer >>  iolist)

Definition at line 717 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, cms::Exception::addContext(), cms::cuda::assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, eostools::move(), submitPVValidationJobs::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), edm::CPUTimer::stop(), and TrackValidation_cff::task.

717  {
718  //Use a copy of m_activeSources and m_inactiveSources throughout this function
719  // in order to avoid holding the lock a long time and causing a deadlock.
720  // When the function is over we will update the values of the containers
721  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
722  {
723  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
725  inactiveSources = m_inactiveSources;
726  }
727  //Make sure we update changes when we leave the function
728  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
729  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
731  m_inactiveSources = std::move(inactiveSources);
732  });
733 
735 
736  timespec now;
738 
739  edm::CPUTimer timer;
740  timer.start();
741 
742  if (activeSources.size() == 1) {
743  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
744  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
745  activeSources[0]->handle(c_ptr);
746  return c_ptr->get_future();
747  }
748  // Make sure active
749  else if (activeSources.empty()) {
751  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
752  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
753  ex.addContext("In XrdAdaptor::RequestManager::handle()");
754  addConnections(ex);
755  throw ex;
756  }
757 
758  assert(iolist.get());
759  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
760  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
761  splitClientRequest(*iolist, *req1, *req2, activeSources);
762 
763  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
764  // CheckSources may have removed a source
765  if (activeSources.size() == 1) {
766  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
767  activeSources[0]->handle(c_ptr);
768  return c_ptr->get_future();
769  }
770 
771  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
772  std::future<IOSize> future1, future2;
773  if (!req1->empty()) {
774  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
775  activeSources[0]->handle(c_ptr1);
776  future1 = c_ptr1->get_future();
777  }
778  if (!req2->empty()) {
779  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
780  activeSources[1]->handle(c_ptr2);
781  future2 = c_ptr2->get_future();
782  }
783  if (!req1->empty() && !req2->empty()) {
784  std::future<IOSize> task = std::async(
785  std::launch::deferred,
786  [](std::future<IOSize> a, std::future<IOSize> b) {
787  // Wait until *both* results are available. This is essential
788  // as the callback may try referencing the RequestManager. If one
789  // throws an exception (causing the RequestManager to be destroyed by
790  // XrdFile) and the other has a failure, then the recovery code will
791  // reference the destroyed RequestManager.
792  //
793  // Unlike other places where we use shared/weak ptrs to maintain object
794  // lifetime and destruction asynchronously, we *cannot* destroy the request
795  // asynchronously as it is associated with a ROOT buffer. We must wait until we
796  // are guaranteed that XrdCl will not write into the ROOT buffer before we
797  // can return.
798  b.wait();
799  a.wait();
800  return b.get() + a.get();
801  },
802  std::move(future1),
803  std::move(future2));
804  timer.stop();
805  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
806  return task;
807  } else if (!req1->empty()) {
808  return future1;
809  } else if (!req2->empty()) {
810  return future2;
811  } else { // Degenerate case - no bytes to read.
812  std::promise<IOSize> p;
813  p.set_value(0);
814  return p.get_future();
815  }
816 }
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
void start()
Definition: CPUTimer.cc:68
#define GET_CLOCK_MONOTONIC(ts)
assert(be >=bs)
std::vector< std::shared_ptr< Source > > m_inactiveSources
void addConnections(cms::Exception &) const
Times stop()
Definition: CPUTimer.cc:87
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)

◆ handle() [3/3]

std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 612 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::cuda::assert(), checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), submitPVValidationJobs::now, pickSingleSource(), and source.

612  {
613  assert(c_ptr.get());
614  timespec now;
616  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
617  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
618  {
619  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
621  inactiveSources = m_inactiveSources;
622  }
623  {
624  //make sure we update values before calling pickSingelSource
625  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
626  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
628  m_inactiveSources = std::move(inactiveSources);
629  });
630 
631  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
632  }
633 
634  std::shared_ptr<Source> source = pickSingleSource();
635  source->handle(c_ptr);
636  return c_ptr->get_future();
637 }
#define GET_CLOCK_MONOTONIC(ts)
assert(be >=bs)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:49
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)

◆ handleOpen()

void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source source 
)
privatevirtual

Handle the file-open response

Definition at line 673 of file XrdRequestManager.cc.

References SiStripPI::min, alignCSCRings::s, source, mps_update::status, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

673  {
674  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
675  if (status.IsOK()) {
676  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
678  for (const auto &s : m_activeSources) {
679  if (source->ID() == s->ID()) {
680  edm::LogVerbatim("XrdAdaptorInternal")
681  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
682  unsigned returned_count = ++m_excluded_active_count;
684  if (returned_count >= 3) {
686  }
687  return;
688  }
689  }
690  for (const auto &s : m_inactiveSources) {
691  if (source->ID() == s->ID()) {
692  edm::LogVerbatim("XrdAdaptorInternal")
693  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
695  return;
696  }
697  }
698  if (m_activeSources.size() < 2) {
699  auto oldSources = m_activeSources;
700  m_activeSources.push_back(source);
701  reportSiteChange(oldSources, m_activeSources);
703  } else {
704  m_inactiveSources.push_back(source);
705  }
706  } else { // File-open failure - wait at least 120s before next attempt.
707  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
708  int delayScale = 1;
709  if (status.status == XrdCl::errRedirectLimit) {
711  delayScale = m_redirectLimitDelayScale;
712  }
714  }
715 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
void queueUpdateCurrentServer(const std::string &)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
std::atomic< unsigned > m_excluded_active_count
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ initialize()

void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 180 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, geometryDiff::file, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), heavyIonCSV_trainingSettings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

180  {
182 
183  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
184  if (env) {
185  env->GetInt("StreamErrorWindow", m_timeout);
186  }
187 
188  std::string orig_site;
189  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
190  std::string hostname;
191  if (Source::getHostname(orig_site, hostname)) {
192  Source::getDomain(hostname, orig_site);
193  }
194  }
195 
196  std::unique_ptr<XrdCl::File> file;
198  bool validFile = false;
199  const int retries = 5;
200  std::string excludeString;
201  for (int idx = 0; idx < retries; idx++) {
202  file = std::make_unique<XrdCl::File>();
203  auto opaque = prepareOpaqueString();
204  std::string new_filename =
205  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
206  SyncHostResponseHandler handler;
207  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
208  if (!openStatus
209  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
210  // server and it was marked bad - xrootd couldn't even queue up the request internally!
211  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
212  // redirector being down or authentication failures.
213  ex.clearMessage();
214  ex.clearContext();
215  ex.clearAdditionalInfo();
216  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
217  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
218  << ", code=" << openStatus.code << ")";
219  ex.addContext("Calling XrdFile::open()");
220  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
221  throw ex;
222  }
223  handler.WaitForResponse();
224  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
225  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
226  tracerouteRedirections(hostList.get());
227  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
228  assert(status);
229  if (status->IsOK()) {
230  validFile = true;
231  break;
232  } else {
233  ex.clearMessage();
234  ex.clearContext();
235  ex.clearAdditionalInfo();
236  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
237  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
238  << ", code=" << status->code << ")";
239  ex.addContext("Calling XrdFile::open()");
240  addConnections(ex);
241  std::string dataServer, lastUrl;
242  file->GetProperty("DataServer", dataServer);
243  file->GetProperty("LastURL", lastUrl);
244  if (!dataServer.empty()) {
245  ex.addAdditionalInfo("Problematic data server: " + dataServer);
246  }
247  if (!lastUrl.empty()) {
248  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
249  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
250  }
251  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
252  m_disabledSourceStrings.end()) {
253  ex << ". No additional data servers were found.";
254  throw ex;
255  }
256  if (!dataServer.empty()) {
257  m_disabledSourceStrings.insert(dataServer);
258  m_disabledExcludeStrings.insert(excludeString);
259  }
260  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
261  if (lastUrl == new_filename) {
262  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
263  throw ex;
264  }
265  }
266  }
267  if (!validFile) {
268  throw ex;
269  }
271 
272  timespec ts;
274 
275  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
276  {
277  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
278  auto oldList = m_activeSources;
279  m_activeSources.push_back(source);
281  }
284 
285  m_lastSourceCheck = ts;
286  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
288 }
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
std::string prepareOpaqueString() const
static void SendMonitoringInfo(XrdCl::File &file)
void queueUpdateCurrentServer(const std::string &)
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
void addConnections(cms::Exception &) const
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
Log< level::Warning, false > LogWarning
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
static std::string const source
Definition: EdmProvDump.cc:49
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex

◆ operator=()

RequestManager& XrdAdaptor::RequestManager::operator= ( const RequestManager )
delete

◆ pickSingleSource()

std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 586 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

586  {
587  std::shared_ptr<Source> source = nullptr;
588  {
589  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
590  if (m_activeSources.size() == 2) {
592  source = m_activeSources[0];
594  } else {
595  source = m_activeSources[1];
597  }
598  } else if (m_activeSources.empty()) {
600  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
601  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
602  ex.addContext("In XrdAdaptor::RequestManager::handle()");
603  addConnections(ex);
604  throw ex;
605  } else {
606  source = m_activeSources[0];
607  }
608  }
609  return source;
610 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ prepareOpaqueString()

std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 639 of file XrdRequestManager.cc.

References submitPVResolutionJobs::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

639  {
640  struct {
641  std::stringstream ss;
642  size_t count = 0;
643  bool has_active = false;
644 
645  void append_tried(const std::string &id, bool active = false) {
646  ss << (count ? "," : "tried=") << id;
647  count++;
648  if (active) {
649  has_active = true;
650  }
651  }
652  } state;
653  {
654  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
655 
656  for (const auto &it : m_activeSources) {
657  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
658  }
659  for (const auto &it : m_inactiveSources) {
660  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
661  }
662  }
663  for (const auto &it : m_disabledExcludeStrings) {
664  state.append_tried(it.substr(0, it.find(':')));
665  }
666  if (state.has_active) {
667  state.ss << "&triedrc=resel";
668  }
669 
670  return state.ss.str();
671 }
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::recursive_mutex m_source_mutex

◆ queueUpdateCurrentServer()

void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 314 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), l1ctLayer2EG_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

314  {
315  auto hostname = std::make_unique<std::string>(id);
316  if (Source::getHostname(id, *hostname)) {
317  std::string *null_hostname = nullptr;
318  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
319  hostname.release();
320  }
321  }
322 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221

◆ reportSiteChange()

void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source >> const &  iOld,
std::vector< std::shared_ptr< Source >> const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 341 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), mps_fire::i, quality, AlCaHLTBitMon_QueryRunRegistry::string, and to_string().

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

343  {
344  auto siteList = formatSites(iNew);
345  if (orig_site.empty() || (orig_site == siteList)) {
346  auto oldSites = formatSites(iOld);
347  }
348 
349  edm::LogInfo("XrdAdaptorLvl1").log([&](auto &li) {
350  li << "Serving data from: ";
351  int size_active_sources = iNew.size();
352  for (int i = 0; i < size_active_sources; ++i) {
353  std::string hostname_a;
354  Source::getHostname(iNew[i]->PrettyID(), hostname_a);
355  li.format(" [{}] {}", i + 1, hostname_a);
356  }
357  });
358 
359  edm::LogInfo("XrdAdaptorLvl3").log([&](auto &li) {
360  li << "The quality of the active sources is: ";
361  int size_active_sources = iNew.size();
362  for (int i = 0; i < size_active_sources; ++i) {
363  std::string hostname_a;
364  Source::getHostname(iNew[i]->PrettyID(), hostname_a);
365  std::string quality = std::to_string(iNew[i]->getQuality());
366  li.format(" [{}] {} for {}", i + 1, quality, hostname_a);
367  }
368  });
369 }
static std::string to_string(const XMLCh *ch)
string quality
Log< level::Info, false > LogInfo
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221

◆ requestFailure()

void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 818 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, submitPVValidationJobs::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

818  {
819  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
820 
821  // Fail early for invalid responses - XrdFile has a separate path for handling this.
822  if (c_status.code == XrdCl::errInvalidResponse) {
823  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
825  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
826  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
827  << ") => Invalid ReadV response from server";
828  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
829  addConnections(ex);
830  throw ex;
831  }
832  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
833 
834  // Note that we do not delete the Source itself. That is because this
835  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
836  // In such a case, if you close a file in the handler, it will deadlock
837  m_disabledSourceStrings.insert(source_ptr->ID());
838  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
839  m_disabledSources.insert(source_ptr);
840 
841  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
842  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
843  auto oldSources = m_activeSources;
844  m_activeSources.erase(m_activeSources.begin());
845  reportSiteChange(oldSources, m_activeSources);
846  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
847  auto oldSources = m_activeSources;
848  m_activeSources.erase(m_activeSources.begin() + 1);
849  reportSiteChange(oldSources, m_activeSources);
850  }
851  std::shared_ptr<Source> new_source;
852  if (m_activeSources.empty()) {
853  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
854  timespec now;
857  // Note we only wait for 180 seconds here. This is because we've already failed
858  // once and the likelihood the program has some inconsistent state is decent.
859  // We'd much rather fail hard than deadlock!
860  sentry.unlock();
861  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
864  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
865  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
866  << ") => timeout when waiting for file open";
867  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
868  addConnections(ex);
869  throw ex;
870  } else {
871  try {
872  new_source = future.get();
873  } catch (edm::Exception &ex) {
874  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
875  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
876  throw;
877  }
878  }
879 
880  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
881  m_disabledSourceStrings.end()) {
882  // The server gave us back a data node we requested excluded. Fatal!
884  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
885  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
886  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
887  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
888  addConnections(ex);
889  throw ex;
890  }
891  sentry.lock();
892 
893  auto oldSources = m_activeSources;
894  m_activeSources.push_back(new_source);
895  reportSiteChange(oldSources, m_activeSources);
896  } else {
897  new_source = m_activeSources[0];
898  }
899  new_source->handle(c_ptr);
900 }
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:173
void addConnections(cms::Exception &) const
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:169
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Log< level::Warning, false > LogWarning
std::recursive_mutex m_source_mutex

◆ splitClientRequest()

void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 994 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), cms::cuda::assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, SiStripPI::max, edm::storage::IOPosBuffer::offset(), jetUpdater_cfi::sort, contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

997  {
998  if (iolist.empty())
999  return;
1000  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1001  req1.reserve(iolist.size() / 2 + 1);
1002  req2.reserve(iolist.size() / 2 + 1);
1003  size_t front = 0;
1004 
1005  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1006  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
1007  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
1008  IOSize chunk1, chunk2;
1009  // Make sure the chunk size is at least 1024; little point to reads less than that size.
1010  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
1011  static_cast<IOSize>(1024));
1012  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
1013  static_cast<IOSize>(1024));
1014 
1015  IOSize size_orig = 0;
1016  for (const auto &it : iolist)
1017  size_orig += it.size();
1018 
1019  while (tmp_iolist.size() - front > 0) {
1020  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
1021  (req2.size() >=
1022  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1023  // are passed to the request manager. However, because we have a max chunk size, we increase
1024  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1025  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1026  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1028  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
1029  << ", permissions=0" << std::oct << m_perms << std::dec
1030  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
1031  "reported to CMSSW developers.";
1032  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1033  addConnections(ex);
1034  std::stringstream ss;
1035  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1036  ex.addAdditionalInfo(ss.str());
1037  std::stringstream ss2;
1038  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
1039  ex.addAdditionalInfo(ss2.str());
1040  throw ex;
1041  }
1042  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1043  consumeChunkFront(front, tmp_iolist, req1, chunk1);
1044  }
1045  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1046  consumeChunkBack(front, tmp_iolist, req2, chunk2);
1047  }
1048  }
1049  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1050  return left.offset() < right.offset();
1051  });
1052  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1053  return left.offset() < right.offset();
1054  });
1055 
1056  IOSize size1 = validateList(req1);
1057  IOSize size2 = validateList(req2);
1058 
1059  assert(size_orig == size1 + size2);
1060 
1061  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
1062  << " bytes) split into requests size " << req1.size() << " (" << size1
1063  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1064 }
Log< level::Info, true > LogVerbatim
static constexpr int XRD_CL_MAX_CHUNK
assert(be >=bs)
IOOffset offset() const
Definition: IOPosBuffer.h:41
void addConnections(cms::Exception &) const
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
static IOSize validateList(const std::vector< IOPosBuffer > req)
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)

◆ updateCurrentServer()

void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 297 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), LIKELY, m_name, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

297  {
298  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
299  // a pending update. *However*, since we call this for every read, we'll get it
300  // eventually.
301  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
302  return;
303  }
304  std::string *hostname_ptr;
305  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
306  std::unique_ptr<std::string> hostname(hostname_ptr);
308  if (statsService.isAvailable()) {
309  statsService->setCurrentServer(m_name, *hostname_ptr);
310  }
311  }
312 }
#define LIKELY(x)
Definition: Likely.h:20
std::atomic< std::string * > m_serverToAdvertise
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
bool isAvailable() const
Definition: Service.h:40

Member Data Documentation

◆ m_activeSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

◆ m_disabledExcludeStrings

oneapi::tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

◆ m_disabledSources

oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

◆ m_disabledSourceStrings

oneapi::tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

◆ m_distribution

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 238 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_excluded_active_count

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 240 of file XrdRequestManager.h.

◆ m_flags

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private

◆ m_generator

std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_inactiveSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), and prepareOpaqueString().

◆ m_lastSourceCheck

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

◆ m_name

const std::string XrdAdaptor::RequestManager::m_name
private

◆ m_nextActiveSourceCheck

timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

◆ m_nextInitialSourceToggle

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

◆ m_open_handler

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 299 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

◆ m_perms

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private

◆ m_redirectLimitDelayScale

int XrdAdaptor::RequestManager::m_redirectLimitDelayScale
private

Definition at line 229 of file XrdRequestManager.h.

◆ m_serverToAdvertise

std::atomic<std::string *> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

◆ m_source_mutex

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate

◆ m_timeout

int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

◆ searchMode

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 230 of file XrdRequestManager.h.

◆ XRD_DEFAULT_TIMEOUT

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3 * 60
static

Definition at line 53 of file XrdRequestManager.h.