CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes
XrdAdaptor::RequestManager Class Reference

#include <XrdRequestManager.h>

Inheritance diagram for XrdAdaptor::RequestManager:

Classes

class  OpenHandler
 

Public Member Functions

void addConnections (cms::Exception &)
 
std::shared_ptr< XrdCl::File > getActiveFile ()
 
void getActiveSourceNames (std::vector< std::string > &sources)
 
void getDisabledSourceNames (std::vector< std::string > &sources)
 
const std::string & getFilename () const
 
void getPrettyActiveSourceNames (std::vector< std::string > &sources)
 
std::future< IOSize > handle (void *into, IOSize size, IOOffset off)
 
std::future< IOSize > handle (std::shared_ptr< std::vector< IOPosBuffer > > iolist)
 
std::future< IOSize > handle (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr)
 
void requestFailure (std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
 
 ~RequestManager ()=default
 

Static Public Member Functions

static std::shared_ptr
< RequestManager > 
getInstance (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 

Static Public Attributes

static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60
 

Private Member Functions

void broadcastRequest (const ClientRequest &, bool active)
 
void checkSources (timespec &now, IOSize requestSize)
 
void checkSourcesImpl (timespec &now, IOSize requestSize)
 
bool compareSources (const timespec &now, unsigned a, unsigned b)
 
virtual void handleOpen (XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
 
void initialize (std::weak_ptr< RequestManager > selfref)
 
std::shared_ptr< Source > pickSingleSource ()
 
std::string prepareOpaqueString ()
 
 RequestManager (const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
 
void splitClientRequest (const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
 
void updateSiteInfo (std::string orig_site="")
 

Private Attributes

std::string m_activeSites
 
std::vector< std::shared_ptr
< Source > > 
m_activeSources
 
std::set< std::string > m_disabledExcludeStrings
 
std::set< std::shared_ptr
< Source > > 
m_disabledSources
 
std::set< std::string > m_disabledSourceStrings
 
std::uniform_real_distribution
< float > 
m_distribution
 
std::atomic< unsigned > m_excluded_active_count
 
XrdCl::OpenFlags::Flags m_flags
 
std::mt19937 m_generator
 
std::vector< std::shared_ptr
< Source > > 
m_inactiveSources
 
timespec m_lastSourceCheck
 
const std::string m_name
 
timespec m_nextActiveSourceCheck
 
bool m_nextInitialSourceToggle
 
std::shared_ptr< OpenHandler > m_open_handler
 
XrdCl::Access::Mode m_perms
 
std::recursive_mutex m_source_mutex
 
int m_timeout
 
bool searchMode
 

Detailed Description

Definition at line 43 of file XrdRequestManager.h.

Constructor & Destructor Documentation

XrdAdaptor::RequestManager::~RequestManager ( )
default
RequestManager::RequestManager ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
private

Definition at line 106 of file XrdRequestManager.cc.

Referenced by getInstance().

109  m_name(filename),
110  m_flags(flags),
111  m_perms(perms),
112  m_distribution(0,100),
114 {
115 }
std::uniform_real_distribution< float > m_distribution
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
static const unsigned int XRD_DEFAULT_TIMEOUT
XrdCl::OpenFlags::Flags m_flags
XrdCl::Access::Mode m_perms
std::atomic< unsigned > m_excluded_active_count
tuple filename
Definition: lut2db_cfg.py:20

Member Function Documentation

void RequestManager::addConnections ( cms::Exception &  ex)

Add the list of active connections to the exception extra info.

Definition at line 440 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), getDisabledSourceNames(), getPrettyActiveSourceNames(), and source.

Referenced by getActiveFile(), initialize(), XrdAdaptor::RequestManager::OpenHandler::open(), pickSingleSource(), and requestFailure().

441 {
442  std::vector<std::string> sources;
444  for (auto const& source : sources)
445  {
446  ex.addAdditionalInfo("Active source: " + source);
447  }
448  sources.clear();
449  getDisabledSourceNames(sources);
450  for (auto const& source : sources)
451  {
452  ex.addAdditionalInfo("Disabled source: " + source);
453  }
454 }
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
void getDisabledSourceNames(std::vector< std::string > &sources)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
static std::string const source
Definition: EdmProvDump.cc:42
void XrdAdaptor::RequestManager::broadcastRequest ( const ClientRequest & ,
bool  active 
)
private

Given a request, broadcast it to all sources. If active is true, broadcast is made to all active sources. Otherwise, broadcast is made to the inactive sources.

void RequestManager::checkSources ( timespec &  now,
IOSize  requestSize 
)
private

Check our set of active sources. If necessary, this will kick off a search for a new source. The source check is somewhat expensive so it is only done once every second.

Definition at line 261 of file XrdRequestManager.cc.

References checkSourcesImpl(), compareSources(), m_lastSourceCheck, m_nextActiveSourceCheck, m_source_mutex, and timeDiffMS().

Referenced by handle().

262 {
263  edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check "
264  << timeDiffMS(now, m_lastSourceCheck) << "; last check "
265  << m_lastSourceCheck.tv_sec << "; now " <<now.tv_sec
266  << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
267  if (timeDiffMS(now, m_lastSourceCheck) > 1000)
268  {
269  { // Be more aggressive about getting rid of very bad sources.
270  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
271  compareSources(now, 0, 1);
272  compareSources(now, 1, 0);
273  }
275  {
276  checkSourcesImpl(now, requestSize);
277  }
278  }
279 }
bool compareSources(const timespec &now, unsigned a, unsigned b)
long long timeDiffMS(const timespec &a, const timespec &b)
void checkSourcesImpl(timespec &now, IOSize requestSize)
std::recursive_mutex m_source_mutex
void RequestManager::checkSourcesImpl ( timespec &  now,
IOSize  requestSize 
)
private

Definition at line 304 of file XrdRequestManager.cc.

References compareSources(), m_activeSources, m_distribution, m_generator, m_inactiveSources, m_lastSourceCheck, m_nextActiveSourceCheck, m_open_handler, m_source_mutex, fileCollector::now, alignCSCRings::r, indexGen::s2, source, timeDiffMS(), updateSiteInfo(), XRD_ADAPTOR_LONG_OPEN_DELAY, XRD_ADAPTOR_OPEN_PROBE_PERCENT, XRD_ADAPTOR_SHORT_OPEN_DELAY, and XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.

Referenced by checkSources().

305 {
306  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
307 
308  bool findNewSource = false;
309  if (m_activeSources.size() <= 1)
310  {
311  findNewSource = true;
312  }
313  else if (m_activeSources.size() > 1)
314  {
315  edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl;
316  findNewSource |= compareSources(now, 0, 1);
317  findNewSource |= compareSources(now, 1, 0);
318 
319  // NOTE: We could probably replace the copy with a better sort function.
320  // However, there are typically very few sources and the correctness is more obvious right now.
321  std::vector<std::shared_ptr<Source> > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size());
322  for (const auto & source : m_inactiveSources)
323  {
324  if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) {eligibleInactiveSources.push_back(source);}
325  }
326  std::vector<std::shared_ptr<Source> >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
327  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
328  std::vector<std::shared_ptr<Source> >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
329  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
330  if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get())
331  {
332  edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->PrettyID()
333  << ", quality " << (*bestInactiveSource)->getQuality();
334  }
335  edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->PrettyID()
336  << ", quality " << (*worstActiveSource)->getQuality();
337  // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
338  // Regardless, we will want to re-evaluate the new source quickly (within 5s).
339  if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1 && ((*bestInactiveSource)->getQuality() < 4*m_activeSources[0]->getQuality()))
340  {
341  m_activeSources.push_back(*bestInactiveSource);
342  updateSiteInfo();
343  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
344  }
345  else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE)
346  {
347  edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->PrettyID()
348  << " from active sources due to quality (" << (*worstActiveSource)->getQuality()
349  << ") and promoting " << (*bestInactiveSource)->PrettyID() << " (quality: "
350  << (*bestInactiveSource)->getQuality() << ")" << std::endl;
351  (*worstActiveSource)->setLastDowngrade(now);
352  for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;}
353  m_inactiveSources.emplace_back(std::move(*worstActiveSource));
354  m_activeSources.erase(worstActiveSource);
355  m_activeSources.emplace_back(std::move(*bestInactiveSource));
356  updateSiteInfo();
357  eligibleInactiveSources.clear();
358  for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source);
359  bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(),
360  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
361  worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(),
362  [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {return s1->getQuality() < s2->getQuality();});
363  }
364  if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY))
365  {
366  float r = m_distribution(m_generator);
368  {
369  findNewSource = true;
370  }
371  }
372  }
373  if (findNewSource)
374  {
375  m_open_handler->open();
377  }
378 
379  // Only aggressively look for new sources if we don't have two.
380  if (m_activeSources.size() == 2)
381  {
383  }
384  else
385  {
387  }
389 }
std::uniform_real_distribution< float > m_distribution
void updateSiteInfo(std::string orig_site="")
tuple s2
Definition: indexGen.py:106
bool compareSources(const timespec &now, unsigned a, unsigned b)
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE
long long timeDiffMS(const timespec &a, const timespec &b)
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_OPEN_PROBE_PERCENT
std::shared_ptr< OpenHandler > m_open_handler
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
bool RequestManager::compareSources ( const timespec &  now,
unsigned  a,
unsigned  b 
)
private

