CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer >> iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 45 of file XrdRequestManager.h.

Constructor & Destructor Documentation

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 108 of file XrdRequestManager.cc.

109  : m_serverToAdvertise(nullptr),
112  m_name(filename),
113  m_flags(flags),
114  m_perms(perms),
115  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

void RequestManager::addConnections ( cms::Exception &  ex) const

Add the list of active connections to the exception extra info.

Definition at line 481 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), source, and CalibrationSummaryClient_cfi::sources.

Referenced by getActiveFile(), handle(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), requestFailure(), and splitClientRequest().

481  {
482  std::vector<std::string> sources;
484  for (auto const &source : sources) {
485  ex.addAdditionalInfo("Active source: " + source);
486  }
487  sources.clear();
488  getDisabledSourceNames(sources);
489  for (auto const &source : sources) {
490  ex.addAdditionalInfo("Disabled source: " + source);
491  }
492 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:47
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 293 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle().

296  {
297  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
298  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
299  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
300  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
301  { // Be more aggressive about getting rid of very bad sources.
302  compareSources(now, 0, 1, activeSources, inactiveSources);
303  compareSources(now, 1, 0, activeSources, inactiveSources);
304  }
306  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
307  }
308  }
309 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 339 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), fileCollector::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

342  {
343  bool findNewSource = false;
344  if (activeSources.size() <= 1) {
345  findNewSource = true;
346  } else if (activeSources.size() > 1) {
347  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
348  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
349  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
350  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
351 
352  // NOTE: We could probably replace the copy with a better sort function.
353  // However, there are typically very few sources and the correctness is more obvious right now.
354  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
355  eligibleInactiveSources.reserve(inactiveSources.size());
356  for (const auto &source : inactiveSources) {
357  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
358  eligibleInactiveSources.push_back(source);
359  }
360  }
361  auto bestInactiveSource =
362  std::min_element(eligibleInactiveSources.begin(),
363  eligibleInactiveSources.end(),
364  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
365  return s1->getQuality() < s2->getQuality();
366  });
367  auto worstActiveSource = std::max_element(activeSources.cbegin(),
368  activeSources.cend(),
369  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
370  return s1->getQuality() < s2->getQuality();
371  });
372  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
373  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
374  << ", quality " << (*bestInactiveSource)->getQuality();
375  }
376  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
377  << ", quality " << (*worstActiveSource)->getQuality();
378  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
379  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
380  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
381  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
382  auto oldSources = activeSources;
383  activeSources.push_back(*bestInactiveSource);
384  reportSiteChange(oldSources, activeSources);
385  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
386  if (it->get() == bestInactiveSource->get()) {
387  inactiveSources.erase(it);
388  break;
389  }
390  } else
391  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
392  (*worstActiveSource)->getQuality() >
393  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
394  edm::LogVerbatim("XrdAdaptorInternal")
395  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
396  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
397  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
398  (*worstActiveSource)->setLastDowngrade(now);
399  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
400  if (it->get() == bestInactiveSource->get()) {
401  inactiveSources.erase(it);
402  break;
403  }
404  inactiveSources.emplace_back(std::move(*worstActiveSource));
405  auto oldSources = activeSources;
406  activeSources.erase(worstActiveSource);
407  activeSources.emplace_back(std::move(*bestInactiveSource));
408  reportSiteChange(oldSources, activeSources);
409  eligibleInactiveSources.clear();
410  for (const auto &source : inactiveSources)
411  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
412  eligibleInactiveSources.push_back(source);
413  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
414  eligibleInactiveSources.end(),
415  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
416  return s1->getQuality() < s2->getQuality();
417  });
418  worstActiveSource = std::max_element(activeSources.begin(),
419  activeSources.end(),
420  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
421  return s1->getQuality() < s2->getQuality();
422  });
423  }
424  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
425  float r = m_distribution(m_generator);
427  findNewSource = true;
428  }
429  }
430  }
431  if (findNewSource) {
432  m_open_handler->open();
434  }
435 
436  // Only aggressively look for new sources if we don't have two.
437  if (activeSources.size() == 2) {
439  } else {
441  }
443 }
std::uniform_real_distribution< float > m_distribution
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 311 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, b, SiStripPI::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

