1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
7 #include <condition_variable>
11 #include <boost/utility.hpp>
12 #include "tbb/concurrent_unordered_set.h"
16 #include "XrdCl/XrdClFileSystem.hh"
26 namespace XrdAdaptor {
29 using Key =std::shared_ptr<Source>;
31 return tbb::tbb_hasher(iKey.get());
65 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, into,
size, off);
69 std::future<IOSize>
handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist);
77 std::future<IOSize>
handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
82 void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
120 static std::shared_ptr<RequestManager>
124 instance->initialize(instance);
138 void initialize(std::weak_ptr<RequestManager> selfref);
143 virtual void handleOpen(XrdCl::XRootDStatus &
status, std::shared_ptr<Source>);
149 std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2,
150 std::vector<std::shared_ptr<Source>>
const&
activeSources)
const;
167 std::vector<std::shared_ptr<Source>>& inactiveSources);
170 std::vector<std::shared_ptr<Source>>& inactiveSources);
181 std::vector<std::shared_ptr<Source>>& inactiveSources)
const;
188 std::vector<std::shared_ptr<Source> >
const& iNew,
241 class OpenHandler : boost::noncopyable,
public XrdCl::ResponseHandler {
245 static std::shared_ptr<OpenHandler>
getInstance(std::weak_ptr<RequestManager> manager)
248 std::shared_ptr<OpenHandler>
instance(instance_ptr);
269 std::shared_future<std::shared_ptr<Source> >
open();
278 OpenHandler(std::weak_ptr<RequestManager> manager);
std::shared_future< std::shared_ptr< Source > > open()
~RequestManager()=default
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
const std::string & getFilename() const
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::uniform_real_distribution< float > m_distribution
size_t operator()(const Key &iKey) const
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
std::atomic< bool > m_outstanding_open
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
static const unsigned int XRD_DEFAULT_TIMEOUT
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::atomic< std::string * > m_serverToAdvertise
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
timespec m_nextActiveSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
XrdCl::OpenFlags::Flags m_flags
std::recursive_mutex m_mutex
virtual ~XrootdException()
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
std::shared_ptr< Source > pickSingleSource()
void updateCurrentServer()
std::unique_ptr< XrdCl::File > m_file
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
tuple size
Write out results.
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void broadcastRequest(const ClientRequest &, bool active)
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)