Helper function for checkSources; compares the quality of source A versus source B; if source A is significantly worse, remove it from the list of active sources.

NOTE: assumes two sources are active and the caller must already hold m_source_mutex

Definition at line 283 of file XrdRequestManager.cc.

References a, b, m_activeSources, m_inactiveSources, bookConverter::max, and updateSiteInfo().

Referenced by checkSources(), and checkSourcesImpl().

284 {
285  if (m_activeSources.size() < std::max(a, b)+1) {return false;}
286 
287  bool findNewSource = false;
288  if ((m_activeSources[a]->getQuality() > 5130) ||
289  ((m_activeSources[a]->getQuality() > 260) && (m_activeSources[b]->getQuality()*4 < m_activeSources[a]->getQuality())))
290  {
291  edm::LogVerbatim("XrdAdaptorInternal") << "Removing "
292  << m_activeSources[a]->PrettyID() << " from active sources due to poor quality ("
293  << m_activeSources[a]->getQuality() << " vs " << m_activeSources[b]->getQuality() << ")" << std::endl;
294  if (m_activeSources[a]->getLastDowngrade().tv_sec != 0) {findNewSource = true;}
295  m_activeSources[a]->setLastDowngrade(now);
296  m_inactiveSources.emplace_back(m_activeSources[a]);
297  m_activeSources.erase(m_activeSources.begin()+a);
298  updateSiteInfo();
299  }
300  return findNewSource;
301 }
void updateSiteInfo(std::string orig_site="")
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::vector< std::shared_ptr< Source > > m_activeSources
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
std::shared_ptr< XrdCl::File > RequestManager::getActiveFile ( void  )

