CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
XrdFile Class Reference

#include <XrdFile.h>

Inheritance diagram for XrdFile:
Storage IOInput IOOutput

Public Member Functions

virtual void abort (void)
 
void close (void) override
 
virtual void create (const char *name, bool exclusive=false, int perms=0666)
 
virtual void create (const std::string &name, bool exclusive=false, int perms=0666)
 
virtual void open (const char *name, int flags=IOFlags::OpenRead, int perms=0666)
 
virtual void open (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666)
 
IOOffset position (IOOffset offset, Relative whence=SET) override
 
virtual IOOffset position (IOOffset offset, Relative whence=SET)=0
 
virtual IOOffset position (void) const
 
bool prefetch (const IOPosBuffer *what, IOSize n) override
 
IOSize read (IOBuffer into)
 
IOSize read (IOBuffer into, IOOffset pos)
 
IOSize read (void *into, IOSize n) override
 
virtual IOSize read (void *into, IOSize n)=0
 
virtual IOSize read (void *into, IOSize n, IOOffset pos)
 
IOSize read (void *into, IOSize n, IOOffset pos) override
 
int read (void)
 
virtual IOSize readv (IOBuffer *into, IOSize buffers)
 
IOSize readv (IOBuffer *into, IOSize n) override
 
virtual IOSize readv (IOPosBuffer *into, IOSize buffers)
 
IOSize readv (IOPosBuffer *into, IOSize n) override
 
void resize (IOOffset size) override
 
IOSize write (const void *from, IOSize n) override
 
virtual IOSize write (const void *from, IOSize n)=0
 
virtual IOSize write (const void *from, IOSize n, IOOffset pos)
 
IOSize write (const void *from, IOSize n, IOOffset pos) override
 
IOSize write (IOBuffer from)
 
IOSize write (IOBuffer from, IOOffset pos)
 
IOSize write (unsigned char byte)
 
 XrdFile (const char *name, int flags=IOFlags::OpenRead, int perms=0666)
 
 XrdFile (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666)
 
 XrdFile (IOFD fd)
 
 XrdFile (void)
 
 ~XrdFile (void) override
 
- Public Member Functions inherited from Storage
virtual bool eof (void) const
 
virtual void flush (void)
 
virtual IOOffset position (void) const
 
IOSize read (IOBuffer into)
 
IOSize read (IOBuffer into, IOOffset pos)
 
virtual IOSize read (void *into, IOSize n)=0
 
int read (void)
 
virtual IOSize readv (IOBuffer *into, IOSize buffers)
 
virtual void rewind (void)
 
virtual IOOffset size (void) const
 
 Storage (void)
 
virtual IOSize write (const void *from, IOSize n)=0
 
IOSize write (IOBuffer from)
 
IOSize write (IOBuffer from, IOOffset pos)
 
IOSize write (unsigned char byte)
 
virtual IOSize writev (const IOBuffer *from, IOSize buffers)
 
virtual IOSize writev (const IOPosBuffer *from, IOSize buffers)
 
 ~Storage (void) override
 
- Public Member Functions inherited from IOInput
IOSize read (IOBuffer into)
 
int read (void)
 
IOSize xread (IOBuffer into)
 
IOSize xread (void *into, IOSize n)
 
IOSize xreadv (IOBuffer *into, IOSize buffers)
 
virtual ~IOInput (void)
 Destruct the stream. A no-op. More...
 
- Public Member Functions inherited from IOOutput
IOSize write (IOBuffer from)
 
IOSize write (unsigned char byte)
 
virtual IOSize writev (const IOBuffer *from, IOSize buffers)
 
IOSize xwrite (const void *from, IOSize n)
 
IOSize xwrite (IOBuffer from)
 
IOSize xwritev (const IOBuffer *from, IOSize buffers)
 
virtual ~IOOutput (void)
 Destruct the stream. A no-op. More...
 

Private Member Functions

void addConnection (cms::Exception &)
 
std::shared_ptr< XrdCl::File > getActiveFile ()
 

Private Attributes

bool m_close
 
std::string m_name
 
IOOffset m_offset
 
std::atomic< unsigned int > m_op_count
 
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
 
IOOffset m_size
 

Additional Inherited Members

- Public Types inherited from Storage
enum  Relative { SET, CURRENT, END }
 

Detailed Description

Definition at line 17 of file XrdFile.h.

Constructor & Destructor Documentation

◆ XrdFile() [1/4]

XrdFile::XrdFile ( void  )

Definition at line 22 of file XrdFile.cc.

22 : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {}

◆ XrdFile() [2/4]

XrdFile::XrdFile ( IOFD  fd)

◆ XrdFile() [3/4]

XrdFile::XrdFile ( const char *  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)

Definition at line 24 of file XrdFile.cc.

25  : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
26  open(name, flags, perms);
27 }

