CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSizehandle (std::shared_ptr< std::vector< IOPosBuffer >> iolist)
 
std::future< IOSizehandle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
std::future< IOSizehandle (void *into, IOSize size, IOOffset off)
 
RequestManageroperator= (const RequestManager &)=delete
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 RequestManager (const RequestManager &)=delete
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManagergetInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< SourcepickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHashm_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandlerm_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 44 of file XrdRequestManager.h.

Constructor & Destructor Documentation

◆ RequestManager() [1/2]

XrdAdaptor::RequestManager::RequestManager ( const RequestManager )
delete

Referenced by getInstance().

◆ ~RequestManager()

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtualdefault

◆ RequestManager() [2/2]

RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 115 of file XrdRequestManager.cc.

116  : m_serverToAdvertise(nullptr),
119  m_name(filename),
120  m_flags(flags),
121  m_perms(perms),
122  m_distribution(0, 100),

Member Function Documentation

◆ addConnections()

void RequestManager::addConnections ( cms::Exception ex) const

Add the list of active connections to the exception extra info.

Definition at line 488 of file XrdRequestManager.cc.

488  {
489  std::vector<std::string> sources;
491  for (auto const &source : sources) {
492  ex.addAdditionalInfo("Active source: " + source);
493  }
494  sources.clear();
496  for (auto const &source : sources) {
497  ex.addAdditionalInfo("Disabled source: " + source);
498  }
499 }

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), source, and CalibrationSummaryClient_cfi::sources.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

◆ broadcastRequest()

void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

◆ checkSources()

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 300 of file XrdRequestManager.cc.

303  {
304  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
305  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
306  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
307  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
308  { // Be more aggressive about getting rid of very bad sources.
309  compareSources(now, 0, 1, activeSources, inactiveSources);
310  compareSources(now, 1, 0, activeSources, inactiveSources);
311  }
313  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
314  }
315  }
316 }

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, submitPVValidationJobs::now, and timeDiffMS().

Referenced by handle().

◆ checkSourcesImpl()

void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 346 of file XrdRequestManager.cc.

349  {
350  bool findNewSource = false;
351  if (activeSources.size() <= 1) {
352  findNewSource = true;
353  } else if (activeSources.size() > 1) {
354  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
355  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
356  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
357  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
358 
359  // NOTE: We could probably replace the copy with a better sort function.
360  // However, there are typically very few sources and the correctness is more obvious right now.
361  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
362  eligibleInactiveSources.reserve(inactiveSources.size());
363  for (const auto &source : inactiveSources) {
364  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
365  eligibleInactiveSources.push_back(source);
366  }
367  }
368  auto bestInactiveSource =
369  std::min_element(eligibleInactiveSources.begin(),
370  eligibleInactiveSources.end(),
371  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
372  return s1->getQuality() < s2->getQuality();
373  });
374  auto worstActiveSource = std::max_element(activeSources.cbegin(),
375  activeSources.cend(),
376  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
377  return s1->getQuality() < s2->getQuality();
378  });
379  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
380  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
381  << ", quality " << (*bestInactiveSource)->getQuality();
382  }
383  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
384  << ", quality " << (*worstActiveSource)->getQuality();
385  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
386  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
387  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
388  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
389  auto oldSources = activeSources;
390  activeSources.push_back(*bestInactiveSource);
391  reportSiteChange(oldSources, activeSources);
392  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
393  if (it->get() == bestInactiveSource->get()) {
394  inactiveSources.erase(it);
395  break;
396  }
397  } else
398  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
399  (*worstActiveSource)->getQuality() >
400  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
401  edm::LogVerbatim("XrdAdaptorInternal")
402  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
403  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
404  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
405  (*worstActiveSource)->setLastDowngrade(now);
406  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
407  if (it->get() == bestInactiveSource->get()) {
408  inactiveSources.erase(it);
409  break;
410  }
411  inactiveSources.emplace_back(*worstActiveSource);
412  auto oldSources = activeSources;
413  activeSources.erase(worstActiveSource);
414  activeSources.emplace_back(std::move(*bestInactiveSource));
415  reportSiteChange(oldSources, activeSources);
416  eligibleInactiveSources.clear();
417  for (const auto &source : inactiveSources)
418  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
419  eligibleInactiveSources.push_back(source);
420  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
421  eligibleInactiveSources.end(),
422  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
423  return s1->getQuality() < s2->getQuality();
424  });
425  worstActiveSource = std::max_element(activeSources.begin(),
426  activeSources.end(),
427  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
428  return s1->getQuality() < s2->getQuality();
429  });
430  }
431  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
432  float r = m_distribution(m_generator);
434  findNewSource = true;
435  }
436  }
437  }
438  if (findNewSource) {
439  m_open_handler->open();
441  }
442 
443  // Only aggressively look for new sources if we don't have two.
444  if (activeSources.size() == 2) {
446  } else {
448  }
450 }

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), submitPVValidationJobs::now, alignCSCRings::r, reportSiteChange(), source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

