CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &)
 
std::shared_ptr< XrdCl::File > getActiveFile ()
 
void getActiveSourceNames (std::vector< std::string > &sources)
 
void getDisabledSourceNames (std::vector< std::string > &sources)
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources)
 
std::future< IOSizehandle (void *into, IOSize size, IOOffset off)
 
std::future< IOSizehandle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSizehandle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr
< RequestManager
getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize)
 
void checkSourcesImpl (timespec &now, IOSize requestSize)
 
bool compareSources (const timespec &now, unsigned a, unsigned b)
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< SourcepickSingleSource ()
 
std::string prepareOpaqueString ()
 
void queueUpdateCurrentServer (const std::string &)
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
 
void updateCurrentServer ()
 
void updateSiteInfo (std::string orig_site="")
 

Private Attributes

std::string m_activeSites
 
std::vector< std::shared_ptr
< Source > > 
m_activeSources
 
std::set< std::string > m_disabledExcludeStrings
 
std::set< std::shared_ptr
< Source > > 
m_disabledSources
 
std::set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution
< float > 
m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr
< Source > > 
m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandlerm_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 43 of file XrdRequestManager.h.

Constructor & Destructor Documentation

XrdAdaptor::RequestManager::~RequestManager ( )
default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 108 of file XrdRequestManager.cc.

Referenced by getInstance().

109  : m_serverToAdvertise(nullptr),
112  m_name(filename),
113  m_flags(flags),
114  m_perms(perms),
115  m_distribution(0,100),
117 {
118 }
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count
tuple filename
Definition: lut2db_cfg.py:20

Member Function Documentation

void RequestManager::addConnections ( cms::Exception ex)

Add the list of active connections to the exception extra info.

Definition at line 485 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

486 {
487  std::vector<std::string> sources;
489  for (auto const& source : sources)
490  {
491  ex.addAdditionalInfo("Active source: " + source);
492  }
493  sources.clear();
494  getDisabledSourceNames(sources);
495  for (auto const& source : sources)
496  {
497  ex.addAdditionalInfo("Disabled source: " + source);
498  }
499 }
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getDisabledSourceNames(std::vector< std::string > &sources)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
static std::string const source
Definition: EdmProvDump.cc:42
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 306 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, m_source_mutex, and timeDiffMS().

Referenced by handle().

307 {
308  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
309  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
310  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
311  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
312  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
313  {
314  { // Be more aggressive about getting rid of very bad sources.
315  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
316  compareSources(now, 0, 1);
317  compareSources(now, 1, 0);
318  }
320  {
321  checkSourcesImpl(now, requestSize);
322  }
323  }
324 }
bool compareSources(const timespec &now, unsigned a, unsigned b)
long long timeDiffMS(const timespec &a, const timespec &b)
void checkSourcesImpl(timespec &now, IOSize requestSize)
std::recursive_mutex m_source_mutex
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize 
)
private

Definition at line 349 of file XrdRequestManager.cc.

