CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &) const
 
std::shared_ptr< XrdCl::File > getActiveFile () const
 
void getActiveSourceNames (std::vector< std::string > &sources) const
 
void getDisabledSourceNames (std::vector< std::string > &sources) const
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources) const
 
std::future< IOSizehandle (void *into, IOSize size, IOOffset off)
 
std::future< IOSizehandle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSizehandle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
virtual ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr< RequestManagergetInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
void checkSourcesImpl (timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
 
bool compareSources (const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< SourcepickSingleSource ()
 
std::string prepareOpaqueString () const
 
void queueUpdateCurrentServer (const std::string &)
 
void reportSiteChange (std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
 
void updateCurrentServer ()
 

Private Attributes

std::vector< std::shared_ptr< Source > > m_activeSources
 
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
 
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHashm_disabledSources
 
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution< float > m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr< Source > > m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandlerm_open_handler
 
XrdCl::Access::Mode m_perms
 
std::atomic< std::string * > m_serverToAdvertise
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 53 of file XrdRequestManager.h.

Constructor & Destructor Documentation

virtual XrdAdaptor::RequestManager::~RequestManager ( )
virtualdefault
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 116 of file XrdRequestManager.cc.

117  : m_serverToAdvertise(nullptr),
120  m_name(filename),
121  m_flags(flags),
122  m_perms(perms),
123  m_distribution(0,100),
125 {
126 }
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
std::atomic< std::string * > m_serverToAdvertise
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count

Member Function Documentation

void RequestManager::addConnections ( cms::Exception ex) const

Add the list of active connections to the exception extra info.

Definition at line 511 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), handle(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), requestFailure(), and splitClientRequest().

512 {
513  std::vector<std::string> sources;
515  for (auto const& source : sources)
516  {
517  ex.addAdditionalInfo("Active source: " + source);
518  }
519  sources.clear();
520  getDisabledSourceNames(sources);
521  for (auto const& source : sources)
522  {
523  ex.addAdditionalInfo("Disabled source: " + source);
524  }
525 }
void getDisabledSourceNames(std::vector< std::string > &sources) const
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
static std::string const source
Definition: EdmProvDump.cc:43
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 325 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, and timeDiffMS().

Referenced by handle().

328 {
329  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
330  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
331  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
332  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
333  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
334  {
335  { // Be more aggressive about getting rid of very bad sources.
336  compareSources(now, 0, 1, activeSources, inactiveSources);
337  compareSources(now, 1, 0, activeSources, inactiveSources);
338  }
340  {
341  checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
342  }
343  }
344 }
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
)
private

Definition at line 372 of file XrdRequestManager.cc.

References compareSources(), m_distribution, m_generator, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, eostools::move(), cmsPerfSuiteHarvest::now, alignCSCRings::r, reportSiteChange(), indexGen::s2, source, timeDiffMS(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

376 {
377 
378  bool findNewSource = false;
379  if (activeSources.size() <= 1)
380  {
381  findNewSource = true;
382  }
383  else if (activeSources.size() > 1)
384  {
385  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality() << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
386  findNewSource |= compareSources(now, 0, 1, activeSources,inactiveSources);
387  findNewSource |= compareSources(now, 1, 0,activeSources, inactiveSources);
388 
389  // NOTE: We could probably replace the copy with a better sort function.
390  // However, there are typically very few sources and the correctness is more obvious right now.
391  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(inactiveSources.size());
392  for (const auto & source : inactiveSources)
393  {
394  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
395  }
396  auto bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
397  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
398  auto worstActiveSource = std::max_element(activeSources.cbegin(), activeSources.cend(),
399  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
400  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
401  {
402  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
403  << ", quality " << (*bestInactiveSource)->getQuality();
404  }
405  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
406  << ", quality " << (*worstActiveSource)->getQuality();
407  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
408  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
409  if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*activeSources[0]->getQuality()))
410  {
411  auto oldSources = activeSources;
412  activeSources.push_back(*bestInactiveSource);
413  reportSiteChange(oldSources, activeSources);
414  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
415  }
416  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
417  {
418  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
419  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
420  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
421  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
422  (*worstActiveSource)->setLastDowngrade(now);
423  for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {inactiveSources.erase(it); break;}
424  inactiveSources.emplace_back(std::move(*worstActiveSource));
425  auto oldSources = activeSources;
426  activeSources.erase(worstActiveSource);
427  activeSources.emplace_back(std::move(*bestInactiveSource));
428  reportSiteChange(oldSources, activeSources);
429  eligibleInactiveSources.clear();
430  for (const auto & source : inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
431  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
432  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
433  worstActiveSource = std::max_element(activeSources.begin(), activeSources.end(),
434  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
435  }
436  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
437  {
438  float r = m_distribution(m_generator);
440  {
441  findNewSource = true;
442  }
443  }
444  }
445  if (findNewSource)
446  {
447  m_open_handler->open();
449  }
450 
451  // Only aggressively look for new sources if we don't have two.
452  if (activeSources.size() == 2)
453  {
455  }
456  else
457  {
459  }
461 }
std::uniform_real_distribution< float > m_distribution
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b,
std::vector< std::shared_ptr< Source >> &  activeSources,
std::vector< std::shared_ptr< Source >> &  inactiveSources 
) const
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 348 of file XrdRequestManager.cc.

References a, b, SiStripPI::max, and reportSiteChange().

Referenced by checkSources(), and checkSourcesImpl().

351 {
352  if (activeSources.size() < std::max(a, b)+1) {return false;}
353 
354  bool findNewSource = false;
355  if ((activeSources[a]->getQuality() > 5130) ||
356  ((activeSources[a]->getQuality() > 260) && (activeSources[b]->getQuality()*4 < activeSources[a]->getQuality())))
357  {
358  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
359  << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
360  << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
361  if (activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
362  activeSources[a]->setLastDowngrade(now);
363  inactiveSources.emplace_back(activeSources[a]);
364  auto oldSources = activeSources;
365  activeSources.erase(activeSources.begin()+a);
366  reportSiteChange(oldSources,activeSources);
367  }
368  return findNewSource;
369 }
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  ) const

Return a pointer to an active file. Useful for metadata operations.

Definition at line 464 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

465 {
466  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
467  if (m_activeSources.empty())
468  {
470  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
471  << "', flags=0x" << std::hex << m_flags
472  << ", permissions=0" << std::oct << m_perms << std::dec
473  << ") => Source used after fatal exception.";
474  ex.addContext("In XrdAdaptor::RequestManager::handle()");
475  addConnections(ex);
476  throw ex;
477  }
478  return m_activeSources[0]->getFileHandle();
479 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 482 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

483 {
484  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
485  sources.reserve(m_activeSources.size());
486  for (auto const& source : m_activeSources) {
487  sources.push_back(source->ID());
488  }
489 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources) const

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 502 of file XrdRequestManager.cc.

References m_disabledSourceStrings, and source.

Referenced by addConnections().

503 {
504  sources.reserve(m_disabledSourceStrings.size());
505  for (auto const& source : m_disabledSourceStrings) {
506  sources.push_back(source);
507  }
508 }
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:43
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 111 of file XrdRequestManager.h.

111 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 121 of file XrdRequestManager.h.

References a, b, corrVsCorr::filename, flags, initialize(), instance, cmsPerfSuiteHarvest::now, mps_update::status, and AlCaHLTBitMon_QueryRunRegistry::string.

122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources) const

Definition at line 492 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

493 {
494  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
495  sources.reserve(m_activeSources.size());
496  for (auto const& source : m_activeSources) {
497  sources.push_back(source->PrettyID());
498  }
499 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 63 of file XrdRequestManager.h.

References cmsBatch::handle, findQualityFiles::size, and btagGenBb_cfi::Status.

64  {
65  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
66  return handle(c_ptr);
67  }
size
Write out results.
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)

Definition at line 676 of file XrdRequestManager.cc.

References a, addConnections(), cms::Exception::addContext(), b, checkSources(), TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, m_activeSources, m_flags, m_inactiveSources, m_name, m_perms, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, AlCaHLTBitMon_ParallelJobs::p, splitClientRequest(), edm::CPUTimer::start(), edm::CPUTimer::stop(), and updateCurrentServer().

677 {
678  //Use a copy of m_activeSources and m_inactiveSources throughout this function
679  // in order to avoid holding the lock a long time and causing a deadlock.
680  // When the function is over we will update the values of the containers
681  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
682  {
683  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
684  activeSources = m_activeSources;
685  inactiveSources = m_inactiveSources;
686  }
687  //Make sure we update changes when we leave the function
688  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
689  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
690  m_activeSources = std::move(activeSources);
691  m_inactiveSources = std::move(inactiveSources);
692  });
693 
695 
696  timespec now;
697  GET_CLOCK_MONOTONIC(now);
698 
699  edm::CPUTimer timer;
700  timer.start();
701 
702  if (activeSources.size() == 1)
703  {
704  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
705  checkSources(now, c_ptr->getSize(), activeSources,inactiveSources);
706  activeSources[0]->handle(c_ptr);
707  return c_ptr->get_future();
708  }
709  // Make sure active
710  else if (activeSources.empty())
711  {
713  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
714  << "', flags=0x" << std::hex << m_flags
715  << ", permissions=0" << std::oct << m_perms << std::dec
716  << ") => Source used after fatal exception.";
717  ex.addContext("In XrdAdaptor::RequestManager::handle()");
718  addConnections(ex);
719  throw ex;
720  }
721 
722  assert(iolist.get());
723  auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
724  auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
725  splitClientRequest(*iolist, *req1, *req2, activeSources);
726 
727  checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
728  // CheckSources may have removed a source
729  if (activeSources.size() == 1)
730  {
731  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
732  activeSources[0]->handle(c_ptr);
733  return c_ptr->get_future();
734  }
735 
736  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
737  std::future<IOSize> future1, future2;
738  if (!req1->empty())
739  {
740  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
741  activeSources[0]->handle(c_ptr1);
742  future1 = c_ptr1->get_future();
743  }
744  if (!req2->empty())
745  {
746  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
747  activeSources[1]->handle(c_ptr2);
748  future2 = c_ptr2->get_future();
749  }
750  if (!req1->empty() && !req2->empty())
751  {
752  std::future<IOSize> task = std::async(std::launch::deferred,
753  [](std::future<IOSize> a, std::future<IOSize> b){
754  // Wait until *both* results are available. This is essential
755  // as the callback may try referencing the RequestManager. If one
756  // throws an exception (causing the RequestManager to be destroyed by
757  // XrdFile) and the other has a failure, then the recovery code will
758  // reference the destroyed RequestManager.
759  //
760  // Unlike other places where we use shared/weak ptrs to maintain object
761  // lifetime and destruction asynchronously, we *cannot* destroy the request
762  // asynchronously as it is associated with a ROOT buffer. We must wait until we
763  // are guaranteed that XrdCl will not write into the ROOT buffer before we
764  // can return.
765  b.wait(); a.wait();
766  return b.get() + a.get();
767  },
768  std::move(future1),
769  std::move(future2));
770  timer.stop();
771  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
772  return task;
773  }
774  else if (!req1->empty()) { return future1; }
775  else if (!req2->empty()) { return future2; }
776  else
777  { // Degenerate case - no bytes to read.
778  std::promise<IOSize> p; p.set_value(0);
779  return p.get_future();
780  }
781 }
void start()
Definition: CPUTimer.cc:74
#define GET_CLOCK_MONOTONIC(ts)
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
Times stop()
Definition: CPUTimer.cc:94
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 566 of file XrdRequestManager.cc.