315  {
316  if (activeSources.size() < std::max(a, b) + 1) {
317  return false;
318  }
319 
320  bool findNewSource = false;
321  if ((activeSources[a]->getQuality() > 5130) ||
322  ((activeSources[a]->getQuality() > 260) &&
323  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
324  edm::LogVerbatim("XrdAdaptorInternal")
325  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
326  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
327  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
328  findNewSource = true;
329  }
330  activeSources[a]->setLastDowngrade(now);
331  inactiveSources.emplace_back(activeSources[a]);
332  auto oldSources = activeSources;
333  activeSources.erase(activeSources.begin() + a);
334  reportSiteChange(oldSources, activeSources);
335  }
336  return findNewSource;
337 }
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 445 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

445  {
446  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
447  if (m_activeSources.empty()) {
449  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
450  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
451  ex.addContext("In XrdAdaptor::RequestManager::handle()");
452  addConnections(ex);
453  throw ex;
454  }
455  return m_activeSources[0]->getFileHandle();
456 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 458 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

458  {
459  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
460  sources.reserve(m_activeSources.size());
461  for (auto const &source : m_activeSources) {
462  sources.push_back(source->ID());
463  }
464 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 474 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

474  {
475  sources.reserve(m_disabledSourceStrings.size());
476  for (auto const &source : m_disabledSourceStrings) {
477  sources.push_back(source);
478  }
479 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:47
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 101 of file XrdRequestManager.h.

101 { return m_name; }
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 110 of file XrdRequestManager.h.

References a, CalibrationSummaryClient_cfi::activeSources, b, corrVsCorr::filename, HLT_2018_cff::flags, initialize(), instance, fileCollector::now, mps_update::status, and AlCaHLTBitMon_QueryRunRegistry::string.

112  {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 466 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

466  {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  sources.reserve(m_activeSources.size());
469  for (auto const &source : m_activeSources) {
470  sources.push_back(source->PrettyID());
471  }
472 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 54 of file XrdRequestManager.h.

References patZpeak::handle, findQualityFiles::size, CalibrationSummaryClient_cfi::sources, and btagGenBb_cfi::Status.

54  {
55  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
56  return handle(c_ptr);
57  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer >>  iolist)

Definition at line 612 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, addConnections(), cms::Exception::addContext(), b, checkSources(), TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, m_activeSources, m_flags, m_inactiveSources, m_name, m_perms, m_source_mutex, eostools::move(), fileCollector::now, AlCaHLTBitMon_ParallelJobs::p, splitClientRequest(), edm::CPUTimer::start(), edm::CPUTimer::stop(), TrackValidation_cff::task, and updateCurrentServer().

612  {
613  //Use a copy of m_activeSources and m_inactiveSources throughout this function
614  // in order to avoid holding the lock a long time and causing a deadlock.
615  // When the function is over we will update the values of the containers
616  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
617  {
618  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
619  activeSources = m_activeSources;
620  inactiveSources = m_inactiveSources;
621  }
622  //Make sure we update changes when we leave the function
623  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
624  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
625  m_activeSources = std::move(activeSources);
626  m_inactiveSources = std::move(inactiveSources);
627  });
628 
630 
631  timespec now;
632  GET_CLOCK_MONOTONIC(now);
633 
634  edm::CPUTimer timer;
635  timer.start();
636 
637  if (activeSources.size() == 1) {
638  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
639  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
640  activeSources[0]->handle(c_ptr);
641  return c_ptr->get_future();
642  }
643  // Make sure active
644  else if (activeSources.empty()) {
646  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
647  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
648  ex.addContext("In XrdAdaptor::RequestManager::handle()");
649  addConnections(ex);
650  throw ex;
651  }
652 
653  assert(iolist.get());
654  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
655  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
656  splitClientRequest(*iolist, *req1, *req2, activeSources);
657 
658  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
659  // CheckSources may have removed a source
660  if (activeSources.size() == 1) {
661  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
662  activeSources[0]->handle(c_ptr);
663  return c_ptr->get_future();
664  }
665 
666  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
667  std::future<IOSize> future1, future2;
668  if (!req1->empty()) {
669  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
670  activeSources[0]->handle(c_ptr1);
671  future1 = c_ptr1->get_future();
672  }
673  if (!req2->empty()) {
674  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
675  activeSources[1]->handle(c_ptr2);
676  future2 = c_ptr2->get_future();
677  }
678  if (!req1->empty() && !req2->empty()) {
679  std::future<IOSize> task =
680  std::async(std::launch::deferred,
681  [](std::future<IOSize> a, std::future<IOSize> b) {
682  // Wait until *both* results are available. This is essential
683  // as the callback may try referencing the RequestManager. If one
684  // throws an exception (causing the RequestManager to be destroyed by
685  // XrdFile) and the other has a failure, then the recovery code will
686  // reference the destroyed RequestManager.
687  //
688  // Unlike other places where we use shared/weak ptrs to maintain object
689  // lifetime and destruction asynchronously, we *cannot* destroy the request
690  // asynchronously as it is associated with a ROOT buffer. We must wait until we
691  // are guaranteed that XrdCl will not write into the ROOT buffer before we
692  // can return.
693  b.wait();
694  a.wait();
695  return b.get() + a.get();
696  },
697  std::move(future1),
698  std::move(future2));
699  timer.stop();
700  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
701  return task;
702  } else if (!req1->empty()) {
703  return future1;
704  } else if (!req2->empty()) {
705  return future2;
706  } else { // Degenerate case - no bytes to read.
707  std::promise<IOSize> p;
708  p.set_value(0);
709  return p.get_future();
710  }
711 }
void start()
Definition: CPUTimer.cc:68
#define GET_CLOCK_MONOTONIC(ts)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
Times stop()
Definition: CPUTimer.cc:87
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest >  c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 520 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), fileCollector::now, pickSingleSource(), and source.

520  {
521  assert(c_ptr.get());
522  timespec now;
523  GET_CLOCK_MONOTONIC(now);
524  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
525  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
526  {
527  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
528  activeSources = m_activeSources;
529  inactiveSources = m_inactiveSources;
530  }
531  {
532  //make sure we update values before calling pickSingelSource
533  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
534  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
535  m_activeSources = std::move(activeSources);
536  m_inactiveSources = std::move(inactiveSources);
537  });
538 
539  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
540  }
541 
542  std::shared_ptr<Source> source = pickSingleSource();
543  source->handle(c_ptr);
544  return c_ptr->get_future();
545 }
#define GET_CLOCK_MONOTONIC(ts)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source >  source 
)
private virtual

