CMS 3D CMS Logo

List of all members | Public Member Functions | Private Member Functions | Private Attributes
XrdFile Class Reference

#include <XrdFile.h>

Inheritance diagram for XrdFile:
Storage IOInput IOOutput

Public Member Functions

virtual void abort (void)
 
void close (void) override
 
virtual void create (const char *name, bool exclusive=false, int perms=0666)
 
virtual void create (const std::string &name, bool exclusive=false, int perms=0666)
 
virtual void open (const char *name, int flags=IOFlags::OpenRead, int perms=0666)
 
virtual void open (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666)
 
IOOffset position (IOOffset offset, Relative whence=SET) override
 
bool prefetch (const IOPosBuffer *what, IOSize n) override
 
IOSize read (void *into, IOSize n) override
 
IOSize read (void *into, IOSize n, IOOffset pos) override
 
IOSize readv (IOBuffer *into, IOSize n) override
 
IOSize readv (IOPosBuffer *into, IOSize n) override
 
void resize (IOOffset size) override
 
IOSize write (const void *from, IOSize n) override
 
IOSize write (const void *from, IOSize n, IOOffset pos) override
 
 XrdFile (void)
 
 XrdFile (IOFD fd)
 
 XrdFile (const char *name, int flags=IOFlags::OpenRead, int perms=0666)
 
 XrdFile (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666)
 
 ~XrdFile (void) override
 
- Public Member Functions inherited from Storage
virtual bool eof (void) const
 
virtual void flush (void)
 
virtual IOOffset position (void) const
 
IOSize read (IOBuffer into, IOOffset pos)
 
virtual void rewind (void)
 
virtual IOOffset size (void) const
 
 Storage (void)
 
IOSize write (IOBuffer from, IOOffset pos)
 
virtual IOSize writev (const IOPosBuffer *from, IOSize buffers)
 
 ~Storage (void) override
 
- Public Member Functions inherited from IOInput
int read (void)
 
IOSize read (IOBuffer into)
 
IOSize xread (IOBuffer into)
 
IOSize xread (void *into, IOSize n)
 
IOSize xreadv (IOBuffer *into, IOSize buffers)
 
virtual ~IOInput (void)
 Destruct the stream. A no-op. More...
 
- Public Member Functions inherited from IOOutput
IOSize write (unsigned char byte)
 
IOSize write (IOBuffer from)
 
virtual IOSize writev (const IOBuffer *from, IOSize buffers)
 
IOSize xwrite (const void *from, IOSize n)
 
IOSize xwrite (IOBuffer from)
 
IOSize xwritev (const IOBuffer *from, IOSize buffers)
 
virtual ~IOOutput (void)
 Destruct the stream. A no-op. More...
 

Private Member Functions

void addConnection (cms::Exception &)
 
std::shared_ptr< XrdCl::File > getActiveFile ()
 

Private Attributes

bool m_close
 
std::string m_name
 
IOOffset m_offset
 
std::atomic< unsigned int > m_op_count
 
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
 
IOOffset m_size
 

Additional Inherited Members

- Public Types inherited from Storage
enum  Relative { SET, CURRENT, END }
 

Detailed Description

Definition at line 17 of file XrdFile.h.

Constructor & Destructor Documentation

XrdFile::XrdFile ( void  )

Definition at line 22 of file XrdFile.cc.

23  : m_offset (0),
24  m_size(-1),
25  m_close (false),
26  m_name(),
27  m_op_count(0)
28 {
29 }
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:73
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
XrdFile::XrdFile ( IOFD  fd)
XrdFile::XrdFile ( const char *  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)

Definition at line 31 of file XrdFile.cc.

References open().

34  : m_offset (0),
35  m_size(-1),
36  m_close (false),
37  m_name(),
38  m_op_count(0)
39 {
40  open (name, flags, perms);
41 }
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:73
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:93
XrdFile::XrdFile ( const std::string &  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)

Definition at line 43 of file XrdFile.cc.

References flags, and open().

46  : m_offset (0),
47  m_size(-1),
48  m_close (false),
49  m_name(),
50  m_op_count(0)
51 {
52  open (name.c_str (), flags, perms);
53 }
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:73
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:93
XrdFile::~XrdFile ( void  )
override