References checkSources(), GET_CLOCK_MONOTONIC, m_activeSources, m_inactiveSources, m_source_mutex, eostools::move(), cmsPerfSuiteHarvest::now, pickSingleSource(), source, and AlCaHLTBitMon_QueryRunRegistry::string.

567 {
568  assert(c_ptr.get());
569  timespec now;
570  GET_CLOCK_MONOTONIC(now);
571  //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
572  std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
573  {
574  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
575  activeSources = m_activeSources;
576  inactiveSources = m_inactiveSources;
577  }
578  {
579  //make sure we update values before calling pickSingelSource
580  std::shared_ptr<void*> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
581  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
582  m_activeSources = std::move(activeSources);
583  m_inactiveSources = std::move(inactiveSources);
584  });
585 
586  checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
587  }
588 
589  std::shared_ptr<Source> source = pickSingleSource();
590  source->handle(c_ptr);
591  return c_ptr->get_future();
592 }
#define GET_CLOCK_MONOTONIC(ts)
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source source 
)
privatevirtual

Handle the file-open response

Definition at line 628 of file XrdRequestManager.cc.

References m_activeSources, m_excluded_active_count, m_inactiveSources, m_nextActiveSourceCheck, m_source_mutex, queueUpdateCurrentServer(), reportSiteChange(), alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

