CMS 3D CMS Logo

XrdFile.cc
Go to the documentation of this file.
6 #include <vector>
7 #include <sstream>
8 #include <iostream>
9 #include <cassert>
10 #include <chrono>
11 
12 using namespace XrdAdaptor;
13 
14 // To be re-enabled when the monitoring interface is back.
15 //static const char *kCrabJobIdEnv = "CRAB_UNIQUE_JOB_ID";
16 
// Largest single buffer placed in a vector-read request; readv() splits any
// buffer longer than this into several entries. Parenthesized so the macro
// expands as one value inside larger expressions (e.g. `x / XRD_CL_MAX_CHUNK`).
#define XRD_CL_MAX_CHUNK (512 * 1024)
// Upper bound on the number of entries in one vector-read request; readv()
// keeps each request slightly below this.
#define XRD_CL_MAX_SIZE 1024

// Largest single read issued by read(into, n, pos); bigger reads are split.
#define XRD_CL_MAX_READ_SIZE (8 * 1024 * 1024)
21 
22 XrdFile::XrdFile(void) : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {}
23 
24 XrdFile::XrdFile(const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */)
25  : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
26  open(name, flags, perms);
27 }
28 
29 XrdFile::XrdFile(const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */)
30  : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
31  open(name.c_str(), flags, perms);
32 }
33 
35  if (m_close)
36  edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open";
37 }
38 
// Create the file `name` by delegating to open(), passing `perms` through.
40 void XrdFile::create(const char *name, bool exclusive /* = false */, int perms /* = 066 */) {
41  open(name,
// NOTE(review): the second argument to open() (the flags expression, which
// presumably depends on `exclusive`) is absent from this listing (listing
// line 42); restore it from the upstream file before compiling.
43  perms);
44 }
45 
// Create the file `name` (std::string flavour) by delegating to
// open(const char *, ...), passing `perms` through.
46 void XrdFile::create(const std::string &name, bool exclusive /* = false */, int perms /* = 066 */) {
47  open(name.c_str(),
// NOTE(review): the second argument to open() (the flags expression, which
// presumably depends on `exclusive`) is absent from this listing (listing
// line 48); restore it from the upstream file before compiling.
49  perms);
50 }
51 
52 void XrdFile::open(const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) {
53  open(name.c_str(), flags, perms);
54 }
55 
56 void XrdFile::open(const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) {
57  // Actual open
58  if ((name == nullptr) || (*name == 0)) {
60  ex << "Cannot open a file without a name";
61  ex.addContext("Calling XrdFile::open()");
62  throw ex;
63  }
64  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
66  ex << "Must open file '" << name << "' at least for read or write";
67  ex.addContext("Calling XrdFile::open()");
68  throw ex;
69  }
70 
71  // Translate our flags to system flags
72  XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
73 
74  if (flags & IOFlags::OpenWrite)
75  openflags |= XrdCl::OpenFlags::Update;
76  else if (flags & IOFlags::OpenRead)
77  openflags |= XrdCl::OpenFlags::Read;
78 
79  if (flags & IOFlags::OpenAppend) {
81  ex << "Opening file '" << name << "' in append mode not supported";
82  ex.addContext("Calling XrdFile::open()");
83  throw ex;
84  }
85 
86  if (flags & IOFlags::OpenCreate) {
87  if (!(flags & IOFlags::OpenExclusive))
88  openflags |= XrdCl::OpenFlags::Delete;
89  openflags |= XrdCl::OpenFlags::New;
90  openflags |= XrdCl::OpenFlags::MakePath;
91  }
92 
93  if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
94  openflags |= XrdCl::OpenFlags::Delete;
95 
96  // Translate mode flags
98  modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
99  modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
100  modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
101  modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
102  modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
103  modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
104  modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
105  modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
106  modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
107 
108  m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
109  m_name = name;
110 
111  // Stat the file so we can keep track of the offset better.
112  auto file = getActiveFile();
113  XrdCl::XRootDStatus status;
114  XrdCl::StatInfo *statInfo = nullptr;
115  if (!(status = file->Stat(false, statInfo)).IsOK()) {
117  ex << "XrdCl::File::Stat(name='" << name << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
118  << ", code=" << status.code << ")";
119  ex.addContext("Calling XrdFile::open()");
120  addConnection(ex);
121  throw ex;
122  }
123  assert(statInfo);
124  m_size = statInfo->GetSize();
125  delete (statInfo);
126 
127  m_offset = 0;
128  m_close = true;
129 
130  // Send the monitoring info, if available.
131  // Note: std::getenv is not reentrant.
132  // Commenting out until this is available in the new client.
133  /*
134  char * crabJobId = std::getenv(kCrabJobIdEnv);
135  if (crabJobId) {
136  kXR_unt32 dictId;
137  m_file->SendMonitoringInfo(crabJobId, &dictId);
138  edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
139  }
140 */
141 
142  edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
143 
144  std::vector<std::string> sources;
145  m_requestmanager->getActiveSourceNames(sources);
146  std::stringstream ss;
147  ss << "Active sources: ";
148  for (auto const &it : sources)
149  ss << it << ", ";
150  edm::LogInfo("XrdFileInfo") << ss.str();
151 }
152 
153 void XrdFile::close(void) {
154  if (!m_requestmanager.get()) {
155  edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name << "') called but the file is not open";
156  m_close = false;
157  return;
158  }
159 
160  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
161 
162  m_close = false;
163  m_offset = 0;
164  m_size = -1;
165  edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
166 }
167 
168 void XrdFile::abort(void) {
169  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
170  m_close = false;
171  m_offset = 0;
172  m_size = -1;
173 }
174 
// Read up to `n` bytes into `into` starting at the current offset (m_offset),
// advance m_offset by the number of bytes actually read, and return that
// count. Requests larger than 0x7fffffff bytes are rejected with an exception.
176 IOSize XrdFile::read(void *into, IOSize n) {
177  if (n > 0x7fffffff) {
// NOTE(review): the declaration of `ex` (listing line 178) is absent from
// this listing; restore it from the upstream file.
179  ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
180  ex.addContext("Calling XrdFile::read()");
181  addConnection(ex);
182  throw ex;
183  }
184 
// Blocks on the future returned by the request manager.
// NOTE(review): m_requestmanager is dereferenced without a null check here;
// confirm callers never read from a closed file.
185  uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
186  m_offset += bytesRead;
187  return bytesRead;
188 }
189 
// Positional read: reads up to `n` bytes into `into` starting at `pos` and
// returns the total number of bytes read. m_offset is not touched here.
// NOTE(review): the function header (listing line 190) is absent from this
// listing; the body uses `into`, `n` and `pos`, and the caller in readv()
// passes (data, size, offset) — restore the header from the upstream file.
191  if (n > 0x7fffffff) {
// NOTE(review): declaration of `ex` (listing line 192) is absent here.
193  ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") exceeds read size limit 0x7fffffff";
194  ex.addContext("Calling XrdFile::read()");
195  addConnection(ex);
196  throw ex;
197  }
198  if (n == 0) {
199  return 0;
200  }
201 
202  // In some cases, the IO layers above us (particularly, if lazy-download is
203  // enabled) will emit very large reads. We break this up into multiple
204  // reads in order to avoid hitting timeouts.
// Two futures are tracked so that the next chunk is already issued while the
// previous one is being waited on.
205  std::future<IOSize> prev_future, cur_future;
206  IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
207  bool readReturnedShort = false;
208 
209  // Check the status of a read operation; updates bytesRead and
210  // readReturnedShort.
211  auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
// A default-constructed (never issued) future is skipped.
212  if (!future.valid()) {
213  return;
214  }
215  IOSize result = future.get();
// Once one chunk has come back short, every later chunk must be empty;
// anything else is reported as an error.
216  if (readReturnedShort && (result != 0)) {
// NOTE(review): declaration of `ex` (listing line 217) is absent here.
218  ex << "XrdFile::read(name='" << m_name << "', n=" << n
219  << ") remote server returned non-zero length read after EOF.";
220  ex.addContext("Calling XrdFile::read()");
221  addConnection(ex);
222  throw ex;
223  } else if (result != expected) {
224  readReturnedShort = true;
225  }
226  bytesRead += result;
227  };
228 
229  while (n) {
// Each request is capped at XRD_CL_MAX_READ_SIZE bytes.
230  IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
231 
232  // Save prior read state; issue new read.
233  prev_future = std::move(cur_future);
234  prev_future_expected = cur_future_expected;
235  cur_future = m_requestmanager->handle(into, chunk, pos);
236  cur_future_expected = chunk;
237 
238  // Wait for the prior read; update bytesRead.
239  check_read(prev_future, prev_future_expected);
240 
241  // Update counters.
242  into = static_cast<char *>(into) + chunk;
243  n -= chunk;
244  pos += chunk;
245  }
246 
247  // Wait for the last read to finish.
248  check_read(cur_future, cur_future_expected);
249 
250  return bytesRead;
251 }
252 
253 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
255  std::vector<IOPosBuffer> new_buf;
256  new_buf.reserve(n);
257  IOOffset off = 0;
258  for (IOSize i = 0; i < n; i++) {
259  IOSize size = into[i].size();
260  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
261  off += size;
262  }
263  return readv(&(new_buf[0]), n);
264 }
265 
266 /*
267  * A vectored scatter-gather read.
268  * Returns the total number of bytes successfully read.
269  */
// Vectored positional read over the `n` buffers in `into`; returns the sum of
// the bytes reported by every sub-request.
// NOTE(review): the function header (listing line 270) is absent from this
// listing; the body uses `into[idx].offset()/size()/data()` and `n` — restore
// the header from the upstream file.
271  // A trivial vector read - unlikely, considering ROOT data format.
272  if (UNLIKELY(n == 0)) {
273  return 0;
274  }
275  if (UNLIKELY(n == 1)) {
276  return read(into[0].data(), into[0].size(), into[0].offset());
277  }
278 
// Chunk list handed to the request manager; it is cleared and refilled for
// every sub-request built below.
279  auto cl = std::make_shared<std::vector<IOPosBuffer>>();
280 
281  // CMSSW may issue large readv's; Xrootd is only able to handle
282  // 1024. Further, the splitting algorithm may slightly increase
283  // the number of buffers.
284  IOSize adjust = XRD_CL_MAX_SIZE - 2;
285  cl->reserve(n > adjust ? adjust : n);
286  IOSize idx = 0, last_idx = 0;
287  IOSize final_result = 0;
// One entry per sub-request: the pending result and the byte count expected.
288  std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
289  while (idx < n) {
290  cl->clear();
291  IOSize size = 0;
292  while (idx < n) {
// Number of entries pushed for the current input buffer, so they can be
// removed again if the request would exceed `adjust` entries.
293  unsigned rollback_count = 1;
294  IOSize current_size = size;
295  IOOffset offset = into[idx].offset();
296  IOSize length = into[idx].size();
297  size += length;
298  char *buffer = static_cast<char *>(into[idx].data());
// Split a buffer longer than XRD_CL_MAX_CHUNK into chunk-sized entries.
299  while (length > XRD_CL_MAX_CHUNK) {
300  IOPosBuffer ci;
// NOTE(review): listing line 301 is absent here; the entry's size is not
// set by any visible line in this loop — restore from the upstream file.
302  length -= XRD_CL_MAX_CHUNK;
303  ci.set_offset(offset);
304  offset += XRD_CL_MAX_CHUNK;
305  ci.set_data(buffer);
306  buffer += XRD_CL_MAX_CHUNK;
307  cl->emplace_back(ci);
308  rollback_count++;
309  }
// Remainder of the buffer (or the whole buffer when no split was needed).
310  IOPosBuffer ci;
311  ci.set_size(length);
312  ci.set_offset(offset);
313  ci.set_data(buffer);
314  cl->emplace_back(ci);
315 
// Too many entries: undo this input buffer and submit what was gathered;
// the buffer is retried in the next sub-request because idx is not advanced.
316  if (cl->size() > adjust) {
317  while (rollback_count--)
318  cl->pop_back();
319  size = current_size;
320  break;
321  } else {
322  idx++;
323  }
324  }
325  try {
326  readv_futures.emplace_back(m_requestmanager->handle(cl), size);
327  } catch (edm::Exception &ex) {
328  ex.addContext("Calling XrdFile::readv()");
329  throw;
330  }
331 
332  // Assure that we have made some progress.
333  assert(last_idx < idx);
334  last_idx = idx;
335  }
336  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
// NOTE(review): listing line 337 is absent here; no visible line assigns
// `start` before it is used in the timing message below.
338 
339  // If there are multiple readv calls, wait until all return until looking
340  // at the results of any. This guarantees that all readv's have finished
341  // by time we call .get() for the first time (in case one of the readv's
342  // result in an exception).
343  //
344  // We cannot have outstanding readv's on function exit as the XrdCl may
345  // write into the corresponding buffer at the same time as ROOT.
346  if (readv_futures.size() > 1) {
347  for (auto &readv_result : readv_futures) {
348  if (readv_result.first.valid()) {
349  readv_result.first.wait();
350  }
351  }
352  }
353 
354  for (auto &readv_result : readv_futures) {
355  IOSize result = 0;
356  try {
357  const int retry_count = 5;
// The loop always runs retry_count iterations; after a successful get() the
// future is no longer valid, so later iterations only repeat the assert.
358  for (int retries = 0; retries < retry_count; retries++) {
359  try {
360  if (readv_result.first.valid()) {
361  result = readv_result.first.get();
362  }
363  } catch (XrootdException &ex) {
364  if ((retries != retry_count - 1) && (ex.getCode() == XrdCl::errInvalidResponse)) {
365  edm::LogWarning("XrdAdaptorInternal")
366  << "Got an invalid response from Xrootd server; retrying" << std::endl;
// NOTE(review): `cl` holds the entries of the LAST sub-request built above,
// so this retry resubmits that list rather than necessarily the one that
// failed — confirm this is intended when there are several sub-requests.
367  result = m_requestmanager->handle(cl).get();
368  } else {
369  throw;
370  }
371  }
372  assert(result == readv_result.second);
373  }
374  } catch (edm::Exception &ex) {
375  ex.addContext("Calling XrdFile::readv()");
376  throw;
377  } catch (std::exception &ex) {
// NOTE(review): declaration of `newex` (listing line 378) is absent here.
379  newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
380  newex.addContext("Calling XrdFile::readv()");
381  throw newex;
382  }
383  final_result += result;
384  }
// NOTE(review): listing line 385 is absent here; no visible line assigns
// `end` before it is used in the timing message below.
386 
387  edm::LogVerbatim("XrdAdaptorInternal")
388  << "[" << m_op_count.fetch_add(1) << "] Time for readv: "
389  << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count())
390  << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
391 
392  return final_result;
393 }
394 
// Write `n` bytes from `from` at the current offset (m_offset), advance
// m_offset by `n`, extend m_size when the write went past the known end, and
// return `n`. Throws when n exceeds 0x7fffffff or the XrdCl write fails.
395 IOSize XrdFile::write(const void *from, IOSize n) {
396  if (n > 0x7fffffff) {
// NOTE(review): declaration of `ex` (listing line 397) is absent here.
398  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
399  ex.addContext("Calling XrdFile::write()");
400  addConnection(ex);
401  throw ex;
402  }
// getActiveFile() throws when there is no request manager.
403  auto file = getActiveFile();
404 
405  XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
406  if (!s.IsOK()) {
// NOTE(review): declaration of `ex` (listing line 407) is absent here.
408  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
409  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
410  ex.addContext("Calling XrdFile::write()");
411  addConnection(ex);
412  throw ex;
413  }
414  m_offset += n;
// m_size is -1 only while no file is open (see the constructors and close()).
415  assert(m_size != -1);
416  if (m_offset > m_size)
417  m_size = m_offset;
418 
419  return n;
420 }
421 
// Positional write: writes `n` bytes from `from` at `pos`, extends m_size
// when the write ends past the known end, and returns `n`. m_offset is not
// modified. Throws when n exceeds 0x7fffffff or the XrdCl write fails.
422 IOSize XrdFile::write(const void *from, IOSize n, IOOffset pos) {
423  if (n > 0x7fffffff) {
// NOTE(review): declaration of `ex` (listing line 424) is absent here.
425  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
426  ex.addContext("Calling XrdFile::write()");
427  addConnection(ex);
428  throw ex;
429  }
// getActiveFile() throws when there is no request manager.
430  auto file = getActiveFile();
431 
432  XrdCl::XRootDStatus s = file->Write(pos, n, from);
433  if (!s.IsOK()) {
// NOTE(review): declaration of `ex` (listing line 434) is absent here.
435  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
436  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
437  ex.addContext("Calling XrdFile::write()");
438  addConnection(ex);
439  throw ex;
440  }
// m_size is -1 only while no file is open (see the constructors and close()).
441  assert(m_size != -1);
442  if (static_cast<IOOffset>(pos + n) > m_size)
443  m_size = pos + n;
444 
445  return n;
446 }
447 
448 bool XrdFile::prefetch(const IOPosBuffer *what, IOSize n) {
449  // The new Xrootd client does not contain any internal buffers.
450  // Hence, prefetching is disabled completely.
451  return false;
452 }
453 
// Seek: updates m_offset according to `offset` and `whence` and returns the
// new offset. Throws cms::Exception("FilePositionError") when the file has no
// request manager or `whence` is not one of SET / CURRENT / END.
// NOTE(review): the function header is absent from this listing; the body
// uses `offset` and `whence` — restore the header from the upstream file.
458  if (!m_requestmanager.get()) {
459  cms::Exception ex("FilePositionError");
460  ex << "XrdFile::position() called on a closed file";
461  ex.addContext("Calling XrdFile::position()");
462  addConnection(ex);
463  throw ex;
464  }
465  switch (whence) {
// Absolute position.
466  case SET:
467  m_offset = offset;
468  break;
469 
// Relative to the current position.
470  case CURRENT:
471  m_offset += offset;
472  break;
473 
474  // TODO: None of this works with concurrent writers to the file.
// Relative to the size recorded in m_size.
475  case END:
476  assert(m_size != -1);
477  m_offset = m_size + offset;
478  break;
479 
480  default:
481  cms::Exception ex("FilePositionError");
482  ex << "XrdFile::position() called with incorrect 'whence' parameter";
483  ex.addContext("Calling XrdFile::position()");
484  addConnection(ex);
485  throw ex;
486  }
487 
// A negative result is clamped to the start of the file.
488  if (m_offset < 0)
489  m_offset = 0;
490  assert(m_size != -1);
// Seeking past the recorded end raises m_size to the new offset.
491  if (m_offset > m_size)
492  m_size = m_offset;
493 
494  return m_offset;
495 }
496 
497 void XrdFile::resize(IOOffset /* size */) {
498  cms::Exception ex("FileResizeError");
499  ex << "XrdFile::resize(name='" << m_name << "') not implemented";
500  ex.addContext("Calling XrdFile::resize()");
501  addConnection(ex);
502  throw ex;
503 }
504 
505 std::shared_ptr<XrdCl::File> XrdFile::getActiveFile(void) {
506  if (!m_requestmanager.get()) {
507  cms::Exception ex("XrdFileLogicError");
508  ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
509  ex.addContext("Calling XrdFile::getActiveFile()");
510  m_requestmanager->addConnections(ex);
511  m_close = false;
512  throw ex;
513  }
514  return m_requestmanager->getActiveFile();
515 }
516 
518  if (m_requestmanager.get()) {
519  m_requestmanager->addConnections(ex);
520  }
521 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:58
#define XRD_CL_MAX_READ_SIZE
Definition: XrdFile.cc:20
IOSize write(const void *from, IOSize n) override
Definition: XrdFile.cc:395
virtual void create(const char *name, bool exclusive=false, int perms=0666)
Definition: XrdFile.cc:40
char const * what() const override
Definition: Exception.cc:103
void close(void) override
Definition: XrdFile.cc:153
bool m_close
Definition: XrdFile.h:61
IOSize readv(IOBuffer *into, IOSize n) override
Definition: XrdFile.cc:254
IOOffset m_offset
Definition: XrdFile.h:59
void resize(IOOffset size) override
Definition: XrdFile.cc:497
Relative
Definition: Storage.h:22
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:51
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:54
XrdFile(void)
Definition: XrdFile.cc:22
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:48
virtual IOOffset position(void) const
Definition: Storage.cc:72
#define XRD_CL_MAX_CHUNK
Definition: XrdFile.cc:17
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:63
bool prefetch(const IOPosBuffer *what, IOSize n) override
Definition: XrdFile.cc:448
#define end
Definition: vmac.h:39
T min(T a, T b)
Definition: MathUtil.h:58
int read(void)
Definition: IOInput.cc:52
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:505
element_type const * get() const
IOOffset m_size
Definition: XrdFile.h:60
IOOffset offset(void) const
Definition: IOPosBuffer.h:39
void * data(void) const
Definition: IOPosBuffer.h:42
IOSize size(void) const
Definition: IOBuffer.h:34
std::string m_name
Definition: XrdFile.h:62
IOSize size(void) const
Definition: IOPosBuffer.h:45
virtual IOOffset size(void) const
Definition: Storage.cc:77
~XrdFile(void) override
Definition: XrdFile.cc:34
void addContext(std::string const &context)
Definition: Exception.cc:165
int64_t IOOffset
Definition: IOTypes.h:19
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
size_t IOSize
Definition: IOTypes.h:14
#define UNLIKELY(x)
Definition: Likely.h:21
void addConnection(cms::Exception &)
Definition: XrdFile.cc:517
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:56
def move(src, dest)
Definition: eostools.py:511
virtual void abort(void)
Definition: XrdFile.cc:168
#define XRD_CL_MAX_SIZE
Definition: XrdFile.cc:18