24 XrdFile::XrdFile() : m_offset(0), m_size(-1), m_close(
false), m_name(), m_op_count(0) {}
27 : m_offset(0), m_size(-1), m_close(
false), m_name(), m_op_count(0) {
32 : m_offset(0), m_size(-1), m_close(
false), m_name(), m_op_count(0) {
38 edm::LogError(
"XrdFileError") <<
"Destructor called on XROOTD file '" <<
m_name <<
"' but the file is still open";
60 if ((
name ==
nullptr) || (*
name == 0)) {
62 ex <<
"Cannot open a file without a name";
68 ex <<
"Must open file '" <<
name <<
"' at least for read or write";
77 openflags |= XrdCl::OpenFlags::Update;
79 openflags |= XrdCl::OpenFlags::Read;
83 ex <<
"Opening file '" <<
name <<
"' in append mode not supported";
90 openflags |= XrdCl::OpenFlags::Delete;
91 openflags |= XrdCl::OpenFlags::New;
92 openflags |= XrdCl::OpenFlags::MakePath;
96 openflags |= XrdCl::OpenFlags::Delete;
115 XrdCl::XRootDStatus
status;
116 XrdCl::StatInfo *statInfo =
nullptr;
117 if (!(
status =
file->Stat(
false, statInfo)).IsOK()) {
119 ex <<
"XrdCl::File::Stat(name='" <<
name <<
") => error '" <<
status.ToStr() <<
"' (errno=" <<
status.errNo
120 <<
", code=" <<
status.code <<
")";
126 m_size = statInfo->GetSize();
146 std::vector<std::string>
sources;
148 std::stringstream
ss;
149 ss <<
"Active sources: ";
157 edm::LogError(
"XrdFileError") <<
"XrdFile::close(name='" <<
m_name <<
"') called but the file is not open";
179 if (
n > 0x7fffffff) {
181 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" <<
n <<
") too many bytes, limit is 0x7fffffff";
193 if (
n > 0x7fffffff) {
195 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" <<
n <<
") exceeds read size limit 0x7fffffff";
207 std::future<IOSize> prev_future, cur_future;
208 IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
209 bool readReturnedShort =
false;
213 auto check_read = [&](std::future<IOSize> &future,
IOSize expected) {
214 if (!future.valid()) {
218 if (readReturnedShort && (
result != 0)) {
220 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" <<
n 221 <<
") remote server returned non-zero length read after EOF.";
225 }
else if (
result != expected) {
226 readReturnedShort =
true;
236 prev_future_expected = cur_future_expected;
238 cur_future_expected = chunk;
241 check_read(prev_future, prev_future_expected);
244 into =
static_cast<char *
>(into) + chunk;
250 check_read(cur_future, cur_future_expected);
257 std::vector<IOPosBuffer> new_buf;
265 return readv(&(new_buf[0]),
n);
281 auto cl = std::make_shared<std::vector<IOPosBuffer>>();
287 cl->reserve(
n > adjust ? adjust :
n);
290 std::vector<std::pair<std::future<IOSize>,
IOSize>> readv_futures;
295 unsigned rollback_count = 1;
309 cl->emplace_back(ci);
316 cl->emplace_back(ci);
318 if (
cl->size() > adjust) {
319 while (rollback_count--)
338 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
348 if (readv_futures.size() > 1) {
349 for (
auto &readv_result : readv_futures) {
350 if (readv_result.first.valid()) {
351 readv_result.first.wait();
356 for (
auto &readv_result : readv_futures) {
359 const int retry_count = 5;
360 for (
int retries = 0; retries < retry_count; retries++) {
362 if (readv_result.first.valid()) {
363 result = readv_result.first.get();
366 if ((retries != retry_count - 1) && (ex.
getCode() == XrdCl::errInvalidResponse)) {
368 <<
"Got an invalid response from Xrootd server; retrying" << std::endl;
381 newex <<
"A std::exception was thrown when processing an xrootd request: " << ex.
what();
390 <<
"[" <<
m_op_count.fetch_add(1) <<
"] Time for readv: " 391 <<
static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(
end -
start).
count())
392 <<
" (sub-readv requests: " << readv_futures.size() <<
")" << std::endl;
398 if (
n > 0x7fffffff) {
400 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" <<
n <<
") too many bytes, limit is 0x7fffffff";
410 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" <<
n <<
") failed with error '" <<
s.ToStr()
411 <<
"' (errno=" <<
s.errNo <<
", code=" <<
s.code <<
")";
425 if (
n > 0x7fffffff) {
427 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" <<
n <<
") too many bytes, limit is 0x7fffffff";
434 XrdCl::XRootDStatus
s =
file->Write(
pos,
n, from);
437 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" <<
n <<
") failed with error '" <<
s.ToStr()
438 <<
"' (errno=" <<
s.errNo <<
", code=" <<
s.code <<
")";
462 ex <<
"XrdFile::position() called on a closed file";
484 ex <<
"XrdFile::position() called with incorrect 'whence' parameter";
501 ex <<
"XrdFile::resize(name='" <<
m_name <<
"') not implemented";
510 ex <<
"Xrd::getActiveFile(name='" <<
m_name <<
"') no active request manager";
511 ex.
addContext(
"Calling XrdFile::getActiveFile()");
Log< level::Info, true > LogVerbatim
virtual void create(const char *name, bool exclusive=false, int perms=0666)
std::atomic< unsigned int > m_op_count
bool prefetch(const IOPosBuffer *what, IOSize n) override
std::shared_ptr< XrdCl::File > getActiveFile()
void set_size(IOSize new_size)
Log< level::Error, false > LogError
void set_offset(IOOffset new_offset)
void resize(IOOffset size) override
constexpr element_type const * get() const
IOSize write(const void *from, IOSize n) override
virtual IOOffset size() const
static constexpr int XRD_CL_MAX_CHUNK
void addConnection(cms::Exception &)
Log< level::Info, false > LogInfo
void close(void) override
void addContext(std::string const &context)
IOSize readv(IOBuffer *into, IOSize n) override
char data[epos_bytes_allocation]
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
void set_data(void *new_buffer)
virtual IOOffset position() const
Log< level::Warning, false > LogWarning
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
static constexpr int XRD_CL_MAX_SIZE
char const * what() const noexcept override
static constexpr int XRD_CL_MAX_READ_SIZE