References HLT_2018_cff::flags, Skims_PA_cff::name, and open().

◆ XrdFile() [4/4]

XrdFile::XrdFile ( const std::string &  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)

Definition at line 29 of file XrdFile.cc.

30  : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
31  open(name.c_str(), flags, perms);
32 }

References HLT_2018_cff::flags, Skims_PA_cff::name, and open().

◆ ~XrdFile()

XrdFile::~XrdFile ( void  )
override

Definition at line 34 of file XrdFile.cc.

34  {
35  if (m_close)
36  edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open";
37 }

References m_close, and m_name.

Member Function Documentation

◆ abort()

void XrdFile::abort ( void  )
virtual

Definition at line 168 of file XrdFile.cc.

168  {
169  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
170  m_close = false;
171  m_offset = 0;
172  m_size = -1;
173 }

References m_close, m_offset, m_requestmanager, and m_size.

◆ addConnection()

void XrdFile::addConnection ( cms::Exception ex)
private

Definition at line 517 of file XrdFile.cc.

517  {
518  if (m_requestmanager.get()) {
519  m_requestmanager->addConnections(ex);
520  }
521 }

References edm::propagate_const< T >::get(), and m_requestmanager.

Referenced by open(), position(), read(), resize(), and write().

◆ close()

void XrdFile::close ( void  )
overridevirtual

Reimplemented from Storage.

Definition at line 153 of file XrdFile.cc.

153  {
154  if (!m_requestmanager.get()) {
155  edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name << "') called but the file is not open";
156  m_close = false;
157  return;
158  }
159 
160  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
161 
162  m_close = false;
163  m_offset = 0;
164  m_size = -1;
165  edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
166 }

References edm::propagate_const< T >::get(), m_close, m_name, m_offset, m_requestmanager, and m_size.

Referenced by esMonitoring.AsyncLineReaderMixin::handle_close(), and esMonitoring.FDJsonServer::handle_close().

◆ create() [1/2]

void XrdFile::create ( const char *  name,
bool  exclusive = false,
int  perms = 0666 
)
virtual

◆ create() [2/2]

void XrdFile::create ( const std::string &  name,
bool  exclusive = false,
int  perms = 0666 
)
virtual

◆ getActiveFile()

std::shared_ptr< XrdCl::File > XrdFile::getActiveFile ( void  )
private

Returns a file handle from one of the active sources. Verifies the file is open and throws an exception as necessary.

Definition at line 505 of file XrdFile.cc.

505  {
506  if (!m_requestmanager.get()) {
507  cms::Exception ex("XrdFileLogicError");
508  ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
509  ex.addContext("Calling XrdFile::getActiveFile()");
510  m_requestmanager->addConnections(ex);
511  m_close = false;
512  throw ex;
513  }
514  return m_requestmanager->getActiveFile();
515 }

References cms::Exception::addContext(), edm::propagate_const< T >::get(), m_close, m_name, and m_requestmanager.

Referenced by open(), and write().

◆ open() [1/2]

void XrdFile::open ( const char *  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)
virtual

Definition at line 56 of file XrdFile.cc.

56  {
57  // Actual open
58  if ((name == nullptr) || (*name == 0)) {
60  ex << "Cannot open a file without a name";
61  ex.addContext("Calling XrdFile::open()");
62  throw ex;
63  }
64  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
66  ex << "Must open file '" << name << "' at least for read or write";
67  ex.addContext("Calling XrdFile::open()");
68  throw ex;
69  }
70 
71  // Translate our flags to system flags
72  XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
73 
75  openflags |= XrdCl::OpenFlags::Update;
76  else if (flags & IOFlags::OpenRead)
77  openflags |= XrdCl::OpenFlags::Read;
78 
79  if (flags & IOFlags::OpenAppend) {
81  ex << "Opening file '" << name << "' in append mode not supported";
82  ex.addContext("Calling XrdFile::open()");
83  throw ex;
84  }
85 
86  if (flags & IOFlags::OpenCreate) {
88  openflags |= XrdCl::OpenFlags::Delete;
89  openflags |= XrdCl::OpenFlags::New;
90  openflags |= XrdCl::OpenFlags::MakePath;
91  }
92 
94  openflags |= XrdCl::OpenFlags::Delete;
95 
96  // Translate mode flags
98  modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
99  modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
100  modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
101  modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
102  modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
103  modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
104  modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
105  modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
106  modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
107 
108  m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
109  m_name = name;
110 
111  // Stat the file so we can keep track of the offset better.
112  auto file = getActiveFile();
113  XrdCl::XRootDStatus status;
114  XrdCl::StatInfo *statInfo = nullptr;
115  if (!(status = file->Stat(false, statInfo)).IsOK()) {
117  ex << "XrdCl::File::Stat(name='" << name << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
118  << ", code=" << status.code << ")";
119  ex.addContext("Calling XrdFile::open()");
120  addConnection(ex);
121  throw ex;
122  }
123  assert(statInfo);
124  m_size = statInfo->GetSize();
125  delete (statInfo);
126 
127  m_offset = 0;
128  m_close = true;
129 
130  // Send the monitoring info, if available.
131  // Note: std::getenv is not reentrant.
132  // Commenting out until this is available in the new client.
133  /*
134  char * crabJobId = std::getenv(kCrabJobIdEnv);
135  if (crabJobId) {
136  kXR_unt32 dictId;
137  m_file->SendMonitoringInfo(crabJobId, &dictId);
138  edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
139  }
140 */
141 
142  edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
143 
144  std::vector<std::string> sources;
145  m_requestmanager->getActiveSourceNames(sources);
146  std::stringstream ss;
147  ss << "Active sources: ";
148  for (auto const &it : sources)
149  ss << it << ", ";
150  edm::LogInfo("XrdFileInfo") << ss.str();
151 }

