#include <RFIOFile.h>
Public Member Functions | |
virtual void | abort (void) |
virtual void | close (void) |
virtual void | create (const char *name, bool exclusive=false, int perms=0666) |
virtual void | create (const std::string &name, bool exclusive=false, int perms=0666) |
virtual void | open (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666) |
virtual void | open (const char *name, int flags=IOFlags::OpenRead, int perms=0666) |
virtual IOOffset | position (IOOffset offset, Relative whence=SET) |
virtual bool | prefetch (const IOPosBuffer *what, IOSize n) |
virtual IOSize | read (void *into, IOSize n) |
virtual IOSize | readv (IOPosBuffer *into, IOSize buffers) |
virtual void | resize (IOOffset size) |
RFIOFile (IOFD fd) | |
RFIOFile (void) | |
RFIOFile (const std::string &name, int flags=IOFlags::OpenRead, int perms=0666) | |
RFIOFile (const char *name, int flags=IOFlags::OpenRead, int perms=0666) | |
virtual IOSize | write (const void *from, IOSize n) |
~RFIOFile (void) | |
Private Member Functions | |
void | reopen () |
ssize_t | retryRead (void *into, IOSize n, int maxRetry=10) |
Private Attributes | |
bool | m_close |
IOOffset | m_curpos |
IOFD | m_fd |
int | m_flags |
std::string | m_name |
int | m_perms |
RFIO Storage object.
Definition at line 9 of file RFIOFile.h.
RFIOFile::RFIOFile | ( | void | ) |
Definition at line 30 of file RFIOFile.cc.
RFIOFile::RFIOFile | ( | IOFD | fd | ) |
RFIOFile::RFIOFile | ( | const char * | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) |
RFIOFile::RFIOFile | ( | const std::string & | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) |
RFIOFile::~RFIOFile | ( | void | ) |
Definition at line 66 of file RFIOFile.cc.
References m_close, and m_name.
{
  // The destructor does not close anything itself; it only logs an error
  // when the object goes away while m_close is still set.
  if (! m_close)
    return;

  edm::LogError("RFIOFileError")
    << "Destructor called on RFIO file '" << m_name
    << "' but the file is still open";
}
void RFIOFile::abort | ( | void | ) | [virtual] |
Definition at line 245 of file RFIOFile.cc.
References EDM_IOFD_INVALID, m_close, m_fd, rfio_close64(), and serrno.
{
  // Release the descriptor without reporting anything: the result of
  // rfio_close64() is not checked, and the object is always left in the
  // "not open" state (m_fd invalid, m_close cleared).
  serrno = 0;

  const bool haveDescriptor = (m_fd != EDM_IOFD_INVALID);
  if (haveDescriptor)
    rfio_close64 (m_fd);

  m_fd = EDM_IOFD_INVALID;
  m_close = false;
}
void RFIOFile::close | ( | void | ) | [virtual] |
Reimplemented from Storage.
Definition at line 201 of file RFIOFile.cc.
References EDM_IOFD_INVALID, m_close, m_fd, m_name, rfio_close64(), rfio_errno, rfio_serror(), serrno, stor::utils::sleep(), and ntuplemaker::status.
Referenced by open(), and reopen().
{
  // Close the RFIO descriptor held in m_fd.
  //
  // - If no descriptor is open, an error is logged, m_close is cleared and
  //   the call returns without touching anything else.
  // - If rfio_close64() fails, the failure is only logged (no exception);
  //   the system ::close() is then tried on the same descriptor and the
  //   call sleeps for 5 seconds before returning.
  // - In every case the object ends up marked as not open.
  if (m_fd == EDM_IOFD_INVALID)
  {
    edm::LogError("RFIOFileError")
      << "RFIOFile::close(name='" << m_name
      << "') called but the file is not open";
    m_close = false;
    return;
  }

  serrno = 0;
  if (rfio_close64 (m_fd) == -1)
  {
    // If we fail to close the file, report a warning.
    edm::LogWarning("RFIOFileWarning")
      << "rfio_close64(name='" << m_name
      << "') failed with error '" << rfio_serror()
      << "' (rfio_errno=" << rfio_errno
      << ", serrno=" << serrno << ")";

    // When rfio_close64 fails then try the system close function as
    // per the advice from Olof Barring from the Castor operations.
    int status = ::close(m_fd);

    // Capture errno right away: constructing the logger object below may
    // itself make library calls that overwrite errno before it is streamed.
    const int closeErrno = errno;

    if (status < 0)
      edm::LogWarning("RFIOFileWarning")
        << "RFIOFile::close(): system level close after a failed"
        << " rfio_close64 also failed with error '" << strerror (closeErrno)
        << "' (error code " << closeErrno << ")";
    else
      edm::LogWarning("RFIOFileWarning")
        << "RFIOFile::close(): system level close after a failed"
        << " rfio_close64 succeeded";

    sleep(5);
  }

  m_close = false;
  m_fd = EDM_IOFD_INVALID;

  // Caused hang. Will be added back after problem is fixed
  // edm::LogInfo("RFIOFileInfo") << "Closed " << m_name;
}
void RFIOFile::create | ( | const char * | name, |
bool | exclusive = false , |
||
int | perms = 0666 |
||
) | [virtual] |
Definition at line 77 of file RFIOFile.cc.
References open(), IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenTruncate, and IOFlags::OpenWrite.
{
  // Creating a file is an open() with create + write + truncate flags;
  // 'exclusive' additionally sets IOFlags::OpenExclusive.
  int createFlags = IOFlags::OpenCreate
                    | IOFlags::OpenWrite
                    | IOFlags::OpenTruncate;
  if (exclusive)
    createFlags |= IOFlags::OpenExclusive;

  open (name, createFlags, perms);
}
void RFIOFile::create | ( | const std::string & | name, |
bool | exclusive = false , |
||
int | perms = 0666 |
||
) | [virtual] |
Definition at line 88 of file RFIOFile.cc.
References open(), IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenTruncate, and IOFlags::OpenWrite.
{
  // std::string flavour of create(): forwards the C string to open() with
  // create + write + truncate flags, plus IOFlags::OpenExclusive on request.
  const int exclusiveBit = exclusive ? IOFlags::OpenExclusive : 0;
  const int createFlags = IOFlags::OpenCreate
                          | IOFlags::OpenWrite
                          | IOFlags::OpenTruncate
                          | exclusiveBit;

  open (name.c_str (), createFlags, perms);
}
void RFIOFile::open | ( | const std::string & | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) | [virtual] |
void RFIOFile::open | ( | const char * | name, |
int | flags = IOFlags::OpenRead , |
||
int | perms = 0666 |
||
) | [virtual] |
Definition at line 105 of file RFIOFile.cc.
References cms::Exception::addContext(), close(), EDM_IOFD_INVALID, edm::errors::FileOpenError, flags, m_close, m_curpos, m_fd, m_flags, m_name, m_perms, mergeVDriftHistosByStation::name, O_NONBLOCK, IOFlags::OpenAppend, IOFlags::OpenCreate, IOFlags::OpenExclusive, IOFlags::OpenNonBlock, IOFlags::OpenRead, IOFlags::OpenTruncate, IOFlags::OpenUnbuffered, IOFlags::OpenWrite, rfio_errno, rfio_open64(), RFIO_READOPT, rfio_serror(), rfiosetopt(), and serrno.
Referenced by create(), open(), reopen(), and RFIOFile().
{
  // Open 'name' through RFIO with the given IOFlags and permissions.
  //
  // Throws edm::Exception(edm::errors::FileOpenError) when the name is null
  // or empty, when neither OpenRead nor OpenWrite is requested, or when
  // rfio_open64() fails. On success m_fd holds the new descriptor, m_close
  // is set and m_curpos is reset to 0.

  // Save parameters for error recovery.
  m_name = (name == 0) ? "" : name;
  m_flags = flags;
  m_perms = perms;

  // Reset RFIO error code.
  serrno = 0;

  // Disable buffering in rfio library?  Note that doing this on
  // one file disables it for everything.  Not much we can do...
  // but it does make a significant performance difference to the
  // clients.  Note also that docs say the flag turns off write
  // buffering -- this turns off all buffering.
  int readopt = (flags & IOFlags::OpenUnbuffered) ? 0 : 1;
  rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));

  // Argument validation happens only after the option above has been set.
  if (! name || ! *name)
  {
    edm::Exception ex(edm::errors::FileOpenError);
    ex << "Cannot open a file without a name";
    ex.addContext("Calling RFIOFile::open()");
    throw ex;
  }

  const bool wantRead = (flags & IOFlags::OpenRead) != 0;
  const bool wantWrite = (flags & IOFlags::OpenWrite) != 0;
  if (! wantRead && ! wantWrite)
  {
    edm::Exception ex(edm::errors::FileOpenError);
    ex << "Must open file '" << name << "' at least for read or write";
    ex.addContext("Calling RFIOFile::open()");
    throw ex;
  }

  // A name starting with "//" loses its first slash before being handed
  // to RFIO; m_name keeps the name exactly as it was passed in.
  std::string lname (name);
  if (lname.compare (0, 2, "//") == 0)
    lname.erase(0, 1);

  // If I am already open, close old file first
  if (m_fd != EDM_IOFD_INVALID && m_close)
    close ();

  // Translate our flags to system flags. At least one of read/write is
  // guaranteed by the check above, so "not read" means write-only.
  int openflags = wantRead ? (wantWrite ? O_RDWR : O_RDONLY) : O_WRONLY;

  if (flags & IOFlags::OpenNonBlock)
    openflags |= O_NONBLOCK;
  if (flags & IOFlags::OpenAppend)
    openflags |= O_APPEND;
  if (flags & IOFlags::OpenCreate)
    openflags |= O_CREAT;
  if (flags & IOFlags::OpenExclusive)
    openflags |= O_EXCL;
  if (flags & IOFlags::OpenTruncate)
    openflags |= O_TRUNC;

  const IOFD newfd = rfio_open64 (lname.c_str(), openflags, perms);
  if (newfd == -1)
  {
    edm::Exception ex(edm::errors::FileOpenError);
    ex << "rfio_open(name='" << lname
       << "', flags=0x" << std::hex << openflags
       << ", permissions=0" << std::oct << perms << std::dec
       << ") => error '" << rfio_serror ()
       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
    ex.addContext("Calling RFIOFile::open()");
    throw ex;
  }

  m_fd = newfd;
  m_close = true;
  m_curpos = 0;

  edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
}
Implements Storage.
Definition at line 382 of file RFIOFile.cc.
References cms::Exception::addContext(), Storage::CURRENT, EDM_IOFD_INVALID, Storage::END, m_curpos, m_fd, m_name, query::result, rfio_errno, rfio_lseek64(), rfio_serror(), serrno, and Storage::SET.
{
  // Seek to 'offset' relative to 'whence' and return the resulting
  // absolute position, which is also cached in m_curpos.
  //
  // Throws cms::Exception("FilePositionError") when the file is closed,
  // when 'whence' is not one of SET / CURRENT / END, or when
  // rfio_lseek64() reports -1.
  if (m_fd == EDM_IOFD_INVALID)
  {
    cms::Exception ex("FilePositionError");
    ex << "RFIOFile::position() called on a closed file";
    throw ex;
  }

  // Map our Relative value onto the matching SEEK_* constant.
  int mywhence;
  if (whence == SET)
    mywhence = SEEK_SET;
  else if (whence == CURRENT)
    mywhence = SEEK_CUR;
  else if (whence == END)
    mywhence = SEEK_END;
  else
  {
    cms::Exception ex("FilePositionError");
    ex << "RFIOFile::position() called with incorrect 'whence' parameter";
    throw ex;
  }

  serrno = 0;
  const IOOffset result = rfio_lseek64 (m_fd, offset, mywhence);
  if (result == -1)
  {
    cms::Exception ex("FilePositionError");
    ex << "rfio_lseek(name='" << m_name
       << "', offset=" << offset
       << ", whence=" << mywhence
       << ") failed at position " << m_curpos
       << " with error '" << rfio_serror()
       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
    ex.addContext("Calling RFIOFile::position()");
    throw ex;
  }

  m_curpos = result;
  return result;
}
bool RFIOFile::prefetch | ( | const IOPosBuffer * | what, |
IOSize | n | ||
) | [virtual] |
Reimplemented from Storage.
Definition at line 422 of file RFIOFile.cc.
References i, o2o::iov, m_curpos, m_fd, m_name, n, IOPosBuffer::offset(), query::result, rfio_errno, rfio_preseek64(), RFIO_READOPT, rfio_serror(), rfioreadopt(), serrno, IOPosBuffer::size(), and stor::utils::sleep().
Referenced by readv().
{
  // Hand the list of (offset, size) requests in 'what' to
  // rfio_preseek64(), making up to 5 attempts with a 5 second sleep
  // between them.
  //
  // Returns true once rfio_preseek64() succeeds and false (after logging
  // an error) when every attempt failed. Throws
  // cms::Exception("FilePrefetchError") when RFIO_READOPT is not 1.
  if (rfioreadopt (RFIO_READOPT) != 1)
  {
    cms::Exception ex("FilePrefetchError");
    ex << "RFIOFile::prefetch() called but RFIO_READOPT="
       << rfioreadopt (RFIO_READOPT) << " (must be 1)";
    throw ex;
  }

  std::vector<iovec64> iov (n);
  for (IOSize i = 0; i < n; ++i)
  {
    iov[i].iov_base = what[i].offset();
    iov[i].iov_len = what[i].size();
  }

  // Indexing element 0 of an empty vector is undefined behaviour, so only
  // take the address when there is at least one request.
  iovec64 *requests = iov.empty() ? 0 : &iov[0];

  serrno = 0;
  int retry = 5;
  int result;
  while ((result = rfio_preseek64(m_fd, requests, n)) == -1)
  {
    if (--retry == 0)
    {
      edm::LogError("RFIOFile::prefetch")
        << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
        << rfio_serror() << "' (rfio_errno=" << rfio_errno
        << ", serrno=" << serrno << ")";
      return false;
    }
    else
    {
      // 'retry' has already been decremented and now equals the number of
      // attempts still to be made, so report it as is.
      edm::LogWarning("RFIOFileRetry")
        << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
        << m_curpos << " with error '" << rfio_serror()
        << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
        << "); retrying " << retry << " times";
      serrno = 0;
      sleep(5);
    }
  }

  return true;
}
Read at most n bytes into the buffer pointed to by into.
If this is a blocking stream, the call will block until some data can be read, end of input is reached, or an exception is thrown. For a non-blocking stream the available input is returned. If none is available, an exception is thrown.
In case of error, a #IOError exception is thrown. This includes the situation where the input stream is in non-blocking mode and no input is currently available (FIXME: make this simpler; clarify which exception).
Implements IOInput.
Definition at line 326 of file RFIOFile.cc.
References cms::Exception::addContext(), end, edm::errors::FileReadError, LogDebug, m_curpos, m_name, realNanoSecs(), retryRead(), rfio_errno, rfio_serror(), alignCSCRings::s, serrno, and dqm_diff::start.
{
  // Read up to 'n' bytes into 'into' via retryRead() (3 attempts) and
  // advance m_curpos by the number of bytes returned.
  // Throws edm::Exception(edm::errors::FileReadError) when retryRead()
  // returns a negative value.

  // Be aware that when enabled these LogDebug prints
  // will take more time than the read itself unless the reads
  // are proceeding slower than under optimal conditions.
  LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
  double began = realNanoSecs();

  serrno = 0;
  const ssize_t nread = retryRead (into, n, 3);
  if (nread < 0)
  {
    edm::Exception ex(edm::errors::FileReadError);
    ex << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
       << " at position " << m_curpos
       << " with error '" << rfio_serror()
       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
    ex.addContext("Calling RFIOFile::read()");
    throw ex;
  }

  m_curpos += nread;

  double finished = realNanoSecs();
  LogDebug("RFIOFileDebug")
    << "Exiting RFIOFile read(), elapsed time = " << finished - began
    << " ns, bytes read = " << nread
    << ", file position = " << m_curpos;

  return nread;
}
IOSize RFIOFile::readv | ( | IOPosBuffer * | into, |
IOSize | buffers | ||
) | [virtual] |
Reimplemented from Storage.
Definition at line 355 of file RFIOFile.cc.
References m_flags, IOFlags::OpenUnbuffered, and prefetch().
{
  // For files not opened with IOFlags::OpenUnbuffered, issue a prefetch
  // for the requested buffers first; its return value is not checked.
  // The actual vector read is always delegated to Storage::readv().
  const bool buffered = ! (m_flags & IOFlags::OpenUnbuffered);
  if (buffered)
    prefetch(into, buffers);

  return Storage::readv(into, buffers);
}
void RFIOFile::reopen | ( | void | ) | [private] |
Definition at line 255 of file RFIOFile.cc.
References close(), m_curpos, m_flags, m_name, m_perms, open(), Storage::position(), and stor::utils::sleep().
Referenced by retryRead().
void RFIOFile::resize | ( | IOOffset | size | ) | [virtual] |
Implements Storage.
Definition at line 414 of file RFIOFile.cc.
References m_name.
{
  // Resizing is not supported for RFIO files: this always throws
  // cms::Exception("FileResizeError") and never returns normally.
  cms::Exception notImplemented("FileResizeError");
  notImplemented << "RFIOFile::resize(name='" << m_name
                 << "') not implemented";
  throw notImplemented;
}
ssize_t RFIOFile::retryRead | ( | void * | into, |
IOSize | n, | ||
int | maxRetry = 10 |
||
) | [private] |
Definition at line 269 of file RFIOFile.cc.
References edm::FlushMessageLog(), m_curpos, m_fd, m_name, reopen(), rfio_errno, rfio_read64(), rfio_serror(), alignCSCRings::s, serrno, and stor::utils::sleep().
Referenced by read().
{
  // Attempt to read up to maxRetry times.
  //
  // A read is retried when rfio_read64() returns -1 with serrno == 1004,
  // or when it claims more bytes than were requested. Any other result
  // (including other errors) is returned to the caller immediately.
  //
  // NOTE(review): after the last permitted attempt fails, the code still
  // sleeps and reopens the file before returning the failed result, with
  // no further read. Confirm whether that final wait is intended.
  ssize_t nread;
  for (;;)
  {
    serrno = 0;
    nread = rfio_read64 (m_fd, into, n);

    const bool timedOut = (nread == -1 && serrno == 1004);
    const bool tooManyBytes = (nread > ssize_t (n));
    if (! timedOut && ! tooManyBytes)
      break;

    // Wait a little while to allow Castor to recover from the timeout.
    // The wait grows as the remaining attempt count shrinks.
    const char *sleepTimeMsg = "1 minute";
    int secondsToSleep = 60;
    if (maxRetry == 1)
    {
      sleepTimeMsg = "10 minutes";
      secondsToSleep = 600;
    }
    else if (maxRetry == 2)
    {
      sleepTimeMsg = "5 minutes";
      secondsToSleep = 300;
    }

    edm::LogWarning("RFIOFileRetry")
      << "RFIOFile retrying read\n"
      << " return value from rfio_read64 = " << nread << " (normally this is bytes read, -1 for error)\n"
      << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
      << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
      << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
      << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
      << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
      << " retries left before quitting = " << maxRetry << "\n"
      << " will close and reopen file " << m_name << "\n"
      << " will sleep for " << sleepTimeMsg << " before attempting retry";
    edm::FlushMessageLog();
    sleep(secondsToSleep);

    // Improve the chances of success by closing and reopening
    // the file before retrying the read.  This also resets
    // the position in the file to the correct place.
    reopen();

    if (--maxRetry <= 0)
      break;
  }

  return nread;
}
Write n bytes of data starting at address from.
In case of error, an exception is thrown. However if the stream is in non-blocking mode and cannot accept output, it will not throw an exception -- the return value will be less than requested.
Implements IOOutput.
Definition at line 363 of file RFIOFile.cc.
References cms::Exception::addContext(), m_curpos, m_fd, m_name, rfio_errno, rfio_serror(), rfio_write64(), alignCSCRings::s, and serrno.
{
  // Write 'n' bytes from 'from' with rfio_write64() and return the number
  // of bytes it reports as written. A negative result is turned into
  // cms::Exception("FileWriteError").
  //
  // NOTE(review): unlike read(), this does not advance m_curpos after a
  // successful write -- confirm whether that is intended.
  serrno = 0;
  const ssize_t written = rfio_write64 (m_fd, from, n);
  if (written >= 0)
    return written;

  cms::Exception ex("FileWriteError");
  ex << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
     << " at position " << m_curpos
     << " with error '" << rfio_serror()
     << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
  ex.addContext("Calling RFIOFile::write()");
  throw ex;
}
bool RFIOFile::m_close [private] |
Definition at line 51 of file RFIOFile.h.
Referenced by abort(), close(), open(), and ~RFIOFile().
IOOffset RFIOFile::m_curpos [private] |
Definition at line 55 of file RFIOFile.h.
Referenced by open(), position(), prefetch(), read(), reopen(), retryRead(), and write().
IOFD RFIOFile::m_fd [private] |
Definition at line 50 of file RFIOFile.h.
Referenced by abort(), close(), open(), position(), prefetch(), retryRead(), and write().
int RFIOFile::m_flags [private] |
Definition at line 53 of file RFIOFile.h.
std::string RFIOFile::m_name [private] |
Definition at line 52 of file RFIOFile.h.
Referenced by close(), open(), position(), prefetch(), read(), reopen(), resize(), retryRead(), write(), and ~RFIOFile().
int RFIOFile::m_perms [private] |
Definition at line 54 of file RFIOFile.h.