00001 #define __STDC_LIMIT_MACROS 1
00002 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00003 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00006 #include <cerrno>
00007 #include <unistd.h>
00008 #include <stdint.h>
00009 #include <time.h>
00010
/** Read the CLOCK_REALTIME clock and return it as nanoseconds
    (seconds * 1e9 + nanoseconds) in a double.  Returns 0 when
    clock_gettime() reports failure.  */
static double realNanoSecs (void)
{
  struct timespec now;
  const bool clockOk = (clock_gettime (CLOCK_REALTIME, &now) == 0);
  if (! clockOk)
    return 0;

  return now.tv_sec * 1e9 + now.tv_nsec;
}
00018
00019 RFIOFile::RFIOFile (void)
00020 : m_fd (EDM_IOFD_INVALID),
00021 m_close (false),
00022 m_flags (0),
00023 m_perms (0),
00024 m_curpos (0)
00025 {}
00026
00027 RFIOFile::RFIOFile (IOFD fd)
00028 : m_fd (fd),
00029 m_close (true),
00030 m_flags (0),
00031 m_perms (0),
00032 m_curpos (0)
00033 {}
00034
00035 RFIOFile::RFIOFile (const char *name,
00036 int flags ,
00037 int perms )
00038 : m_fd (EDM_IOFD_INVALID),
00039 m_close (false),
00040 m_flags (0),
00041 m_perms (0),
00042 m_curpos (0)
00043 { open (name, flags, perms); }
00044
00045 RFIOFile::RFIOFile (const std::string &name,
00046 int flags ,
00047 int perms )
00048 : m_fd (EDM_IOFD_INVALID),
00049 m_close (false),
00050 m_flags (0),
00051 m_perms (0),
00052 m_curpos (0)
00053 { open (name.c_str (), flags, perms); }
00054
00055 RFIOFile::~RFIOFile (void)
00056 {
00057 if (m_close)
00058 edm::LogError("RFIOFileError")
00059 << "Destructor called on RFIO file '" << m_name
00060 << "' but the file is still open";
00061 }
00062
00064
00065 void
00066 RFIOFile::create (const char *name,
00067 bool exclusive ,
00068 int perms )
00069 {
00070 open (name,
00071 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00072 | (exclusive ? IOFlags::OpenExclusive : 0)),
00073 perms);
00074 }
00075
00076 void
00077 RFIOFile::create (const std::string &name,
00078 bool exclusive ,
00079 int perms )
00080 {
00081 open (name.c_str (),
00082 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00083 | (exclusive ? IOFlags::OpenExclusive : 0)),
00084 perms);
00085 }
00086
00087 void
00088 RFIOFile::open (const std::string &name,
00089 int flags ,
00090 int perms )
00091 { open (name.c_str (), flags, perms); }
00092
00093 void
00094 RFIOFile::open (const char *name,
00095 int flags ,
00096 int perms )
00097 {
00098
00099 m_name = name;
00100 m_flags = flags;
00101 m_perms = perms;
00102
00103
00104 serrno = 0;
00105
00106
00107
00108
00109
00110
00111 if (flags & IOFlags::OpenUnbuffered)
00112 {
00113 int readopt = 0;
00114 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00115 }
00116 else
00117 {
00118 int readopt = 1;
00119 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00120 }
00121
00122 if ((name == 0) || (*name == 0))
00123 throw cms::Exception("RFIOFile::open()")
00124 << "Cannot open a file without a name";
00125
00126 if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0)
00127 throw cms::Exception("RFIOFile::open()")
00128 << "Must open file '" << name << "' at least for read or write";
00129
00130 std::string lname (name);
00131 if (lname.find ("//") == 0)
00132 lname.erase(0, 1);
00133
00134
00135 if (m_fd != EDM_IOFD_INVALID && m_close)
00136 close ();
00137
00138
00139 int openflags = 0;
00140
00141 if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
00142 openflags |= O_RDWR;
00143 else if (flags & IOFlags::OpenRead)
00144 openflags |= O_RDONLY;
00145 else if (flags & IOFlags::OpenWrite)
00146 openflags |= O_WRONLY;
00147
00148 if (flags & IOFlags::OpenNonBlock)
00149 openflags |= O_NONBLOCK;
00150
00151 if (flags & IOFlags::OpenAppend)
00152 openflags |= O_APPEND;
00153
00154 if (flags & IOFlags::OpenCreate)
00155 openflags |= O_CREAT;
00156
00157 if (flags & IOFlags::OpenExclusive)
00158 openflags |= O_EXCL;
00159
00160 if (flags & IOFlags::OpenTruncate)
00161 openflags |= O_TRUNC;
00162
00163 IOFD newfd = EDM_IOFD_INVALID;
00164 if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1)
00165 throw cms::Exception("RFIOFile::open()")
00166 << "rfio_open(name='" << lname
00167 << "', flags=0x" << std::hex << openflags
00168 << ", permissions=0" << std::oct << perms << std::dec
00169 << ") => error '" << rfio_serror ()
00170 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00171
00172 m_fd = newfd;
00173 m_close = true;
00174 m_curpos = 0;
00175
00176 edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
00177 }
00178
00179 void
00180 RFIOFile::close (void)
00181 {
00182 if (m_fd == EDM_IOFD_INVALID)
00183 {
00184 edm::LogError("RFIOFileError")
00185 << "RFIOFile::close(name='" << m_name
00186 << "') called but the file is not open";
00187 m_close = false;
00188 return;
00189 }
00190
00191 serrno = 0;
00192 if (rfio_close64 (m_fd) == -1)
00193 {
00194
00195 edm::LogWarning("RFIOFileWarning")
00196 << "rfio_close64(name='" << m_name
00197 << "') failed with error '" << rfio_serror()
00198 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00199
00200
00201
00202 int status = ::close(m_fd);
00203 if (status < 0)
00204 edm::LogWarning("RFIOFileWarning")
00205 << "RFIOFile::close(): system level close after a failed"
00206 << " rfio_close64 also failed with error '" << strerror (errno)
00207 << "' (error code " << errno << ")";
00208 else
00209 edm::LogWarning("RFIOFileWarning")
00210 << "RFIOFile::close(): system level close after a failed"
00211 << " rfio_close64 succeeded";
00212
00213 sleep(5);
00214 }
00215
00216 m_close = false;
00217 m_fd = EDM_IOFD_INVALID;
00218
00219
00220
00221 }
00222
00223 void
00224 RFIOFile::abort (void)
00225 {
00226 serrno = 0;
00227 if (m_fd != EDM_IOFD_INVALID)
00228 rfio_close64 (m_fd);
00229
00230 m_close = false;
00231 m_fd = EDM_IOFD_INVALID;
00232 }
00233
00234 void RFIOFile::reopen (void)
00235 {
00236
00237 IOOffset lastpos = m_curpos;
00238 close();
00239 sleep(5);
00240 open(m_name, m_flags, m_perms);
00241
00242
00243
00244 position(lastpos);
00245 }
00246
00247 ssize_t
00248 RFIOFile::retryRead (void *into, IOSize n, int maxRetry )
00249 {
00250
00251 ssize_t s;
00252 do
00253 {
00254 serrno = 0;
00255 s = rfio_read64 (m_fd, into, n);
00256 if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
00257 {
00258
00259 const char *sleepTimeMsg;
00260 int secondsToSleep = 5;
00261 switch (maxRetry)
00262 {
00263 case 1:
00264 sleepTimeMsg = "10 minutes";
00265 secondsToSleep = 600;
00266 break;
00267
00268 case 2:
00269 sleepTimeMsg = "5 minutes";
00270 secondsToSleep = 300;
00271 break;
00272
00273 default:
00274 sleepTimeMsg = "1 minute";
00275 secondsToSleep = 60;
00276 }
00277
00278 edm::LogWarning("RFIOFileRetry")
00279 << "RFIOFile retrying read\n"
00280 << " return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
00281 << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
00282 << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
00283 << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
00284 << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
00285 << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
00286 << " retries left before quitting = " << maxRetry << "\n"
00287 << " will close and reopen file " << m_name << "\n"
00288 << " will sleep for " << sleepTimeMsg << " before attempting retry";
00289 edm::FlushMessageLog();
00290 sleep(secondsToSleep);
00291
00292
00293
00294
00295 reopen();
00296 }
00297 else
00298 break;
00299 } while (--maxRetry > 0);
00300
00301 return s;
00302 }
00303
00304 IOSize
00305 RFIOFile::read (void *into, IOSize n)
00306 {
00307
00308
00309
00310 LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
00311 double start = realNanoSecs();
00312
00313 ssize_t s;
00314 serrno = 0;
00315 if ((s = retryRead (into, n, 3)) < 0)
00316 throw cms::Exception("RFIOFile::read()")
00317 << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
00318 << " at position " << m_curpos << " with error '" << rfio_serror()
00319 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00320
00321 m_curpos += s;
00322
00323 double end = realNanoSecs();
00324 LogDebug("RFIOFileDebug")
00325 << "Exiting RFIOFile read(), elapsed time = " << end - start
00326 << " ns, bytes read = " << s << ", file position = " << m_curpos;
00327
00328 return s;
00329 }
00330
00331 IOSize
00332 RFIOFile::readv (IOPosBuffer *into, IOSize buffers)
00333 {
00334 if (! (m_flags & IOFlags::OpenUnbuffered))
00335 prefetch(into, buffers);
00336 return Storage::readv(into, buffers);
00337 }
00338
00339 IOSize
00340 RFIOFile::write (const void *from, IOSize n)
00341 {
00342 serrno = 0;
00343 ssize_t s = rfio_write64 (m_fd, from, n);
00344 if (s < 0)
00345 throw cms::Exception("RFIOFile::write()")
00346 << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
00347 << " at position " << m_curpos << " with error '" << rfio_serror()
00348 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00349 return s;
00350 }
00351
00355 IOOffset
00356 RFIOFile::position (IOOffset offset, Relative whence )
00357 {
00358 if (m_fd == EDM_IOFD_INVALID)
00359 throw cms::Exception("RFIOFile::position()")
00360 << "RFIOFile::position() called on a closed file";
00361 if (whence != CURRENT && whence != SET && whence != END)
00362 throw cms::Exception("RFIOFile::position()")
00363 << "RFIOFile::position() called with incorrect 'whence' parameter";
00364
00365 IOOffset result;
00366 int mywhence = (whence == SET ? SEEK_SET
00367 : whence == CURRENT ? SEEK_CUR
00368 : SEEK_END);
00369
00370 serrno = 0;
00371 if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1)
00372 throw cms::Exception("RFIOFile::position()")
00373 << "rfio_lseek(name='" << m_name << "', offset=" << offset
00374 << ", whence=" << mywhence << ") failed at position "
00375 << m_curpos << " with error '" << rfio_serror()
00376 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00377
00378 m_curpos = result;
00379 return result;
00380 }
00381
00382 void
00383 RFIOFile::resize (IOOffset )
00384 {
00385 throw cms::Exception("RFIOFile::resize()")
00386 << "RFIOFile::resize(name='" << m_name << "') not implemented";
00387 }
00388
00389 bool
00390 RFIOFile::prefetch (const IOPosBuffer *what, IOSize n)
00391 {
00392 if (rfioreadopt (RFIO_READOPT) != 1)
00393 throw cms::Exception("RFIOFile::preseek()")
00394 << "RFIOFile::prefetch() called but RFIO_READOPT="
00395 << rfioreadopt (RFIO_READOPT) << " (must be 1)";
00396
00397 std::vector<iovec64> iov (n);
00398 for (IOSize i = 0; i < n; ++i)
00399 {
00400 iov[i].iov_base = what[i].offset();
00401 iov[i].iov_len = what[i].size();
00402 }
00403
00404 serrno = 0;
00405 int retry = 5;
00406 int result;
00407 while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
00408 {
00409 if (--retry <= 0)
00410 {
00411 edm::LogError("RFIOFile::prefetch")
00412 << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
00413 << rfio_serror() << "' (rfio_errno=" << rfio_errno
00414 << ", serrno=" << serrno << ")";
00415 return false;
00416 }
00417 else
00418 {
00419 edm::LogWarning("RFIOFileRetry")
00420 << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
00421 << m_curpos << " with error '" << rfio_serror()
00422 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
00423 << "); retrying " << (retry+1) << " times";
00424 serrno = 0;
00425 sleep(5);
00426 }
00427 }
00428
00429 return true;
00430 }