CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Classes

class  OpenHandler
 

Public Types

using IOOffset = edm::storage::IOOffset
 
using IOPosBuffer = edm::storage::IOPosBuffer
 
using IOSize = edm::storage::IOSize
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer >> iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
RequestManager & operator= (const RequestManager &)=delete
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 RequestManager (const RequestManager &)=delete
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
int m_redirectLimitDelayScale
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 44 of file XrdRequestManager.h.

Member Typedef Documentation

◆ IOOffset

Definition at line 48 of file XrdRequestManager.h.

◆ IOPosBuffer

Definition at line 47 of file XrdRequestManager.h.

◆ IOSize

Definition at line 46 of file XrdRequestManager.h.

Constructor & Destructor Documentation

◆ RequestManager() [1/2]

XrdAdaptor::RequestManager::RequestManager ( const RequestManager & )
delete

Referenced by getInstance().

◆ ~RequestManager()

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default

◆ RequestManager() [2/2]

RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 121 of file XrdRequestManager.cc.

122  : m_serverToAdvertise(nullptr),
126  m_name(filename),
127  m_flags(flags),
128  m_perms(perms),
129  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

◆ addConnections()

void RequestManager::addConnections ( cms::Exception & ex) const

Add the list of active connections to the exception extra info.

Definition at line 495 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), source, and CalibrationSummaryClient_cfi::sources.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

495  {
496  std::vector<std::string> sources;
497  getPrettyActiveSourceNames(sources);
498  for (auto const &source : sources) {
499  ex.addAdditionalInfo("Active source: " + source);
500  }
501  sources.clear();
502  getDisabledSourceNames(sources);
503  for (auto const &source : sources) {
504  ex.addAdditionalInfo("Disabled source: " + source);
505  }
506 }
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void getDisabledSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:49
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const

◆ broadcastRequest()

void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

◆ checkSources()

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 307 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, submitPVValidationJobs::now, and timeDiffMS().

Referenced by handle().

310  {
311  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
312  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
313  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
314  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
315  { // Be more aggressive about getting rid of very bad sources.
316  compareSources(now, 0, 1, activeSources, inactiveSources);
317  compareSources(now, 1, 0, activeSources, inactiveSources);
318  }
320  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
321  }
322  }
323 }
Log< level::Info, true > LogVerbatim
long long timeDiffMS(const timespec &a, const timespec &b)
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const

◆ checkSourcesImpl()

void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 353 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), submitPVValidationJobs::now, alignCSCRings::r, reportSiteChange(), source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

356  {
357  bool findNewSource = false;
358  if (activeSources.size() <= 1) {
359  findNewSource = true;
360  } else if (activeSources.size() > 1) {
361  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
362  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
363  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
364  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
365 
366  // NOTE: We could probably replace the copy with a better sort function.
367  // However, there are typically very few sources and the correctness is more obvious right now.
368  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
369  eligibleInactiveSources.reserve(inactiveSources.size());
370  for (const auto &source : inactiveSources) {
371  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
372  eligibleInactiveSources.push_back(source);
373  }
374  }
375  auto bestInactiveSource =
376  std::min_element(eligibleInactiveSources.begin(),
377  eligibleInactiveSources.end(),
378  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
379  return s1->getQuality() < s2->getQuality();
380  });
381  auto worstActiveSource = std::max_element(activeSources.cbegin(),
382  activeSources.cend(),
383  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
384  return s1->getQuality() < s2->getQuality();
385  });
386  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
387  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
388  << ", quality " << (*bestInactiveSource)->getQuality();
389  }
390  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
391  << ", quality " << (*worstActiveSource)->getQuality();
392  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
393  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
394  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
395  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
396  auto oldSources = activeSources;
397  activeSources.push_back(*bestInactiveSource);
398  reportSiteChange(oldSources, activeSources);
399  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
400  if (it->get() == bestInactiveSource->get()) {
401  inactiveSources.erase(it);
402  break;
403  }
404  } else
405  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
406  (*worstActiveSource)->getQuality() >
407  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
408  edm::LogVerbatim("XrdAdaptorInternal")
409  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
410  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
411  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
412  (*worstActiveSource)->setLastDowngrade(now);
413  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
414  if (it->get() == bestInactiveSource->get()) {
415  inactiveSources.erase(it);
416  break;
417  }
418  inactiveSources.emplace_back(*worstActiveSource);
419  auto oldSources = activeSources;
420  activeSources.erase(worstActiveSource);
421  activeSources.emplace_back(std::move(*bestInactiveSource));
422  reportSiteChange(oldSources, activeSources);
423  eligibleInactiveSources.clear();
424  for (const auto &source : inactiveSources)
425  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
426  eligibleInactiveSources.push_back(source);
427  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
428  eligibleInactiveSources.end(),
429  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
430  return s1->getQuality() < s2->getQuality();
431  });
432  worstActiveSource = std::max_element(activeSources.begin(),
433  activeSources.end(),
434  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
435  return s1->getQuality() < s2->getQuality();
436  });
437  }
438  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
439  float r = m_distribution(m_generator);
441  findNewSource = true;
442  }
443  }
444  }
445  if (findNewSource) {
446  m_open_handler->open();
448  }
449 
450  // Only aggressively look for new sources if we don't have two.
451  if (activeSources.size() == 2) {
453  } else {
455  }
457 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::uniform_real_distribution< float > m_distribution
long long timeDiffMS(const timespec &a, const timespec &b)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
std::shared_ptr< OpenHandler > m_open_handler
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static std::string const source
Definition: EdmProvDump.cc:49
def move(src, dest)
Definition: eostools.py:511