Return a pointer to an active file. Useful for metadata operations.

Definition at line 392 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_perms, and m_source_mutex.

393 {
394  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
395  if (m_activeSources.empty())
396  {
398  ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name
399  << "', flags=0x" << std::hex << m_flags
400  << ", permissions=0" << std::oct << m_perms << std::dec
401  << ") => Source used after fatal exception.";
402  ex.addContext("In XrdAdaptor::RequestManager::handle()");
403  addConnections(ex);
404  throw ex;
405  }
406  return m_activeSources[0]->getFileHandle();
407 }
void addConnections(cms::Exception &)
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
std::recursive_mutex m_source_mutex
void RequestManager::getActiveSourceNames ( std::vector< std::string > &  sources)

Retrieve the names of the active sources (primarily meant to enable meaningful log messages).

Definition at line 410 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

411 {
412  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
413  sources.reserve(m_activeSources.size());
414  for (auto const& source : m_activeSources) {
415  sources.push_back(source->ID());
416  }
417 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
void RequestManager::getDisabledSourceNames ( std::vector< std::string > &  sources)

Retrieve the names of the disabled sources (primarily meant to enable meaningful log messages).

Definition at line 430 of file XrdRequestManager.cc.

References m_disabledSourceStrings, m_source_mutex, and source.

Referenced by addConnections().

431 {
432  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
433  sources.reserve(m_disabledSourceStrings.size());
434  for (auto const& source : m_disabledSourceStrings) {
435  sources.push_back(source);
436  }
437 }
std::set< std::string > m_disabledSourceStrings
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
const std::string& XrdAdaptor::RequestManager::getFilename ( ) const
inline

Return current filename

Definition at line 101 of file XrdRequestManager.h.

References m_name.

101 {return m_name;}
static std::shared_ptr<RequestManager> XrdAdaptor::RequestManager::getInstance ( const std::string &  filename,
XrdCl::OpenFlags::Flags  flags,
XrdCl::Access::Mode  perms 
)
inlinestatic

Some of the callback handlers need a weak_ptr reference to the RequestManager. This allows the callback handler to know when it is OK to invoke RequestManager methods.

Hence, all instances need to be created through this factory function.

Definition at line 111 of file XrdRequestManager.h.

References instance, and RequestManager().

112  {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
tuple filename
Definition: lut2db_cfg.py:20
void RequestManager::getPrettyActiveSourceNames ( std::vector< std::string > &  sources)

Definition at line 420 of file XrdRequestManager.cc.

References m_activeSources, m_source_mutex, and source.

Referenced by addConnections().

421 {
422  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
423  sources.reserve(m_activeSources.size());
424  for (auto const& source : m_activeSources) {
425  sources.push_back(source->PrettyID());
426  }
427 }
std::vector< std::shared_ptr< Source > > m_activeSources
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::future<IOSize> XrdAdaptor::RequestManager::handle ( void *  into,
IOSize  size,
IOOffset  off 
)
inline

Interface for handling a client request.

Definition at line 53 of file XrdRequestManager.h.

54  {
55  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, into, size, off));
56  return handle(c_ptr);
57  }
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
tuple size
Write out results.
std::future< IOSize > XrdAdaptor::RequestManager::handle ( std::shared_ptr< std::vector< IOPosBuffer > >  iolist)

Definition at line 584 of file XrdRequestManager.cc.

References a, cms::Exception::addContext(), assert(), b, TauDecayModes::dec, edm::errors::FileReadError, GET_CLOCK_MONOTONIC, fileCollector::now, AlCaHLTBitMon_ParallelJobs::p, edm::CPUTimer::start(), and edm::CPUTimer::stop().