Handle the file-open response

Definition at line 574 of file XrdRequestManager.cc.

References m_activeSources, m_excluded_active_count, m_inactiveSources, m_nextActiveSourceCheck, m_source_mutex, queueUpdateCurrentServer(), reportSiteChange(), alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

574  {
575  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
576  if (status.IsOK()) {
577  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
578  for (const auto &s : m_activeSources) {
579  if (source->ID() == s->ID()) {
580  edm::LogVerbatim("XrdAdaptorInternal")
581  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
582  unsigned returned_count = ++m_excluded_active_count;
584  if (returned_count >= 3) {
586  }
587  return;
588  }
589  }
590  for (const auto &s : m_inactiveSources) {
591  if (source->ID() == s->ID()) {
592  edm::LogVerbatim("XrdAdaptorInternal")
593  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
595  return;
596  }
597  }
598  if (m_activeSources.size() < 2) {
599  auto oldSources = m_activeSources;
600  m_activeSources.push_back(source);
601  reportSiteChange(oldSources, m_activeSources);
603  } else {
604  m_inactiveSources.push_back(source);
605  }
606  } else { // File-open failure - wait at least 120s before next attempt.
607  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
609  }
610 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away its reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows not to reference the RequestManager once it has been destroyed.

Definition at line 118 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, VtxSmearedBeamProfile_cfi::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

118  {
120 
121  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
122  if (env) {
123  env->GetInt("StreamErrorWindow", m_timeout);
124  }
125 
126  std::string orig_site;
127  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
128  std::string hostname;
129  if (Source::getHostname(orig_site, hostname)) {
130  Source::getDomain(hostname, orig_site);
131  }
132  }
133 
134  std::unique_ptr<XrdCl::File> file;
136  bool validFile = false;
137  const int retries = 5;
138  std::string excludeString;
139  for (int idx = 0; idx < retries; idx++) {
140  file.reset(new XrdCl::File());
141  auto opaque = prepareOpaqueString();
142  std::string new_filename =
143  m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
144  SyncHostResponseHandler handler;
145  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
146  if (!openStatus
147  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
148  // server and it was marked bad - xrootd couldn't even queue up the request internally!
149  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
150  // redirector being down or authentication failures.
151  ex.clearMessage();
152  ex.clearContext();
153  ex.clearAdditionalInfo();
154  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
155  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
156  << ", code=" << openStatus.code << ")";
157  ex.addContext("Calling XrdFile::open()");
158  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
159  throw ex;
160  }
161  handler.WaitForResponse();
162  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
163  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
164  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
165  assert(status);
166  if (status->IsOK()) {
167  validFile = true;
168  break;
169  } else {
170  ex.clearMessage();
171  ex.clearContext();
172  ex.clearAdditionalInfo();
173  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
174  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
175  << ", code=" << status->code << ")";
176  ex.addContext("Calling XrdFile::open()");
177  addConnections(ex);
178  std::string dataServer, lastUrl;
179  file->GetProperty("DataServer", dataServer);
180  file->GetProperty("LastURL", lastUrl);
181  if (!dataServer.empty()) {
182  ex.addAdditionalInfo("Problematic data server: " + dataServer);
183  }
184  if (!lastUrl.empty()) {
185  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
186  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
187  }
188  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
189  m_disabledSourceStrings.end()) {
190  ex << ". No additional data servers were found.";
191  throw ex;
192  }
193  if (!dataServer.empty()) {
194  m_disabledSourceStrings.insert(dataServer);
195  m_disabledExcludeStrings.insert(excludeString);
196  }
197  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
198  if (lastUrl == new_filename) {
199  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
200  throw ex;
201  }
202  }
203  }
204  if (!validFile) {
205  throw ex;
206  }
207  SendMonitoringInfo(*file);
208 
209  timespec ts;
211 
212  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
213  {
214  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
215  auto oldList = m_activeSources;
216  m_activeSources.push_back(source);
218  }
221 
222  m_lastSourceCheck = ts;
223  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
225 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:283
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:241
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:321
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:214
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 494 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