◆ compareSources()

bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 325 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, b, SiStripPI::max, submitPVValidationJobs::now, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

329  {
330  if (activeSources.size() < std::max(a, b) + 1) {
331  return false;
332  }
333 
334  bool findNewSource = false;
335  if ((activeSources[a]->getQuality() > 5130) ||
336  ((activeSources[a]->getQuality() > 260) &&
337  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
338  edm::LogVerbatim("XrdAdaptorInternal")
339  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
340  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
341  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
342  findNewSource = true;
343  }
344  activeSources[a]->setLastDowngrade(now);
345  inactiveSources.emplace_back(activeSources[a]);
346  auto oldSources = activeSources;
347  activeSources.erase(activeSources.begin() + a);
348  reportSiteChange(oldSources, activeSources);
349  }
350  return findNewSource;
351 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119

◆ getActiveFile()

std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 459 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

459  {
460  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
461  if (m_activeSources.empty()) {
463  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
464  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
465  ex.addContext("In XrdAdaptor::RequestManager::handle()");
466  addConnections(ex);
467  throw ex;
468  }
469  return m_activeSources[0]->getFileHandle();
470 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex

◆ getActiveSourceNames()

void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 472 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

472  {
473  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
474  sources.reserve(m_activeSources.size());
475  for (auto const &source : m_activeSources) {
476  sources.push_back(source->ID());
477  }
478 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ getDisabledSourceNames()

void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 488 of file XrdRequestManager.cc.

References m_disabledSourceStrings, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

488  {
489  sources.reserve(m_disabledSourceStrings.size());
490  for (auto const &source : m_disabledSourceStrings) {
491  sources.push_back(source);
492  }
493 }
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:49

◆ getFilename()

const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 107 of file XrdRequestManager.h.

References m_name.

107 { return m_name; }

◆ getInstance()

static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 116 of file XrdRequestManager.h.

References corrVsCorr::filename, HLT_2022v14_cff::flags, instance, and RequestManager().

118  {
119  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
120  instance->initialize(instance);
121  return instance;
122  }
static PFTauRenderPlugin instance
RequestManager(const RequestManager &)=delete

◆ getPrettyActiveSourceNames()

void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 480 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

480  {
481  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
482  sources.reserve(m_activeSources.size());
483  for (auto const &source : m_activeSources) {
484  sources.push_back(source->PrettyID());
485  }
486 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ handle() [1/3]

std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 60 of file XrdRequestManager.h.

References findQualityFiles::size.

60  {
61  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
62  return handle(c_ptr);
63  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)

◆ handle() [2/3]

std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer >>  iolist)

Definition at line 639 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, cms::Exception::addContext(), cms::cuda::assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, eostools::move(), submitPVValidationJobs::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), edm::CPUTimer::stop(), and TrackValidation_cff::task.

639  {
640  //Use a copy of m_activeSources and m_inactiveSources throughout this function
641  // in order to avoid holding the lock a long time and causing a deadlock.
642  // When the function is over we will update the values of the containers
643  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
644  {
645  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
647  inactiveSources = m_inactiveSources;
648  }
649  //Make sure we update changes when we leave the function
650  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
651  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
653  m_inactiveSources = std::move(inactiveSources);
654  });
655 
657 
658  timespec now;
660 
661  edm::CPUTimer timer;
662  timer.start();
663 
664  if (activeSources.size() == 1) {
665  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
666  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
667  activeSources[0]->handle(c_ptr);
668  return c_ptr->get_future();
669  }
670  // Make sure active
671  else if (activeSources.empty()) {
673  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
674  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
675  ex.addContext("In XrdAdaptor::RequestManager::handle()");
676  addConnections(ex);
677  throw ex;
678  }
679 
680  assert(iolist.get());
681  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
682  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
683  splitClientRequest(*iolist, *req1, *req2, activeSources);
684 
685  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
686  // CheckSources may have removed a source
687  if (activeSources.size() == 1) {
688  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
689  activeSources[0]->handle(c_ptr);
690  return c_ptr->get_future();
691  }
692 
693  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
694  std::future<IOSize> future1, future2;
695  if (!req1->empty()) {
696  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
697  activeSources[0]->handle(c_ptr1);
698  future1 = c_ptr1->get_future();
699  }
700  if (!req2->empty()) {
701  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
702  activeSources[1]->handle(c_ptr2);
703  future2 = c_ptr2->get_future();
704  }
705  if (!req1->empty() && !req2->empty()) {
706  std::future<IOSize> task = std::async(
707  std::launch::deferred,
708  [](std::future<IOSize> a, std::future<IOSize> b) {
709  // Wait until *both* results are available. This is essential
710  // as the callback may try referencing the RequestManager. If one
711  // throws an exception (causing the RequestManager to be destroyed by
712  // XrdFile) and the other has a failure, then the recovery code will
713  // reference the destroyed RequestManager.
714  //
715  // Unlike other places where we use shared/weak ptrs to maintain object
716  // lifetime and destruction asynchronously, we *cannot* destroy the request
717  // asynchronously as it is associated with a ROOT buffer. We must wait until we
718  // are guaranteed that XrdCl will not write into the ROOT buffer before we
719  // can return.
720  b.wait();
721  a.wait();
722  return b.get() + a.get();
723  },
724  std::move(future1),
725  std::move(future2));
726  timer.stop();
727  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
728  return task;
729  } else if (!req1->empty()) {
730  return future1;
731  } else if (!req2->empty()) {
732  return future2;
733  } else { // Degenerate case - no bytes to read.
734  std::promise<IOSize> p;
735  p.set_value(0);
736  return p.get_future();
737  }
738 }
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
void start()
Definition: CPUTimer.cc:68
#define GET_CLOCK_MONOTONIC(ts)
assert(be >=bs)
std::vector< std::shared_ptr< Source > > m_inactiveSources
void addConnections(cms::Exception &) const
Times stop()
Definition: CPUTimer.cc:87
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)

