CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include <boost/utility.hpp>
12 
14 
15 #include "XrdCl/XrdClFileSystem.hh"
16 
17 #include "XrdRequest.h"
18 #include "XrdSource.h"
19 
20 namespace XrdCl {
21  class File;
22 }
23 
24 namespace XrdAdaptor {
25 
27 
28 public:
29 
30  XrootdException(XrdCl::Status & xrootd_status, edm::Exception::Code code)  // forwards `code` to the Exception base and keeps the XRootD status code
31  : Exception(code), m_code(xrootd_status.code)  // only the `code` field of the status is retained
32  {}
33 
34  virtual ~XrootdException() noexcept {}  // noexcept replaces the deprecated throw() specification; nothing to release
35 
36  uint16_t getCode() const { return m_code; }  // XRootD status code stored by the constructor; const so it is callable on an exception caught by const reference
37 
38 private:
39 
40  uint16_t m_code;
41 };
42 
43 class RequestManager : boost::noncopyable {
44 
45 public:
46  static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60;  // evaluates to 180; NOTE(review): presumably seconds (3 minutes) — the unit is not stated in this header
47 
48  ~RequestManager() = default;  // compiler-generated destructor
49 
53  std::future<IOSize> handle(void * into, IOSize size, IOOffset off)  // convenience overload for a single (into, size, off) request
54  {
55  auto request = std::shared_ptr<XrdAdaptor::ClientRequest>(new XrdAdaptor::ClientRequest(*this, into, size, off));  // the request is bound to this manager
56  return handle(request);  // delegates to the shared_ptr<ClientRequest> overload
57  }
58 
59  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer> > iolist);  // overload taking a shared list of IOPosBuffer entries; defined outside this header
60 
67  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);  // overload the inline handle(void*, IOSize, IOOffset) forwards to
68 
72  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);  // NOTE(review): presumably reports a failed request with its XRootD status — confirm in the .cc
73 
78  void getActiveSourceNames(std::vector<std::string> & sources);  // by signature, results go into `sources`; whether it is cleared first is not visible here
79  void getPrettyActiveSourceNames(std::vector<std::string> & sources);  // NOTE(review): the "pretty" formatting is defined in the .cc — not visible here
80 
85  void getDisabledSourceNames(std::vector<std::string> & sources);  // same out-parameter signature as the active-source getters
86 
91  std::shared_ptr<XrdCl::File> getActiveFile();  // NOTE(review): presumably the XrdCl::File of an active source — confirm in the .cc
92 
97 
101  const std::string & getFilename() const { return m_name; }  // read-only reference to the stored m_name; no copy is made
102 
110  static std::shared_ptr<RequestManager>
111  getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)  // factory: construct, then run the second-phase initialize()
112  {
113  auto manager = std::shared_ptr<RequestManager>(new RequestManager(filename, flags, perms));  // plain new: the constructor is private, so std::make_shared cannot call it
114  manager->initialize(manager);  // the shared_ptr converts to the weak_ptr self-reference initialize() takes
115  return manager;
116  }
117 
118 private:
119 
120  RequestManager(const std::string & filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);  // private: instances are created through the static getInstance()
121 
128  void initialize(std::weak_ptr<RequestManager> selfref);  // second construction step; getInstance() passes the owning shared_ptr here
129 
133  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);  // NOTE(review): presumably called when an open attempt completes — confirm in the .cc
134 
138  void splitClientRequest(const std::vector<IOPosBuffer> &iolist, std::vector<IOPosBuffer> &req1, std::vector<IOPosBuffer> &req2);  // by signature: read-only `iolist` in, `req1`/`req2` out; the split policy is not visible here
139 
145  void broadcastRequest(const ClientRequest &, bool active);  // NOTE(review): meaning of `active` is defined in the .cc — not visible here
146 
153  void checkSources(timespec &now, IOSize requestSize); // TODO: inline
154  void checkSourcesImpl(timespec &now, IOSize requestSize);  // NOTE(review): presumably the out-of-line part of checkSources — confirm in the .cc
163  bool compareSources(const timespec &now, unsigned a, unsigned b);  // NOTE(review): `a` and `b` look like source indices — confirm against the caller
164 
169  void updateSiteInfo(std::string orig_site="");  // `orig_site` defaults to the empty string
170 
174  inline void updateCurrentServer();  // declared inline; the definition is not part of this listing
176 
180  std::shared_ptr<Source> pickSingleSource();  // NOTE(review): selection policy lives in the .cc — not visible here
181 
187 
192  std::vector<std::shared_ptr<Source> > m_activeSources;  // shared ownership of Source objects; NOTE(review): presumably the sources currently serving requests — confirm
193  std::vector<std::shared_ptr<Source> > m_inactiveSources;  // NOTE(review): presumably opened sources that are not currently used — confirm
194  std::set<std::string> m_disabledSourceStrings;  // string-keyed record of disabled sources; the key format is not visible here
195  std::set<std::string> m_disabledExcludeStrings;  // NOTE(review): relation to m_disabledSourceStrings is defined in the .cc
196  std::set<std::shared_ptr<Source> > m_disabledSources;  // std::set of shared_ptr, so ordering is by pointer value
198  // StatisticsSenderService wants to know what our current server is;
199  // this holds the name of the last successfully opened server.
200  std::atomic<std::string*> m_serverToAdvertise;  // atomic raw pointer; NOTE(review): ownership of the pointed-to string is not visible in this header
201 
204  // If set to true, the next active source should be 1; 0 otherwise.
206  // The time when the next active source check should be performed.
209 
211  XrdCl::OpenFlags::Flags m_flags;  // same type as the `flags` constructor argument
213  std::recursive_mutex m_source_mutex;  // recursive, so the owning thread may re-lock; NOTE(review): presumably guards the source containers above — confirm
214 
215  std::mt19937 m_generator;  // Mersenne Twister engine; seeding is not visible in this header
216  std::uniform_real_distribution<float> m_distribution;  // float distribution; its range is set wherever it is constructed (not shown)
217 
218  std::atomic<unsigned> m_excluded_active_count;  // atomic counter; NOTE(review): exact meaning is defined by its users in the .cc
219 
220  class OpenHandler : boost::noncopyable, public XrdCl::ResponseHandler {  // non-copyable XrdCl response handler nested in RequestManager
221 
222  public:
223 
224  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager)  // factory: the only way to create a handler
225  {
226  // The constructor is private, so the object is created with new and wrapped at once.
227  std::shared_ptr<OpenHandler> handler(new OpenHandler(manager));
228  handler->m_self_weak = handler;  // record the weak self-reference described further down
229  return handler;
230  }
231 
232  ~OpenHandler();  // defined outside this header
233 
237  void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override;  // overrides the XrdCl::ResponseHandler callback
238 
248  std::shared_future<std::shared_ptr<Source> > open();  // returns a shared future for a Source; NOTE(review): presumably starts or joins an open in progress — confirm in the .cc
249 
254 
255  private:
256 
257  OpenHandler(std::weak_ptr<RequestManager> manager);  // private: use getInstance()
258  std::shared_future<std::shared_ptr<Source> > m_shared_future;  // same type as the value open() returns
259  std::promise<std::shared_ptr<Source> > m_promise;  // NOTE(review): presumably the producer side of m_shared_future — confirm
260  // A non-null m_file means a file-open is currently in progress.
261  // It may only be accessed while m_mutex is held.
262  std::unique_ptr<XrdCl::File> m_file;
263  std::recursive_mutex m_mutex;  // the mutex that protects m_file
264  std::shared_ptr<OpenHandler> m_self;  // NOTE(review): presumably the strong self-reference mentioned below — confirm
265 
266  // A weak self-reference is kept at all times. While an open is in
267  // progress it is upgraded to a strong reference, so this object is not
268  // deleted before XrdCl has performed the callback.
269  std::weak_ptr<OpenHandler> m_self_weak;
270  std::weak_ptr<RequestManager> m_manager;  // weak reference to the manager passed to getInstance()
271  };
272 
273  std::shared_ptr<OpenHandler> m_open_handler;
274 };
275 
276 }
277 
278 #endif
std::shared_future< std::shared_ptr< Source > > open()
const std::string & getFilename() const
static PFTauRenderPlugin instance
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::uniform_real_distribution< float > m_distribution
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
void updateSiteInfo(std::string orig_site="")
std::set< std::string > m_disabledSourceStrings
OpenHandler(std::weak_ptr< RequestManager > manager)
std::shared_ptr< OpenHandler > m_self
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
std::weak_ptr< OpenHandler > m_self_weak
bool compareSources(const timespec &now, unsigned a, unsigned b)
static const unsigned int XRD_DEFAULT_TIMEOUT
std::shared_future< std::shared_ptr< Source > > m_shared_future
void queueUpdateCurrentServer(const std::string &)
void addConnections(cms::Exception &)
std::weak_ptr< RequestManager > m_manager
std::set< std::shared_ptr< Source > > m_disabledSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
std::atomic< std::string * > m_serverToAdvertise
std::set< std::string > m_disabledExcludeStrings
std::shared_ptr< XrdCl::File > getActiveFile()
std::shared_ptr< OpenHandler > m_open_handler
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
XrdCl::OpenFlags::Flags m_flags
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
void getActiveSourceNames(std::vector< std::string > &sources)
void checkSources(timespec &now, IOSize requestSize)
std::vector< std::shared_ptr< Source > > m_activeSources
XrdCl::Access::Mode m_perms
double b
Definition: hdecay.h:120
int64_t IOOffset
Definition: IOTypes.h:19
void checkSourcesImpl(timespec &now, IOSize requestSize)
std::atomic< unsigned > m_excluded_active_count
double a
Definition: hdecay.h:121
std::shared_ptr< Source > pickSingleSource()
tuple filename
Definition: lut2db_cfg.py:20
void getDisabledSourceNames(std::vector< std::string > &sources)
void getPrettyActiveSourceNames(std::vector< std::string > &sources)
std::unique_ptr< XrdCl::File > m_file
size_t IOSize
Definition: IOTypes.h:14
tuple status
Definition: ntuplemaker.py:245
Definition: File.h:11
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2)
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
std::promise< std::shared_ptr< Source > > m_promise
tuple size
Write out results.
std::recursive_mutex m_source_mutex
void broadcastRequest(const ClientRequest &, bool active)
void initialize(std::weak_ptr< RequestManager > selfref)