Definition at line 55 of file XrdFile.cc.

References m_close, and m_name.

56 {
57  if (m_close)
58  edm::LogError("XrdFileError")
59  << "Destructor called on XROOTD file '" << m_name
60  << "' but the file is still open";
61 }
bool m_close
Definition: XrdFile.h:71
std::string m_name
Definition: XrdFile.h:72

Member Function Documentation

void XrdFile::abort ( void  )
virtual

Definition at line 216 of file XrdFile.cc.

References m_close, m_offset, m_requestmanager, and m_size.

217 {
218  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
219  m_close = false;
220  m_offset = 0;
221  m_size = -1;
222 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
IOOffset m_size
Definition: XrdFile.h:70
void XrdFile::addConnection ( cms::Exception ex)
private

Definition at line 615 of file XrdFile.cc.

References edm::propagate_const< T >::get(), and m_requestmanager.

Referenced by open(), position(), read(), resize(), and write().

616 {
617  if (m_requestmanager.get())
618  {
619  m_requestmanager->addConnections(ex);
620  }
621 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
element_type const * get() const
void XrdFile::close ( void  )
overridevirtual

Reimplemented from Storage.

Definition at line 196 of file XrdFile.cc.

References edm::propagate_const< T >::get(), m_close, m_name, m_offset, m_requestmanager, and m_size.

Referenced by lumiQTWidget.ApplicationWindow::fileQuit(), esMonitoring.AsyncLineReaderMixin::handle_close(), esMonitoring.FDJsonServer::handle_close(), Vispa.Gui.BoxContentDialog.BoxContentDialog::keyPressEvent(), and Vispa.Gui.FindDialog.FindDialog::keyPressEvent().

197 {
198  if (! m_requestmanager.get())
199  {
200  edm::LogError("XrdFileError")
201  << "XrdFile::close(name='" << m_name
202  << "') called but the file is not open";
203  m_close = false;
204  return;
205  }
206 
207  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
208 
209  m_close = false;
210  m_offset = 0;
211  m_size = -1;
212  edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
213 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
element_type const * get() const
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
void XrdFile::create ( const char *  name,
bool  exclusive = false,
int  perms = 0666 
)
virtual

Definition at line 65 of file XrdFile.cc.

References open(), IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenTruncate, and IOFlags::OpenWrite.

68 {
69  open (name,
71  | (exclusive ? IOFlags::OpenExclusive : 0)),
72  perms);
73 }
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:93
void XrdFile::create ( const std::string &  name,
bool  exclusive = false,
int  perms = 0666 
)
virtual

Definition at line 76 of file XrdFile.cc.

References open(), IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenTruncate, and IOFlags::OpenWrite.

79 {
80  open (name.c_str (),
82  | (exclusive ? IOFlags::OpenExclusive : 0)),
83  perms);
84 }
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:93
std::shared_ptr< XrdCl::File > XrdFile::getActiveFile ( void  )
private

Returns a file handle from one of the active sources. Verifies the file is open and throws an exception as necessary.

Definition at line 600 of file XrdFile.cc.

References cms::Exception::addContext(), edm::propagate_const< T >::get(), m_close, m_name, and m_requestmanager.

Referenced by open(), and write().

601 {
602  if (!m_requestmanager.get())
603  {
604  cms::Exception ex("XrdFileLogicError");
605  ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
606  ex.addContext("Calling XrdFile::getActiveFile()");
607  m_requestmanager->addConnections(ex);
608  m_close = false;
609  throw ex;
610  }
611  return m_requestmanager->getActiveFile();
612 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
bool m_close
Definition: XrdFile.h:71
element_type const * get() const
std::string m_name
Definition: XrdFile.h:72
void XrdFile::open ( const char *  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)
virtual

Definition at line 93 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), FrontierConditions_GlobalTag_cff::file, edm::errors::FileOpenError, getActiveFile(), m_close, m_name, m_offset, m_requestmanager, m_size, dataset::name, None, IOFlags::OpenAppend, IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenRead, IOFlags::OpenTruncate, IOFlags::OpenWrite, and mps_update::status.

Referenced by create(), open(), and XrdFile().

96 {
97  // Actual open
98  if ((name == nullptr) || (*name == 0)) {
100  ex << "Cannot open a file without a name";
101  ex.addContext("Calling XrdFile::open()");
102  throw ex;
103  }
104  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
106  ex << "Must open file '" << name << "' at least for read or write";
107  ex.addContext("Calling XrdFile::open()");
108  throw ex;
109  }
110 
111  // Translate our flags to system flags
112  XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
113 
115  openflags |= XrdCl::OpenFlags::Update;
116  else if (flags & IOFlags::OpenRead)
117  openflags |= XrdCl::OpenFlags::Read;
118 
119  if (flags & IOFlags::OpenAppend) {
121  ex << "Opening file '" << name << "' in append mode not supported";
122  ex.addContext("Calling XrdFile::open()");
123  throw ex;
124  }
125 
127  {
128  if (! (flags & IOFlags::OpenExclusive))
129  openflags |= XrdCl::OpenFlags::Delete;
130  openflags |= XrdCl::OpenFlags::New;
131  openflags |= XrdCl::OpenFlags::MakePath;
132  }
133 
134  if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
135  openflags |= XrdCl::OpenFlags::Delete;
136 
137  // Translate mode flags
139  modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
140  modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
141  modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
142  modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
143  modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
144  modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
145  modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
146  modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
147  modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
148 
149  m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
150  m_name = name;
151 
152  // Stat the file so we can keep track of the offset better.
153  auto file = getActiveFile();
154  XrdCl::XRootDStatus status;
155  XrdCl::StatInfo *statInfo = nullptr;
156  if (! (status = file->Stat(false, statInfo)).IsOK()) {
158  ex << "XrdCl::File::Stat(name='" << name
159  << ") => error '" << status.ToStr()
160  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
161  ex.addContext("Calling XrdFile::open()");
162  addConnection(ex);
163  throw ex;
164  }
165  assert(statInfo);
166  m_size = statInfo->GetSize();
167  delete(statInfo);
168 
169  m_offset = 0;
170  m_close = true;
171 
172  // Send the monitoring info, if available.
173  // Note: getenv is not reentrant.
174  // Commenting out until this is available in the new client.
175 /*
176  char * crabJobId = getenv(kCrabJobIdEnv);
177  if (crabJobId) {
178  kXR_unt32 dictId;
179  m_file->SendMonitoringInfo(crabJobId, &dictId);
180  edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
181  }
182 */
183 
184  edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
185 
186  std::vector<std::string> sources;
187  m_requestmanager->getActiveSourceNames(sources);
188  std::stringstream ss;
189  ss << "Active sources: ";
190  for (auto const& it : sources)
191  ss << it << ", ";
192  edm::LogInfo("XrdFileInfo") << ss.str();
193 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:600
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
void XrdFile::open ( const std::string &  name,
int  flags = IOFlags::OpenRead,
int  perms = 0666 
)
virtual

Definition at line 87 of file XrdFile.cc.

References flags, and open().

90 { open (name.c_str (), flags, perms); }
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:93
IOOffset XrdFile::position ( IOOffset  offset,
Relative  whence = SET 
)
overridevirtual

Implements Storage.

Definition at line 547 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), Storage::CURRENT, Storage::END, edm::propagate_const< T >::get(), m_offset, m_requestmanager, m_size, PFRecoTauDiscriminationByIsolation_cfi::offset, and Storage::SET.