◆ handle() [3/3]

std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 534 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::cuda::assert(), checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), submitPVValidationJobs::now, pickSingleSource(), and source.

534  {
535  assert(c_ptr.get());
536  timespec now;
538  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
539  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
540  {
541  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
543  inactiveSources = m_inactiveSources;
544  }
545  {
546  //make sure we update values before calling pickSingleSource
547  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
548  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
550  m_inactiveSources = std::move(inactiveSources);
551  });
552 
553  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
554  }
555 
556  std::shared_ptr<Source> source = pickSingleSource();
557  source->handle(c_ptr);
558  return c_ptr->get_future();
559 }
#define GET_CLOCK_MONOTONIC(ts)
assert(be >=bs)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:49
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)

◆ handleOpen()

void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source > source 
)
private virtual

Handle the file-open response

Definition at line 595 of file XrdRequestManager.cc.

References SiStripPI::min, alignCSCRings::s, source, mps_update::status, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

595  {
596  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
597  if (status.IsOK()) {
598  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
600  for (const auto &s : m_activeSources) {
601  if (source->ID() == s->ID()) {
602  edm::LogVerbatim("XrdAdaptorInternal")
603  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
604  unsigned returned_count = ++m_excluded_active_count;
606  if (returned_count >= 3) {
608  }
609  return;
610  }
611  }
612  for (const auto &s : m_inactiveSources) {
613  if (source->ID() == s->ID()) {
614  edm::LogVerbatim("XrdAdaptorInternal")
615  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
617  return;
618  }
619  }
620  if (m_activeSources.size() < 2) {
621  auto oldSources = m_activeSources;
622  m_activeSources.push_back(source);
623  reportSiteChange(oldSources, m_activeSources);
625  } else {
626  m_inactiveSources.push_back(source);
627  }
628  } else { // File-open failure - wait at least 120s before next attempt.
629  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
630  int delayScale = 1;
631  if (status.status == XrdCl::errRedirectLimit) {
633  delayScale = m_redirectLimitDelayScale;
634  }
636  }
637 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
void queueUpdateCurrentServer(const std::string &)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
std::atomic< unsigned > m_excluded_active_count
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ initialize()

void RequestManager::initialize ( std::weak_ptr< RequestManager > selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away its reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows not to reference the RequestManager once it is gone.

Definition at line 132 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, geometryDiff::file, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), heavyIonCSV_trainingSettings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

// Body of RequestManager::initialize() as rendered by Doxygen. The leading numbers are
// XrdRequestManager.cc line numbers; skipped numbers (133, 149, 221, 224, 231, 233-234, 238)
// are lines absent from this listing, so e.g. the declaration of `ex` is not visible here.
// Visible behavior: read the timeout setting from the XrdCl environment, open the file with
// up to `retries` attempts, then register the opened file as an active source.
132  {
134 
// Query the XrdCl default environment (when there is one) for "StreamErrorWindow",
// handing it m_timeout to fill in.
135  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
136  if (env) {
137  env->GetInt("StreamErrorWindow", m_timeout);
138  }
139 
// When getXrootdSiteFromURL() returns false and orig_site contains no '.', fall back to
// the domain of the hostname derived from orig_site.
140  std::string orig_site;
141  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
142  std::string hostname;
143  if (Source::getHostname(orig_site, hostname)) {
144  Source::getDomain(hostname, orig_site);
145  }
146  }
147 
// Retry loop: every attempt uses a fresh XrdCl::File and a freshly built opaque string.
148  std::unique_ptr<XrdCl::File> file;
150  bool validFile = false;
151  const int retries = 5;
152  std::string excludeString;
153  for (int idx = 0; idx < retries; idx++) {
154  file = std::make_unique<XrdCl::File>();
155  auto opaque = prepareOpaqueString();
// A non-empty opaque string is appended with '?' when m_name has no '?' yet, else with '&'.
156  std::string new_filename =
157  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
158  SyncHostResponseHandler handler;
159  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
160  if (!openStatus
161  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
162  // server and it was marked bad - xrootd couldn't even queue up the request internally!
163  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
164  // redirector being down or authentication failures.
165  ex.clearMessage();
166  ex.clearContext();
167  ex.clearAdditionalInfo();
168  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
169  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
170  << ", code=" << openStatus.code << ")";
171  ex.addContext("Calling XrdFile::open()");
172  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
173  throw ex;
174  }
// The request was queued: wait for the handler's response, then collect status and host list.
175  handler.WaitForResponse();
176  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
177  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
178  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
179  assert(status);
180  if (status->IsOK()) {
181  validFile = true;
182  break;
183  } else {
// Open failed: rebuild the exception text from scratch so it describes this attempt only.
184  ex.clearMessage();
185  ex.clearContext();
186  ex.clearAdditionalInfo();
187  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
188  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
189  << ", code=" << status->code << ")";
190  ex.addContext("Calling XrdFile::open()");
191  addConnections(ex);
192  std::string dataServer, lastUrl;
193  file->GetProperty("DataServer", dataServer);
194  file->GetProperty("LastURL", lastUrl);
195  if (!dataServer.empty()) {
196  ex.addAdditionalInfo("Problematic data server: " + dataServer);
197  }
198  if (!lastUrl.empty()) {
199  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
200  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
201  }
// The failing data server is already in the disabled set: give up instead of retrying.
// NOTE(review): std::find over the concurrent unordered set is a linear scan; the
// container's own lookup may be preferable - confirm against the oneTBB API.
202  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
203  m_disabledSourceStrings.end()) {
204  ex << ". No additional data servers were found.";
205  throw ex;
206  }
// Record the failed server; prepareOpaqueString() reads m_disabledExcludeStrings when
// building the exclusion list for the next attempt.
207  if (!dataServer.empty()) {
208  m_disabledSourceStrings.insert(dataServer);
209  m_disabledExcludeStrings.insert(excludeString);
210  }
211  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
212  if (lastUrl == new_filename) {
213  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
214  throw ex;
215  }
216  }
217  }
// Every attempt failed without throwing inside the loop: throw the last recorded error.
218  if (!validFile) {
219  throw ex;
220  }
222 
// NOTE(review): line 224 is missing from this listing; presumably it fills `ts`
// (GET_CLOCK_MONOTONIC is listed among this function's references) - confirm.
223  timespec ts;
225 
// Wrap the opened file in a Source and add it to m_activeSources under m_source_mutex.
// oldList is the pre-insert copy; the line using it (231) is not part of this listing.
226  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
227  {
228  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
229  auto oldList = m_activeSources;
230  m_activeSources.push_back(source);
232  }
235 
// Store ts as the last source-check time, then advance ts by the short open delay; the
// line consuming the advanced value (238) is not part of this listing.
236  m_lastSourceCheck = ts;
237  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
239 }
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
std::string prepareOpaqueString() const
static void SendMonitoringInfo(XrdCl::File &file)
void queueUpdateCurrentServer(const std::string &)
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
void addConnections(cms::Exception &) const
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
Log< level::Warning, false > LogWarning
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
static std::string const source
Definition: EdmProvDump.cc:49
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex

