CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 53 of file XrdRequestManager.h.

Constructor & Destructor Documentation

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 106 of file XrdRequestManager.cc.

107  : m_serverToAdvertise(nullptr),
110  m_name(filename),
111  m_flags(flags),
112  m_perms(perms),
113  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

void RequestManager::addConnections ( cms::Exception &  ex) const

Add the list of active connections to the exception extra info.

Definition at line 479 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), handleOpen(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), requestFailure(), and splitClientRequest().

479  {
480  std::vector<std::string> sources;
482  for (auto const &source : sources) {
483  ex.addAdditionalInfo("Active source: " + source);
484  }
485  sources.clear();
486  getDisabledSourceNames(sources);
487  for (auto const &source : sources) {
488  ex.addAdditionalInfo("Disabled source: " + source);
489  }
490 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:43
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 291 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle(), and handleOpen().

294  {
295  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
296  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
297  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
298  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
299  { // Be more aggressive about getting rid of very bad sources.
300  compareSources(now, 0, 1, activeSources, inactiveSources);
301  compareSources(now, 1, 0, activeSources, inactiveSources);
302  }
304  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
305  }
306  }
307 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 337 of file XrdRequestManager.cc.

References compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), cmsPerfSuiteHarvest::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

340  {
341  bool findNewSource = false;
342  if (activeSources.size() <= 1) {
343  findNewSource = true;
344  } else if (activeSources.size() > 1) {
345  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
346  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
347  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
348  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
349 
350  // NOTE: We could probably replace the copy with a better sort function.
351  // However, there are typically very few sources and the correctness is more obvious right now.
352  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
353  eligibleInactiveSources.reserve(inactiveSources.size());
354  for (const auto &source : inactiveSources) {
355  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
356  eligibleInactiveSources.push_back(source);
357  }
358  }
359  auto bestInactiveSource =
360  std::min_element(eligibleInactiveSources.begin(),
361  eligibleInactiveSources.end(),
362  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
363  return s1->getQuality() < s2->getQuality();
364  });
365  auto worstActiveSource = std::max_element(activeSources.cbegin(),
366  activeSources.cend(),
367  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
368  return s1->getQuality() < s2->getQuality();
369  });
370  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
371  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
372  << ", quality " << (*bestInactiveSource)->getQuality();
373  }
374  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
375  << ", quality " << (*worstActiveSource)->getQuality();
376  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
377  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
378  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
379  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
380  auto oldSources = activeSources;
381  activeSources.push_back(*bestInactiveSource);
382  reportSiteChange(oldSources, activeSources);
383  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
384  if (it->get() == bestInactiveSource->get()) {
385  inactiveSources.erase(it);
386  break;
387  }
388  } else
389  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
390  (*worstActiveSource)->getQuality() >
391  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
392  edm::LogVerbatim("XrdAdaptorInternal")
393  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
394  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
395  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
396  (*worstActiveSource)->setLastDowngrade(now);
397  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
398  if (it->get() == bestInactiveSource->get()) {
399  inactiveSources.erase(it);
400  break;
401  }
402  inactiveSources.emplace_back(std::move(*worstActiveSource));
403  auto oldSources = activeSources;
404  activeSources.erase(worstActiveSource);
405  activeSources.emplace_back(std::move(*bestInactiveSource));
406  reportSiteChange(oldSources, activeSources);
407  eligibleInactiveSources.clear();
408  for (const auto &source : inactiveSources)
409  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
410  eligibleInactiveSources.push_back(source);
411  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
412  eligibleInactiveSources.end(),
413  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
414  return s1->getQuality() < s2->getQuality();
415  });
416  worstActiveSource = std::max_element(activeSources.begin(),
417  activeSources.end(),
418  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
419  return s1->getQuality() < s2->getQuality();
420  });
421  }
422  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
423  float r = m_distribution(m_generator);
425  findNewSource = true;
426  }
427  }
428  }
429  if (findNewSource) {
430  m_open_handler->open();
432  }
433 
434  // Only aggressively look for new sources if we don't have two.
435  if (activeSources.size() == 2) {
437  } else {
439  }
441 }
std::uniform_real_distribution< float > m_distribution
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 309 of file XrdRequestManager.cc.

References a, b, SiStripPI::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

