CMS 3D CMS Logo

XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include <boost/utility.hpp>
12 #include "tbb/concurrent_unordered_set.h"
13 
15 
16 #include "XrdCl/XrdClFileSystem.hh"
17 
18 #include "XrdRequest.h"
19 #include "XrdSource.h"
20 
// Forward declaration of XrdCl::File. Below it is referred to only through
// smart pointers (RequestManager::getActiveFile() and OpenHandler::m_file).
21 namespace XrdCl {
22  class File;
23 }
24 
25 namespace XrdAdaptor {
26 
// Hash functor for std::shared_ptr<Source> keys. It hashes the raw pointer
// held by the shared_ptr (through tbb::tbb_hasher), not the contents of the
// pointed-to Source. Used as the hasher of the m_disabledSources set in
// RequestManager.
27  struct SourceHash {
    // Key type accepted by operator().
28  using Key = std::shared_ptr<Source>;
    // Returns the hash of the address stored in iKey.
29  size_t operator()(const Key &iKey) const { return tbb::tbb_hasher(iKey.get()); }
30  };
31 
// NOTE(review): the class header and the constructor signature belonging to
// this fragment are missing from this listing (the embedded numbering skips
// 32 and 34). Judging from the destructor name and the initializer list below,
// this is the body of XrootdException, derived from an Exception type —
// confirm against the real header.
33  public:
    // Constructor initializer list: forwards `code` to the Exception base and
    // stores xrootd_status.code in m_code.
35  : Exception(code), m_code(xrootd_status.code) {}
36 
    // Empty destructor. NOTE(review): the ';' after the body is redundant.
37  ~XrootdException() noexcept override{};
38 
    // Returns the status code captured at construction.
    // NOTE(review): not const-qualified, so it cannot be called on a const
    // XrootdException; consider adding `const`.
39  uint16_t getCode() { return m_code; }
40 
41  private:
    // Copy of xrootd_status.code taken in the constructor.
42  uint16_t m_code;
43  };
44 
// RequestManager: non-copyable (boost::noncopyable) class that accepts I/O
// requests through the handle() overloads, each returning std::future<IOSize>,
// and keeps collections of active, inactive and disabled Source objects.
// Instances can only be created through getInstance() because the constructor
// is private.
// NOTE(review): the original header's explanatory comments and a few member
// declarations (for example m_name, which getFilename() returns) are missing
// from this listing. Anything not visible here must be checked against the
// real header and the implementation file.
// NOTE(review): std::future, std::atomic, std::shared_ptr and std::string are
// used below, but <future>, <atomic>, <memory> and <string> are not in the
// visible include list — presumably pulled in transitively; confirm.
45  class RequestManager : boost::noncopyable {
46  public:
    // 3 * 60 = 180; presumably a timeout in seconds (three minutes) — confirm
    // at the point of use.
47  static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
48 
49  virtual ~RequestManager() = default;
50 
    // Convenience overload: wraps (into, size, off) together with a reference
    // to this manager in a shared ClientRequest and forwards it to the
    // handle(std::shared_ptr<ClientRequest>) overload.
54  std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
55  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
56  return handle(c_ptr);
57  }
58 
    // Overload taking a shared list of IOPosBuffer entries; defined out of line.
59  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
60 
    // Overload taking an already-built ClientRequest; the single-buffer
    // overload above forwards here. Defined out of line.
67  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
68 
    // Declared only. Presumably invoked when the request c_ptr failed with
    // status c_status — confirm in the implementation.
72  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
73 
    // The three get*SourceNames functions take a caller-provided vector by
    // non-const reference, presumably as an output parameter — confirm in the
    // implementation how (and whether) existing contents are preserved.
78  void getActiveSourceNames(std::vector<std::string> &sources) const;
79  void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
80 
85  void getDisabledSourceNames(std::vector<std::string> &sources) const;
86 
    // Returns a shared pointer to an XrdCl::File; which file is selected is
    // decided in the implementation.
91  std::shared_ptr<XrdCl::File> getActiveFile() const;
92 
    // Takes a cms::Exception by non-const reference; presumably annotates it
    // with connection information — confirm in the implementation.
96  void addConnections(cms::Exception &) const;
97 
    // Returns a const reference to m_name.
    // NOTE(review): the declaration of m_name is not present in this listing.
101  const std::string &getFilename() const { return m_name; }
102 
    // Factory function: allocates the manager through the private constructor,
    // then calls initialize() with the owning shared_ptr (implicitly converted
    // to the weak_ptr parameter), so initialization only runs once the object
    // is shared_ptr-managed. Raw `new` is used because std::make_shared cannot
    // reach the private constructor.
110  static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
111  XrdCl::OpenFlags::Flags flags,
112  XrdCl::Access::Mode perms) {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }
117 
118  private:
    // Private so that objects are only created through getInstance().
119  RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
120 
    // Second construction step, called by getInstance() with a weak reference
    // to the newly created manager.
127  void initialize(std::weak_ptr<RequestManager> selfref);
128 
    // Virtual hook taking an open status and a Source; presumably called when
    // an open attempt completes — confirm in the implementation.
132  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
133 
    // Takes one input list (iolist) and two mutable lists (req1, req2) plus the
    // current active sources; presumably distributes iolist over req1/req2 —
    // confirm in the implementation.
137  void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
138  std::vector<IOPosBuffer> &req1,
139  std::vector<IOPosBuffer> &req2,
140  std::vector<std::shared_ptr<Source>> const &activeSources) const;
141 
147  void broadcastRequest(const ClientRequest &, bool active);
148 
    // checkSources and checkSourcesImpl share the same parameter list; both
    // receive the source lists by non-const reference and may modify them.