◆ operator=()

RequestManager& XrdAdaptor::RequestManager::operator= ( const RequestManager & )
delete

◆ pickSingleSource()

std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 508 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

// Body of pickSingleSource() as rendered by Doxygen (leading numbers are source-file line
// numbers; 513, 515, 518 and 521 are absent from this listing).
// Returns one entry of m_activeSources, chosen while holding m_source_mutex, or throws
// `ex` when there is no active source.
508  {
509  std::shared_ptr<Source> source = nullptr;
510  {
511  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
// Exactly two active sources: entry [0] or [1] is selected by a condition on the missing
// line 513 (presumably m_nextInitialSourceToggle, which this function references - confirm).
512  if (m_activeSources.size() == 2) {
514  source = m_activeSources[0];
516  } else {
517  source = m_activeSources[1];
519  }
520  } else if (m_activeSources.empty()) {
// No active source at all: report it as an error. The declaration of `ex` (line 521)
// is not shown in this listing.
522  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
523  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
524  ex.addContext("In XrdAdaptor::RequestManager::handle()");
525  addConnections(ex);
526  throw ex;
527  } else {
// Any other size: use the first active source.
528  source = m_activeSources[0];
529  }
530  }
531  return source;
532 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:49
std::recursive_mutex m_source_mutex