References addConnection(), cms::Exception::addContext(), cms::cuda::assert(), FrontierConditions_GlobalTag_cff::file, edm::errors::FileOpenError, HLT_2018_cff::flags, getActiveFile(), m_close, m_name, m_offset, m_requestmanager, m_size, Skims_PA_cff::name, None, IOFlags::OpenAppend, IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenRead, IOFlags::OpenTruncate, IOFlags::OpenWrite, CalibrationSummaryClient_cfi::sources, contentValuesCheck::ss, and mps_update::status.

Referenced by create(), open(), and XrdFile().

◆ open() [2/2]

void XrdFile::open ( const std::string &  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)
virtual

Definition at line 52 of file XrdFile.cc.

52  {
53  open(name.c_str(), flags, perms);
54 }

References HLT_2018_cff::flags, Skims_PA_cff::name, and open().

◆ position() [1/3]

IOOffset XrdFile::position ( IOOffset  offset,
Relative  whence = SET 
)
overridevirtual

Implements Storage.

Definition at line 457 of file XrdFile.cc.

457  {
458  if (!m_requestmanager.get()) {
459  cms::Exception ex("FilePositionError");
460  ex << "XrdFile::position() called on a closed file";
461  ex.addContext("Calling XrdFile::position()");
462  addConnection(ex);
463  throw ex;
464  }
465  switch (whence) {
466  case SET:
467  m_offset = offset;
468  break;
469 
470  case CURRENT:
471  m_offset += offset;
472  break;
473 
474  // TODO: None of this works with concurrent writers to the file.
475  case END:
476  assert(m_size != -1);
477  m_offset = m_size + offset;
478  break;
479 
480  default:
481  cms::Exception ex("FilePositionError");
482  ex << "XrdFile::position() called with incorrect 'whence' parameter";
483  ex.addContext("Calling XrdFile::position()");
484  addConnection(ex);
485  throw ex;
486  }
487 
488  if (m_offset < 0)
489  m_offset = 0;
490  assert(m_size != -1);
491  if (m_offset > m_size)
492  m_size = m_offset;
493 
494  return m_offset;
495 }

References addConnection(), cms::Exception::addContext(), cms::cuda::assert(), Storage::CURRENT, Storage::END, edm::propagate_const< T >::get(), m_offset, m_requestmanager, m_size, hltrates_dqm_sourceclient-live_cfg::offset, and Storage::SET.

◆ position() [2/3]

virtual IOOffset Storage::position

◆ position() [3/3]

IOOffset Storage::position

Definition at line 72 of file Storage.cc.

72  {
73  Storage *self = const_cast<Storage *>(this);
74  return self->position(0, CURRENT);
75 }

◆ prefetch()

bool XrdFile::prefetch ( const IOPosBuffer what,
IOSize  n 
)
overridevirtual

Reimplemented from Storage.

Definition at line 448 of file XrdFile.cc.

448  {
449  // The new Xrootd client does not contain any internal buffers.
450  // Hence, prefetching is disabled completely.
451  return false;
452 }

◆ read() [1/7]

IOSize IOInput::read

Read from the input stream into the buffer starting at into and of size n.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

The base class implementation simply forwards the call to read(void *, IOSize) method.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if a limited amount of input is currently available for platform or implementation reasons.
Exceptions
In case of error, an #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception).

Definition at line 80 of file IOInput.cc.

80 { return read(into.data(), into.size()); }

◆ read() [2/7]

IOSize Storage::read

Definition at line 10 of file Storage.cc.

10 { return read(into.data(), into.size(), pos); }

◆ read() [3/7]

IOSize XrdFile::read ( void *  into,
IOSize  n 
)
overridevirtual

Read at most n bytes into the buffer into.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if a limited amount of input is currently available for platform or implementation reasons.
Exceptions
In case of error, an #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception).

Implements IOInput.

Definition at line 176 of file XrdFile.cc.

