CMS 3D CMS Logo

XrdFile.cc
Go to the documentation of this file.
6 #include <vector>
7 #include <sstream>
8 #include <iostream>
9 #include <cassert>
10 #include <chrono>
11 
12 using namespace XrdAdaptor;
13 
14 // To be re-enabled when the monitoring interface is back.
15 //static const char *kCrabJobIdEnv = "CRAB_UNIQUE_JOB_ID";
16 
// Largest piece, in bytes, that readv() places in a single chunk of a
// vectored request; bigger buffers are split into pieces of this size.
// Parenthesized so the macro expands safely inside any expression
// (e.g. `x / XRD_CL_MAX_CHUNK`).
#define XRD_CL_MAX_CHUNK (512*1024)
// Number of chunks Xrootd can handle in one vectored read request.
#define XRD_CL_MAX_SIZE 1024

// Largest non-vectored read issued in one request; the positioned read()
// breaks bigger requests into pieces of this size.
#define XRD_CL_MAX_READ_SIZE (8*1024*1024)
21 
// Default constructor, XrdFile(void); its signature line (listing line 22) is
// absent from this listing. Leaves the object in the not-open state: offset 0,
// size -1 (the same value close() and abort() reset it to), m_close false,
// an empty name and a zeroed operation counter. No file is opened.
23  : m_offset (0),
24  m_size(-1),
25  m_close (false),
26  m_name(),
27  m_op_count(0)
28 {
29 }
30 
31 XrdFile::XrdFile (const char *name,
32  int flags /* = IOFlags::OpenRead */,
33  int perms /* = 066 */)
34  : m_offset (0),
35  m_size(-1),
36  m_close (false),
37  m_name(),
38  m_op_count(0)
39 {
40  open (name, flags, perms);
41 }
42 
// Constructor overload whose first signature line (listing line 43) is absent
// from this listing; `name` is an object exposing c_str() (presumably a
// std::string - confirm against the header). Initializes the members to the
// not-open state and then opens the file via open(name.c_str(), flags, perms).
44  int flags /* = IOFlags::OpenRead */,
45  int perms /* = 066 */)
46  : m_offset (0),
47  m_size(-1),
48  m_close (false),
49  m_name(),
50  m_op_count(0)
51 {
52  open (name.c_str (), flags, perms);
53 }
54 
// Destructor, ~XrdFile(void); its signature line (listing line 55) is absent
// from this listing. It does not close anything itself: it only logs an error
// when m_close is still set, i.e. open() completed and neither close() nor
// abort() has cleared the flag since.
56 {
57  if (m_close)
58  edm::LogError("XrdFileError")
59  << "Destructor called on XROOTD file '" << m_name
60  << "' but the file is still open";
61 }
62 
// Create a file by delegating to open().
//
// name      - file to create, forwarded to open().
// exclusive - when true, IOFlags::OpenExclusive is OR-ed into the open flags.
// perms     - permission bits, forwarded to open().
//
// The first operand of the flag expression (listing line 70) is absent from
// this listing, so the base flag set cannot be read off here.
64 void
65 XrdFile::create (const char *name,
66  bool exclusive /* = false */,
67  int perms /* = 0666 */)
68 {
69  open (name,
71  | (exclusive ? IOFlags::OpenExclusive : 0)),
72  perms);
73 }
74 
// create() overload whose signature line (listing line 76) is absent from
// this listing; `name` exposes c_str() (presumably a std::string - confirm).
// Delegates to open(name.c_str(), ...), OR-ing in IOFlags::OpenExclusive when
// `exclusive` is true. The first operand of the flag expression (listing
// line 81) is likewise absent.
75 void
77  bool exclusive /* = false */,
78  int perms /* = 066 */)
79 {
80  open (name.c_str (),
82  | (exclusive ? IOFlags::OpenExclusive : 0)),
83  perms);
84 }
85 
// open() overload whose signature line (listing line 87) is absent from this
// listing; `name` exposes c_str() (presumably a std::string - confirm).
// Forwards unchanged to open(name.c_str(), flags, perms).
86 void
88  int flags /* = IOFlags::OpenRead */,
89  int perms /* = 066 */)
90 { open (name.c_str (), flags, perms); }
91 
// Open `name` through a RequestManager, then Stat() it to cache the size.
//
// name  - passed to RequestManager::getInstance(); must be non-null and
//         non-empty.
// flags - IOFlags bit mask; must contain OpenRead or OpenWrite. OpenAppend
//         is rejected.
// perms - POSIX permission bits (S_IRUSR, ...) translated to XrdCl::Access
//         bits.
//
// Throws `ex` for an empty name, a missing read/write flag, append mode, or
// a failed Stat(). The lines that declare those `ex` objects (listing lines
// 99, 105, 120, 157) and `modeflags` (listing line 138) are absent from this
// listing.
//
// On success: m_name is set, m_size holds the Stat() size, m_offset is 0 and
// m_close is true.
92 void
93 XrdFile::open (const char *name,
94  int flags /* = IOFlags::OpenRead */,
95  int perms /* = 0666 */)
96 {
97  // Actual open
98  if ((name == nullptr) || (*name == 0)) {
100  ex << "Cannot open a file without a name";
101  ex.addContext("Calling XrdFile::open()");
102  throw ex;
103  }
104  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
106  ex << "Must open file '" << name << "' at least for read or write";
107  ex.addContext("Calling XrdFile::open()");
108  throw ex;
109  }
110 
111  // Translate our flags to system flags
112  XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
113 
// OpenWrite wins: a request with both read and write set is opened with
// Update only; Read is used solely for read-only requests.
114  if (flags & IOFlags::OpenWrite)
115  openflags |= XrdCl::OpenFlags::Update;
116  else if (flags & IOFlags::OpenRead)
117  openflags |= XrdCl::OpenFlags::Read;
118 
119  if (flags & IOFlags::OpenAppend) {
121  ex << "Opening file '" << name << "' in append mode not supported";
122  ex.addContext("Calling XrdFile::open()");
123  throw ex;
124  }
125 
// Creation always requests New and MakePath; Delete is added as well unless
// the caller asked for exclusive creation.
126  if (flags & IOFlags::OpenCreate)
127  {
128  if (! (flags & IOFlags::OpenExclusive))
129  openflags |= XrdCl::OpenFlags::Delete;
130  openflags |= XrdCl::OpenFlags::New;
131  openflags |= XrdCl::OpenFlags::MakePath;
132  }
133 
// Truncation is expressed as Delete, and only honoured for writable opens.
134  if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
135  openflags |= XrdCl::OpenFlags::Delete;
136 
137  // Translate mode flags
139  modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
140  modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
141  modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
142  modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
143  modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
144  modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
// NOTE(review): the three "other" bits below are translated to the *group*
// bits (GR/GW/GX), so S_IROTH/S_IWOTH/S_IXOTH end up granting group access
// and no "other" access. Presumably XrdCl::Access::OR/OW/OX were intended -
// confirm against XrdCl::Access::Mode.
145  modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
146  modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
147  modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
148 
149  m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
150  m_name = name;
151 
152  // Stat the file so we can keep track of the offset better.
153  auto file = getActiveFile();
154  XrdCl::XRootDStatus status;
155  XrdCl::StatInfo *statInfo = nullptr;
156  if (! (status = file->Stat(false, statInfo)).IsOK()) {
// NOTE(review): the message opens a quote before the name (name=') but the
// next literal starts with ")" - the closing quote is missing.
158  ex << "XrdCl::File::Stat(name='" << name
159  << ") => error '" << status.ToStr()
160  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
161  ex.addContext("Calling XrdFile::open()");
162  addConnection(ex);
163  throw ex;
164  }
// statInfo is released with delete below, so this function owns the object
// Stat() returned.
// NOTE(review): the assert is the only null guard before the dereference;
// it vanishes when NDEBUG is defined.
165  assert(statInfo);
166  m_size = statInfo->GetSize();
167  delete(statInfo);
168 
169  m_offset = 0;
// From here on the destructor logs an error unless close() or abort() runs.
170  m_close = true;
171 
172  // Send the monitoring info, if available.
173  // Note: getenv is not reentrant.
174  // Commenting out until this is available in the new client.
175 /*
176  char * crabJobId = getenv(kCrabJobIdEnv);
177  if (crabJobId) {
178  kXR_unt32 dictId;
179  m_file->SendMonitoringInfo(crabJobId, &dictId);
180  edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
181  }
182 */
183 
184  edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
185 
// Log the sources the request manager reports as active; every name is
// followed by ", ", including the last one.
186  std::vector<std::string> sources;
187  m_requestmanager->getActiveSourceNames(sources);
188  std::stringstream ss;
189  ss << "Active sources: ";
190  for (auto const& it : sources)
191  ss << it << ", ";
192  edm::LogInfo("XrdFileInfo") << ss.str();
193 }
194 
// close(void); its signature line (listing line 196) is absent from this
// listing. Drops the request manager and resets the object to the not-open
// state (m_close false, offset 0, size -1). Calling it with no request
// manager attached is not an exception: it logs an error, clears m_close and
// returns.
195 void
197 {
198  if (! m_requestmanager.get())
199  {
200  edm::LogError("XrdFileError")
201  << "XrdFile::close(name='" << m_name
202  << "') called but the file is not open";
203  m_close = false;
204  return;
205  }
206 
207  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
208 
209  m_close = false;
210  m_offset = 0;
211  m_size = -1;
// m_name is left untouched, so it still names the file in this message.
212  edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
213 }
214 
// abort(void); its signature line (listing line 216) is absent from this
// listing. Performs the same state reset as close() (drops the request
// manager, m_close false, offset 0, size -1) but unconditionally and without
// any logging.
215 void
217 {
218  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
219  m_close = false;
220  m_offset = 0;
221  m_size = -1;
222 }
223 
// Read up to `n` bytes into `into` at the current offset and advance the
// offset by the number of bytes actually read.
//
// into - destination buffer.
// n    - number of bytes requested; values above 0x7fffffff are rejected
//        with an exception (the line declaring `ex`, listing line 229, is
//        absent from this listing).
// Returns the byte count reported by the request manager.
225 IOSize
226 XrdFile::read (void *into, IOSize n)
227 {
228  if (n > 0x7fffffff) {
230  ex << "XrdFile::read(name='" << m_name << "', n=" << n
231  << ") too many bytes, limit is 0x7fffffff";
232  ex.addContext("Calling XrdFile::read()");
233  addConnection(ex);
234  throw ex;
235  }
236 
// Blocks on the future returned by the request manager.
// NOTE(review): m_requestmanager is dereferenced without the null check that
// close() and position() perform - confirm callers never read a closed file.
237  uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
238  m_offset += bytesRead;
239  return bytesRead;
240 }
241 
// Positioned read. The signature line (listing line 243) is absent from this
// listing; the body uses `into`, `n` and `pos`, so it is presumably
// read(void *into, IOSize n, IOOffset pos) - confirm against the header.
//
// Reads up to `n` bytes starting at file offset `pos` into `into`, splitting
// the request into pieces of at most XRD_CL_MAX_READ_SIZE bytes. m_offset is
// not touched. Returns the sum of the byte counts of all pieces.
//
// Throws `ex` when n exceeds 0x7fffffff, or when a piece returns data after
// an earlier piece came back short (the lines declaring those `ex` objects,
// listing lines 246 and 272, are absent from this listing).
242 IOSize
244 {
245  if (n > 0x7fffffff) {
247  ex << "XrdFile::read(name='" << m_name << "', n=" << n
248  << ") exceeds read size limit 0x7fffffff";
249  ex.addContext("Calling XrdFile::read()");
250  addConnection(ex);
251  throw ex;
252  }
253  if (n == 0) {
254  return 0;
255  }
256 
257  // In some cases, the IO layers above us (particularly, if lazy-download is
258  // enabled) will emit very large reads. We break this up into multiple
259  // reads in order to avoid hitting timeouts.
260  std::future<IOSize> prev_future, cur_future;
261  IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
262  bool readReturnedShort = false;
263 
264  // Check the status of a read operation; updates bytesRead and
265  // readReturnedShort.
// A future that was never assigned (not valid) is skipped, which covers the
// first pass of the loop below where there is no prior read yet.
266  auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
267  if (!future.valid()) {
268  return;
269  }
270  IOSize result = future.get();
271  if (readReturnedShort && (result != 0)) {
273  ex << "XrdFile::read(name='" << m_name << "', n=" << n
274  << ") remote server returned non-zero length read after EOF.";
275  ex.addContext("Calling XrdFile::read()");
276  addConnection(ex);
277  throw ex;
278  } else if (result != expected) {
279  readReturnedShort = true;
280  }
281  bytesRead += result;
282  };
283 
// Two reads are kept in flight: each pass issues the next piece first and
// only then waits for the piece issued on the previous pass.
284  while (n) {
285  IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
286 
287  // Save prior read state; issue new read.
288  prev_future = std::move(cur_future);
289  prev_future_expected = cur_future_expected;
290  cur_future = m_requestmanager->handle(into, chunk, pos);
291  cur_future_expected = chunk;
292 
293  // Wait for the prior read; update bytesRead.
// NOTE(review): if this throws, the read just issued into cur_future has not
// been waited for - confirm it cannot still write into `into` afterwards.
294  check_read(prev_future, prev_future_expected);
295 
296  // Update counters.
297  into = static_cast<char*>(into) + chunk;
298  n -= chunk;
299  pos += chunk;
300  }
301 
302  // Wait for the last read to finish.
303  check_read(cur_future, cur_future_expected);
304 
305  return bytesRead;
306 }
307 
308 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
309 IOSize
311 {
312  std::vector<IOPosBuffer> new_buf;
313  new_buf.reserve(n);
314  IOOffset off = 0;
315  for (IOSize i=0; i<n; i++) {
316  IOSize size = into[i].size();
317  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
318  off += size;
319  }
320  return readv(&(new_buf[0]), n);
321 }
322 
323 /*
324  * A vectored scatter-gather read.
325  * Returns the total number of bytes successfully read.
326  */
// The signature line (listing line 328) is absent from this listing; the
// body reads into[i].offset()/size()/data(), so it is presumably
// readv(IOPosBuffer *into, IOSize n) - confirm against the header.
//
// The `n` buffers are regrouped into sub-requests of at most
// XRD_CL_MAX_SIZE - 2 chunks, each chunk at most XRD_CL_MAX_CHUNK bytes.
// Every sub-request is submitted before any result is collected.
//
// Listing lines 361, 403, 460 and 467 are absent as well: a statement on the
// split chunk `ci`, the statements assigning `start` and `end`, and the
// declaration of `newex`.
327 IOSize
329 {
330  // A trivial vector read - unlikely, considering ROOT data format.
331  if (unlikely(n == 0)) {
332  return 0;
333  }
334  if (unlikely(n == 1)) {
335  return read(into[0].data(), into[0].size(), into[0].offset());
336  }
337 
// One chunk list, reused (cleared and refilled) for every sub-request.
// NOTE(review): the same shared vector is handed to handle() and then
// cleared on the next pass - confirm handle() does not keep using it.
338  auto cl = std::make_shared<std::vector<IOPosBuffer>>();
339 
340  // CMSSW may issue large readv's; Xrootd is only able to handle
341  // 1024. Further, the splitting algorithm may slightly increase
342  // the number of buffers.
343  IOSize adjust = XRD_CL_MAX_SIZE - 2;
344  cl->reserve(n > adjust ? adjust : n);
345  IOSize idx = 0, last_idx = 0;
346  IOSize final_result = 0;
// Each entry pairs a sub-request's future with the byte count it is
// expected to deliver.
347  std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
348  while (idx < n)
349  {
350  cl->clear();
351  IOSize size = 0;
352  while (idx < n) {
// rollback_count tracks how many chunks this input buffer contributed, so
// they can all be removed again if the buffer does not fit.
353  unsigned rollback_count = 1;
354  IOSize current_size = size;
355  IOOffset offset = into[idx].offset();
356  IOSize length = into[idx].size();
357  size += length;
358  char * buffer = static_cast<char *>(into[idx].data());
// Peel off XRD_CL_MAX_CHUNK-sized pieces until the remainder fits in one
// chunk.
359  while (length > XRD_CL_MAX_CHUNK) {
360  IOPosBuffer ci;
362  length -= XRD_CL_MAX_CHUNK;
363  ci.set_offset(offset);
364  offset += XRD_CL_MAX_CHUNK;
365  ci.set_data(buffer);
366  buffer += XRD_CL_MAX_CHUNK;
367  cl->emplace_back(ci);
368  rollback_count ++;
369  }
370  IOPosBuffer ci;
371  ci.set_size(length);
372  ci.set_offset(offset);
373  ci.set_data(buffer);
374  cl->emplace_back(ci);
375 
// Over the limit: undo this buffer's chunks and its size contribution, and
// leave idx unchanged so the buffer starts the next sub-request.
376  if (cl->size() > adjust)
377  {
378  while (rollback_count--) cl->pop_back();
379  size = current_size;
380  break;
381  }
382  else
383  {
384  idx++;
385  }
386  }
387  try
388  {
389  readv_futures.emplace_back(m_requestmanager->handle(cl), size);
390  }
391  catch (edm::Exception& ex)
392  {
393  ex.addContext("Calling XrdFile::readv()");
394  throw;
395  }
396 
397  // Assure that we have made some progress.
// NOTE(review): a single buffer that alone needs more than `adjust` chunks
// is rolled back on every pass and would trip this assert - confirm callers
// cannot pass one.
398  assert(last_idx < idx);
399  last_idx = idx;
400 
401  }
402  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
404 
405  // If there are multiple readv calls, wait until all return until looking
406  // at the results of any. This guarantees that all readv's have finished
407  // by time we call .get() for the first time (in case one of the readv's
408  // result in an exception).
409  //
410  // We cannot have outstanding readv's on function exit as the XrdCl may
411  // write into the corresponding buffer at the same time as ROOT.
412  if (readv_futures.size() > 1)
413  {
414  for (auto & readv_result : readv_futures)
415  {
416  if (readv_result.first.valid())
417  {
418  readv_result.first.wait();
419  }
420  }
421  }
422 
423  for (auto & readv_result : readv_futures)
424  {
425  IOSize result = 0;
426  try
427  {
// The loop always runs retry_count times. Once get() has succeeded the
// future is no longer valid, so later passes only re-check the assert.
428  const int retry_count = 5;
429  for (int retries=0; retries<retry_count; retries++)
430  {
431  try
432  {
433  if (readv_result.first.valid())
434  {
435  result = readv_result.first.get();
436  }
437  }
438  catch (XrootdException& ex)
439  {
440  if ((retries != retry_count-1) && (ex.getCode() == XrdCl::errInvalidResponse))
441  {
442  edm::LogWarning("XrdAdaptorInternal") << "Got an invalid response from Xrootd server; retrying" << std::endl;
// NOTE(review): `cl` was cleared and refilled for every sub-request above,
// so here it holds only the chunks of the last one. A retry for an earlier
// sub-request therefore presumably re-reads the wrong chunks - confirm.
443  result = m_requestmanager->handle(cl).get();
444  }
445  else
446  {
447  throw;
448  }
449  }
// A sub-request that delivers fewer bytes than requested is treated as a
// programming error, not as a short read.
450  assert(result == readv_result.second);
451  }
452  }
453  catch (edm::Exception& ex)
454  {
455  ex.addContext("Calling XrdFile::readv()");
456  throw;
457  }
458  catch (std::exception& ex)
459  {
461  newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
462  newex.addContext("Calling XrdFile::readv()");
463  throw newex;
464  }
465  final_result += result;
466  }
468 
// Logs the elapsed milliseconds between `start` and `end` together with a
// per-file operation counter and the number of sub-requests.
469  edm::LogVerbatim("XrdAdaptorInternal") << "[" << m_op_count.fetch_add(1) << "] Time for readv: " << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()) << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
470 
471  return final_result;
472 }
473 
474 IOSize
475 XrdFile::write (const void *from, IOSize n)
476 {
477  if (n > 0x7fffffff) {
478  cms::Exception ex("FileWriteError");
479  ex << "XrdFile::write(name='" << m_name << "', n=" << n
480  << ") too many bytes, limit is 0x7fffffff";
481  ex.addContext("Calling XrdFile::write()");
482  addConnection(ex);
483  throw ex;
484  }
485  auto file = getActiveFile();
486 
487  XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
488  if (!s.IsOK()) {
489  cms::Exception ex("FileWriteError");
490  ex << "XrdFile::write(name='" << m_name << "', n=" << n
491  << ") failed with error '" << s.ToStr()
492  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
493  ex.addContext("Calling XrdFile::write()");
494  addConnection(ex);
495  throw ex;
496  }
497  m_offset += n;
498  assert(m_size != -1);
499  if (m_offset > m_size)
500  m_size = m_offset;
501 
502  return n;
503 }
504 
505 IOSize
506 XrdFile::write (const void *from, IOSize n, IOOffset pos)
507 {
508  if (n > 0x7fffffff) {
509  cms::Exception ex("FileWriteError");
510  ex << "XrdFile::write(name='" << m_name << "', n=" << n
511  << ") too many bytes, limit is 0x7fffffff";
512  ex.addContext("Calling XrdFile::write()");
513  addConnection(ex);
514  throw ex;
515  }
516  auto file = getActiveFile();
517 
518  XrdCl::XRootDStatus s = file->Write(pos, n, from);
519  if (!s.IsOK()) {
520  cms::Exception ex("FileWriteError");
521  ex << "XrdFile::write(name='" << m_name << "', n=" << n
522  << ") failed with error '" << s.ToStr()
523  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
524  ex.addContext("Calling XrdFile::write()");
525  addConnection(ex);
526  throw ex;
527  }
528  assert (m_size != -1);
529  if (static_cast<IOOffset>(pos + n) > m_size)
530  m_size = pos + n;
531 
532  return n;
533 }
534 
// prefetch(const IOPosBuffer *what, IOSize n); its signature line (listing
// line 536) is absent from this listing. Always returns false and performs
// no work, for the reason given in the body.
535 bool
537 {
538  // The new Xrootd client does not contain any internal buffers.
539  // Hence, prefetching is disabled completely.
540  return false;
541 }
542 
// Seek. The signature line (listing line 547) is absent from this listing;
// the body uses `offset` and `whence`, so it is presumably
// position(IOOffset offset, Relative whence) - confirm against the header.
//
// Moves m_offset relative to the start (SET), the current offset (CURRENT)
// or the cached file size (END), clamps a negative result to 0, and returns
// the new offset. No request is sent to the server; only cached state
// changes.
//
// Throws cms::Exception("FilePositionError") when no request manager is
// attached or `whence` is not one of the three values.
546 IOOffset
548 {
549  if (! m_requestmanager.get()) {
550  cms::Exception ex("FilePositionError");
551  ex << "XrdFile::position() called on a closed file";
552  ex.addContext("Calling XrdFile::position()");
553  addConnection(ex);
554  throw ex;
555  }
556  switch (whence)
557  {
558  case SET:
559  m_offset = offset;
560  break;
561 
562  case CURRENT:
563  m_offset += offset;
564  break;
565 
566  // TODO: None of this works with concurrent writers to the file.
567  case END:
568  assert(m_size != -1);
569  m_offset = m_size + offset;
570  break;
571 
572  default:
573  cms::Exception ex("FilePositionError");
574  ex << "XrdFile::position() called with incorrect 'whence' parameter";
575  ex.addContext("Calling XrdFile::position()");
576  addConnection(ex);
577  throw ex;
578  }
579 
580  if (m_offset < 0)
581  m_offset = 0;
582  assert(m_size != -1);
// NOTE(review): seeking past the cached end grows m_size even though nothing
// has been written - confirm this is intended, since END-relative seeks and
// later size checks use m_size.
583  if (m_offset > m_size)
584  m_size = m_offset;
585 
586  return m_offset;
587 }
588 
// resize(IOOffset size); its signature line (listing line 590) is absent
// from this listing. Not supported: always throws
// cms::Exception("FileResizeError"), with connection details attached when a
// request manager exists.
589 void
591 {
592  cms::Exception ex("FileResizeError");
593  ex << "XrdFile::resize(name='" << m_name << "') not implemented";
594  ex.addContext("Calling XrdFile::resize()");
595  addConnection(ex);
596  throw ex;
597 }
598 
599 std::shared_ptr<XrdCl::File>
601 {
602  if (!m_requestmanager.get())
603  {
604  cms::Exception ex("XrdFileLogicError");
605  ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
606  ex.addContext("Calling XrdFile::getActiveFile()");
607  m_requestmanager->addConnections(ex);
608  m_close = false;
609  throw ex;
610  }
611  return m_requestmanager->getActiveFile();
612 }
613 
// addConnection(cms::Exception &); its signature line (listing line 615) is
// absent from this listing; the body names the parameter `ex`. Lets the
// request manager add its connection details to `ex`; does nothing when no
// request manager is attached, so it is safe to call on a closed file.
614 void
616 {
617  if (m_requestmanager.get())
618  {
619  m_requestmanager->addConnections(ex);
620  }
621 }
622 
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
#define XRD_CL_MAX_READ_SIZE
Definition: XrdFile.cc:20
IOSize write(const void *from, IOSize n) override
Definition: XrdFile.cc:475
virtual void create(const char *name, bool exclusive=false, int perms=0666)
Definition: XrdFile.cc:65
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
char const * what() const override
Definition: Exception.cc:141
void close(void) override
Definition: XrdFile.cc:196
bool m_close
Definition: XrdFile.h:71
IOSize readv(IOBuffer *into, IOSize n) override
Definition: XrdFile.cc:310
IOOffset m_offset
Definition: XrdFile.h:69
void resize(IOOffset size) override
Definition: XrdFile.cc:590
Relative
Definition: Storage.h:23
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
#define unlikely(x)
XrdFile(void)
Definition: XrdFile.cc:22
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
virtual IOOffset position(void) const
Definition: Storage.cc:95
#define XRD_CL_MAX_CHUNK
Definition: XrdFile.cc:17
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:73
bool prefetch(const IOPosBuffer *what, IOSize n) override
Definition: XrdFile.cc:536
#define end
Definition: vmac.h:39
T min(T a, T b)
Definition: MathUtil.h:58
int read(void)
Definition: IOInput.cc:54
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:600
element_type const * get() const
IOOffset m_size
Definition: XrdFile.h:70
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOBuffer.h:50
std::string m_name
Definition: XrdFile.h:72
IOSize size(void) const
Definition: IOPosBuffer.h:64
virtual IOOffset size(void) const
Definition: Storage.cc:102
~XrdFile(void) override
Definition: XrdFile.cc:55
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:93
def move(src, dest)
Definition: eostools.py:510
virtual void abort(void)
Definition: XrdFile.cc:216
#define XRD_CL_MAX_SIZE
Definition: XrdFile.cc:18