548 {
549  if (! m_requestmanager.get()) {
550  cms::Exception ex("FilePositionError");
551  ex << "XrdFile::position() called on a closed file";
552  ex.addContext("Calling XrdFile::position()");
553  addConnection(ex);
554  throw ex;
555  }
556  switch (whence)
557  {
558  case SET:
559  m_offset = offset;
560  break;
561 
562  case CURRENT:
563  m_offset += offset;
564  break;
565 
566  // TODO: None of this works with concurrent writers to the file.
567  case END:
568  assert(m_size != -1);
569  m_offset = m_size + offset;
570  break;
571 
572  default:
573  cms::Exception ex("FilePositionError");
574  ex << "XrdFile::position() called with incorrect 'whence' parameter";
575  ex.addContext("Calling XrdFile::position()");
576  addConnection(ex);
577  throw ex;
578  }
579 
580  if (m_offset < 0)
581  m_offset = 0;
582  assert(m_size != -1);
583  if (m_offset > m_size)
584  m_size = m_offset;
585 
586  return m_offset;
587 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
IOOffset m_offset
Definition: XrdFile.h:69
element_type const * get() const
IOOffset m_size
Definition: XrdFile.h:70
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
bool XrdFile::prefetch ( const IOPosBuffer what,
IOSize  n 
)
overridevirtual

Reimplemented from Storage.

Definition at line 536 of file XrdFile.cc.

537 {
538  // The new Xrootd client does not contain any internal buffers.
539  // Hence, prefetching is disabled completely.
540  return false;
541 }
IOSize XrdFile::read ( void *  into,
IOSize  n 
)
overridevirtual

Read at most n bytes into the buffer pointed to by into.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or the remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if only a limited amount of input is currently available for platform or implementation reasons.
Exceptions
In case of error, an #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception).

Implements IOInput.

Definition at line 226 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, edm::propagate_const< T >::get(), m_name, m_offset, and m_requestmanager.

227 {
228  if (n > 0x7fffffff) {
230  ex << "XrdFile::read(name='" << m_name << "', n=" << n
231  << ") too many bytes, limit is 0x7fffffff";
232  ex.addContext("Calling XrdFile::read()");
233  addConnection(ex);
234  throw ex;
235  }
236 
237  uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
238  m_offset += bytesRead;
239  return bytesRead;
240 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
IOOffset m_offset
Definition: XrdFile.h:69
element_type const * get() const
std::string m_name
Definition: XrdFile.h:72
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
IOSize XrdFile::read ( void *  into,
IOSize  n,
IOOffset  pos 
)
overridevirtual

Reimplemented from Storage.

Definition at line 243 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, m_name, m_requestmanager, min(), eostools::move(), mps_fire::result, and XRD_CL_MAX_READ_SIZE.

244 {
245  if (n > 0x7fffffff) {
247  ex << "XrdFile::read(name='" << m_name << "', n=" << n
248  << ") exceeds read size limit 0x7fffffff";
249  ex.addContext("Calling XrdFile::read()");
250  addConnection(ex);
251  throw ex;
252  }
253  if (n == 0) {
254  return 0;
255  }
256 
257  // In some cases, the IO layers above us (particularly, if lazy-download is
258  // enabled) will emit very large reads. We break this up into multiple
259  // reads in order to avoid hitting timeouts.
260  std::future<IOSize> prev_future, cur_future;
261  IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
262  bool readReturnedShort = false;
263 
264  // Check the status of a read operation; updates bytesRead and
265  // readReturnedShort.
266  auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
267  if (!future.valid()) {
268  return;
269  }
270  IOSize result = future.get();
271  if (readReturnedShort && (result != 0)) {
273  ex << "XrdFile::read(name='" << m_name << "', n=" << n
274  << ") remote server returned non-zero length read after EOF.";
275  ex.addContext("Calling XrdFile::read()");
276  addConnection(ex);
277  throw ex;
278  } else if (result != expected) {
279  readReturnedShort = true;
280  }
281  bytesRead += result;
282  };
283 
284  while (n) {
285  IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
286 
287  // Save prior read state; issue new read.
288  prev_future = std::move(cur_future);
289  prev_future_expected = cur_future_expected;
290  cur_future = m_requestmanager->handle(into, chunk, pos);
291  cur_future_expected = chunk;
292 
293  // Wait for the prior read; update bytesRead.
294  check_read(prev_future, prev_future_expected);
295 
296  // Update counters.
297  into = static_cast<char*>(into) + chunk;
298  n -= chunk;
299  pos += chunk;
300  }
301 
302  // Wait for the last read to finish.
303  check_read(cur_future, cur_future_expected);
304 
305  return bytesRead;
306 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
#define XRD_CL_MAX_READ_SIZE
Definition: XrdFile.cc:20
T min(T a, T b)
Definition: MathUtil.h:58
std::string m_name
Definition: XrdFile.h:72
size_t IOSize
Definition: IOTypes.h:14
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
def move(src, dest)
Definition: eostools.py:510
IOSize XrdFile::readv ( IOBuffer into,
IOSize  buffers 
)
overridevirtual

Read from the input stream into multiple scattered buffers. The array starting at 'into' contains 'buffers' buffers to fill; the memory those buffers occupy does not need to be contiguous. The buffers are filled in the order given; each buffer is filled fully before the subsequent buffers.

If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.

The base class implementation uses read(void *, IOSize) method, but derived classes may implement a more efficient alternative.

Returns
The number of bytes actually read. This is less than or equal to the size of the buffer. Zero indicates that the end of the input has been reached: end of file, or the remote end closing for a connected channel like a pipe or a socket. Otherwise the value can be less than requested if only a limited amount of input is currently available for platform or implementation reasons. Note that the return value indicates the number of bytes read, not the number of buffers; it is the sum total of bytes filled into all the buffers.
Exceptions
In case of error, an #IOError exception is thrown. However, if some data has already been read, the error is swallowed and the method returns the data read so far. It is assumed that persistent errors will occur anyway on the next read and that sporadic errors, like the stream becoming unavailable, can be ignored. Use xread() if a different policy is desirable.

Reimplemented from IOInput.

Definition at line 310 of file XrdFile.cc.

References data, mps_fire::i, gen::n, IOBuffer::size(), and Storage::size().

311 {
312  std::vector<IOPosBuffer> new_buf;
313  new_buf.reserve(n);
314  IOOffset off = 0;
315  for (IOSize i=0; i<n; i++) {
316  IOSize size = into[i].size();
317  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
318  off += size;
319  }
320  return readv(&(new_buf[0]), n);
321 }
IOSize readv(IOBuffer *into, IOSize n) override
Definition: XrdFile.cc:310
IOSize size(void) const
Definition: IOBuffer.h:50
virtual IOOffset size(void) const
Definition: Storage.cc:102
int64_t IOOffset
Definition: IOTypes.h:19
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
IOSize XrdFile::readv ( IOPosBuffer into,
IOSize  n 
)
overridevirtual

Reimplemented from Storage.

Definition at line 328 of file XrdFile.cc.

References cms::Exception::addContext(), edmScanValgrind::buffer, GetRecoTauVFromDQM_MC_cff::cl, KineDebug3::count(), IOPosBuffer::data(), data, end, cppFunctionSkipper::exception, edm::propagate_const< T >::get(), XrdAdaptor::XrootdException::getCode(), training_settings::idx, m_op_count, m_requestmanager, cmsPerfSuiteHarvest::now, IOPosBuffer::offset(), PFRecoTauDiscriminationByIsolation_cfi::offset, IOInput::read(), mps_fire::result, IOPosBuffer::set_data(), IOPosBuffer::set_offset(), IOPosBuffer::set_size(), IOPosBuffer::size(), Storage::size(), command_line::start, edm::errors::StdException, unlikely, cms::Exception::what(), XRD_CL_MAX_CHUNK, and XRD_CL_MAX_SIZE.

329 {
330  // A trivial vector read - unlikely, considering ROOT data format.
331  if (unlikely(n == 0)) {
332  return 0;
333  }
334  if (unlikely(n == 1)) {
335  return read(into[0].data(), into[0].size(), into[0].offset());
336  }
337 
338  auto cl = std::make_shared<std::vector<IOPosBuffer>>();
339 
340  // CMSSW may issue large readv's; Xrootd is only able to handle
341  // 1024. Further, the splitting algorithm may slightly increase
342  // the number of buffers.
343  IOSize adjust = XRD_CL_MAX_SIZE - 2;
344  cl->reserve(n > adjust ? adjust : n);
345  IOSize idx = 0, last_idx = 0;
346  IOSize final_result = 0;
347  std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
348  while (idx < n)
349  {
350  cl->clear();
351  IOSize size = 0;
352  while (idx < n) {
353  unsigned rollback_count = 1;
354  IOSize current_size = size;
355  IOOffset offset = into[idx].offset();
356  IOSize length = into[idx].size();
357  size += length;
358  char * buffer = static_cast<char *>(into[idx].data());
359  while (length > XRD_CL_MAX_CHUNK) {
360  IOPosBuffer ci;
362  length -= XRD_CL_MAX_CHUNK;
363  ci.set_offset(offset);
364  offset += XRD_CL_MAX_CHUNK;
365  ci.set_data(buffer);
366  buffer += XRD_CL_MAX_CHUNK;
367  cl->emplace_back(ci);
368  rollback_count ++;
369  }
370  IOPosBuffer ci;
371  ci.set_size(length);
372  ci.set_offset(offset);
373  ci.set_data(buffer);
374  cl->emplace_back(ci);
375 
376  if (cl->size() > adjust)
377  {
378  while (rollback_count--) cl->pop_back();
379  size = current_size;
380  break;
381  }
382  else
383  {
384  idx++;
385  }
386  }
387  try
388  {
389  readv_futures.emplace_back(m_requestmanager->handle(cl), size);
390  }
391  catch (edm::Exception& ex)
392  {
393  ex.addContext("Calling XrdFile::readv()");
394  throw;
395  }
396 
397  // Assure that we have made some progress.
398  assert(last_idx < idx);
399  last_idx = idx;
400 
401  }
402  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
404 
405  // If there are multiple readv calls, wait until all return until looking
406  // at the results of any. This guarantees that all readv's have finished
407  // by time we call .get() for the first time (in case one of the readv's
408  // result in an exception).
409  //
410  // We cannot have outstanding readv's on function exit as the XrdCl may
411  // write into the corresponding buffer at the same time as ROOT.
412  if (readv_futures.size() > 1)
413  {
414  for (auto & readv_result : readv_futures)
415  {
416  if (readv_result.first.valid())
417  {
418  readv_result.first.wait();
419  }
420  }
421  }
422 
423  for (auto & readv_result : readv_futures)
424  {
425  IOSize result = 0;
426  try
427  {
428  const int retry_count = 5;
429  for (int retries=0; retries<retry_count; retries++)
430  {
431  try
432  {
433  if (readv_result.first.valid())
434  {
435  result = readv_result.first.get();
436  }
437  }
438  catch (XrootdException& ex)
439  {
440  if ((retries != retry_count-1) && (ex.getCode() == XrdCl::errInvalidResponse))
441  {
442  edm::LogWarning("XrdAdaptorInternal") << "Got an invalid response from Xrootd server; retrying" << std::endl;
443  result = m_requestmanager->handle(cl).get();
444  }
445  else
446  {
447  throw;
448  }
449  }
450  assert(result == readv_result.second);
451  }
452  }
453  catch (edm::Exception& ex)
454  {
455  ex.addContext("Calling XrdFile::readv()");
456  throw;
457  }
458  catch (std::exception& ex)
459  {
461  newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
462  newex.addContext("Calling XrdFile::readv()");
463  throw newex;
464  }
465  final_result += result;
466  }
468 
469  edm::LogVerbatim("XrdAdaptorInternal") << "[" << m_op_count.fetch_add(1) << "] Time for readv: " << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()) << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
470 
471  return final_result;
472 }
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
#define unlikely(x)
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
#define XRD_CL_MAX_CHUNK
Definition: XrdFile.cc:17
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:73
#define end
Definition: vmac.h:39
int read(void)
Definition: IOInput.cc:54
element_type const * get() const
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOPosBuffer.h:64
virtual IOOffset size(void) const
Definition: Storage.cc:102
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
#define XRD_CL_MAX_SIZE
Definition: XrdFile.cc:18
void XrdFile::resize ( IOOffset  size)
overridevirtual

Implements Storage.

Definition at line 590 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), and m_name.

