CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSizehandle (void *into, IOSize size, IOOffset off)
 
std::future< IOSizehandle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSizehandle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr
< RequestManager
getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< SourcepickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr
< Source > > 
m_activeSources
 
tbb::concurrent_unordered_set
< std::string > 
m_disabledExcludeStrings
 
tbb::concurrent_unordered_set
< std::shared_ptr< Source >
, SourceHash
m_disabledSources
 
tbb::concurrent_unordered_set
< std::string > 
m_disabledSourceStrings
 
std::uniform_real_distribution
< float > 
m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr
< Source > > 
m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandlerm_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 53 of file XrdRequestManager.h.

Constructor & Destructor Documentation

XrdAdaptor::RequestManager::~RequestManager ( )
default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 110 of file XrdRequestManager.cc.

Referenced by getInstance().

111  : m_serverToAdvertise(nullptr),
114  m_name(filename),
115  m_flags(flags),
116  m_perms(perms),
117  m_distribution(0, 100),
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count
tuple filename
Definition: lut2db_cfg.py:20

Member Function Documentation

void RequestManager::addConnections ( cms::Exception ex) const

Add the list of active connections to the exception extra info.

Definition at line 483 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

483  {
484  std::vector<std::string> sources;
486  for (auto const &source : sources) {
487  ex.addAdditionalInfo("Active source: " + source);
488  }
489  sources.clear();
490  getDisabledSourceNames(sources);
491  for (auto const &source : sources) {
492  ex.addAdditionalInfo("Disabled source: " + source);
493  }
494 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:43
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 295 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle().

298  {
299  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
300  << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
301  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
302  if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
303  { // Be more aggressive about getting rid of very bad sources.
304  compareSources(now, 0, 1, activeSources, inactiveSources);
305  compareSources(now, 1, 0, activeSources, inactiveSources);
306  }
308  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
309  }
310  }
311 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 341 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), fileCollector::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

344  {
345  bool findNewSource = false;
346  if (activeSources.size() <= 1) {
347  findNewSource = true;
348  } else if (activeSources.size() > 1) {
349  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
350  << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
351  findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
352  findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
353 
354  // NOTE: We could probably replace the copy with a better sort function.
355  // However, there are typically very few sources and the correctness is more obvious right now.
356  std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
357  eligibleInactiveSources.reserve(inactiveSources.size());
358  for (const auto &source : inactiveSources) {
359  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
360  eligibleInactiveSources.push_back(source);
361  }
362  }
363  auto bestInactiveSource =
364  std::min_element(eligibleInactiveSources.begin(),
365  eligibleInactiveSources.end(),
366  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
367  return s1->getQuality() < s2->getQuality();
368  });
369  auto worstActiveSource = std::max_element(activeSources.cbegin(),
370  activeSources.cend(),
371  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
372  return s1->getQuality() < s2->getQuality();
373  });
374  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
375  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
376  << ", quality " << (*bestInactiveSource)->getQuality();
377  }
378  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
379  << ", quality " << (*worstActiveSource)->getQuality();
380  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
381  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
382  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
383  ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
384  auto oldSources = activeSources;
385  activeSources.push_back(*bestInactiveSource);
386  reportSiteChange(oldSources, activeSources);
387  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
388  if (it->get() == bestInactiveSource->get()) {
389  inactiveSources.erase(it);
390  break;
391  }
392  } else
393  while ((bestInactiveSource != eligibleInactiveSources.end()) &&
394  (*worstActiveSource)->getQuality() >
395  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
396  edm::LogVerbatim("XrdAdaptorInternal")
397  << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
398  << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
399  << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
400  (*worstActiveSource)->setLastDowngrade(now);
401  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
402  if (it->get() == bestInactiveSource->get()) {
403  inactiveSources.erase(it);
404  break;
405  }
406  inactiveSources.emplace_back(std::move(*worstActiveSource));
407  auto oldSources = activeSources;
408  activeSources.erase(worstActiveSource);
409  activeSources.emplace_back(std::move(*bestInactiveSource));
410  reportSiteChange(oldSources, activeSources);
411  eligibleInactiveSources.clear();
412  for (const auto &source : inactiveSources)
413  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
414  eligibleInactiveSources.push_back(source);
415  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
416  eligibleInactiveSources.end(),
417  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
418  return s1->getQuality() < s2->getQuality();
419  });
420  worstActiveSource = std::max_element(activeSources.begin(),
421  activeSources.end(),
422  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
423  return s1->getQuality() < s2->getQuality();
424  });
425  }
426  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
427  float r = m_distribution(m_generator);
429  findNewSource = true;
430  }
431  }
432  }
433  if (findNewSource) {
434  m_open_handler->open();
436  }
437 
438  // Only aggressively look for new sources if we don't have two.
439  if (activeSources.size() == 2) {
441  } else {
443  }
445 }
std::uniform_real_distribution< float > m_distribution
tuple s2
Definition: indexGen.py:106
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
def move
Definition: eostools.py:510
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 313 of file XrdRequestManager.cc.