References compareSources(), m_activeSources, m_distribution, m_generator, m_inactiveSources, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, m_source_mutex, eostools::move(), fileCollector::now, alignCSCRings::r, indexGen::s2, source, timeDiffMS(), updateSiteInfo(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

350 {
351  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
352 
353  bool findNewSource = false;
354  if (m_activeSources.size() <= 1)
355  {
356  findNewSource = true;
357  }
358  else if (m_activeSources.size() > 1)
359  {
360  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
361  findNewSource |= compareSources(now, 0, 1);
362  findNewSource |= compareSources(now, 1, 0);
363 
364  // NOTE: We could probably replace the copy with a better sort function.
365  // However, there are typically very few sources and the correctness is more obvious right now.
366  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
367  for (const auto & source : m_inactiveSources)
368  {
369  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
370  }
371  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
372  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
373  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
374  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
375  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
376  {
377  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
378  << ", quality " << (*bestInactiveSource)->getQuality();
379  }
380  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
381  << ", quality " << (*worstActiveSource)->getQuality();
382  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
383  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
384  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
385  {
386  m_activeSources.push_back(*bestInactiveSource);
387  updateSiteInfo();
388  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
389  }
390  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
391  {
392  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
393  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
394  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
395  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
396  (*worstActiveSource)->setLastDowngrade(now);
397  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
398  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
399  m_activeSources.erase(worstActiveSource);
400  m_activeSources.emplace_back(std::move(*bestInactiveSource));
401  updateSiteInfo();
402  eligibleInactiveSources.clear();
403  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
404  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
405  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
406  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
407  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
408  }
409  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
410  {
411  float r = m_distribution(m_generator);
413  {
414  findNewSource = true;
415  }
416  }
417  }
418  if (findNewSource)
419  {
420  m_open_handler->open();
422  }
423 
424  // Only aggressively look for new sources if we don't have two.
425  if (m_activeSources.size() == 2)
426  {
428  }
429  else
430  {
432  }
434 }
std::uniform_real_distribution< float > m_distribution
void updateSiteInfo(std::string orig_site="")
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
std::vector< std::shared_ptr< Source > > m_inactiveSources
def move
Definition: eostools.py:510
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b 
)
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 328 of file XrdRequestManager.cc.

References a, b, m_activeSources, m_inactiveSources, bookConverter::max, and updateSiteInfo().

Referenced by checkSources(), and checkSourcesImpl().

329 {
330  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
331 
332  bool findNewSource = false;
333  if ((m_activeSources[a]->getQuality() > 5130) ||
334  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
335  {
336  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
337  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
338  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
339  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
340  m_activeSources[a]->setLastDowngrade(now);
341  m_inactiveSources.emplace_back(m_activeSources[a]);
342  m_activeSources.erase(m_activeSources.begin()+a);
343  updateSiteInfo();
344  }
345  return findNewSource;
346 }
void updateSiteInfo(std::string orig_site="")
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  )

Return a pointer to an active file. Useful for metadata operations.

Definition at line 437 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

438 {
439  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
440  if (m_activeSources.empty())
441  {
443  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
444  << "', flags=0x" << std::hex << m_flags
445  << ", permissions=0" << std::oct << m_perms << std::dec
446  << ") => Source used after fatal exception.";
447  ex.addContext("In XrdAdaptor::RequestManager::handle()");
448  addConnections(ex);
449  throw ex;
450  }
451  return m_activeSources[0]->getFileHandle();
452 }
void addConnections(cms::Exception &)
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources)

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 455 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

456 {
457  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
458  sources.reserve(m_activeSources.size());
459  for (auto const& source : m_activeSources) {
460  sources.push_back(source->ID());
461  }
462 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources)

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 475 of file XrdRequestManager.cc.

References m_disabledSourceStrings, m_source_mutex, and source.

Referenced by addConnections().

476 {
477  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
478  sources.reserve(m_disabledSourceStrings.size());
479  for (auto const& source : m_disabledSourceStrings) {
480  sources.push_back(source);
481  }
482 }
std::set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 101 of file XrdRequestManager.h.

References m_name.

101 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 111 of file XrdRequestManager.h.

References instance, and RequestManager().

112  {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
tuple filename
Definition: lut2db_cfg.py:20
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources)

Definition at line 465 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

466 {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  sources.reserve(m_activeSources.size());
469  for (auto const& source : m_activeSources) {
470  sources.push_back(source->PrettyID());
471  }
472 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 53 of file XrdRequestManager.h.

54  {
55  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, into, size, off));
56  return handle(c_ptr);
57  }
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
tuple size
Write out results.
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)

Definition at line 630 of file XrdRequestManager.cc.

References a, cms::Exception::addContext(), assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, eostools::move(), fileCollector::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), and edm::CPUTimer::stop().

