CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &)
 
std::shared_ptr< XrdCl::File > getActiveFile ()
 
void getActiveSourceNames (std::vector< std::string > &sources)
 
void getDisabledSourceNames (std::vector< std::string > &sources)
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources)
 
std::future< IOSizehandle (void *into, IOSize size, IOOffset off)
 
std::future< IOSizehandle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSizehandle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr
< RequestManager
getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize)
 
void checkSourcesImpl (timespec &now, IOSize requestSize)
 
bool compareSources (const timespec &now, unsigned a, unsigned b)
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< SourcepickSingleSource ()
 
std::string prepareOpaqueString ()
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
 
void updateSiteInfo (std::string orig_site="")
 

Private Attributes

std::string m_activeSites
 
std::vector< std::shared_ptr
< Source > > 
m_activeSources
 
std::set< std::string > m_disabledExcludeStrings
 
std::set< std::shared_ptr
< Source > > 
m_disabledSources
 
std::set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution
< float > 
m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr
< Source > > 
m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandlerm_open_handler
 
XrdCl::Access::Mode m_perms
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 43 of file XrdRequestManager.h.

Constructor & Destructor Documentation

XrdAdaptor::RequestManager::~RequestManager ( )
default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 103 of file XrdRequestManager.cc.

Referenced by getInstance().

106  m_name(filename),
107  m_flags(flags),
108  m_perms(perms),
109  m_distribution(0,100),
111 {
112 }
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count
tuple filename
Definition: lut2db_cfg.py:20

Member Function Documentation

void RequestManager::addConnections ( cms::Exception ex)

Add the list of active connections to the exception extra info.

Definition at line 426 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), and requestFailure().

427 {
428  std::vector<std::string> sources;
430  for (auto const& source : sources)
431  {
432  ex.addAdditionalInfo("Active source: " + source);
433  }
434  sources.clear();
435  getDisabledSourceNames(sources);
436  for (auto const& source : sources)
437  {
438  ex.addAdditionalInfo("Disabled source: " + source);
439  }
440 }
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getDisabledSourceNames(std::vector< std::string > &sources)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
static std::string const source
Definition: EdmProvDump.cc:42
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 258 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, m_source_mutex, and timeDiffMS().

Referenced by handle().

259 {
260  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
261  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
262  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
263  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
264  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
265  {
266  { // Be more aggressive about getting rid of very bad sources.
267  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
268  compareSources(now, 0, 1);
269  compareSources(now, 1, 0);
270  }
272  {
273  checkSourcesImpl(now, requestSize);
274  }
275  }
276 }
bool compareSources(const timespec &now, unsigned a, unsigned b)
long long timeDiffMS(const timespec &a, const timespec &b)
void checkSourcesImpl(timespec &now, IOSize requestSize)
std::recursive_mutex m_source_mutex
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize 
)
private

Definition at line 301 of file XrdRequestManager.cc.

References compareSources(), m_activeSources, m_distribution, m_generator, m_inactiveSources, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, m_source_mutex, eostools::move(), fileCollector::now, alignCSCRings::r, indexGen::s2, source, timeDiffMS(), updateSiteInfo(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

302 {
303  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
304 
305  bool findNewSource = false;
306  if (m_activeSources.size() <= 1)
307  {
308  findNewSource = true;
309  }
310  else if (m_activeSources.size() > 1)
311  {
312  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
313  findNewSource |= compareSources(now, 0, 1);
314  findNewSource |= compareSources(now, 1, 0);
315 
316  // NOTE: We could probably replace the copy with a better sort function.
317  // However, there are typically very few sources and the correctness is more obvious right now.
318  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
319  for (const auto & source : m_inactiveSources)
320  {
321  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
322  }
323  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
324  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
325  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
326  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
327  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
328  {
329  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
330  << ", quality " << (*bestInactiveSource)->getQuality();
331  }
332  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
333  << ", quality " << (*worstActiveSource)->getQuality();
334  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
335  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
336  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
337  {
338  m_activeSources.push_back(*bestInactiveSource);
339  updateSiteInfo();
340  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
341  }
342  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
343  {
344  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
345  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
346  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
347  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
348  (*worstActiveSource)->setLastDowngrade(now);
349  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
350  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
351  m_activeSources.erase(worstActiveSource);
352  m_activeSources.emplace_back(std::move(*bestInactiveSource));
353  updateSiteInfo();
354  eligibleInactiveSources.clear();
355  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
356  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
357  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
358  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
359  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
360  }
361  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
362  {
363  float r = m_distribution(m_generator);
365  {
366  findNewSource = true;
367  }
368  }
369  }
370  if (findNewSource)
371  {
372  m_open_handler->open();
374  }
375 
376  // Only aggressively look for new sources if we don't have two.
377  if (m_activeSources.size() == 2)
378  {
380  }
381  else
382  {
384  }
386 }
std::uniform_real_distribution< float > m_distribution
void updateSiteInfo(std::string orig_site="")
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
std::vector< std::shared_ptr< Source > > m_inactiveSources
def move
Definition: eostools.py:508
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b 
)
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 280 of file XrdRequestManager.cc.

References a, b, m_activeSources, m_inactiveSources, bookConverter::max, and updateSiteInfo().

Referenced by checkSources(), and checkSourcesImpl().

281 {
282  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
283 
284  bool findNewSource = false;
285  if ((m_activeSources[a]->getQuality() > 5130) ||
286  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
287  {
288  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
289  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
290  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
291  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
292  m_activeSources[a]->setLastDowngrade(now);
293  m_inactiveSources.emplace_back(m_activeSources[a]);
294  m_activeSources.erase(m_activeSources.begin()+a);
295  updateSiteInfo();
296  }
297  return findNewSource;
298 }
void updateSiteInfo(std::string orig_site="")
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  )

Return a pointer to an active file. Useful for metadata operations.

Definition at line 389 of file XrdRequestManager.cc.

References m_activeSources, and m_source_mutex.

390 {
391  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
392  return m_activeSources[0]->getFileHandle();
393 }
std::vector< std::shared_ptr< Source > > m_activeSources
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources)

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 396 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

397 {
398  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
399  sources.reserve(m_activeSources.size());
400  for (auto const& source : m_activeSources) {
401  sources.push_back(source->ID());
402  }
403 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources)

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 416 of file XrdRequestManager.cc.

References m_disabledSourceStrings, m_source_mutex, and source.

Referenced by addConnections().

417 {
418  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
419  sources.reserve(m_disabledSourceStrings.size());
420  for (auto const& source : m_disabledSourceStrings) {
421  sources.push_back(source);
422  }
423 }
std::set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 101 of file XrdRequestManager.h.

References m_name.

101 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 111 of file XrdRequestManager.h.

References instance, and RequestManager().

112  {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
tuple filename
Definition: lut2db_cfg.py:20
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources)

Definition at line 406 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

407 {
408  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
409  sources.reserve(m_activeSources.size());
410  for (auto const& source : m_activeSources) {
411  sources.push_back(source->PrettyID());
412  }
413 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 53 of file XrdRequestManager.h.

54  {
55  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, into, size, off));
56  return handle(c_ptr);
57  }
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
tuple size
Write out results.
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)

Definition at line 559 of file XrdRequestManager.cc.

References a, assert(), b, GET_CLOCK_MONOTONIC, eostools::move(), fileCollector::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), and edm::CPUTimer::stop().

560 {
561  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
562 
563  timespec now;
564  GET_CLOCK_MONOTONIC(now);
565 
566  edm::CPUTimer timer;
567  timer.start();
568 
569  assert(m_activeSources.size());
570  if (m_activeSources.size() == 1)
571  {
572  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
573  checkSources(now, c_ptr->getSize());
574  m_activeSources[0]->handle(c_ptr);
575  return c_ptr->get_future();
576  }
577 
578  assert(iolist.get());
579  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
580  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
581  splitClientRequest(*iolist, *req1, *req2);
582 
583  checkSources(now, req1->size() + req2->size());
584  // CheckSources may have removed a source
585  if (m_activeSources.size() == 1)
586  {
587  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
588  m_activeSources[0]->handle(c_ptr);
589  return c_ptr->get_future();
590  }
591 
592  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
593  std::future<IOSize> future1, future2;
594  if (req1->size())
595  {
596  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
597  m_activeSources[0]->handle(c_ptr1);
598  future1 = c_ptr1->get_future();
599  }
600  if (req2->size())
601  {
602  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
603  m_activeSources[1]->handle(c_ptr2);
604  future2 = c_ptr2->get_future();
605  }
606  if (req1->size() && req2->size())
607  {
608  std::future<IOSize> task = std::async(std::launch::deferred,
609  [](std::future<IOSize> a, std::future<IOSize> b){
610  // Wait until *both* results are available. This is essential
611  // as the callback may try referencing the RequestManager. If one
612  // throws an exception (causing the RequestManager to be destroyed by
613  // XrdFile) and the other has a failure, then the recovery code will
614  // reference the destroyed RequestManager.
615  //
616  // Unlike other places where we use shared/weak ptrs to maintain object
617  // lifetime and destruction asynchronously, we *cannot* destroy the request
618  // asynchronously as it is associated with a ROOT buffer. We must wait until we
619  // are guaranteed that XrdCl will not write into the ROOT buffer before we
620  // can return.
621  b.wait(); a.wait();
622  return b.get() + a.get();
623  },
624  std::move(future1),
625  std::move(future2));
626  timer.stop();
627  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
628  return task;
629  }
630  else if (req1->size()) { return future1; }
631  else if (req2->size()) { return future2; }
632  else
633  { // Degenerate case - no bytes to read.
634  std::promise<IOSize> p; p.set_value(0);
635  return p.get_future();
636  }
637 }
void start()
Definition: CPUTimer.cc:74
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
def move
Definition: eostools.py:508
Times stop()
Definition: CPUTimer.cc:94
void checkSources(timespec &now, IOSize requestSize)
std::vector< std::shared_ptr< Source > > m_activeSources
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::recursive_mutex m_source_mutex
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 470 of file XrdRequestManager.cc.