◆ compareSources()

bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 318 of file XrdRequestManager.cc.

322  {
323  if (activeSources.size() < std::max(a, b) + 1) {
324  return false;
325  }
326 
327  bool findNewSource = false;
328  if ((activeSources[a]->getQuality() > 5130) ||
329  ((activeSources[a]->getQuality() > 260) &&
330  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
331  edm::LogVerbatim("XrdAdaptorInternal")
332  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
333  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
334  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
335  findNewSource = true;
336  }
337  activeSources[a]->setLastDowngrade(now);
338  inactiveSources.emplace_back(activeSources[a]);
339  auto oldSources = activeSources;
340  activeSources.erase(activeSources.begin() + a);
341  reportSiteChange(oldSources, activeSources);
342  }
343  return findNewSource;
344 }

References a, CalibrationSummaryClient_cfi::activeSources, b, SiStripPI::max, submitPVValidationJobs::now, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

◆ getActiveFile()

std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 452 of file XrdRequestManager.cc.

452  {
453  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
454  if (m_activeSources.empty()) {
456  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
457  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
458  ex.addContext("In XrdAdaptor::RequestManager::handle()");
459  addConnections(ex);
460  throw ex;
461  }
462  return m_activeSources[0]->getFileHandle();
463 }

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

◆ getActiveSourceNames()

void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 465 of file XrdRequestManager.cc.

465  {
466  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
467  sources.reserve(m_activeSources.size());
468  for (auto const &source : m_activeSources) {
469  sources.push_back(source->ID());
470  }
471 }

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

◆ getDisabledSourceNames()

void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 481 of file XrdRequestManager.cc.

481  {
482  sources.reserve(m_disabledSourceStrings.size());
483  for (auto const &source : m_disabledSourceStrings) {
484  sources.push_back(source);
485  }
486 }

References m_disabledSourceStrings, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

◆ getFilename()

const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 103 of file XrdRequestManager.h.

103 { return m_name; }

References m_name.

◆ getInstance()

static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 112 of file XrdRequestManager.h.

114  {
115  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
116  instance->initialize(instance);
117  return instance;
118  }

References corrVsCorr::filename, HLT_FULL_cff::flags, instance, and RequestManager().

◆ getPrettyActiveSourceNames()

void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 473 of file XrdRequestManager.cc.

473  {
474  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
475  sources.reserve(m_activeSources.size());
476  for (auto const &source : m_activeSources) {
477  sources.push_back(source->PrettyID());
478  }
479 }

References m_activeSources, m_source_mutex, source, and CalibrationSummaryClient_cfi::sources.

Referenced by addConnections().

◆ handle() [1/3]

std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer >>  iolist)

Definition at line 619 of file XrdRequestManager.cc.

619  {
620  //Use a copy of m_activeSources and m_inactiveSources throughout this function
621  // in order to avoid holding the lock a long time and causing a deadlock.
622  // When the function is over we will update the values of the containers
623  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
624  {
625  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
627  inactiveSources = m_inactiveSources;
628  }
629  //Make sure we update changes when we leave the function
630  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
631  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
633  m_inactiveSources = std::move(inactiveSources);
634  });
635 
637 
638  timespec now;
640 
641  edm::CPUTimer timer;
642  timer.start();
643 
644  if (activeSources.size() == 1) {
645  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
646  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
647  activeSources[0]->handle(c_ptr);
648  return c_ptr->get_future();
649  }
650  // Make sure active
651  else if (activeSources.empty()) {
653  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
654  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
655  ex.addContext("In XrdAdaptor::RequestManager::handle()");
656  addConnections(ex);
657  throw ex;
658  }
659 
660  assert(iolist.get());
661  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
662  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
663  splitClientRequest(*iolist, *req1, *req2, activeSources);
664 
665  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
666  // CheckSources may have removed a source
667  if (activeSources.size() == 1) {
668  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
669  activeSources[0]->handle(c_ptr);
670  return c_ptr->get_future();
671  }
672 
673  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
674  std::future<IOSize> future1, future2;
675  if (!req1->empty()) {
676  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
677  activeSources[0]->handle(c_ptr1);
678  future1 = c_ptr1->get_future();
679  }
680  if (!req2->empty()) {
681  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
682  activeSources[1]->handle(c_ptr2);
683  future2 = c_ptr2->get_future();
684  }
685  if (!req1->empty() && !req2->empty()) {
686  std::future<IOSize> task = std::async(
687  std::launch::deferred,
688  [](std::future<IOSize> a, std::future<IOSize> b) {
689  // Wait until *both* results are available. This is essential
690  // as the callback may try referencing the RequestManager. If one
691  // throws an exception (causing the RequestManager to be destroyed by
692  // XrdFile) and the other has a failure, then the recovery code will
693  // reference the destroyed RequestManager.
694  //
695  // Unlike other places where we use shared/weak ptrs to maintain object
696  // lifetime and destruction asynchronously, we *cannot* destroy the request
697  // asynchronously as it is associated with a ROOT buffer. We must wait until we
698  // are guaranteed that XrdCl will not write into the ROOT buffer before we
699  // can return.
700  b.wait();
701  a.wait();
702  return b.get() + a.get();
703  },
704  std::move(future1),
705  std::move(future2));
706  timer.stop();
707  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
708  return task;
709  } else if (!req1->empty()) {
710  return future1;
711  } else if (!req2->empty()) {
712  return future2;
713  } else { // Degenerate case - no bytes to read.
714  std::promise<IOSize> p;
715  p.set_value(0);
716  return p.get_future();
717  }
718 }

References a, CalibrationSummaryClient_cfi::activeSources, cms::Exception::addContext(), cms::cuda::assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, eostools::move(), submitPVValidationJobs::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), edm::CPUTimer::stop(), and TrackValidation_cff::task.

◆ handle() [2/3]

std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 527 of file XrdRequestManager.cc.

527  {
528  assert(c_ptr.get());
529  timespec now;
531  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
532  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
533  {
534  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
536  inactiveSources = m_inactiveSources;
537  }
538  {
539  //make sure we update values before calling pickSingelSource
540  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
541  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
543  m_inactiveSources = std::move(inactiveSources);
544  });
545 
546  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
547  }
548 
549  std::shared_ptr<Source> source = pickSingleSource();
550  source->handle(c_ptr);
551  return c_ptr->get_future();
552 }

References CalibrationSummaryClient_cfi::activeSources, cms::cuda::assert(), checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), submitPVValidationJobs::now, pickSingleSource(), and source.

◆ handle() [3/3]

std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 56 of file XrdRequestManager.h.

56  {
57  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
58  return handle(c_ptr);
59  }

References findQualityFiles::size.

◆ handleOpen()

void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source source 
)
privatevirtual

Handle the file-open response

Definition at line 581 of file XrdRequestManager.cc.

581  {
582  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
583  if (status.IsOK()) {
584  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
585  for (const auto &s : m_activeSources) {
586  if (source->ID() == s->ID()) {
587  edm::LogVerbatim("XrdAdaptorInternal")
588  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
589  unsigned returned_count = ++m_excluded_active_count;
591  if (returned_count >= 3) {
593  }
594  return;
595  }
596  }
597  for (const auto &s : m_inactiveSources) {
598  if (source->ID() == s->ID()) {
599  edm::LogVerbatim("XrdAdaptorInternal")
600  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
602  return;
603  }
604  }
605  if (m_activeSources.size() < 2) {
606  auto oldSources = m_activeSources;
607  m_activeSources.push_back(source);
608  reportSiteChange(oldSources, m_activeSources);
610  } else {
611  m_inactiveSources.push_back(source);
612  }
613  } else { // File-open failure - wait at least 120s before next attempt.
614  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
616  }
617 }