References a, CalibrationSummaryClient_cfi::activeSources, b, bookConverter::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

317  {
318  if (activeSources.size() < std::max(a, b) + 1) {
319  return false;
320  }
321 
322  bool findNewSource = false;
323  if ((activeSources[a]->getQuality() > 5130) ||
324  ((activeSources[a]->getQuality() > 260) &&
325  (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
326  edm::LogVerbatim("XrdAdaptorInternal")
327  << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
328  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
329  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
330  findNewSource = true;
331  }
332  activeSources[a]->setLastDowngrade(now);
333  inactiveSources.emplace_back(activeSources[a]);
334  auto oldSources = activeSources;
335  activeSources.erase(activeSources.begin() + a);
336  reportSiteChange(oldSources, activeSources);
337  }
338  return findNewSource;
339 }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 447 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

447  {
448  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
449  if (m_activeSources.empty()) {
451  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
452  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
453  ex.addContext("In XrdAdaptor::RequestManager::handle()");
454  addConnections(ex);
455  throw ex;
456  }
457  return m_activeSources[0]->getFileHandle();
458 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 460 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

460  {
461  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
462  sources.reserve(m_activeSources.size());
463  for (auto const &source : m_activeSources) {
464  sources.push_back(source->ID());
465  }
466 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 476 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

476  {
477  sources.reserve(m_disabledSourceStrings.size());
478  for (auto const &source : m_disabledSourceStrings) {
479  sources.push_back(source);
480  }
481 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:43
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 111 of file XrdRequestManager.h.

References m_name.

111 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 121 of file XrdRequestManager.h.

References instance, and RequestManager().

122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
tuple filename
Definition: lut2db_cfg.py:20
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 468 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

468  {
469  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
470  sources.reserve(m_activeSources.size());
471  for (auto const &source : m_activeSources) {
472  sources.push_back(source->PrettyID());
473  }
474 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 63 of file XrdRequestManager.h.

64  {
65  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, into, size, off));
66  return handle(c_ptr);
67  }
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
tuple size
Write out results.
std::future<IOSize> XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 522 of file XrdRequestManager.cc.

References CalibrationSummaryClient_cfi::activeSources, assert(), checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), fileCollector::now, pickSingleSource(), and source.

522  {
523  assert(c_ptr.get());
524  timespec now;
525  GET_CLOCK_MONOTONIC(now);
526  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
527  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
528  {
529  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
530  activeSources = m_activeSources;
531  inactiveSources = m_inactiveSources;
532  }
533  {
534  //make sure we update values before calling pickSingelSource
535  std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
536  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
537  m_activeSources = std::move(activeSources);
538  m_inactiveSources = std::move(inactiveSources);
539  });
540 
541  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
542  }
543 
544  std::shared_ptr<Source> source = pickSingleSource();
545  source->handle(c_ptr);
546  return c_ptr->get_future();
547 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
std::vector< std::shared_ptr< Source > > m_inactiveSources
def move
Definition: eostools.py:510
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source source 
)
privatevirtual

Handle the file-open response

Definition at line 576 of file XrdRequestManager.cc.

References alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

576  {
577  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
578  if (status.IsOK()) {
579  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
580  for (const auto &s : m_activeSources) {
581  if (source->ID() == s->ID()) {
582  edm::LogVerbatim("XrdAdaptorInternal")
583  << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
584  unsigned returned_count = ++m_excluded_active_count;
586  if (returned_count >= 3) {
588  }
589  return;
590  }
591  }
592  for (const auto &s : m_inactiveSources) {
593  if (source->ID() == s->ID()) {
594  edm::LogVerbatim("XrdAdaptorInternal")
595  << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
597  return;
598  }
599  }
600  if (m_activeSources.size() < 2) {
601  auto oldSources = m_activeSources;
602  m_activeSources.push_back(source);
603  reportSiteChange(oldSources, m_activeSources);
605  } else {
606  m_inactiveSources.push_back(source);
607  }
608  } else { // File-open failure - wait at least 120s before next attempt.
609  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
611  }
612 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
tuple status
Definition: mps_update.py:57
void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 120 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, mergeVDriftHistosByStation::file, reco_application_tbsim_DetSim-Digi_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), customizeTrackingMonitorSeedNumber::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, popcon_last_value_cfg::Source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

