17 #define XRD_CL_MAX_CHUNK 512*1024 18 #define XRD_CL_MAX_SIZE 1024 20 #define XRD_CL_MAX_READ_SIZE (8*1024*1024) 40 open (name, flags, perms);
59 <<
"Destructor called on XROOTD file '" <<
m_name 60 <<
"' but the file is still open";
98 if ((name == 0) || (*name == 0)) {
100 ex <<
"Cannot open a file without a name";
106 ex <<
"Must open file '" << name <<
"' at least for read or write";
115 openflags |= XrdCl::OpenFlags::Update;
117 openflags |= XrdCl::OpenFlags::Read;
121 ex <<
"Opening file '" << name <<
"' in append mode not supported";
129 openflags |= XrdCl::OpenFlags::Delete;
130 openflags |= XrdCl::OpenFlags::New;
131 openflags |= XrdCl::OpenFlags::MakePath;
135 openflags |= XrdCl::OpenFlags::Delete;
154 XrdCl::XRootDStatus
status;
155 XrdCl::StatInfo *statInfo =
NULL;
156 if (! (status =
file->Stat(
false, statInfo)).IsOK()) {
158 ex <<
"XrdCl::File::Stat(name='" << name
159 <<
") => error '" << status.ToStr()
160 <<
"' (errno=" << status.errNo <<
", code=" << status.code <<
")";
166 m_size = statInfo->GetSize();
186 std::vector<std::string> sources;
188 std::stringstream ss;
189 ss <<
"Active sources: ";
190 for (
auto const& it : sources)
201 <<
"XrdFile::close(name='" <<
m_name 202 <<
"') called but the file is not open";
228 if (n > 0x7fffffff) {
230 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" << n
231 <<
") too many bytes, limit is 0x7fffffff";
245 if (n > 0x7fffffff) {
247 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" << n
248 <<
") exceeds read size limit 0x7fffffff";
260 std::future<IOSize> prev_future, cur_future;
261 IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
262 bool readReturnedShort =
false;
266 auto check_read = [&](std::future<IOSize> &future,
IOSize expected) {
267 if (!future.valid()) {
271 if (readReturnedShort && (result != 0)) {
273 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" << n
274 <<
") remote server returned non-zero length read after EOF.";
278 }
else if (result != expected) {
279 readReturnedShort =
true;
289 prev_future_expected = cur_future_expected;
291 cur_future_expected = chunk;
294 check_read(prev_future, prev_future_expected);
297 into =
static_cast<char*
>(into) + chunk;
303 check_read(cur_future, cur_future_expected);
312 std::vector<IOPosBuffer> new_buf;
320 return readv(&(new_buf[0]), n);
338 auto cl = std::make_shared<std::vector<IOPosBuffer>>();
344 cl->reserve(n > adjust ? adjust : n);
347 std::vector<std::pair<std::future<IOSize>,
IOSize>> readv_futures;
353 unsigned rollback_count = 1;
367 cl->emplace_back(ci);
374 cl->emplace_back(ci);
376 if (
cl->size() > adjust)
378 while (rollback_count--)
cl->pop_back();
398 assert(last_idx < idx);
402 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
412 if (readv_futures.size() > 1)
414 for (
auto & readv_result : readv_futures)
416 if (readv_result.first.valid())
418 readv_result.first.wait();
423 for (
auto & readv_result : readv_futures)
428 const int retry_count = 5;
429 for (
int retries=0; retries<retry_count; retries++)
433 if (readv_result.first.valid())
435 result = readv_result.first.get();
440 if ((retries != retry_count-1) && (ex.
getCode() == XrdCl::errInvalidResponse))
442 edm::LogWarning(
"XrdAdaptorInternal") <<
"Got an invalid response from Xrootd server; retrying" << std::endl;
450 assert(result == readv_result.second);
461 newex <<
"A std::exception was thrown when processing an xrootd request: " << ex.
what();
469 edm::LogVerbatim(
"XrdAdaptorInternal") <<
"[" <<
m_op_count.fetch_add(1) <<
"] Time for readv: " <<
static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end-
start).
count()) <<
" (sub-readv requests: " << readv_futures.size() <<
")" << std::endl;
477 if (n > 0x7fffffff) {
479 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n
480 <<
") too many bytes, limit is 0x7fffffff";
490 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n
491 <<
") failed with error '" << s.ToStr()
492 <<
"' (errno=" << s.errNo <<
", code=" << s.code <<
")";
508 if (n > 0x7fffffff) {
510 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n
511 <<
") too many bytes, limit is 0x7fffffff";
518 XrdCl::XRootDStatus
s =
file->Write(pos, n, from);
521 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n
522 <<
") failed with error '" << s.ToStr()
523 <<
"' (errno=" << s.errNo <<
", code=" << s.code <<
")";
529 if (static_cast<IOOffset>(pos + n) >
m_size)
551 ex <<
"XrdFile::position() called on a closed file";
574 ex <<
"XrdFile::position() called with incorrect 'whence' parameter";
593 ex <<
"XrdFile::resize(name='" <<
m_name <<
"') not implemented";
599 std::shared_ptr<XrdCl::File>
605 ex <<
"Xrd::getActiveFile(name='" <<
m_name <<
"') no active request manager";
606 ex.
addContext(
"Calling XrdFile::getActiveFile()");
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
virtual void resize(IOOffset size)
#define XRD_CL_MAX_READ_SIZE
virtual void create(const char *name, bool exclusive=false, int perms=0666)
std::vector< Variable::Flags > flags
char const * what() const override
void set_data(void *new_buffer)
void set_size(IOSize new_size)
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
void set_offset(IOOffset new_offset)
virtual IOOffset position(void) const
std::atomic< unsigned int > m_op_count
virtual IOSize readv(IOBuffer *into, IOSize n)
std::shared_ptr< XrdCl::File > getActiveFile()
element_type const * get() const
IOOffset offset(void) const
virtual IOOffset size(void) const
void addContext(std::string const &context)
char data[epos_bytes_allocation]
void addConnection(cms::Exception &)
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
virtual IOSize write(const void *from, IOSize n)