176  {
177  if (n > 0x7fffffff) {
179  ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
180  ex.addContext("Calling XrdFile::read()");
181  addConnection(ex);
182  throw ex;
183  }
184 
185  uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
186  m_offset += bytesRead;
187  return bytesRead;
188 }

References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, edm::propagate_const< T >::get(), m_name, m_offset, m_requestmanager, and dqmiodumpmetadata::n.

◆ read() [4/7]

IOSize IOInput::read

Read at most n bytes into the buffer into.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if a limited amount of input is currently available for platform or implementation reasons.
Exceptions
In case of error, an #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception).

◆ read() [5/7]

IOSize Storage::read

Definition at line 12 of file Storage.cc.

12  {
13  // FIXME: this is not thread safe! split into separate interface
14  // that a particular storage can choose to support or not? make
15  // sure that throw semantics are correct here!
16  // FIXME: use saveposition object in case exceptions are thrown?
17  IOOffset here = position();
18  position(pos);
19  n = read(into, n);
20  position(here);
21  return n;
22 }

◆ read() [6/7]

IOSize XrdFile::read ( void *  into,
IOSize  n,
IOOffset  pos 
)
overridevirtual

Reimplemented from Storage.

Definition at line 190 of file XrdFile.cc.

190  {
191  if (n > 0x7fffffff) {
193  ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") exceeds read size limit 0x7fffffff";
194  ex.addContext("Calling XrdFile::read()");
195  addConnection(ex);
196  throw ex;
197  }
198  if (n == 0) {
199  return 0;
200  }
201 
202  // In some cases, the IO layers above us (particularly, if lazy-download is
203  // enabled) will emit very large reads. We break this up into multiple
204  // reads in order to avoid hitting timeouts.
205  std::future<IOSize> prev_future, cur_future;
206  IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
207  bool readReturnedShort = false;
208 
209  // Check the status of a read operation; updates bytesRead and
210  // readReturnedShort.
211  auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
212  if (!future.valid()) {
213  return;
214  }
215  IOSize result = future.get();
216  if (readReturnedShort && (result != 0)) {
218  ex << "XrdFile::read(name='" << m_name << "', n=" << n
219  << ") remote server returned non-zero length read after EOF.";
220  ex.addContext("Calling XrdFile::read()");
221  addConnection(ex);
222  throw ex;
223  } else if (result != expected) {
224  readReturnedShort = true;
225  }
226  bytesRead += result;
227  };
228 
229  while (n) {
230  IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
231 
232  // Save prior read state; issue new read.
233  prev_future = std::move(cur_future);
234  prev_future_expected = cur_future_expected;
235  cur_future = m_requestmanager->handle(into, chunk, pos);
236  cur_future_expected = chunk;
237 
238  // Wait for the prior read; update bytesRead.
239  check_read(prev_future, prev_future_expected);
240 
241  // Update counters.
242  into = static_cast<char *>(into) + chunk;
243  n -= chunk;
244  pos += chunk;
245  }
246 
247  // Wait for the last read to finish.
248  check_read(cur_future, cur_future_expected);
249 
250  return bytesRead;
251 }

References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, m_name, m_requestmanager, min(), eostools::move(), dqmiodumpmetadata::n, mps_fire::result, and XRD_CL_MAX_READ_SIZE.

◆ read() [7/7]

int IOInput::read

Read the next single byte from the input stream and return it as an unsigned char cast to an int, or -1 to indicate end of input data.

If this is a blocking stream, the call will block until the byte can be read, end of data is reached, or an exception is thrown. For a non-blocking input a character is returned if one is available, otherwise an exception is thrown.

The base class implementation simply forwards the call to read(void *, IOSize) method.

Returns
The byte cast from an unsigned char to an int (in range 0...255, inclusive) if one could be read, or -1 to indicate end of input data.
Exceptions
In case of error, an #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception).

Definition at line 52 of file IOInput.cc.

52  {
53  unsigned char byte;
54  IOSize n = read(&byte, 1);
55  return n == 0 ? -1 : byte;
56 }

◆ readv() [1/4]

IOSize IOInput::readv

Read from the input stream into multiple scattered buffers. There are buffers to fill in an array starting at into; the memory those buffers occupy does not need to be contiguous. The buffers are filled in the order given; each buffer is filled fully before the subsequent buffers.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

The base class implementation uses read(void *, IOSize) method, but derived classes may implement a more efficient alternative.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if a limited amount of input is currently available for platform or implementation reasons. Note that the return value indicates the number of bytes read, not the number of buffers; it is the sum total of bytes filled into all the buffers.
Exceptions
In case of error, an #IOError exception is thrown. However, if some data has already been read, the error is swallowed and the method returns the data read so far. It is assumed that persistent errors will occur anyway on the next read and sporadic errors like the stream becoming unavailable can be ignored. Use xread() if a different policy is desirable.