629 {
630  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
631  if (status.IsOK())
632  {
633  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
634  for (const auto & s : m_activeSources)
635  {
636  if (source->ID() == s->ID())
637  {
638  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
639  << "; ignoring" << std::endl;
640  unsigned returned_count = ++m_excluded_active_count;
643  return;
644  }
645  }
646  for (const auto & s : m_inactiveSources)
647  {
648  if (source->ID() == s->ID())
649  {
650  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
651  << "; ignoring" << std::endl;
653  return;
654  }
655  }
656  if (m_activeSources.size() < 2)
657  {
658  auto oldSources = m_activeSources;
659  m_activeSources.push_back(source);
660  reportSiteChange(oldSources, m_activeSources);
662  }
663  else
664  {
665  m_inactiveSources.push_back(source);
666  }
667  }
668  else
669  { // File-open failure - wait at least 120s before next attempt.
670  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
672  }
673 }
void queueUpdateCurrentServer(const std::string &)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager selfref)
private

Some of the callback handlers (particularly, file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, callback knows to not reference the RequestManager.

Definition at line 130 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, FrontierConditions_GlobalTag_cff::file, ecalTB2006H4_GenSimDigiReco_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), training_settings::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, eostools::move(), pileupDistInMC::oldList, prepareOpaqueString(), queueUpdateCurrentServer(), reportSiteChange(), SendMonitoringInfo(), source, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, updateCurrentServer(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

131 {
133 
134  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
135  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
136 
137  std::string orig_site;
138  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
139  {
140  std::string hostname;
141  if (Source::getHostname(orig_site, hostname))
142  {
143  Source::getDomain(hostname, orig_site);
144  }
145  }
146 
147  std::unique_ptr<XrdCl::File> file;
149  bool validFile = false;
150  const int retries = 5;
151  std::string excludeString;
152  for (int idx=0; idx<retries; idx++)
153  {
154  file.reset(new XrdCl::File());
155  auto opaque = prepareOpaqueString();
156  std::string new_filename = m_name + (!opaque.empty() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
157  SyncHostResponseHandler handler;
158  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
159  if (!openStatus.IsOK())
160  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
161  // server and it was marked bad - xrootd couldn't even queue up the request internally!
162  // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
163  // redirector being down or authentication failures.
164  ex.clearMessage();
165  ex.clearContext();
166  ex.clearAdditionalInfo();
167  ex << "XrdCl::File::Open(name='" << m_name
168  << "', flags=0x" << std::hex << m_flags
169  << ", permissions=0" << std::oct << m_perms << std::dec
170  << ") => error '" << openStatus.ToStr()
171  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
172  ex.addContext("Calling XrdFile::open()");
173  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
174  throw ex;
175  }
176  handler.WaitForResponse();
177  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
178  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
179  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
180  assert(status);
181  if (status->IsOK())
182  {
183  validFile = true;
184  break;
185  }
186  else
187  {
188  ex.clearMessage();
189  ex.clearContext();
190  ex.clearAdditionalInfo();
191  ex << "XrdCl::File::Open(name='" << m_name
192  << "', flags=0x" << std::hex << m_flags
193  << ", permissions=0" << std::oct << m_perms << std::dec
194  << ") => error '" << status->ToStr()
195  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
196  ex.addContext("Calling XrdFile::open()");
197  addConnections(ex);
198  std::string dataServer, lastUrl;
199  file->GetProperty("DataServer", dataServer);
200  file->GetProperty("LastURL", lastUrl);
201  if (!dataServer.empty())
202  {
203  ex.addAdditionalInfo("Problematic data server: " + dataServer);
204  }
205  if (!lastUrl.empty())
206  {
207  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
208  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
209  }
210  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
211  {
212  ex << ". No additional data servers were found.";
213  throw ex;
214  }
215  if (!dataServer.empty())
216  {
217  m_disabledSourceStrings.insert(dataServer);
218  m_disabledExcludeStrings.insert(excludeString);
219  }
220  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
221  if (lastUrl == new_filename)
222  {
223  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
224  throw ex;
225  }
226  }
227  }
228  if (!validFile)
229  {
230  throw ex;
231  }
232  SendMonitoringInfo(*file);
233 
234  timespec ts;
236 
237  auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
238  {
239  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
240  auto oldList = m_activeSources;
241  m_activeSources.push_back(source);
243  }
246 
247  m_lastSourceCheck = ts;
248  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
250 }
#define GET_CLOCK_MONOTONIC(ts)
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:307
std::string prepareOpaqueString() const
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &) const
void queueUpdateCurrentServer(const std::string &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:255
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:344
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
static std::string const source
Definition: EdmProvDump.cc:43
def move(src, dest)
Definition: eostools.py:510
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 528 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

529 {
530  std::shared_ptr<Source> source = nullptr;
531  {
532  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
533  if (m_activeSources.size() == 2)
534  {
536  {
537  source = m_activeSources[0];
539  }
540  else
541  {
542  source = m_activeSources[1];
544  }
545  }
546  else if (m_activeSources.empty())
547  {
549  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
550  << "', flags=0x" << std::hex << m_flags
551  << ", permissions=0" << std::oct << m_perms << std::dec
552  << ") => Source used after fatal exception.";
553  ex.addContext("In XrdAdaptor::RequestManager::handle()");
554  addConnections(ex);
555  throw ex;
556  }
557  else
558  {
559  source = m_activeSources[0];
560  }
561  }
562  return source;
563 }
void addConnections(cms::Exception &) const
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:43
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( ) const
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 595 of file XrdRequestManager.cc.

References KineDebug3::count(), m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

596 {
597  std::stringstream ss;
598  ss << "tried=";
599  size_t count = 0;
600  {
601  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
602 
603  for ( const auto & it : m_activeSources )
604  {
605  count++;
606  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
607  }
608  for ( const auto & it : m_inactiveSources )
609  {
610  count++;
611  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
612  }
613  }
614  for ( const auto & it : m_disabledExcludeStrings )
615  {
616  count++;
617  ss << it.substr(0, it.find(":")) << ",";
618  }
619  if (count)
620  {
621  std::string tmp_str = ss.str();
622  return tmp_str.substr(0, tmp_str.size()-1);
623  }
624  return "";
625 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::recursive_mutex m_source_mutex
void RequestManager::queueUpdateCurrentServer ( const std::string &  id)
private

Definition at line 279 of file XrdRequestManager.cc.

References XrdAdaptor::Source::getHostname(), triggerObjects_cff::id, m_serverToAdvertise, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handleOpen(), and initialize().

280 {
281  auto hostname = std::make_unique<std::string>(id);
282  if (Source::getHostname(id, *hostname))
283  {
284  std::string *null_hostname = nullptr;
285  if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get()))
286  {
287  hostname.release();
288  }
289  }
290 }
std::atomic< std::string * > m_serverToAdvertise
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:227
void RequestManager::reportSiteChange ( std::vector< std::shared_ptr< Source > > const &  iOld,
std::vector< std::shared_ptr< Source > > const &  iNew,
std::string  orig_site = std::string{} 
) const
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 304 of file XrdRequestManager.cc.

