00001 #define __STDC_LIMIT_MACROS 1
00002 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00003 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00006 #include <cerrno>
00007 #include <unistd.h>
00008 #include <stdint.h>
00009 #include <time.h>
00010 #include <sys/time.h>
00011
00012 #include <cstring>
00013 #include <vector>
00014
/** Return the current wall-clock (CLOCK_REALTIME / gettimeofday) time
    expressed in nanoseconds as a double.  Returns 0 if the underlying
    system call reports a failure.  */
static double realNanoSecs (void)
{
  double nanos = 0;

#if _POSIX_TIMERS > 0
  // POSIX timers are available: read the real-time clock directly.
  struct timespec now;
  if (clock_gettime (CLOCK_REALTIME, &now) == 0)
    nanos = now.tv_sec * 1e9 + now.tv_nsec;
#else
  // Fall back to microsecond resolution and scale up to nanoseconds.
  struct timeval now;
  if (gettimeofday (&now, 0) == 0)
    nanos = now.tv_sec * 1e9 + now.tv_usec * 1e3;
#endif

  return nanos;
}
00028
00029 RFIOFile::RFIOFile (void)
00030 : m_fd (EDM_IOFD_INVALID),
00031 m_close (false),
00032 m_flags (0),
00033 m_perms (0),
00034 m_curpos (0)
00035 {}
00036
00037 RFIOFile::RFIOFile (IOFD fd)
00038 : m_fd (fd),
00039 m_close (true),
00040 m_flags (0),
00041 m_perms (0),
00042 m_curpos (0)
00043 {}
00044
00045 RFIOFile::RFIOFile (const char *name,
00046 int flags ,
00047 int perms )
00048 : m_fd (EDM_IOFD_INVALID),
00049 m_close (false),
00050 m_flags (0),
00051 m_perms (0),
00052 m_curpos (0)
00053 { open (name, flags, perms); }
00054
00055 RFIOFile::RFIOFile (const std::string &name,
00056 int flags ,
00057 int perms )
00058 : m_fd (EDM_IOFD_INVALID),
00059 m_close (false),
00060 m_flags (0),
00061 m_perms (0),
00062 m_curpos (0)
00063 { open (name.c_str (), flags, perms); }
00064
00065 RFIOFile::~RFIOFile (void)
00066 {
00067 if (m_close)
00068 edm::LogError("RFIOFileError")
00069 << "Destructor called on RFIO file '" << m_name
00070 << "' but the file is still open";
00071 }
00072
00074
00075 void
00076 RFIOFile::create (const char *name,
00077 bool exclusive ,
00078 int perms )
00079 {
00080 open (name,
00081 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00082 | (exclusive ? IOFlags::OpenExclusive : 0)),
00083 perms);
00084 }
00085
00086 void
00087 RFIOFile::create (const std::string &name,
00088 bool exclusive ,
00089 int perms )
00090 {
00091 open (name.c_str (),
00092 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00093 | (exclusive ? IOFlags::OpenExclusive : 0)),
00094 perms);
00095 }
00096
00097 void
00098 RFIOFile::open (const std::string &name,
00099 int flags ,
00100 int perms )
00101 { open (name.c_str (), flags, perms); }
00102
00103 void
00104 RFIOFile::open (const char *name,
00105 int flags ,
00106 int perms )
00107 {
00108
00109 m_name = name;
00110 m_flags = flags;
00111 m_perms = perms;
00112
00113
00114 serrno = 0;
00115
00116
00117
00118
00119
00120
00121 if (flags & IOFlags::OpenUnbuffered)
00122 {
00123 int readopt = 0;
00124 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00125 }
00126 else
00127 {
00128 int readopt = 1;
00129 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00130 }
00131
00132 if ((name == 0) || (*name == 0))
00133 throw cms::Exception("RFIOFile::open()")
00134 << "Cannot open a file without a name";
00135
00136 if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0)
00137 throw cms::Exception("RFIOFile::open()")
00138 << "Must open file '" << name << "' at least for read or write";
00139
00140 std::string lname (name);
00141 if (lname.find ("//") == 0)
00142 lname.erase(0, 1);
00143
00144
00145 if (m_fd != EDM_IOFD_INVALID && m_close)
00146 close ();
00147
00148
00149 int openflags = 0;
00150
00151 if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
00152 openflags |= O_RDWR;
00153 else if (flags & IOFlags::OpenRead)
00154 openflags |= O_RDONLY;
00155 else if (flags & IOFlags::OpenWrite)
00156 openflags |= O_WRONLY;
00157
00158 if (flags & IOFlags::OpenNonBlock)
00159 openflags |= O_NONBLOCK;
00160
00161 if (flags & IOFlags::OpenAppend)
00162 openflags |= O_APPEND;
00163
00164 if (flags & IOFlags::OpenCreate)
00165 openflags |= O_CREAT;
00166
00167 if (flags & IOFlags::OpenExclusive)
00168 openflags |= O_EXCL;
00169
00170 if (flags & IOFlags::OpenTruncate)
00171 openflags |= O_TRUNC;
00172
00173 IOFD newfd = EDM_IOFD_INVALID;
00174 if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1)
00175 throw cms::Exception("RFIOFile::open()")
00176 << "rfio_open(name='" << lname
00177 << "', flags=0x" << std::hex << openflags
00178 << ", permissions=0" << std::oct << perms << std::dec
00179 << ") => error '" << rfio_serror ()
00180 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00181
00182 m_fd = newfd;
00183 m_close = true;
00184 m_curpos = 0;
00185
00186 edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
00187 }
00188
00189 void
00190 RFIOFile::close (void)
00191 {
00192 if (m_fd == EDM_IOFD_INVALID)
00193 {
00194 edm::LogError("RFIOFileError")
00195 << "RFIOFile::close(name='" << m_name
00196 << "') called but the file is not open";
00197 m_close = false;
00198 return;
00199 }
00200
00201 serrno = 0;
00202 if (rfio_close64 (m_fd) == -1)
00203 {
00204
00205 edm::LogWarning("RFIOFileWarning")
00206 << "rfio_close64(name='" << m_name
00207 << "') failed with error '" << rfio_serror()
00208 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00209
00210
00211
00212 int status = ::close(m_fd);
00213 if (status < 0)
00214 edm::LogWarning("RFIOFileWarning")
00215 << "RFIOFile::close(): system level close after a failed"
00216 << " rfio_close64 also failed with error '" << strerror (errno)
00217 << "' (error code " << errno << ")";
00218 else
00219 edm::LogWarning("RFIOFileWarning")
00220 << "RFIOFile::close(): system level close after a failed"
00221 << " rfio_close64 succeeded";
00222
00223 sleep(5);
00224 }
00225
00226 m_close = false;
00227 m_fd = EDM_IOFD_INVALID;
00228
00229
00230
00231 }
00232
00233 void
00234 RFIOFile::abort (void)
00235 {
00236 serrno = 0;
00237 if (m_fd != EDM_IOFD_INVALID)
00238 rfio_close64 (m_fd);
00239
00240 m_close = false;
00241 m_fd = EDM_IOFD_INVALID;
00242 }
00243
00244 void RFIOFile::reopen (void)
00245 {
00246
00247 IOOffset lastpos = m_curpos;
00248 close();
00249 sleep(5);
00250 open(m_name, m_flags, m_perms);
00251
00252
00253
00254 position(lastpos);
00255 }
00256
00257 ssize_t
00258 RFIOFile::retryRead (void *into, IOSize n, int maxRetry )
00259 {
00260
00261 ssize_t s;
00262 do
00263 {
00264 serrno = 0;
00265 s = rfio_read64 (m_fd, into, n);
00266 if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
00267 {
00268
00269 const char *sleepTimeMsg;
00270 int secondsToSleep = 5;
00271 switch (maxRetry)
00272 {
00273 case 1:
00274 sleepTimeMsg = "10 minutes";
00275 secondsToSleep = 600;
00276 break;
00277
00278 case 2:
00279 sleepTimeMsg = "5 minutes";
00280 secondsToSleep = 300;
00281 break;
00282
00283 default:
00284 sleepTimeMsg = "1 minute";
00285 secondsToSleep = 60;
00286 }
00287
00288 edm::LogWarning("RFIOFileRetry")
00289 << "RFIOFile retrying read\n"
00290 << " return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
00291 << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
00292 << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
00293 << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
00294 << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
00295 << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
00296 << " retries left before quitting = " << maxRetry << "\n"
00297 << " will close and reopen file " << m_name << "\n"
00298 << " will sleep for " << sleepTimeMsg << " before attempting retry";
00299 edm::FlushMessageLog();
00300 sleep(secondsToSleep);
00301
00302
00303
00304
00305 reopen();
00306 }
00307 else
00308 break;
00309 } while (--maxRetry > 0);
00310
00311 return s;
00312 }
00313
00314 IOSize
00315 RFIOFile::read (void *into, IOSize n)
00316 {
00317
00318
00319
00320 LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
00321 double start = realNanoSecs();
00322
00323 ssize_t s;
00324 serrno = 0;
00325 if ((s = retryRead (into, n, 3)) < 0)
00326 throw cms::Exception("RFIOFile::read()")
00327 << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
00328 << " at position " << m_curpos << " with error '" << rfio_serror()
00329 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00330
00331 m_curpos += s;
00332
00333 double end = realNanoSecs();
00334 LogDebug("RFIOFileDebug")
00335 << "Exiting RFIOFile read(), elapsed time = " << end - start
00336 << " ns, bytes read = " << s << ", file position = " << m_curpos;
00337
00338 return s;
00339 }
00340
00341 IOSize
00342 RFIOFile::readv (IOPosBuffer *into, IOSize buffers)
00343 {
00344 if (! (m_flags & IOFlags::OpenUnbuffered))
00345 prefetch(into, buffers);
00346 return Storage::readv(into, buffers);
00347 }
00348
00349 IOSize
00350 RFIOFile::write (const void *from, IOSize n)
00351 {
00352 serrno = 0;
00353 ssize_t s = rfio_write64 (m_fd, from, n);
00354 if (s < 0)
00355 throw cms::Exception("RFIOFile::write()")
00356 << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
00357 << " at position " << m_curpos << " with error '" << rfio_serror()
00358 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00359 return s;
00360 }
00361
00365 IOOffset
00366 RFIOFile::position (IOOffset offset, Relative whence )
00367 {
00368 if (m_fd == EDM_IOFD_INVALID)
00369 throw cms::Exception("RFIOFile::position()")
00370 << "RFIOFile::position() called on a closed file";
00371 if (whence != CURRENT && whence != SET && whence != END)
00372 throw cms::Exception("RFIOFile::position()")
00373 << "RFIOFile::position() called with incorrect 'whence' parameter";
00374
00375 IOOffset result;
00376 int mywhence = (whence == SET ? SEEK_SET
00377 : whence == CURRENT ? SEEK_CUR
00378 : SEEK_END);
00379
00380 serrno = 0;
00381 if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1)
00382 throw cms::Exception("RFIOFile::position()")
00383 << "rfio_lseek(name='" << m_name << "', offset=" << offset
00384 << ", whence=" << mywhence << ") failed at position "
00385 << m_curpos << " with error '" << rfio_serror()
00386 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00387
00388 m_curpos = result;
00389 return result;
00390 }
00391
00392 void
00393 RFIOFile::resize (IOOffset )
00394 {
00395 throw cms::Exception("RFIOFile::resize()")
00396 << "RFIOFile::resize(name='" << m_name << "') not implemented";
00397 }
00398
00399 bool
00400 RFIOFile::prefetch (const IOPosBuffer *what, IOSize n)
00401 {
00402 if (rfioreadopt (RFIO_READOPT) != 1)
00403 throw cms::Exception("RFIOFile::preseek()")
00404 << "RFIOFile::prefetch() called but RFIO_READOPT="
00405 << rfioreadopt (RFIO_READOPT) << " (must be 1)";
00406
00407 std::vector<iovec64> iov (n);
00408 for (IOSize i = 0; i < n; ++i)
00409 {
00410 iov[i].iov_base = what[i].offset();
00411 iov[i].iov_len = what[i].size();
00412 }
00413
00414 serrno = 0;
00415 int retry = 5;
00416 int result;
00417 while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
00418 {
00419 if (--retry <= 0)
00420 {
00421 edm::LogError("RFIOFile::prefetch")
00422 << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
00423 << rfio_serror() << "' (rfio_errno=" << rfio_errno
00424 << ", serrno=" << serrno << ")";
00425 return false;
00426 }
00427 else
00428 {
00429 edm::LogWarning("RFIOFileRetry")
00430 << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
00431 << m_curpos << " with error '" << rfio_serror()
00432 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
00433 << "); retrying " << (retry+1) << " times";
00434 serrno = 0;
00435 sleep(5);
00436 }
00437 }
00438
00439 return true;
00440 }