CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include <boost/utility.hpp>
12 
14 
15 #include "XrdCl/XrdClFileSystem.hh"
16 
17 #include "XrdRequest.h"
18 #include "XrdSource.h"
19 
20 namespace XrdCl {
21  class File;
22 }
23 
24 namespace XrdAdaptor {
25 
27 
28 public:
29 
31  : Exception(code), m_code(xrootd_status.code)
32  {}
33 
34  virtual ~XrootdException() noexcept {} // noexcept replaces the deprecated throw() specification; the destructor body is intentionally empty
35 
36  uint16_t getCode() const { return m_code; } // read-only accessor for the status code stored by the constructor; const so it works on const references
37 
38 private:
39 
40  uint16_t m_code;
41 };
42 
43 class RequestManager : boost::noncopyable {
44 
45 public:
46  static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60;
47 
48  ~RequestManager() = default;
49 
53  std::future<IOSize> handle(void * into, IOSize size, IOOffset off)
54  { // Wrap the single-buffer read in a ClientRequest bound to this manager, then forward to the ClientRequest overload.
55  auto request = std::shared_ptr<XrdAdaptor::ClientRequest>(new XrdAdaptor::ClientRequest(*this, into, size, off));
56  return handle(request);
57  }
58 
59  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist);
60 
67  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
68 
72  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
73 
78  void getActiveSourceNames(std::vector<std::string> & sources);
79  void getPrettyActiveSourceNames(std::vector<std::string> & sources);
80 
85  void getDisabledSourceNames(std::vector<std::string> & sources);
86 
91  std::shared_ptr<XrdCl::File> getActiveFile();
92 
97 
101  const std::string & getFilename() const { return this->m_name; } // accessor: returns a reference to m_name, no copy is made
102 
110  static std::shared_ptr<RequestManager>
111  getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
112  { // Two-step factory: construct the manager (private constructor), then pass it a weak reference to itself via initialize().
113  std::shared_ptr<RequestManager> manager{new RequestManager(filename, flags, perms)};
114  manager->initialize(std::weak_ptr<RequestManager>(manager));
115  return manager;
116  }
117 
118 private:
119 
120  RequestManager(const std::string & filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
121 
128  void initialize(std::weak_ptr<RequestManager> selfref);
129 
133  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
134 
138  void splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2);
139 
145  void broadcastRequest(const ClientRequest &, bool active);
146 
153  void checkSources(timespec &now, IOSize requestSize); // TODO: inline
154  void checkSourcesImpl(timespec &now, IOSize requestSize);
163  bool compareSources(const timespec &now, unsigned a, unsigned b);
164 
169  void updateSiteInfo(std::string orig_site="");
170 
174  std::shared_ptr<Source> pickSingleSource();
175 
181 
186  std::vector<std::shared_ptr<Source> > m_activeSources;
187  std::vector<std::shared_ptr<Source> > m_inactiveSources;
188  std::set<std::string> m_disabledSourceStrings;
189  std::set<std::string> m_disabledExcludeStrings;
190  std::set<std::shared_ptr<Source> > m_disabledSources;
192 
195  // If set to true, the next active source should be 1; 0 otherwise.
197  // The time when the next active source check should be performed.
200 
202  XrdCl::OpenFlags::Flags m_flags;
204  std::recursive_mutex m_source_mutex;
205 
206  std::mt19937 m_generator;
207  std::uniform_real_distribution<float> m_distribution;
208 
209  std::atomic<unsigned> m_excluded_active_count;
210 
211  class OpenHandler : boost::noncopyable, public XrdCl::ResponseHandler {
212 
213  public:
214 
215  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager)
216  {
217  // Build the handler first, then record a weak reference to itself in m_self_weak.
218  std::shared_ptr<OpenHandler> handler(new OpenHandler(manager));
219  handler->m_self_weak = handler;
220  return handler;
221  }
222 
223  ~OpenHandler();
224 
228  virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override;
229 
239  std::shared_future<std::shared_ptr<Source> > open();
240 
245 
246  private:
247 
248  OpenHandler(std::weak_ptr<RequestManager> manager);
249  std::shared_future<std::shared_ptr<Source> > m_shared_future;
250  std::promise<std::shared_ptr<Source> > m_promise;
251  // When this is not null, there is a file-open in progress.
252  // Can only be touched when m_mutex is held.
253  std::unique_ptr<XrdCl::File> m_file;
254  std::recursive_mutex m_mutex;
255  std::shared_ptr<OpenHandler> m_self;
256 
257  // Always maintain a weak self-reference; while the open is in progress,
258  // this is upgraded to a strong reference to keep this object alive
259  // until XrdCl has performed the callback.
260  std::weak_ptr<OpenHandler> m_self_weak;
261  std::weak_ptr<RequestManager> m_manager;
262  };
263 
264  std::shared_ptr<OpenHandler> m_open_handler;
265 };
266 
267 }
268 
269 #endif
std::shared_future< std::shared_ptr< Source > > open()
const std::string & getFilename() const
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::uniform_real_distribution< float > m_distribution
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
bool compareSources(const timespec &now, unsigned a, unsigned b)
static const unsigned int XRD_DEFAULT_TIMEOUT
std::shared_future< std::shared_ptr< Source > > m_shared_future
void addConnections(cms::Exception &)
std::weak_ptr< RequestManager > m_manager
std::set< std::shared_ptr< Source > > m_disabledSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
std::shared_ptr< OpenHandler > m_open_handler
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
int64_t IOOffset
Definition: IOTypes.h:19
void checkSourcesImpl(timespec &now, IOSize requestSize)
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
void getDisabledSourceNames(std::vector< std::string > &sources)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
tuple status
Definition: ntuplemaker.py:245
Definition: File.h:11
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
tuple size
Write out results.
std::recursive_mutex m_source_mutex
void broadcastRequest(const ClientRequest &, bool active)
void initialize(std::weak_ptr< RequestManager > selfref)