Referenced by checkSourcesImpl(), compareSources(), handleOpen(), initialize(), and requestFailure().

307 {
308  auto siteList = formatSites(iNew);
309  if (!orig_site.empty() && (orig_site != siteList))
310  {
311  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
312  }
313  else {
314  auto oldSites = formatSites(iOld);
315  if (orig_site.empty() && (siteList != oldSites))
316  {
317  if (!oldSites.empty() )
318  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
319  }
320  }
321 }
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 784 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, cmsPerfSuiteHarvest::now, reportSiteChange(), seconds(), mps_update::status, and mps_check::timeout.

785 {
786  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
787 
788  // Fail early for invalid responses - XrdFile has a separate path for handling this.
789  if (c_status.code == XrdCl::errInvalidResponse)
790  {
791  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
793  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
794  << "', flags=0x" << std::hex << m_flags
795  << ", permissions=0" << std::oct << m_perms << std::dec
796  << ", old source=" << source_ptr->PrettyID()
797  << ") => Invalid ReadV response from server";
798  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
799  addConnections(ex);
800  throw ex;
801  }
802  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
803 
804  // Note that we do not delete the Source itself. That is because this
805  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
806  // In such a case, if you close a file in the handler, it will deadlock
807  m_disabledSourceStrings.insert(source_ptr->ID());
808  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
809  m_disabledSources.insert(source_ptr);
810 
811  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
812  if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get()))
813  {
814  auto oldSources = m_activeSources;
815  m_activeSources.erase(m_activeSources.begin());
816  reportSiteChange(oldSources, m_activeSources);
817  }
818  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
819  {
820  auto oldSources = m_activeSources;
821  m_activeSources.erase(m_activeSources.begin()+1);
822  reportSiteChange(oldSources, m_activeSources);
823  }
824  std::shared_ptr<Source> new_source;
825  if (m_activeSources.empty())
826  {
827  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
828  timespec now;
829  GET_CLOCK_MONOTONIC(now);
831  // Note we only wait for 180 seconds here. This is because we've already failed
832  // once and the likelihood the program has some inconsistent state is decent.
833  // We'd much rather fail hard than deadlock!
834  sentry.unlock();
835  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
836  if (status == std::future_status::timeout)
837  {
839  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
840  << "', flags=0x" << std::hex << m_flags
841  << ", permissions=0" << std::oct << m_perms << std::dec
842  << ", old source=" << source_ptr->PrettyID()
843  << ") => timeout when waiting for file open";
844  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
845  addConnections(ex);
846  throw ex;
847  }
848  else
849  {
850  try
851  {
852  new_source = future.get();
853  }
854  catch (edm::Exception &ex)
855  {
856  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
857  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
858  throw;
859  }
860  }
861 
862  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
863  {
864  // The server gave us back a data node we requested excluded. Fatal!
866  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
867  << "', flags=0x" << std::hex << m_flags
868  << ", permissions=0" << std::oct << m_perms << std::dec
869  << ", old source=" << source_ptr->PrettyID()
870  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
871  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
872  addConnections(ex);
873  throw ex;
874  }
875  sentry.lock();
876 
877  auto oldSources = m_activeSources;
878  m_activeSources.push_back(new_source);
879  reportSiteChange(oldSources,m_activeSources);
880  }
881  else
882  {
883  new_source = m_activeSources[0];
884  }
885  new_source->handle(c_ptr);
886 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
int timeout
Definition: mps_check.py:51
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2,
std::vector< std::shared_ptr< Source >> const &  activeSources 
) const
private