References assert(), checkSources(), GET_CLOCK_MONOTONIC, fileCollector::now, pickSingleSource(), and source.

471 {
472  assert(c_ptr.get());
473  timespec now;
474  GET_CLOCK_MONOTONIC(now);
475  checkSources(now, c_ptr->getSize());
476 
477  std::shared_ptr<Source> source = pickSingleSource();
478  source->handle(c_ptr);
479  return c_ptr->get_future();
480 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
void checkSources(timespec &now, IOSize requestSize)
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:42
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source source 
)
privatevirtual

Handle the file-open response

Definition at line 513 of file XrdRequestManager.cc.

References alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

514 {
515  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
516  if (status.IsOK())
517  {
518  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
519  for (const auto & s : m_activeSources)
520  {
521  if (source->ID() == s->ID())
522  {
523  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
524  << "; ignoring" << std::endl;
525  unsigned returned_count = ++m_excluded_active_count;
528  return;
529  }
530  }
531  for (const auto & s : m_inactiveSources)
532  {
533  if (source->ID() == s->ID())
534  {
535  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
536  << "; ignoring" << std::endl;
538  return;
539  }
540  }
541  if (m_activeSources.size() < 2)
542  {
543  m_activeSources.push_back(source);
544  updateSiteInfo();
545  }
546  else
547  {
548  m_inactiveSources.push_back(source);
549  }
550  }
551  else
552  { // File-open failure - wait at least 120s before next attempt.
553  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
555  }
556 }
void updateSiteInfo(std::string orig_site="")
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
tuple status
Definition: ntuplemaker.py:245
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 116 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, mergeVDriftHistosByStation::file, reco_application_tbsim_DetSim-Digi_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), customizeTrackingMonitorSeedNumber::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), prepareOpaqueString(), SendMonitoringInfo(), source, popcon_last_value_cfg::Source, ntuplemaker::status, AlCaHLTBitMon_QueryRunRegistry::string, updateSiteInfo(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

117 {
119 
120  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
121  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
122 
123  std::string orig_site;
124  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
125  {
126  std::string hostname;
127  if (Source::getHostname(orig_site, hostname))
128  {
129  Source::getDomain(hostname, orig_site);
130  }
131  }
132 
133  std::unique_ptr<XrdCl::File> file;
135  bool validFile = false;
136  const int retries = 5;
137  std::string excludeString;
138  for (int idx=0; idx<retries; idx++)
139  {
140  file.reset(new XrdCl::File());
141  auto opaque = prepareOpaqueString();
142  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
143  SyncHostResponseHandler handler;
144  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
145  if (!openStatus.IsOK())
146  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
147  // server and it was marked bad - xrootd couldn't even queue up the request internally!
148  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
149  // redirector being down or authentication failures.
150  ex.clearMessage();
151  ex.clearContext();
152  ex.clearAdditionalInfo();
153  ex << "XrdCl::File::Open(name='" << m_name
154  << "', flags=0x" << std::hex << m_flags
155  << ", permissions=0" << std::oct << m_perms << std::dec
156  << ") => error '" << openStatus.ToStr()
157  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
158  ex.addContext("Calling XrdFile::open()");
159  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
160  throw ex;
161  }
162  handler.WaitForResponse();
163  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
164  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
165  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
166  assert(status);
167  if (status->IsOK())
168  {
169  validFile = true;
170  break;
171  }
172  else
173  {
174  ex.clearMessage();
175  ex.clearContext();
176  ex.clearAdditionalInfo();
177  ex << "XrdCl::File::Open(name='" << m_name
178  << "', flags=0x" << std::hex << m_flags
179  << ", permissions=0" << std::oct << m_perms << std::dec
180  << ") => error '" << status->ToStr()
181  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
182  ex.addContext("Calling XrdFile::open()");
183  addConnections(ex);
184  std::string dataServer, lastUrl;
185  file->GetProperty("DataServer", dataServer);
186  file->GetProperty("LastURL", lastUrl);
187  if (dataServer.size())
188  {
189  ex.addAdditionalInfo("Problematic data server: " + dataServer);
190  }
191  if (lastUrl.size())
192  {
193  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
194  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
195  }
196  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
197  {
198  ex << ". No additional data servers were found.";
199  throw ex;
200  }
201  if (dataServer.size())
202  {
203  m_disabledSourceStrings.insert(dataServer);
204  m_disabledExcludeStrings.insert(excludeString);
205  }
206  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
207  if (lastUrl == new_filename)
208  {
209  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
210  throw ex;
211  }
212  }
213  }
214  if (!validFile)
215  {
216  throw ex;
217  }
218  SendMonitoringInfo(*file);
219 
220  timespec ts;
222 
223  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
224  {
225  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
226  m_activeSources.push_back(source);
227  updateSiteInfo(orig_site);
228  }
229 
230  m_lastSourceCheck = ts;
231  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
233 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
def move
Definition: eostools.py:508
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
tuple status
Definition: ntuplemaker.py:245
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 443 of file XrdRequestManager.cc.

References m_activeSources, m_nextInitialSourceToggle, m_source_mutex, and source.

Referenced by handle().

444 {
445  std::shared_ptr<Source> source = nullptr;
446  {
447  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
448  if (m_activeSources.size() == 2)
449  {
451  {
452  source = m_activeSources[0];
454  }
455  else
456  {
457  source = m_activeSources[1];
459  }
460  }
461  else
462  {
463  source = m_activeSources[0];
464  }
465  }
466  return source;
467 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( )
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 483 of file XrdRequestManager.cc.

References prof2calltree::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

484 {
485  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
486  std::stringstream ss;
487  ss << "tried=";
488  size_t count = 0;
489  for ( const auto & it : m_activeSources )
490  {
491  count++;
492  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
493  }
494  for ( const auto & it : m_inactiveSources )
495  {
496  count++;
497  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
498  }
499  for ( const auto & it : m_disabledExcludeStrings )
500  {
501  count++;
502  ss << it.substr(0, it.find(":")) << ",";
503  }
504  if (count)
505  {
506  std::string tmp_str = ss.str();
507  return tmp_str.substr(0, tmp_str.size()-1);
508  }
509  return "";
510 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::set< std::string > m_disabledExcludeStrings
std::vector< std::shared_ptr< Source > > m_activeSources
std::recursive_mutex m_source_mutex
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 640 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, fileCollector::now, seconds(), ntuplemaker::status, and updateSiteInfo().

641 {
642  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
643  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
644 
645  // Fail early for invalid responses - XrdFile has a separate path for handling this.
646  if (c_status.code == XrdCl::errInvalidResponse)
647  {
648  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
650  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
651  << "', flags=0x" << std::hex << m_flags
652  << ", permissions=0" << std::oct << m_perms << std::dec
653  << ", old source=" << source_ptr->PrettyID()
654  << ") => Invalid ReadV response from server";
655  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
656  addConnections(ex);
657  throw ex;
658  }
659  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
660 
661  // Note that we do not delete the Source itself. That is because this
662  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
663  // In such a case, if you close a file in the handler, it will deadlock
664  m_disabledSourceStrings.insert(source_ptr->ID());
665  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
666  m_disabledSources.insert(source_ptr);
667 
668  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
669  {
670  m_activeSources.erase(m_activeSources.begin());
671  updateSiteInfo();
672  }
673  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
674  {
675  m_activeSources.erase(m_activeSources.begin()+1);
676  updateSiteInfo();
677  }
678  std::shared_ptr<Source> new_source;
679  if (m_activeSources.size() == 0)
680  {
681  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
682  timespec now;
683  GET_CLOCK_MONOTONIC(now);
685  // Note we only wait for 180 seconds here. This is because we've already failed
686  // once and the likelihood the program has some inconsistent state is decent.
687  // We'd much rather fail hard than deadlock!
688  sentry.unlock();
689  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
690  if (status == std::future_status::timeout)
691  {
693  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
694  << "', flags=0x" << std::hex << m_flags
695  << ", permissions=0" << std::oct << m_perms << std::dec
696  << ", old source=" << source_ptr->PrettyID()
697  << ", current server=" << m_open_handler->current_source()
698  << ") => timeout when waiting for file open";
699  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
700  addConnections(ex);
701  throw ex;
702  }
703  else
704  {
705  try
706  {
707  new_source = future.get();
708  }
709  catch (edm::Exception &ex)
710  {
711  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
712  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
713  throw;
714  }
715  }
716  sentry.lock();
717 
718  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
719  {
720  // The server gave us back a data node we requested excluded. Fatal!
722  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
723  << "', flags=0x" << std::hex << m_flags
724  << ", permissions=0" << std::oct << m_perms << std::dec
725  << ", old source=" << source_ptr->PrettyID()
726  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
727  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
728  addConnections(ex);
729  throw ex;
730  }
731  m_activeSources.push_back(new_source);
732  updateSiteInfo();
733  }
734  else
735  {
736  new_source = m_activeSources[0];
737  }
738  new_source->handle(c_ptr);
739 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void addConnections(cms::Exception &)
std::set< std::shared_ptr< Source > > m_disabledSources
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tuple status
Definition: ntuplemaker.py:245
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2 
)
private

Given a client request, split it into two requests lists.

Definition at line 856 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, prof2calltree::front, IOPosBuffer::offset(), q1, q2, python.multivaluedict::sort(), contentValuesCheck::ss, validateList(), and XRD_CL_MAX_CHUNK.

857 {
858  if (iolist.size() == 0) return;
859  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
860  req1.reserve(iolist.size()/2+1);
861  req2.reserve(iolist.size()/2+1);
862  size_t front=0;
863 
864  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
865  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
866  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
867  IOSize chunk1, chunk2;
868  chunk1 = static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2));
869  chunk2 = static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2));
870 
871  IOSize size_orig = 0;
872  for (const auto & it : iolist) size_orig += it.size();
873 
874  while (tmp_iolist.size()-front > 0)
875  {
876  if ((req1.size() >= 1000) && (req2.size() >= 1000))
877  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
878  // are passed to the request manager. However, because we have a max chunk size, we increase
879  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
880  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
881  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
883  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
884  << "', flags=0x" << std::hex << m_flags
885  << ", permissions=0" << std::oct << m_perms << std::dec
886  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
887  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
888  addConnections(ex);
889  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
890  ex.addAdditionalInfo(ss.str());
891  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
892  ex.addAdditionalInfo(ss2.str());
893  throw ex;
894  }
895  if (req1.size() < 1000) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
896  if (req2.size() < 1000) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
897  }
898  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
899  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
900 
901  IOSize size1 = validateList(req1);
902  IOSize size2 = validateList(req2);
903 
904  assert(size_orig == size1 + size2);
905 
906  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
907 }
assert(m_qm.get())
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
void addConnections(cms::Exception &)
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateSiteInfo ( std::string  orig_site = "")
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 237 of file XrdRequestManager.cc.

References m_activeSites, m_activeSources, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

238 {
239  std::string siteA, siteB, siteList;
240  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
241  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
242  siteList = siteA;
243  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
244  if (orig_site.size() && (orig_site != siteList))
245  {
246  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
247  m_activeSites = siteList;
248  }
249  else if (!orig_site.size() && (siteList != m_activeSites))
250  {
251  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
252  m_activeSites = siteList;
253  }
254 }
std::vector< std::shared_ptr< Source > > m_activeSources

Member Data Documentation

std::string XrdAdaptor::RequestManager::m_activeSites
private

Definition at line 191 of file XrdRequestManager.h.

Referenced by updateSiteInfo().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 186 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), compareSources(), getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), initialize(), pickSingleSource(), prepareOpaqueString(), requestFailure(), and updateSiteInfo().

std::set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 189 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

std::set<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 190 of file XrdRequestManager.h.

Referenced by requestFailure().

std::set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 188 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 207 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 209 of file XrdRequestManager.h.

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 206 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 187 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), compareSources(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 193 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 198 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 196 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 260 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
private
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 194 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 199 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 46 of file XrdRequestManager.h.