631 {
632  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
634 
635  timespec now;
636  GET_CLOCK_MONOTONIC(now);
637 
638  edm::CPUTimer timer;
639  timer.start();
640 
641  if (m_activeSources.size() == 1)
642  {
643  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
644  checkSources(now, c_ptr->getSize());
645  m_activeSources[0]->handle(c_ptr);
646  return c_ptr->get_future();
647  }
648  // Make sure active
649  else if (m_activeSources.empty())
650  {
652  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
653  << "', flags=0x" << std::hex << m_flags
654  << ", permissions=0" << std::oct << m_perms << std::dec
655  << ") => Source used after fatal exception.";
656  ex.addContext("In XrdAdaptor::RequestManager::handle()");
657  addConnections(ex);
658  throw ex;
659  }
660 
661  assert(iolist.get());
662  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
663  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
664  splitClientRequest(*iolist, *req1, *req2);
665 
666  checkSources(now, req1->size() + req2->size());
667  // CheckSources may have removed a source
668  if (m_activeSources.size() == 1)
669  {
670  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
671  m_activeSources[0]->handle(c_ptr);
672  return c_ptr->get_future();
673  }
674 
675  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
676  std::future<IOSize> future1, future2;
677  if (req1->size())
678  {
679  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
680  m_activeSources[0]->handle(c_ptr1);
681  future1 = c_ptr1->get_future();
682  }
683  if (req2->size())
684  {
685  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
686  m_activeSources[1]->handle(c_ptr2);
687  future2 = c_ptr2->get_future();
688  }
689  if (req1->size() && req2->size())
690  {
691  std::future<IOSize> task = std::async(std::launch::deferred,
692  [](std::future<IOSize> a, std::future<IOSize> b){
693  // Wait until *both* results are available. This is essential
694  // as the callback may try referencing the RequestManager. If one
695  // throws an exception (causing the RequestManager to be destroyed by
696  // XrdFile) and the other has a failure, then the recovery code will
697  // reference the destroyed RequestManager.
698  //
699  // Unlike other places where we use shared/weak ptrs to maintain object
700  // lifetime and destruction asynchronously, we *cannot* destroy the request
701  // asynchronously as it is associated with a ROOT buffer. We must wait until we
702  // are guaranteed that XrdCl will not write into the ROOT buffer before we
703  // can return.
704  b.wait(); a.wait();
705  return b.get() + a.get();
706  },
707  std::move(future1),
708  std::move(future2));
709  timer.stop();
710  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
711  return task;
712  }
713  else if (req1->size()) { return future1; }
714  else if (req2->size()) { return future2; }
715  else
716  { // Degenerate case - no bytes to read.
717  std::promise<IOSize> p; p.set_value(0);
718  return p.get_future();
719  }
720 }
void start()
Definition: CPUTimer.cc:74
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
void addConnections(cms::Exception &)
def move
Definition: eostools.py:510
Times stop()
Definition: CPUTimer.cc:94
XrdCl::OpenFlags::Flags m_flags
void checkSources(timespec &now, IOSize requestSize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::recursive_mutex m_source_mutex
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 540 of file XrdRequestManager.cc.

References assert(), checkSources(), GET_CLOCK_MONOTONIC, fileCollector::now, pickSingleSource(), and source.

541 {
542  assert(c_ptr.get());
543  timespec now;
544  GET_CLOCK_MONOTONIC(now);
545  checkSources(now, c_ptr->getSize());
546 
547  std::shared_ptr<Source> source = pickSingleSource();
548  source->handle(c_ptr);
549  return c_ptr->get_future();
550 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
void checkSources(timespec &now, IOSize requestSize)
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:42
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source source 
)
privatevirtual

Handle the file-open response

Definition at line 583 of file XrdRequestManager.cc.

References alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

584 {
585  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
586  if (status.IsOK())
587  {
588  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
589  for (const auto & s : m_activeSources)
590  {
591  if (source->ID() == s->ID())
592  {
593  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
594  << "; ignoring" << std::endl;
595  unsigned returned_count = ++m_excluded_active_count;
598  return;
599  }
600  }
601  for (const auto & s : m_inactiveSources)
602  {
603  if (source->ID() == s->ID())
604  {
605  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
606  << "; ignoring" << std::endl;
608  return;
609  }
610  }
611  if (m_activeSources.size() < 2)
612  {
613  m_activeSources.push_back(source);
614  updateSiteInfo();
616  }
617  else
618  {
619  m_inactiveSources.push_back(source);
620  }
621  }
622  else
623  { // File-open failure - wait at least 120s before next attempt.
624  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
626  }
627 }
void updateSiteInfo(std::string orig_site="")
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
tuple status
Definition: ntuplemaker.py:245
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 122 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, mergeVDriftHistosByStation::file, reco_application_tbsim_DetSim-Digi_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), customizeTrackingMonitorSeedNumber::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), prepareOpaqueString(), queueUpdateCurrentServer(), SendMonitoringInfo(), source, popcon_last_value_cfg::Source, ntuplemaker::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), updateSiteInfo(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

123 {
125 
126  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
127  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
128 
129  std::string orig_site;
130  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
131  {
132  std::string hostname;
133  if (Source::getHostname(orig_site, hostname))
134  {
135  Source::getDomain(hostname, orig_site);
136  }
137  }
138 
139  std::unique_ptr<XrdCl::File> file;
141  bool validFile = false;
142  const int retries = 5;
143  std::string excludeString;
144  for (int idx=0; idx<retries; idx++)
145  {
146  file.reset(new XrdCl::File());
147  auto opaque = prepareOpaqueString();
148  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
149  SyncHostResponseHandler handler;
150  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
151  if (!openStatus.IsOK())
152  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
153  // server and it was marked bad - xrootd couldn't even queue up the request internally!
154  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
155  // redirector being down or authentication failures.
156  ex.clearMessage();
157  ex.clearContext();
158  ex.clearAdditionalInfo();
159  ex << "XrdCl::File::Open(name='" << m_name
160  << "', flags=0x" << std::hex << m_flags
161  << ", permissions=0" << std::oct << m_perms << std::dec
162  << ") => error '" << openStatus.ToStr()
163  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
164  ex.addContext("Calling XrdFile::open()");
165  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
166  throw ex;
167  }
168  handler.WaitForResponse();
169  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
170  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
171  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
172  assert(status);
173  if (status->IsOK())
174  {
175  validFile = true;
176  break;
177  }
178  else
179  {
180  ex.clearMessage();
181  ex.clearContext();
182  ex.clearAdditionalInfo();
183  ex << "XrdCl::File::Open(name='" << m_name
184  << "', flags=0x" << std::hex << m_flags
185  << ", permissions=0" << std::oct << m_perms << std::dec
186  << ") => error '" << status->ToStr()
187  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
188  ex.addContext("Calling XrdFile::open()");
189  addConnections(ex);
190  std::string dataServer, lastUrl;
191  file->GetProperty("DataServer", dataServer);
192  file->GetProperty("LastURL", lastUrl);
193  if (dataServer.size())
194  {
195  ex.addAdditionalInfo("Problematic data server: " + dataServer);
196  }
197  if (lastUrl.size())
198  {
199  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
200  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
201  }
202  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
203  {
204  ex << ". No additional data servers were found.";
205  throw ex;
206  }
207  if (dataServer.size())
208  {
209  m_disabledSourceStrings.insert(dataServer);
210  m_disabledExcludeStrings.insert(excludeString);
211  }
212  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
213  if (lastUrl == new_filename)
214  {
215  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
216  throw ex;
217  }
218  }
219  }
220  if (!validFile)
221  {
222  throw ex;
223  }
224  SendMonitoringInfo(*file);
225 
226  timespec ts;
228 
229  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
230  {
231  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
232  m_activeSources.push_back(source);
233  updateSiteInfo(orig_site);
234  }
237 
238  m_lastSourceCheck = ts;
239  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
241 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
static void SendMonitoringInfo(XrdCl::File &file)
void queueUpdateCurrentServer(const std::string &)
void addConnections(cms::Exception &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
def move
Definition: eostools.py:510
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
tuple idx
DEBUGGING if hasattr(process,&quot;trackMonIterativeTracking2012&quot;): print &quot;trackMonIterativeTracking2012 D...
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
tuple status
Definition: ntuplemaker.py:245
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 502 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

503 {
504  std::shared_ptr<Source> source = nullptr;
505  {
506  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
507  if (m_activeSources.size() == 2)
508  {
510  {
511  source = m_activeSources[0];
513  }
514  else
515  {
516  source = m_activeSources[1];
518  }
519  }
520  else if (m_activeSources.empty())
521  {
523  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
524  << "', flags=0x" << std::hex << m_flags
525  << ", permissions=0" << std::oct << m_perms << std::dec
526  << ") => Source used after fatal exception.";
527  ex.addContext("In XrdAdaptor::RequestManager::handle()");
528  addConnections(ex);
529  throw ex;
530  }
531  else
532  {
533  source = m_activeSources[0];
534  }
535  }
536  return source;
537 }
void addConnections(cms::Exception &)
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( )
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 553 of file XrdRequestManager.cc.

References prof2calltree::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

554 {
555  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
556  std::stringstream ss;
557  ss << "tried=";
558  size_t count = 0;
559  for ( const auto & it : m_activeSources )
560  {
561  count++;
562  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
563  }
564  for ( const auto & it : m_inactiveSources )
565  {
566  count++;
567  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
568  }
569  for ( const auto & it : m_disabledExcludeStrings )
570  {
571  count++;
572  ss << it.substr(0, it.find(":")) << ",";
573  }
574  if (count)
575  {
576  std::string tmp_str = ss.str();
577  return tmp_str.substr(0, tmp_str.size()-1);
578  }
579  return "";
580 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::set< std::string > m_disabledExcludeStrings
std::vector< std::shared_ptr< Source > > m_activeSources
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 270 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

271 {
272  std::unique_ptr<std::string> hostname(new std::string(id));
273  if (Source::getHostname(id, *hostname))
274  {
275  std::string *null_hostname = nullptr;
276  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
277  {
278  hostname.release();
279  }
280  }
281 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 723 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, fileCollector::now, seconds(), ntuplemaker::status, and updateSiteInfo().

724 {
725  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
726  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
727 
728  // Fail early for invalid responses - XrdFile has a separate path for handling this.
729  if (c_status.code == XrdCl::errInvalidResponse)
730  {
731  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
733  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
734  << "', flags=0x" << std::hex << m_flags
735  << ", permissions=0" << std::oct << m_perms << std::dec
736  << ", old source=" << source_ptr->PrettyID()
737  << ") => Invalid ReadV response from server";
738  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
739  addConnections(ex);
740  throw ex;
741  }
742  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
743 
744  // Note that we do not delete the Source itself. That is because this
745  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
746  // In such a case, if you close a file in the handler, it will deadlock
747  m_disabledSourceStrings.insert(source_ptr->ID());
748  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
749  m_disabledSources.insert(source_ptr);
750 
751  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
752  {
753  m_activeSources.erase(m_activeSources.begin());
754  updateSiteInfo();
755  }
756  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
757  {
758  m_activeSources.erase(m_activeSources.begin()+1);
759  updateSiteInfo();
760  }
761  std::shared_ptr<Source> new_source;
762  if (m_activeSources.size() == 0)
763  {
764  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
765  timespec now;
766  GET_CLOCK_MONOTONIC(now);
768  // Note we only wait for 180 seconds here. This is because we've already failed
769  // once and the likelihood the program has some inconsistent state is decent.
770  // We'd much rather fail hard than deadlock!
771  sentry.unlock();
772  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
773  if (status == std::future_status::timeout)
774  {
776  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
777  << "', flags=0x" << std::hex << m_flags
778  << ", permissions=0" << std::oct << m_perms << std::dec
779  << ", old source=" << source_ptr->PrettyID()
780  << ") => timeout when waiting for file open";
781  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
782  addConnections(ex);
783  throw ex;
784  }
785  else
786  {
787  try
788  {
789  new_source = future.get();
790  }
791  catch (edm::Exception &ex)
792  {
793  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
794  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
795  throw;
796  }
797  }
798  sentry.lock();
799 
800  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
801  {
802  // The server gave us back a data node we requested excluded. Fatal!
804  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
805  << "', flags=0x" << std::hex << m_flags
806  << ", permissions=0" << std::oct << m_perms << std::dec
807  << ", old source=" << source_ptr->PrettyID()
808  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
809  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
810  addConnections(ex);
811  throw ex;
812  }
813  m_activeSources.push_back(new_source);
814  updateSiteInfo();
815  }
816  else
817  {
818  new_source = m_activeSources[0];
819  }
820  new_source->handle(c_ptr);
821 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void addConnections(cms::Exception &)
std::set< std::shared_ptr< Source > > m_disabledSources
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tuple status
Definition: ntuplemaker.py:245
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2 
)
private

Given a client request, split it into two requests lists.

Definition at line 938 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, prof2calltree::front, bookConverter::max, IOPosBuffer::offset(), q1, q2, python.multivaluedict::sort(), contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

939 {
940  if (iolist.size() == 0) return;
941  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
942  req1.reserve(iolist.size()/2+1);
943  req2.reserve(iolist.size()/2+1);
944  size_t front=0;
945 
946  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
947  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
948  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
949  IOSize chunk1, chunk2;
950  // Make sure the chunk size is at least 1024; little point to reads less than that size.
951  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
952  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
953 
954  IOSize size_orig = 0;
955  for (const auto & it : iolist) size_orig += it.size();
956 
957  while (tmp_iolist.size()-front > 0)
958  {
959  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
960  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
961  // are passed to the request manager. However, because we have a max chunk size, we increase
962  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
963  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
964  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
966  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
967  << "', flags=0x" << std::hex << m_flags
968  << ", permissions=0" << std::oct << m_perms << std::dec
969  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
970  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
971  addConnections(ex);
972  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
973  ex.addAdditionalInfo(ss.str());
974  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
975  ex.addAdditionalInfo(ss2.str());
976  throw ex;
977  }
978  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
979  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
980  }
981  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
982  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
983 
984  IOSize size1 = validateList(req1);
985  IOSize size2 = validateList(req2);
986 
987  assert(size_orig == size1 + size2);
988 
989  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
990 }
assert(m_qm.get())
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &)
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 251 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), likely, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), statsService, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by initialize().