◆ prepareOpaqueString()

std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 561 of file XrdRequestManager.cc.

References submitPVResolutionJobs::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

// Body of prepareOpaqueString() as rendered by Doxygen (leading numbers are source-file
// line numbers). Builds and returns a string of the form "tried=a,b,c", followed by
// "&triedrc=resel" when at least one active source was listed. Returns an empty string
// when there is nothing to list.
561  {
// Local accumulator: `ss` collects the text, `count` selects between the "tried=" prefix
// (first entry) and a "," separator (later entries), and `has_active` records whether any
// entry was appended with active == true.
562  struct {
563  std::stringstream ss;
564  size_t count = 0;
565  bool has_active = false;
566 
567  void append_tried(const std::string &id, bool active = false) {
568  ss << (count ? "," : "tried=") << id;
569  count++;
570  if (active) {
571  has_active = true;
572  }
573  }
574  } state;
575  {
// The active and inactive source lists are read under m_source_mutex.
576  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
577 
// Each ExcludeID is cut at its first ':'; when there is no ':', find() yields npos and
// substr(0, npos) keeps the whole string.
578  for (const auto &it : m_activeSources) {
579  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
580  }
581  for (const auto &it : m_inactiveSources) {
582  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
583  }
584  }
// The disabled exclude strings are iterated after the lock scope has ended and are
// truncated at ':' the same way.
585  for (const auto &it : m_disabledExcludeStrings) {
586  state.append_tried(it.substr(0, it.find(':')));
587  }
588  if (state.has_active) {
589  state.ss << "&triedrc=resel";
590  }
591 
592  return state.ss.str();
593 }
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::recursive_mutex m_source_mutex

