CMS 3D CMS Logo

XrdStorageMaker.cc
Go to the documentation of this file.
1 
6 
12 
13 #include "XrdCl/XrdClDefaultEnv.hh"
14 #include "XrdNet/XrdNetUtils.hh"
15 
16 #include <atomic>
17 #include <mutex>
18 
19 namespace {
20 
21  class PrepareHandler : public XrdCl::ResponseHandler {
22  public:
23  PrepareHandler(const XrdCl::URL &url) : m_fs(url) { m_fileList.push_back(url.GetPath()); }
24 
25  void callAsyncPrepare() {
26  auto status = m_fs.Prepare(m_fileList, XrdCl::PrepareFlags::Stage, 0, this);
27  if (!status.IsOK()) {
28  LogDebug("StageInError") << "XrdCl::FileSystem::Prepare submit failed with error '" << status.ToStr()
29  << "' (errNo = " << status.errNo << ")";
30  delete this;
31  }
32  }
33 
34  void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
35  // Note: Prepare call has a response object.
36  if (!status->IsOK()) {
37  LogDebug("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status->ToStr()
38  << "' (errNo = " << status->errNo << ")";
39  }
40  delete response;
41  delete status;
42  delete this;
43  }
44 
45  private:
46  XrdCl::FileSystem m_fs;
47  std::vector<std::string> m_fileList;
48  };
49 
50 } // namespace
51 
52 class XrdStorageMaker final : public StorageMaker {
53 public:
54  static const unsigned int XRD_DEFAULT_TIMEOUT = 3*60;
55 
57  m_lastDebugLevel(1),//so that 0 will trigger change
58  m_lastTimeout(0)
59  {
60  // When CMSSW loads, both XrdCl and XrdClient end up being loaded
61  // (ROOT loads XrdClient). XrdClient forces IPv4-only. Accordingly,
62  // we must explicitly set the default network stack in XrdCl to
63  // whatever is available on the node (IPv4 or IPv6).
64  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
65  if (env)
66  {
67  env->PutString("NetworkStack", "IPAuto");
68  }
69  XrdNetUtils::SetAuto(XrdNetUtils::prefAuto);
70  setTimeout(XRD_DEFAULT_TIMEOUT);
71  setDebugLevel(0);
72  }
73 
76  std::unique_ptr<Storage> open (const std::string &proto,
77  const std::string &path,
78  int mode,
79  const AuxSettings& aux) const override
80  {
81  setDebugLevel(aux.debugLevel);
82  setTimeout(aux.timeout);
83 
85  StorageFactory::ReadHint readHint = f->readHint();
86  StorageFactory::CacheHint cacheHint = f->cacheHint();
87 
89  || cacheHint == StorageFactory::CACHE_HINT_STORAGE)
90  mode &= ~IOFlags::OpenUnbuffered;
91  else
93 
94  std::string fullpath(proto + ":" + path);
95  auto file = std::make_unique<XrdFile>(fullpath, mode);
96  return f->wrapNonLocalFile(std::move(file), proto, std::string(), mode);
97  }
98 
99  void stagein (const std::string &proto, const std::string &path,
100  const AuxSettings& aux) const override
101  {
102  setDebugLevel(aux.debugLevel);
103  setTimeout(aux.timeout);
104 
105  std::string fullpath(proto + ":" + path);
106  XrdCl::URL url(fullpath);
107 
108  auto prep_handler = new PrepareHandler(url);
109  prep_handler->callAsyncPrepare();
110  }
111 
112  bool check (const std::string &proto,
113  const std::string &path,
114  const AuxSettings& aux,
115  IOOffset *size = nullptr) const override
116  {
117  setDebugLevel(aux.debugLevel);
118  setTimeout(aux.timeout);
119 
120  std::string fullpath(proto + ":" + path);
121  XrdCl::URL url(fullpath);
122  XrdCl::FileSystem fs(url);
123 
124  XrdCl::StatInfo *stat;
125  if (!(fs.Stat(url.GetPath(), stat)).IsOK() || (stat == nullptr))
126  {
127  return false;
128  }
129 
130  if (size) *size = stat->GetSize();
131  return true;
132  }
133 
134  void setDebugLevel (unsigned int level) const
135  {
136  auto oldLevel = m_lastDebugLevel.load();
137  if(level == oldLevel) {
138  return;
139  }
140  std::lock_guard<std::mutex> guard(m_envMutex);
141  if(oldLevel != m_lastDebugLevel) {
142  //another thread just changed this value
143  return;
144  }
145 
146  // 'Error' is way too low of debug level - we have interest
147  // in warning in the default
148  switch (level)
149  {
150  case 0:
151  XrdCl::DefaultEnv::SetLogLevel("Warning");
152  break;
153  case 1:
154  XrdCl::DefaultEnv::SetLogLevel("Info");
155  break;
156  case 2:
157  XrdCl::DefaultEnv::SetLogLevel("Debug");
158  break;
159  case 3:
160  XrdCl::DefaultEnv::SetLogLevel("Dump");
161  break;
162  case 4:
163  XrdCl::DefaultEnv::SetLogLevel("Dump");
164  break;
165  default:
167  ex << "Invalid log level specified " << level;
168  ex.addContext("Calling XrdStorageMaker::setDebugLevel()");
169  throw ex;
170  }
171  m_lastDebugLevel = level;
172  }
173 
174  void setTimeout(unsigned int timeout) const
175  {
176  timeout = timeout ? timeout : XRD_DEFAULT_TIMEOUT;
177 
178  auto oldTimeout = m_lastTimeout.load();
179  if (oldTimeout == timeout) {
180  return;
181  }
182 
183  std::lock_guard<std::mutex> guard(m_envMutex);
184  if (oldTimeout != m_lastTimeout) {
185  //Another thread beat us to changing the value
186  return;
187  }
188 
189  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
190  if (env)
191  {
192  env->PutInt("StreamTimeout", timeout);
193  env->PutInt("RequestTimeout", timeout);
194  env->PutInt("ConnectionWindow", timeout);
195  env->PutInt("StreamErrorWindow", timeout);
196  // Crank down some of the connection defaults. We have more
197  // aggressive error recovery than the default client so we
198  // can error out sooner.
199  env->PutInt("ConnectionWindow", timeout/6+1);
200  env->PutInt("ConnectionRetry", 2);
201  }
202  m_lastTimeout = timeout;
203  }
204 
205 private:
207  mutable std::atomic<unsigned int> m_lastDebugLevel;
208  mutable std::atomic<unsigned int> m_lastTimeout;
209 };
210 
213 
#define LogDebug(id)
size
Write out results.
static boost::mutex mutex
Definition: Proxy.cc:11
CacheHint cacheHint(void) const
std::unique_ptr< Storage > wrapNonLocalFile(std::unique_ptr< Storage > s, const std::string &proto, const std::string &path, int mode) const
std::atomic< unsigned int > m_lastDebugLevel
bool check(const std::string &proto, const std::string &path, const AuxSettings &aux, IOOffset *size=0) const override
Definition: IOFlags.h:4
std::unique_ptr< Storage > open(const std::string &proto, const std::string &path, int mode, const AuxSettings &aux) const override
std::mutex m_envMutex
int timeout
Definition: mps_check.py:53
void stagein(const std::string &proto, const std::string &path, const AuxSettings &aux) const override
static const StorageFactory * get(void)
void setTimeout(unsigned int timeout) const
std::atomic< unsigned int > m_lastTimeout
double f[11][100]
#define DEFINE_FWK_SERVICE(type)
Definition: ServiceMaker.h:105
ReadHint readHint(void) const
void addContext(std::string const &context)
Definition: Exception.cc:165
int64_t IOOffset
Definition: IOTypes.h:19
#define DEFINE_EDM_PLUGIN(factory, type, name)
def move(src, dest)
Definition: eostools.py:511
void setDebugLevel(unsigned int level) const