CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Classes

class  OpenHandler
 

Public Types

using IOOffset = edm::storage::IOOffset
 
using IOPosBuffer = edm::storage::IOPosBuffer
 
using IOSize = edm::storage::IOSize
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer >> iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
RequestManager & operator= (const RequestManager &)=delete
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 RequestManager (const RequestManager &)=delete
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr
< RequestManager > 
getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr
< Source > > 
m_activeSources
 
tbb::concurrent_unordered_set
< std::string > 
m_disabledExcludeStrings
 
tbb::concurrent_unordered_set
< std::shared_ptr< Source >
, SourceHash > 
m_disabledSources
 
tbb::concurrent_unordered_set
< std::string > 
m_disabledSourceStrings
 
std::uniform_real_distribution
< float > 
m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr
< Source > > 
m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 44 of file XrdRequestManager.h.

Member Typedef Documentation

Definition at line 48 of file XrdRequestManager.h.

Definition at line 47 of file XrdRequestManager.h.

Definition at line 46 of file XrdRequestManager.h.

Constructor & Destructor Documentation

XrdAdaptor::RequestManager::RequestManager ( const RequestManager & )
delete

Referenced by getInstance().

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 116 of file XrdRequestManager.cc.

117  : m_serverToAdvertise(nullptr),
120  m_name(filename),
121  m_flags(flags),
122  m_perms(perms),
123  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count
tuple filename
Definition: lut2db_cfg.py:20

Member Function Documentation

void RequestManager::addConnections ( cms::Exception & ex) const

Add the list of active connections to the exception extra info.

Definition at line 489 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

489  {
490  std::vector<std::string> sources;
492  for (auto const &source : sources) {
493  ex.addAdditionalInfo("Active source: " + source);
494  }
495  sources.clear();
496  getDisabledSourceNames(sources);
497  for (auto const &source : sources) {
498  ex.addAdditionalInfo("Disabled source: " + source);
499  }
500 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:46
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 301 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle().

304  {
305  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
306  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
307  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
308  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
309  { // Be more aggressive about getting rid of very bad sources.
310  compareSources(now, 0, 1, activeSources, inactiveSources);
311  compareSources(now, 1, 0, activeSources, inactiveSources);
312  }
314  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
315  }
316  }
317 }
Log< level::Info, true > LogVerbatim
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 347 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), submitPVValidationJobs::now, alignCSCRings::r, reportSiteChange(), source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

350  {
351  bool findNewSource = false;
352  if (activeSources.size() <= 1) {
353  findNewSource = true;
354  } else if (activeSources.size() > 1) {
355  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
356  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
357  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
358  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
359 
360  // NOTE: We could probably replace the copy with a better sort function.
361  // However, there are typically very few sources and the correctness is more obvious right now.
362  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
363  eligibleInactiveSources.reserve(inactiveSources.size());
364  for (const auto &source : inactiveSources) {
365  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
366  eligibleInactiveSources.push_back(source);
367  }
368  }
369  auto bestInactiveSource =
370  std::min_element(eligibleInactiveSources.begin(),
371  eligibleInactiveSources.end(),
372  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
373  return s1->getQuality() < s2->getQuality();
374  });
375  auto worstActiveSource = std::max_element(activeSources.cbegin(),
376  activeSources.cend(),
377  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
378  return s1->getQuality() < s2->getQuality();
379  });
380  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
381  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
382  << ", quality " << (*bestInactiveSource)->getQuality();
383  }
384  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
385  << ", quality " << (*worstActiveSource)->getQuality();
386  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
387  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
388  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
389  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
390  auto oldSources = activeSources;
391  activeSources.push_back(*bestInactiveSource);
392  reportSiteChange(oldSources, activeSources);
393  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
394  if (it->get() == bestInactiveSource->get()) {
395  inactiveSources.erase(it);
396  break;
397  }
398  } else
399  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
400  (*worstActiveSource)->getQuality() >
401  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
402  edm::LogVerbatim("XrdAdaptorInternal")
403  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
404  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
405  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
406  (*worstActiveSource)->setLastDowngrade(now);
407  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
408  if (it->get() == bestInactiveSource->get()) {
409  inactiveSources.erase(it);
410  break;
411  }
412  inactiveSources.emplace_back(*worstActiveSource);
413  auto oldSources = activeSources;
414  activeSources.erase(worstActiveSource);
415  activeSources.emplace_back(std::move(*bestInactiveSource));
416  reportSiteChange(oldSources, activeSources);
417  eligibleInactiveSources.clear();
418  for (const auto &source : inactiveSources)
419  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
420  eligibleInactiveSources.push_back(source);
421  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
422  eligibleInactiveSources.end(),
423  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
424  return s1->getQuality() < s2->getQuality();
425  });
426  worstActiveSource = std::max_element(activeSources.begin(),
427  activeSources.end(),
428  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
429  return s1->getQuality() < s2->getQuality();
430  });
431  }
432  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
433  float r = m_distribution(m_generator);
435  findNewSource = true;
436  }
437  }
438  }
439  if (findNewSource) {
440  m_open_handler->open();
442  }
443 
444  // Only aggressively look for new sources if we don't have two.
445  if (activeSources.size() == 2) {
447  } else {
449  }
451 }
Log< level::Info, true > LogVerbatim
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::uniform_real_distribution< float > m_distribution
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
def move
Definition: eostools.py:511
std::shared_ptr< OpenHandler > m_open_handler
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
static std::string const source
Definition: EdmProvDump.cc:46
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 319 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, b, SiStripPI::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