Definition at line 111 of file IOInput.cc.

111  {
112  assert(!buffers || into);
113 
114  // Keep reading as long as possible; ignore errors if we have read
115  // something, otherwise pass it on.
116  IOSize status;
117  IOSize done = 0;
118  try {
119  for (IOSize i = 0; i < buffers; done += status, ++i)
120  if ((status = read(into[i])) == 0)
121  break;
122  } catch (cms::Exception &) {
123  if (!done)
124  throw;
125  }
126 
127  return done;
128 }

◆ readv() [2/4]

IOSize XrdFile::readv ( IOBuffer into,
IOSize  buffers 
)
overridevirtual

Read from the input stream into multiple scattered buffers. There are buffers to fill in an array starting at into; the memory those buffers occupy does not need to be contiguous. The buffers are filled in the order given; each buffer is filled fully before the subsequent buffers.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

The base class implementation uses read(void *, IOSize) method, but derived classes may implement a more efficient alternative.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if a limited amount of input is currently available for platform or implementation reasons. Note that the return value indicates the number of bytes read, not the number of buffers; it is the sum total of bytes filled into all the buffers.
Exceptions
In case of error, an #IOError exception is thrown. However, if some data has already been read, the error is swallowed and the method returns the data read so far. It is assumed that persistent errors will occur anyway on the next read and sporadic errors like the stream becoming unavailable can be ignored. Use xread() if a different policy is desirable.

Reimplemented from IOInput.

Definition at line 254 of file XrdFile.cc.

254  {
255  std::vector<IOPosBuffer> new_buf;
256  new_buf.reserve(n);
257  IOOffset off = 0;
258  for (IOSize i = 0; i < n; i++) {
259  IOSize size = into[i].size();
260  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
261  off += size;
262  }
263  return readv(&(new_buf[0]), n);
264 }

References data, mps_fire::i, dqmiodumpmetadata::n, IOBuffer::size(), and Storage::size().

◆ readv() [3/4]

IOSize Storage::readv

Definition at line 24 of file Storage.cc.

24  {
25  IOOffset here = position();
26  IOSize total = 0;
27  for (IOSize i = 0; i < n; ++i) {
28  try {
29  position(into[i].offset());
30  total += read(into[i].data(), into[i].size());
31  } catch (cms::Exception &) {
32  if (!total)
33  throw;
34  break;
35  }
36  }
37  position(here);
38  return total;
39 }

◆ readv() [4/4]

IOSize XrdFile::readv ( IOPosBuffer into,
IOSize  n 
)
overridevirtual

Reimplemented from Storage.

Definition at line 270 of file XrdFile.cc.

270  {
271  // A trivial vector read - unlikely, considering ROOT data format.
272  if (UNLIKELY(n == 0)) {
273  return 0;
274  }
275  if (UNLIKELY(n == 1)) {
276  return read(into[0].data(), into[0].size(), into[0].offset());
277  }
278 
279  auto cl = std::make_shared<std::vector<IOPosBuffer>>();
280 
281  // CMSSW may issue large readv's; Xrootd is only able to handle
282  // 1024. Further, the splitting algorithm may slightly increase
283  // the number of buffers.
284  IOSize adjust = XRD_CL_MAX_SIZE - 2;
285  cl->reserve(n > adjust ? adjust : n);
286  IOSize idx = 0, last_idx = 0;
287  IOSize final_result = 0;
288  std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
289  while (idx < n) {
290  cl->clear();
291  IOSize size = 0;
292  while (idx < n) {
293  unsigned rollback_count = 1;
294  IOSize current_size = size;
295  IOOffset offset = into[idx].offset();
296  IOSize length = into[idx].size();
297  size += length;
298  char *buffer = static_cast<char *>(into[idx].data());
299  while (length > XRD_CL_MAX_CHUNK) {
300  IOPosBuffer ci;
302  length -= XRD_CL_MAX_CHUNK;
303  ci.set_offset(offset);
305  ci.set_data(buffer);
307  cl->emplace_back(ci);
308  rollback_count++;
309  }
310  IOPosBuffer ci;
311  ci.set_size(length);
312  ci.set_offset(offset);
313  ci.set_data(buffer);
314  cl->emplace_back(ci);
315 
316  if (cl->size() > adjust) {
317  while (rollback_count--)
318  cl->pop_back();
319  size = current_size;
320  break;
321  } else {
322  idx++;
323  }
324  }
325  try {
326  readv_futures.emplace_back(m_requestmanager->handle(cl), size);
327  } catch (edm::Exception &ex) {
328  ex.addContext("Calling XrdFile::readv()");
329  throw;
330  }
331 
332  // Assure that we have made some progress.
333  assert(last_idx < idx);
334  last_idx = idx;
335  }
336  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
338 
339  // If there are multiple readv calls, wait until all return until looking
340  // at the results of any. This guarantees that all readv's have finished
341  // by time we call .get() for the first time (in case one of the readv's
342  // result in an exception).
343  //
344  // We cannot have outstanding readv's on function exit as the XrdCl may
345  // write into the corresponding buffer at the same time as ROOT.
346  if (readv_futures.size() > 1) {
347  for (auto &readv_result : readv_futures) {
348  if (readv_result.first.valid()) {
349  readv_result.first.wait();
350  }
351  }
352  }
353 
354  for (auto &readv_result : readv_futures) {
355  IOSize result = 0;
356  try {
357  const int retry_count = 5;
358  for (int retries = 0; retries < retry_count; retries++) {
359  try {
360  if (readv_result.first.valid()) {
361  result = readv_result.first.get();
362  }
363  } catch (XrootdException &ex) {
364  if ((retries != retry_count - 1) && (ex.getCode() == XrdCl::errInvalidResponse)) {
365  edm::LogWarning("XrdAdaptorInternal")
366  << "Got an invalid response from Xrootd server; retrying" << std::endl;
367  result = m_requestmanager->handle(cl).get();
368  } else {
369  throw;
370  }
371  }
372  assert(result == readv_result.second);
373  }
374  } catch (edm::Exception &ex) {
375  ex.addContext("Calling XrdFile::readv()");
376  throw;
377  } catch (std::exception &ex) {
379  newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
380  newex.addContext("Calling XrdFile::readv()");
381  throw newex;
382  }
383  final_result += result;
384  }
386 
387  edm::LogVerbatim("XrdAdaptorInternal")
388  << "[" << m_op_count.fetch_add(1) << "] Time for readv: "
389  << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count())
390  << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
391 
392  return final_result;
393 }