155  void checkSources(timespec &now,
156  IOSize requestSize,
157  std::vector<std::shared_ptr<Source>> &activeSources,
158  std::vector<std::shared_ptr<Source>> &inactiveSources); // TODO: inline
159  void checkSourcesImpl(timespec &now,
160  IOSize requestSize,
161  std::vector<std::shared_ptr<Source>> &activeSources,
162  std::vector<std::shared_ptr<Source>> &inactiveSources);
    // a and b are unsigned values, presumably indices into activeSources —
    // confirm. Const member, but the source lists are passed as mutable
    // references.
171  bool compareSources(const timespec &now,
172  unsigned a,
173  unsigned b,
174  std::vector<std::shared_ptr<Source>> &activeSources,
175  std::vector<std::shared_ptr<Source>> &inactiveSources) const;
176 
    // orig_site defaults to an empty string.
181  void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
182  std::vector<std::shared_ptr<Source>> const &iNew,
183  std::string orig_site = std::string{}) const;
184 
    // NOTE(review): updateCurrentServer() is declared inline but its definition
    // is not visible in this listing.
188  inline void updateCurrentServer();
189  void queueUpdateCurrentServer(const std::string &);
190 
194  std::shared_ptr<Source> pickSingleSource();
195 
200  std::string prepareOpaqueString() const;
201 
    // Source bookkeeping: plain vectors for active/inactive sources and TBB
    // concurrent sets for the disabled ones.
206  std::vector<std::shared_ptr<Source>> m_activeSources;
207  std::vector<std::shared_ptr<Source>> m_inactiveSources;
208 
209  tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
210  tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
    // Keyed by shared_ptr<Source>, hashed on the pointer value via SourceHash.
211  tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
212 
213  // StatisticsSenderService wants to know what our current server is;
214  // this holds last-successfully-opened server name
    // NOTE(review): stored as an atomic raw pointer; who owns and deletes the
    // pointed-to string is not visible in this listing.
215  std::atomic<std::string *> m_serverToAdvertise;
216 
    // NOTE(review): the member declarations described by the next two comments
    // are absent from this listing (the embedded numbering skips several lines
    // here); restore them from the real header.
219  // If set to true, the next active source should be 1; 0 otherwise.
221  // The time when the next active source check should be performed.
224 
226  XrdCl::OpenFlags::Flags m_flags;
    // Recursive and mutable, so const member functions can lock it; presumably
    // guards the source lists above — confirm in the implementation.
228  mutable std::recursive_mutex m_source_mutex;
229 
    // Random engine plus a float distribution; how they are seeded and used is
    // defined in the implementation.
230  std::mt19937 m_generator;
231  std::uniform_real_distribution<float> m_distribution;
232 
233  std::atomic<unsigned> m_excluded_active_count;
234 
    // Nested, non-copyable handler implementing XrdCl::ResponseHandler.
    // Created only through its own getInstance() (private constructor).
235  class OpenHandler : boost::noncopyable, public XrdCl::ResponseHandler {
236  public:
    // Factory: creates the handler, wraps it in a shared_ptr and records a weak
    // self-reference in m_self_weak before returning the shared_ptr.
237  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
238  OpenHandler *instance_ptr = new OpenHandler(manager);
239  std::shared_ptr<OpenHandler> instance(instance_ptr);
240  instance_ptr->m_self_weak = instance;
241  return instance;
242  }
243 
244  ~OpenHandler() override;
245 
    // Override of the XrdCl::ResponseHandler callback.
249  void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
250  XrdCl::AnyObject *response,
251  XrdCl::HostList *hostList) override;
252 
    // Returns a shared_future that yields a Source; presumably starts (or
    // joins) an open attempt — confirm in the implementation.
262  std::shared_future<std::shared_ptr<Source>> open();
263 
267  std::string current_source();
268 
269  private:
270  OpenHandler(std::weak_ptr<RequestManager> manager);
    // Promise/future pair carrying the Source result handed out by open().
271  std::shared_future<std::shared_ptr<Source>> m_shared_future;
272  std::promise<std::shared_ptr<Source>> m_promise;
273  // Set to true only when there is an outstanding open request; not
274  // protected by m_mutex, so the caller is required to know it is in a
275  // thread-safe context.
276  std::atomic<bool> m_outstanding_open{false};
277  // Can only be touched when m_mutex is held.
278  std::unique_ptr<XrdCl::File> m_file;
279  std::recursive_mutex m_mutex;
    // Presumably the strong self-reference described in the comment below,
    // held while an open is in progress — confirm in the implementation.
280  std::shared_ptr<OpenHandler> m_self;
281 
282  // Always maintain a weak self-reference; when the open is in-progress,
283  // this is upgraded to a strong reference to prevent this object from
284  // deletion as long as XrdCl has not performed the callback.
285  std::weak_ptr<OpenHandler> m_self_weak;
    // Weak reference back to the owning manager, as passed to the constructor.
286  std::weak_ptr<RequestManager> m_manager;
287  };
288 
289  std::shared_ptr<OpenHandler> m_open_handler;
290  };
291 
292 } // namespace XrdAdaptor
293 
294 #endif
size
Write out results.
static AlgebraicMatrix initialize()
const std::string & getFilename() const
static PFTauRenderPlugin instance
std::uniform_real_distribution< float > m_distribution
size_t operator()(const Key &iKey) const
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::shared_ptr< OpenHandler > m_self
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
std::shared_future< std::shared_ptr< Source > > m_shared_future
std::weak_ptr< RequestManager > m_manager
std::vector< std::shared_ptr< Source > > m_inactiveSources
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::atomic< std::string * > m_serverToAdvertise
std::shared_ptr< OpenHandler > m_open_handler
#define noexcept
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:118
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:119
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
Definition: File.h:11
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
std::recursive_mutex m_source_mutex