1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h 2 #define Utilities_XrdAdaptor_XrdRequestManager_h 7 #include <condition_variable> 11 #include <boost/utility.hpp> 12 #include "tbb/concurrent_unordered_set.h" 16 #include "XrdCl/XrdClFileSystem.hh" 29 using Key =std::shared_ptr<Source>;
31 return tbb::tbb_hasher(iKey.get());
41 :
Exception(code), m_code(xrootd_status.code)
/**
 * Default timeout constant; the initializer 3*60 evaluates to 180.
 * NOTE(review): the unit is presumably seconds (i.e. three minutes) --
 * confirm against the code that consumes this value.
 */
56 static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60;
65 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*
this, into,
size, off);
/**
 * Overload of handle() that takes a shared vector of IOPosBuffer entries
 * and returns a std::future<IOSize>. Declaration only; the definition is
 * not visible in this header excerpt.
 * NOTE(review): presumably a vectored (multi-buffer) read whose future
 * yields the number of bytes transferred -- confirm in the implementation.
 */
69 std::future<IOSize>
handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist);
/**
 * Overload of handle() that takes an already-constructed
 * XrdAdaptor::ClientRequest (shared ownership) and returns a
 * std::future<IOSize>. Declaration only.
 * NOTE(review): presumably the common entry point that the other handle()
 * overloads forward to after building a ClientRequest -- confirm.
 */
77 std::future<IOSize>
handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
/**
 * Takes a client request (shared ownership) and a mutable reference to the
 * associated XrdCl::Status. Declaration only.
 * NOTE(review): presumably the recovery/reporting path for a request that
 * failed with c_status -- confirm whether it retries, throws, or both.
 */
82 void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr,
XrdCl::Status &c_status);
/**
 * Const member that reports through the out-parameter `sources`.
 * NOTE(review): presumably fills `sources` with the names of the currently
 * active sources; confirm whether it appends to or replaces the existing
 * contents of the vector.
 */
88 void getActiveSourceNames(std::vector<std::string> & sources)
const;
/**
 * Const member with the same signature as getActiveSourceNames(); reports
 * through the out-parameter `sources`.
 * NOTE(review): presumably a human-readable variant of the active source
 * names (e.g. for log messages) -- the exact formatting is not visible here.
 */
89 void getPrettyActiveSourceNames(std::vector<std::string> & sources)
const;
/**
 * Const member that reports through the out-parameter `sources`.
 * NOTE(review): presumably fills `sources` with the names of sources that
 * have been disabled -- confirm append-vs-replace semantics.
 */
95 void getDisabledSourceNames(std::vector<std::string> & sources)
const;
/**
 * Const accessor returning a shared XrdCl::File handle.
 * NOTE(review): presumably the file object of one of the active sources;
 * which source is chosen, and whether the result can be null, is not
 * visible here -- confirm.
 */
101 std::shared_ptr<XrdCl::File> getActiveFile()
const;
120 static std::shared_ptr<RequestManager>
124 instance->initialize(instance);
/**
 * Second-phase setup that receives a weak reference to this manager. The
 * factory fragment visible in this header calls it as
 * instance->initialize(instance) on the newly created object.
 * NOTE(review): presumably needed because a shared/weak self-reference
 * cannot be obtained inside the constructor -- confirm.
 */
138 void initialize(std::weak_ptr<RequestManager> selfref);
/**
 * Virtual hook taking the XRootD status (mutable reference) and a Source
 * (shared ownership, unnamed parameter).
 * NOTE(review): presumably invoked when an asynchronous open of an
 * additional source completes, successfully or not -- confirm the caller.
 */
143 virtual void handleOpen(XrdCl::XRootDStatus &
status, std::shared_ptr<Source>);
/**
 * Const member. Inputs: `iolist` and `activeSources` (both const
 * references). Outputs: `req1` and `req2` (non-const references).
 * NOTE(review): presumably partitions `iolist` into two request lists, one
 * per active source -- the splitting policy is not visible here.
 */
148 void splitClientRequest(
const std::vector<IOPosBuffer> &iolist,
149 std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2,
150 std::vector<std::shared_ptr<Source>>
const& activeSources)
const;
/**
 * Takes the current time, the size of the request being served, and
 * mutable references to the active and inactive source lists (so both
 * lists may be modified by the call).
 * NOTE(review): presumably re-evaluates the set of active sources and may
 * move entries between the two lists; how it relates to checkSourcesImpl()
 * is not visible here -- confirm.
 */