120  {
122 
123  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
124  if (env) {
125  env->GetInt("StreamErrorWindow", m_timeout);
126  }
127 
128  std::string orig_site;
129  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos)) {
130  std::string hostname;
131  if (Source::getHostname(orig_site, hostname)) {
132  Source::getDomain(hostname, orig_site);
133  }
134  }
135 
136  std::unique_ptr<XrdCl::File> file;
138  bool validFile = false;
139  const int retries = 5;
140  std::string excludeString;
141  for (int idx = 0; idx < retries; idx++) {
142  file.reset(new XrdCl::File());
143  auto opaque = prepareOpaqueString();
144  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
145  SyncHostResponseHandler handler;
146  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
147  if (!openStatus
148  .IsOK()) { // In this case, we failed immediately - this indicates we have previously tried to talk to this
149  // server and it was marked bad - xrootd couldn't even queue up the request internally!
150  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
151  // redirector being down or authentication failures.
152  ex.clearMessage();
153  ex.clearContext();
154  ex.clearAdditionalInfo();
155  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
156  << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
157  << ", code=" << openStatus.code << ")";
158  ex.addContext("Calling XrdFile::open()");
159  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
160  throw ex;
161  }
162  handler.WaitForResponse();
163  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
164  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
165  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
166  assert(status);
167  if (status->IsOK()) {
168  validFile = true;
169  break;
170  } else {
171  ex.clearMessage();
172  ex.clearContext();
173  ex.clearAdditionalInfo();
174  ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
175  << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
176  << ", code=" << status->code << ")";
177  ex.addContext("Calling XrdFile::open()");
178  addConnections(ex);
179  std::string dataServer, lastUrl;
180  file->GetProperty("DataServer", dataServer);
181  file->GetProperty("LastURL", lastUrl);
182  if (dataServer.size())
183  {
184  ex.addAdditionalInfo("Problematic data server: " + dataServer);
185  }
186  if (lastUrl.size())
187  {
188  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
189  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
190  }
191  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
192  m_disabledSourceStrings.end()) {
193  ex << ". No additional data servers were found.";
194  throw ex;
195  }
196  if (dataServer.size())
197  {
198  m_disabledSourceStrings.insert(dataServer);
199  m_disabledExcludeStrings.insert(excludeString);
200  }
201  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
202  if (lastUrl == new_filename) {
203  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
204  throw ex;
205  }
206  }
207  }
208  if (!validFile) {
209  throw ex;
210  }
211  SendMonitoringInfo(*file);
212 
213  timespec ts;
215 
216  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
217  {
218  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
219  auto oldList = m_activeSources;
220  m_activeSources.push_back(source);
222  }
225 
226  m_lastSourceCheck = ts;
227  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
229 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
def move
Definition: eostools.py:510
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
tuple status
Definition: mps_update.py:57
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 496 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

496  {
497  std::shared_ptr<Source> source = nullptr;
498  {
499  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
500  if (m_activeSources.size() == 2) {
502  source = m_activeSources[0];
504  } else {
505  source = m_activeSources[1];
507  }
508  } else if (m_activeSources.empty()) {
510  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
511  << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
512  ex.addContext("In XrdAdaptor::RequestManager::handle()");
513  addConnections(ex);
514  throw ex;
515  } else {
516  source = m_activeSources[0];
517  }
518  }
519  return source;
520 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 549 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