◆ queueUpdateCurrentServer()

void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 265 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), l1ctLayer2EG_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

// Body of queueUpdateCurrentServer(const std::string &id) as rendered by Doxygen (leading
// numbers are source-file line numbers). Stores a heap-allocated hostname in
// m_serverToAdvertise, but only when that atomic pointer currently holds nullptr.
265  {
// Start from a copy of `id`; getHostname() receives that copy as its output argument.
266  auto hostname = std::make_unique<std::string>(id);
267  if (Source::getHostname(id, *hostname)) {
268  std::string *null_hostname = nullptr;
// Install the pointer only if the slot is still null. On success, ownership moves to the
// atomic, so the unique_ptr must let go; on failure (or when getHostname() returned
// false) the unique_ptr frees the string at scope exit.
269  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
270  hostname.release();
271  }
272  }
273 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221

◆ reportSiteChange()

void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source >> const &  iOld,
std::vector< std::shared_ptr< Source >> const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 292 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

// Body of reportSiteChange(iOld, iNew, orig_site) as rendered by Doxygen (leading numbers
// are source-file line numbers; the signature lines 292-293 are not part of this listing).
// Emits at most one LogWarning describing a change in the sites data is served from.
294  {
295  auto siteList = formatSites(iNew);
// A non-empty orig_site that differs from the new site list takes precedence.
296  if (!orig_site.empty() && (orig_site != siteList)) {
297  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
298  } else {
// Reached when orig_site is empty or equals the new list. A warning is only produced in
// the former case, and only if the old list was non-empty and differs from the new one.
299  auto oldSites = formatSites(iOld);
300  if (orig_site.empty() && (siteList != oldSites)) {
301  if (!oldSites.empty())
302  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
303  }
304  }
305 }
Log< level::Warning, false > LogWarning

◆ requestFailure()

void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 740 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, submitPVValidationJobs::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

