#include <XrdFile.h>
Public Member Functions | |
virtual void | abort (void) |
virtual void | close (void) |
virtual void | create (const char *name, bool exclusive=false, int perms=0666) |
virtual void | create (const std::string &name, bool exclusive=false, int perms=0666) |
virtual void | open (const char *name, int flags=IOFlags::OpenRead, int perms=0666) |
virtual void | open (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666) |
virtual IOOffset | position (IOOffset offset, Relative whence=SET) |
virtual bool | prefetch (const IOPosBuffer *what, IOSize n) |
virtual IOSize | read (void *into, IOSize n) |
virtual IOSize | read (void *into, IOSize n, IOOffset pos) |
virtual IOSize | readv (IOBuffer *into, IOSize n) |
virtual IOSize | readv (IOPosBuffer *into, IOSize n) |
virtual void | resize (IOOffset size) |
virtual IOSize | write (const void *from, IOSize n, IOOffset pos) |
virtual IOSize | write (const void *from, IOSize n) |
XrdFile (void) | |
XrdFile (IOFD fd) | |
XrdFile (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666) | |
XrdFile (const char *name, int flags=IOFlags::OpenRead, int perms=0666) | |
~XrdFile (void) | |
Private Member Functions | |
void | addConnection (cms::Exception &) |
IOSize | readv_send (char **result_buffer, readahead_list &read_chunk_list, IOSize n, IOSize total_len) |
IOSize | readv_unpack (char **result_buffer, std::vector< char > &res_buf, IOSize datalen, readahead_list &read_chunk_list, IOSize n) |
Private Attributes | |
XrdClient * | m_client |
bool | m_close |
std::string | m_name |
IOOffset | m_offset |
pthread_mutex_t | m_readv_mutex |
XrdClientStatInfo | m_stat |
XrdFile::XrdFile | ( | void | ) |
Definition at line 10 of file XrdFile.cc.
References m_readv_mutex, and m_stat.
XrdFile::XrdFile | ( | IOFD | fd | ) |
XrdFile::XrdFile | ( | const char * | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) |
XrdFile::XrdFile | ( | const std::string & | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) |
XrdFile::~XrdFile | ( | void | ) |
Definition at line 49 of file XrdFile.cc.
References m_close, m_name, and m_readv_mutex.
{ if (m_close) edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open"; pthread_mutex_destroy(&m_readv_mutex); }
void XrdFile::abort | ( | void | ) | [virtual] |
void XrdFile::addConnection | ( | cms::Exception & | ex | ) | [private] |
Definition at line 403 of file XrdFile.cc.
References cms::Exception::addAdditionalInfo(), results_mgr::conn, and m_client.
Referenced by open(), position(), read(), readv(), readv_unpack(), resize(), and write().
{ XrdClientConn *conn = m_client->GetClientConn(); if (conn) { std::stringstream ss; ss << "Current server connection: " << conn->GetCurrentUrl().GetUrl().c_str(); ex.addAdditionalInfo(ss.str()); } }
void XrdFile::close | ( | void | ) | [virtual] |
Reimplemented from Storage.
Definition at line 185 of file XrdFile.cc.
References m_client, m_close, m_name, m_offset, and m_stat.
Referenced by open().
{ if (! m_client) { edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name << "') called but the file is not open"; m_close = false; return; } if (! m_client->Close()) edm::LogWarning("XrdFileWarning") << "XrdFile::close(name='" << m_name << "') failed with error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; delete m_client; m_client = 0; m_close = false; m_offset = 0; memset(&m_stat, 0, sizeof (m_stat)); edm::LogInfo("XrdFileInfo") << "Closed " << m_name; }
void XrdFile::create | ( | const std::string & | name, |
bool | exclusive = false , |
||
int | perms = 0666 |
||
) | [virtual] |
Definition at line 71 of file XrdFile.cc.
References open(), IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenTruncate, and IOFlags::OpenWrite.
{ open (name.c_str (), (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate | (exclusive ? IOFlags::OpenExclusive : 0)), perms); }
void XrdFile::create | ( | const char * | name, |
bool | exclusive = false , |
||
int | perms = 0666 |
||
) | [virtual] |
Definition at line 60 of file XrdFile.cc.
References open(), IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenTruncate, and IOFlags::OpenWrite.
{ open (name, (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate | (exclusive ? IOFlags::OpenExclusive : 0)), perms); }
void XrdFile::open | ( | const std::string & | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) | [virtual] |
void XrdFile::open | ( | const char * | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) | [virtual] |
Definition at line 88 of file XrdFile.cc.
References abort(), addConnection(), cms::Exception::addContext(), close(), results_mgr::conn, edm::errors::FileOpenError, edm::storage::StatisticsSenderService::getJobID(), query::host, m_client, m_close, m_name, m_offset, m_stat, mergeVDriftHistosByStation::name, IOFlags::OpenAppend, IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenRead, IOFlags::OpenTruncate, IOFlags::OpenWrite, and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by create(), open(), and XrdFile().
{ // Actual open if ((name == 0) || (*name == 0)) { edm::Exception ex(edm::errors::FileOpenError); ex << "Cannot open a file without a name"; ex.addContext("Calling XrdFile::open()"); throw ex; } if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) { edm::Exception ex(edm::errors::FileOpenError); ex << "Must open file '" << name << "' at least for read or write"; ex.addContext("Calling XrdFile::open()"); throw ex; } // If I am already open, close old file first if (m_client && m_close) close(); else abort(); // Translate our flags to system flags int openflags = 0; if (flags & IOFlags::OpenWrite) openflags |= kXR_open_updt; else if (flags & IOFlags::OpenRead) openflags |= kXR_open_read; if (flags & IOFlags::OpenAppend) { edm::Exception ex(edm::errors::FileOpenError); ex << "Opening file '" << name << "' in append mode not supported"; ex.addContext("Calling XrdFile::open()"); throw ex; } if (flags & IOFlags::OpenCreate) { if (! (flags & IOFlags::OpenExclusive)) openflags |= kXR_delete; openflags |= kXR_new; openflags |= kXR_mkpath; } if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite)) openflags |= kXR_delete; m_name = name; m_client = new XrdClient(name); if (! m_client->Open(perms, openflags) || m_client->LastServerResp()->status != kXR_ok) { edm::Exception ex(edm::errors::FileOpenError); ex << "XrdClient::Open(name='" << name << "', flags=0x" << std::hex << openflags << ", permissions=0" << std::oct << perms << std::dec << ") => error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; ex.addContext("Calling XrdFile::open()"); addConnection(ex); throw ex; } if (! m_client->Stat(&m_stat)) { edm::Exception ex(edm::errors::FileOpenError); ex << "XrdClient::Stat(name='" << name << ") => error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; ex.addContext("Calling XrdFile::open()"); addConnection(ex); throw ex; } m_offset = 0; m_close = true; // Send the monitoring info, if available. // Note: getenv is not reentrant. const char * crabJobId = edm::storage::StatisticsSenderService::getJobID(); if (crabJobId) { kXR_unt32 dictId; m_client->SendMonitoringInfo(crabJobId, &dictId); edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << "."; } edm::LogInfo("XrdFileInfo") << "Opened " << m_name; XrdClientConn *conn = m_client->GetClientConn(); edm::LogInfo("XrdFileInfo") << "Connection URL " << conn->GetCurrentUrl().GetUrl().c_str(); std::string host = std::string(conn->GetCurrentUrl().Host.c_str()); edm::Service<edm::storage::StatisticsSenderService> statsService; if (statsService.isAvailable()) { statsService->setCurrentServer(host); } }
Implements Storage.
Definition at line 353 of file XrdFile.cc.
References addConnection(), cms::Exception::addContext(), Storage::CURRENT, Storage::END, m_client, m_offset, m_stat, evf::evtn::offset(), and Storage::SET.
{ if (! m_client) { cms::Exception ex("FilePositionError"); ex << "XrdFile::position() called on a closed file"; ex.addContext("Calling XrdFile::position()"); addConnection(ex); throw ex; } switch (whence) { case SET: m_offset = offset; break; case CURRENT: m_offset += offset; break; case END: m_offset = m_stat.size + offset; break; default: cms::Exception ex("FilePositionError"); ex << "XrdFile::position() called with incorrect 'whence' parameter"; ex.addContext("Calling XrdFile::position()"); addConnection(ex); throw ex; } if (m_offset < 0) m_offset = 0; if (m_offset > m_stat.size) m_stat.size = m_offset; return m_offset; }
bool XrdFile::prefetch | ( | const IOPosBuffer * | what, |
IOSize | n | ||
) | [virtual] |
Reimplemented from Storage.
Definition at line 328 of file XrdFile.cc.
References i, m_client, n, NULL, IOPosBuffer::offset(), evf::evtn::offset(), PREFETCH_PROBE_LENGTH, alignCSCRings::r, IOPosBuffer::size(), Storage::size(), pileupDistInMC::total, and unlikely.
{ // Detect a prefetch support probe, and claim we don't support it. // This will make the default application-only mode, but allows us to still // effectively support storage-only mode. if (unlikely((n == 1) && (what[0].offset() == 0) && (what[0].size() == PREFETCH_PROBE_LENGTH))) { return false; } std::vector<long long> offsets; offsets.resize(n); std::vector<int> lens; lens.resize(n); kXR_int64 total = 0; for (IOSize i = 0; i < n; ++i) { offsets[i] = what[i].offset(); lens[i] = what[i].size(); total += what[i].size(); } kXR_int64 r = m_client->ReadV(NULL, &offsets[0], &lens[0], n); return r == total; }
Read into into at most n number of bytes.
If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.
In | case of error, a #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception). |
Implements IOInput.
Definition at line 222 of file XrdFile.cc.
References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, m_client, m_name, m_offset, and alignCSCRings::s.
{ if (n > 0x7fffffff) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff"; ex.addContext("Calling XrdFile::read()"); addConnection(ex); throw ex; } int s = m_client->Read(into, m_offset, n); if (s < 0) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdClient::Read(name='" << m_name << "', offset=" << m_offset << ", n=" << n << ") failed with error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; ex.addContext("Calling XrdFile::read()"); addConnection(ex); throw ex; } m_offset += s; return s; }
Reimplemented from Storage.
Definition at line 248 of file XrdFile.cc.
References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, m_client, m_name, m_offset, and alignCSCRings::s.
{ if (n > 0x7fffffff) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") exceeds read size limit 0x7fffffff"; ex.addContext("Calling XrdFile::read()"); addConnection(ex); throw ex; } int s = m_client->Read(into, pos, n); if (s < 0) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdClient::Read(name='" << m_name << "', offset=" << m_offset << ", n=" << n << ") failed with error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; ex.addContext("Calling XrdFile::read()"); addConnection(ex); throw ex; } return s; }
Read from the input stream into multiple scattered buffers. There are buffers to fill in an array starting at into; the memory those buffers occupy does not need to be contiguous. The buffers are filled in the order given, eac buffer is filled fully before the subsequent buffers.
If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.
The base class implementation uses read(void *, IOSize) method, but derived classes may implement a more efficient alternative.
In | case of error, a #IOError exception is thrown. However if some data has already been read, the error is swallowed and the method returns the data read so far. It is assumed that persistent errors will occur anyway on the next read and sporadic errors like stream becoming unvailable can be ignored. Use xread() if a different policy is desirable. |
Reimplemented from IOInput.
Definition at line 41 of file XrdReadv.cc.
References data, i, n, Storage::size(), and IOBuffer::size().
IOSize XrdFile::readv | ( | IOPosBuffer * | into, |
IOSize | n | ||
) | [virtual] |
Reimplemented from Storage.
Definition at line 60 of file XrdReadv.cc.
References addConnection(), cms::Exception::addContext(), data, IOPosBuffer::data(), edm::errors::FileReadError, patZpeak::handle, i, m_client, m_name, n, IOPosBuffer::offset(), evf::evtn::offset(), IOInput::read(), readv_send(), IOPosBuffer::size(), Storage::size(), and unlikely.
{ assert(m_client); // A trivial vector read - unlikely, considering ROOT data format. if (unlikely(n == 0)) { return 0; } if (unlikely(n == 1)) { return read(into[0].data(), into[0].size(), into[0].offset()); } // The main challenge here is to turn the request into a form that can be // fed to the Xrootd connection. In particular, Xrootd has a limit on the // maximum number of chunks and the maximum size of each chunk. Hence, the // loop unrolling. // IOSize total_len = 0; readahead_list read_chunk_list[READV_MAXCHUNKS]; char *result_list[READV_MAXCHUNKS]; IOSize chunk_off = 0; IOSize readv_total_len = 0; const char * handle = m_client->GetHandle(); // also - 16 bytes offset from the location of m_client. for (IOSize i = 0; i < n; ++i) { IOSize len = into[i].size(); if (unlikely(len > 0x7fffffff)) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::readv(name='" << m_name << "')[" << i << "].size=" << len << " exceeds read size limit 0x7fffffff"; ex.addContext("Calling XrdFile::readv()"); addConnection(ex); throw ex; } IOOffset off = into[i].offset(); char *chunk_data = static_cast<char *>(into[i].data()); while (len > 0) { // Iterate as long as there is additional data to read. // Each iteration will read up to READV_MAXCHUNKSIZE of this request. IOSize chunk_size = len > READV_MAXCHUNKSIZE ? READV_MAXCHUNKSIZE : len; len -= chunk_size; readv_total_len += chunk_size; read_chunk_list[chunk_off].rlen = chunk_size; read_chunk_list[chunk_off].offset = off; result_list[chunk_off] = chunk_data; chunk_data += chunk_size; off += chunk_size; memcpy(&(read_chunk_list[chunk_off].fhandle), handle, 4); chunk_off++; if (chunk_off == READV_MAXCHUNKS) { // Now that we have broken the readv into Xrootd-sized chunks, send the actual command. // readv_send will also parse the response and place the data into the result_list buffers. IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len); total_len += tmp_total_len; if (tmp_total_len != readv_total_len) return total_len; readv_total_len = 0; chunk_off = 0; } } } // Do the actual readv for all remaining chunks. if (chunk_off) { total_len += readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len); } return total_len; }
IOSize XrdFile::readv_send | ( | char ** | result_buffer, |
readahead_list & | read_chunk_list, | ||
IOSize | n, | ||
IOSize | total_len | ||
) | [private] |
Definition at line 136 of file XrdReadv.cc.
References FALSE, m_client, m_readv_mutex, readv_unpack(), evf::utils::sid, and summarizeEdmComparisonLogfiles::success.
Referenced by readv().
{ // Per the xrootd protocol document: // Sending requests using the same streamid when a kXR_oksofar status code has been // returned may produced unpredictable results. A client must serialize all requests // using the streamid in the presence of partial results. XrdClientConn *xrdc = m_client->GetClientConn(); ClientRequest readvFileRequest; memset( &readvFileRequest, 0, sizeof(readvFileRequest) ); kXR_unt16 sid = ConnectionManager->SidManager()->GetNewSid(); memcpy(readvFileRequest.header.streamid, &sid, sizeof(kXR_unt16)); readvFileRequest.header.requestid = kXR_readv; readvFileRequest.readv.dlen = n * sizeof(struct readahead_list); std::vector<char> res_buf; res_buf.reserve( total_len + (n * sizeof(struct readahead_list)) ); // Encode, then send the command. clientMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen); bool success; IOSize data_length; { MutexSentry sentry(m_readv_mutex); success = xrdc->SendGenCommand(&readvFileRequest, &read_chunk_list, 0, (void *)&(res_buf[0]), FALSE, (char *)"ReadV"); data_length = xrdc->LastServerResp.dlen; } clientUnMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen); ConnectionManager->SidManager()->ReleaseSid(sid); if (success) { return readv_unpack(result_list, res_buf, data_length, read_chunk_list, n); } else { return 0; } }
IOSize XrdFile::readv_unpack | ( | char ** | result_buffer, |
std::vector< char > & | res_buf, | ||
IOSize | datalen, | ||
readahead_list & | read_chunk_list, | ||
IOSize | n | ||
) | [private] |
Definition at line 181 of file XrdReadv.cc.
References addConnection(), cms::Exception::addContext(), edm::errors::FileReadError, i, m_name, n, evf::evtn::offset(), and unlikely.
Referenced by readv_send().
{ IOSize response_offset = 0; IOSize total_len = 0; for (IOSize i = 0; i < n; i++) { if (unlikely(response_offset + sizeof(struct readahead_list) > response_length)) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::readv(name='" << m_name << "')[" << i << "] returned an incorrectly-sized response (short header)"; ex.addContext("Calling XrdFile::readv()"); addConnection(ex); } kXR_int64 offset; kXR_int32 rlen; { // Done as a separate block so response is not used later - as it is all in network order! const readahead_list *response = reinterpret_cast<struct readahead_list*>(&result_buf[response_offset]); offset = ntohll(response->offset); rlen = ntohl(response->rlen); } // Sanity / consistency checks; verify the results correspond to the requested chunk // Also check that the response buffer is sufficient large to read from. if (unlikely((&read_chunk_list)[i].offset != offset)) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::readv(name='" << m_name << "')[" << i << "] returned offset " << offset << " does not match requested offset " << (&read_chunk_list)[i].offset; ex.addContext("Calling XrdFile::readv()"); addConnection(ex); throw ex; } if (unlikely((&read_chunk_list)[i].rlen != rlen)) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::readv(name='" << m_name << "')[" << i << "] returned size " << rlen << " does not match requested size " << (&read_chunk_list)[i].rlen; ex.addContext("Calling XrdFile::readv()"); addConnection(ex); throw ex; } if (unlikely(response_offset + rlen > response_length)) { edm::Exception ex(edm::errors::FileReadError); ex << "XrdFile::readv(name='" << m_name << "')[" << i << "] returned an incorrectly-sized response (short data)"; ex.addContext("Calling XrdFile::readv()"); addConnection(ex); } response_offset += sizeof(struct readahead_list); // Data is stored after header. total_len += rlen; // Copy the data into place; increase the offset. memcpy(result_list[i], &result_buf[response_offset], rlen); response_offset += rlen; } return total_len; }
void XrdFile::resize | ( | IOOffset | size | ) | [virtual] |
Implements Storage.
Definition at line 393 of file XrdFile.cc.
References addConnection(), cms::Exception::addContext(), and m_name.
{ cms::Exception ex("FileResizeError"); ex << "XrdFile::resize(name='" << m_name << "') not implemented"; ex.addContext("Calling XrdFile::resize()"); addConnection(ex); throw ex; }
Reimplemented from Storage.
Definition at line 301 of file XrdFile.cc.
References addConnection(), cms::Exception::addContext(), m_client, m_name, m_stat, and alignCSCRings::s.
{ if (n > 0x7fffffff) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); throw ex; } ssize_t s = m_client->Write(from, pos, n); if (s < 0) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); throw ex; } if (pos + s > m_stat.size) m_stat.size = pos + s; return s; }
Write n bytes of data starting at address from.
In | case of error, an exception is thrown. However if the stream is in non-blocking mode and cannot accept output, it will not throw an exception -- the return value will be less than requested. |
Implements IOOutput.
Definition at line 273 of file XrdFile.cc.
References addConnection(), cms::Exception::addContext(), m_client, m_name, m_offset, m_stat, and alignCSCRings::s.
{ if (n > 0x7fffffff) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); throw ex; } ssize_t s = m_client->Write(from, m_offset, n); if (s < 0) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << m_client->LastServerError()->errmsg << "' (errno=" << m_client->LastServerError()->errnum << ")"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); throw ex; } m_offset += s; if (m_offset > m_stat.size) m_stat.size = m_offset; return s; }
XrdClient* XrdFile::m_client [private] |
Definition at line 61 of file XrdFile.h.
Referenced by abort(), addConnection(), close(), open(), position(), prefetch(), read(), readv(), readv_send(), and write().
bool XrdFile::m_close [private] |
std::string XrdFile::m_name [private] |
Definition at line 65 of file XrdFile.h.
Referenced by close(), open(), read(), readv(), readv_unpack(), resize(), write(), and ~XrdFile().
IOOffset XrdFile::m_offset [private] |
pthread_mutex_t XrdFile::m_readv_mutex [private] |
Definition at line 68 of file XrdFile.h.
Referenced by readv_send(), XrdFile(), and ~XrdFile().
XrdClientStatInfo XrdFile::m_stat [private] |