Given a client request, split it into two requests lists.

Definition at line 1003 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, m_flags, m_name, m_perms, SiStripPI::max, IOPosBuffer::offset(), q1, q2, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

Referenced by handle().

1004 {
1005  if (iolist.empty()) return;
1006  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1007  req1.reserve(iolist.size()/2+1);
1008  req2.reserve(iolist.size()/2+1);
1009  size_t front=0;
1010 
1011  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1012  float q1 = static_cast<float>(activeSources[0]->getQuality())+5;
1013  float q2 = static_cast<float>(activeSources[1]->getQuality())+5;
1014  IOSize chunk1, chunk2;
1015  // Make sure the chunk size is at least 1024; little point to reads less than that size.
1016  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1017  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
1018 
1019  IOSize size_orig = 0;
1020  for (const auto & it : iolist) size_orig += it.size();
1021 
1022  while (tmp_iolist.size()-front > 0)
1023  {
1024  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
1025  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1026  // are passed to the request manager. However, because we have a max chunk size, we increase
1027  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
1028  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
1029  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1031  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
1032  << "', flags=0x" << std::hex << m_flags
1033  << ", permissions=0" << std::oct << m_perms << std::dec
1034  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
1035  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1036  addConnections(ex);
1037  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1038  ex.addAdditionalInfo(ss.str());
1039  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
1040  ex.addAdditionalInfo(ss2.str());
1041  throw ex;
1042  }
1043  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
1044  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
1045  }
1046  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1047  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
1048 
1049  IOSize size1 = validateList(req1);
1050  IOSize size2 = validateList(req2);
1051 
1052  assert(size_orig == size1 + size2);
1053 
1054  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1055 }
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &) const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateCurrentServer ( )
inlineprivate

