1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
7 #include <condition_variable>
11 #include "tbb/concurrent_unordered_set.h"
15 #include "XrdCl/XrdClFileSystem.hh"
24 namespace XrdAdaptor {
27 using Key = std::shared_ptr<Source>;
28 size_t operator()(
const Key &iKey)
const {
return std::hash<Key::element_type *>{}(iKey.get()); }
61 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, into,
size, off);
65 std::future<IOSize>
handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
73 std::future<IOSize>
handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
117 XrdCl::OpenFlags::Flags flags,
120 instance->initialize(instance);
133 void initialize(std::weak_ptr<RequestManager> selfref);
138 virtual void handleOpen(XrdCl::XRootDStatus &
status, std::shared_ptr<Source>);
144 std::vector<IOPosBuffer> &req1,
145 std::vector<IOPosBuffer> &req2,
164 std::vector<std::shared_ptr<Source>> &inactiveSources);
168 std::vector<std::shared_ptr<Source>> &inactiveSources);
181 std::vector<std::shared_ptr<Source>> &inactiveSources)
const;
246 static std::shared_ptr<OpenHandler>
getInstance(std::weak_ptr<RequestManager> manager) {
248 std::shared_ptr<OpenHandler>
instance(instance_ptr);
260 XrdCl::HostList *hostList)
override;
271 std::shared_future<std::shared_ptr<Source>>
open();
279 OpenHandler(std::weak_ptr<RequestManager> manager);
~XrootdException() noexceptoverride
virtual ~RequestManager()=default
std::shared_future< std::shared_ptr< Source > > open()
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
const std::string & getFilename() const
static PFTauRenderPlugin instance
std::uniform_real_distribution< float > m_distribution
size_t operator()(const Key &iKey) const
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::string prepareOpaqueString() const
OpenHandler & operator=(const OpenHandler &)=delete
std::atomic< bool > m_outstanding_open
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
static const unsigned int XRD_DEFAULT_TIMEOUT
OpenHandler(const OpenHandler &)=delete
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
RequestManager & operator=(const RequestManager &)=delete
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
timespec m_nextActiveSourceCheck
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::atomic< std::string * > m_serverToAdvertise
std::string current_source()
std::shared_ptr< OpenHandler > m_open_handler
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
XrdCl::OpenFlags::Flags m_flags
std::recursive_mutex m_mutex
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
edm::storage::IOOffset IOOffset
std::atomic< unsigned > m_excluded_active_count
std::shared_ptr< Source > pickSingleSource()
void updateCurrentServer()
std::unique_ptr< XrdCl::File > m_file
edm::storage::IOSize IOSize
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
RequestManager(const RequestManager &)=delete
tuple size
Write out results.
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck
void broadcastRequest(const ClientRequest &, bool active)
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)