References cms::Exception::addContext(), cms::cuda::assert(), edmScanValgrind::buffer, GetRecoTauVFromDQM_MC_cff::cl, KineDebug3::count(), data, end, cppFunctionSkipper::exception, edm::propagate_const< T >::get(), XrdAdaptor::XrootdException::getCode(), training_settings::idx, m_op_count, m_requestmanager, dqmiodumpmetadata::n, fileCollector::now, IOPosBuffer::offset(), hltrates_dqm_sourceclient-live_cfg::offset, IOInput::read(), mps_fire::result, IOPosBuffer::set_data(), IOPosBuffer::set_offset(), IOPosBuffer::set_size(), IOPosBuffer::size(), Storage::size(), command_line::start, edm::errors::StdException, UNLIKELY, cms::Exception::what(), XRD_CL_MAX_CHUNK, and XRD_CL_MAX_SIZE.

◆ resize()

void XrdFile::resize ( IOOffset  size)
override virtual

Implements Storage.

Definition at line 497 of file XrdFile.cc.

497  {
498  cms::Exception ex("FileResizeError");
499  ex << "XrdFile::resize(name='" << m_name << "') not implemented";
500  ex.addContext("Calling XrdFile::resize()");
501  addConnection(ex);
502  throw ex;
503 }

References addConnection(), cms::Exception::addContext(), and m_name.

◆ write() [1/7]

IOSize XrdFile::write ( const void *  from,
IOSize  n 
)
override virtual

Write n bytes of data starting at address from.

Returns
The number of bytes written. Normally this will be n, but can be less, even zero, for example if the stream is in non-blocking mode and cannot accept output at this time.
Exceptions
In case of error, an exception is thrown. However, if the stream is in non-blocking mode and cannot accept output, it will not throw an exception – the return value will be less than requested.

Implements IOOutput.

Definition at line 395 of file XrdFile.cc.

395  {
396  if (n > 0x7fffffff) {
398  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
399  ex.addContext("Calling XrdFile::write()");
400  addConnection(ex);
401  throw ex;
402  }
403  auto file = getActiveFile();
404 
405  XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
406  if (!s.IsOK()) {
408  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
409  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
410  ex.addContext("Calling XrdFile::write()");
411  addConnection(ex);
412  throw ex;
413  }
414  m_offset += n;
415  assert(m_size != -1);
416  if (m_offset > m_size)
417  m_size = m_offset;
418 
419  return n;
420 }

References addConnection(), cms::Exception::addContext(), cms::cuda::assert(), FrontierConditions_GlobalTag_cff::file, edm::errors::FileWriteError, getActiveFile(), m_name, m_offset, m_size, dqmiodumpmetadata::n, and alignCSCRings::s.

◆ write() [2/7]

IOSize IOOutput::write

Write n bytes of data starting at address from.

Returns
The number of bytes written. Normally this will be n, but can be less, even zero, for example if the stream is in non-blocking mode and cannot accept output at this time.
Exceptions
In case of error, an exception is thrown. However, if the stream is in non-blocking mode and cannot accept output, it will not throw an exception – the return value will be less than requested.

◆ write() [3/7]

IOSize Storage::write