// Body of requestFailure(c_ptr, c_status) as rendered by Doxygen (leading numbers are
// source-file line numbers; 746, 777-778, 784-785 and 805 are absent from this listing,
// which includes the declarations of the `ex` objects thrown below).
// Visible behavior: disable the source the request was using, drop it from the active
// list, open a replacement when no active source remains, and hand the request to the
// resulting source.
740  {
741  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
742 
743  // Fail early for invalid responses - XrdFile has a separate path for handling this.
744  if (c_status.code == XrdCl::errInvalidResponse) {
745  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
747  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
748  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
749  << ") => Invalid ReadV response from server";
750  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
751  addConnections(ex);
752  throw ex;
753  }
754  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
755 
756  // Note that we do not delete the Source itself. That is because this
757  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
758  // In such a case, if you close a file in the handler, it will deadlock
// These three inserts happen before m_source_mutex is taken; the containers are
// oneapi::tbb::concurrent_unordered_set members.
759  m_disabledSourceStrings.insert(source_ptr->ID());
760  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
761  m_disabledSources.insert(source_ptr);
762 
// unique_lock rather than lock_guard: the lock is released and re-acquired further down.
763  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
// Remove the failed source if it sits in slot 0 or slot 1 of the active list.
764  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
765  auto oldSources = m_activeSources;
766  m_activeSources.erase(m_activeSources.begin());
767  reportSiteChange(oldSources, m_activeSources);
768  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
769  auto oldSources = m_activeSources;
770  m_activeSources.erase(m_activeSources.begin() + 1);
771  reportSiteChange(oldSources, m_activeSources);
772  }
773  std::shared_ptr<Source> new_source;
774  if (m_activeSources.empty()) {
// Nothing left to read from: request a new open and wait for it with the lock released.
775  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
776  timespec now;
779  // Note we only wait for 180 seconds here. This is because we've already failed
780  // once and the likelihood the program has some inconsistent state is decent.
781  // We'd much rather fail hard than deadlock!
// NOTE(review): the wait below is m_timeout + 10 seconds, which only matches the
// "180 seconds" above for one particular m_timeout value - confirm which is intended.
782  sentry.unlock();
783  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
// Lines 784-785 are missing here; given the message and the `} else {` below, they
// presumably test `status` for a timeout and declare `ex` - confirm.
786  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
787  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
788  << ") => timeout when waiting for file open";
789  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
790  addConnections(ex);
791  throw ex;
792  } else {
// The open finished: fetch its result, adding context to any edm::Exception it rethrows.
793  try {
794  new_source = future.get();
795  } catch (edm::Exception &ex) {
796  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
797  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
798  throw;
799  }
800  }
801 
// Still running without the lock: reject a replacement whose ID is in the disabled set.
802  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
803  m_disabledSourceStrings.end()) {
804  // The server gave us back a data node we requested excluded. Fatal!
806  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
807  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
808  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
809  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
810  addConnections(ex);
811  throw ex;
812  }
// Re-acquire the lock before touching m_activeSources again.
813  sentry.lock();
814 
815  auto oldSources = m_activeSources;
816  m_activeSources.push_back(new_source);
817  reportSiteChange(oldSources, m_activeSources);
818  } else {
// Another active source is still available: reuse the first one.
819  new_source = m_activeSources[0];
820  }
// Hand the failed request to the selected source.
821  new_source->handle(c_ptr);
822 }
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void addConnections(cms::Exception &) const
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:165
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Log< level::Warning, false > LogWarning
std::recursive_mutex m_source_mutex

◆ splitClientRequest()

void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 916 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), cms::cuda::assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, SiStripPI::max, edm::storage::IOPosBuffer::offset(), jetUpdater_cfi::sort, contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