Referenced by Vispa.Gui.TextDialog.TextDialog::__init__(), Vispa.Plugins.ConfigEditor.ToolDialog.ToolDialog::__init__(), Vispa.Main.MainWindow.MainWindow::_loadIni(), and Vispa.Gui.PortConnection.PointToPointConnection::updateConnection().

591 {
592  cms::Exception ex("FileResizeError");
593  ex << "XrdFile::resize(name='" << m_name << "') not implemented";
594  ex.addContext("Calling XrdFile::resize()");
595  addConnection(ex);
596  throw ex;
597 }
std::string m_name
Definition: XrdFile.h:72
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
IOSize XrdFile::write ( const void *  from,
IOSize  n 
)
overridevirtual

Write n bytes of data starting at address from.

Returns
The number of bytes written. Normally this will be n, but can be less, even zero, for example if the stream is in non-blocking mode and cannot accept input at this time.
Exceptions
In case of error, an exception is thrown. However, if the stream is in non-blocking mode and cannot accept output, it will not throw an exception – the return value will be less than requested.

Implements IOOutput.

Definition at line 475 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), FrontierConditions_GlobalTag_cff::file, getActiveFile(), m_name, m_offset, m_size, gen::n, and alignCSCRings::s.

476 {
477  if (n > 0x7fffffff) {
478  cms::Exception ex("FileWriteError");
479  ex << "XrdFile::write(name='" << m_name << "', n=" << n
480  << ") too many bytes, limit is 0x7fffffff";
481  ex.addContext("Calling XrdFile::write()");
482  addConnection(ex);
483  throw ex;
484  }
485  auto file = getActiveFile();
486 
487  XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
488  if (!s.IsOK()) {
489  cms::Exception ex("FileWriteError");
490  ex << "XrdFile::write(name='" << m_name << "', n=" << n
491  << ") failed with error '" << s.ToStr()
492  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
493  ex.addContext("Calling XrdFile::write()");
494  addConnection(ex);
495  throw ex;
496  }
497  m_offset += n;
498  assert(m_size != -1);
499  if (m_offset > m_size)
500  m_size = m_offset;
501 
502  return n;
503 }
IOOffset m_offset
Definition: XrdFile.h:69
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:600
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615
IOSize XrdFile::write ( const void *  from,
IOSize  n,
IOOffset  pos 
)
overridevirtual

