1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h 2 #define Utilities_XrdAdaptor_XrdRequestManager_h 7 #include <condition_variable> 11 #include <boost/utility.hpp> 12 #include "tbb/concurrent_unordered_set.h" 16 #include "XrdCl/XrdClFileSystem.hh" 28 using Key = std::shared_ptr<Source>;
29 size_t operator()(
const Key &iKey)
const {
return tbb::tbb_hasher(iKey.get()); }
35 :
Exception(code), m_code(xrootd_status.code) {}
47 static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
55 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, into,
size, off);
59 std::future<IOSize>
handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
67 std::future<IOSize>
handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
72 void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr,
XrdCl::Status &c_status);
78 void getActiveSourceNames(std::vector<std::string> &
sources)
const;
79 void getPrettyActiveSourceNames(std::vector<std::string> &sources)
const;
85 void getDisabledSourceNames(std::vector<std::string> &sources)
const;
91 std::shared_ptr<XrdCl::File> getActiveFile()
const;
111 XrdCl::OpenFlags::Flags
flags,
114 instance->initialize(instance);
127 void initialize(std::weak_ptr<RequestManager> selfref);
132 virtual void handleOpen(XrdCl::XRootDStatus &
status, std::shared_ptr<Source>);
137 void splitClientRequest(
const std::vector<IOPosBuffer> &iolist,
138 std::vector<IOPosBuffer> &req1,
139 std::vector<IOPosBuffer> &req2,
140 std::vector<std::shared_ptr<Source>>
const &
activeSources)
const;
155 void checkSources(timespec &
now,
158 std::vector<std::shared_ptr<Source>> &inactiveSources);
159 void checkSourcesImpl(timespec &now,
162 std::vector<std::shared_ptr<Source>> &inactiveSources);
171 bool compareSources(
const timespec &now,
175 std::vector<std::shared_ptr<Source>> &inactiveSources)
const;
181 void reportSiteChange(std::vector<std::shared_ptr<Source>>
const &iOld,
182 std::vector<std::shared_ptr<Source>>
const &iNew,
188 inline void updateCurrentServer();
189 void queueUpdateCurrentServer(
const std::string &);
194 std::shared_ptr<Source> pickSingleSource();
235 class OpenHandler : boost::noncopyable,
public XrdCl::ResponseHandler {
237 static std::shared_ptr<OpenHandler>
getInstance(std::weak_ptr<RequestManager> manager) {
239 std::shared_ptr<OpenHandler>
instance(instance_ptr);
249 void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
250 XrdCl::AnyObject *response,
251 XrdCl::HostList *hostList)
override;
262 std::shared_future<std::shared_ptr<Source>> open();
270 OpenHandler(std::weak_ptr<RequestManager> manager);
276 std::atomic<bool> m_outstanding_open{
false};
static AlgebraicMatrix initialize()
const std::string & getFilename() const
static PFTauRenderPlugin instance
std::uniform_real_distribution< float > m_distribution
size_t operator()(const Key &iKey) const
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::shared_ptr< OpenHandler > m_self
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
std::shared_future< std::shared_ptr< Source > > m_shared_future
std::weak_ptr< RequestManager > m_manager
std::vector< std::shared_ptr< Source > > m_inactiveSources
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
~XrootdException() override
timespec m_nextActiveSourceCheck
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::atomic< std::string * > m_serverToAdvertise
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::recursive_mutex m_mutex
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
std::unique_ptr< XrdCl::File > m_file
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck