CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 53 of file XrdRequestManager.h.

Constructor & Destructor Documentation

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 118 of file XrdRequestManager.cc.

119  : m_serverToAdvertise(nullptr),
122  m_name(filename),
123  m_flags(flags),
124  m_perms(perms),
125  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

void RequestManager::addConnections ( cms::Exception & ex) const

Add the list of active connections to the exception extra info.

Definition at line 491 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), handleOpen(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), requestFailure(), and splitClientRequest().

491  {
492  std::vector<std::string> sources;
494  for (auto const &source : sources) {
495  ex.addAdditionalInfo("Active source: " + source);
496  }
497  sources.clear();
498  getDisabledSourceNames(sources);
499  for (auto const &source : sources) {
500  ex.addAdditionalInfo("Disabled source: " + source);
501  }
502 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:47
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 303 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle(), and handleOpen().

306  {
307  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
308  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
309  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
310  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
311  { // Be more aggressive about getting rid of very bad sources.
312  compareSources(now, 0, 1, activeSources, inactiveSources);
313  compareSources(now, 1, 0, activeSources, inactiveSources);
314  }
316  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
317  }
318  }
319 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 349 of file XrdRequestManager.cc.

References compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), cmsPerfSuiteHarvest::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

352  {
353  bool findNewSource = false;
354  if (activeSources.size() <= 1) {
355  findNewSource = true;
356  } else if (activeSources.size() > 1) {
357  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
358  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
359  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
360  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
361 
362  // NOTE: We could probably replace the copy with a better sort function.
363  // However, there are typically very few sources and the correctness is more obvious right now.
364  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
365  eligibleInactiveSources.reserve(inactiveSources.size());
366  for (const auto &source : inactiveSources) {
367  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
368  eligibleInactiveSources.push_back(source);
369  }
370  }
371  auto bestInactiveSource =
372  std::min_element(eligibleInactiveSources.begin(),
373  eligibleInactiveSources.end(),
374  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
375  return s1->getQuality() < s2->getQuality();
376  });
377  auto worstActiveSource = std::max_element(activeSources.cbegin(),
378  activeSources.cend(),
379  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
380  return s1->getQuality() < s2->getQuality();
381  });
382  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
383  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
384  << ", quality " << (*bestInactiveSource)->getQuality();
385  }
386  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
387  << ", quality " << (*worstActiveSource)->getQuality();
388  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
389  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
390  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
391  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
392  auto oldSources = activeSources;
393  activeSources.push_back(*bestInactiveSource);
394  reportSiteChange(oldSources, activeSources);
395  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
396  if (it->get() == bestInactiveSource->get()) {
397  inactiveSources.erase(it);
398  break;
399  }
400  } else
401  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
402  (*worstActiveSource)->getQuality() >
403  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
404  edm::LogVerbatim("XrdAdaptorInternal")
405  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
406  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
407  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
408  (*worstActiveSource)->setLastDowngrade(now);
409  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
410  if (it->get() == bestInactiveSource->get()) {
411  inactiveSources.erase(it);
412  break;
413  }
414  inactiveSources.emplace_back(std::move(*worstActiveSource));
415  auto oldSources = activeSources;
416  activeSources.erase(worstActiveSource);
417  activeSources.emplace_back(std::move(*bestInactiveSource));
418  reportSiteChange(oldSources, activeSources);
419  eligibleInactiveSources.clear();
420  for (const auto &source : inactiveSources)
421  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
422  eligibleInactiveSources.push_back(source);
423  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
424  eligibleInactiveSources.end(),
425  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
426  return s1->getQuality() < s2->getQuality();
427  });
428  worstActiveSource = std::max_element(activeSources.begin(),
429  activeSources.end(),
430  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
431  return s1->getQuality() < s2->getQuality();
432  });
433  }
434  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
435  float r = m_distribution(m_generator);
437  findNewSource = true;
438  }
439  }
440  }
441  if (findNewSource) {
442  m_open_handler->open();
444  }
445 
446  // Only aggressively look for new sources if we don't have two.
447  if (activeSources.size() == 2) {
449  } else {
451  }
453 }
std::uniform_real_distribution< float > m_distribution
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 321 of file XrdRequestManager.cc.

References a, b, SiStripPI::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