585 {
586  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
587 
588  timespec now;
589  GET_CLOCK_MONOTONIC(now);
590 
591  edm::CPUTimer timer;
592  timer.start();
593 
594  if (m_activeSources.size() == 1)
595  {
596  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
597  checkSources(now, c_ptr->getSize());
598  m_activeSources[0]->handle(c_ptr);
599  return c_ptr->get_future();
600  }
601  // Make sure active
602  else if (m_activeSources.empty())
603  {
605  ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name
606  << "', flags=0x" << std::hex << m_flags
607  << ", permissions=0" << std::oct << m_perms << std::dec
608  << ") => Source used after fatal exception.";
609  ex.addContext("In XrdAdaptor::RequestManager::handle()");
610  addConnections(ex);
611  throw ex;
612  }
613 
614  assert(iolist.get());
615  std::shared_ptr<std::vector<IOPosBuffer> > req1(new std::vector<IOPosBuffer>);
616  std::shared_ptr<std::vector<IOPosBuffer> > req2(new std::vector<IOPosBuffer>);
617  splitClientRequest(*iolist, *req1, *req2);
618 
619  checkSources(now, req1->size() + req2->size());
620  // CheckSources may have removed a source
621  if (m_activeSources.size() == 1)
622  {
623  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, iolist));
624  m_activeSources[0]->handle(c_ptr);
625  return c_ptr->get_future();
626  }
627 
628  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
629  std::future<IOSize> future1, future2;
630  if (req1->size())
631  {
632  c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
633  m_activeSources[0]->handle(c_ptr1);
634  future1 = c_ptr1->get_future();
635  }
636  if (req2->size())
637  {
638  c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
639  m_activeSources[1]->handle(c_ptr2);
640  future2 = c_ptr2->get_future();
641  }
642  if (req1->size() && req2->size())
643  {
644  std::future<IOSize> task = std::async(std::launch::deferred,
645  [](std::future<IOSize> a, std::future<IOSize> b){
646  // Wait until *both* results are available. This is essential
647  // as the callback may try referencing the RequestManager. If one
648  // throws an exception (causing the RequestManager to be destroyed by
649  // XrdFile) and the other has a failure, then the recovery code will
650  // reference the destroyed RequestManager.
651  //
652  // Unlike other places where we use shared/weak ptrs to maintain object
653  // lifetime and destruction asynchronously, we *cannot* destroy the request
654  // asynchronously as it is associated with a ROOT buffer. We must wait until we
655  // are guaranteed that XrdCl will not write into the ROOT buffer before we
656  // can return.
657  b.wait(); a.wait();
658  return b.get() + a.get();
659  },
660  std::move(future1),
661  std::move(future2));
662  timer.stop();
663  //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
664  return task;
665  }
666  else if (req1->size()) { return future1; }
667  else if (req2->size()) { return future2; }
668  else
669  { // Degenerate case - no bytes to read.
670  std::promise<IOSize> p; p.set_value(0);
671  return p.get_future();
672  }
673 }
void start()
Definition: CPUTimer.cc:74
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
void addConnections(cms::Exception &)
Times stop()
Definition: CPUTimer.cc:94
XrdCl::OpenFlags::Flags m_flags
void checkSources(timespec &now, IOSize requestSize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::recursive_mutex m_source_mutex
std::future< IOSize > RequestManager::handle ( std::shared_ptr< XrdAdaptor::ClientRequest >  c_ptr)

Handle a client request. NOTE: The shared_ptr interface is required. Depending on the state of the manager, it may decide to issue multiple requests and return the first successful one. In that case, some references to the client request may still be outstanding when this function returns.

Definition at line 495 of file XrdRequestManager.cc.

References assert(), checkSources(), GET_CLOCK_MONOTONIC, fileCollector::now, pickSingleSource(), and source.

496 {
497  assert(c_ptr.get());
498  timespec now;
499  GET_CLOCK_MONOTONIC(now);
500  checkSources(now, c_ptr->getSize());
501 
502  std::shared_ptr<Source> source = pickSingleSource();
503  source->handle(c_ptr);
504  return c_ptr->get_future();
505 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
void checkSources(timespec &now, IOSize requestSize)
std::shared_ptr< Source > pickSingleSource()
static std::string const source
Definition: EdmProvDump.cc:42
void XrdAdaptor::RequestManager::handleOpen ( XrdCl::XRootDStatus &  status,
std::shared_ptr< Source >  source 
)
privatevirtual

Handle the file-open response

Definition at line 538 of file XrdRequestManager.cc.

References alignCSCRings::s, XRD_ADAPTOR_LONG_OPEN_DELAY, and XRD_ADAPTOR_SHORT_OPEN_DELAY.

539 {
540  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
541  if (status.IsOK())
542  {
543  edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
544  for (const auto & s : m_activeSources)
545  {
546  if (source->ID() == s->ID())
547  {
548  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded source " << source->PrettyID()
549  << "; ignoring" << std::endl;
550  unsigned returned_count = ++m_excluded_active_count;
553  return;
554  }
555  }
556  for (const auto & s : m_inactiveSources)
557  {
558  if (source->ID() == s->ID())
559  {
560  edm::LogVerbatim("XrdAdaptorInternal") << "Xrootd server returned excluded inactive source " << source->PrettyID()
561  << "; ignoring" << std::endl;
563  return;
564  }
565  }
566  if (m_activeSources.size() < 2)
567  {
568  m_activeSources.push_back(source);
569  updateSiteInfo();
570  }
571  else
572  {
573  m_inactiveSources.push_back(source);
574  }
575  }
576  else
577  { // File-open failure - wait at least 120s before next attempt.
578  edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
580  }
581 }
void updateSiteInfo(std::string orig_site="")
std::vector< std::shared_ptr< Source > > m_inactiveSources
#define XRD_ADAPTOR_LONG_OPEN_DELAY
std::vector< std::shared_ptr< Source > > m_activeSources
std::atomic< unsigned > m_excluded_active_count
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
tuple status
Definition: ntuplemaker.py:245
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
void RequestManager::initialize ( std::weak_ptr< RequestManager >  selfref)
private

Some of the callback handlers (particularly the file-open one) will want to call back into the RequestManager. However, the XrdFile may have already thrown away the reference. Hence, we need a weak_ptr to the original object before we can initialize. This way, the callback knows not to reference the RequestManager.

Definition at line 119 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), assert(), cms::Exception::clearAdditionalInfo(), cms::Exception::clearContext(), cms::Exception::clearMessage(), TauDecayModes::dec, XrdAdaptor::Source::determineHostExcludeString(), web.browse_db::env, mergeVDriftHistosByStation::file, reco_application_tbsim_DetSim-Digi_cfg::File, edm::errors::FileOpenError, spr::find(), GET_CLOCK_MONOTONIC, XrdAdaptor::Source::getDomain(), XrdAdaptor::Source::getHostname(), XrdAdaptor::RequestManager::OpenHandler::getInstance(), XrdAdaptor::Source::getXrootdSiteFromURL(), customizeTrackingMonitorSeedNumber::idx, m_activeSources, m_disabledExcludeStrings, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_nextActiveSourceCheck, m_open_handler, m_perms, m_source_mutex, m_timeout, prepareOpaqueString(), SendMonitoringInfo(), source, popcon_last_value_cfg::Source, ntuplemaker::status, AlCaHLTBitMon_QueryRunRegistry::string, updateSiteInfo(), and XRD_ADAPTOR_SHORT_OPEN_DELAY.

120 {
122 
123  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
124  if (env) {env->GetInt("StreamErrorWindow", m_timeout);}
125 
126  std::string orig_site;
127  if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find(".") == std::string::npos))
128  {
129  std::string hostname;
130  if (Source::getHostname(orig_site, hostname))
131  {
132  Source::getDomain(hostname, orig_site);
133  }
134  }
135 
136  std::unique_ptr<XrdCl::File> file;
138  bool validFile = false;
139  const int retries = 5;
140  std::string excludeString;
141  for (int idx=0; idx<retries; idx++)
142  {
143  file.reset(new XrdCl::File());
144  auto opaque = prepareOpaqueString();
145  std::string new_filename = m_name + (opaque.size() ? ((m_name.find("?") == m_name.npos) ? "?" : "&") + opaque : "");
146  SyncHostResponseHandler handler;
147  XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
148  if (!openStatus.IsOK())
149  { // In this case, we failed immediately - this indicates we have previously tried to talk to this
150  // server and it was marked bad - xrootd couldn't even queue up the request internally!
151  // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
152  // redirector being down or authentication failures.
153  ex.clearMessage();
154  ex.clearContext();
155  ex.clearAdditionalInfo();
156  ex << "XrdCl::File::Open(name='" << m_name
157  << "', flags=0x" << std::hex << m_flags
158  << ", permissions=0" << std::oct << m_perms << std::dec
159  << ") => error '" << openStatus.ToStr()
160  << "' (errno=" << openStatus.errNo << ", code=" << openStatus.code << ")";
161  ex.addContext("Calling XrdFile::open()");
162  ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
163  throw ex;
164  }
165  handler.WaitForResponse();
166  std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
167  std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
168  Source::determineHostExcludeString(*file, hostList.get(), excludeString);
169  assert(status);
170  if (status->IsOK())
171  {
172  validFile = true;
173  break;
174  }
175  else
176  {
177  ex.clearMessage();
178  ex.clearContext();
179  ex.clearAdditionalInfo();
180  ex << "XrdCl::File::Open(name='" << m_name
181  << "', flags=0x" << std::hex << m_flags
182  << ", permissions=0" << std::oct << m_perms << std::dec
183  << ") => error '" << status->ToStr()
184  << "' (errno=" << status->errNo << ", code=" << status->code << ")";
185  ex.addContext("Calling XrdFile::open()");
186  addConnections(ex);
187  std::string dataServer, lastUrl;
188  file->GetProperty("DataServer", dataServer);
189  file->GetProperty("LastURL", lastUrl);
190  if (dataServer.size())
191  {
192  ex.addAdditionalInfo("Problematic data server: " + dataServer);
193  }
194  if (lastUrl.size())
195  {
196  ex.addAdditionalInfo("Last URL tried: " + lastUrl);
197  edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
198  }
199  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) != m_disabledSourceStrings.end())
200  {
201  ex << ". No additional data servers were found.";
202  throw ex;
203  }
204  if (dataServer.size())
205  {
206  m_disabledSourceStrings.insert(dataServer);
207  m_disabledExcludeStrings.insert(excludeString);
208  }
209  // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
210  if (lastUrl == new_filename)
211  {
212  edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
213  throw ex;
214  }
215  }
216  }
217  if (!validFile)
218  {
219  throw ex;
220  }
221  SendMonitoringInfo(*file);
222 
223  timespec ts;
225 
226  std::shared_ptr<Source> source(new Source(ts, std::move(file), excludeString));
227  {
228  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
229  m_activeSources.push_back(source);
230  updateSiteInfo(orig_site);
231  }
232 
233  m_lastSourceCheck = ts;
234  ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
236 }
#define GET_CLOCK_MONOTONIC(ts)
assert(m_qm.get())
static void determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude)
Definition: XrdSource.cc:196
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
static void SendMonitoringInfo(XrdCl::File &file)
void addConnections(cms::Exception &)
static bool getDomain(const std::string &host, std::string &domain)
Definition: XrdSource.cc:144
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static bool getXrootdSiteFromURL(std::string url, std::string &site)
Definition: XrdSource.cc:233
#define XRD_ADAPTOR_SHORT_OPEN_DELAY
tuple status
Definition: ntuplemaker.py:245
static bool getHostname(const std::string &id, std::string &hostname)
Definition: XrdSource.cc:116
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::shared_ptr< Source > RequestManager::pickSingleSource ( )
private

