CMS 3D CMS Logo

XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include "oneapi/tbb/concurrent_unordered_set.h"
12 
14 
15 #include "XrdCl/XrdClFileSystem.hh"
16 
17 #include "XrdRequest.h"
18 #include "XrdSource.h"
19 
20 namespace XrdCl {
21  class File;
22 }
23 
24 namespace XrdAdaptor {
25 
// Hash functor for std::shared_ptr<Source> keys. It hashes the raw pointer
// returned by get() (the address of the managed object), not the contents
// of the Source. It is the hasher template argument of m_disabledSources.
26  struct SourceHash {
// Key type accepted by operator().
27  using Key = std::shared_ptr<Source>;
// Delegates to std::hash specialised for the raw element pointer type.
28  size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
29  };
30 
32  public:
34  : Exception(code), m_code(xrootd_status.code) {}
35 
// Destructor override with an empty body, declared noexcept.
// The ';' after the body is an empty declaration and has no effect.
36  ~XrootdException() noexcept override{};
37 
// Returns the code stored in m_code, which the constructor initialises from
// xrootd_status.code. Const-qualified because it only reads a member; this
// lets it be called through a const reference such as
// `catch (const XrootdException &)`.
38  uint16_t getCode() const { return m_code; }
39 
40  private:
41  uint16_t m_code;
42  };
43 
45  public:
49 
// Copy construction and copy assignment are deleted: a RequestManager cannot
// be copied.
50  RequestManager(const RequestManager &) = delete;
51  RequestManager &operator=(const RequestManager &) = delete;
52 
// Default timeout constant; evaluates to 180.
// NOTE(review): presumably expressed in seconds -- confirm at the use sites.
53  static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
54 
// Virtual, defaulted destructor.
55  virtual ~RequestManager() = default;
56 
// Convenience overload for a single buffer: builds a shared ClientRequest
// from this manager, the destination pointer, the size and the offset, then
// forwards it to the handle(std::shared_ptr<ClientRequest>) overload and
// returns that overload's future.
60  std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
61  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
62  return handle(c_ptr);
63  }
64 
// Overload taking a shared list of IOPosBuffer entries. Declared only; the
// definition is not part of this listing.
65  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
66 
// Overload taking an already-built ClientRequest. The
// (void *, IOSize, IOOffset) overload forwards to this one.
73  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
74 
// Takes a client request and an XrdCl status by non-const reference.
// NOTE(review): by its name this is the failure path for a request --
// confirm the exact contract in the implementation.
78  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
79 
// Const accessors that take their std::vector<std::string> argument by
// non-const reference.
// NOTE(review): presumably `sources` is an output vector and the "Pretty"
// variant differs only in name formatting -- confirm; also confirm whether
// `sources` is appended to or overwritten.
84  void getActiveSourceNames(std::vector<std::string> &sources) const;
85  void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
86 
// Same calling convention as the active-source getters above.
// NOTE(review): presumably reports the disabled sources -- confirm.
91  void getDisabledSourceNames(std::vector<std::string> &sources) const;
92 
// Returns a shared pointer to an XrdCl::File.
// NOTE(review): which file counts as "active" is decided in the
// implementation.
97  std::shared_ptr<XrdCl::File> getActiveFile() const;
98 
// Const method taking a cms::Exception by non-const reference.
// NOTE(review): presumably annotates the exception with connection
// information -- confirm.
102  void addConnections(cms::Exception &) const;
103 
// Returns a const reference to the m_name member (no copy is made).
// The declaration of m_name is not present in this listing.
107  const std::string &getFilename() const { return m_name; }
108 
// Factory for RequestManager. Construction is two-phase:
//   1. the private constructor is invoked via `new` and the result is owned
//      by a shared_ptr;
//   2. initialize() is called with that shared_ptr (its parameter is a
//      std::weak_ptr<RequestManager>), giving the object a reference to itself.
// The fully initialised shared_ptr is then returned to the caller.
116  static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
117  XrdCl::OpenFlags::Flags flags,
118  XrdCl::Access::Mode perms) {
119  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
120  instance->initialize(instance);
121  return instance;
122  }
123 
124  private:
// Private constructor; the static getInstance() factory calls it and then
// calls initialize().
125  RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
126 
// Second construction phase, called by getInstance() with a reference to the
// newly created instance.
133  void initialize(std::weak_ptr<RequestManager> selfref);
134 
// Virtual member taking an open status and a Source.
// NOTE(review): presumably invoked when an open attempt completes -- confirm
// the caller in the implementation.
138  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
139 
// Takes a read-only IO list and the active sources; req1 and req2 are
// non-const references.
// NOTE(review): presumably req1/req2 receive the two halves of the split --
// the splitting policy lives in the implementation.
143  void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
144  std::vector<IOPosBuffer> &req1,
145  std::vector<IOPosBuffer> &req2,
146  std::vector<std::shared_ptr<Source>> const &activeSources) const;
147 
// NOTE(review): the meaning of `active` is not visible here -- confirm in the
// implementation.
153  void broadcastRequest(const ClientRequest &, bool active);
154 
// checkSources and checkSourcesImpl share an identical parameter list; the
// existing TODO on checkSources marks it as a candidate for inlining.
// Both source vectors are passed by non-const reference.
161  void checkSources(timespec &now,
162  IOSize requestSize,
163  std::vector<std::shared_ptr<Source>> &activeSources,
164  std::vector<std::shared_ptr<Source>> &inactiveSources); // TODO: inline
165  void checkSourcesImpl(timespec &now,
166  IOSize requestSize,
167  std::vector<std::shared_ptr<Source>> &activeSources,
168  std::vector<std::shared_ptr<Source>> &inactiveSources);
// Const comparison helper returning bool.
// NOTE(review): `a` and `b` are presumably indices into the source vectors --
// confirm; note that both vectors are taken by non-const reference.
177  bool compareSources(const timespec &now,
178  unsigned a,
179  unsigned b,
180  std::vector<std::shared_ptr<Source>> &activeSources,
181  std::vector<std::shared_ptr<Source>> &inactiveSources) const;
182 
// Takes the old and new source lists by const reference; orig_site is passed
// by value and defaults to an empty string.
187  void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
188  std::vector<std::shared_ptr<Source>> const &iNew,
189  std::string orig_site = std::string{}) const;
190 
// Declared inline; the definition is not part of this listing.
194  inline void updateCurrentServer();
196 
// Returns a shared pointer to a Source.
// NOTE(review): the selection policy is defined in the implementation.
200  std::shared_ptr<Source> pickSingleSource();
201 
207 
// Source lists, held as shared pointers.
// NOTE(review): presumably guarded by m_source_mutex -- confirm in the
// implementation.
212  std::vector<std::shared_ptr<Source>> m_activeSources;
213  std::vector<std::shared_ptr<Source>> m_inactiveSources;
214 
// TBB concurrent sets: two keyed by string and one keyed by Source pointer.
// The last one uses SourceHash, so it hashes on pointer identity.
215  oneapi::tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
216  oneapi::tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
217  oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
218 
219  // StatisticsSenderService wants to know what our current server is;
220  // this holds last-successfully-opened server name
// Atomic raw pointer to a std::string.
// NOTE(review): ownership and deletion of the pointee are not visible in this
// listing -- confirm.
221  std::atomic<std::string *> m_serverToAdvertise;
222 
// The member declarations described by the next two comments are missing from
// this listing (the listing's line numbering skips over them).
225  // If set to true, the next active source should be 1; 0 otherwise.
227  // The time when the next active source check should be performed.
231 
233  XrdCl::OpenFlags::Flags m_flags;
// Recursive mutex; `mutable` so that const member functions can lock it.
235  mutable std::recursive_mutex m_source_mutex;
236 
// Mersenne Twister engine and a float-valued uniform real distribution.
237  std::mt19937 m_generator;
238  std::uniform_real_distribution<float> m_distribution;
239 
// Atomic unsigned counter.
// NOTE(review): by name, counts active sources that are excluded -- confirm.
240  std::atomic<unsigned> m_excluded_active_count;
241 
// Response handler (derived from XrdCl::ResponseHandler) nested in
// RequestManager. It is non-copyable and is created only through
// getInstance(), because its constructor is private.
242  class OpenHandler : public XrdCl::ResponseHandler {
243  public:
244  OpenHandler(const OpenHandler &) = delete;
245  OpenHandler &operator=(const OpenHandler &) = delete;
246 
// Factory: constructs the handler with `new`, hands ownership to a
// shared_ptr, and stores a weak self-reference in m_self_weak before
// returning.
247  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
248  OpenHandler *instance_ptr = new OpenHandler(manager);
249  std::shared_ptr<OpenHandler> instance(instance_ptr);
250  instance_ptr->m_self_weak = instance;
251  return instance;
252  }
253 
254  ~OpenHandler() override;
255 
// Override of the XrdCl::ResponseHandler callback; all three arguments are
// raw pointers.
// NOTE(review): ownership of status/response/hostList is defined by XrdCl --
// confirm against its documentation.
259  void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
260  XrdCl::AnyObject *response,
261  XrdCl::HostList *hostList) override;
262 
// Returns a shared future that yields a shared pointer to a Source.
// NOTE(review): presumably backed by m_promise / m_shared_future -- confirm.
272  std::shared_future<std::shared_ptr<Source>> open();
273 
278 
279  private:
// Private constructor; use getInstance().
280  OpenHandler(std::weak_ptr<RequestManager> manager);
281  std::shared_future<std::shared_ptr<Source>> m_shared_future;
282  std::promise<std::shared_ptr<Source>> m_promise;
283  // Set to true only when there is an outstanding open request; not
284  // protected by m_mutex, so the caller is required to know it is in a
285  // thread-safe context.
286  std::atomic<bool> m_outstanding_open{false};
287  // Can only be touched when m_mutex is held.
288  std::unique_ptr<XrdCl::File> m_file;
289  std::recursive_mutex m_mutex;
// Strong self-reference.
// NOTE(review): presumably this is the "strong reference" that the comment
// on m_self_weak below refers to -- confirm in the implementation.
290  std::shared_ptr<OpenHandler> m_self;
291 
292  // Always maintain a weak self-reference; when the open is in-progress,
293  // this is upgraded to a strong reference to prevent this object from
294  // deletion as long as XrdCl has not performed the callback.
295  std::weak_ptr<OpenHandler> m_self_weak;
// Weak reference to the manager passed to getInstance(); it does not keep
// the RequestManager alive.
296  std::weak_ptr<RequestManager> m_manager;
297  };
298 
// Shared ownership of this manager's OpenHandler.
299  std::shared_ptr<OpenHandler> m_open_handler;
300  };
301 
302 } // namespace XrdAdaptor
303 
304 #endif
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
size
Write out results.
int64_t IOOffset
Definition: IOTypes.h:20
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
virtual ~RequestManager()=default
~XrootdException() noexcept override
std::shared_future< std::shared_ptr< Source > > open()
static PFTauRenderPlugin instance
const std::string & getFilename() const
std::uniform_real_distribution< float > m_distribution
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
OpenHandler & operator=(const OpenHandler &)=delete
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
std::string prepareOpaqueString() const
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
static const unsigned int XRD_DEFAULT_TIMEOUT
OpenHandler(const OpenHandler &)=delete
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
RequestManager & operator=(const RequestManager &)=delete
oneapi::tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
std::vector< std::shared_ptr< Source > > m_inactiveSources
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void addConnections(cms::Exception &) const
std::shared_ptr< XrdCl::File > getActiveFile() const
std::atomic< std::string * > m_serverToAdvertise
std::shared_ptr< OpenHandler > m_open_handler
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
size_t IOSize
Definition: IOTypes.h:15
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
size_t operator()(const Key &iKey) const
edm::storage::IOOffset IOOffset
std::atomic< unsigned > m_excluded_active_count
void getDisabledSourceNames(std::vector< std::string > &sources) const
double a
Definition: hdecay.h:119
std::shared_ptr< Source > pickSingleSource()
oneapi::tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::unique_ptr< XrdCl::File > m_file
edm::storage::IOSize IOSize
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
RequestManager(const RequestManager &)=delete
void getActiveSourceNames(std::vector< std::string > &sources) const
std::recursive_mutex m_source_mutex
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
void broadcastRequest(const ClientRequest &, bool active)
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)