CMS 3D CMS Logo

XrdFile.cc
Go to the documentation of this file.
6 #include <vector>
7 #include <sstream>
8 #include <iostream>
9 #include <cassert>
10 #include <chrono>
11 
12 using namespace XrdAdaptor;
13 
14 // To be re-enabled when the monitoring interface is back.
15 //static const char *kCrabJobIdEnv = "CRAB_UNIQUE_JOB_ID";
16 
// Largest single piece placed in a vectored-read request: the positional
// readv() splits any caller buffer whose length exceeds this value.
17 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
// Upper bound on the number of buffers per vectored read; readv() keeps each
// request to at most XRD_CL_MAX_SIZE - 2 entries.
18 static constexpr int XRD_CL_MAX_SIZE = 1024;
19 
// Largest piece issued at once by the positional read(); larger reads are
// broken into pieces of at most this many bytes.
20 static constexpr int XRD_CL_MAX_READ_SIZE = (8 * 1024 * 1024);
21 
22 using namespace edm::storage;
23 
24 XrdFile::XrdFile() : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {}
25 
26 XrdFile::XrdFile(const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */)
27  : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
28  open(name, flags, perms);
29 }
30 
31 XrdFile::XrdFile(const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */)
32  : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
33  open(name.c_str(), flags, perms);
34 }
35 
37  if (m_close)
38  edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open";
39 }
40 
// Create a file by delegating to open(name, <flags>, perms).
// NOTE(review): the flags argument of the open() call (line 44 of the
// original file) is missing from this listing, so how `exclusive` is used
// cannot be seen here.
42 void XrdFile::create(const char *name, bool exclusive /* = false */, int perms /* = 0666 */) {
43  open(name,
45  perms);
46 }
47 
// std::string overload of create(): delegates to open(name.c_str(), <flags>, perms).
// NOTE(review): the flags argument of the open() call (line 50 of the
// original file) is missing from this listing, so how `exclusive` is used
// cannot be seen here.
48 void XrdFile::create(const std::string &name, bool exclusive /* = false */, int perms /* = 066 */) {
49  open(name.c_str(),
51  perms);
52 }
53 
54 void XrdFile::open(const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) {
55  open(name.c_str(), flags, perms);
56 }
57 
// Open `name` through a RequestManager, stat it to learn its size, and reset
// the offset to 0.
//
// name  - file to open; must be non-null and non-empty.
// flags - IOFlags bit mask; must contain OpenRead or OpenWrite. OpenAppend
//         is rejected.
// perms - POSIX permission bits, translated to XrdCl::Access flags.
//
// Throws `ex` for an empty name, for flags without read/write, for append
// mode, and when the Stat call fails.
// NOTE(review): several lines of the original file are missing from this
// listing (61, 67, 76, 82, 89, 95, 99, 118): the declarations of `ex` and
// `modeflags` and some `if` conditions are therefore not visible.
58 void XrdFile::open(const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 0666 */) {
59  // Actual open
60  if ((name == nullptr) || (*name == 0)) {
62  ex << "Cannot open a file without a name";
63  ex.addContext("Calling XrdFile::open()");
64  throw ex;
65  }
66  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
68  ex << "Must open file '" << name << "' at least for read or write";
69  ex.addContext("Calling XrdFile::open()");
70  throw ex;
71  }
72 
73  // Translate our flags to system flags
74  XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
75 
// NOTE(review): the condition guarding the Update branch (line 76) is not visible.
77  openflags |= XrdCl::OpenFlags::Update;
78  else if (flags & IOFlags::OpenRead)
79  openflags |= XrdCl::OpenFlags::Read;
80 
81  if (flags & IOFlags::OpenAppend) {
83  ex << "Opening file '" << name << "' in append mode not supported";
84  ex.addContext("Calling XrdFile::open()");
85  throw ex;
86  }
87 
88  if (flags & IOFlags::OpenCreate) {
// NOTE(review): line 89 is not visible; it may make the next statement conditional.
90  openflags |= XrdCl::OpenFlags::Delete;
91  openflags |= XrdCl::OpenFlags::New;
92  openflags |= XrdCl::OpenFlags::MakePath;
93  }
94 
// NOTE(review): line 95 is not visible; it presumably guards the next statement.
96  openflags |= XrdCl::OpenFlags::Delete;
97 
98  // Translate mode flags
// One Access flag is OR-ed in per permission bit set in `perms`.
100  modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
101  modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
102  modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
103  modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
104  modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
105  modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
// NOTE(review): the three "other" bits below are mapped to the *group* flags
// (GR/GW/GX). This looks like a copy/paste slip; confirm whether the
// XrdCl::Access "other" flags were intended.
106  modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
107  modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
108  modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
109 
110  m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
111  m_name = name;
112 
113  // Stat the file so we can keep track of the offset better.
114  auto file = getActiveFile();
115  XrdCl::XRootDStatus status;
116  XrdCl::StatInfo *statInfo = nullptr;
117  if (!(status = file->Stat(false, statInfo)).IsOK()) {
119  ex << "XrdCl::File::Stat(name='" << name << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
120  << ", code=" << status.code << ")";
121  ex.addContext("Calling XrdFile::open()");
122  addConnection(ex);
123  throw ex;
124  }
125  assert(statInfo);
126  m_size = statInfo->GetSize();
// statInfo is deleted here, so this function treats it as owned after Stat().
127  delete (statInfo);
128 
129  m_offset = 0;
// m_close == true marks the file as open (checked by the destructor).
130  m_close = true;
131 
132  // Send the monitoring info, if available.
133  // Note: std::getenv is not reentrant.
134  // Commenting out until this is available in the new client.
135  /*
136  char * crabJobId = std::getenv(kCrabJobIdEnv);
137  if (crabJobId) {
138  kXR_unt32 dictId;
139  m_file->SendMonitoringInfo(crabJobId, &dictId);
140  edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
141  }
142 */
143 
144  edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
145 
// Log the names of the sources the request manager reports as active.
146  std::vector<std::string> sources;
147  m_requestmanager->getActiveSourceNames(sources);
148  std::stringstream ss;
149  ss << "Active sources: ";
150  for (auto const &it : sources)
151  ss << it << ", ";
152  edm::LogInfo("XrdFileInfo") << ss.str();
153 }
154 
156  if (!m_requestmanager.get()) {
157  edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name << "') called but the file is not open";
158  m_close = false;
159  return;
160  }
161 
162  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
163 
164  m_close = false;
165  m_offset = 0;
166  m_size = -1;
167  edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
168 }
169 
171  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
172  m_close = false;
173  m_offset = 0;
174  m_size = -1;
175 }
176 
// Read up to `n` bytes at the current offset into `into`, advance m_offset by
// the number of bytes actually read, and return that count.
// Throws `ex` when n exceeds 0x7fffffff.
// NOTE(review): the declaration of `ex` (line 180 of the original file) is
// missing from this listing, so its exception type is not visible.
178 IOSize XrdFile::read(void *into, IOSize n) {
179  if (n > 0x7fffffff) {
181  ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
182  ex.addContext("Calling XrdFile::read()");
183  addConnection(ex);
184  throw ex;
185  }
186 
// handle() returns a future; .get() blocks until the request has completed.
187  uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
188  m_offset += bytesRead;
189  return bytesRead;
190 }
191 
// Body of the positional read: reads `n` bytes starting at `pos` into `into`
// and returns the total number of bytes read. m_offset is not modified here.
// Large requests are cut into pieces of at most XRD_CL_MAX_READ_SIZE bytes;
// each piece is issued before the previous one is waited on, so at most two
// requests are in flight.
// NOTE(review): the signature (line 192) and the declarations of `ex`
// (lines 194 and 219) are missing from this listing.
193  if (n > 0x7fffffff) {
195  ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") exceeds read size limit 0x7fffffff";
196  ex.addContext("Calling XrdFile::read()");
197  addConnection(ex);
198  throw ex;
199  }
200  if (n == 0) {
201  return 0;
202  }
203 
204  // In some cases, the IO layers above us (particularly, if lazy-download is
205  // enabled) will emit very large reads. We break this up into multiple
206  // reads in order to avoid hitting timeouts.
207  std::future<IOSize> prev_future, cur_future;
208  IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
209  bool readReturnedShort = false;
210 
211  // Check the status of a read operation; updates bytesRead and
212  // readReturnedShort.
// A default-constructed (invalid) future is skipped, which covers the first
// loop iteration where there is no prior read yet.
213  auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
214  if (!future.valid()) {
215  return;
216  }
217  IOSize result = future.get();
// Once one piece came back short, any later piece must return 0 bytes.
218  if (readReturnedShort && (result != 0)) {
220  ex << "XrdFile::read(name='" << m_name << "', n=" << n
221  << ") remote server returned non-zero length read after EOF.";
222  ex.addContext("Calling XrdFile::read()");
223  addConnection(ex);
224  throw ex;
225  } else if (result != expected) {
226  readReturnedShort = true;
227  }
228  bytesRead += result;
229  };
230 
231  while (n) {
232  IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
233 
234  // Save prior read state; issue new read.
235  prev_future = std::move(cur_future);
236  prev_future_expected = cur_future_expected;
237  cur_future = m_requestmanager->handle(into, chunk, pos);
238  cur_future_expected = chunk;
239 
240  // Wait for the prior read; update bytesRead.
241  check_read(prev_future, prev_future_expected);
242 
243  // Update counters.
// The buffer pointer and position advance by the requested piece size,
// regardless of how many bytes the previous piece actually returned.
244  into = static_cast<char *>(into) + chunk;
245  n -= chunk;
246  pos += chunk;
247  }
248 
249  // Wait for the last read to finish.
250  check_read(cur_future, cur_future_expected);
251 
252  return bytesRead;
253 }
254 
255 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
257  std::vector<IOPosBuffer> new_buf;
258  new_buf.reserve(n);
259  IOOffset off = 0;
260  for (IOSize i = 0; i < n; i++) {
261  IOSize size = into[i].size();
262  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
263  off += size;
264  }
265  return readv(&(new_buf[0]), n);
266 }
267 
268 /*
269  * A vectored scatter-gather read.
270  * Returns the total number of bytes successfully read.
271  */
// Body of the positional vectored read over `into[0..n)`; returns the sum of
// the bytes reported by all sub-requests.
// The input list is repacked into batches of at most XRD_CL_MAX_SIZE - 2
// entries, splitting any buffer longer than XRD_CL_MAX_CHUNK; one request is
// issued per batch, and only afterwards are the results collected.
// NOTE(review): the signature (line 272) and lines 303, 306, 308, 339, 380
// and 387 of the original file are missing from this listing.
273  // A trivial vector read - unlikely, considering ROOT data format.
274  if (UNLIKELY(n == 0)) {
275  return 0;
276  }
277  if (UNLIKELY(n == 1)) {
278  return read(into[0].data(), into[0].size(), into[0].offset());
279  }
280 
281  auto cl = std::make_shared<std::vector<IOPosBuffer>>();
282 
283  // CMSSW may issue large readv's; Xrootd is only able to handle
284  // 1024. Further, the splitting algorithm may slightly increase
285  // the number of buffers.
286  IOSize adjust = XRD_CL_MAX_SIZE - 2;
287  cl->reserve(n > adjust ? adjust : n);
288  IOSize idx = 0, last_idx = 0;
289  IOSize final_result = 0;
// Each entry pairs a pending request with the byte count expected from it.
290  std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
291  while (idx < n) {
// The same vector object is reused (cleared) for every batch.
292  cl->clear();
293  IOSize size = 0;
294  while (idx < n) {
// rollback_count tracks how many entries this input buffer contributed,
// so they can all be removed if the batch overflows.
295  unsigned rollback_count = 1;
296  IOSize current_size = size;
297  IOOffset offset = into[idx].offset();
298  IOSize length = into[idx].size();
299  size += length;
300  char *buffer = static_cast<char *>(into[idx].data());
// NOTE(review): lines 303, 306 and 308 are not visible; presumably they set
// the piece size and advance `offset` / `buffer` - confirm against the file.
301  while (length > XRD_CL_MAX_CHUNK) {
302  IOPosBuffer ci;
304  length -= XRD_CL_MAX_CHUNK;
305  ci.set_offset(offset);
307  ci.set_data(buffer);
309  cl->emplace_back(ci);
310  rollback_count++;
311  }
312  IOPosBuffer ci;
313  ci.set_size(length);
314  ci.set_offset(offset);
315  ci.set_data(buffer);
316  cl->emplace_back(ci);
317 
// Batch too large: undo this buffer's entries and leave idx unchanged so the
// buffer is retried in the next batch.
318  if (cl->size() > adjust) {
319  while (rollback_count--)
320  cl->pop_back();
321  size = current_size;
322  break;
323  } else {
324  idx++;
325  }
326  }
327  try {
328  readv_futures.emplace_back(m_requestmanager->handle(cl), size);
329  } catch (edm::Exception &ex) {
330  ex.addContext("Calling XrdFile::readv()");
331  throw;
332  }
333 
334  // Assure that we have made some progress.
335  assert(last_idx < idx);
336  last_idx = idx;
337  }
// NOTE(review): no visible line assigns `start` or `end` (lines 339 and 387
// are missing) although their difference is logged below.
338  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
340 
341  // If there are multiple readv calls, wait until all return until looking
342  // at the results of any. This guarantees that all readv's have finished
343  // by time we call .get() for the first time (in case one of the readv's
344  // result in an exception).
345  //
346  // We cannot have outstanding readv's on function exit as the XrdCl may
347  // write into the corresponding buffer at the same time as ROOT.
348  if (readv_futures.size() > 1) {
349  for (auto &readv_result : readv_futures) {
350  if (readv_result.first.valid()) {
351  readv_result.first.wait();
352  }
353  }
354  }
355 
356  for (auto &readv_result : readv_futures) {
357  IOSize result = 0;
358  try {
359  const int retry_count = 5;
// After a successful get() the future is no longer valid, so the remaining
// iterations only re-run the assert below.
360  for (int retries = 0; retries < retry_count; retries++) {
361  try {
362  if (readv_result.first.valid()) {
363  result = readv_result.first.get();
364  }
365  } catch (XrootdException &ex) {
366  if ((retries != retry_count - 1) && (ex.getCode() == XrdCl::errInvalidResponse)) {
367  edm::LogWarning("XrdAdaptorInternal")
368  << "Got an invalid response from Xrootd server; retrying" << std::endl;
// NOTE(review): `cl` was cleared and refilled for every batch above, so at
// this point it holds the LAST batch only; when more than one batch was
// issued the retry may re-request the wrong buffers - confirm.
369  result = m_requestmanager->handle(cl).get();
370  } else {
371  throw;
372  }
373  }
374  assert(result == readv_result.second);
375  }
376  } catch (edm::Exception &ex) {
377  ex.addContext("Calling XrdFile::readv()");
378  throw;
379  } catch (std::exception &ex) {
// NOTE(review): the declaration of `newex` (line 380) is not visible.
381  newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
382  newex.addContext("Calling XrdFile::readv()");
383  throw newex;
384  }
385  final_result += result;
386  }
388 
389  edm::LogVerbatim("XrdAdaptorInternal")
390  << "[" << m_op_count.fetch_add(1) << "] Time for readv: "
391  << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count())
392  << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
393 
394  return final_result;
395 }
396 
// Write `n` bytes from `from` at the current offset through the active
// XrdCl::File, then advance m_offset by n and grow m_size if the write went
// past it. Returns n.
// Throws `ex` when n exceeds 0x7fffffff or when the Write status is not OK.
// NOTE(review): the declarations of `ex` (lines 399 and 409 of the original
// file) are missing from this listing.
397 IOSize XrdFile::write(const void *from, IOSize n) {
398  if (n > 0x7fffffff) {
400  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
401  ex.addContext("Calling XrdFile::write()");
402  addConnection(ex);
403  throw ex;
404  }
405  auto file = getActiveFile();
406 
407  XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
408  if (!s.IsOK()) {
410  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
411  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
412  ex.addContext("Calling XrdFile::write()");
413  addConnection(ex);
414  throw ex;
415  }
416  m_offset += n;
// m_size is -1 only while no file is open; a successful write implies one is.
417  assert(m_size != -1);
418  if (m_offset > m_size)
419  m_size = m_offset;
420 
421  return n;
422 }
423 
// Positional write: write `n` bytes from `from` at offset `pos` through the
// active XrdCl::File. m_offset is left untouched; m_size grows when the
// write ends past it. Returns n.
// Throws `ex` when n exceeds 0x7fffffff or when the Write status is not OK.
// NOTE(review): the declarations of `ex` (lines 426 and 436 of the original
// file) are missing from this listing.
424 IOSize XrdFile::write(const void *from, IOSize n, IOOffset pos) {
425  if (n > 0x7fffffff) {
427  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
428  ex.addContext("Calling XrdFile::write()");
429  addConnection(ex);
430  throw ex;
431  }
432  auto file = getActiveFile();
433 
434  XrdCl::XRootDStatus s = file->Write(pos, n, from);
435  if (!s.IsOK()) {
437  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
438  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
439  ex.addContext("Calling XrdFile::write()");
440  addConnection(ex);
441  throw ex;
442  }
443  assert(m_size != -1);
444  if (static_cast<IOOffset>(pos + n) > m_size)
445  m_size = pos + n;
446 
447  return n;
448 }
449 
450 bool XrdFile::prefetch(const IOPosBuffer *what, IOSize n) {
451  // The new Xrootd client does not contain any internal buffers.
452  // Hence, prefetching is disabled completely.
453  return false;
454 }
455 
// Body of the seek routine: moves m_offset according to `whence` (SET,
// CURRENT or END relative to m_size) and returns the new offset.
// A negative result is clamped to 0; seeking beyond m_size grows m_size.
// Throws cms::Exception("FilePositionError") when no request manager is
// attached or when `whence` is not one of the three known values.
// NOTE(review): lines 456-459 of the original file, including the function
// signature, are missing from this listing.
460  if (!m_requestmanager.get()) {
461  cms::Exception ex("FilePositionError");
462  ex << "XrdFile::position() called on a closed file";
463  ex.addContext("Calling XrdFile::position()");
464  addConnection(ex);
465  throw ex;
466  }
467  switch (whence) {
468  case SET:
469  m_offset = offset;
470  break;
471 
472  case CURRENT:
473  m_offset += offset;
474  break;
475 
476  // TODO: None of this works with concurrent writers to the file.
477  case END:
478  assert(m_size != -1);
479  m_offset = m_size + offset;
480  break;
481 
482  default:
483  cms::Exception ex("FilePositionError");
484  ex << "XrdFile::position() called with incorrect 'whence' parameter";
485  ex.addContext("Calling XrdFile::position()");
486  addConnection(ex);
487  throw ex;
488  }
489 
490  if (m_offset < 0)
491  m_offset = 0;
492  assert(m_size != -1);
// Only the locally tracked size is updated; no request is sent from here.
493  if (m_offset > m_size)
494  m_size = m_offset;
495 
496  return m_offset;
497 }
498 
499 void XrdFile::resize(IOOffset /* size */) {
500  cms::Exception ex("FileResizeError");
501  ex << "XrdFile::resize(name='" << m_name << "') not implemented";
502  ex.addContext("Calling XrdFile::resize()");
503  addConnection(ex);
504  throw ex;
505 }
506 
507 std::shared_ptr<XrdCl::File> XrdFile::getActiveFile(void) {
508  if (!m_requestmanager.get()) {
509  cms::Exception ex("XrdFileLogicError");
510  ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
511  ex.addContext("Calling XrdFile::getActiveFile()");
512  m_requestmanager->addConnections(ex);
513  m_close = false;
514  throw ex;
515  }
516  return m_requestmanager->getActiveFile();
517 }
518 
520  if (m_requestmanager.get()) {
521  m_requestmanager->addConnections(ex);
522  }
523 }
Definition: start.py:1
Log< level::Info, true > LogVerbatim
virtual void create(const char *name, bool exclusive=false, int perms=0666)
Definition: XrdFile.cc:42
int64_t IOOffset
Definition: IOTypes.h:20
std::string m_name
Definition: XrdFile.h:63
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:64
bool prefetch(const IOPosBuffer *what, IOSize n) override
Definition: XrdFile.cc:450
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:507
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:56
Log< level::Error, false > LogError
assert(be >=bs)
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:50
IOOffset offset() const
Definition: IOPosBuffer.h:41
void resize(IOOffset size) override
Definition: XrdFile.cc:499
constexpr element_type const * get() const
IOSize write(const void *from, IOSize n) override
Definition: XrdFile.cc:397
virtual IOOffset size() const
Definition: Storage.cc:509
IOOffset m_offset
Definition: XrdFile.h:60
static constexpr int XRD_CL_MAX_CHUNK
Definition: XrdFile.cc:17
IOSize size() const
Definition: IOBuffer.h:36
void addConnection(cms::Exception &)
Definition: XrdFile.cc:519
Log< level::Info, false > LogInfo
void close(void) override
Definition: XrdFile.cc:155
size_t IOSize
Definition: IOTypes.h:15
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void abort(void)
Definition: XrdFile.cc:170
IOSize readv(IOBuffer *into, IOSize n) override
Definition: XrdFile.cc:256
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:58
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:53
virtual IOOffset position() const
Definition: Storage.cc:504
#define UNLIKELY(x)
Definition: Likely.h:21
Log< level::Warning, false > LogWarning
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:59
static constexpr int XRD_CL_MAX_SIZE
Definition: XrdFile.cc:18
char const * what() const noexcept override
Definition: Exception.cc:103
static constexpr int XRD_CL_MAX_READ_SIZE
Definition: XrdFile.cc:20
~XrdFile(void) override
Definition: XrdFile.cc:36
def move(src, dest)
Definition: eostools.py:511