313  {
314  if (activeSources.size() < std::max(a, b) + 1) {
315  return false;
316  }
317 
318  bool findNewSource = false;
319  if ((activeSources[a]->getQuality() > 5130) ||
320  ((activeSources[a]->getQuality() > 260) &&
321  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
322  edm::LogVerbatim("XrdAdaptorInternal")
323  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
324  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
325  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
326  findNewSource = true;
327  }
328  activeSources[a]->setLastDowngrade(now);
329  inactiveSources.emplace_back(activeSources[a]);
330  auto oldSources = activeSources;
331  activeSources.erase(activeSources.begin() + a);
332  reportSiteChange(oldSources, activeSources);
333  }
334  return findNewSource;
335 }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 443 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

443  {
444  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
445  if (m_activeSources.empty()) {
447  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
448  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
449  ex.addContext("In XrdAdaptor::RequestManager::handle()");
450  addConnections(ex);
451  throw ex;
452  }
453  return m_activeSources[0]->getFileHandle();
454 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 456 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

456  {
457  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
458  sources.reserve(m_activeSources.size());
459  for (auto const &source : m_activeSources) {
460  sources.push_back(source->ID());
461  }
462 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 472 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

472  {
473  sources.reserve(m_disabledSourceStrings.size());
474  for (auto const &source : m_disabledSourceStrings) {
475  sources.push_back(source);
476  }
477 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:43
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 111 of file XrdRequestManager.h.

111 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 121 of file XrdRequestManager.h.

References a, b, corrVsCorr::filename, flags, initialize(), instance, cmsPerfSuiteHarvest::now, mps_update::status, and AlCaHLTBitMon_QueryRunRegistry::string.

122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 464 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

464  {
465  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
466  sources.reserve(m_activeSources.size());
467  for (auto const &source : m_activeSources) {
468  sources.push_back(source->PrettyID());
469  }
470 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 63 of file XrdRequestManager.h.

References cmsBatch::handle, findQualityFiles::size, and btagGenBb_cfi::Status.

Referenced by handleOpen().

64  {
65  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
66  return handle(c_ptr);
67  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::future<IOSize> XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest >  c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 518 of file XrdRequestManager.cc.

References checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, pickSingleSource(), and source.

518  {
519  assert(c_ptr.get());
520  timespec now;
521  GET_CLOCK_MONOTONIC(now);
522  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
523  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
524  {
525  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
526  activeSources = m_activeSources;
527  inactiveSources = m_inactiveSources;
528  }
529  {
530  //make sure we update values before calling pickSingelSource
531  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
532  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
533  m_activeSources = std::move(activeSources);
534  m_inactiveSources = std::move(inactiveSources);
535  });
536 
537  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
538  }
539 
540  std::shared_ptr<Source> source = pickSingleSource();
541  source->handle(c_ptr);
542  return c_ptr->get_future();
543 }
#define GET_CLOCK_MONOTONIC(ts)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source >  source 
)
private virtual

Handle the file-open response

Definition at line 572 of file XrdRequestManager.cc.

References a, addConnections(), cms::Exception::addContext(), b, checkSources(), TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, handle(), m_activeSources, m_excluded_active_count, m_flags, m_inactiveSources, m_name, m_nextActiveSourceCheck, m_perms, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, AlCaHLTBitMon_ParallelJobs::p, queueUpdateCurrentServer(), reportSiteChange(), alignCSCRings::s, splitClientRequest(), edm::CPUTimer::start(), edm::CPUTimer::stop(), updateCurrentServer(), XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

572  {
573  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
574  if (status.IsOK()) {
575  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
576  for (const auto &s : m_activeSources) {
577  if (source->ID() == s->ID()) {
578  edm::LogVerbatim("XrdAdaptorInternal")
579  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
580  unsigned returned_count = ++m_excluded_active_count;
582  if (returned_count >= 3) {
584  }
585  return;
586  }
587  }
588  for (const auto &s : m_inactiveSources) {
589  if (source->ID() == s->ID()) {
590  edm::LogVerbatim("XrdAdaptorInternal")
591  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
593  return;
594  }
595  }
596  if (m_activeSources.size() < 2) {
597  auto oldSources = m_activeSources;
598  m_activeSources.push_back(source);
599  reportSiteChange(oldSources, m_activeSources);
601  } else {
602  m_inactiveSources.push_back(source);
603  }
604  } else { // File-open failure - wait at least 120s before next attempt.
605  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
607  }
608 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager >  selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 116 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, ecalTB2006H4_GenSimDigiReco_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

116  {
118 
119  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
120  if (env) {
121  env->GetInt("StreamErrorWindow", m_timeout);
122  }
123 
124  std::string orig_site;
125  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
126  std::string hostname;
127  if (Source::getHostname(orig_site, hostname)) {
128  Source::getDomain(hostname, orig_site);
129  }
130  }
131 
132  std::unique_ptr<XrdCl::File> file;
134  bool validFile = false;
135  const int retries = 5;
136  std::string excludeString;
137  for (int idx = 0; idx < retries; idx++) {
138  file.reset(new XrdCl::File());
139  auto opaque = prepareOpaqueString();
140  std::string new_filename =
141  m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
142  SyncHostResponseHandler handler;
143  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
144  if (!openStatus
145  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
146  // server and it was marked bad - xrootd couldn't even queue up the request internally!
147  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
148  // redirector being down or authentication failures.
149  ex.clearMessage();
150  ex.clearContext();
151  ex.clearAdditionalInfo();
152  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
153  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
154  << ", code=" << openStatus.code << ")";
155  ex.addContext("Calling XrdFile::open()");
156  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
157  throw ex;
158  }
159  handler.WaitForResponse();
160  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
161  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
162  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
163  assert(status);
164  if (status->IsOK()) {
165  validFile = true;
166  break;
167  } else {
168  ex.clearMessage();
169  ex.clearContext();
170  ex.clearAdditionalInfo();
171  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
172  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
173  << ", code=" << status->code << ")";
174  ex.addContext("Calling XrdFile::open()");
175  addConnections(ex);
176  std::string dataServer, lastUrl;
177  file->GetProperty("DataServer", dataServer);
178  file->GetProperty("LastURL", lastUrl);
179  if (!dataServer.empty()) {
180  ex.addAdditionalInfo("Problematic data server: " + dataServer);
181  }
182  if (!lastUrl.empty()) {
183  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
184  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
185  }
186  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
187  m_disabledSourceStrings.end()) {
188  ex << ". No additional data servers were found.";
189  throw ex;
190  }
191  if (!dataServer.empty()) {
192  m_disabledSourceStrings.insert(dataServer);
193  m_disabledExcludeStrings.insert(excludeString);
194  }
195  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
196  if (lastUrl == new_filename) {
197  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
198  throw ex;
199  }
200  }
201  }
202  if (!validFile) {
203  throw ex;
204  }
205  SendMonitoringInfo(*file);
206 
207  timespec ts;
209 
210  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
211  {
212  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
213  auto oldList = m_activeSources;
214  m_activeSources.push_back(source);
216  }
219 
220  m_lastSourceCheck = ts;
221  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
223 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 492 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

492  {
493  std::shared_ptr<Source> source = nullptr;
494  {
495  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
496  if (m_activeSources.size() == 2) {
498  source = m_activeSources[0];
500  } else {
501  source = m_activeSources[1];
503  }
504  } else if (m_activeSources.empty()) {
506  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
507  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
508  ex.addContext("In XrdAdaptor::RequestManager::handle()");
509  addConnections(ex);
510  throw ex;
511  } else {
512  source = m_activeSources[0];
513  }
514  }
515  return source;
516 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 545 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

545  {
546  std::stringstream ss;
547  ss << "tried=";
548  size_t count = 0;
549  {
550  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
551 
552  for (const auto &it : m_activeSources) {
553  count++;
554  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
555  }
556  for (const auto &it : m_inactiveSources) {
557  count++;
558  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
559  }
560  }
561  for (const auto &it : m_disabledExcludeStrings) {
562  count++;
563  ss << it.substr(0, it.find(":")) << ",";
564  }
565  if (count) {
566  std::string tmp_str = ss.str();
567  return tmp_str.substr(0, tmp_str.size() - 1);
568  }
569  return "";
570 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 249 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), triggerObjects_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

249  {
250  auto hostname = std::make_unique<std::string>(id);
251  if (Source::getHostname(id, *hostname)) {
252  std::string *null_hostname = nullptr;
253  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
254  hostname.release();
255  }
256  }
257 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source > > const &  iOld,
std::vector< std::shared_ptr< Source > > const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 276 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), handleOpen(), initialize(), and requestFailure().

278  {
279  auto siteList = formatSites(iNew);
280  if (!orig_site.empty() && (orig_site != siteList)) {
281  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
282  } else {
283  auto oldSites = formatSites(iOld);
284  if (orig_site.empty() && (siteList != oldSites)) {
285  if (!oldSites.empty())
286  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
287  }
288  }
289 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 711 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, cmsPerfSuiteHarvest::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

711  {
712  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
713 
714  // Fail early for invalid responses - XrdFile has a separate path for handling this.
715  if (c_status.code == XrdCl::errInvalidResponse) {
716  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
718  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
719  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
720  << ") => Invalid ReadV response from server";
721  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
722  addConnections(ex);
723  throw ex;
724  }
725  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
726 
727  // Note that we do not delete the Source itself. That is because this
728  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
729  // In such a case, if you close a file in the handler, it will deadlock
730  m_disabledSourceStrings.insert(source_ptr->ID());
731  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
732  m_disabledSources.insert(source_ptr);
733 
734  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
735  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
736  auto oldSources = m_activeSources;
737  m_activeSources.erase(m_activeSources.begin());
738  reportSiteChange(oldSources, m_activeSources);
739  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
740  auto oldSources = m_activeSources;
741  m_activeSources.erase(m_activeSources.begin() + 1);
742  reportSiteChange(oldSources, m_activeSources);
743  }
744  std::shared_ptr<Source> new_source;
745  if (m_activeSources.empty()) {
746  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
747  timespec now;
748  GET_CLOCK_MONOTONIC(now);
750  // Note we only wait for 180 seconds here. This is because we've already failed
751  // once and the likelihood the program has some inconsistent state is decent.
752  // We'd much rather fail hard than deadlock!
753  sentry.unlock();
754  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
755  if (status == std::future_status::timeout) {
757  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
758  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
759  << ") => timeout when waiting for file open";
760  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
761  addConnections(ex);
762  throw ex;
763  } else {
764  try {
765  new_source = future.get();
766  } catch (edm::Exception &ex) {
767  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
768  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
769  throw;
770  }
771  }
772 
773  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
774  m_disabledSourceStrings.end()) {
775  // The server gave us back a data node we requested excluded. Fatal!
777  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
778  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
779  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
780  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
781  addConnections(ex);
782  throw ex;
783  }
784  sentry.lock();
785 
786  auto oldSources = m_activeSources;
787  m_activeSources.push_back(new_source);
788  reportSiteChange(oldSources, m_activeSources);
789  } else {
790  new_source = m_activeSources[0];
791  }
792  new_source->handle(c_ptr);
793 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:51
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 887 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, m_flags, m_name, m_perms, SiStripPI::max, IOPosBuffer::offset(), q1, q2, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

Referenced by handleOpen().

890  {
891  if (iolist.empty())
892  return;
893  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
894  req1.reserve(iolist.size() / 2 + 1);
895  req2.reserve(iolist.size() / 2 + 1);
896  size_t front = 0;
897 
898  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
899  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
900  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
901  IOSize chunk1, chunk2;
902  // Make sure the chunk size is at least 1024; little point to reads less than that size.
903  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
904  static_cast<IOSize>(1024));
905  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
906  static_cast<IOSize>(1024));
907 
908  IOSize size_orig = 0;
909  for (const auto &it : iolist)
910  size_orig += it.size();
911 
912  while (tmp_iolist.size() - front > 0) {
913  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
914  (req2.size() >=
915  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
916  // are passed to the request manager. However, because we have a max chunk size, we increase
917  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
918  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
919  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
921  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
922  << ", permissions=0" << std::oct << m_perms << std::dec
923  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
924  "reported to CMSSW developers.";
925  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
926  addConnections(ex);
927  std::stringstream ss;
928  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
929  ex.addAdditionalInfo(ss.str());
930  std::stringstream ss2;
931  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
932  ex.addAdditionalInfo(ss2.str());
933  throw ex;
934  }
935  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
936  consumeChunkFront(front, tmp_iolist, req1, chunk1);
937  }
938  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
939  consumeChunkBack(front, tmp_iolist, req2, chunk2);
940  }
941  }
942  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
943  return left.offset() < right.offset();
944  });
945  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
946  return left.offset() < right.offset();
947  });
948 
949  IOSize size1 = validateList(req1);
950  IOSize size2 = validateList(req2);
951 
952  assert(size_orig == size1 + size2);
953 
954  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
955  << " bytes) split into requests size " << req1.size() << " (" << size1
956  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
957 }
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inline private

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 232 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), likely, m_name, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

232  {
233  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
234  // a pending update. *However*, since we call this for every read, we'll get it
235  // eventually.
236  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
237  return;
238  }
239  std::string *hostname_ptr;
240  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
241  std::unique_ptr<std::string> hostname(hostname_ptr);
243  if (statsService.isAvailable()) {
244  statsService->setCurrentServer(m_name, *hostname_ptr);
245  }
246  }
247 }
#define likely(x)
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), handleOpen(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

Referenced by handleOpen().

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), handleOpen(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), handleOpen(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 297 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutable private
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 56 of file XrdRequestManager.h.