325  {
326  if (activeSources.size() < std::max(a, b) + 1) {
327  return false;
328  }
329 
330  bool findNewSource = false;
331  if ((activeSources[a]->getQuality() > 5130) ||
332  ((activeSources[a]->getQuality() > 260) &&
333  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
334  edm::LogVerbatim("XrdAdaptorInternal")
335  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
336  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
337  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
338  findNewSource = true;
339  }
340  activeSources[a]->setLastDowngrade(now);
341  inactiveSources.emplace_back(activeSources[a]);
342  auto oldSources = activeSources;
343  activeSources.erase(activeSources.begin() + a);
344  reportSiteChange(oldSources, activeSources);
345  }
346  return findNewSource;
347 }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 455 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

455  {
456  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
457  if (m_activeSources.empty()) {
459  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
460  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
461  ex.addContext("In XrdAdaptor::RequestManager::handle()");
462  addConnections(ex);
463  throw ex;
464  }
465  return m_activeSources[0]->getFileHandle();
466 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 468 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

468  {
469  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
470  sources.reserve(m_activeSources.size());
471  for (auto const &source : m_activeSources) {
472  sources.push_back(source->ID());
473  }
474 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 484 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

484  {
485  sources.reserve(m_disabledSourceStrings.size());
486  for (auto const &source : m_disabledSourceStrings) {
487  sources.push_back(source);
488  }
489 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:47
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 111 of file XrdRequestManager.h.

111 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 121 of file XrdRequestManager.h.

References a, b, corrVsCorr::filename, flags, initialize(), instance, cmsPerfSuiteHarvest::now, mps_update::status, and AlCaHLTBitMon_QueryRunRegistry::string.

122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 476 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

476  {
477  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
478  sources.reserve(m_activeSources.size());
479  for (auto const &source : m_activeSources) {
480  sources.push_back(source->PrettyID());
481  }
482 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 63 of file XrdRequestManager.h.

References cmsBatch::handle, findQualityFiles::size, and btagGenBb_cfi::Status.

Referenced by handleOpen().

64  {
65  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
66  return handle(c_ptr);
67  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::future<IOSize> XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful one. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 530 of file XrdRequestManager.cc.

References checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, pickSingleSource(), and source.

530  {
531  assert(c_ptr.get());
532  timespec now;
533  GET_CLOCK_MONOTONIC(now);
534  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
535  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
536  {
537  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
538  activeSources = m_activeSources;
539  inactiveSources = m_inactiveSources;
540  }
541  {
542  //make sure we update values before calling pickSingleSource
543  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
544  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
545  m_activeSources = std::move(activeSources);
546  m_inactiveSources = std::move(inactiveSources);
547  });
548 
549  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
550  }
551 
552  std::shared_ptr<Source> source = pickSingleSource();
553  source->handle(c_ptr);
554  return c_ptr->get_future();
555 }
#define GET_CLOCK_MONOTONIC(ts)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source > source 
)
private virtual

Handle the file-open response

Definition at line 591 of file XrdRequestManager.cc.

References a, addConnections(), cms::Exception::addContext(), b, checkSources(), TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, handle(), m_activeSources, m_excluded_active_count, m_flags, m_inactiveSources, m_name, m_nextActiveSourceCheck, m_perms, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, AlCaHLTBitMon_ParallelJobs::p, queueUpdateCurrentServer(), reportSiteChange(), alignCSCRings::s, splitClientRequest(), edm::CPUTimer::start(), edm::CPUTimer::stop(), TrackValidation_cff::task, updateCurrentServer(), XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

591  {
592  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
593  if (status.IsOK()) {
594  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
595  for (const auto &s : m_activeSources) {
596  if (source->ID() == s->ID()) {
597  edm::LogVerbatim("XrdAdaptorInternal")
598  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
599  unsigned returned_count = ++m_excluded_active_count;
601  if (returned_count >= 3) {
603  }
604  return;
605  }
606  }
607  for (const auto &s : m_inactiveSources) {
608  if (source->ID() == s->ID()) {
609  edm::LogVerbatim("XrdAdaptorInternal")
610  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
612  return;
613  }
614  }
615  if (m_activeSources.size() < 2) {
616  auto oldSources = m_activeSources;
617  m_activeSources.push_back(source);
618  reportSiteChange(oldSources, m_activeSources);
620  } else {
621  m_inactiveSources.push_back(source);
622  }
623  } else { // File-open failure - wait at least 120s before next attempt.
624  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
626  }
627 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager > selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows when not to reference the RequestManager.

Definition at line 128 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, ecalTB2006H4_GenSimDigiReco_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

128  {
130 
131  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
132  if (env) {
133  env->GetInt("StreamErrorWindow", m_timeout);
134  }
135 
136  std::string orig_site;
137  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
138  std::string hostname;
139  if (Source::getHostname(orig_site, hostname)) {
140  Source::getDomain(hostname, orig_site);
141  }
142  }
143 
144  std::unique_ptr<XrdCl::File> file;
146  bool validFile = false;
147  const int retries = 5;
148  std::string excludeString;
149  for (int idx = 0; idx < retries; idx++) {
150  file.reset(new XrdCl::File());
151  auto opaque = prepareOpaqueString();
152  std::string new_filename =
153  m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
154  SyncHostResponseHandler handler;
155  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
156  if (!openStatus
157  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
158  // server and it was marked bad - xrootd couldn't even queue up the request internally!
159  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
160  // redirector being down or authentication failures.
161  ex.clearMessage();
162  ex.clearContext();
163  ex.clearAdditionalInfo();
164  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
165  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
166  << ", code=" << openStatus.code << ")";
167  ex.addContext("Calling XrdFile::open()");
168  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
169  throw ex;
170  }
171  handler.WaitForResponse();
172  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
173  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
174  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
175  assert(status);
176  if (status->IsOK()) {
177  validFile = true;
178  break;
179  } else {
180  ex.clearMessage();
181  ex.clearContext();
182  ex.clearAdditionalInfo();
183  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
184  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
185  << ", code=" << status->code << ")";
186  ex.addContext("Calling XrdFile::open()");
187  addConnections(ex);
188  std::string dataServer, lastUrl;
189  file->GetProperty("DataServer", dataServer);
190  file->GetProperty("LastURL", lastUrl);
191  if (!dataServer.empty()) {
192  ex.addAdditionalInfo("Problematic data server: " + dataServer);
193  }
194  if (!lastUrl.empty()) {
195  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
196  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
197  }
198  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
199  m_disabledSourceStrings.end()) {
200  ex << ". No additional data servers were found.";
201  throw ex;
202  }
203  if (!dataServer.empty()) {
204  m_disabledSourceStrings.insert(dataServer);
205  m_disabledExcludeStrings.insert(excludeString);
206  }
207  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
208  if (lastUrl == new_filename) {
209  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
210  throw ex;
211  }
212  }
213  }
214  if (!validFile) {
215  throw ex;
216  }
217  SendMonitoringInfo(*file);
218 
219  timespec ts;
221 
222  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
223  {
224  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
225  auto oldList = m_activeSources;
226  m_activeSources.push_back(source);
228  }
231 
232  m_lastSourceCheck = ts;
233  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
235 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:290
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:248
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:328
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 504 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

// Picks the source that will service the next request.
// While holding m_source_mutex:
//   - exactly two active sources: alternate between them using m_nextInitialSourceToggle;
//   - no active sources: throw an exception (with the connection list attached via addConnections());
//   - any other count: use the first active source.
// NOTE(review): listing lines 509, 511, 514 and 517 were absent from this extract (leaving unbalanced
// braces and an undeclared `ex`). They are restored here from the brace structure and from this
// function's References list (m_nextInitialSourceToggle, edm::errors::FileReadError) - confirm
// against XrdRequestManager.cc.
504  {
505  std::shared_ptr<Source> source = nullptr;
506  {
507  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
508  if (m_activeSources.size() == 2) {
509  if (m_nextInitialSourceToggle) {
510  source = m_activeSources[0];
511  m_nextInitialSourceToggle = false;
512  } else {
513  source = m_activeSources[1];
514  m_nextInitialSourceToggle = true;
515  }
516  } else if (m_activeSources.empty()) {
517  edm::Exception ex(edm::errors::FileReadError);
518  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
519  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
520  ex.addContext("In XrdAdaptor::RequestManager::handle()");
521  addConnections(ex);
522  throw ex;
523  } else {
524  source = m_activeSources[0];
525  }
526  }
527  return source;
528 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 557 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

// Builds the opaque (CGI) string listing hosts that have already been tried.
// Result shape: "tried=<id>,<id>,..." optionally followed by "&triedrc=resel";
// an empty string is returned when there is nothing to list.
// Each id is the source's ExcludeID() (or disabled exclude string) truncated at the first ':'.
557  {
558  struct {
559  std::stringstream ss;  // accumulates the opaque string
560  size_t count = 0;  // number of ids appended so far
561  bool has_active = false;  // set once any active source has been appended
562 
// Appends one id; the first call emits the "tried=" prefix, later calls emit a comma separator.
563  void append_tried(const std::string &id, bool active = false) {
564  ss << (count ? "," : "tried=") << id;
565  count++;
566  if (active) {
567  has_active = true;
568  }
569  }
570  } state;
571  {
// The active and inactive source lists are read under the source mutex.
572  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
573 
574  for (const auto &it : m_activeSources) {
575  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
576  }
577  for (const auto &it : m_inactiveSources) {
578  state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
579  }
580  }
// m_disabledExcludeStrings (a tbb::concurrent_unordered_set) is iterated after the lock is released.
581  for (const auto &it : m_disabledExcludeStrings) {
582  state.append_tried(it.substr(0, it.find(':')));
583  }
// The "triedrc=resel" suffix is only added when at least one active source was listed.
584  if (state.has_active) {
585  state.ss << "&triedrc=resel";
586  }
587 
588  return state.ss.str();
589 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 261 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), triggerObjects_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

// Queues a server hostname for a later updateCurrentServer() call.
// The hostname is derived from `id` via Source::getHostname(); nothing is queued if that call returns false.
// Only one update can be pending: if m_serverToAdvertise is already non-null, the new hostname is discarded.
261  {
262  auto hostname = std::make_unique<std::string>(id);  // starts as a copy of id; getHostname() writes into it
263  if (Source::getHostname(id, *hostname)) {
264  std::string *null_hostname = nullptr;
// Publish the pointer only if the slot is currently empty (expected value is nullptr).
265  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
// Ownership has moved to m_serverToAdvertise; stop the unique_ptr from deleting the string.
266  hostname.release();
267  }
268  }
// If the string was not published, the unique_ptr frees it on scope exit.
269 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:221
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source > > const &  iOld,
std::vector< std::shared_ptr< Source > > const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 288 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), handleOpen(), initialize(), and requestFailure().

// Logs a warning when the set of sites serving the data changes.
//   iOld      - active source list before the change
//   iNew      - active source list after the change
//   orig_site - originally requested site; empty when there is none to compare against
// The body shown here only formats the site lists and emits log messages.
290  {
291  auto siteList = formatSites(iNew);
// Case 1: an original site is known and the new site list differs from it.
292  if (!orig_site.empty() && (orig_site != siteList)) {
293  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
294  } else {
// Case 2: no original site given; compare the new list against the previous one.
295  auto oldSites = formatSites(iOld);
296  if (orig_site.empty() && (siteList != oldSites)) {
// Stay silent when the previous list was empty (nothing was being served before).
297  if (!oldSites.empty())
298  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
299  }
300  }
301 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 730 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, cmsPerfSuiteHarvest::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

// Handles a failed client request.
//   c_ptr    - the request that failed; its current source is the one being disabled
//   c_status - XrdCl status of the failure
// Flow: an invalid-response status throws immediately. Otherwise the failing source is recorded as
// disabled and removed from the active list (only positions 0 and 1 are checked). If no active
// source remains, a new one is opened through m_open_handler; the request is then re-issued on the
// chosen source.
// NOTE(review): listing lines 736, 768, 775 and 795 are absent from this extract. Lines 736, 775
// and 795 presumably declare the `ex` objects used right after them, and line 768 presumably uses
// `now` (m_lastSourceCheck is in this function's References list) - confirm against
// XrdRequestManager.cc.
730  {
731  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
732 
733  // Fail early for invalid responses - XrdFile has a separate path for handling this.
734  if (c_status.code == XrdCl::errInvalidResponse) {
735  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
737  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
738  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
739  << ") => Invalid ReadV response from server";
740  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
741  addConnections(ex);
742  throw ex;
743  }
744  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
745 
746  // Note that we do not delete the Source itself. That is because this
747  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
748  // In such a case, if you close a file in the handler, it will deadlock
// The three inserts below happen before m_source_mutex is taken; the targets are tbb concurrent sets.
749  m_disabledSourceStrings.insert(source_ptr->ID());
750  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
751  m_disabledSources.insert(source_ptr);
752 
// unique_lock (not lock_guard) because the lock is dropped and re-taken around the open wait below.
753  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
754  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
755  auto oldSources = m_activeSources;
756  m_activeSources.erase(m_activeSources.begin());
757  reportSiteChange(oldSources, m_activeSources);
758  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
759  auto oldSources = m_activeSources;
760  m_activeSources.erase(m_activeSources.begin() + 1);
761  reportSiteChange(oldSources, m_activeSources);
762  }
763  std::shared_ptr<Source> new_source;
764  if (m_activeSources.empty()) {
765  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
766  timespec now;
767  GET_CLOCK_MONOTONIC(now);
769  // Note we only wait for 180 seconds here. This is because we've already failed
770  // once and the likelihood the program has some inconsistent state is decent.
771  // We'd much rather fail hard than deadlock!
// NOTE(review): the wait below is m_timeout + 10 seconds, not a fixed 180 - the figure above
// presumably assumes a particular m_timeout value; confirm.
// The mutex is released while waiting on the future.
772  sentry.unlock();
773  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
774  if (status == std::future_status::timeout) {
776  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
777  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
778  << ") => timeout when waiting for file open";
779  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
780  addConnections(ex);
781  throw ex;
782  } else {
783  try {
784  new_source = future.get();
785  } catch (edm::Exception &ex) {
// Annotate the open failure with the source that originally failed, then rethrow it unchanged.
786  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
787  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
788  throw;
789  }
790  }
791 
// NOTE(review): std::find walks the whole set; the container's own find() member is presumably a
// cheaper lookup - confirm before changing.
792  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
793  m_disabledSourceStrings.end()) {
794  // The server gave us back a data node we requested excluded. Fatal!
796  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
797  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
798  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
799  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
800  addConnections(ex);
801  throw ex;
802  }
// Re-take the mutex before touching m_activeSources again.
803  sentry.lock();
804 
805  auto oldSources = m_activeSources;
806  m_activeSources.push_back(new_source);
807  reportSiteChange(oldSources, m_activeSources);
808  } else {
// Another active source is still available; fall back to the first one.
809  new_source = m_activeSources[0];
810  }
// Re-issue the failed request on the chosen source (the mutex is held at this point on both paths).
811  new_source->handle(c_ptr);
812 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:53
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:165
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 906 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, m_flags, m_name, m_perms, SiStripPI::max, IOPosBuffer::offset(), q1, q2, jetUpdater_cfi::sort, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

Referenced by handleOpen().

909  {
910  if (iolist.empty())
911  return;
912  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
913  req1.reserve(iolist.size() / 2 + 1);
914  req2.reserve(iolist.size() / 2 + 1);
915  size_t front = 0;
916 
917  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
918  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
919  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
920  IOSize chunk1, chunk2;
921  // Make sure the chunk size is at least 1024; little point to reads less than that size.
922  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
923  static_cast<IOSize>(1024));
924  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
925  static_cast<IOSize>(1024));
926 
927  IOSize size_orig = 0;
928  for (const auto &it : iolist)
929  size_orig += it.size();
930 
931  while (tmp_iolist.size() - front > 0) {
932  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
933  (req2.size() >=
934  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
935  // are passed to the request manager. However, because we have a max chunk size, we increase
936  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
937  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
938  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
940  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
941  << ", permissions=0" << std::oct << m_perms << std::dec
942  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
943  "reported to CMSSW developers.";
944  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
945  addConnections(ex);
946  std::stringstream ss;
947  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
948  ex.addAdditionalInfo(ss.str());
949  std::stringstream ss2;
950  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
951  ex.addAdditionalInfo(ss2.str());
952  throw ex;
953  }
954  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
955  consumeChunkFront(front, tmp_iolist, req1, chunk1);
956  }
957  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
958  consumeChunkBack(front, tmp_iolist, req2, chunk2);
959  }
960  }
961  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
962  return left.offset() < right.offset();
963  });
964  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
965  return left.offset() < right.offset();
966  });
967 
968  IOSize size1 = validateList(req1);
969  IOSize size2 = validateList(req2);
970 
971  assert(size_orig == size1 + size2);
972 
973  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
974  << " bytes) split into requests size " << req1.size() << " (" << size1
975  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
976 }
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inline private

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 244 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), LIKELY, m_name, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