323  {
324  if (activeSources.size() < std::max(a, b) + 1) {
325  return false;
326  }
327 
328  bool findNewSource = false;
329  if ((activeSources[a]->getQuality() > 5130) ||
330  ((activeSources[a]->getQuality() > 260) &&
331  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
332  edm::LogVerbatim("XrdAdaptorInternal")
333  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
334  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
335  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
336  findNewSource = true;
337  }
338  activeSources[a]->setLastDowngrade(now);
339  inactiveSources.emplace_back(activeSources[a]);
340  auto oldSources = activeSources;
341  activeSources.erase(activeSources.begin() + a);
342  reportSiteChange(oldSources, activeSources);
343  }
344  return findNewSource;
345 }
Log< level::Info, true > LogVerbatim
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 453 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

453  {
454  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
455  if (m_activeSources.empty()) {
457  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
458  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
459  ex.addContext("In XrdAdaptor::RequestManager::handle()");
460  addConnections(ex);
461  throw ex;
462  }
463  return m_activeSources[0]->getFileHandle();
464 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 466 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

466  {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  sources.reserve(m_activeSources.size());
469  for (auto const &source : m_activeSources) {
470  sources.push_back(source->ID());
471  }
472 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:46
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 482 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

482  {
483  sources.reserve(m_disabledSourceStrings.size());
484  for (auto const &source : m_disabledSourceStrings) {
485  sources.push_back(source);
486  }
487 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:46
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 107 of file XrdRequestManager.h.

References m_name.

107 { return m_name; }
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 116 of file XrdRequestManager.h.

References instance, and RequestManager().

118  {
119  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
120  instance->initialize(instance);
121  return instance;
122  }
static PFTauRenderPlugin instance
tuple filename
Definition: lut2db_cfg.py:20
RequestManager(const RequestManager &)=delete
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 474 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

474  {
475  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
476  sources.reserve(m_activeSources.size());
477  for (auto const &source : m_activeSources) {
478  sources.push_back(source->PrettyID());
479  }
480 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:46
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 60 of file XrdRequestManager.h.

References findQualityFiles::size.

60  {
61  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
62  return handle(c_ptr);
63  }
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
tuple size
Write out results.
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer >>  iolist)

Definition at line 620 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, cms::Exception::addContext(), cms::cuda::assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, eostools::move(), submitPVValidationJobs::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), and edm::CPUTimer::stop().

620  {
621  //Use a copy of m_activeSources and m_inactiveSources throughout this function
622  // in order to avoid holding the lock a long time and causing a deadlock.
623  // When the function is over we will update the values of the containers
624  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
625  {
626  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
627  activeSources = m_activeSources;
628  inactiveSources = m_inactiveSources;
629  }
630  //Make sure we update changes when we leave the function
631  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
632  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
633  m_activeSources = std::move(activeSources);
634  m_inactiveSources = std::move(inactiveSources);
635  });
636 
638 
639  timespec now;
640  GET_CLOCK_MONOTONIC(now);
641 
642  edm::CPUTimer timer;
643  timer.start();
644 
645  if (activeSources.size() == 1) {
646  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
647  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
648  activeSources[0]->handle(c_ptr);
649  return c_ptr->get_future();
650  }
651  // Make sure active
652  else if (activeSources.empty()) {
654  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
655  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
656  ex.addContext("In XrdAdaptor::RequestManager::handle()");
657  addConnections(ex);
658  throw ex;
659  }
660 
661  assert(iolist.get());
662  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
663  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
664  splitClientRequest(*iolist, *req1, *req2, activeSources);
665 
666  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
667  // CheckSources may have removed a source
668  if (activeSources.size() == 1) {
669  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
670  activeSources[0]->handle(c_ptr);
671  return c_ptr->get_future();
672  }
673 
674  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
675  std::future<IOSize> future1, future2;
676  if (!req1->empty()) {
677  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
678  activeSources[0]->handle(c_ptr1);
679  future1 = c_ptr1->get_future();
680  }
681  if (!req2->empty()) {
682  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
683  activeSources[1]->handle(c_ptr2);
684  future2 = c_ptr2->get_future();
685  }
686  if (!req1->empty() && !req2->empty()) {
687  std::future<IOSize> task = std::async(
688  std::launch::deferred,
689  [](std::future<IOSize> a, std::future<IOSize> b) {
690  // Wait until *both* results are available. This is essential
691  // as the callback may try referencing the RequestManager. If one
692  // throws an exception (causing the RequestManager to be destroyed by
693  // XrdFile) and the other has a failure, then the recovery code will
694  // reference the destroyed RequestManager.
695  //
696  // Unlike other places where we use shared/weak ptrs to maintain object
697  // lifetime and destruction asynchronously, we *cannot* destroy the request
698  // asynchronously as it is associated with a ROOT buffer. We must wait until we
699  // are guaranteed that XrdCl will not write into the ROOT buffer before we
700  // can return.
701  b.wait();
702  a.wait();
703  return b.get() + a.get();
704  },
705  std::move(future1),
706  std::move(future2));
707  timer.stop();
708  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
709  return task;
710  } else if (!req1->empty()) {
711  return future1;
712  } else if (!req2->empty()) {
713  return future2;
714  } else { // Degenerate case - no bytes to read.
715  std::promise<IOSize> p;
716  p.set_value(0);
717  return p.get_future();
718  }
719 }
void start()
Definition: CPUTimer.cc:68
#define GET_CLOCK_MONOTONIC(ts)
assert(be >=bs)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
def move
Definition: eostools.py:511
Times stop()
Definition: CPUTimer.cc:87
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful one. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 528 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::cuda::assert(), checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), submitPVValidationJobs::now, pickSingleSource(), and source.

