CMS 3D CMS Logo

XrdRequestManager.h
Go to the documentation of this file.
1 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
2 #define Utilities_XrdAdaptor_XrdRequestManager_h
3 
4 #include <mutex>
5 #include <vector>
6 #include <set>
7 #include <condition_variable>
8 #include <random>
9 #include <sys/stat.h>
10 
11 #include "tbb/concurrent_unordered_set.h"
12 
14 
15 #include "XrdCl/XrdClFileSystem.hh"
16 
17 #include "XrdRequest.h"
18 #include "XrdSource.h"
19 
20 namespace XrdCl {
21  class File;
22 }
23 
24 namespace XrdAdaptor {
25 
26  struct SourceHash {
27  using Key = std::shared_ptr<Source>;
28  size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
29  };
30 
32  public:
34  : Exception(code), m_code(xrootd_status.code) {}
35 
36  ~XrootdException() noexcept override{};
37 
38  uint16_t getCode() { return m_code; }
39 
40  private:
41  uint16_t m_code;
42  };
43 
45  public:
46  RequestManager(const RequestManager &) = delete;
47  RequestManager &operator=(const RequestManager &) = delete;
48 
49  static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
50 
51  virtual ~RequestManager() = default;
52 
56  std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
57  auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
58  return handle(c_ptr);
59  }
60 
61  std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
62 
69  std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
70 
74  void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
75 
80  void getActiveSourceNames(std::vector<std::string> &sources) const;
81  void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
82 
87  void getDisabledSourceNames(std::vector<std::string> &sources) const;
88 
93  std::shared_ptr<XrdCl::File> getActiveFile() const;
94 
98  void addConnections(cms::Exception &) const;
99 
103  const std::string &getFilename() const { return m_name; }
104 
112  static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
113  XrdCl::OpenFlags::Flags flags,
114  XrdCl::Access::Mode perms) {
115  std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
116  instance->initialize(instance);
117  return instance;
118  }
119 
120  private:
121  RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
122 
129  void initialize(std::weak_ptr<RequestManager> selfref);
130 
134  virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
135 
139  void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
140  std::vector<IOPosBuffer> &req1,
141  std::vector<IOPosBuffer> &req2,
142  std::vector<std::shared_ptr<Source>> const &activeSources) const;
143 
149  void broadcastRequest(const ClientRequest &, bool active);
150 
157  void checkSources(timespec &now,
158  IOSize requestSize,
159  std::vector<std::shared_ptr<Source>> &activeSources,
160  std::vector<std::shared_ptr<Source>> &inactiveSources); // TODO: inline
161  void checkSourcesImpl(timespec &now,
162  IOSize requestSize,
163  std::vector<std::shared_ptr<Source>> &activeSources,
164  std::vector<std::shared_ptr<Source>> &inactiveSources);
173  bool compareSources(const timespec &now,
174  unsigned a,
175  unsigned b,
176  std::vector<std::shared_ptr<Source>> &activeSources,
177  std::vector<std::shared_ptr<Source>> &inactiveSources) const;
178 
183  void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
184  std::vector<std::shared_ptr<Source>> const &iNew,
185  std::string orig_site = std::string{}) const;
186 
190  inline void updateCurrentServer();
192 
196  std::shared_ptr<Source> pickSingleSource();
197 
203 
208  std::vector<std::shared_ptr<Source>> m_activeSources;
209  std::vector<std::shared_ptr<Source>> m_inactiveSources;
210 
211  tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
212  tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
213  tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
214 
215  // StatisticsSenderService wants to know what our current server is;
216  // this holds last-successfully-opened server name
217  std::atomic<std::string *> m_serverToAdvertise;
218 
221  // If set to true, the next active source should be 1; 0 otherwise.
223  // The time when the next active source check should be performed.
226 
228  XrdCl::OpenFlags::Flags m_flags;
230  mutable std::recursive_mutex m_source_mutex;
231 
232  std::mt19937 m_generator;
233  std::uniform_real_distribution<float> m_distribution;
234 
235  std::atomic<unsigned> m_excluded_active_count;
236 
237  class OpenHandler : public XrdCl::ResponseHandler {
238  public:
239  OpenHandler(const OpenHandler &) = delete;
240  OpenHandler &operator=(const OpenHandler &) = delete;
241 
242  static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
243  OpenHandler *instance_ptr = new OpenHandler(manager);
244  std::shared_ptr<OpenHandler> instance(instance_ptr);
245  instance_ptr->m_self_weak = instance;
246  return instance;
247  }
248 
249  ~OpenHandler() override;
250 
254  void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
255  XrdCl::AnyObject *response,
256  XrdCl::HostList *hostList) override;
257 
267  std::shared_future<std::shared_ptr<Source>> open();
268 
273 
274  private:
275  OpenHandler(std::weak_ptr<RequestManager> manager);
276  std::shared_future<std::shared_ptr<Source>> m_shared_future;
277  std::promise<std::shared_ptr<Source>> m_promise;
278  // Set to true only when there is an outstanding open request; not
279  // protected by m_mutex, so the caller is required to know it is in a
280  // thread-safe context.
281  std::atomic<bool> m_outstanding_open{false};
282  // Can only be touched when m_mutex is held.
283  std::unique_ptr<XrdCl::File> m_file;
284  std::recursive_mutex m_mutex;
285  std::shared_ptr<OpenHandler> m_self;
286 
287  // Always maintain a weak self-reference; when the open is in-progress,
288  // this is upgraded to a strong reference to prevent this object from
289  // deletion as long as XrdCl has not performed the callback.
290  std::weak_ptr<OpenHandler> m_self_weak;
291  std::weak_ptr<RequestManager> m_manager;
292  };
293 
294  std::shared_ptr<OpenHandler> m_open_handler;
295  };
296 
297 } // namespace XrdAdaptor
298 
299 #endif
XrdAdaptor::XrootdException::~XrootdException
~XrootdException() noexcept override
Definition: XrdRequestManager.h:36
XrdAdaptor::RequestManager::OpenHandler::m_mutex
std::recursive_mutex m_mutex
Definition: XrdRequestManager.h:284
XrdAdaptor::RequestManager::m_flags
XrdCl::OpenFlags::Flags m_flags
Definition: XrdRequestManager.h:228
XrdAdaptor::RequestManager::getActiveFile
std::shared_ptr< XrdCl::File > getActiveFile() const
Definition: XrdRequestManager.cc:452
XrdAdaptor::RequestManager::queueUpdateCurrentServer
void queueUpdateCurrentServer(const std::string &)
Definition: XrdRequestManager.cc:258
XrdAdaptor::RequestManager::m_generator
std::mt19937 m_generator
Definition: XrdRequestManager.h:232
HcalTopologyMode::Mode
Mode
Definition: HcalTopologyMode.h:26
XrdAdaptor::RequestManager::m_nextActiveSourceCheck
timespec m_nextActiveSourceCheck
Definition: XrdRequestManager.h:224
XrdAdaptor::RequestManager::addConnections
void addConnections(cms::Exception &) const
Definition: XrdRequestManager.cc:488
XrdAdaptor::RequestManager::m_disabledExcludeStrings
tbb::concurrent_unordered_set< std::string > m_disabledExcludeStrings
Definition: XrdRequestManager.h:212
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
mps_update.status
status
Definition: mps_update.py:69
XrdAdaptor::RequestManager::m_inactiveSources
std::vector< std::shared_ptr< Source > > m_inactiveSources
Definition: XrdRequestManager.h:209
XrdAdaptor::RequestManager::OpenHandler
Definition: XrdRequestManager.h:237
XrdAdaptor::RequestManager::m_open_handler
std::shared_ptr< OpenHandler > m_open_handler
Definition: XrdRequestManager.h:294
XrdAdaptor::RequestManager::m_excluded_active_count
std::atomic< unsigned > m_excluded_active_count
Definition: XrdRequestManager.h:235
XrdAdaptor::XrootdException::XrootdException
XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
Definition: XrdRequestManager.h:33
XrdAdaptor::XrootdException::getCode
uint16_t getCode()
Definition: XrdRequestManager.h:38
XrdAdaptor::RequestManager::getDisabledSourceNames
void getDisabledSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:481
XrdAdaptor::RequestManager::m_activeSources
std::vector< std::shared_ptr< Source > > m_activeSources
Definition: XrdRequestManager.h:208
XrdSource.h
XrdAdaptor::RequestManager::OpenHandler::open
std::shared_future< std::shared_ptr< Source > > open()
Definition: XrdRequestManager.cc:1042
XrdAdaptor::RequestManager::m_perms
XrdCl::Access::Mode m_perms
Definition: XrdRequestManager.h:229
edm::errors::ErrorCodes
ErrorCodes
Definition: EDMException.h:22
CalibrationSummaryClient_cfi.sources
sources
Definition: CalibrationSummaryClient_cfi.py:23
XrdAdaptor::RequestManager::m_disabledSources
tbb::concurrent_unordered_set< std::shared_ptr< Source >, SourceHash > m_disabledSources
Definition: XrdRequestManager.h:213
XrdAdaptor::RequestManager::m_serverToAdvertise
std::atomic< std::string * > m_serverToAdvertise
Definition: XrdRequestManager.h:217
btagGenBb_cfi.Status
Status
Definition: btagGenBb_cfi.py:4
XrdAdaptor::RequestManager::OpenHandler::m_shared_future
std::shared_future< std::shared_ptr< Source > > m_shared_future
Definition: XrdRequestManager.h:276
XrdAdaptor::RequestManager::updateCurrentServer
void updateCurrentServer()
Definition: XrdRequestManager.cc:241
XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts
void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
Definition: XrdRequestManager.cc:974
XrdAdaptor::RequestManager::RequestManager
RequestManager(const RequestManager &)=delete
edm::Exception
Definition: EDMException.h:77
EDMException.h
XrdAdaptor::RequestManager::broadcastRequest
void broadcastRequest(const ClientRequest &, bool active)
Key
Definition: GoldenPattern.h:15
XrdAdaptor::RequestManager::m_lastSourceCheck
timespec m_lastSourceCheck
Definition: XrdRequestManager.h:219
XrdAdaptor::RequestManager::getPrettyActiveSourceNames
void getPrettyActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:473
XrdAdaptor::RequestManager::OpenHandler::operator=
OpenHandler & operator=(const OpenHandler &)=delete
XrdAdaptor::RequestManager::pickSingleSource
std::shared_ptr< Source > pickSingleSource()
Definition: XrdRequestManager.cc:501
XrdAdaptor::RequestManager::OpenHandler::m_file
std::unique_ptr< XrdCl::File > m_file
Definition: XrdRequestManager.h:283
File
Definition: File.h:11
XrdAdaptor::RequestManager::compareSources
bool compareSources(const timespec &now, unsigned a, unsigned b, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources) const
Definition: XrdRequestManager.cc:318
XrdAdaptor::RequestManager::reportSiteChange
void reportSiteChange(std::vector< std::shared_ptr< Source >> const &iOld, std::vector< std::shared_ptr< Source >> const &iNew, std::string orig_site=std::string{}) const
Definition: XrdRequestManager.cc:285
XrdAdaptor::RequestManager::OpenHandler::m_self_weak
std::weak_ptr< OpenHandler > m_self_weak
Definition: XrdRequestManager.h:290
XrdAdaptor::RequestManager::OpenHandler::m_outstanding_open
std::atomic< bool > m_outstanding_open
Definition: XrdRequestManager.h:281
XrdAdaptor::RequestManager::initialize
void initialize(std::weak_ptr< RequestManager > selfref)
Definition: XrdRequestManager.cc:125
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
b
double b
Definition: hdecay.h:118
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
XrdAdaptor::RequestManager::getActiveSourceNames
void getActiveSourceNames(std::vector< std::string > &sources) const
Definition: XrdRequestManager.cc:465
XrdAdaptor::RequestManager::getFilename
const std::string & getFilename() const
Definition: XrdRequestManager.h:103
XrdAdaptor::RequestManager::m_name
const std::string m_name
Definition: XrdRequestManager.h:227
XrdAdaptor::RequestManager::searchMode
bool searchMode
Definition: XrdRequestManager.h:225
XrdAdaptor::RequestManager::m_nextInitialSourceToggle
bool m_nextInitialSourceToggle
Definition: XrdRequestManager.h:222
a
double a
Definition: hdecay.h:119
XrdAdaptor::RequestManager::splitClientRequest
void splitClientRequest(const std::vector< IOPosBuffer > &iolist, std::vector< IOPosBuffer > &req1, std::vector< IOPosBuffer > &req2, std::vector< std::shared_ptr< Source >> const &activeSources) const
Definition: XrdRequestManager.cc:896
XrdRequest.h
XrdAdaptor::RequestManager::handle
std::future< IOSize > handle(void *into, IOSize size, IOOffset off)
Definition: XrdRequestManager.h:56
Exception
IOOffset
int64_t IOOffset
Definition: IOTypes.h:19
XrdAdaptor::RequestManager::~RequestManager
virtual ~RequestManager()=default
XrdAdaptor::RequestManager::m_distribution
std::uniform_real_distribution< float > m_distribution
Definition: XrdRequestManager.h:233
XrdAdaptor::RequestManager::m_source_mutex
std::recursive_mutex m_source_mutex
Definition: XrdRequestManager.h:230
CalibrationSummaryClient_cfi.activeSources
activeSources
Definition: CalibrationSummaryClient_cfi.py:11
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
XrdAdaptor::RequestManager::checkSources
void checkSources(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:300
XrdAdaptor::RequestManager::OpenHandler::OpenHandler
OpenHandler(const OpenHandler &)=delete
XrdAdaptor::SourceHash
Definition: XrdRequestManager.h:26
instance
static PFTauRenderPlugin instance
Definition: PFTauRenderPlugin.cc:70
XrdAdaptor::XrootdException::m_code
uint16_t m_code
Definition: XrdRequestManager.h:41
XrdCl
Definition: XrdRequestManager.h:20
XrdAdaptor::RequestManager::handleOpen
virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr< Source >)
Definition: XrdRequestManager.cc:581
XrdAdaptor::RequestManager::OpenHandler::getInstance
static std::shared_ptr< OpenHandler > getInstance(std::weak_ptr< RequestManager > manager)
Definition: XrdRequestManager.h:242
XrdAdaptor::SourceHash::operator()
size_t operator()(const Key &iKey) const
Definition: XrdRequestManager.h:28
XrdAdaptor::RequestManager::OpenHandler::m_self
std::shared_ptr< OpenHandler > m_self
Definition: XrdRequestManager.h:285
XrdAdaptor::RequestManager::OpenHandler::~OpenHandler
~OpenHandler() override
Definition: XrdRequestManager.cc:972
XrdAdaptor::RequestManager::prepareOpaqueString
std::string prepareOpaqueString() const
Definition: XrdRequestManager.cc:554
XrdAdaptor::XrootdException
Definition: XrdRequestManager.h:31
XrdAdaptor::RequestManager::m_disabledSourceStrings
tbb::concurrent_unordered_set< std::string > m_disabledSourceStrings
Definition: XrdRequestManager.h:211
cms::Exception
Definition: Exception.h:70
XrdAdaptor::RequestManager::m_timeout
int m_timeout
Definition: XrdRequestManager.h:220
XrdAdaptor::RequestManager::XRD_DEFAULT_TIMEOUT
static const unsigned int XRD_DEFAULT_TIMEOUT
Definition: XrdRequestManager.h:49
XrdAdaptor::RequestManager::OpenHandler::current_source
std::string current_source()
Definition: XrdRequestManager.cc:1028
XrdAdaptor::RequestManager
Definition: XrdRequestManager.h:44
IOSize
size_t IOSize
Definition: IOTypes.h:14
HLT_FULL_cff.flags
flags
Definition: HLT_FULL_cff.py:13150
XrdAdaptor::RequestManager::OpenHandler::m_manager
std::weak_ptr< RequestManager > m_manager
Definition: XrdRequestManager.h:291
XrdAdaptor
Definition: QualityMetric.h:14
XrdAdaptor::RequestManager::checkSourcesImpl
void checkSourcesImpl(timespec &now, IOSize requestSize, std::vector< std::shared_ptr< Source >> &activeSources, std::vector< std::shared_ptr< Source >> &inactiveSources)
Definition: XrdRequestManager.cc:346
XrdAdaptor::RequestManager::requestFailure
void requestFailure(std::shared_ptr< XrdAdaptor::ClientRequest > c_ptr, XrdCl::Status &c_status)
Definition: XrdRequestManager.cc:720
XrdAdaptor::RequestManager::getInstance
static std::shared_ptr< RequestManager > getInstance(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
Definition: XrdRequestManager.h:112
XrdAdaptor::ClientRequest
Definition: XrdRequest.h:22
XrdAdaptor::RequestManager::operator=
RequestManager & operator=(const RequestManager &)=delete
XrdAdaptor::RequestManager::OpenHandler::m_promise
std::promise< std::shared_ptr< Source > > m_promise
Definition: XrdRequestManager.h:277
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443