Definition at line 44 of file Storage.cc.

44  {
45  // FIXME: this is not thread safe! split into separate interface
46  // that a particular storage can choose to support or not? make
47  // sure that throw semantics are correct here!
48 
49  // FIXME: use saveposition object in case exceptions are thrown?
50  IOOffset here = position();
51  position(pos);
52  n = write(from, n);
53  position(here);
54  return n;
55 }

◆ write() [4/7]

IOSize XrdFile::write ( const void *  from,
IOSize  n,
IOOffset  pos 
)
override virtual

Reimplemented from Storage.

Definition at line 422 of file XrdFile.cc.

422  {
423  if (n > 0x7fffffff) {
425  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
426  ex.addContext("Calling XrdFile::write()");
427  addConnection(ex);
428  throw ex;
429  }
430  auto file = getActiveFile();
431 
432  XrdCl::XRootDStatus s = file->Write(pos, n, from);
433  if (!s.IsOK()) {
435  ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
436  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
437  ex.addContext("Calling XrdFile::write()");
438  addConnection(ex);
439  throw ex;
440  }
441  assert(m_size != -1);
442  if (static_cast<IOOffset>(pos + n) > m_size)
443  m_size = pos + n;
444 
445  return n;
446 }

References addConnection(), cms::Exception::addContext(), cms::cuda::assert(), FrontierConditions_GlobalTag_cff::file, edm::errors::FileWriteError, getActiveFile(), m_name, m_size, dqmiodumpmetadata::n, and alignCSCRings::s.

◆ write() [5/7]

IOSize IOOutput::write

Write to the output stream the buffer from. This method is simply redirected to write(const void *, IOSize).

Note that derived classes should not normally call this method, as it simply routes the call back to derived class through the other virtual functions. Use this method only at the "outside edge" when transferring calls from one object to another, not in up/down calls in the inheritance tree.

Returns
The number of bytes actually written. This is less than or equal to the size of the buffer. The value can be less than requested if the stream is unable to accept all the output for platform or implementation reasons.
Exceptions
In case of error, an exception is thrown. However, if the stream is in non-blocking mode and cannot accept output, it will not throw an exception – the return value will be less than requested.

Definition at line 59 of file IOOutput.cc.

59 { return write(from.data(), from.size()); }

◆ write() [6/7]

IOSize Storage::write

Definition at line 42 of file Storage.cc.

42 { return write(from.data(), from.size(), pos); }

◆ write() [7/7]

IOSize IOOutput::write

Write a single byte to the output stream. This method is simply redirected to write(const void *, IOSize).

Note that derived classes should not normally call this method, as it simply routes the call back to derived class through the other virtual functions. Use this method only at the "outside edge" when transferring calls from one object to another, not in up/down calls in the inheritance tree.

Returns
The number of bytes written. Normally this will be one, but can be zero if the stream is in non-blocking mode and cannot accept output at this time.
Exceptions
In case of error, an exception is thrown. However, if the stream is in non-blocking mode and cannot accept output, it will not throw an exception – zero will be returned.

Definition at line 39 of file IOOutput.cc.

39 { return write(&byte, 1); }

Member Data Documentation

◆ m_close

bool XrdFile::m_close
private

Definition at line 61 of file XrdFile.h.

Referenced by abort(), close(), getActiveFile(), open(), and ~XrdFile().

◆ m_name

std::string XrdFile::m_name
private

Definition at line 62 of file XrdFile.h.

Referenced by close(), getActiveFile(), open(), read(), resize(), write(), and ~XrdFile().

◆ m_offset

IOOffset XrdFile::m_offset
private

Definition at line 59 of file XrdFile.h.

Referenced by abort(), close(), open(), position(), read(), and write().

◆ m_op_count

std::atomic<unsigned int> XrdFile::m_op_count
private

Definition at line 63 of file XrdFile.h.

Referenced by readv().

◆ m_requestmanager

edm::propagate_const<std::shared_ptr<XrdAdaptor::RequestManager> > XrdFile::m_requestmanager
private

Definition at line 58 of file XrdFile.h.

Referenced by abort(), addConnection(), close(), getActiveFile(), open(), position(), read(), and readv().

◆ m_size

IOOffset XrdFile::m_size
private

Definition at line 60 of file XrdFile.h.

Referenced by abort(), close(), open(), position(), and write().