Update the StatisticsSenderService, if necessary, with the current server.

Update the StatisticsSenderService with the current server info.

As this accesses the edm::Service infrastructure, this MUST be called from an edm-managed thread. It CANNOT be called from an Xrootd-managed thread.

Definition at line 260 of file XrdRequestManager.cc.

References edm::Service< T >::isAvailable(), likely, m_serverToAdvertise, edm::storage::StatisticsSenderService::setCurrentServer(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by handle(), and initialize().

261 {
262  // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
263  // a pending update. *However*, since we call this for every read, we'll get it
264  // eventually.
265  if (likely(!m_serverToAdvertise.load(std::memory_order_relaxed))) {return;}
266  std::string *hostname_ptr;
267  if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr)))
268  {
269  std::unique_ptr<std::string> hostname(hostname_ptr);
271  if (statsService.isAvailable()) {
272  statsService->setCurrentServer(*hostname_ptr);
273  }
274  }
275 }
void setCurrentServer(const std::string &servername)
#define likely(x)
std::atomic< std::string * > m_serverToAdvertise
bool isAvailable() const
Definition: Service.h:46

Member Data Documentation

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 212 of file XrdRequestManager.h.

Referenced by getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), handle(), handleOpen(), initialize(), pickSingleSource(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 216 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 217 of file XrdRequestManager.h.

Referenced by requestFailure().

tbb::concurrent_unordered_set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 215 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 237 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 239 of file XrdRequestManager.h.

Referenced by handleOpen().

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 236 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 213 of file XrdRequestManager.h.

Referenced by handle(), handleOpen(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 223 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 228 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), handleOpen(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 226 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 297 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::atomic<std::string*> XrdAdaptor::RequestManager::m_serverToAdvertise
private

Definition at line 221 of file XrdRequestManager.h.

Referenced by queueUpdateCurrentServer(), and updateCurrentServer().

std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
mutableprivate
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 224 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 229 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 56 of file XrdRequestManager.h.