CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/Utilities/RFIOAdaptor/src/RFIOFile.cc

Go to the documentation of this file.
00001 #define __STDC_LIMIT_MACROS 1
00002 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00003 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00006 #include <cerrno>
00007 #include <unistd.h>
00008 #include <stdint.h>
00009 #include <time.h>
00010 #include <sys/time.h>
00011 
00012 #include <cstring>
00013 #include <vector>
00014 
/** Return the current wall-clock time in nanoseconds as a double.
    Uses clock_gettime(CLOCK_REALTIME) when POSIX timers are available
    and gettimeofday() otherwise.  Returns 0 if the clock query fails.  */
static double realNanoSecs (void)
{
#if _POSIX_TIMERS > 0
  struct timespec now;
  if (clock_gettime(CLOCK_REALTIME, &now) != 0)
    return 0;

  // Seconds scaled to nanoseconds plus the sub-second remainder.
  return now.tv_sec * 1e9 + now.tv_nsec;
#else
  struct timeval now;
  if (gettimeofday(&now, 0) != 0)
    return 0;

  // gettimeofday only has microsecond resolution; scale it up.
  return now.tv_sec * 1e9 + now.tv_usec * 1e3;
#endif
}
00028 
00029 RFIOFile::RFIOFile (void)
00030   : m_fd (EDM_IOFD_INVALID),
00031     m_close (false),
00032     m_flags (0),
00033     m_perms (0),
00034     m_curpos (0)
00035 {}
00036 
00037 RFIOFile::RFIOFile (IOFD fd)
00038   : m_fd (fd),
00039     m_close (true),
00040     m_flags (0),
00041     m_perms (0),
00042     m_curpos (0)
00043 {}
00044 
00045 RFIOFile::RFIOFile (const char *name,
00046                     int flags /* = IOFlags::OpenRead */,
00047                     int perms /* = 066 */)
00048   : m_fd (EDM_IOFD_INVALID),
00049     m_close (false),
00050     m_flags (0),
00051     m_perms (0),
00052     m_curpos (0)
00053 { open (name, flags, perms); }
00054 
00055 RFIOFile::RFIOFile (const std::string &name,
00056                     int flags /* = IOFlags::OpenRead */,
00057                     int perms /* = 066 */)
00058   : m_fd (EDM_IOFD_INVALID),
00059     m_close (false),
00060     m_flags (0),
00061     m_perms (0),
00062     m_curpos (0)
00063 { open (name.c_str (), flags, perms); }
00064 
00065 RFIOFile::~RFIOFile (void)
00066 {
00067   if (m_close)
00068     edm::LogError("RFIOFileError")
00069       << "Destructor called on RFIO file '" << m_name
00070       << "' but the file is still open";
00071 }
00072 
00074 
00075 void
00076 RFIOFile::create (const char *name,
00077                   bool exclusive /* = false */,
00078                   int perms /* = 066 */)
00079 {
00080   open (name,
00081         (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00082          | (exclusive ? IOFlags::OpenExclusive : 0)),
00083         perms);
00084 }
00085 
00086 void
00087 RFIOFile::create (const std::string &name,
00088                   bool exclusive /* = false */,
00089                   int perms /* = 066 */)
00090 {
00091   open (name.c_str (),
00092         (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00093          | (exclusive ? IOFlags::OpenExclusive : 0)),
00094         perms);
00095 }
00096 
00097 void
00098 RFIOFile::open (const std::string &name,
00099                 int flags /* = IOFlags::OpenRead */,
00100                 int perms /* = 066 */)
00101 { open (name.c_str (), flags, perms); }
00102 
00103 void
00104 RFIOFile::open (const char *name,
00105                 int flags /* = IOFlags::OpenRead */,
00106                 int perms /* = 066 */)
00107 {
00108   // Save parameters for error recovery.
00109   m_name = name;
00110   m_flags = flags;
00111   m_perms = perms;
00112 
00113   // Reset RFIO error code.
00114   serrno = 0;
00115 
00116   // Disable buffering in rfio library?  Note that doing this on
00117   // one file disables it for everything.  Not much we can do...
00118   // but it does make a significant performance difference to the
00119   // clients.  Note also that docs say the flag turns off write
00120   // buffering -- this turns off all buffering.
00121   if (flags & IOFlags::OpenUnbuffered)
00122   {
00123     int readopt = 0;
00124     rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00125   }
00126   else 
00127   {
00128     int readopt = 1;
00129     rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00130   }
00131 
00132   if ((name == 0) || (*name == 0))
00133     throw cms::Exception("RFIOFile::open()")
00134       << "Cannot open a file without a name";
00135 
00136   if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0)
00137     throw cms::Exception("RFIOFile::open()")
00138       << "Must open file '" << name << "' at least for read or write";
00139 
00140   std::string lname (name);
00141   if (lname.find ("//") == 0)
00142     lname.erase(0, 1);
00143 
00144   // If I am already open, close old file first
00145   if (m_fd != EDM_IOFD_INVALID && m_close)
00146     close ();
00147 
00148   // Translate our flags to system flags
00149   int openflags = 0;
00150 
00151   if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
00152     openflags |= O_RDWR;
00153   else if (flags & IOFlags::OpenRead)
00154     openflags |= O_RDONLY;
00155   else if (flags & IOFlags::OpenWrite)
00156     openflags |= O_WRONLY;
00157 
00158   if (flags & IOFlags::OpenNonBlock)
00159     openflags |= O_NONBLOCK;
00160 
00161   if (flags & IOFlags::OpenAppend)
00162     openflags |= O_APPEND;
00163 
00164   if (flags & IOFlags::OpenCreate)
00165     openflags |= O_CREAT;
00166 
00167   if (flags & IOFlags::OpenExclusive)
00168     openflags |= O_EXCL;
00169 
00170   if (flags & IOFlags::OpenTruncate)
00171     openflags |= O_TRUNC;
00172 
00173   IOFD newfd = EDM_IOFD_INVALID;
00174   if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1)
00175     throw cms::Exception("RFIOFile::open()")
00176       << "rfio_open(name='" << lname
00177       << "', flags=0x" << std::hex << openflags
00178       << ", permissions=0" << std::oct << perms << std::dec
00179       << ") => error '" << rfio_serror ()
00180       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00181 
00182   m_fd = newfd;
00183   m_close = true;
00184   m_curpos = 0;
00185 
00186   edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
00187 }
00188 
00189 void
00190 RFIOFile::close (void)
00191 {
00192   if (m_fd == EDM_IOFD_INVALID)
00193   {
00194     edm::LogError("RFIOFileError")
00195       << "RFIOFile::close(name='" << m_name
00196       << "') called but the file is not open";
00197     m_close = false;
00198     return;
00199   }
00200 
00201   serrno = 0;
00202   if (rfio_close64 (m_fd) == -1)
00203   {
00204     // If we fail to close the file, report a warning.
00205     edm::LogWarning("RFIOFileWarning")
00206       << "rfio_close64(name='" << m_name
00207       << "') failed with error '" << rfio_serror()
00208       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00209 
00210     // When rfio_close64 fails then try the system close function as
00211     // per the advice from Olof Barring from the Castor operations.
00212     int status = ::close(m_fd);
00213     if (status < 0)
00214       edm::LogWarning("RFIOFileWarning")
00215         << "RFIOFile::close(): system level close after a failed"
00216         << " rfio_close64 also failed with error '" << strerror (errno)
00217         << "' (error code " << errno << ")";
00218     else
00219       edm::LogWarning("RFIOFileWarning")
00220         << "RFIOFile::close(): system level close after a failed"
00221         << " rfio_close64 succeeded";
00222 
00223     sleep(5);
00224   }
00225 
00226   m_close = false;
00227   m_fd = EDM_IOFD_INVALID;
00228 
00229   // Caused hang.  Will be added back after problem is fix
00230   // edm::LogInfo("RFIOFileInfo") << "Closed " << m_name;
00231 }
00232 
00233 void
00234 RFIOFile::abort (void)
00235 {
00236   serrno = 0;
00237   if (m_fd != EDM_IOFD_INVALID)
00238     rfio_close64 (m_fd);
00239 
00240   m_close = false;
00241   m_fd = EDM_IOFD_INVALID;
00242 }
00243 
00244 void RFIOFile::reopen (void)
00245 {
00246   // Remember the current position in the file
00247   IOOffset lastpos = m_curpos;
00248   close();
00249   sleep(5);
00250   open(m_name, m_flags, m_perms);
00251 
00252   // Set the position back to the same place it was
00253   // before the file closed and opened.
00254   position(lastpos);
00255 }
00256 
00257 ssize_t
00258 RFIOFile::retryRead (void *into, IOSize n, int maxRetry /* = 10 */)
00259 {
00260   // Attempt to read up to maxRetry times.
00261   ssize_t s;
00262   do
00263   {
00264     serrno = 0;
00265     s = rfio_read64 (m_fd, into, n);
00266     if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
00267     {
00268       // Wait a little while to allow Castor to recover from the timeout.
00269       const char *sleepTimeMsg;
00270       int secondsToSleep = 5;
00271       switch (maxRetry)
00272       {
00273       case 1:
00274         sleepTimeMsg = "10 minutes";
00275         secondsToSleep = 600;
00276         break;
00277 
00278       case 2:
00279         sleepTimeMsg = "5 minutes";
00280         secondsToSleep = 300;
00281         break;
00282 
00283       default:
00284         sleepTimeMsg = "1 minute";
00285         secondsToSleep = 60;
00286       }
00287 
00288       edm::LogWarning("RFIOFileRetry")
00289         << "RFIOFile retrying read\n"
00290         << "  return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
00291         << "  bytes requested = " << n << "  (this and bytes read are equal unless error or EOF)\n"
00292         << "  rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
00293         << "  serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
00294         << "  rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
00295         << "  current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
00296         << "  retries left before quitting = " << maxRetry << "\n"
00297         << "  will close and reopen file " << m_name << "\n"
00298         << "  will sleep for " << sleepTimeMsg << " before attempting retry";
00299       edm::FlushMessageLog();
00300       sleep(secondsToSleep);
00301 
00302       // Improve the chances of success by closing and reopening
00303       // the file before retrying the read.  This also resets
00304       // the position in the file to the correct place.
00305       reopen();
00306     }
00307     else
00308       break;
00309   } while (--maxRetry > 0);
00310 
00311   return s;
00312 }
00313 
00314 IOSize
00315 RFIOFile::read (void *into, IOSize n)
00316 {
00317   // Be aware that when enabled these LogDebug prints
00318   // will take more time than the read itself unless the reads
00319   // are proceeding slower than under optimal conditions.
00320   LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
00321   double start = realNanoSecs();
00322 
00323   ssize_t s;
00324   serrno = 0;
00325   if ((s = retryRead (into, n, 3)) < 0)
00326     throw cms::Exception("RFIOFile::read()")
00327       << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
00328       << " at position " << m_curpos << " with error '" << rfio_serror()
00329       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00330 
00331   m_curpos += s;
00332 
00333   double end = realNanoSecs();
00334   LogDebug("RFIOFileDebug")
00335     << "Exiting RFIOFile read(), elapsed time = " << end - start
00336     << " ns, bytes read = " << s << ", file position = " << m_curpos;
00337 
00338   return s;
00339 }
00340 
00341 IOSize
00342 RFIOFile::readv (IOPosBuffer *into, IOSize buffers)
00343 {
00344   if (! (m_flags & IOFlags::OpenUnbuffered))
00345     prefetch(into, buffers);
00346   return Storage::readv(into, buffers);
00347 }
00348 
00349 IOSize
00350 RFIOFile::write (const void *from, IOSize n)
00351 {
00352   serrno = 0;
00353   ssize_t s = rfio_write64 (m_fd, from, n);
00354   if (s < 0)
00355     throw cms::Exception("RFIOFile::write()")
00356       << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
00357       << " at position " << m_curpos << " with error '" << rfio_serror()
00358       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00359   return s;
00360 }
00361 
00365 IOOffset
00366 RFIOFile::position (IOOffset offset, Relative whence /* = SET */)
00367 {
00368   if (m_fd == EDM_IOFD_INVALID)
00369     throw cms::Exception("RFIOFile::position()")
00370       << "RFIOFile::position() called on a closed file";
00371   if (whence != CURRENT && whence != SET && whence != END)
00372     throw cms::Exception("RFIOFile::position()")
00373       << "RFIOFile::position() called with incorrect 'whence' parameter";
00374 
00375   IOOffset      result;
00376   int           mywhence = (whence == SET ? SEEK_SET
00377                             : whence == CURRENT ? SEEK_CUR
00378                             : SEEK_END);
00379 
00380   serrno = 0;
00381   if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1)
00382     throw cms::Exception("RFIOFile::position()")
00383       << "rfio_lseek(name='" << m_name << "', offset=" << offset
00384       << ", whence=" << mywhence << ") failed at position "
00385       << m_curpos << " with error '" << rfio_serror()
00386       << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00387 
00388   m_curpos = result;
00389   return result;
00390 }
00391 
00392 void
00393 RFIOFile::resize (IOOffset /* size */)
00394 {
00395   throw cms::Exception("RFIOFile::resize()")
00396     << "RFIOFile::resize(name='" << m_name << "') not implemented";
00397 }
00398 
00399 bool
00400 RFIOFile::prefetch (const IOPosBuffer *what, IOSize n)
00401 {
00402   if (rfioreadopt (RFIO_READOPT) != 1)
00403     throw cms::Exception("RFIOFile::preseek()")
00404       << "RFIOFile::prefetch() called but RFIO_READOPT="
00405       << rfioreadopt (RFIO_READOPT) << " (must be 1)";
00406 
00407   std::vector<iovec64> iov (n);
00408   for (IOSize i = 0; i < n; ++i)
00409   {
00410     iov[i].iov_base = what[i].offset();
00411     iov[i].iov_len = what[i].size();
00412   }
00413 
00414   serrno = 0;
00415   int retry = 5;
00416   int result;
00417   while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
00418   {
00419     if (--retry <= 0)
00420     {
00421       edm::LogError("RFIOFile::prefetch")
00422         << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
00423         << rfio_serror() << "' (rfio_errno=" << rfio_errno
00424         << ", serrno=" << serrno << ")";
00425       return false;
00426     }
00427     else
00428     {
00429       edm::LogWarning("RFIOFileRetry")
00430         << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
00431         << m_curpos << " with error '" << rfio_serror()
00432         << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
00433         << "); retrying " << (retry+1) << " times";
00434       serrno = 0;
00435       sleep(5);
00436     }
00437   }
00438 
00439   return true;
00440 }