|
|
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
7 #include <condition_variable>
11 #include <boost/utility.hpp>
12 #include "tbb/concurrent_unordered_set.h"
16 #include "XrdCl/XrdClFileSystem.hh"
28 using Key = std::shared_ptr<Source>;
// Hash functor call operator for Key (a std::shared_ptr<Source>).
// The hash is computed from the raw pointer returned by iKey.get(), not from
// the contents of the pointed-to Source, so two keys hash identically exactly
// when they hold the same pointer value.
// @param iKey  shared pointer whose stored address is hashed.
// @return      std::hash of the stored Key::element_type pointer.
29 size_t operator()(
const Key &iKey)
const {
return std::hash<Key::element_type *>{}(iKey.get()); }
55 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, into,
size, off);
59 std::future<IOSize>
handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
67 std::future<IOSize>
handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
111 XrdCl::OpenFlags::Flags
flags,
127 void initialize(std::weak_ptr<RequestManager> selfref);
132 virtual void handleOpen(XrdCl::XRootDStatus &
status, std::shared_ptr<Source>);
138 std::vector<IOPosBuffer> &req1,
139 std::vector<IOPosBuffer> &req2,
140 std::vector<std::shared_ptr<Source>>
const &
activeSources)
const;
158 std::vector<std::shared_ptr<Source>> &inactiveSources);
162 std::vector<std::shared_ptr<Source>> &inactiveSources);
175 std::vector<std::shared_ptr<Source>> &inactiveSources)
const;
182 std::vector<std::shared_ptr<Source>>
const &iNew,
235 class OpenHandler : boost::noncopyable,
public XrdCl::ResponseHandler {
237 static std::shared_ptr<OpenHandler>
getInstance(std::weak_ptr<RequestManager> manager) {
239 std::shared_ptr<OpenHandler>
instance(instance_ptr);
250 XrdCl::AnyObject *response,
251 XrdCl::HostList *hostList)
override;
262 std::shared_future<std::shared_ptr<Source>>
open();
270 OpenHandler(std::weak_ptr<RequestManager> manager);
~XrootdException() noexcept override
std::recursive_mutex m_mutex
XrdCl::OpenFlags::Flags m_flags
std::shared_ptr< XrdCl::File > getActiveFile() const
void queueUpdateCurrentServer(const std::string &)
timespec m_nextActiveSourceCheck
void addConnections(cms::Exception &) const
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::shared_ptr< OpenHandler > m_open_handler
std::atomic< unsigned > m_excluded_active_count
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
void getDisabledSourceNames(std::vector< std::string > &sources) const
std::vector< std::shared_ptr< Source > > m_activeSources
std::shared_future< std::shared_ptr< Source > > open()
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::atomic< std::string * > m_serverToAdvertise
std::shared_future< std::shared_ptr< Source > > m_shared_future
void updateCurrentServer()
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void broadcastRequest(const ClientRequest &, bool active)
timespec m_lastSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
std::shared_ptr< Source > pickSingleSource()
std::unique_ptr< XrdCl::File > m_file
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::weak_ptr< OpenHandler > m_self_weak
std::atomic< bool > m_outstanding_open
void initialize(std::weak_ptr< RequestManager > selfref)
void getActiveSourceNames(std::vector< std::string > &sources) const
const std::string & getFilename() const
bool m_nextInitialSourceToggle
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
virtual ~RequestManager()=default
std::uniform_real_distribution< float > m_distribution
std::recursive_mutex m_source_mutex
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
static PFTauRenderPlugin instance
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
size_t operator()(const Key &iKey) const
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::shared_ptr< OpenHandler > m_self
std::string prepareOpaqueString() const
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
static const unsigned int XRD_DEFAULT_TIMEOUT
std::string current_source()
std::weak_ptr< RequestManager > m_manager
OpenHandler(std::weak_ptr< RequestManager > manager)
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::promise< std::shared_ptr< Source > > m_promise