IOPosBuffer::set_offset
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:48
Storage::size
virtual IOOffset size(void) const
Definition: Storage.cc:77
XrdFile::m_offset
IOOffset m_offset
Definition: XrdFile.h:59
XrdFile::addConnection
void addConnection(cms::Exception &)
Definition: XrdFile.cc:517
IOBuffer::size
IOSize size(void) const
Definition: IOBuffer.h:34
mps_fire.i
i
Definition: mps_fire.py:355
start
Definition: start.py:1
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
HcalTopologyMode::Mode
Mode
Definition: HcalTopologyMode.h:26
XrdFile::getActiveFile
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:505
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
mps_update.status
status
Definition: mps_update.py:69
min
T min(T a, T b)
Definition: MathUtil.h:58
XrdFile::m_requestmanager
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:58
XrdAdaptor::XrootdException::getCode
uint16_t getCode()
Definition: XrdRequestManager.h:39
pos
Definition: PixelAliasList.h:18
XrdFile::readv
IOSize readv(IOBuffer *into, IOSize n) override
Definition: XrdFile.cc:254
edm::LogInfo
Definition: MessageLogger.h:254
Storage::END
Definition: Storage.h:22
Storage::SET
Definition: Storage.h:22
edm::errors::FileWriteError
Definition: EDMException.h:66
IOPosBuffer::set_data
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:51
cms::cuda::assert
assert(be >=bs)
XRD_CL_MAX_READ_SIZE
#define XRD_CL_MAX_READ_SIZE
Definition: XrdFile.cc:20
CalibrationSummaryClient_cfi.sources
sources
Definition: CalibrationSummaryClient_cfi.py:23
XrdFile::open
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:56
IOFlags::OpenTruncate
Definition: IOFlags.h:28
IOFlags::OpenWrite
Definition: IOFlags.h:8
training_settings.idx
idx
Definition: training_settings.py:16
end
#define end
Definition: vmac.h:39
edm::errors::FileOpenError
Definition: EDMException.h:49
edm::Exception
Definition: EDMException.h:77
edmScanValgrind.buffer
buffer
Definition: edmScanValgrind.py:171
GetRecoTauVFromDQM_MC_cff.cl
cl
Definition: GetRecoTauVFromDQM_MC_cff.py:38
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
fileCollector.now
now
Definition: fileCollector.py:207
UNLIKELY
#define UNLIKELY(x)
Definition: Likely.h:21
edm::propagate_const::get
element_type const * get() const
Definition: propagate_const.h:64
alignCSCRings.s
s
Definition: alignCSCRings.py:92
None
Definition: APVGainStruct.h:52
fileCollector.done
done
Definition: fileCollector.py:123
IOFlags::OpenRead
Definition: IOFlags.h:7
edm::LogWarning
Definition: MessageLogger.h:141
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
IOPosBuffer::set_size
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:54
edm::LogError
Definition: MessageLogger.h:183
XRD_CL_MAX_CHUNK
#define XRD_CL_MAX_CHUNK
Definition: XrdFile.cc:17
KineDebug3::count
void count()
Definition: KinematicConstrainedVertexUpdatorT.h:21
IOOffset
int64_t IOOffset
Definition: IOTypes.h:19
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
edm::LogVerbatim
Definition: MessageLogger.h:297
XRD_CL_MAX_SIZE
#define XRD_CL_MAX_SIZE
Definition: XrdFile.cc:18
IOFlags::OpenExclusive
Definition: IOFlags.h:25
IOFlags::OpenCreate
Definition: IOFlags.h:24
edm::errors::StdException
Definition: EDMException.h:28
IOFlags::OpenAppend
Definition: IOFlags.h:18
XrdFile::m_size
IOOffset m_size
Definition: XrdFile.h:60
eostools.move
def move(src, dest)
Definition: eostools.py:511
IOPosBuffer::size
IOSize size(void) const
Definition: IOPosBuffer.h:45
XrdFile::m_name
std::string m_name
Definition: XrdFile.h:62
generator_cfi.exclusive
exclusive
Definition: generator_cfi.py:24
IOPosBuffer::offset
IOOffset offset(void) const
Definition: IOPosBuffer.h:39
Storage::position
virtual IOOffset position(void) const
Definition: Storage.cc:72
IOInput::read
int read(void)
Definition: IOInput.cc:52
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Storage::CURRENT
Definition: Storage.h:22
XrdFile::m_op_count
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:63
dqmMemoryStats.total
total
Definition: dqmMemoryStats.py:152
IOPosBuffer
Definition: IOPosBuffer.h:7
XrdAdaptor::XrootdException
Definition: XrdRequestManager.h:32
mps_fire.result
result
Definition: mps_fire.py:303
cms::Exception
Definition: Exception.h:70
command_line.start
start
Definition: command_line.py:167
XrdFile::write
IOSize write(const void *from, IOSize n) override
Definition: XrdFile.cc:395
hltrates_dqm_sourceclient-live_cfg.offset
offset
Definition: hltrates_dqm_sourceclient-live_cfg.py:78
IOSize
size_t IOSize
Definition: IOTypes.h:14
Storage
Definition: Storage.h:20
edm::errors::FileReadError
Definition: EDMException.h:50
HLT_2018_cff.flags
flags
Definition: HLT_2018_cff.py:11758
XrdFile::m_close
bool m_close
Definition: XrdFile.h:61