CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 53 of file XrdRequestManager.h.

Constructor & Destructor Documentation

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 117 of file XrdRequestManager.cc.

118  : m_serverToAdvertise(nullptr),
121  m_name(filename),
122  m_flags(flags),
123  m_perms(perms),
124  m_distribution(0,100),
126 {
127 }
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

void RequestManager::addConnections ( cms::Exception & ex) const

Add the list of active connections to the exception extra info.

Definition at line 512 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), handle(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), requestFailure(), and splitClientRequest().

513 {
514  std::vector<std::string> sources;
516  for (auto const& source : sources)
517  {
518  ex.addAdditionalInfo("Active source: " + source);
519  }
520  sources.clear();
521  getDisabledSourceNames(sources);
522  for (auto const& source : sources)
523  {
524  ex.addAdditionalInfo("Disabled source: " + source);
525  }
526 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:47
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 326 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle().

329 {
330  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
331  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
332  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
333  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
334  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
335  {
336  { // Be more aggressive about getting rid of very bad sources.
337  compareSources(now, 0, 1, activeSources, inactiveSources);
338  compareSources(now, 1, 0, activeSources, inactiveSources);
339  }
341  {
342  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
343  }
344  }
345 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 373 of file XrdRequestManager.cc.

References compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), cmsPerfSuiteHarvest::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

377 {
378 
379  bool findNewSource = false;
380  if (activeSources.size() <= 1)
381  {
382  findNewSource = true;
383  }
384  else if (activeSources.size() > 1)
385  {
386  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality() << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
387  findNewSource |= compareSources(now, 0, 1, activeSources,inactiveSources);
388  findNewSource |= compareSources(now, 1, 0,activeSources, inactiveSources);
389 
390  // NOTE: We could probably replace the copy with a better sort function.
391  // However, there are typically very few sources and the correctness is more obvious right now.
392  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
393  for (const auto & source : inactiveSources)
394  {
395  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
396  }
397  auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
398  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
399  auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
400  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
401  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
402  {
403  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
404  << ", quality " << (*bestInactiveSource)->getQuality();
405  }
406  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
407  << ", quality " << (*worstActiveSource)->getQuality();
408  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
409  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
410  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
411  {
412  auto oldSources = activeSources;
413  activeSources.push_back(*bestInactiveSource);
414  reportSiteChange(oldSources, activeSources);
415  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
416  }
417  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
418  {
419  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
420  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
421  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
422  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
423  (*worstActiveSource)->setLastDowngrade(now);
424  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
425  inactiveSources.emplace_back(std::move(*worstActiveSource));
426  auto oldSources = activeSources;
427  activeSources.erase(worstActiveSource);
428  activeSources.emplace_back(std::move(*bestInactiveSource));
429  reportSiteChange(oldSources, activeSources);
430  eligibleInactiveSources.clear();
431  for (const auto & source : inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
432  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
433  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
434  worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
435  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
436  }
437  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
438  {
439  float r = m_distribution(m_generator);
441  {
442  findNewSource = true;
443  }
444  }
445  }
446  if (findNewSource)
447  {
448  m_open_handler->open();
450  }
451 
452  // Only aggressively look for new sources if we don't have two.
453  if (activeSources.size() == 2)
454  {
456  }
457  else
458  {
460  }
462 }
std::uniform_real_distribution< float > m_distribution
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 349 of file XrdRequestManager.cc.

References a, b, SiStripPI::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

352 {
353  if (activeSources.size() < std::max(a, b)+1) {return false;}
354 
355  bool findNewSource = false;
356  if ((activeSources[a]->getQuality() > 5130) ||
357  ((activeSources[a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[a]->getQuality())))
358  {
359  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
360  << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
361  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
362  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
363  activeSources[a]->setLastDowngrade(now);
364  inactiveSources.emplace_back(activeSources[a]);
365  auto oldSources = activeSources;
366  activeSources.erase(activeSources.begin()+a);
367  reportSiteChange(oldSources,activeSources);
368  }
369  return findNewSource;
370 }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 465 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

466 {
467  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
468  if (m_activeSources.empty())
469  {
471  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
472  << "', flags=0x" << std::hex << m_flags
473  << ", permissions=0" << std::oct << m_perms << std::dec
474  << ") => Source used after fatal exception.";
475  ex.addContext("In XrdAdaptor::RequestManager::handle()");
476  addConnections(ex);
477  throw ex;
478  }
479  return m_activeSources[0]->getFileHandle();
480 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 483 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

484 {
485  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
486  sources.reserve(m_activeSources.size());
487  for (auto const& source : m_activeSources) {
488  sources.push_back(source->ID());
489  }
490 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 503 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

504 {
505  sources.reserve(m_disabledSourceStrings.size());
506  for (auto const& source : m_disabledSourceStrings) {
507  sources.push_back(source);
508  }
509 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:47
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 111 of file XrdRequestManager.h.

111 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 121 of file XrdRequestManager.h.

References a, b, corrVsCorr::filename, flags, initialize(), instance, cmsPerfSuiteHarvest::now, mps_update::status, and AlCaHLTBitMon_QueryRunRegistry::string.

122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 493 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

494 {
495  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
496  sources.reserve(m_activeSources.size());
497  for (auto const& source : m_activeSources) {
498  sources.push_back(source->PrettyID());
499  }
500 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 63 of file XrdRequestManager.h.

References cmsBatch::handle, findQualityFiles::size, and btagGenBb_cfi::Status.

64  {
65  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
66  return handle(c_ptr);
67  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)

Definition at line 677 of file XrdRequestManager.cc.

References a, addConnections(), cms::Exception::addContext(), b, checkSources(), TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, m_activeSources, m_flags, m_inactiveSources, m_name, m_perms, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, AlCaHLTBitMon_ParallelJobs::p, splitClientRequest(), edm::CPUTimer::start(), edm::CPUTimer::stop(), TrackValidation_cff::task, and updateCurrentServer().

678 {
679  //Use a copy of m_activeSources and m_inactiveSources throughout this function
680  // in order to avoid holding the lock a long time and causing a deadlock.
681  // When the function is over we will update the values of the containers
682  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
683  {
684  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
685  activeSources = m_activeSources;
686  inactiveSources = m_inactiveSources;
687  }
688  //Make sure we update changes when we leave the function
689  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
690  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
691  m_activeSources = std::move(activeSources);
692  m_inactiveSources = std::move(inactiveSources);
693  });
694 
696 
697  timespec now;
698  GET_CLOCK_MONOTONIC(now);
699 
700  edm::CPUTimer timer;
701  timer.start();
702 
703  if (activeSources.size() == 1)
704  {
705  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
706  checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
707  activeSources[0]->handle(c_ptr);
708  return c_ptr->get_future();
709  }
710  // Make sure active
711  else if (activeSources.empty())
712  {
714  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
715  << "', flags=0x" << std::hex << m_flags
716  << ", permissions=0" << std::oct << m_perms << std::dec
717  << ") => Source used after fatal exception.";
718  ex.addContext("In XrdAdaptor::RequestManager::handle()");
719  addConnections(ex);
720  throw ex;
721  }
722 
723  assert(iolist.get());
724  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
725  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
726  splitClientRequest(*iolist, *req1, *req2, activeSources);
727 
728  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
729  // CheckSources may have removed a source
730  if (activeSources.size() == 1)
731  {
732  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
733  activeSources[0]->handle(c_ptr);
734  return c_ptr->get_future();
735  }
736 
737  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
738  std::future<IOSize> future1, future2;
739  if (!req1->empty())
740  {
741  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
742  activeSources[0]->handle(c_ptr1);
743  future1 = c_ptr1->get_future();
744  }
745  if (!req2->empty())
746  {
747  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
748  activeSources[1]->handle(c_ptr2);
749  future2 = c_ptr2->get_future();
750  }
751  if (!req1->empty() && !req2->empty())
752  {
753  std::future<IOSize> task = std::async(std::launch::deferred,
754  [](std::future<IOSize> a, std::future<IOSize> b){
755  // Wait until *both* results are available. This is essential
756  // as the callback may try referencing the RequestManager. If one
757  // throws an exception (causing the RequestManager to be destroyed by
758  // XrdFile) and the other has a failure, then the recovery code will
759  // reference the destroyed RequestManager.
760  //
761  // Unlike other places where we use shared/weak ptrs to maintain object
762  // lifetime and destruction asynchronously, we *cannot* destroy the request
763  // asynchronously as it is associated with a ROOT buffer. We must wait until we
764  // are guaranteed that XrdCl will not write into the ROOT buffer before we
765  // can return.
766  b.wait(); a.wait();
767  return b.get() + a.get();
768  },
769  std::move(future1),
770  std::move(future2));
771  timer.stop();
772  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
773  return task;
774  }
775  else if (!req1->empty()) { return future1; }
776  else if (!req2->empty()) { return future2; }
777  else
778  { // Degenerate case - no bytes to read.
779  std::promise<IOSize> p; p.set_value(0);
780  return p.get_future();
781  }
782 }
void start()
Definition: CPUTimer.cc:68
#define GET_CLOCK_MONOTONIC(ts)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
Times stop()
Definition: CPUTimer.cc:87
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 567 of file XrdRequestManager.cc.

References checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, pickSingleSource(), source, and AlCaHLTBitMon_QueryRunRegistry::string.

568 {
569  assert(c_ptr.get());
570  timespec now;
571  GET_CLOCK_MONOTONIC(now);
572  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
573  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
574  {
575  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
576  activeSources = m_activeSources;
577  inactiveSources = m_inactiveSources;
578  }
579  {
580  //make sure we update values before calling pickSingelSource
581  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
582  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
583  m_activeSources = std::move(activeSources);
584  m_inactiveSources = std::move(inactiveSources);
585  });
586 
587  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
588  }
589 
590  std::shared_ptr<Source> source = pickSingleSource();
591  source->handle(c_ptr);
592  return c_ptr->get_future();
593 }
#define GET_CLOCK_MONOTONIC(ts)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source > source 
)
private virtual

Handle the file-open response

Definition at line 629 of file XrdRequestManager.cc.

References m_activeSources, m_excluded_active_count, m_inactiveSources, m_nextActiveSourceCheck, m_source_mutex, queueUpdateCurrentServer(), reportSiteChange(), alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

630 {
631  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
632  if (status.IsOK())
633  {
634  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
635  for (const auto & s : m_activeSources)
636  {
637  if (source->ID() == s->ID())
638  {
639  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
640  << "; ignoring" << std::endl;
641  unsigned returned_count = ++m_excluded_active_count;
644  return;
645  }
646  }
647  for (const auto & s : m_inactiveSources)
648  {
649  if (source->ID() == s->ID())
650  {
651  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
652  << "; ignoring" << std::endl;
654  return;
655  }
656  }
657  if (m_activeSources.size() < 2)
658  {
659  auto oldSources = m_activeSources;
660  m_activeSources.push_back(source);
661  reportSiteChange(oldSources, m_activeSources);
663  }
664  else
665  {
666  m_inactiveSources.push_back(source);
667  }
668  }
669  else
670  { // File-open failure - wait at least 120s before next attempt.
671  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
673  }
674 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager > selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away its reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows not to reference the RequestManager once it is gone.

Definition at line 131 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, ecalTB2006H4_GenSimDigiReco_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

// Opens the file named by m_name through XrdCl, making up to `retries`
// attempts, and registers the opened file as the first entry of
// m_activeSources. The exception object `ex` is thrown when an attempt fails
// in a way that cannot be retried, or when no attempt yields a valid file.
// NOTE(review): original lines 133, 149, 236, 243, 245-246 and 250 are elided
// from this listing (including the declaration of `ex`); the comments below
// describe only the lines that are shown.
132 {
134 
135  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
     // When an XrdCl environment exists, read its "StreamErrorWindow" setting into m_timeout.
136  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
137 
138  std::string orig_site;
     // If getXrootdSiteFromURL() returns false and orig_site contains no '.',
     // treat orig_site as a host id: resolve its hostname and store that
     // hostname's domain back into orig_site.
     // (presumably a false return means the site lookup failed - confirm in XrdSource.cc)
139  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
140  {
141  std::string hostname;
142  if (Source::getHostname(orig_site, hostname))
143  {
144  Source::getDomain(hostname, orig_site);
145  }
146  }
147 
148  std::unique_ptr<XrdCl::File> file;
150  bool validFile = false;
151  const int retries = 5;
152  std::string excludeString;
     // Each attempt uses a fresh XrdCl::File and a freshly built opaque string,
     // so entries added to m_disabledExcludeStrings by an earlier failed attempt
     // are included in the next attempt's opaque string.
153  for (int idx=0; idx<retries; idx++)
154  {
155  file.reset(new XrdCl::File());
156  auto opaque = prepareOpaqueString();
     // Append the opaque string with '?' when m_name has no '?' yet, otherwise with '&'.
157  std::string new_filename = m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
158  SyncHostResponseHandler handler;
159  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
160  if (!openStatus.IsOK())
161  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
162  // server and it was marked bad - xrootd couldn't even queue up the request internally!
163  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
164  // redirector being down or authentication failures.
165  ex.clearMessage();
166  ex.clearContext();
167  ex.clearAdditionalInfo();
168  ex << "XrdCl::File::Open(name='" << m_name
169  << "', flags=0x" << std::hex << m_flags
170  << ", permissions=0" << std::oct << m_perms << std::dec
171  << ") => error '" << openStatus.ToStr()
172  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
173  ex.addContext("Calling XrdFile::open()");
174  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
175  throw ex;
176  }
     // The open request was queued; block until the handler has the response.
177  handler.WaitForResponse();
178  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
179  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
180  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
181  assert(status);
182  if (status->IsOK())
183  {
184  validFile = true;
185  break;
186  }
187  else
188  {
     // Rebuild `ex` from scratch so it describes this attempt only.
189  ex.clearMessage();
190  ex.clearContext();
191  ex.clearAdditionalInfo();
192  ex << "XrdCl::File::Open(name='" << m_name
193  << "', flags=0x" << std::hex << m_flags
194  << ", permissions=0" << std::oct << m_perms << std::dec
195  << ") => error '" << status->ToStr()
196  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
197  ex.addContext("Calling XrdFile::open()");
198  addConnections(ex);
199  std::string dataServer, lastUrl;
200  file->GetProperty("DataServer", dataServer);
201  file->GetProperty("LastURL", lastUrl);
202  if (!dataServer.empty())
203  {
204  ex.addAdditionalInfo("Problematic data server: " + dataServer);
205  }
206  if (!lastUrl.empty())
207  {
208  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
209  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
210  }
     // The failing data server is already in the disabled list: stop retrying and throw.
211  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
212  {
213  ex << ". No additional data servers were found.";
214  throw ex;
215  }
     // Record the failing server (and its exclude string) as disabled.
216  if (!dataServer.empty())
217  {
218  m_disabledSourceStrings.insert(dataServer);
219  m_disabledExcludeStrings.insert(excludeString);
220  }
221  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
222  if (lastUrl == new_filename)
223  {
224  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
225  throw ex;
226  }
227  }
228  }
     // Every attempt failed without throwing: throw the error recorded by the last attempt.
229  if (!validFile)
230  {
231  throw ex;
232  }
233  SendMonitoringInfo(*file);
234 
235  timespec ts;
237 
     // Ownership of the opened file moves into the new Source.
238  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
239  {
     // m_activeSources is modified only while m_source_mutex is held.
240  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
241  auto oldList = m_activeSources;
242  m_activeSources.push_back(source);
244  }
247 
248  m_lastSourceCheck = ts;
249  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
251 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:47
def move(src, dest)
Definition: eostools.py:511
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 529 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

// Returns one entry of m_activeSources to serve the next operation, reading
// the list while m_source_mutex is held. Throws `ex` when there are no active
// sources.
// NOTE(review): original lines 536, 539, 544 and 549 are elided from this
// listing - that covers the condition choosing between the two sources and
// the declaration of `ex`. The References list for this function names
// m_nextInitialSourceToggle, which presumably drives that choice - confirm.
530 {
531  std::shared_ptr<Source> source = nullptr;
532  {
533  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
     // Exactly two active sources: pick one of the two (selecting condition elided).
534  if (m_activeSources.size() == 2)
535  {
537  {
538  source = m_activeSources[0];
540  }
541  else
542  {
543  source = m_activeSources[1];
545  }
546  }
     // No active source at all: report the error to the caller.
547  else if (m_activeSources.empty())
548  {
550  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
551  << "', flags=0x" << std::hex << m_flags
552  << ", permissions=0" << std::oct << m_perms << std::dec
553  << ") => Source used after fatal exception.";
554  ex.addContext("In XrdAdaptor::RequestManager::handle()");
555  addConnections(ex);
556  throw ex;
557  }
     // Any other count: use the first active source.
558  else
559  {
560  source = m_activeSources[0];
561  }
562  }
563  return source;
564 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:47
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 596 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

// Builds the string "tried=<id>,<id>,..." from the ExcludeID() of every active
// and inactive source plus every entry of m_disabledExcludeStrings. Each id is
// cut at its first ':' (substr() keeps the whole string when no ':' is found).
// Returns an empty string when there is nothing to list.
597 {
598  std::stringstream ss;
599  ss << "tried=";
600  size_t count = 0;
601  {
     // The two source vectors are read only while m_source_mutex is held.
602  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
603 
604  for ( const auto & it : m_activeSources )
605  {
606  count++;
607  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
608  }
609  for ( const auto & it : m_inactiveSources )
610  {
611  count++;
612  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
613  }
614  }
     // m_disabledExcludeStrings is iterated after the lock scope has ended.
615  for ( const auto & it : m_disabledExcludeStrings )
616  {
617  count++;
618  ss << it.substr(0, it.find(":")) << ",";
619  }
620  if (count)
621  {
     // Every entry was written with a trailing ','; drop the last one.
622  std::string tmp_str = ss.str();
623  return tmp_str.substr(0, tmp_str.size()-1);
624  }
     // Nothing was appended: return "" rather than a bare "tried=".
625  return "";
626 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 280 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), triggerObjects_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

// Stores the hostname obtained from `id` in m_serverToAdvertise, but only when
// that atomic pointer is currently null. Nothing is stored when
// Source::getHostname() returns false or when a value is already pending.
281 {
     // Heap-allocate the string (initialised to `id`); getHostname() writes the hostname into it.
282  auto hostname = std::make_unique<std::string>(id);
283  if (Source::getHostname(id, *hostname))
284  {
285  std::string *null_hostname = nullptr;
     // The exchange succeeds only if m_serverToAdvertise still holds nullptr.
286  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
287  {
     // The atomic now holds the raw pointer, so the unique_ptr must give up ownership.
288  hostname.release();
289  }
     // Otherwise the unique_ptr still owns the string and frees it on return.
290  }
291 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source > > const &  iOld,
std::vector< std::shared_ptr< Source > > const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 305 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), handleOpen(), initialize(), and requestFailure().

// Logs a warning when the formatted site list of iNew differs from orig_site
// (if orig_site is non-empty) or from the formatted site list of iOld
// (if orig_site is empty and the old list is non-empty).
// NOTE(review): signature lines 305-307 are elided from this listing, and
// formatSites() is defined elsewhere - its exact output format is not shown here.
308 {
309  auto siteList = formatSites(iNew);
     // An original site was supplied and the new site list does not match it.
310  if (!orig_site.empty() && (orig_site != siteList))
311  {
312  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
313  }
314  else {
315  auto oldSites = formatSites(iOld);
     // No original site was supplied: compare the new site list against the old one.
316  if (orig_site.empty() && (siteList != oldSites))
317  {
     // Stay silent when the old site list is empty.
318  if (!oldSites.empty() )
319  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
320  }
321  }
322 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 785 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, cmsPerfSuiteHarvest::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

// Handles a failed client request: disables the source the request was using,
// removes it from m_activeSources when it is one of the first two entries,
// opens a replacement source through m_open_handler if no active source is
// left, and finally hands the request to a source via handle(c_ptr).
// Throws `ex` for an invalid response, for a timeout while waiting for the
// replacement open, or when the replacement is itself a disabled source.
// NOTE(review): original lines 793, 831, 839 and 866 are elided from this
// listing (including the declarations of the `ex` objects).
786 {
787  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
788 
789  // Fail early for invalid responses - XrdFile has a separate path for handling this.
790  if (c_status.code == XrdCl::errInvalidResponse)
791  {
792  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
794  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
795  << "', flags=0x" << std::hex << m_flags
796  << ", permissions=0" << std::oct << m_perms << std::dec
797  << ", old source=" << source_ptr->PrettyID()
798  << ") => Invalid ReadV response from server";
799  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
800  addConnections(ex);
801  throw ex;
802  }
803  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
804 
805  // Note that we do not delete the Source itself. That is because this
806  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
807  // In such a case, if you close a file in the handler, it will deadlock
     // These three inserts run before m_source_mutex is taken below.
808  m_disabledSourceStrings.insert(source_ptr->ID());
809  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
810  m_disabledSources.insert(source_ptr);
811 
     // unique_lock rather than lock_guard: the lock is released and re-taken further down.
812  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
     // Remove the failed source if it is the first or second active source.
813  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get()))
814  {
815  auto oldSources = m_activeSources;
816  m_activeSources.erase(m_activeSources.begin());
817  reportSiteChange(oldSources, m_activeSources);
818  }
819  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
820  {
821  auto oldSources = m_activeSources;
822  m_activeSources.erase(m_activeSources.begin()+1);
823  reportSiteChange(oldSources, m_activeSources);
824  }
825  std::shared_ptr<Source> new_source;
     // No active source left: ask the open handler for a new one.
826  if (m_activeSources.empty())
827  {
828  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
829  timespec now;
830  GET_CLOCK_MONOTONIC(now);
832  // Note we only wait for 180 seconds here. This is because we've already failed
833  // once and the likelihood the program has some inconsistent state is decent.
834  // We'd much rather fail hard than deadlock!
     // NOTE(review): the wait below is m_timeout+10 seconds, not a fixed 180 - confirm the intended value.
     // The mutex is released for the duration of the wait.
835  sentry.unlock();
836  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
837  if (status == std::future_status::timeout)
838  {
840  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
841  << "', flags=0x" << std::hex << m_flags
842  << ", permissions=0" << std::oct << m_perms << std::dec
843  << ", old source=" << source_ptr->PrettyID()
844  << ") => timeout when waiting for file open";
845  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
846  addConnections(ex);
847  throw ex;
848  }
849  else
850  {
851  try
852  {
853  new_source = future.get();
854  }
     // Add context about the source that failed first, then rethrow the same exception.
855  catch (edm::Exception &ex)
856  {
857  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
858  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
859  throw;
860  }
861  }
862 
863  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
864  {
865  // The server gave us back a data node we requested excluded. Fatal!
867  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
868  << "', flags=0x" << std::hex << m_flags
869  << ", permissions=0" << std::oct << m_perms << std::dec
870  << ", old source=" << source_ptr->PrettyID()
871  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
872  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
873  addConnections(ex);
874  throw ex;
875  }
     // Re-take the mutex before touching m_activeSources again.
876  sentry.lock();
877 
878  auto oldSources = m_activeSources;
879  m_activeSources.push_back(new_source);
880  reportSiteChange(oldSources,m_activeSources);
881  }
882  else
883  {
     // An active source remains: retry the request on the first one.
884  new_source = m_activeSources[0];
885  }
     // Hand the failed request to the chosen source.
886  new_source->handle(c_ptr);
887 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:53
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:165
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 1004 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, m_flags, m_name, m_perms, SiStripPI::max, IOPosBuffer::offset(), q1, q2, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

Referenced by handle().

// Splits `iolist` into two request lists, req1 and req2, using chunk sizes
// derived from the quality values of activeSources[0] and activeSources[1].
// Returns immediately when iolist is empty. For non-empty input,
// activeSources must hold at least two entries, since both indices are read
// unconditionally. Throws `ex` when both output lists reach
// XRD_ADAPTOR_CHUNK_THRESHOLD entries while input remains.
// NOTE(review): original line 1031 (the declaration of `ex`) is elided from
// this listing; consumeChunkFront/consumeChunkBack/validateList are defined
// elsewhere and are not described here.
1005 {
1006  if (iolist.empty()) return;
      // Work on a copy so the caller's list is left untouched.
1007  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1008  req1.reserve(iolist.size()/2+1);
1009  req2.reserve(iolist.size()/2+1);
1010  size_t front=0;
1011 
1012  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1013  float q1 = static_cast<float>(activeSources[0]->getQuality())+5;
1014  float q2 = static_cast<float>(activeSources[1]->getQuality())+5;
1015  IOSize chunk1, chunk2;
1016  // Make sure the chunk size is at least 1024; little point to reads less than that size.
      // chunk1 is weighted by the OTHER source's squared quality (q2*q2), and chunk2 by q1*q1.
      // (presumably a lower quality value means a better source - confirm)
1017  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1018  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1019 
      // Total byte count of the original request; checked against the split result at the end.
1020  IOSize size_orig = 0;
1021  for (const auto & it : iolist) size_orig += it.size();
1022 
1023  while (tmp_iolist.size()-front > 0)
1024  {
1025  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
1026  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1027  // are passed to the request manager. However, because we have a max chunk size, we increase
1028  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1029  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1030  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1032  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
1033  << "', flags=0x" << std::hex << m_flags
1034  << ", permissions=0" << std::oct << m_perms << std::dec
1035  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
      // NOTE(review): the context string below names requestFailure() although this is splitClientRequest() - looks like a copy-paste slip; confirm.
1036  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1037  addConnections(ex);
1038  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1039  ex.addAdditionalInfo(ss.str());
      // Subtract the +5 offset again so the message shows the raw quality values.
1040  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
1041  ex.addAdditionalInfo(ss2.str());
1042  throw ex;
1043  }
      // Feed whichever output list is still below the threshold.
1044  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
1045  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
1046  }
      // Order each output list by ascending file offset.
1047  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1048  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1049 
1050  IOSize size1 = validateList(req1);
1051  IOSize size2 = validateList(req2);
1052 
      // The two halves together must account for every byte of the original request.
1053  assert(size_orig == size1 + size2);
1054 
1055  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1056 }
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inline private

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 261 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), LIKELY, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handle(), and initialize().

// Takes the pending hostname out of m_serverToAdvertise, if there is one, and
// passes it to statsService->setCurrentServer() when that service is available.
// NOTE(review): original line 271 (the declaration of `statsService`) is
// elided from this listing.
262 {
263  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
264  // a pending update. *However*, since we call this for every read, we'll get it
265  // eventually.
     // Cheap early exit when no update is pending.
266  if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
267  std::string *hostname_ptr;
     // Swap in nullptr and take whatever pointer was stored; it may already be null again.
268  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
269  {
     // Own the string so it is freed when this scope ends.
270  std::unique_ptr<std::string> hostname(hostname_ptr);
272  if (statsService.isAvailable()) {
273  statsService->setCurrentServer(*hostname_ptr);
274  }
275  }
276 }
void setCurrentServer(const std::string &servername)
#define LIKELY(x)
Definition: Likely.h:20
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:40

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), handleOpen(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

Referenced by handleOpen().

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), handleOpen(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), handleOpen(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 297 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutable private
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 56 of file XrdRequestManager.h.