References alignCSCRings::s, source, mps_update::status, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

◆ initialize()

void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 125 of file XrdRequestManager.cc.

125  {
127 
128  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
129  if (env) {
130  env->GetInt("StreamErrorWindow", m_timeout);
131  }
132 
133  std::string orig_site;
134  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
135  std::string hostname;
136  if (Source::getHostname(orig_site, hostname)) {
137  Source::getDomain(hostname, orig_site);
138  }
139  }
140 
141  std::unique_ptr<XrdCl::File> file;
143  bool validFile = false;
144  const int retries = 5;
145  std::string excludeString;
146  for (int idx = 0; idx < retries; idx++) {
147  file = std::make_unique<XrdCl::File>();
148  auto opaque = prepareOpaqueString();
149  std::string new_filename =
150  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
151  SyncHostResponseHandler handler;
152  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
153  if (!openStatus
154  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
155  // server and it was marked bad - xrootd couldn't even queue up the request internally!
156  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
157  // redirector being down or authentication failures.
158  ex.clearMessage();
159  ex.clearContext();
160  ex.clearAdditionalInfo();
161  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
162  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
163  << ", code=" << openStatus.code << ")";
164  ex.addContext("Calling XrdFile::open()");
165  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
166  throw ex;
167  }
168  handler.WaitForResponse();
169  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
170  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
171  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
172  assert(status);
173  if (status->IsOK()) {
174  validFile = true;
175  break;
176  } else {
177  ex.clearMessage();
178  ex.clearContext();
179  ex.clearAdditionalInfo();
180  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
181  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
182  << ", code=" << status->code << ")";
183  ex.addContext("Calling XrdFile::open()");
184  addConnections(ex);
185  std::string dataServer, lastUrl;
186  file->GetProperty("DataServer", dataServer);
187  file->GetProperty("LastURL", lastUrl);
188  if (!dataServer.empty()) {
189  ex.addAdditionalInfo("Problematic data server: " + dataServer);
190  }
191  if (!lastUrl.empty()) {
192  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
193  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
194  }
195  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
196  m_disabledSourceStrings.end()) {
197  ex << ". No additional data servers were found.";
198  throw ex;
199  }
200  if (!dataServer.empty()) {
201  m_disabledSourceStrings.insert(dataServer);
202  m_disabledExcludeStrings.insert(excludeString);
203  }
204  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
205  if (lastUrl == new_filename) {
206  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
207  throw ex;
208  }
209  }
210  }
211  if (!validFile) {
212  throw ex;
213  }
215 
216  timespec ts;
218 
219  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
220  {
221  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
222  auto oldList = m_activeSources;
223  m_activeSources.push_back(source);
225  }
228 
229  m_lastSourceCheck = ts;
230  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
232 }

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, geometryDiff::file, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), heavyIonCSV_trainingSettings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

◆ operator=()

RequestManager& XrdAdaptor::RequestManager::operator= ( const RequestManager )
delete

◆ pickSingleSource()

std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 501 of file XrdRequestManager.cc.

501  {
502  std::shared_ptr<Source> source = nullptr;
503  {
504  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
505  if (m_activeSources.size() == 2) {
507  source = m_activeSources[0];
509  } else {
510  source = m_activeSources[1];
512  }
513  } else if (m_activeSources.empty()) {
515  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
516  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
517  ex.addContext("In XrdAdaptor::RequestManager::handle()");
518  addConnections(ex);
519  throw ex;
520  } else {
521  source = m_activeSources[0];
522  }
523  }
524  return source;
525 }

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

◆ prepareOpaqueString()

std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 554 of file XrdRequestManager.cc.

