CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_0/src/Utilities/RFIOAdaptor/src/RFIOFile.cc

Go to the documentation of this file.
00001 #define __STDC_LIMIT_MACROS 1
00002 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00003 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00007 #include <cerrno>
00008 #include <unistd.h>
00009 #include <stdint.h>
00010 #include <time.h>
00011 #include <sys/time.h>
00012 
00013 #include <cstring>
00014 #include <vector>
00015 
// Return the current wall-clock time expressed in nanoseconds.
//
// Uses clock_gettime(CLOCK_REALTIME) when POSIX timers are available
// and falls back to gettimeofday() otherwise.  Yields 0 if the clock
// query itself fails.
static double realNanoSecs (void)
{
  double nanos = 0;
#if _POSIX_TIMERS > 0
  struct timespec now;
  if (clock_gettime(CLOCK_REALTIME, &now) == 0)
    nanos = now.tv_sec * 1e9 + now.tv_nsec;
#else
  struct timeval now;
  if (gettimeofday(&now, 0) == 0)
    nanos = now.tv_sec * 1e9 + now.tv_usec * 1e3;
#endif
  return nanos;
}
00029 
00030 RFIOFile::RFIOFile (void)
00031   : m_fd (EDM_IOFD_INVALID),
00032     m_close (false),
00033     m_flags (0),
00034     m_perms (0),
00035     m_curpos (0)
00036 {}
00037 
00038 RFIOFile::RFIOFile (IOFD fd)
00039   : m_fd (fd),
00040     m_close (true),
00041     m_flags (0),
00042     m_perms (0),
00043     m_curpos (0)
00044 {}
00045 
00046 RFIOFile::RFIOFile (const char *name,
00047                     int flags /* = IOFlags::OpenRead */,
00048                     int perms /* = 066 */)
00049   : m_fd (EDM_IOFD_INVALID),
00050     m_close (false),
00051     m_flags (0),
00052     m_perms (0),
00053     m_curpos (0)
00054 { open (name, flags, perms); }
00055 
00056 RFIOFile::RFIOFile (const std::string &name,
00057                     int flags /* = IOFlags::OpenRead */,
00058                     int perms /* = 066 */)
00059   : m_fd (EDM_IOFD_INVALID),
00060     m_close (false),
00061     m_flags (0),
00062     m_perms (0),
00063     m_curpos (0)
00064 { open (name.c_str (), flags, perms); }
00065 
00066 RFIOFile::~RFIOFile (void)
00067 {
00068   if (m_close)
00069     edm::LogError("RFIOFileError")
00070       << "Destructor called on RFIO file '" << m_name
00071       << "' but the file is still open";
00072 }
00073 
00075 
00076 void
00077 RFIOFile::create (const char *name,
00078                   bool exclusive /* = false */,
00079                   int perms /* = 066 */)
00080 {
00081   open (name,
00082         (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00083          | (exclusive ? IOFlags::OpenExclusive : 0)),
00084         perms);
00085 }
00086 
00087 void
00088 RFIOFile::create (const std::string &name,
00089                   bool exclusive /* = false */,
00090                   int perms /* = 066 */)
00091 {
00092   open (name.c_str (),
00093         (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00094          | (exclusive ? IOFlags::OpenExclusive : 0)),
00095         perms);
00096 }
00097 
00098 void
00099 RFIOFile::open (const std::string &name,
00100                 int flags /* = IOFlags::OpenRead */,
00101                 int perms /* = 066 */)
00102 { open (name.c_str (), flags, perms); }
00103 
00104 void
00105 RFIOFile::open (const char *name,
00106                 int flags /* = IOFlags::OpenRead */,
00107                 int perms /* = 066 */)
00108 {
00109   // Save parameters for error recovery.
00110   if (name == 0) {
00111     m_name = "";
00112   } else {
00113     m_name = name;
00114   }
00115   m_flags = flags;
00116   m_perms = perms;
00117 
00118   // Reset RFIO error code.
00119   serrno = 0;
00120 
00121   // Disable buffering in rfio library?  Note that doing this on
00122   // one file disables it for everything.  Not much we can do...
00123   // but it does make a significant performance difference to the
00124   // clients.  Note also that docs say the flag turns off write
00125   // buffering -- this turns off all buffering.
00126   if (flags & IOFlags::OpenUnbuffered)
00127   {
00128     int readopt = 0;
00129     rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00130   }
00131   else 
00132   {
00133     int readopt = 1;
00134     rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00135   }
00136 
00137   if ((name == 0) || (*name == 0)) {
00138     edm::Exception ex(edm::errors::FileOpenError);
00139     ex << "Cannot open a file without a name";
00140     ex.addContext("Calling RFIOFile::open()");
00141     throw ex;
00142   }
00143   if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
00144     edm::Exception ex(edm::errors::FileOpenError);
00145     ex << "Must open file '" << name << "' at least for read or write";
00146     ex.addContext("Calling RFIOFile::open()");
00147     throw ex;
00148   }
00149   std::string lname (name);
00150   if (lname.find ("//") == 0)
00151     lname.erase(0, 1);
00152 
00153   // If I am already open, close old file first
00154   if (m_fd != EDM_IOFD_INVALID && m_close)
00155     close ();
00156 
00157   // Translate our flags to system flags
00158   int openflags = 0;
00159 
00160   if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
00161     openflags |= O_RDWR;
00162   else if (flags & IOFlags::OpenRead)
00163     openflags |= O_RDONLY;
00164   else if (flags & IOFlags::OpenWrite)
00165     openflags |= O_WRONLY;
00166 
00167   if (flags & IOFlags::OpenNonBlock)
00168     openflags |= O_NONBLOCK;
00169 
00170   if (flags & IOFlags::OpenAppend)
00171     openflags |= O_APPEND;
00172 
00173   if (flags & IOFlags::OpenCreate)
00174     openflags |= O_CREAT;
00175 
00176   if (flags & IOFlags::OpenExclusive)
00177     openflags |= O_EXCL;
00178 
00179   if (flags & IOFlags::OpenTruncate)
00180     openflags |= O_TRUNC;
00181 
00182   IOFD newfd = EDM_IOFD_INVALID;
00183   if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1) {
00184     edm::Exception ex(edm::errors::FileOpenError);
00185     ex << "rfio_open(name='" << lname
00186        << "', flags=0x" << std::hex << openflags
00187        << ", permissions=0" << std::oct << perms << std::dec
00188        << ") => error '" << rfio_serror ()
00189        << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00190     ex.addContext("Calling RFIOFile::open()");
00191     throw ex;
00192   }
00193   m_fd = newfd;
00194   m_close = true;
00195   m_curpos = 0;
00196 
00197   edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
00198 }
00199 
00200 void
00201 RFIOFile::close (void)
00202 {
00203   if (m_fd == EDM_IOFD_INVALID)
00204   {
00205     edm::LogError("RFIOFileError")
00206       << "RFIOFile::close(name='" << m_name
00207       << "') called but the file is not open";
00208     m_close = false;
00209     return;
00210   }
00211 
00212   serrno = 0;
00213   if (rfio_close64 (m_fd) == -1)
00214   {
00215     // If we fail to close the file, report a warning.
00216     edm::LogWarning("RFIOFileWarning")
00217       << "rfio_close64(name='" << m_name
00218       << "') failed with error '" << rfio_serror()
00219       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00220 
00221     // When rfio_close64 fails then try the system close function as
00222     // per the advice from Olof Barring from the Castor operations.
00223     int status = ::close(m_fd);
00224     if (status < 0)
00225       edm::LogWarning("RFIOFileWarning")
00226         << "RFIOFile::close(): system level close after a failed"
00227         << " rfio_close64 also failed with error '" << strerror (errno)
00228         << "' (error code " << errno << ")";
00229     else
00230       edm::LogWarning("RFIOFileWarning")
00231         << "RFIOFile::close(): system level close after a failed"
00232         << " rfio_close64 succeeded";
00233 
00234     sleep(5);
00235   }
00236 
00237   m_close = false;
00238   m_fd = EDM_IOFD_INVALID;
00239 
00240   // Caused hang.  Will be added back after problem is fix
00241   // edm::LogInfo("RFIOFileInfo") << "Closed " << m_name;
00242 }
00243 
00244 void
00245 RFIOFile::abort (void)
00246 {
00247   serrno = 0;
00248   if (m_fd != EDM_IOFD_INVALID)
00249     rfio_close64 (m_fd);
00250 
00251   m_close = false;
00252   m_fd = EDM_IOFD_INVALID;
00253 }
00254 
00255 void RFIOFile::reopen (void)
00256 {
00257   // Remember the current position in the file
00258   IOOffset lastpos = m_curpos;
00259   close();
00260   sleep(5);
00261   open(m_name, m_flags, m_perms);
00262 
00263   // Set the position back to the same place it was
00264   // before the file closed and opened.
00265   position(lastpos);
00266 }
00267 
00268 ssize_t
00269 RFIOFile::retryRead (void *into, IOSize n, int maxRetry /* = 10 */)
00270 {
00271   // Attempt to read up to maxRetry times.
00272   ssize_t s;
00273   do
00274   {
00275     serrno = 0;
00276     s = rfio_read64 (m_fd, into, n);
00277     if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
00278     {
00279       // Wait a little while to allow Castor to recover from the timeout.
00280       const char *sleepTimeMsg;
00281       int secondsToSleep = 5;
00282       switch (maxRetry)
00283       {
00284       case 1:
00285         sleepTimeMsg = "10 minutes";
00286         secondsToSleep = 600;
00287         break;
00288 
00289       case 2:
00290         sleepTimeMsg = "5 minutes";
00291         secondsToSleep = 300;
00292         break;
00293 
00294       default:
00295         sleepTimeMsg = "1 minute";
00296         secondsToSleep = 60;
00297       }
00298 
00299       edm::LogWarning("RFIOFileRetry")
00300         << "RFIOFile retrying read\n"
00301         << "  return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
00302         << "  bytes requested = " << n << "  (this and bytes read are equal unless error or EOF)\n"
00303         << "  rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
00304         << "  serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
00305         << "  rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
00306         << "  current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
00307         << "  retries left before quitting = " << maxRetry << "\n"
00308         << "  will close and reopen file " << m_name << "\n"
00309         << "  will sleep for " << sleepTimeMsg << " before attempting retry";
00310       edm::FlushMessageLog();
00311       sleep(secondsToSleep);
00312 
00313       // Improve the chances of success by closing and reopening
00314       // the file before retrying the read.  This also resets
00315       // the position in the file to the correct place.
00316       reopen();
00317     }
00318     else
00319       break;
00320   } while (--maxRetry > 0);
00321 
00322   return s;
00323 }
00324 
00325 IOSize
00326 RFIOFile::read (void *into, IOSize n)
00327 {
00328   // Be aware that when enabled these LogDebug prints
00329   // will take more time than the read itself unless the reads
00330   // are proceeding slower than under optimal conditions.
00331   LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
00332   double start = realNanoSecs();
00333 
00334   ssize_t s;
00335   serrno = 0;
00336   if ((s = retryRead (into, n, 3)) < 0) {
00337     edm::Exception ex(edm::errors::FileReadError);
00338     ex << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
00339        << " at position " << m_curpos << " with error '" << rfio_serror()
00340        << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00341     ex.addContext("Calling RFIOFile::read()");
00342     throw ex;
00343   }
00344   m_curpos += s;
00345 
00346   double end = realNanoSecs();
00347   LogDebug("RFIOFileDebug")
00348     << "Exiting RFIOFile read(), elapsed time = " << end - start
00349     << " ns, bytes read = " << s << ", file position = " << m_curpos;
00350 
00351   return s;
00352 }
00353 
00354 IOSize
00355 RFIOFile::readv (IOPosBuffer *into, IOSize buffers)
00356 {
00357   if (! (m_flags & IOFlags::OpenUnbuffered))
00358     prefetch(into, buffers);
00359   return Storage::readv(into, buffers);
00360 }
00361 
00362 IOSize
00363 RFIOFile::write (const void *from, IOSize n)
00364 {
00365   serrno = 0;
00366   ssize_t s = rfio_write64 (m_fd, from, n);
00367   if (s < 0) {
00368     cms::Exception ex("FileWriteError");
00369     ex << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
00370        << " at position " << m_curpos << " with error '" << rfio_serror()
00371        << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00372     ex.addContext("Calling RFIOFile::write()");
00373     throw ex;
00374   }
00375   return s;
00376 }
00377 
00381 IOOffset
00382 RFIOFile::position (IOOffset offset, Relative whence /* = SET */)
00383 {
00384   if (m_fd == EDM_IOFD_INVALID) {
00385     cms::Exception ex("FilePositionError");
00386     ex << "RFIOFile::position() called on a closed file";
00387     throw ex;
00388   }
00389   if (whence != CURRENT && whence != SET && whence != END) {
00390     cms::Exception ex("FilePositionError");
00391     ex << "RFIOFile::position() called with incorrect 'whence' parameter";
00392     throw ex;
00393   }
00394   IOOffset      result;
00395   int           mywhence = (whence == SET ? SEEK_SET
00396                             : whence == CURRENT ? SEEK_CUR
00397                             : SEEK_END);
00398 
00399   serrno = 0;
00400   if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1) {
00401     cms::Exception ex("FilePositionError");
00402     ex << "rfio_lseek(name='" << m_name << "', offset=" << offset
00403        << ", whence=" << mywhence << ") failed at position "
00404        << m_curpos << " with error '" << rfio_serror()
00405        << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00406     ex.addContext("Calling RFIOFile::position()");
00407     throw ex;
00408   }
00409   m_curpos = result;
00410   return result;
00411 }
00412 
00413 void
00414 RFIOFile::resize (IOOffset /* size */)
00415 {
00416   cms::Exception ex("FileResizeError");
00417   ex << "RFIOFile::resize(name='" << m_name << "') not implemented";
00418   throw ex;
00419 }
00420 
00421 bool
00422 RFIOFile::prefetch (const IOPosBuffer *what, IOSize n)
00423 {
00424   if (rfioreadopt (RFIO_READOPT) != 1) {
00425     cms::Exception ex("FilePrefetchError");
00426     ex << "RFIOFile::prefetch() called but RFIO_READOPT="
00427        << rfioreadopt (RFIO_READOPT) << " (must be 1)";
00428     throw ex;
00429   }
00430   std::vector<iovec64> iov (n);
00431   for (IOSize i = 0; i < n; ++i)
00432   {
00433     iov[i].iov_base = what[i].offset();
00434     iov[i].iov_len = what[i].size();
00435   }
00436 
00437   serrno = 0;
00438   int retry = 5;
00439   int result;
00440   while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
00441   {
00442     if (--retry == 0)
00443     {
00444       edm::LogError("RFIOFile::prefetch")
00445         << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
00446         << rfio_serror() << "' (rfio_errno=" << rfio_errno
00447         << ", serrno=" << serrno << ")";
00448       return false;
00449     }
00450     else
00451     {
00452       edm::LogWarning("RFIOFileRetry")
00453         << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
00454         << m_curpos << " with error '" << rfio_serror()
00455         << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
00456         << "); retrying " << (retry+1) << " times";
00457       serrno = 0;
00458       sleep(5);
00459     }
00460   }
00461 
00462   return true;
00463 }