// Hands a pending server hostname (queued in m_serverToAdvertise) to the StatisticsSenderService.
// Returns immediately when nothing is pending. Otherwise the pending pointer is atomically taken
// (the slot is reset to nullptr), and the string is freed on scope exit whether or not the service
// is available.
// NOTE(review): listing line 254 (the declaration of `statsService`) was absent from this extract
// and is restored from this function's References list (edm::Service<T>::isAvailable,
// edm::storage::StatisticsSenderService::setCurrentServer) - confirm against XrdRequestManager.cc.
244  {
245  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
246  // a pending update. *However*, since we call this for every read, we'll get it
247  // eventually.
248  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
249  return;
250  }
251  std::string *hostname_ptr;
252  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
// Take ownership so the string is deleted when this scope ends.
253  std::unique_ptr<std::string> hostname(hostname_ptr);
254  edm::Service<edm::storage::StatisticsSenderService> statsService;
255  if (statsService.isAvailable()) {
256  statsService->setCurrentServer(m_name, *hostname_ptr);
257  }
258  }
259 }
#define LIKELY(x)
Definition: Likely.h:20
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:40
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), handleOpen(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

Referenced by handleOpen().

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), handleOpen(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), handleOpen(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 297 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutable private
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 56 of file XrdRequestManager.h.