528  {
529  assert(c_ptr.get());
530  timespec now;
531  GET_CLOCK_MONOTONIC(now);
532  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
533  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
534  {
535  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
536  activeSources = m_activeSources;
537  inactiveSources = m_inactiveSources;
538  }
539  {
540  //make sure we update values before calling pickSingelSource
541  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
542  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
543  m_activeSources = std::move(activeSources);
544  m_inactiveSources = std::move(inactiveSources);
545  });
546 
547  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
548  }
549 
550  std::shared_ptr<Source> source = pickSingleSource();
551  source->handle(c_ptr);
552  return c_ptr->get_future();
553 }
#define GET_CLOCK_MONOTONIC(ts)
assert(be >=bs)
std::vector< std::shared_ptr< Source > > m_inactiveSources
def move
Definition: eostools.py:511
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:46
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source > source 
)
private virtual

Handle the file-open response

Definition at line 582 of file XrdRequestManager.cc.

References alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

582  {
583  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
584  if (status.IsOK()) {
585  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
586  for (const auto &s : m_activeSources) {
587  if (source->ID() == s->ID()) {
588  edm::LogVerbatim("XrdAdaptorInternal")
589  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
590  unsigned returned_count = ++m_excluded_active_count;
592  if (returned_count >= 3) {
594  }
595  return;
596  }
597  }
598  for (const auto &s : m_inactiveSources) {
599  if (source->ID() == s->ID()) {
600  edm::LogVerbatim("XrdAdaptorInternal")
601  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
603  return;
604  }
605  }
606  if (m_activeSources.size() < 2) {
607  auto oldSources = m_activeSources;
608  m_activeSources.push_back(source);
609  reportSiteChange(oldSources, m_activeSources);
611  } else {
612  m_inactiveSources.push_back(source);
613  }
614  } else { // File-open failure - wait at least 120s before next attempt.
615  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
617  }
618 }
Log< level::Info, true > LogVerbatim
list status
Definition: mps_update.py:107
void queueUpdateCurrentServer(const std::string &)
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_inactiveSources
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
std::atomic< unsigned > m_excluded_active_count
static std::string const source
Definition: EdmProvDump.cc:46
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager > selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away its reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows not to reference the RequestManager.

