CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManager > getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 53 of file XrdRequestManager.h.

Constructor & Destructor Documentation

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtual, default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 115 of file XrdRequestManager.cc.

116  : m_serverToAdvertise(nullptr),
119  m_name(filename),
120  m_flags(flags),
121  m_perms(perms),
122  m_distribution(0,100),
124 {
125 }
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

void RequestManager::addConnections ( cms::Exception &  ex) const

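Add the names of the active sources and of the disabled sources to the exception's additional info (one "Active source: ..." or "Disabled source: ..." entry per source).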

Definition at line 510 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), handle(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), requestFailure(), and splitClientRequest().

511 {
512  std::vector<std::string> sources;
514  for (auto const& source : sources)
515  {
516  ex.addAdditionalInfo("Active source: " + source);
517  }
518  sources.clear();
519  getDisabledSourceNames(sources);
520  for (auto const& source : sources)
521  {
522  ex.addAdditionalInfo("Disabled source: " + source);
523  }
524 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:43
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive, so it is done at most once per second (only when more than 1000 ms have passed since the last check).

Definition at line 324 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle().

327 {
328  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
329  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
330  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
331  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
332  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
333  {
334  { // Be more aggressive about getting rid of very bad sources.
335  compareSources(now, 0, 1, activeSources, inactiveSources);
336  compareSources(now, 1, 0, activeSources, inactiveSources);
337  }
339  {
340  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
341  }
342  }
343 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 371 of file XrdRequestManager.cc.

References compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), cmsPerfSuiteHarvest::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

375 {
376 
377  bool findNewSource = false;
378  if (activeSources.size() <= 1)
379  {
380  findNewSource = true;
381  }
382  else if (activeSources.size() > 1)
383  {
384  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality() << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
385  findNewSource |= compareSources(now, 0, 1, activeSources,inactiveSources);
386  findNewSource |= compareSources(now, 1, 0,activeSources, inactiveSources);
387 
388  // NOTE: We could probably replace the copy with a better sort function.
389  // However, there are typically very few sources and the correctness is more obvious right now.
390  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
391  for (const auto & source : inactiveSources)
392  {
393  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
394  }
395  auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
396  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
397  auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
398  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
399  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
400  {
401  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
402  << ", quality " << (*bestInactiveSource)->getQuality();
403  }
404  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
405  << ", quality " << (*worstActiveSource)->getQuality();
406  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
407  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
408  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
409  {
410  auto oldSources = activeSources;
411  activeSources.push_back(*bestInactiveSource);
412  reportSiteChange(oldSources, activeSources);
413  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
414  }
415  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
416  {
417  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
418  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
419  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
420  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
421  (*worstActiveSource)->setLastDowngrade(now);
422  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
423  inactiveSources.emplace_back(std::move(*worstActiveSource));
424  auto oldSources = activeSources;
425  activeSources.erase(worstActiveSource);
426  activeSources.emplace_back(std::move(*bestInactiveSource));
427  reportSiteChange(oldSources, activeSources);
428  eligibleInactiveSources.clear();
429  for (const auto & source : inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
430  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
431  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
432  worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
433  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
434  }
435  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
436  {
437  float r = m_distribution(m_generator);
439  {
440  findNewSource = true;
441  }
442  }
443  }
444  if (findNewSource)
445  {
446  m_open_handler->open();
448  }
449 
450  // Only aggressively look for new sources if we don't have two.
451  if (activeSources.size() == 2)
452  {
454  }
455  else
456  {
458  }
460 }
std::uniform_real_distribution< float > m_distribution
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

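Helper function for checkSources(): compares the quality of active source A against active source B. If source A is significantly worse, it is moved from the active to the inactive list and its last-downgrade time is set to now. "Significantly worse" means a quality value above 5130, or above 260 while also more than four times source B's value. Returns true (a new source should be sought) only when source A is removed and it had already been downgraded once before.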

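NOTE: expects at least max(a, b)+1 active sources; otherwise it returns false and changes nothing. It only modifies the activeSources/inactiveSources vectors passed in. The handle() callers pass local copies taken under m_source_mutex rather than holding that lock across the call.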

Definition at line 347 of file XrdRequestManager.cc.

References a, b, hpstanc_transforms::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

350 {
351  if (activeSources.size() < std::max(a, b)+1) {return false;}
352 
353  bool findNewSource = false;
354  if ((activeSources[a]->getQuality() > 5130) ||
355  ((activeSources[a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[a]->getQuality())))
356  {
357  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
358  << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
359  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
360  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
361  activeSources[a]->setLastDowngrade(now);
362  inactiveSources.emplace_back(activeSources[a]);
363  auto oldSources = activeSources;
364  activeSources.erase(activeSources.begin()+a);
365  reportSiteChange(oldSources,activeSources);
366  }
367  return findNewSource;
368 }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 463 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

464 {
465  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
466  if (m_activeSources.empty())
467  {
469  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
470  << "', flags=0x" << std::hex << m_flags
471  << ", permissions=0" << std::oct << m_perms << std::dec
472  << ") => Source used after fatal exception.";
473  ex.addContext("In XrdAdaptor::RequestManager::handle()");
474  addConnections(ex);
475  throw ex;
476  }
477  return m_activeSources[0]->getFileHandle();
478 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 481 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

482 {
483  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
484  sources.reserve(m_activeSources.size());
485  for (auto const& source : m_activeSources) {
486  sources.push_back(source->ID());
487  }
488 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 501 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

502 {
503  sources.reserve(m_disabledSourceStrings.size());
504  for (auto const& source : m_disabledSourceStrings) {
505  sources.push_back(source);
506  }
507 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:43
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return the current filename.

Definition at line 111 of file XrdRequestManager.h.

111 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inline, static

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 121 of file XrdRequestManager.h.

References a, b, corrVsCorr::filename, flags, initialize(), instance, cmsPerfSuiteHarvest::now, mps_update::status, and AlCaHLTBitMon_QueryRunRegistry::string.

122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 491 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

492 {
493  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
494  sources.reserve(m_activeSources.size());
495  for (auto const& source : m_activeSources) {
496  sources.push_back(source->PrettyID());
497  }
498 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 63 of file XrdRequestManager.h.

References cmsBatch::handle, findQualityFiles::size, and btagGenBb_cfi::Status.

64  {
65  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
66  return handle(c_ptr);
67  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)

Definition at line 675 of file XrdRequestManager.cc.

References a, addConnections(), cms::Exception::addContext(), b, checkSources(), TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, m_activeSources, m_flags, m_inactiveSources, m_name, m_perms, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, AlCaHLTBitMon_ParallelJobs::p, splitClientRequest(), edm::CPUTimer::start(), edm::CPUTimer::stop(), and updateCurrentServer().

676 {
677  //Use a copy of m_activeSources and m_inactiveSources throughout this function
678  // in order to avoid holding the lock a long time and causing a deadlock.
679  // When the function is over we will update the values of the containers
680  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
681  {
682  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
683  activeSources = m_activeSources;
684  inactiveSources = m_inactiveSources;
685  }
686  //Make sure we update changes when we leave the function
687  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
688  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
689  m_activeSources = std::move(activeSources);
690  m_inactiveSources = std::move(inactiveSources);
691  });
692 
694 
695  timespec now;
696  GET_CLOCK_MONOTONIC(now);
697 
698  edm::CPUTimer timer;
699  timer.start();
700 
701  if (activeSources.size() == 1)
702  {
703  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
704  checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
705  activeSources[0]->handle(c_ptr);
706  return c_ptr->get_future();
707  }
708  // Make sure active
709  else if (activeSources.empty())
710  {
712  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
713  << "', flags=0x" << std::hex << m_flags
714  << ", permissions=0" << std::oct << m_perms << std::dec
715  << ") => Source used after fatal exception.";
716  ex.addContext("In XrdAdaptor::RequestManager::handle()");
717  addConnections(ex);
718  throw ex;
719  }
720 
721  assert(iolist.get());
722  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
723  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
724  splitClientRequest(*iolist, *req1, *req2, activeSources);
725 
726  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
727  // CheckSources may have removed a source
728  if (activeSources.size() == 1)
729  {
730  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
731  activeSources[0]->handle(c_ptr);
732  return c_ptr->get_future();
733  }
734 
735  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
736  std::future<IOSize> future1, future2;
737  if (req1->size())
738  {
739  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
740  activeSources[0]->handle(c_ptr1);
741  future1 = c_ptr1->get_future();
742  }
743  if (req2->size())
744  {
745  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
746  activeSources[1]->handle(c_ptr2);
747  future2 = c_ptr2->get_future();
748  }
749  if (req1->size() && req2->size())
750  {
751  std::future<IOSize> task = std::async(std::launch::deferred,
752  [](std::future<IOSize> a, std::future<IOSize> b){
753  // Wait until *both* results are available. This is essential
754  // as the callback may try referencing the RequestManager. If one
755  // throws an exception (causing the RequestManager to be destroyed by
756  // XrdFile) and the other has a failure, then the recovery code will
757  // reference the destroyed RequestManager.
758  //
759  // Unlike other places where we use shared/weak ptrs to maintain object
760  // lifetime and destruction asynchronously, we *cannot* destroy the request
761  // asynchronously as it is associated with a ROOT buffer. We must wait until we
762  // are guaranteed that XrdCl will not write into the ROOT buffer before we
763  // can return.
764  b.wait(); a.wait();
765  return b.get() + a.get();
766  },
767  std::move(future1),
768  std::move(future2));
769  timer.stop();
770  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
771  return task;
772  }
773  else if (req1->size()) { return future1; }
774  else if (req2->size()) { return future2; }
775  else
776  { // Degenerate case - no bytes to read.
777  std::promise<IOSize> p; p.set_value(0);
778  return p.get_future();
779  }
780 }
void start()
Definition: CPUTimer.cc:74
#define GET_CLOCK_MONOTONIC(ts)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
Times stop()
Definition: CPUTimer.cc:94
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest >  c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful one. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 565 of file XrdRequestManager.cc.

References checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, pickSingleSource(), source, and AlCaHLTBitMon_QueryRunRegistry::string.

566 {
567  assert(c_ptr.get());
568  timespec now;
569  GET_CLOCK_MONOTONIC(now);
570  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
571  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
572  {
573  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
574  activeSources = m_activeSources;
575  inactiveSources = m_inactiveSources;
576  }
577  {
578  //make sure we update values before calling pickSingelSource
579  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
580  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
581  m_activeSources = std::move(activeSources);
582  m_inactiveSources = std::move(inactiveSources);
583  });
584 
585  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
586  }
587 
588  std::shared_ptr<Source> source = pickSingleSource();
589  source->handle(c_ptr);
590  return c_ptr->get_future();
591 }
#define GET_CLOCK_MONOTONIC(ts)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source >  source 
)
private, virtual

Handle the file-open response.

Definition at line 627 of file XrdRequestManager.cc.

References m_activeSources, m_excluded_active_count, m_inactiveSources, m_nextActiveSourceCheck, m_source_mutex, queueUpdateCurrentServer(), reportSiteChange(), alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

628 {
629  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
630  if (status.IsOK())
631  {
632  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
633  for (const auto & s : m_activeSources)
634  {
635  if (source->ID() == s->ID())
636  {
637  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
638  << "; ignoring" << std::endl;
639  unsigned returned_count = ++m_excluded_active_count;
642  return;
643  }
644  }
645  for (const auto & s : m_inactiveSources)
646  {
647  if (source->ID() == s->ID())
648  {
649  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
650  << "; ignoring" << std::endl;
652  return;
653  }
654  }
655  if (m_activeSources.size() < 2)
656  {
657  auto oldSources = m_activeSources;
658  m_activeSources.push_back(source);
659  reportSiteChange(oldSources, m_activeSources);
661  }
662  else
663  {
664  m_inactiveSources.push_back(source);
665  }
666  }
667  else
668  { // File-open failure - wait at least 120s before next attempt.
669  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
671  }
672 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager > selfref)
private

Some of the callback handlers (particularly the file-open one) need to call back into the RequestManager. However, the XrdFile may already have released its reference to the RequestManager. We therefore need a weak_ptr to the original object before we can initialize. This way, a callback can detect that the RequestManager is gone and avoid referencing it.

Definition at line 129 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, ecalTB2006H4_GenSimDigiReco_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

130 {
132 
133  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
134  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
135 
136  std::string orig_site;
137  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
138  {
139  std::string hostname;
140  if (Source::getHostname(orig_site, hostname))
141  {
142  Source::getDomain(hostname, orig_site);
143  }
144  }
145 
146  std::unique_ptr<XrdCl::File> file;
148  bool validFile = false;
149  const int retries = 5;
150  std::string excludeString;
151  for (int idx=0; idx<retries; idx++)
152  {
153  file.reset(new XrdCl::File());
154  auto opaque = prepareOpaqueString();
155  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
156  SyncHostResponseHandler handler;
157  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
158  if (!openStatus.IsOK())
159  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
160  // server and it was marked bad - xrootd couldn't even queue up the request internally!
161  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
162  // redirector being down or authentication failures.
163  ex.clearMessage();
164  ex.clearContext();
165  ex.clearAdditionalInfo();
166  ex << "XrdCl::File::Open(name='" << m_name
167  << "', flags=0x" << std::hex << m_flags
168  << ", permissions=0" << std::oct << m_perms << std::dec
169  << ") => error '" << openStatus.ToStr()
170  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
171  ex.addContext("Calling XrdFile::open()");
172  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
173  throw ex;
174  }
175  handler.WaitForResponse();
176  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
177  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
178  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
179  assert(status);
180  if (status->IsOK())
181  {
182  validFile = true;
183  break;
184  }
185  else
186  {
187  ex.clearMessage();
188  ex.clearContext();
189  ex.clearAdditionalInfo();
190  ex << "XrdCl::File::Open(name='" << m_name
191  << "', flags=0x" << std::hex << m_flags
192  << ", permissions=0" << std::oct << m_perms << std::dec
193  << ") => error '" << status->ToStr()
194  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
195  ex.addContext("Calling XrdFile::open()");
196  addConnections(ex);
197  std::string dataServer, lastUrl;
198  file->GetProperty("DataServer", dataServer);
199  file->GetProperty("LastURL", lastUrl);
200  if (dataServer.size())
201  {
202  ex.addAdditionalInfo("Problematic data server: " + dataServer);
203  }
204  if (lastUrl.size())
205  {
206  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
207  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
208  }
209  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
210  {
211  ex << ". No additional data servers were found.";
212  throw ex;
213  }
214  if (dataServer.size())
215  {
216  m_disabledSourceStrings.insert(dataServer);
217  m_disabledExcludeStrings.insert(excludeString);
218  }
219  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
220  if (lastUrl == new_filename)
221  {
222  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
223  throw ex;
224  }
225  }
226  }
227  if (!validFile)
228  {
229  throw ex;
230  }
231  SendMonitoringInfo(*file);
232 
233  timespec ts;
235 
236  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
237  {
238  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
239  auto oldList = m_activeSources;
240  m_activeSources.push_back(source);
242  }
245 
246  m_lastSourceCheck = ts;
247  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
249 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 527 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

528 {
529  std::shared_ptr<Source> source = nullptr;
530  {
531  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
532  if (m_activeSources.size() == 2)
533  {
534  if (m_nextInitialSourceToggle)
535  {
536  source = m_activeSources[0];
537  m_nextInitialSourceToggle = false;
538  }
539  else
540  {
541  source = m_activeSources[1];
542  m_nextInitialSourceToggle = true;
543  }
544  }
545  else if (m_activeSources.empty())
546  {
547  edm::Exception ex(edm::errors::FileReadError);
548  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
549  << "', flags=0x" << std::hex << m_flags
550  << ", permissions=0" << std::oct << m_perms << std::dec
551  << ") => Source used after fatal exception.";
552  ex.addContext("In XrdAdaptor::RequestManager::handle()");
553  addConnections(ex);
554  throw ex;
555  }
556  else
557  {
558  source = m_activeSources[0];
559  }
560  }
561  return source;
562 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 594 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

595 {
596  std::stringstream ss;
597  ss << "tried=";  // opaque "tried=" value: comma-separated hosts the redirector is asked to avoid
598  size_t count = 0;  // number of hosts appended; zero means an empty string is returned
599  {
600  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);  // active/inactive lists may only be read while holding the source mutex
601 
602  for ( const auto & it : m_activeSources )
603  {
604  count++;
605  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";  // keep text before the first ':' (presumably strips a port suffix)
606  }
607  for ( const auto & it : m_inactiveSources )
608  {
609  count++;
610  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
611  }
612  }
613  for ( const auto & it : m_disabledExcludeStrings )  // tbb concurrent set: iterated without holding the source mutex
614  {
615  count++;
616  ss << it.substr(0, it.find(":")) << ",";
617  }
618  if (count)
619  {
620  std::string tmp_str = ss.str();
621  return tmp_str.substr(0, tmp_str.size()-1);  // drop the trailing ','
622  }
623  return "";
624 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 278 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), hcalTTPDigis_cfi::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

279 {
280  auto hostname = std::make_unique<std::string>(id);  // seeded with id; getHostname presumably overwrites it with the parsed hostname
281  if (Source::getHostname(id, *hostname))
282  {
283  std::string *null_hostname = nullptr;
284  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))  // publish only if no hostname is already pending; otherwise unique_ptr frees this one
285  {
286  hostname.release();  // ownership now held by m_serverToAdvertise; reclaimed by updateCurrentServer()
287  }
288  }
289 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source > > const &  iOld,
std::vector< std::shared_ptr< Source > > const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Called any time we potentially switch sources: compares the new site list against the original site (if given) or the previous site list, and alerts the user with a warning if the serving site changed.

Definition at line 303 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), handleOpen(), initialize(), and requestFailure().

306 {
307  auto siteList = formatSites(iNew);
308  if (orig_site.size() && (orig_site != siteList))  // caller supplied an original site and the new sources differ from it
309  {
310  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
311  }
312  else {
313  auto oldSites = formatSites(iOld);
314  if (!orig_site.size() && (siteList != oldSites))  // no original site given: compare against the previous source list instead
315  {
316  if (oldSites.size() >0 )  // stay quiet when there were no previous sites (first assignment)
317  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
318  }
319  }
320 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 783 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, cmsPerfSuiteHarvest::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

784 {
785  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
786 
787  // Fail early for invalid responses - XrdFile has a separate path for handling this.
788  if (c_status.code == XrdCl::errInvalidResponse)
789  {
790  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
792  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
793  << "', flags=0x" << std::hex << m_flags
794  << ", permissions=0" << std::oct << m_perms << std::dec
795  << ", old source=" << source_ptr->PrettyID()
796  << ") => Invalid ReadV response from server";
797  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
798  addConnections(ex);
799  throw ex;
800  }
801  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
802 
803  // Note that we do not delete the Source itself. That is because this
804  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
805  // In such a case, if you close a file in the handler, it will deadlock
806  m_disabledSourceStrings.insert(source_ptr->ID());
807  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
808  m_disabledSources.insert(source_ptr);
809 
810  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
811  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
812  {
813  auto oldSources = m_activeSources;
814  m_activeSources.erase(m_activeSources.begin());
815  reportSiteChange(oldSources, m_activeSources);
816  }
817  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
818  {
819  auto oldSources = m_activeSources;
820  m_activeSources.erase(m_activeSources.begin()+1);
821  reportSiteChange(oldSources, m_activeSources);
822  }
823  std::shared_ptr<Source> new_source;
824  if (m_activeSources.size() == 0)
825  {
826  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
827  timespec now;
828  GET_CLOCK_MONOTONIC(now);
830  // Note we only wait for 180 seconds here. This is because we've already failed
831  // once and the likelihood the program has some inconsistent state is decent.
832  // We'd much rather fail hard than deadlock!
833  sentry.unlock();
834  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
835  if (status == std::future_status::timeout)
836  {
838  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
839  << "', flags=0x" << std::hex << m_flags
840  << ", permissions=0" << std::oct << m_perms << std::dec
841  << ", old source=" << source_ptr->PrettyID()
842  << ") => timeout when waiting for file open";
843  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
844  addConnections(ex);
845  throw ex;
846  }
847  else
848  {
849  try
850  {
851  new_source = future.get();
852  }
853  catch (edm::Exception &ex)
854  {
855  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
856  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
857  throw;
858  }
859  }
860 
861  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
862  {
863  // The server gave us back a data node we requested excluded. Fatal!
865  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
866  << "', flags=0x" << std::hex << m_flags
867  << ", permissions=0" << std::oct << m_perms << std::dec
868  << ", old source=" << source_ptr->PrettyID()
869  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
870  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
871  addConnections(ex);
872  throw ex;
873  }
874  sentry.lock();
875 
876  auto oldSources = m_activeSources;
877  m_activeSources.push_back(new_source);
878  reportSiteChange(oldSources,m_activeSources);
879  }
880  else
881  {
882  new_source = m_activeSources[0];
883  }
884  new_source->handle(c_ptr);
885 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:48
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two request lists, one for each of the two active sources.

Definition at line 1002 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, m_flags, m_name, m_perms, hpstanc_transforms::max, IOPosBuffer::offset(), q1, q2, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

Referenced by handle().

1003 {
1004  if (iolist.size() == 0) return;
1005  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1006  req1.reserve(iolist.size()/2+1);
1007  req2.reserve(iolist.size()/2+1);
1008  size_t front=0;
1009 
1010  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1011  float q1 = static_cast<float>(activeSources[0]->getQuality())+5;
1012  float q2 = static_cast<float>(activeSources[1]->getQuality())+5;
1013  IOSize chunk1, chunk2;
1014  // Make sure the chunk size is at least 1024; little point to reads less than that size.
1015  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1016  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1017 
1018  IOSize size_orig = 0;
1019  for (const auto & it : iolist) size_orig += it.size();
1020 
1021  while (tmp_iolist.size()-front > 0)
1022  {
1023  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
1024  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1025  // are passed to the request manager. However, because we have a max chunk size, we increase
1026  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1027  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1028  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1030  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
1031  << "', flags=0x" << std::hex << m_flags
1032  << ", permissions=0" << std::oct << m_perms << std::dec
1033  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
1034  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1035  addConnections(ex);
1036  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1037  ex.addAdditionalInfo(ss.str());
1038  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
1039  ex.addAdditionalInfo(ss2.str());
1040  throw ex;
1041  }
1042  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
1043  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
1044  }
1045  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1046  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1047 
1048  IOSize size1 = validateList(req1);
1049  IOSize size2 = validateList(req2);
1050 
1051  assert(size_orig == size1 + size2);
1052 
1053  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1054 }
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 259 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), likely, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handle(), and initialize().

260 {
261  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
262  // a pending update. *However*, since we call this for every read, we'll get it
263  // eventually.
264  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
265  std::string *hostname_ptr;
266  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
267  {
268  std::unique_ptr<std::string> hostname(hostname_ptr);
269  edm::Service<edm::storage::StatisticsSenderService> statsService;
270  if (statsService.isAvailable()) {
271  statsService->setCurrentServer(*hostname_ptr);
272  }
273  }
274 }
void setCurrentServer(const std::string &servername)
#define likely(x)
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), handleOpen(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

Referenced by handleOpen().

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), handleOpen(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), handleOpen(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 297 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 56 of file XrdRequestManager.h.