549  {
550  std::stringstream ss;
551  ss << "tried=";
552  size_t count = 0;
553  {
554  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
555 
556  for (const auto &it : m_activeSources) {
557  count++;
558  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
559  }
560  for (const auto &it : m_inactiveSources) {
561  count++;
562  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
563  }
564  }
565  for (const auto &it : m_disabledExcludeStrings) {
566  count++;
567  ss << it.substr(0, it.find(":")) << ",";
568  }
569  if (count) {
570  std::string tmp_str = ss.str();
571  return tmp_str.substr(0, tmp_str.size() - 1);
572  }
573  return "";
574 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 255 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

255  {
256  std::unique_ptr<std::string> hostname(new std::string(id));
257  if (Source::getHostname(id, *hostname)) {
258  std::string *null_hostname = nullptr;
259  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
260  hostname.release();
261  }
262  }
263 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source > > const &  iOld,
std::vector< std::shared_ptr< Source > > const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 277 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

280 {
281  auto siteList = formatSites(iNew);
282  if (orig_site.size() && (orig_site != siteList))
283  {
284  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
285  } else {
286  auto oldSites = formatSites(iOld);
287  if (!orig_site.size() && (siteList != oldSites))
288  {
289  if (oldSites.size() >0 )
290  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
291  }
292  }
293 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 716 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, fileCollector::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

716  {
717  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
718 
719  // Fail early for invalid responses - XrdFile has a separate path for handling this.
720  if (c_status.code == XrdCl::errInvalidResponse) {
721  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
723  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
724  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
725  << ") => Invalid ReadV response from server";
726  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
727  addConnections(ex);
728  throw ex;
729  }
730  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
731 
732  // Note that we do not delete the Source itself. That is because this
733  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
734  // In such a case, if you close a file in the handler, it will deadlock
735  m_disabledSourceStrings.insert(source_ptr->ID());
736  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
737  m_disabledSources.insert(source_ptr);
738 
739  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
740  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
741  {
742  auto oldSources = m_activeSources;
743  m_activeSources.erase(m_activeSources.begin());
744  reportSiteChange(oldSources, m_activeSources);
745  }
746  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
747  {
748  auto oldSources = m_activeSources;
749  m_activeSources.erase(m_activeSources.begin()+1);
750  reportSiteChange(oldSources, m_activeSources);
751  }
752  std::shared_ptr<Source> new_source;
753  if (m_activeSources.size() == 0) {
754  std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
755  timespec now;
756  GET_CLOCK_MONOTONIC(now);
758  // Note we only wait for 180 seconds here. This is because we've already failed
759  // once and the likelihood the program has some inconsistent state is decent.
760  // We'd much rather fail hard than deadlock!
761  sentry.unlock();
762  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
763  if (status == std::future_status::timeout) {
765  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
766  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
767  << ") => timeout when waiting for file open";
768  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
769  addConnections(ex);
770  throw ex;
771  } else {
772  try {
773  new_source = future.get();
774  } catch (edm::Exception &ex) {
775  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
776  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
777  throw;
778  }
779  }
780 
781  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
782  m_disabledSourceStrings.end()) {
783  // The server gave us back a data node we requested excluded. Fatal!
785  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
786  << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
787  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
788  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
789  addConnections(ex);
790  throw ex;
791  }
792  sentry.lock();
793 
794  auto oldSources = m_activeSources;
795  m_activeSources.push_back(new_source);
796  reportSiteChange(oldSources, m_activeSources);
797  } else {
798  new_source = m_activeSources[0];
799  }
800  new_source->handle(c_ptr);
801 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
int timeout
Definition: mps_check.py:51
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::recursive_mutex m_source_mutex
tuple status
Definition: mps_update.py:57
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 896 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), assert(), consumeChunkBack(), consumeChunkFront(), edm::errors::FileReadError, bookConverter::max, IOPosBuffer::offset(), q1, q2, contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

897 {
898  if (iolist.size() == 0) return;
899  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
900  req1.reserve(iolist.size() / 2 + 1);
901  req2.reserve(iolist.size() / 2 + 1);
902  size_t front = 0;
903 
904  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
905  float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
906  float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
907  IOSize chunk1, chunk2;
908  // Make sure the chunk size is at least 1024; little point to reads less than that size.
909  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
910  static_cast<IOSize>(1024));
911  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
912  static_cast<IOSize>(1024));
913 
914  IOSize size_orig = 0;
915  for (const auto &it : iolist)
916  size_orig += it.size();
917 
918  while (tmp_iolist.size() - front > 0) {
919  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
920  (req2.size() >=
921  XRD_ADAPTOR_CHUNK_THRESHOLD)) { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
922  // are passed to the request manager. However, because we have a max chunk size, we increase
923  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
924  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
925  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
927  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
928  << ", permissions=0" << std::oct << m_perms << std::dec
929  << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
930  "reported to CMSSW developers.";
931  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
932  addConnections(ex);
933  std::stringstream ss;
934  ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
935  ex.addAdditionalInfo(ss.str());
936  std::stringstream ss2;
937  ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
938  ex.addAdditionalInfo(ss2.str());
939  throw ex;
940  }
941  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
942  consumeChunkFront(front, tmp_iolist, req1, chunk1);
943  }
944  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
945  consumeChunkBack(front, tmp_iolist, req2, chunk2);
946  }
947  }
948  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
949  return left.offset() < right.offset();
950  });
951  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
952  return left.offset() < right.offset();
953  });
954 
955  IOSize size1 = validateList(req1);
956  IOSize size2 = validateList(req2);
957 
958  assert(size_orig == size1 + size2);
959 
960  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
961  << " bytes) split into requests size " << req1.size() << " (" << size1
962  << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
963 }
assert(m_qm.get())
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 238 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), likely, m_name, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), statsService, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

238  {
239  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
240  // a pending update. *However*, since we call this for every read, we'll get it
241  // eventually.
242  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
243  return;
244  }
245  std::string *hostname_ptr;
246  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
247  std::unique_ptr<std::string> hostname(hostname_ptr);
249  if (statsService.isAvailable()) {
250  statsService->setCurrentServer(m_name, *hostname_ptr);
251  }
252  }
253 }
#define likely(x)
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:219
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 297 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 56 of file XrdRequestManager.h.