17 #define XRD_CL_MAX_CHUNK 512 * 1024 18 #define XRD_CL_MAX_SIZE 1024 20 #define XRD_CL_MAX_READ_SIZE (8 * 1024 * 1024) 26 open(name, flags, perms);
36 edm::LogError(
"XrdFileError") <<
"Destructor called on XROOTD file '" <<
m_name <<
"' but the file is still open";
58 if ((name ==
nullptr) || (*name == 0)) {
60 ex <<
"Cannot open a file without a name";
66 ex <<
"Must open file '" << name <<
"' at least for read or write";
75 openflags |= XrdCl::OpenFlags::Update;
77 openflags |= XrdCl::OpenFlags::Read;
81 ex <<
"Opening file '" << name <<
"' in append mode not supported";
88 openflags |= XrdCl::OpenFlags::Delete;
89 openflags |= XrdCl::OpenFlags::New;
90 openflags |= XrdCl::OpenFlags::MakePath;
94 openflags |= XrdCl::OpenFlags::Delete;
113 XrdCl::XRootDStatus
status;
114 XrdCl::StatInfo *statInfo =
nullptr;
115 if (!(status =
file->Stat(
false, statInfo)).IsOK()) {
117 ex <<
"XrdCl::File::Stat(name='" << name <<
") => error '" << status.ToStr() <<
"' (errno=" << status.errNo
118 <<
", code=" << status.code <<
")";
124 m_size = statInfo->GetSize();
144 std::vector<std::string>
sources;
146 std::stringstream
ss;
147 ss <<
"Active sources: ";
148 for (
auto const &it : sources)
155 edm::LogError(
"XrdFileError") <<
"XrdFile::close(name='" <<
m_name <<
"') called but the file is not open";
177 if (n > 0x7fffffff) {
179 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" << n <<
") too many bytes, limit is 0x7fffffff";
191 if (n > 0x7fffffff) {
193 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" << n <<
") exceeds read size limit 0x7fffffff";
205 std::future<IOSize> prev_future, cur_future;
206 IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
207 bool readReturnedShort =
false;
211 auto check_read = [&](std::future<IOSize> &future,
IOSize expected) {
212 if (!future.valid()) {
216 if (readReturnedShort && (result != 0)) {
218 ex <<
"XrdFile::read(name='" <<
m_name <<
"', n=" << n
219 <<
") remote server returned non-zero length read after EOF.";
223 }
else if (result != expected) {
224 readReturnedShort =
true;
234 prev_future_expected = cur_future_expected;
236 cur_future_expected = chunk;
239 check_read(prev_future, prev_future_expected);
242 into =
static_cast<char *
>(into) + chunk;
248 check_read(cur_future, cur_future_expected);
255 std::vector<IOPosBuffer> new_buf;
263 return readv(&(new_buf[0]), n);
279 auto cl = std::make_shared<std::vector<IOPosBuffer>>();
285 cl->reserve(n > adjust ? adjust : n);
288 std::vector<std::pair<std::future<IOSize>,
IOSize>> readv_futures;
293 unsigned rollback_count = 1;
307 cl->emplace_back(ci);
314 cl->emplace_back(ci);
316 if (
cl->size() > adjust) {
317 while (rollback_count--)
333 assert(last_idx < idx);
336 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
346 if (readv_futures.size() > 1) {
347 for (
auto &readv_result : readv_futures) {
348 if (readv_result.first.valid()) {
349 readv_result.first.wait();
354 for (
auto &readv_result : readv_futures) {
357 const int retry_count = 5;
358 for (
int retries = 0; retries < retry_count; retries++) {
360 if (readv_result.first.valid()) {
361 result = readv_result.first.get();
364 if ((retries != retry_count - 1) && (ex.
getCode() == XrdCl::errInvalidResponse)) {
366 <<
"Got an invalid response from Xrootd server; retrying" << std::endl;
372 assert(result == readv_result.second);
379 newex <<
"A std::exception was thrown when processing an xrootd request: " << ex.
what();
388 <<
"[" <<
m_op_count.fetch_add(1) <<
"] Time for readv: " 389 <<
static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end -
start).
count())
390 <<
" (sub-readv requests: " << readv_futures.size() <<
")" << std::endl;
396 if (n > 0x7fffffff) {
398 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n <<
") too many bytes, limit is 0x7fffffff";
408 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n <<
") failed with error '" << s.ToStr()
409 <<
"' (errno=" << s.errNo <<
", code=" << s.code <<
")";
423 if (n > 0x7fffffff) {
425 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n <<
") too many bytes, limit is 0x7fffffff";
432 XrdCl::XRootDStatus
s =
file->Write(pos, n, from);
435 ex <<
"XrdFile::write(name='" <<
m_name <<
"', n=" << n <<
") failed with error '" << s.ToStr()
436 <<
"' (errno=" << s.errNo <<
", code=" << s.code <<
")";
442 if (static_cast<IOOffset>(pos + n) >
m_size)
460 ex <<
"XrdFile::position() called on a closed file";
482 ex <<
"XrdFile::position() called with incorrect 'whence' parameter";
499 ex <<
"XrdFile::resize(name='" <<
m_name <<
"') not implemented";
508 ex <<
"Xrd::getActiveFile(name='" <<
m_name <<
"') no active request manager";
509 ex.
addContext(
"Calling XrdFile::getActiveFile()");
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
#define XRD_CL_MAX_READ_SIZE
IOSize write(const void *from, IOSize n) override
virtual void create(const char *name, bool exclusive=false, int perms=0666)
char const * what() const override
void close(void) override
IOSize readv(IOBuffer *into, IOSize n) override
void resize(IOOffset size) override
void set_data(void *new_buffer)
void set_size(IOSize new_size)
void set_offset(IOOffset new_offset)
virtual IOOffset position(void) const
std::atomic< unsigned int > m_op_count
bool prefetch(const IOPosBuffer *what, IOSize n) override
std::shared_ptr< XrdCl::File > getActiveFile()
element_type const * get() const
IOOffset offset(void) const
virtual IOOffset size(void) const
void addContext(std::string const &context)
char data[epos_bytes_allocation]
void addConnection(cms::Exception &)
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)