Picks a single source for the next operation.

Definition at line 457 of file XrdRequestManager.cc.

References addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileReadError, m_activeSources, m_flags, m_name, m_nextInitialSourceToggle, m_perms, m_source_mutex, and source.

Referenced by handle().

458 {
459  std::shared_ptr<Source> source = nullptr;
460  {
461  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
462  if (m_activeSources.size() == 2)
463  {
465  {
466  source = m_activeSources[0];
468  }
469  else
470  {
471  source = m_activeSources[1];
473  }
474  }
475  else if (m_activeSources.empty())
476  {
478  ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name
479  << "', flags=0x" << std::hex << m_flags
480  << ", permissions=0" << std::oct << m_perms << std::dec
481  << ") => Source used after fatal exception.";
482  ex.addContext("In XrdAdaptor::RequestManager::handle()");
483  addConnections(ex);
484  throw ex;
485  }
486  else
487  {
488  source = m_activeSources[0];
489  }
490  }
491  return source;
492 }
void addConnections(cms::Exception &)
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static std::string const source
Definition: EdmProvDump.cc:42
std::recursive_mutex m_source_mutex
std::string RequestManager::prepareOpaqueString ( )
private

Prepare an opaque string appropriate for asking a redirector to open the current file but avoiding servers which we already have connections to.