Definition at line 126 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, mergeVDriftHistosByStation::file, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

126  {
128 
129  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
130  if (env) {
131  env->GetInt("StreamErrorWindow", m_timeout);
132  }
133 
134  std::string orig_site;
135  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
136  std::string hostname;
137  if (Source::getHostname(orig_site, hostname)) {
138  Source::getDomain(hostname, orig_site);
139  }
140  }
141 
142  std::unique_ptr<XrdCl::File> file;
144  bool validFile = false;
145  const int retries = 5;
146  std::string excludeString;
147  for (int idx = 0; idx < retries; idx++) {
148  file = std::make_unique<XrdCl::File>();
149  auto opaque = prepareOpaqueString();
150  std::string new_filename =
151  m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
152  SyncHostResponseHandler handler;
153  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
154  if (!openStatus
155  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
156  // server and it was marked bad - xrootd couldn't even queue up the request internally!
157  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
158  // redirector being down or authentication failures.
159  ex.clearMessage();
160  ex.clearContext();
161  ex.clearAdditionalInfo();
162  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
163  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
164  << ", code=" << openStatus.code << ")";
165  ex.addContext("Calling XrdFile::open()");
166  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
167  throw ex;
168  }
169  handler.WaitForResponse();
170  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
171  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
172  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
173  assert(status);
174  if (status->IsOK()) {
175  validFile = true;
176  break;
177  } else {
178  ex.clearMessage();
179  ex.clearContext();
180  ex.clearAdditionalInfo();
181  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
182  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
183  << ", code=" << status->code << ")";
184  ex.addContext("Calling XrdFile::open()");
185  addConnections(ex);
186  std::string dataServer, lastUrl;
187  file->GetProperty("DataServer", dataServer);
188  file->GetProperty("LastURL", lastUrl);
189  if (!dataServer.empty()) {
190  ex.addAdditionalInfo("Problematic data server: " + dataServer);
191  }
192  if (!lastUrl.empty()) {
193  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
194  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
195  }
196  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
197  m_disabledSourceStrings.end()) {
198  ex << ". No additional data servers were found.";
199  throw ex;
200  }
201  if (!dataServer.empty()) {
202  m_disabledSourceStrings.insert(dataServer);
203  m_disabledExcludeStrings.insert(excludeString);
204  }
205  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
206  if (lastUrl == new_filename) {
207  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
208  throw ex;
209  }
210  }
211  }
212  if (!validFile) {
213  throw ex;
214  }
215  SendMonitoringInfo(*file);
216 
217  timespec ts;
219 
220  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
221  {
222  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
223  auto oldList = m_activeSources;
224  m_activeSources.push_back(source);
226  }
229 
230  m_lastSourceCheck = ts;
231  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
233 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:288
list status
Definition: mps_update.py:107
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
assert(be >=bs)
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:246
def move
Definition: eostools.py:511
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:326
Log< level::Warning, false > LogWarning
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:219
static std::string const source
Definition: EdmProvDump.cc:46
std::recursive_mutex m_source_mutex
RequestManager& XrdAdaptor::RequestManager::operator= ( const RequestManager & )
delete
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 502 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

502  {
503  std::shared_ptr<Source> source = nullptr;
504  {
505  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
506  if (m_activeSources.size() == 2) {
508  source = m_activeSources[0];
510  } else {
511  source = m_activeSources[1];
513  }
514  } else if (m_activeSources.empty()) {
516  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
517  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
518  ex.addContext("In XrdAdaptor::RequestManager::handle()");
519  addConnections(ex);
520  throw ex;
521  } else {
522  source = m_activeSources[0];
523  }
524  }
525  return source;
526 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:46
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 555 of file XrdRequestManager.cc.

References submitPVResolutionJobs::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