252 {
253  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
254  // a pending update. *However*, since we call this for every read, we'll get it
255  // eventually.
256  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
257  std::string *hostname_ptr;
258  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
259  {
260  std::unique_ptr<std::string> hostname(hostname_ptr);
262  if (statsService.isAvailable()) {
263  statsService->setCurrentServer(*hostname_ptr);
264  }
265  }
266 }
void setCurrentServer(const std::string &servername)
#define likely(x)
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:108
void RequestManager::updateSiteInfo ( std::string  orig_site = "")
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 284 of file XrdRequestManager.cc.

References m_activeSites, m_activeSources, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

285 {
286  std::string siteA, siteB, siteList;
287  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
288  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
289  siteList = siteA;
290  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
291  if (orig_site.size() && (orig_site != siteList))
292  {
293  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
294  m_activeSites = siteList;
295  }
296  else if (!orig_site.size() && (siteList != m_activeSites))
297  {
298  if (m_activeSites.size() >0 )
299  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
300  m_activeSites = siteList;
301  }
302 }
std::vector< std::shared_ptr< Source > > m_activeSources

Member Data Documentation

std::string XrdAdaptor::RequestManager::m_activeSites
private

Definition at line 197 of file XrdRequestManager.h.

Referenced by updateSiteInfo().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 192 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), compareSources(), getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), initialize(), pickSingleSource(), prepareOpaqueString(), requestFailure(), and updateSiteInfo().

std::set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 195 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

std::set<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 196 of file XrdRequestManager.h.

Referenced by requestFailure().

std::set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 194 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 218 of file XrdRequestManager.h.

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 193 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), compareSources(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 202 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 207 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 205 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 273 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 200 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
private
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 203 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 208 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 46 of file XrdRequestManager.h.