// Body of splitClientRequest(iolist, req1, req2, activeSources) as rendered by Doxygen
// (leading numbers are source-file line numbers; the signature lines and line 949, which
// presumably declares `ex`, are absent from this listing).
// Distributes the entries of `iolist` over `req1` and `req2`, sorts both by offset and
// asserts that no bytes were lost. Does nothing for an empty `iolist`.
919  {
920  if (iolist.empty())
921  return;
// Work on a copy; `front` is the index of the first entry not yet consumed.
922  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
923  req1.reserve(iolist.size() / 2 + 1);
924  req2.reserve(iolist.size() / 2 + 1);
925  size_t front = 0;
926 
927  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
// NOTE(review): activeSources[0] and [1] are indexed without a size check in this
// function; callers presumably guarantee two entries - confirm.
928  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
929  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
930  IOSize chunk1, chunk2;
931  // Make sure the chunk size is at least 1024; little point to reads less than that size.
// chunk1 is weighted by q2^2 and chunk2 by q1^2, i.e. each request's chunk size grows with
// the *other* source's quality value (presumably a lower value means a better source - confirm).
932  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
933  static_cast<IOSize>(1024));
934  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
935  static_cast<IOSize>(1024));
936 
// Total byte count of the original request, checked against the split result at the end.
937  IOSize size_orig = 0;
938  for (const auto &it : iolist)
939  size_orig += it.size();
940 
// Alternate between filling req1 and req2 until every entry has been consumed; throw if
// both requests reach XRD_ADAPTOR_CHUNK_THRESHOLD entries while input remains.
941  while (tmp_iolist.size() - front > 0) {
942  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
943  (req2.size() >=
944  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
945  // are passed to the request manager. However, because we have a max chunk size, we increase
946  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
947  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
948  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
950  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
951  << ", permissions=0" << std::oct << m_perms << std::dec
952  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
953  "reported to CMSSW developers.";
// NOTE(review): the context string below names requestFailure() although this is
// splitClientRequest(); looks like a copy-paste leftover - confirm before changing.
954  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
955  addConnections(ex);
956  std::stringstream ss;
957  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
958  ex.addAdditionalInfo(ss.str());
959  std::stringstream ss2;
// The +5 offset applied above is removed again for reporting.
960  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
961  ex.addAdditionalInfo(ss2.str());
962  throw ex;
963  }
// consumeChunkFront() takes `front` by reference, consumeChunkBack() takes it by value
// (per their declarations), so only the former can advance `front`.
964  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
965  consumeChunkFront(front, tmp_iolist, req1, chunk1);
966  }
967  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
968  consumeChunkBack(front, tmp_iolist, req2, chunk2);
969  }
970  }
// Order both result lists by ascending offset.
971  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
972  return left.offset() < right.offset();
973  });
974  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
975  return left.offset() < right.offset();
976  });
977 
978  IOSize size1 = validateList(req1);
979  IOSize size2 = validateList(req2);
980 
// The two halves together must cover exactly the bytes of the original request.
981  assert(size_orig == size1 + size2);
982 
983  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
984  << " bytes) split into requests size " << req1.size() << " (" << size1
985  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
986 }
Log< level::Info, true > LogVerbatim
static constexpr int XRD_CL_MAX_CHUNK
assert(be >=bs)
IOOffset offset() const
Definition: IOPosBuffer.h:41
void addConnections(cms::Exception &) const
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
static IOSize validateList(const std::vector< IOPosBuffer > req)
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)

◆ updateCurrentServer()

void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 248 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), LIKELY, m_name, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

// Body of updateCurrentServer() as rendered by Doxygen (leading numbers are source-file
// line numbers; line 258, which presumably declares `statsService`, is absent from this
// listing). Consumes a pending hostname from m_serverToAdvertise, if any, and forwards it
// to the statistics service.
248  {
249  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
250  // a pending update. *However*, since we call this for every read, we'll get it
251  // eventually.
// Fast path: nothing queued.
252  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
253  return;
254  }
// Swap the slot back to nullptr and take ownership of whatever was stored; the
// unique_ptr deletes the string when this scope ends.
255  std::string *hostname_ptr;
256  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
257  std::unique_ptr<std::string> hostname(hostname_ptr);
259  if (statsService.isAvailable()) {
260  statsService->setCurrentServer(m_name, *hostname_ptr);
261  }
262  }
263 }
#define LIKELY(x)
Definition: Likely.h:20
std::atomic< std::string * > m_serverToAdvertise
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
bool isAvailable() const
Definition: Service.h:40

Member Data Documentation

◆ m_activeSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

◆ m_disabledExcludeStrings

oneapi::tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

◆ m_disabledSources

oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

◆ m_disabledSourceStrings

oneapi::tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

◆ m_distribution

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 238 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_excluded_active_count

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 240 of file XrdRequestManager.h.

◆ m_flags

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private

◆ m_generator

std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_inactiveSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), and prepareOpaqueString().

◆ m_lastSourceCheck

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

◆ m_name

const std::string XrdAdaptor::RequestManager::m_name
private

◆ m_nextActiveSourceCheck

timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

◆ m_nextInitialSourceToggle

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

◆ m_open_handler

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 299 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

◆ m_perms

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private

◆ m_redirectLimitDelayScale

int XrdAdaptor::RequestManager::m_redirectLimitDelayScale
private

Definition at line 229 of file XrdRequestManager.h.

◆ m_serverToAdvertise

std::atomic<std::string *> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

◆ m_source_mutex

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate

◆ m_timeout

int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

◆ searchMode

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 230 of file XrdRequestManager.h.

◆ XRD_DEFAULT_TIMEOUT

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3 * 60
static

Definition at line 53 of file XrdRequestManager.h.