554  {
555  std::stringstream ss;
556  ss << "tried=";
557  size_t count = 0;
558  {
559  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
560 
561  for (const auto &it : m_activeSources) {
562  count++;
563  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
564  }
565  for (const auto &it : m_inactiveSources) {
566  count++;
567  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
568  }
569  }
570  for (const auto &it : m_disabledExcludeStrings) {
571  count++;
572  ss << it.substr(0, it.find(':')) << ",";
573  }
574  if (count) {
575  std::string tmp_str = ss.str();
576  return tmp_str.substr(0, tmp_str.size() - 1);
577  }
578  return "";
579 }

References submitPVResolutionJobs::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

◆ queueUpdateCurrentServer()

void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 258 of file XrdRequestManager.cc.

258  {
259  auto hostname = std::make_unique<std::string>(id);
260  if (Source::getHostname(id, *hostname)) {
261  std::string *null_hostname = nullptr;
262  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
263  hostname.release();
264  }
265  }
266 }

References XrdAdaptor::Source::getHostname(), m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

◆ reportSiteChange()

void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source >> const &  iOld,
std::vector< std::shared_ptr< Source >> const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 285 of file XrdRequestManager.cc.

287  {
288  auto siteList = formatSites(iNew);
289  if (!orig_site.empty() && (orig_site != siteList)) {
290  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
291  } else {
292  auto oldSites = formatSites(iOld);
293  if (orig_site.empty() && (siteList != oldSites)) {
294  if (!oldSites.empty())
295  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
296  }
297  }
298 }

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

◆ requestFailure()

void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 720 of file XrdRequestManager.cc.

720  {
721  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
722 
723  // Fail early for invalid responses - XrdFile has a separate path for handling this.
724  if (c_status.code == XrdCl::errInvalidResponse) {
725  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
727  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
728  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
729  << ") => Invalid ReadV response from server";
730  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
731  addConnections(ex);
732  throw ex;
733  }
734  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
735 
736  // Note that we do not delete the Source itself. That is because this
737  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
738  // In such a case, if you close a file in the handler, it will deadlock
739  m_disabledSourceStrings.insert(source_ptr->ID());
740  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
741  m_disabledSources.insert(source_ptr);
742 
743  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
744  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
745  auto oldSources = m_activeSources;
746  m_activeSources.erase(m_activeSources.begin());
747  reportSiteChange(oldSources, m_activeSources);
748  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
749  auto oldSources = m_activeSources;
750  m_activeSources.erase(m_activeSources.begin() + 1);
751  reportSiteChange(oldSources, m_activeSources);
752  }
753  std::shared_ptr<Source> new_source;
754  if (m_activeSources.empty()) {
755  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
756  timespec now;
759  // Note we only wait for 180 seconds here. This is because we've already failed
760  // once and the likelihood the program has some inconsistent state is decent.
761  // We'd much rather fail hard than deadlock!
762  sentry.unlock();
763  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
766  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
767  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
768  << ") => timeout when waiting for file open";
769  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
770  addConnections(ex);
771  throw ex;
772  } else {
773  try {
774  new_source = future.get();
775  } catch (edm::Exception &ex) {
776  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
777  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
778  throw;
779  }
780  }
781 
782  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
783  m_disabledSourceStrings.end()) {
784  // The server gave us back a data node we requested excluded. Fatal!
786  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
787  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
788  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
789  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
790  addConnections(ex);
791  throw ex;
792  }
793  sentry.lock();
794 
795  auto oldSources = m_activeSources;
796  m_activeSources.push_back(new_source);
797  reportSiteChange(oldSources, m_activeSources);
798  } else {
799  new_source = m_activeSources[0];
800  }
801  new_source->handle(c_ptr);
802 }

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), get, GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, submitPVValidationJobs::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

◆ splitClientRequest()

void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 896 of file XrdRequestManager.cc.