Reimplemented from Storage.

Definition at line 506 of file XrdFile.cc.

References addConnection(), cms::Exception::addContext(), FrontierConditions_GlobalTag_cff::file, getActiveFile(), m_name, m_size, gen::n, and alignCSCRings::s.

507 {
508  if (n > 0x7fffffff) {
509  cms::Exception ex("FileWriteError");
510  ex << "XrdFile::write(name='" << m_name << "', n=" << n
511  << ") too many bytes, limit is 0x7fffffff";
512  ex.addContext("Calling XrdFile::write()");
513  addConnection(ex);
514  throw ex;
515  }
516  auto file = getActiveFile();
517 
518  XrdCl::XRootDStatus s = file->Write(pos, n, from);
519  if (!s.IsOK()) {
520  cms::Exception ex("FileWriteError");
521  ex << "XrdFile::write(name='" << m_name << "', n=" << n
522  << ") failed with error '" << s.ToStr()
523  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
524  ex.addContext("Calling XrdFile::write()");
525  addConnection(ex);
526  throw ex;
527  }
528  assert (m_size != -1);
529  if (static_cast<IOOffset>(pos + n) > m_size)
530  m_size = pos + n;
531 
532  return n;
533 }
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:600
IOOffset m_size
Definition: XrdFile.h:70
std::string m_name
Definition: XrdFile.h:72
void addConnection(cms::Exception &)
Definition: XrdFile.cc:615

Member Data Documentation

bool XrdFile::m_close
private

Definition at line 71 of file XrdFile.h.

Referenced by abort(), close(), getActiveFile(), open(), and ~XrdFile().

std::string XrdFile::m_name
private

Definition at line 72 of file XrdFile.h.

Referenced by close(), getActiveFile(), open(), read(), resize(), write(), and ~XrdFile().

IOOffset XrdFile::m_offset
private

Definition at line 69 of file XrdFile.h.

Referenced by abort(), close(), open(), position(), read(), and write().

std::atomic<unsigned int> XrdFile::m_op_count
private

Definition at line 73 of file XrdFile.h.

Referenced by readv().

edm::propagate_const<std::shared_ptr<XrdAdaptor::RequestManager> > XrdFile::m_requestmanager
private

Definition at line 68 of file XrdFile.h.

Referenced by abort(), addConnection(), close(), getActiveFile(), open(), position(), read(), and readv().

IOOffset XrdFile::m_size
private

Definition at line 70 of file XrdFile.h.

Referenced by abort(), close(), open(), position(), and write().