494  {
495  std::shared_ptr<Source> source = nullptr;
496  {
497  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
498  if (m_activeSources.size() == 2) {
500  source = m_activeSources[0];
502  } else {
503  source = m_activeSources[1];
505  }
506  } else if (m_activeSources.empty()) {
508  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
509  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
510  ex.addContext("In XrdAdaptor::RequestManager::handle()");
511  addConnections(ex);
512  throw ex;
513  } else {
514  source = m_activeSources[0];
515  }
516  }
517  return source;
518 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 547 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

547  {
548  std::stringstream ss;
549  ss << "tried=";
550  size_t count = 0;
551  {
552  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
553 
554  for (const auto &it : m_activeSources) {
555  count++;
556  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
557  }
558  for (const auto &it : m_inactiveSources) {
559  count++;
560  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
561  }
562  }
563  for (const auto &it : m_disabledExcludeStrings) {
564  count++;
565  ss << it.substr(0, it.find(":")) << ",";
566  }
567  if (count) {
568  std::string tmp_str = ss.str();
569  return tmp_str.substr(0, tmp_str.size() - 1);
570  }
571  return "";
572 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 251 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), triggerObjects_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

251  {
252  auto hostname = std::make_unique<std::string>(id);
253  if (Source::getHostname(id, *hostname)) {
254  std::string *null_hostname = nullptr;
255  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
256  hostname.release();
257  }
258  }
259 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:214
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source >> const &  iOld,
std::vector< std::shared_ptr< Source >> const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 278 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), handleOpen(), initialize(), and requestFailure().