899  {
900  if (iolist.empty())
901  return;
902  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
903  req1.reserve(iolist.size() / 2 + 1);
904  req2.reserve(iolist.size() / 2 + 1);
905  size_t front = 0;
906 
907  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
908  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
909  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
910  IOSize chunk1, chunk2;
911  // Make sure the chunk size is at least 1024; little point to reads less than that size.
912  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
913  static_cast<IOSize>(1024));
914  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
915  static_cast<IOSize>(1024));
916 
917  IOSize size_orig = 0;
918  for (const auto &it : iolist)
919  size_orig += it.size();
920 
921  while (tmp_iolist.size() - front > 0) {
922  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
923  (req2.size() >=
924  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
925  // are passed to the request manager. However, because we have a max chunk size, we increase
926  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
927  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
928  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
930  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
931  << ", permissions=0" << std::oct << m_perms << std::dec
932  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
933  "reported to CMSSW developers.";
934  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
935  addConnections(ex);
936  std::stringstream ss;
937  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
938  ex.addAdditionalInfo(ss.str());
939  std::stringstream ss2;
940  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
941  ex.addAdditionalInfo(ss2.str());
942  throw ex;
943  }
944  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
945  consumeChunkFront(front, tmp_iolist, req1, chunk1);
946  }
947  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
948  consumeChunkBack(front, tmp_iolist, req2, chunk2);
949  }
950  }
951  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
952  return left.offset() < right.offset();
953  });
954  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
955  return left.offset() < right.offset();
956  });
957 
958  IOSize size1 = validateList(req1);
959  IOSize size2 = validateList(req2);
960 
961  assert(size_orig == size1 + size2);
962 
963  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
964  << " bytes) split into requests size " << req1.size() << " (" << size1
965  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
966 }

References CalibrationSummaryClient_cfi::activeSources, cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), cms::cuda::assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, SiStripPI::max, IOPosBuffer::offset(), q1, q2, jetUpdater_cfi::sort, contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

◆ updateCurrentServer()

void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 241 of file XrdRequestManager.cc.

241  {
242  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
243  // a pending update. *However*, since we call this for every read, we'll get it
244  // eventually.
245  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
246  return;
247  }
248  std::string *hostname_ptr;
249  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
250  std::unique_ptr<std::string> hostname(hostname_ptr);
252  if (statsService.isAvailable()) {
253  statsService->setCurrentServer(*hostname_ptr);
254  }
255  }
256 }

References edm::Service< T >::isAvailable(), LIKELY, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

Member Data Documentation

◆ m_activeSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 208 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

◆ m_disabledExcludeStrings

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 212 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

◆ m_disabledSources

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by requestFailure().

◆ m_disabledSourceStrings

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 211 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

◆ m_distribution

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 233 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_excluded_active_count

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 235 of file XrdRequestManager.h.

◆ m_flags

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private

◆ m_generator

std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 232 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

◆ m_inactiveSources

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 209 of file XrdRequestManager.h.

Referenced by handle(), and prepareOpaqueString().

◆ m_lastSourceCheck

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 219 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

◆ m_name

const std::string XrdAdaptor::RequestManager::m_name
private

◆ m_nextActiveSourceCheck

timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

◆ m_nextInitialSourceToggle

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 222 of file XrdRequestManager.h.

Referenced by pickSingleSource().

◆ m_open_handler

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 294 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

◆ m_perms

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private

◆ m_serverToAdvertise

std::atomic<std::string *> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

◆ m_source_mutex

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate

◆ m_timeout

int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 220 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

◆ searchMode

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 225 of file XrdRequestManager.h.

◆ XRD_DEFAULT_TIMEOUT

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3 * 60
static

Definition at line 49 of file XrdRequestManager.h.

