CMS 3D CMS Logo

XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include <boost/utility.hpp>
12 #include "tbb/concurrent_unordered_set.h"
13 
15 
16 #include "XrdCl/XrdClFileSystem.hh"
17 
18 #include "XrdRequest.h"
19 #include "XrdSource.h"
20 
21 namespace XrdCl {
22  class File;
23 }
24 
25 namespace XrdAdaptor {
26 
27  struct SourceHash {
28  using Key = std::shared_ptr<Source>;
29  size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
30  };
31 
33  public:
35  : Exception(code), m_code(xrootd_status.code) {}
36 
37  ~XrootdException() noexcept override{};
38 
39  uint16_t getCode() { return m_code; }
40 
41  private:
42  uint16_t m_code;
43  };
44 
45  class RequestManager : boost::noncopyable {
46  public:
47  static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
48 
49  virtual ~RequestManager() = default;
50 
54  std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
55  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
56  return handle(c_ptr);
57  }
58 
59  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
60 
67  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
68 
72  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
73 
78  void getActiveSourceNames(std::vector<std::string> &sources) const;
79  void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
80 
85  void getDisabledSourceNames(std::vector<std::string> &sources) const;
86 
91  std::shared_ptr<XrdCl::File> getActiveFile() const;
92 
96  void addConnections(cms::Exception &) const;
97 
101  const std::string &getFilename() const { return m_name; }
102 
110  static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
111  XrdCl::OpenFlags::Flags flags,
112  XrdCl::Access::Mode perms) {
113  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
114  instance->initialize(instance);
115  return instance;
116  }
117 
118  private:
119  RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
120 
127  void initialize(std::weak_ptr<RequestManager> selfref);
128 
132  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
133 
137  void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
138  std::vector<IOPosBuffer> &req1,
139  std::vector<IOPosBuffer> &req2,
140  std::vector<std::shared_ptr<Source>> const &activeSources) const;
141 
147  void broadcastRequest(const ClientRequest &, bool active);
148 
155  void checkSources(timespec &now,
156  IOSize requestSize,
157  std::vector<std::shared_ptr<Source>> &activeSources,
158  std::vector<std::shared_ptr<Source>> &inactiveSources); // TODO: inline
159  void checkSourcesImpl(timespec &now,
160  IOSize requestSize,
161  std::vector<std::shared_ptr<Source>> &activeSources,
162  std::vector<std::shared_ptr<Source>> &inactiveSources);
171  bool compareSources(const timespec &now,
172  unsigned a,
173  unsigned b,
174  std::vector<std::shared_ptr<Source>> &activeSources,
175  std::vector<std::shared_ptr<Source>> &inactiveSources) const;
176 
181  void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
182  std::vector<std::shared_ptr<Source>> const &iNew,
183  std::string orig_site = std::string{}) const;
184 
188  inline void updateCurrentServer();
190 
194  std::shared_ptr<Source> pickSingleSource();
195 
201 
206  std::vector<std::shared_ptr<Source>> m_activeSources;
207  std::vector<std::shared_ptr<Source>> m_inactiveSources;
208 
209  tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
210  tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
211  tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
212 
213  // StatisticsSenderService wants to know what our current server is;
214  // this holds last-successfully-opened server name
215  std::atomic<std::string *> m_serverToAdvertise;
216 
219  // If set to true, the next active source should be 1; 0 otherwise.
221  // The time when the next active source check should be performed.
224 
226  XrdCl::OpenFlags::Flags m_flags;
228  mutable std::recursive_mutex m_source_mutex;
229 
230  std::mt19937 m_generator;
231  std::uniform_real_distribution<float> m_distribution;
232 
233  std::atomic<unsigned> m_excluded_active_count;
234 
235  class OpenHandler : boost::noncopyable, public XrdCl::ResponseHandler {
236  public:
237  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
238  OpenHandler *instance_ptr = new OpenHandler(manager);
239  std::shared_ptr<OpenHandler> instance(instance_ptr);
240  instance_ptr->m_self_weak = instance;
241  return instance;
242  }
243 
244  ~OpenHandler() override;
245 
249  void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
250  XrdCl::AnyObject *response,
251  XrdCl::HostList *hostList) override;
252 
262  std::shared_future<std::shared_ptr<Source>> open();
263 
268 
269  private:
270  OpenHandler(std::weak_ptr<RequestManager> manager);
271  std::shared_future<std::shared_ptr<Source>> m_shared_future;
272  std::promise<std::shared_ptr<Source>> m_promise;
273  // Set to true only when there is an outstanding open request; not
274  // protected by m_mutex, so the caller is required to know it is in a
275  // thread-safe context.
276  std::atomic<bool> m_outstanding_open{false};
277  // Can only be touched when m_mutex is held.
278  std::unique_ptr<XrdCl::File> m_file;
279  std::recursive_mutex m_mutex;
280  std::shared_ptr<OpenHandler> m_self;
281 
282  // Always maintain a weak self-reference; when the open is in-progress,
283  // this is upgraded to a strong reference to prevent this object from
284  // deletion as long as XrdCl has not performed the callback.
285  std::weak_ptr<OpenHandler> m_self_weak;
286  std::weak_ptr<RequestManager> m_manager;
287  };
288 
289  std::shared_ptr<OpenHandler> m_open_handler;
290  };
291 
292 } // namespace XrdAdaptor
293 
294 #endif
XrdAdaptor::XrootdException::~XrootdException
~XrootdException() noexcept override
Definition: XrdRequestManager.h:37
XrdAdaptor::RequestManager::OpenHandler::m_mutex
std::recursive_mutex m_mutex
Definition: XrdRequestManager.h:279
XrdAdaptor::RequestManager::m_flags
XrdCl::OpenFlags::Flags m_flags
Definition: XrdRequestManager.h:226
XrdAdaptor::RequestManager::getActiveFile
std::shared_ptr< XrdCl::File > getActiveFile() const
Definition: XrdRequestManager.cc:445
XrdAdaptor::RequestManager::queueUpdateCurrentServer
void queueUpdateCurrentServer(const std::string &)
Definition: XrdRequestManager.cc:251
XrdAdaptor::RequestManager::m_generator
std::mt19937 m_generator
Definition: XrdRequestManager.h:230
HcalTopologyMode::Mode
Mode
Definition: HcalTopologyMode.h:26
XrdAdaptor::RequestManager::m_nextActiveSourceCheck
timespec m_nextActiveSourceCheck
Definition: XrdRequestManager.h:222
XrdAdaptor::RequestManager::addConnections
void addConnections(cms::Exception &) const
Definition: XrdRequestManager.cc:481
XrdAdaptor::RequestManager::m_disabledExcludeStrings
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
Definition: XrdRequestManager.h:210
mps_update.status
status
Definition: mps_update.py:69
XrdAdaptor::RequestManager::m_inactiveSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
Definition: XrdRequestManager.h:207
XrdAdaptor::RequestManager::OpenHandler
Definition: XrdRequestManager.h:235
XrdAdaptor::RequestManager::m_open_handler
std::shared_ptr< OpenHandler > m_open_handler
Definition: XrdRequestManager.h:289
XrdAdaptor::RequestManager::m_excluded_active_count
std::atomic< unsigned > m_excluded_active_count
Definition: XrdRequestManager.h:233
XrdAdaptor::XrootdException::XrootdException
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
Definition: XrdRequestManager.h:34
XrdAdaptor::XrootdException::getCode
uint16_t getCode()
Definition: XrdRequestManager.h:39
XrdAdaptor::RequestManager::getDisabledSourceNames
void getDisabledSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:474
XrdAdaptor::RequestManager::m_activeSources
std::vector< std::shared_ptr< Source > > m_activeSources
Definition: XrdRequestManager.h:206
XrdSource.h
XrdAdaptor::RequestManager::OpenHandler::open
std::shared_future< std::shared_ptr< Source > > open()
Definition: XrdRequestManager.cc:1035
XrdAdaptor::RequestManager::m_perms
XrdCl::Access::Mode m_perms
Definition: XrdRequestManager.h:227
edm::errors::ErrorCodes
ErrorCodes
Definition: EDMException.h:22
CalibrationSummaryClient_cfi.sources
sources
Definition: CalibrationSummaryClient_cfi.py:23
XrdAdaptor::RequestManager::m_disabledSources
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Definition: XrdRequestManager.h:211
XrdAdaptor::RequestManager::m_serverToAdvertise
std::atomic< std::string * > m_serverToAdvertise
Definition: XrdRequestManager.h:215
btagGenBb_cfi.Status
Status
Definition: btagGenBb_cfi.py:4
XrdAdaptor::RequestManager::OpenHandler::m_shared_future
std::shared_future< std::shared_ptr< Source > > m_shared_future
Definition: XrdRequestManager.h:271
XrdAdaptor::RequestManager::updateCurrentServer
void updateCurrentServer()
Definition: XrdRequestManager.cc:234
XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Definition: XrdRequestManager.cc:967
edm::Exception
Definition: EDMException.h:77
EDMException.h
XrdAdaptor::RequestManager::broadcastRequest
void broadcastRequest(const ClientRequest &, bool active)
fileCollector.now
now
Definition: fileCollector.py:207
Key
Definition: GoldenPattern.h:15
XrdAdaptor::RequestManager::m_lastSourceCheck
timespec m_lastSourceCheck
Definition: XrdRequestManager.h:217
XrdAdaptor::RequestManager::getPrettyActiveSourceNames
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:466
XrdAdaptor::RequestManager::pickSingleSource
std::shared_ptr< Source > pickSingleSource()
Definition: XrdRequestManager.cc:494
XrdAdaptor::RequestManager::OpenHandler::m_file
std::unique_ptr< XrdCl::File > m_file
Definition: XrdRequestManager.h:278
File
Definition: File.h:11
XrdAdaptor::RequestManager::compareSources
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
Definition: XrdRequestManager.cc:311
XrdAdaptor::RequestManager::reportSiteChange
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
Definition: XrdRequestManager.cc:278
XrdAdaptor::RequestManager::OpenHandler::m_self_weak
std::weak_ptr< OpenHandler > m_self_weak
Definition: XrdRequestManager.h:285
XrdAdaptor::RequestManager::OpenHandler::m_outstanding_open
std::atomic< bool > m_outstanding_open
Definition: XrdRequestManager.h:276
XrdAdaptor::RequestManager::initialize
void initialize(std::weak_ptr< RequestManager > selfref)
Definition: XrdRequestManager.cc:118
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
b
double b
Definition: hdecay.h:118
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
XrdAdaptor::RequestManager::getActiveSourceNames
void getActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:458
XrdAdaptor::RequestManager::getFilename
const std::string & getFilename() const
Definition: XrdRequestManager.h:101
XrdAdaptor::RequestManager::m_name
const std::string m_name
Definition: XrdRequestManager.h:225
XrdAdaptor::RequestManager::searchMode
bool searchMode
Definition: XrdRequestManager.h:223
XrdAdaptor::RequestManager::m_nextInitialSourceToggle
bool m_nextInitialSourceToggle
Definition: XrdRequestManager.h:220
a
double a
Definition: hdecay.h:119
XrdAdaptor::RequestManager::splitClientRequest
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Definition: XrdRequestManager.cc:889
XrdRequest.h
XrdAdaptor::RequestManager::handle
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
Definition: XrdRequestManager.h:54
Exception
IOOffset
int64_t IOOffset
Definition: IOTypes.h:19
XrdAdaptor::RequestManager::~RequestManager
virtual ~RequestManager()=default
XrdAdaptor::RequestManager::m_distribution
std::uniform_real_distribution< float > m_distribution
Definition: XrdRequestManager.h:231
XrdAdaptor::RequestManager::m_source_mutex
std::recursive_mutex m_source_mutex
Definition: XrdRequestManager.h:228
CalibrationSummaryClient_cfi.activeSources
activeSources
Definition: CalibrationSummaryClient_cfi.py:11
XrdAdaptor::RequestManager::checkSources
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:293
XrdAdaptor::SourceHash
Definition: XrdRequestManager.h:27
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
XrdAdaptor::XrootdException::m_code
uint16_t m_code
Definition: XrdRequestManager.h:42
XrdCl
Definition: XrdRequestManager.h:21
XrdAdaptor::RequestManager::handleOpen
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
Definition: XrdRequestManager.cc:574
XrdAdaptor::RequestManager::OpenHandler::getInstance
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
Definition: XrdRequestManager.h:237
XrdAdaptor::SourceHash::operator()
size_t operator()(const Key &iKey) const
Definition: XrdRequestManager.h:29
XrdAdaptor::RequestManager::RequestManager
RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
Definition: XrdRequestManager.cc:108
XrdAdaptor::RequestManager::OpenHandler::m_self
std::shared_ptr< OpenHandler > m_self
Definition: XrdRequestManager.h:280
XrdAdaptor::RequestManager::OpenHandler::~OpenHandler
~OpenHandler() override
Definition: XrdRequestManager.cc:965
XrdAdaptor::RequestManager::prepareOpaqueString
std::string prepareOpaqueString() const
Definition: XrdRequestManager.cc:547
XrdAdaptor::XrootdException
Definition: XrdRequestManager.h:32
XrdAdaptor::RequestManager::m_disabledSourceStrings
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
Definition: XrdRequestManager.h:209
cms::Exception
Definition: Exception.h:70
XrdAdaptor::RequestManager::m_timeout
int m_timeout
Definition: XrdRequestManager.h:218
XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT
static const unsigned int XRD_DEFAULT_TIMEOUT
Definition: XrdRequestManager.h:47
XrdAdaptor::RequestManager::OpenHandler::current_source
std::string current_source()
Definition: XrdRequestManager.cc:1021
XrdAdaptor::RequestManager
Definition: XrdRequestManager.h:45
IOSize
size_t IOSize
Definition: IOTypes.h:14
XrdAdaptor::RequestManager::OpenHandler::m_manager
std::weak_ptr< RequestManager > m_manager
Definition: XrdRequestManager.h:286
HLT_2018_cff.flags
flags
Definition: HLT_2018_cff.py:11758
XrdAdaptor
Definition: QualityMetric.h:15
XrdAdaptor::RequestManager::OpenHandler::OpenHandler
OpenHandler(std::weak_ptr< RequestManager > manager)
Definition: XrdRequestManager.cc:961
XrdAdaptor::RequestManager::checkSourcesImpl
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:339
XrdAdaptor::RequestManager::requestFailure
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
Definition: XrdRequestManager.cc:713
XrdAdaptor::RequestManager::getInstance
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
Definition: XrdRequestManager.h:110
XrdAdaptor::ClientRequest
Definition: XrdRequest.h:23
XrdAdaptor::RequestManager::OpenHandler::m_promise
std::promise< std::shared_ptr< Source > > m_promise
Definition: XrdRequestManager.h:272
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443