Definition at line 508 of file XrdRequestManager.cc.

References prof2calltree::count, m_activeSources, m_disabledExcludeStrings, m_inactiveSources, m_source_mutex, contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, and create_public_lumi_plots::tmp_str.

Referenced by initialize(), and XrdAdaptor::RequestManager::OpenHandler::open().

509 {
510  std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
511  std::stringstream ss;
512  ss << "tried=";
513  size_t count = 0;
514  for ( const auto & it : m_activeSources )
515  {
516  count++;
517  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
518  }
519  for ( const auto & it : m_inactiveSources )
520  {
521  count++;
522  ss << it->ExcludeID().substr(0, it->ExcludeID().find(":")) << ",";
523  }
524  for ( const auto & it : m_disabledExcludeStrings )
525  {
526  count++;
527  ss << it.substr(0, it.find(":")) << ",";
528  }
529  if (count)
530  {
531  std::string tmp_str = ss.str();
532  return tmp_str.substr(0, tmp_str.size()-1);
533  }
534  return "";
535 }
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::set< std::string > m_disabledExcludeStrings
std::vector< std::shared_ptr< Source > > m_activeSources
std::recursive_mutex m_source_mutex
void RequestManager::requestFailure ( std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr,
XrdCl::Status &  c_status 
)

Handle a failed client request.

Definition at line 676 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), addConnections(), cms::Exception::addContext(), TauDecayModes::dec, edm::errors::FileOpenError, edm::errors::FileReadError, spr::find(), GET_CLOCK_MONOTONIC, m_activeSources, m_disabledExcludeStrings, m_disabledSources, m_disabledSourceStrings, m_flags, m_lastSourceCheck, m_name, m_open_handler, m_perms, m_source_mutex, m_timeout, fileCollector::now, seconds(), ntuplemaker::status, and updateSiteInfo().