165 void checkSources(timespec &
now,
IOSize requestSize,
166 std::vector<std::shared_ptr<Source>>& activeSources,
167 std::vector<std::shared_ptr<Source>>& inactiveSources);
/**
 * Same parameter list as checkSources(): current time, request size, and
 * mutable references to the active and inactive source lists.
 * NOTE(review): by its name, presumably the implementation that
 * checkSources() delegates to -- confirm any locking or rate-limiting
 * precondition the caller must satisfy.
 */
168 void checkSourcesImpl(timespec &now,
IOSize requestSize,
169 std::vector<std::shared_ptr<Source>>& activeSources,
170 std::vector<std::shared_ptr<Source>>& inactiveSources);
/**
 * Const member returning bool. Takes the current time, two unsigned values
 * `a` and `b`, and mutable references to the active and inactive source
 * lists.
 * NOTE(review): `a` and `b` are presumably indices into `activeSources`,
 * and the return value presumably signals whether the lists were changed
 * -- confirm, including which lock the caller must hold.
 */
179 bool compareSources(
const timespec &now,
unsigned a,
unsigned b,
180 std::vector<std::shared_ptr<Source>>& activeSources,
181 std::vector<std::shared_ptr<Source>>& inactiveSources)
const;
187 void reportSiteChange(std::vector<std::shared_ptr<Source> >
const& iOld,
188 std::vector<std::shared_ptr<Source> >
const& iNew,
/**
 * Declared inline with no parameters; the definition is not visible here.
 * NOTE(review): presumably publishes the currently used server name to a
 * monitoring/statistics consumer -- confirm.
 */
194 inline void updateCurrentServer();
/**
 * Takes a server identifier by const reference (unnamed parameter).
 * NOTE(review): presumably records the string so that a later
 * updateCurrentServer() call can publish it -- confirm.
 */
195 void queueUpdateCurrentServer(
const std::string &);
/**
 * Returns one Source with shared ownership.
 * NOTE(review): presumably selects which source serves the next
 * operation; the selection policy and behaviour when no source is
 * available are not visible here -- confirm.
 */
200 std::shared_ptr<Source> pickSingleSource();
241 class OpenHandler : boost::noncopyable,
public XrdCl::ResponseHandler {
245 static std::shared_ptr<OpenHandler>
getInstance(std::weak_ptr<RequestManager> manager)
248 std::shared_ptr<OpenHandler>
instance(instance_ptr);
/**
 * Override of the XrdCl::ResponseHandler callback (OpenHandler derives
 * from XrdCl::ResponseHandler). Receives the operation status, an optional
 * response object and the list of hosts involved.
 * NOTE(review): XrdCl conventionally hands ownership of these pointers to
 * the handler -- confirm that the definition releases them.
 */
258 virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList)
override;
/**
 * Returns a std::shared_future that yields a shared Source.
 * NOTE(review): presumably starts (or joins an already outstanding)
 * asynchronous open of a new source -- confirm how concurrent callers
 * are handled.
 */
269 std::shared_future<std::shared_ptr<Source> > open();
/**
 * Constructs a handler holding a weak reference to its RequestManager.
 * NOTE(review): a static getInstance(std::weak_ptr<RequestManager>) factory
 * is also declared for this class; presumably that is the intended way to
 * create instances -- confirm this constructor's access level.
 */
278 OpenHandler(std::weak_ptr<RequestManager> manager);
/**
 * Atomic flag, initialised to false.
 * NOTE(review): presumably true while an open request is in flight --
 * confirm where it is set and cleared.
 */
284 std::atomic<bool> m_outstanding_open {
false};
static AlgebraicMatrix initialize()
const std::string & getFilename() const
static PFTauRenderPlugin instance
std::uniform_real_distribution< float > m_distribution
size_t operator()(const Key &iKey) const
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::vector< Variable::Flags > flags
std::shared_ptr< OpenHandler > m_self
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
std::shared_future< std::shared_ptr< Source > > m_shared_future
std::weak_ptr< RequestManager > m_manager
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::atomic< std::string * > m_serverToAdvertise
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
~XrootdException() override
timespec m_nextActiveSourceCheck
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
XrdCl::OpenFlags::Flags m_flags
std::recursive_mutex m_mutex
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
std::unique_ptr< XrdCl::File > m_file
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
std::recursive_mutex m_source_mutex
bool m_nextInitialSourceToggle
timespec m_lastSourceCheck