555  {
     // Builds the opaque query fragment "tried=<id>,<id>,..." from every source
     // this manager already knows about (active, inactive and disabled).
     // Returns an empty string when no such source exists.
556  std::stringstream ss;
557  ss << "tried=";
     // Number of entries appended; used below to decide whether anything was written.
558  size_t count = 0;
559  {
     // m_activeSources / m_inactiveSources are read under m_source_mutex.
560  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
561 
562  for (const auto &it : m_activeSources) {
563  count++;
     // Keep only the text before the first ':' of the exclude ID; when there is
     // no ':' find() yields npos and substr() returns the whole string.
     // NOTE(review): presumably this strips a port suffix - confirm against Source::ExcludeID().
564  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
565  }
566  for (const auto &it : m_inactiveSources) {
567  count++;
568  ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
569  }
570  }
     // m_disabledExcludeStrings is a tbb::concurrent_unordered_set and is iterated
     // here after the lock scope above has ended.
571  for (const auto &it : m_disabledExcludeStrings) {
572  count++;
573  ss << it.substr(0, it.find(':')) << ",";
574  }
575  if (count) {
     // Every entry was written with a trailing ','; drop the final one.
576  std::string tmp_str = ss.str();
577  return tmp_str.substr(0, tmp_str.size() - 1);
578  }
     // Nothing to exclude: return an empty string rather than a bare "tried=".
579  return "";
580 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 259 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), gpuClustering::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

259  {
     // Publishes a server name through the atomic pointer m_serverToAdvertise so
     // that updateCurrentServer() can pick it up later.
     // Start from a heap-allocated copy of `id`; getHostname() receives it as its
     // output argument.
260  auto hostname = std::make_unique<std::string>(id);
     // Only proceed when getHostname() reports true.
     // NOTE(review): presumably true means `*hostname` now holds the resolved host name - confirm in XrdSource.cc.
261  if (Source::getHostname(id, *hostname)) {
262  std::string *null_hostname = nullptr;
     // Install the new string only if no update is currently pending (slot is null).
     // If another value is already queued, the exchange fails and `hostname`
     // is freed by the unique_ptr when it goes out of scope.
263  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
     // Ownership now belongs to m_serverToAdvertise; stop the unique_ptr from deleting it.
264  hostname.release();
265  }
266  }
267 }
uint16_t *__restrict__ id
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:219
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source >> const &  iOld,
std::vector< std::shared_ptr< Source >> const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 286 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

288  {
     // Emits an "XrdAdaptor" warning when the set of serving sites differs from
     // either the caller-supplied original site or the previous source list.
     // NOTE(review): formatSites() is not shown here; presumably it renders a
     // source list as a printable site string - confirm.
289  auto siteList = formatSites(iNew);
     // Case 1: an original site was given and the new list does not match it.
290  if (!orig_site.empty() && (orig_site != siteList)) {
291  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
292  } else {
293  auto oldSites = formatSites(iOld);
     // Case 2: no original site was given; compare the new list with the old one.
294  if (orig_site.empty() && (siteList != oldSites)) {
     // Stay silent when the old list formats to an empty string.
295  if (!oldSites.empty())
296  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
297  }
298  }
299 }
Log< level::Warning, false > LogWarning
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 721 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, submitPVValidationJobs::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

721  {
722  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
723 
724  // Fail early for invalid responses - XrdFile has a separate path for handling this.
725  if (c_status.code == XrdCl::errInvalidResponse) {
726  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
728  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
729  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
730  << ") => Invalid ReadV response from server";
731  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
732  addConnections(ex);
733  throw ex;
734  }
735  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
736 
737  // Note that we do not delete the Source itself. That is because this
738  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
739  // In such a case, if you close a file in the handler, it will deadlock
740  m_disabledSourceStrings.insert(source_ptr->ID());
741  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
742  m_disabledSources.insert(source_ptr);
743 
744  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
745  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
746  auto oldSources = m_activeSources;
747  m_activeSources.erase(m_activeSources.begin());
748  reportSiteChange(oldSources, m_activeSources);
749  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
750  auto oldSources = m_activeSources;
751  m_activeSources.erase(m_activeSources.begin() + 1);
752  reportSiteChange(oldSources, m_activeSources);
753  }
754  std::shared_ptr<Source> new_source;
755  if (m_activeSources.empty()) {
756  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
757  timespec now;
758  GET_CLOCK_MONOTONIC(now);
760  // Note we only wait for 180 seconds here. This is because we've already failed
761  // once and the likelihood the program has some inconsistent state is decent.
762  // We'd much rather fail hard than deadlock!
763  sentry.unlock();
764  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
765  if (status == std::future_status::timeout) {
767  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
768  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
769  << ") => timeout when waiting for file open";
770  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
771  addConnections(ex);
772  throw ex;
773  } else {
774  try {
775  new_source = future.get();
776  } catch (edm::Exception &ex) {
777  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
778  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
779  throw;
780  }
781  }
782 
783  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
784  m_disabledSourceStrings.end()) {
785  // The server gave us back a data node we requested excluded. Fatal!
787  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
788  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
789  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
790  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
791  addConnections(ex);
792  throw ex;
793  }
794  sentry.lock();
795 
796  auto oldSources = m_activeSources;
797  m_activeSources.push_back(new_source);
798  reportSiteChange(oldSources, m_activeSources);
799  } else {
800  new_source = m_activeSources[0];
801  }
802  new_source->handle(c_ptr);
803 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
list status
Definition: mps_update.py:107
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:165
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
Log< level::Warning, false > LogWarning
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two request lists.

Definition at line 897 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), cms::cuda::assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, SiStripPI::max, edm::storage::IOPosBuffer::offset(), contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

900  {
901  if (iolist.empty())
902  return;
903  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
904  req1.reserve(iolist.size() / 2 + 1);
905  req2.reserve(iolist.size() / 2 + 1);
906  size_t front = 0;
907 
908  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
909  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
910  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
911  IOSize chunk1, chunk2;
912  // Make sure the chunk size is at least 1024; little point to reads less than that size.
913  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
914  static_cast<IOSize>(1024));
915  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
916  static_cast<IOSize>(1024));
917 
918  IOSize size_orig = 0;
919  for (const auto &it : iolist)
920  size_orig += it.size();
921 
922  while (tmp_iolist.size() - front > 0) {
923  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
924  (req2.size() >=
925  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
926  // are passed to the request manager. However, because we have a max chunk size, we increase
927  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
928  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
929  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
931  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
932  << ", permissions=0" << std::oct << m_perms << std::dec
933  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
934  "reported to CMSSW developers.";
935  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
936  addConnections(ex);
937  std::stringstream ss;
938  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
939  ex.addAdditionalInfo(ss.str());
940  std::stringstream ss2;
941  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
942  ex.addAdditionalInfo(ss2.str());
943  throw ex;
944  }
945  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
946  consumeChunkFront(front, tmp_iolist, req1, chunk1);
947  }
948  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
949  consumeChunkBack(front, tmp_iolist, req2, chunk2);
950  }
951  }
952  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
953  return left.offset() < right.offset();
954  });
955  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
956  return left.offset() < right.offset();
957  });
958 
959  IOSize size1 = validateList(req1);
960  IOSize size2 = validateList(req2);
961 
962  assert(size_orig == size1 + size2);
963 
964  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
965  << " bytes) split into requests size " << req1.size() << " (" << size1
966  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
967 }
Log< level::Info, true > LogVerbatim
static constexpr int XRD_CL_MAX_CHUNK
assert(be >=bs)
void addConnections(cms::Exception &) const
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD
static IOSize validateList(const std::vector< IOPosBuffer > req)
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
IOOffset offset() const
Definition: IOPosBuffer.h:41
void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 242 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), LIKELY, m_name, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), statsService, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

242  {
     // Hands a pending server name (queued in m_serverToAdvertise) to the
     // statistics service, if one is pending.
243  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
244  // a pending update. *However*, since we call this for every read, we'll get it
245  // eventually.
     // Fast path: nothing queued, so return without doing the atomic exchange.
246  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
247  return;
248  }
249  std::string *hostname_ptr;
     // Atomically take the pending string and reset the slot to null; the result
     // can still be null if the slot was emptied after the load above.
250  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
     // Take ownership so the string is deleted when this scope exits.
251  std::unique_ptr<std::string> hostname(hostname_ptr);
     // NOTE(review): the declaration of statsService (listing line 252) is not
     // visible in this excerpt; it is used like an edm::Service handle - confirm.
253  if (statsService.isAvailable()) {
254  statsService->setCurrentServer(m_name, *hostname_ptr);
255  }
256  }
257 }
#define LIKELY(x)
Definition: Likely.h:20
bool isAvailable() const
Definition: Service.h:40
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:213
std::atomic< std::string * > m_serverToAdvertise
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 298 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string *> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3 * 60
static

Definition at line 53 of file XrdRequestManager.h.