677 {
678  std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
679  std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
680 
681  // Fail early for invalid responses - XrdFile has a separate path for handling this.
682  if (c_status.code == XrdCl::errInvalidResponse)
683  {
684  edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
686  ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name
687  << "', flags=0x" << std::hex << m_flags
688  << ", permissions=0" << std::oct << m_perms << std::dec
689  << ", old source=" << source_ptr->PrettyID()
690  << ") => Invalid ReadV response from server";
691  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
692  addConnections(ex);
693  throw ex;
694  }
695  edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
696 
697  // Note that we do not delete the Source itself. That is because this
698  // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
699  // In such a case, if you close a file in the handler, it will deadlock
700  m_disabledSourceStrings.insert(source_ptr->ID());
701  m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
702  m_disabledSources.insert(source_ptr);
703 
704  if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get()))
705  {
706  m_activeSources.erase(m_activeSources.begin());
707  updateSiteInfo();
708  }
709  else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get()))
710  {
711  m_activeSources.erase(m_activeSources.begin()+1);
712  updateSiteInfo();
713  }
714  std::shared_ptr<Source> new_source;
715  if (m_activeSources.size() == 0)
716  {
717  std::shared_future<std::shared_ptr<Source> > future = m_open_handler->open();
718  timespec now;
719  GET_CLOCK_MONOTONIC(now);
721  // Note we only wait for 180 seconds here. This is because we've already failed
722  // once and the likelihood the program has some inconsistent state is decent.
723  // We'd much rather fail hard than deadlock!
724  sentry.unlock();
725  std::future_status status = future.wait_for(std::chrono::seconds(m_timeout+10));
726  if (status == std::future_status::timeout)
727  {
729  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
730  << "', flags=0x" << std::hex << m_flags
731  << ", permissions=0" << std::oct << m_perms << std::dec
732  << ", old source=" << source_ptr->PrettyID()
733  << ") => timeout when waiting for file open";
734  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
735  addConnections(ex);
736  throw ex;
737  }
738  else
739  {
740  try
741  {
742  new_source = future.get();
743  }
744  catch (edm::Exception &ex)
745  {
746  ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
747  ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
748  throw;
749  }
750  }
751  sentry.lock();
752 
753  if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) != m_disabledSourceStrings.end())
754  {
755  // The server gave us back a data node we requested excluded. Fatal!
757  ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name
758  << "', flags=0x" << std::hex << m_flags
759  << ", permissions=0" << std::oct << m_perms << std::dec
760  << ", old source=" << source_ptr->PrettyID()
761  << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
762  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
763  addConnections(ex);
764  throw ex;
765  }
766  m_activeSources.push_back(new_source);
767  updateSiteInfo();
768  }
769  else
770  {
771  new_source = m_activeSources[0];
772  }
773  new_source->handle(c_ptr);
774 }
double seconds()
#define GET_CLOCK_MONOTONIC(ts)
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void addConnections(cms::Exception &)
std::set< std::shared_ptr< Source > > m_disabledSources
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
void addContext(std::string const &context)
Definition: Exception.cc:227
tuple status
Definition: ntuplemaker.py:245
std::recursive_mutex m_source_mutex
void XrdAdaptor::RequestManager::splitClientRequest ( const std::vector< IOPosBuffer > &  iolist,
std::vector< IOPosBuffer > &  req1,
std::vector< IOPosBuffer > &  req2 
)
private

Given a client request, split it into two request lists.

Definition at line 891 of file XrdRequestManager.cc.

References cms::Exception::addAdditionalInfo(), cms::Exception::addContext(), assert(), consumeChunkBack(), consumeChunkFront(), TauDecayModes::dec, edm::errors::FileReadError, prof2calltree::front, bookConverter::max, IOPosBuffer::offset(), q1, q2, python.multivaluedict::sort(), contentValuesCheck::ss, validateList(), XRD_ADAPTOR_CHUNK_THRESHOLD, and XRD_CL_MAX_CHUNK.