280  {
281  auto siteList = formatSites(iNew);
282  if (!orig_site.empty() && (orig_site != siteList)) {
283  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
284  } else {
285  auto oldSites = formatSites(iOld);
286  if (orig_site.empty() && (siteList != oldSites)) {
287  if (!oldSites.empty())
288  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
289  }
290  }
291 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 713 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, fileCollector::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

713  {
714  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
715 
716  // Fail early for invalid responses - XrdFile has a separate path for handling this.
717  if (c_status.code == XrdCl::errInvalidResponse) {
718  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
720  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
721  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
722  << ") => Invalid ReadV response from server";
723  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
724  addConnections(ex);
725  throw ex;
726  }
727  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
728 
729  // Note that we do not delete the Source itself. That is because this
730  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
731  // In such a case, if you close a file in the handler, it will deadlock
732  m_disabledSourceStrings.insert(source_ptr->ID());
733  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
734  m_disabledSources.insert(source_ptr);
735 
736  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
737  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
738  auto oldSources = m_activeSources;
739  m_activeSources.erase(m_activeSources.begin());
740  reportSiteChange(oldSources, m_activeSources);
741  } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
742  auto oldSources = m_activeSources;
743  m_activeSources.erase(m_activeSources.begin() + 1);
744  reportSiteChange(oldSources, m_activeSources);
745  }
746  std::shared_ptr<Source> new_source;
747  if (m_activeSources.empty()) {
748  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
749  timespec now;
750  GET_CLOCK_MONOTONIC(now);
752  // Note we only wait for 180 seconds here. This is because we've already failed
753  // once and the likelihood the program has some inconsistent state is decent.
754  // We'd much rather fail hard than deadlock!
755  sentry.unlock();
756  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
757  if (status == std::future_status::timeout) {
759  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
760  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
761  << ") => timeout when waiting for file open";
762  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
763  addConnections(ex);
764  throw ex;
765  } else {
766  try {
767  new_source = future.get();
768  } catch (edm::Exception &ex) {
769  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
770  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
771  throw;
772  }
773  }
774 
775  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
776  m_disabledSourceStrings.end()) {
777  // The server gave us back a data node we requested excluded. Fatal!
779  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
780  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
781  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
782  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
783  addConnections(ex);
784  throw ex;
785  }
786  sentry.lock();
787 
788  auto oldSources = m_activeSources;
789  m_activeSources.push_back(new_source);
790  reportSiteChange(oldSources, m_activeSources);
791  } else {
792  new_source = m_activeSources[0];
793  }
794  new_source->handle(c_ptr);
795 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:19
int timeout
Definition: mps_check.py:53
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:165
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 889 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, m_flags, m_name, m_perms, SiStripPI::max, IOPosBuffer::offset(), q1, q2, contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

Referenced by handle().

892  {
893  if (iolist.empty())
894  return;
895  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
896  req1.reserve(iolist.size() / 2 + 1);
897  req2.reserve(iolist.size() / 2 + 1);
898  size_t front = 0;
899 
900  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
901  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
902  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
903  IOSize chunk1, chunk2;
904  // Make sure the chunk size is at least 1024; little point to reads less than that size.
905  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
906  static_cast<IOSize>(1024));
907  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
908  static_cast<IOSize>(1024));
909 
910  IOSize size_orig = 0;
911  for (const auto &it : iolist)
912  size_orig += it.size();
913 
914  while (tmp_iolist.size() - front > 0) {
915  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
916  (req2.size() >=
917  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
918  // are passed to the request manager. However, because we have a max chunk size, we increase
919  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
920  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
921  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
923  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
924  << ", permissions=0" << std::oct << m_perms << std::dec
925  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
926  "reported to CMSSW developers.";
927  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
928  addConnections(ex);
929  std::stringstream ss;
930  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
931  ex.addAdditionalInfo(ss.str());
932  std::stringstream ss2;
933  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
934  ex.addAdditionalInfo(ss2.str());
935  throw ex;
936  }
937  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
938  consumeChunkFront(front, tmp_iolist, req1, chunk1);
939  }
940  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
941  consumeChunkBack(front, tmp_iolist, req2, chunk2);
942  }
943  }
944  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
945  return left.offset() < right.offset();
946  });
947  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
948  return left.offset() < right.offset();
949  });
950 
951  IOSize size1 = validateList(req1);
952  IOSize size2 = validateList(req2);
953 
954  assert(size_orig == size1 + size2);
955 
956  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
957  << " bytes) split into requests size " << req1.size() << " (" << size1
958  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
959 }
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:39
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 234 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), LIKELY, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handle(), and initialize().

234  {
235  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
236  // a pending update. *However*, since we call this for every read, we'll get it
237  // eventually.
238  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
239  return;
240  }
241  std::string *hostname_ptr;
242  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
243  std::unique_ptr<std::string> hostname(hostname_ptr);
245  if (statsService.isAvailable()) {
246  statsService->setCurrentServer(*hostname_ptr);
247  }
248  }
249 }
void setCurrentServer(const std::string &servername)
#define LIKELY(x)
Definition: Likely.h:20
bool isAvailable() const
Definition: Service.h:40
std::atomic< std::string * > m_serverToAdvertise

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 206 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), handleOpen(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 210 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 211 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 209 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 231 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 233 of file XrdRequestManager.h.

Referenced by handleOpen().

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 230 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 207 of file XrdRequestManager.h.

Referenced by handle(), handleOpen(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 222 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), handleOpen(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 220 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 289 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string *> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 218 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 223 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3 * 60
static

Definition at line 47 of file XrdRequestManager.h.