CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include <boost/utility.hpp>
12 #include "tbb/concurrent_unordered_set.h"
13 
15 
16 #include "XrdCl/XrdClFileSystem.hh"
17 
18 #include "XrdRequest.h"
19 #include "XrdSource.h"
20 
21 namespace XrdCl {
22  class File;
23 }
24 
25 
26 namespace XrdAdaptor {
27 
28 struct SourceHash {
29  using Key =std::shared_ptr<Source>;
30  size_t operator()(const Key& iKey) const {
31  return tbb::tbb_hasher(iKey.get());
32  }
33  };
34 
35 
37 
38 public:
39 
40  XrootdException(XrdCl::Status & xrootd_status, edm::Exception::Code code)
41  : Exception(code), m_code(xrootd_status.code)
42  {}
43 
44  virtual ~XrootdException() throw() {};
45 
46  uint16_t getCode() { return m_code; }
47 
48 private:
49 
50  uint16_t m_code;
51 };
52 
53 class RequestManager : boost::noncopyable {
54 
55 public:
56  static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60;
57 
58  ~RequestManager() = default;
59 
63  std::future<IOSize> handle(void * into, IOSize size, IOOffset off)
64  {
65  std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr(new XrdAdaptor::ClientRequest(*this, into, size, off));
66  return handle(c_ptr);
67  }
68 
69  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist);
70 
77  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
78 
82  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
83 
88  void getActiveSourceNames(std::vector<std::string> & sources) const;
89  void getPrettyActiveSourceNames(std::vector<std::string> & sources) const;
90 
95  void getDisabledSourceNames(std::vector<std::string> & sources) const;
96 
101  std::shared_ptr<XrdCl::File> getActiveFile() const;
102 
106  void addConnections(cms::Exception &) const;
107 
111  const std::string & getFilename() const {return m_name;}
112 
120  static std::shared_ptr<RequestManager>
121  getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
122  {
123  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
124  instance->initialize(instance);
125  return instance;
126  }
127 
128 private:
129 
130  RequestManager(const std::string & filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
131 
138  void initialize(std::weak_ptr<RequestManager> selfref);
139 
143  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
144 
148  void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
149  std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2,
150  std::vector<std::shared_ptr<Source>> const& activeSources) const;
151 
157  void broadcastRequest(const ClientRequest &, bool active);
158 
165  void checkSources(timespec &now, IOSize requestSize,
166  std::vector<std::shared_ptr<Source>>& activeSources,
167  std::vector<std::shared_ptr<Source>>& inactiveSources); // TODO: inline
168  void checkSourcesImpl(timespec &now, IOSize requestSize,
169  std::vector<std::shared_ptr<Source>>& activeSources,
170  std::vector<std::shared_ptr<Source>>& inactiveSources);
179  bool compareSources(const timespec &now, unsigned a, unsigned b,
180  std::vector<std::shared_ptr<Source>>& activeSources,
181  std::vector<std::shared_ptr<Source>>& inactiveSources) const;
182 
187  void reportSiteChange(std::vector<std::shared_ptr<Source> > const& iOld,
188  std::vector<std::shared_ptr<Source> > const& iNew,
189  std::string orig_site=std::string{}) const;
190 
194  inline void updateCurrentServer();
196 
200  std::shared_ptr<Source> pickSingleSource();
201 
207 
212  std::vector<std::shared_ptr<Source> > m_activeSources;
213  std::vector<std::shared_ptr<Source> > m_inactiveSources;
214 
215  tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
216  tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
217  tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
218 
219  // StatisticsSenderService wants to know what our current server is;
220  // this holds last-successfully-opened server name
221  std::atomic<std::string*> m_serverToAdvertise;
222 
225  // If set to true, the next active source should be 1; 0 otherwise.
227  // The time when the next active source check should be performed.
230 
232  XrdCl::OpenFlags::Flags m_flags;
234  mutable std::recursive_mutex m_source_mutex;
235 
236  std::mt19937 m_generator;
237  std::uniform_real_distribution<float> m_distribution;
238 
239  std::atomic<unsigned> m_excluded_active_count;
240 
241  class OpenHandler : boost::noncopyable, public XrdCl::ResponseHandler {
242 
243  public:
244 
245  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager)
246  {
247  OpenHandler *instance_ptr = new OpenHandler(manager);
248  std::shared_ptr<OpenHandler> instance(instance_ptr);
249  instance_ptr->m_self_weak = instance;
250  return instance;
251  }
252 
253  ~OpenHandler();
254 
258  virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override;
259 
269  std::shared_future<std::shared_ptr<Source> > open();
270 
275 
276  private:
277 
278  OpenHandler(std::weak_ptr<RequestManager> manager);
279  std::shared_future<std::shared_ptr<Source> > m_shared_future;
280  std::promise<std::shared_ptr<Source> > m_promise;
281  // Set to true only when there is an outstanding open request; not
282  // protected by m_mutex, so the caller is required to know it is in a
283  // thread-safe context.
284  std::atomic<bool> m_outstanding_open {false};
285  // Can only be touched when m_mutex is held.
286  std::unique_ptr<XrdCl::File> m_file;
287  std::recursive_mutex m_mutex;
288  std::shared_ptr<OpenHandler> m_self;
289 
290  // Always maintain a weak self-reference; when the open is in-progress,
291  // this is upgraded to a strong reference to prevent this object from
292  // deletion as long as XrdCl has not performed the callback.
293  std::weak_ptr<OpenHandler> m_self_weak;
294  std::weak_ptr<RequestManager> m_manager;
295  };
296 
297  std::shared_ptr<OpenHandler> m_open_handler;
298 };
299 
300 }
301 
302 #endif
std::shared_future< std::shared_ptr< Source > > open()
std::shared_ptr< XrdCl::File > getActiveFile() const
void getDisabledSourceNames(std::vector< std::string > &sources) const
const std::string & getFilename() const
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::uniform_real_distribution< float > m_distribution
size_t operator()(const Key &iKey) const
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::string prepareOpaqueString() const
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
static const unsigned int XRD_DEFAULT_TIMEOUT
void addConnections(cms::Exception &) const
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
std::weak_ptr< RequestManager > m_manager
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::atomic< std::string * > m_serverToAdvertise
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
std::shared_ptr< OpenHandler > m_open_handler
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
void getActiveSourceNames(std::vector< std::string > &sources) const
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
int64_t IOOffset
Definition: IOTypes.h:19
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
tuple status
Definition: ntuplemaker.py:245
Definition: File.h:11
void reportSiteChange(std::vector< std::shared_ptr< Source > > const &iOld, std::vector< std::shared_ptr< Source > > const &iNew, std::string orig_site=std::string{}) const
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
tuple size
Write out results.
std::recursive_mutex m_source_mutex
void broadcastRequest(const ClientRequest &, bool active)
void initialize(std::weak_ptr< RequestManager > selfref)
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)