892 {
893  if (iolist.size() == 0) return;
894  std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
895  req1.reserve(iolist.size()/2+1);
896  req2.reserve(iolist.size()/2+1);
897  size_t front=0;
898 
899  // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
900  float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
901  float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
902  IOSize chunk1, chunk2;
903  // Make sure the chunk size is at least 1024; little point to reads less than that size.
904  chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
905  chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
906 
907  IOSize size_orig = 0;
908  for (const auto & it : iolist) size_orig += it.size();
909 
910  while (tmp_iolist.size()-front > 0)
911  {
912  if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
913  { // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
914  // are passed to the request manager. However, because we have a max chunk size, we increase
915  // the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
916  // each individual chunk is >1MB could result in this firing. However, within the context of CMSSW,
917  // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
919  ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name
920  << "', flags=0x" << std::hex << m_flags
921  << ", permissions=0" << std::oct << m_perms << std::dec
922  << ") => Unable to split request between active servers. This is an unexpected internal error and should be reported to CMSSW developers.";
923  ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
924  addConnections(ex);
925  std::stringstream ss; ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
926  ex.addAdditionalInfo(ss.str());
927  std::stringstream ss2; ss2 << "Quality source 1 " << q1-5 << ", quality source 2: " << q2-5;
928  ex.addAdditionalInfo(ss2.str());
929  throw ex;
930  }
931  if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
932  if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
933  }
934  std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
935  std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
936 
937  IOSize size1 = validateList(req1);
938  IOSize size2 = validateList(req2);
939 
940  assert(size_orig == size1 + size2);
941 
942  edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
943 }
assert(m_qm.get())
#define XRD_CL_MAX_CHUNK
double q2[4]
Definition: TauolaWrapper.h:88
#define XRD_ADAPTOR_CHUNK_THRESHOLD
void addConnections(cms::Exception &)
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
XrdCl::OpenFlags::Flags m_flags
double q1[4]
Definition: TauolaWrapper.h:87
static void consumeChunkFront(size_t &front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
static IOSize validateList(const std::vector< IOPosBuffer > req)
size_t IOSize
Definition: IOTypes.h:14
static void consumeChunkBack(size_t front, std::vector< IOPosBuffer > &input, std::vector< IOPosBuffer > &output, IOSize chunksize)
void RequestManager::updateSiteInfo ( std::string  orig_site = "")
private

Anytime we potentially switch sources, update the internal site source list; alert the user if necessary.

Definition at line 240 of file XrdRequestManager.cc.

References m_activeSites, m_activeSources, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by checkSourcesImpl(), compareSources(), initialize(), and requestFailure().

241 {
242  std::string siteA, siteB, siteList;
243  if (m_activeSources.size()) {siteA = m_activeSources[0]->Site();}
244  if (m_activeSources.size() == 2) {siteB = m_activeSources[1]->Site();}
245  siteList = siteA;
246  if (siteB.size() && (siteB != siteA)) {siteList = siteA + ", " + siteB;}
247  if (orig_site.size() && (orig_site != siteList))
248  {
249  edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
250  m_activeSites = siteList;
251  }
252  else if (!orig_site.size() && (siteList != m_activeSites))
253  {
254  edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << m_activeSites;
255  m_activeSites = siteList;
256  }
257 }
std::vector< std::shared_ptr< Source > > m_activeSources

Member Data Documentation

std::string XrdAdaptor::RequestManager::m_activeSites
private

Definition at line 191 of file XrdRequestManager.h.

Referenced by updateSiteInfo().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_activeSources
private

Note these member variables can only be accessed when the source mutex is held.

Definition at line 186 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), compareSources(), getActiveFile(), getActiveSourceNames(), getPrettyActiveSourceNames(), initialize(), pickSingleSource(), prepareOpaqueString(), requestFailure(), and updateSiteInfo().

std::set<std::string> XrdAdaptor::RequestManager::m_disabledExcludeStrings
private

Definition at line 189 of file XrdRequestManager.h.

Referenced by initialize(), prepareOpaqueString(), and requestFailure().

std::set<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_disabledSources
private

Definition at line 190 of file XrdRequestManager.h.

Referenced by requestFailure().

std::set<std::string> XrdAdaptor::RequestManager::m_disabledSourceStrings
private

Definition at line 188 of file XrdRequestManager.h.

Referenced by getDisabledSourceNames(), initialize(), and requestFailure().

std::uniform_real_distribution<float> XrdAdaptor::RequestManager::m_distribution
private

Definition at line 207 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::atomic<unsigned> XrdAdaptor::RequestManager::m_excluded_active_count
private

Definition at line 209 of file XrdRequestManager.h.

XrdCl::OpenFlags::Flags XrdAdaptor::RequestManager::m_flags
private
std::mt19937 XrdAdaptor::RequestManager::m_generator
private

Definition at line 206 of file XrdRequestManager.h.

Referenced by checkSourcesImpl().

std::vector<std::shared_ptr<Source> > XrdAdaptor::RequestManager::m_inactiveSources
private

Definition at line 187 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), compareSources(), and prepareOpaqueString().

timespec XrdAdaptor::RequestManager::m_lastSourceCheck
private

Definition at line 193 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), initialize(), and requestFailure().

const std::string XrdAdaptor::RequestManager::m_name
private
timespec XrdAdaptor::RequestManager::m_nextActiveSourceCheck
private

Definition at line 198 of file XrdRequestManager.h.

Referenced by checkSources(), checkSourcesImpl(), and initialize().

bool XrdAdaptor::RequestManager::m_nextInitialSourceToggle
private

Definition at line 196 of file XrdRequestManager.h.

Referenced by pickSingleSource().

std::shared_ptr<OpenHandler> XrdAdaptor::RequestManager::m_open_handler
private

Definition at line 264 of file XrdRequestManager.h.

Referenced by checkSourcesImpl(), initialize(), and requestFailure().

XrdCl::Access::Mode XrdAdaptor::RequestManager::m_perms
private
std::recursive_mutex XrdAdaptor::RequestManager::m_source_mutex
private
int XrdAdaptor::RequestManager::m_timeout
private

Definition at line 194 of file XrdRequestManager.h.

Referenced by initialize(), and requestFailure().

bool XrdAdaptor::RequestManager::searchMode
private

Definition at line 199 of file XrdRequestManager.h.

const unsigned int XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT = 3*60
static

Definition at line 46 of file XrdRequestManager.h.