00001 #define __STDC_LIMIT_MACROS 1
00002 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00003 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00007 #include <cerrno>
00008 #include <unistd.h>
00009 #include <stdint.h>
00010 #include <time.h>
00011 #include <sys/time.h>
00012
00013 #include <cstring>
00014 #include <vector>
00015
/**
 * Return the current wall-clock time expressed in nanoseconds.
 *
 * Uses clock_gettime(CLOCK_REALTIME) when POSIX timers are available and
 * gettimeofday() otherwise.  Yields 0 if the underlying call fails.
 */
static double realNanoSecs (void)
{
  double nanos = 0;
#if _POSIX_TIMERS > 0
  struct timespec now;
  if (clock_gettime(CLOCK_REALTIME, &now) == 0)
    nanos = now.tv_sec * 1e9 + now.tv_nsec;
#else
  struct timeval now;
  if (gettimeofday(&now, 0) == 0)
    nanos = now.tv_sec * 1e9 + now.tv_usec * 1e3;
#endif
  return nanos;
}
00029
00030 RFIOFile::RFIOFile (void)
00031 : m_fd (EDM_IOFD_INVALID),
00032 m_close (false),
00033 m_flags (0),
00034 m_perms (0),
00035 m_curpos (0)
00036 {}
00037
00038 RFIOFile::RFIOFile (IOFD fd)
00039 : m_fd (fd),
00040 m_close (true),
00041 m_flags (0),
00042 m_perms (0),
00043 m_curpos (0)
00044 {}
00045
00046 RFIOFile::RFIOFile (const char *name,
00047 int flags ,
00048 int perms )
00049 : m_fd (EDM_IOFD_INVALID),
00050 m_close (false),
00051 m_flags (0),
00052 m_perms (0),
00053 m_curpos (0)
00054 { open (name, flags, perms); }
00055
00056 RFIOFile::RFIOFile (const std::string &name,
00057 int flags ,
00058 int perms )
00059 : m_fd (EDM_IOFD_INVALID),
00060 m_close (false),
00061 m_flags (0),
00062 m_perms (0),
00063 m_curpos (0)
00064 { open (name.c_str (), flags, perms); }
00065
00066 RFIOFile::~RFIOFile (void)
00067 {
00068 if (m_close)
00069 edm::LogError("RFIOFileError")
00070 << "Destructor called on RFIO file '" << m_name
00071 << "' but the file is still open";
00072 }
00073
00075
00076 void
00077 RFIOFile::create (const char *name,
00078 bool exclusive ,
00079 int perms )
00080 {
00081 open (name,
00082 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00083 | (exclusive ? IOFlags::OpenExclusive : 0)),
00084 perms);
00085 }
00086
00087 void
00088 RFIOFile::create (const std::string &name,
00089 bool exclusive ,
00090 int perms )
00091 {
00092 open (name.c_str (),
00093 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00094 | (exclusive ? IOFlags::OpenExclusive : 0)),
00095 perms);
00096 }
00097
00098 void
00099 RFIOFile::open (const std::string &name,
00100 int flags ,
00101 int perms )
00102 { open (name.c_str (), flags, perms); }
00103
00104 void
00105 RFIOFile::open (const char *name,
00106 int flags ,
00107 int perms )
00108 {
00109
00110 if (name == 0) {
00111 m_name = "";
00112 } else {
00113 m_name = name;
00114 }
00115 m_flags = flags;
00116 m_perms = perms;
00117
00118
00119 serrno = 0;
00120
00121
00122
00123
00124
00125
00126 if (flags & IOFlags::OpenUnbuffered)
00127 {
00128 int readopt = 0;
00129 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00130 }
00131 else
00132 {
00133 int readopt = 1;
00134 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00135 }
00136
00137 if ((name == 0) || (*name == 0)) {
00138 edm::Exception ex(edm::errors::FileOpenError);
00139 ex << "Cannot open a file without a name";
00140 ex.addContext("Calling RFIOFile::open()");
00141 throw ex;
00142 }
00143 if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
00144 edm::Exception ex(edm::errors::FileOpenError);
00145 ex << "Must open file '" << name << "' at least for read or write";
00146 ex.addContext("Calling RFIOFile::open()");
00147 throw ex;
00148 }
00149 std::string lname (name);
00150 if (lname.find ("//") == 0)
00151 lname.erase(0, 1);
00152
00153
00154 if (m_fd != EDM_IOFD_INVALID && m_close)
00155 close ();
00156
00157
00158 int openflags = 0;
00159
00160 if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
00161 openflags |= O_RDWR;
00162 else if (flags & IOFlags::OpenRead)
00163 openflags |= O_RDONLY;
00164 else if (flags & IOFlags::OpenWrite)
00165 openflags |= O_WRONLY;
00166
00167 if (flags & IOFlags::OpenNonBlock)
00168 openflags |= O_NONBLOCK;
00169
00170 if (flags & IOFlags::OpenAppend)
00171 openflags |= O_APPEND;
00172
00173 if (flags & IOFlags::OpenCreate)
00174 openflags |= O_CREAT;
00175
00176 if (flags & IOFlags::OpenExclusive)
00177 openflags |= O_EXCL;
00178
00179 if (flags & IOFlags::OpenTruncate)
00180 openflags |= O_TRUNC;
00181
00182 IOFD newfd = EDM_IOFD_INVALID;
00183 if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1) {
00184 edm::Exception ex(edm::errors::FileOpenError);
00185 ex << "rfio_open(name='" << lname
00186 << "', flags=0x" << std::hex << openflags
00187 << ", permissions=0" << std::oct << perms << std::dec
00188 << ") => error '" << rfio_serror ()
00189 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00190 ex.addContext("Calling RFIOFile::open()");
00191 throw ex;
00192 }
00193 m_fd = newfd;
00194 m_close = true;
00195 m_curpos = 0;
00196
00197 edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
00198 }
00199
00200 void
00201 RFIOFile::close (void)
00202 {
00203 if (m_fd == EDM_IOFD_INVALID)
00204 {
00205 edm::LogError("RFIOFileError")
00206 << "RFIOFile::close(name='" << m_name
00207 << "') called but the file is not open";
00208 m_close = false;
00209 return;
00210 }
00211
00212 serrno = 0;
00213 if (rfio_close64 (m_fd) == -1)
00214 {
00215
00216 edm::LogWarning("RFIOFileWarning")
00217 << "rfio_close64(name='" << m_name
00218 << "') failed with error '" << rfio_serror()
00219 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00220
00221
00222
00223 int status = ::close(m_fd);
00224 if (status < 0)
00225 edm::LogWarning("RFIOFileWarning")
00226 << "RFIOFile::close(): system level close after a failed"
00227 << " rfio_close64 also failed with error '" << strerror (errno)
00228 << "' (error code " << errno << ")";
00229 else
00230 edm::LogWarning("RFIOFileWarning")
00231 << "RFIOFile::close(): system level close after a failed"
00232 << " rfio_close64 succeeded";
00233
00234 sleep(5);
00235 }
00236
00237 m_close = false;
00238 m_fd = EDM_IOFD_INVALID;
00239
00240
00241
00242 }
00243
00244 void
00245 RFIOFile::abort (void)
00246 {
00247 serrno = 0;
00248 if (m_fd != EDM_IOFD_INVALID)
00249 rfio_close64 (m_fd);
00250
00251 m_close = false;
00252 m_fd = EDM_IOFD_INVALID;
00253 }
00254
00255 void RFIOFile::reopen (void)
00256 {
00257
00258 IOOffset lastpos = m_curpos;
00259 close();
00260 sleep(5);
00261 open(m_name, m_flags, m_perms);
00262
00263
00264
00265 position(lastpos);
00266 }
00267
00268 ssize_t
00269 RFIOFile::retryRead (void *into, IOSize n, int maxRetry )
00270 {
00271
00272 ssize_t s;
00273 do
00274 {
00275 serrno = 0;
00276 s = rfio_read64 (m_fd, into, n);
00277 if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
00278 {
00279
00280 const char *sleepTimeMsg;
00281 int secondsToSleep = 5;
00282 switch (maxRetry)
00283 {
00284 case 1:
00285 sleepTimeMsg = "10 minutes";
00286 secondsToSleep = 600;
00287 break;
00288
00289 case 2:
00290 sleepTimeMsg = "5 minutes";
00291 secondsToSleep = 300;
00292 break;
00293
00294 default:
00295 sleepTimeMsg = "1 minute";
00296 secondsToSleep = 60;
00297 }
00298
00299 edm::LogWarning("RFIOFileRetry")
00300 << "RFIOFile retrying read\n"
00301 << " return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
00302 << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
00303 << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
00304 << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
00305 << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
00306 << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
00307 << " retries left before quitting = " << maxRetry << "\n"
00308 << " will close and reopen file " << m_name << "\n"
00309 << " will sleep for " << sleepTimeMsg << " before attempting retry";
00310 edm::FlushMessageLog();
00311 sleep(secondsToSleep);
00312
00313
00314
00315
00316 reopen();
00317 }
00318 else
00319 break;
00320 } while (--maxRetry > 0);
00321
00322 return s;
00323 }
00324
00325 IOSize
00326 RFIOFile::read (void *into, IOSize n)
00327 {
00328
00329
00330
00331 LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
00332 double start = realNanoSecs();
00333
00334 ssize_t s;
00335 serrno = 0;
00336 if ((s = retryRead (into, n, 3)) < 0) {
00337 edm::Exception ex(edm::errors::FileReadError);
00338 ex << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
00339 << " at position " << m_curpos << " with error '" << rfio_serror()
00340 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00341 ex.addContext("Calling RFIOFile::read()");
00342 throw ex;
00343 }
00344 m_curpos += s;
00345
00346 double end = realNanoSecs();
00347 LogDebug("RFIOFileDebug")
00348 << "Exiting RFIOFile read(), elapsed time = " << end - start
00349 << " ns, bytes read = " << s << ", file position = " << m_curpos;
00350
00351 return s;
00352 }
00353
00354 IOSize
00355 RFIOFile::readv (IOPosBuffer *into, IOSize buffers)
00356 {
00357 if (! (m_flags & IOFlags::OpenUnbuffered))
00358 prefetch(into, buffers);
00359 return Storage::readv(into, buffers);
00360 }
00361
00362 IOSize
00363 RFIOFile::write (const void *from, IOSize n)
00364 {
00365 serrno = 0;
00366 ssize_t s = rfio_write64 (m_fd, from, n);
00367 if (s < 0) {
00368 cms::Exception ex("FileWriteError");
00369 ex << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
00370 << " at position " << m_curpos << " with error '" << rfio_serror()
00371 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00372 ex.addContext("Calling RFIOFile::write()");
00373 throw ex;
00374 }
00375 return s;
00376 }
00377
00381 IOOffset
00382 RFIOFile::position (IOOffset offset, Relative whence )
00383 {
00384 if (m_fd == EDM_IOFD_INVALID) {
00385 cms::Exception ex("FilePositionError");
00386 ex << "RFIOFile::position() called on a closed file";
00387 throw ex;
00388 }
00389 if (whence != CURRENT && whence != SET && whence != END) {
00390 cms::Exception ex("FilePositionError");
00391 ex << "RFIOFile::position() called with incorrect 'whence' parameter";
00392 throw ex;
00393 }
00394 IOOffset result;
00395 int mywhence = (whence == SET ? SEEK_SET
00396 : whence == CURRENT ? SEEK_CUR
00397 : SEEK_END);
00398
00399 serrno = 0;
00400 if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1) {
00401 cms::Exception ex("FilePositionError");
00402 ex << "rfio_lseek(name='" << m_name << "', offset=" << offset
00403 << ", whence=" << mywhence << ") failed at position "
00404 << m_curpos << " with error '" << rfio_serror()
00405 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00406 ex.addContext("Calling RFIOFile::position()");
00407 throw ex;
00408 }
00409 m_curpos = result;
00410 return result;
00411 }
00412
00413 void
00414 RFIOFile::resize (IOOffset )
00415 {
00416 cms::Exception ex("FileResizeError");
00417 ex << "RFIOFile::resize(name='" << m_name << "') not implemented";
00418 throw ex;
00419 }
00420
00421 bool
00422 RFIOFile::prefetch (const IOPosBuffer *what, IOSize n)
00423 {
00424 if (rfioreadopt (RFIO_READOPT) != 1) {
00425 cms::Exception ex("FilePrefetchError");
00426 ex << "RFIOFile::prefetch() called but RFIO_READOPT="
00427 << rfioreadopt (RFIO_READOPT) << " (must be 1)";
00428 throw ex;
00429 }
00430 std::vector<iovec64> iov (n);
00431 for (IOSize i = 0; i < n; ++i)
00432 {
00433 iov[i].iov_base = what[i].offset();
00434 iov[i].iov_len = what[i].size();
00435 }
00436
00437 serrno = 0;
00438 int retry = 5;
00439 int result;
00440 while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
00441 {
00442 if (--retry == 0)
00443 {
00444 edm::LogError("RFIOFile::prefetch")
00445 << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
00446 << rfio_serror() << "' (rfio_errno=" << rfio_errno
00447 << ", serrno=" << serrno << ")";
00448 return false;
00449 }
00450 else
00451 {
00452 edm::LogWarning("RFIOFileRetry")
00453 << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
00454 << m_curpos << " with error '" << rfio_serror()
00455 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
00456 << "); retrying " << (retry+1) << " times";
00457 serrno = 0;
00458 sleep(5);
00459 }
00460 }
00461
00462 return true;
00463 }