XRD_ADAPTOR_SHORT_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
Definition: XrdRequestManager.cc:27
XrdAdaptor::RequestManager::m_flags
XrdCl::OpenFlags::Flags m_flags
Definition: XrdRequestManager.h:228
XrdAdaptor::Source::determineHostExcludeString
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:288
XrdAdaptor::RequestManager::queueUpdateCurrentServer
void queueUpdateCurrentServer(const std::string &)
Definition: XrdRequestManager.cc:258
XrdAdaptor::RequestManager::m_generator
std::mt19937 m_generator
Definition: XrdRequestManager.h:232
XrdAdaptor::RequestManager::m_nextActiveSourceCheck
timespec m_nextActiveSourceCheck
Definition: XrdRequestManager.h:224
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
XrdAdaptor::RequestManager::addConnections
void addConnections(cms::Exception &) const
Definition: XrdRequestManager.cc:488
XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
Definition: XrdRequestManager.cc:37
XrdAdaptor::RequestManager::m_disabledExcludeStrings
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
Definition: XrdRequestManager.h:212
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
mps_update.status
status
Definition: mps_update.py:68
XrdAdaptor::RequestManager::m_inactiveSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
Definition: XrdRequestManager.h:209
XrdAdaptor::RequestManager::m_open_handler
std::shared_ptr< OpenHandler > m_open_handler
Definition: XrdRequestManager.h:294
XrdAdaptor::RequestManager::m_excluded_active_count
std::atomic< unsigned > m_excluded_active_count
Definition: XrdRequestManager.h:235
XrdAdaptor::RequestManager::getDisabledSourceNames
void getDisabledSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:481
edm::CPUTimer
Definition: CPUTimer.h:37
XrdAdaptor::RequestManager::m_activeSources
std::vector< std::shared_ptr< Source > > m_activeSources
Definition: XrdRequestManager.h:208
XRD_ADAPTOR_CHUNK_THRESHOLD
#define XRD_ADAPTOR_CHUNK_THRESHOLD
Definition: XrdRequestManager.cc:40
XrdAdaptor::RequestManager::m_perms
XrdCl::Access::Mode m_perms
Definition: XrdRequestManager.h:229
pileupDistInMC.oldList
oldList
Definition: pileupDistInMC.py:38
cms::cuda::assert
assert(be >=bs)
CalibrationSummaryClient_cfi.sources
sources
Definition: CalibrationSummaryClient_cfi.py:23
XrdAdaptor::RequestManager::m_disabledSources
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Definition: XrdRequestManager.h:213
XrdAdaptor::RequestManager::m_serverToAdvertise
std::atomic< std::string * > m_serverToAdvertise
Definition: XrdRequestManager.h:217
spr::find
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
XrdAdaptor::RequestManager::updateCurrentServer
void updateCurrentServer()
Definition: XrdRequestManager.cc:241
edm::errors::FileOpenError
Definition: EDMException.h:49
GET_CLOCK_MONOTONIC
#define GET_CLOCK_MONOTONIC(ts)
Definition: XrdRequestManager.cc:56
heavyIonCSV_trainingSettings.idx
idx
Definition: heavyIonCSV_trainingSettings.py:5
XrdAdaptor::RequestManager::RequestManager
RequestManager(const RequestManager &)=delete
edm::Exception
Definition: EDMException.h:77
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
consumeChunkBack
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
Definition: XrdRequestManager.cc:843
XrdAdaptor::RequestManager::m_lastSourceCheck
timespec m_lastSourceCheck
Definition: XrdRequestManager.h:219
XrdAdaptor::RequestManager::getPrettyActiveSourceNames
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:473
alignCSCRings.s
s
Definition: alignCSCRings.py:92
XrdAdaptor::RequestManager::pickSingleSource
std::shared_ptr< Source > pickSingleSource()
Definition: XrdRequestManager.cc:501
XrdAdaptor::RequestManager::compareSources
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
Definition: XrdRequestManager.cc:318
XrdAdaptor::RequestManager::reportSiteChange
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
Definition: XrdRequestManager.cc:285
seconds
double seconds()
TrackValidation_cff.task
task
Definition: TrackValidation_cff.py:253
source
static const std::string source
Definition: EdmProvDump.cc:47
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
submitPVResolutionJobs.count
count
Definition: submitPVResolutionJobs.py:352
XrdAdaptor::Source::getDomain
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:246
XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_LONG_OPEN_DELAY
Definition: XrdRequestManager.cc:36
edm::CPUTimer::start
void start()
Definition: CPUTimer.cc:68
b
double b
Definition: hdecay.h:118
q2
double q2[4]
Definition: TauolaWrapper.h:88
validateList
static IOSize validateList(const std::vector< IOPosBuffer > req)
Definition: XrdRequestManager.cc:882
XRD_CL_MAX_CHUNK
#define XRD_CL_MAX_CHUNK
Definition: XrdRequestManager.cc:25
XrdAdaptor::RequestManager::m_name
const std::string m_name
Definition: XrdRequestManager.h:227
web.browse_db.env
env
Definition: browse_db.py:18
geometryDiff.file
file
Definition: geometryDiff.py:13
XrdAdaptor::RequestManager::m_nextInitialSourceToggle
bool m_nextInitialSourceToggle
Definition: XrdRequestManager.h:222
q1
double q1[4]
Definition: TauolaWrapper.h:87
a
double a
Definition: hdecay.h:119
AlCaHLTBitMon_ParallelJobs.p
def p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
XrdAdaptor::RequestManager::splitClientRequest
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Definition: XrdRequestManager.cc:896
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
cms::Exception::addAdditionalInfo
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
XrdAdaptor::RequestManager::handle
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
Definition: XrdRequestManager.h:56
mps_check.timeout
int timeout
Definition: mps_check.py:53
XrdAdaptor::Source::getXrootdSiteFromURL
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:326
jetUpdater_cfi.sort
sort
Definition: jetUpdater_cfi.py:29
XrdAdaptor::RequestManager::m_distribution
std::uniform_real_distribution< float > m_distribution
Definition: XrdRequestManager.h:233
edm::Service
Definition: Service.h:30
XrdAdaptor::RequestManager::m_source_mutex
std::recursive_mutex m_source_mutex
Definition: XrdRequestManager.h:230
CalibrationSummaryClient_cfi.activeSources
activeSources
Definition: CalibrationSummaryClient_cfi.py:11
edm::CPUTimer::stop
Times stop()
Definition: CPUTimer.cc:87
XrdAdaptor::RequestManager::checkSources
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:300
get
#define get
XRD_ADAPTOR_OPEN_PROBE_PERCENT
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
Definition: XrdRequestManager.cc:35
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
alignCSCRings.r
r
Definition: alignCSCRings.py:93
eostools.move
def move(src, dest)
Definition: eostools.py:511
XrdAdaptor::RequestManager::OpenHandler::getInstance
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
Definition: XrdRequestManager.h:242
IOPosBuffer::offset
IOOffset offset(void) const
Definition: IOPosBuffer.h:39
edm::LogVerbatim
Log< level::Info, true > LogVerbatim
Definition: MessageLogger.h:128
LIKELY
#define LIKELY(x)
Definition: Likely.h:20
SendMonitoringInfo
static void SendMonitoringInfo(XrdCl::File &file)
Definition: XrdRequestManager.cc:92
XrdAdaptor::RequestManager::prepareOpaqueString
std::string prepareOpaqueString() const
Definition: XrdRequestManager.cc:554
IOPosBuffer
Definition: IOPosBuffer.h:7
consumeChunkFront
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
Definition: XrdRequestManager.cc:804
XrdAdaptor::XrootdException
Definition: XrdRequestManager.h:31
XrdAdaptor::RequestManager::m_disabledSourceStrings
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
Definition: XrdRequestManager.h:211
XrdAdaptor::RequestManager::m_timeout
int m_timeout
Definition: XrdRequestManager.h:220
XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT
static const unsigned int XRD_DEFAULT_TIMEOUT
Definition: XrdRequestManager.h:49
IOSize
size_t IOSize
Definition: IOTypes.h:14
HLT_FULL_cff.flags
flags
Definition: HLT_FULL_cff.py:13168
edm::Log
Definition: MessageLogger.h:70
edm::errors::FileReadError
Definition: EDMException.h:50
XrdAdaptor::Source::getHostname
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:219
TauDecayModes.dec
dec
Definition: TauDecayModes.py:142
edm::storage::StatisticsSenderService::setCurrentServer
void setCurrentServer(const std::string &servername)
Definition: StatisticsSenderService.cc:151
XrdAdaptor::RequestManager::checkSourcesImpl
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:346
timeDiffMS
long long timeDiffMS(const timespec &a, const timespec &b)
Definition: XrdRequestManager.cc:61
XrdAdaptor::ClientRequest
Definition: XrdRequest.h:22
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443