00001 #define __STDC_LIMIT_MACROS 1
00002 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00003 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00004 #include "FWCore/Utilities/interface/Exception.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00007 #include <cerrno>
00008 #include <unistd.h>
00009 #include <stdint.h>
00010 #include <time.h>
00011 #include <sys/time.h>
00012
00013 #include <cstring>
00014 #include <vector>
00015
00016 static double realNanoSecs (void)
00017 {
00018 #if _POSIX_TIMERS > 0
00019 struct timespec tm;
00020 if (clock_gettime(CLOCK_REALTIME, &tm) == 0)
00021 return tm.tv_sec * 1e9 + tm.tv_nsec;
00022 #else
00023 struct timeval tm;
00024 if (gettimeofday(&tm, 0) == 0)
00025 return tm.tv_sec * 1e9 + tm.tv_usec * 1e3;
00026 #endif
00027 return 0;
00028 }
00029
00030 RFIOFile::RFIOFile (void)
00031 : m_fd (EDM_IOFD_INVALID),
00032 m_close (false),
00033 m_flags (0),
00034 m_perms (0),
00035 m_curpos (0)
00036 {}
00037
00038 RFIOFile::RFIOFile (IOFD fd)
00039 : m_fd (fd),
00040 m_close (true),
00041 m_flags (0),
00042 m_perms (0),
00043 m_curpos (0)
00044 {}
00045
00046 RFIOFile::RFIOFile (const char *name,
00047 int flags ,
00048 int perms )
00049 : m_fd (EDM_IOFD_INVALID),
00050 m_close (false),
00051 m_flags (0),
00052 m_perms (0),
00053 m_curpos (0)
00054 { open (name, flags, perms); }
00055
00056 RFIOFile::RFIOFile (const std::string &name,
00057 int flags ,
00058 int perms )
00059 : m_fd (EDM_IOFD_INVALID),
00060 m_close (false),
00061 m_flags (0),
00062 m_perms (0),
00063 m_curpos (0)
00064 { open (name.c_str (), flags, perms); }
00065
00066 RFIOFile::~RFIOFile (void)
00067 {
00068 if (m_close)
00069 edm::LogError("RFIOFileError")
00070 << "Destructor called on RFIO file '" << m_name
00071 << "' but the file is still open";
00072 }
00073
00075
00076 void
00077 RFIOFile::create (const char *name,
00078 bool exclusive ,
00079 int perms )
00080 {
00081 open (name,
00082 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00083 | (exclusive ? IOFlags::OpenExclusive : 0)),
00084 perms);
00085 }
00086
00087 void
00088 RFIOFile::create (const std::string &name,
00089 bool exclusive ,
00090 int perms )
00091 {
00092 open (name.c_str (),
00093 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00094 | (exclusive ? IOFlags::OpenExclusive : 0)),
00095 perms);
00096 }
00097
00098 void
00099 RFIOFile::open (const std::string &name,
00100 int flags ,
00101 int perms )
00102 { open (name.c_str (), flags, perms); }
00103
00104 void
00105 RFIOFile::open (const char *name,
00106 int flags ,
00107 int perms )
00108 {
00109
00110 m_name = name;
00111 m_flags = flags;
00112 m_perms = perms;
00113
00114
00115 serrno = 0;
00116
00117
00118
00119
00120
00121
00122 if (flags & IOFlags::OpenUnbuffered)
00123 {
00124 int readopt = 0;
00125 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00126 }
00127 else
00128 {
00129 int readopt = 1;
00130 rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
00131 }
00132
00133 if ((name == 0) || (*name == 0)) {
00134 edm::Exception ex(edm::errors::FileOpenError);
00135 ex << "Cannot open a file without a name";
00136 ex.addContext("Calling RFIOFile::open()");
00137 throw ex;
00138 }
00139 if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
00140 edm::Exception ex(edm::errors::FileOpenError);
00141 ex << "Must open file '" << name << "' at least for read or write";
00142 ex.addContext("Calling RFIOFile::open()");
00143 throw ex;
00144 }
00145 std::string lname (name);
00146 if (lname.find ("//") == 0)
00147 lname.erase(0, 1);
00148
00149
00150 if (m_fd != EDM_IOFD_INVALID && m_close)
00151 close ();
00152
00153
00154 int openflags = 0;
00155
00156 if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
00157 openflags |= O_RDWR;
00158 else if (flags & IOFlags::OpenRead)
00159 openflags |= O_RDONLY;
00160 else if (flags & IOFlags::OpenWrite)
00161 openflags |= O_WRONLY;
00162
00163 if (flags & IOFlags::OpenNonBlock)
00164 openflags |= O_NONBLOCK;
00165
00166 if (flags & IOFlags::OpenAppend)
00167 openflags |= O_APPEND;
00168
00169 if (flags & IOFlags::OpenCreate)
00170 openflags |= O_CREAT;
00171
00172 if (flags & IOFlags::OpenExclusive)
00173 openflags |= O_EXCL;
00174
00175 if (flags & IOFlags::OpenTruncate)
00176 openflags |= O_TRUNC;
00177
00178 IOFD newfd = EDM_IOFD_INVALID;
00179 if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1) {
00180 edm::Exception ex(edm::errors::FileOpenError);
00181 ex << "rfio_open(name='" << lname
00182 << "', flags=0x" << std::hex << openflags
00183 << ", permissions=0" << std::oct << perms << std::dec
00184 << ") => error '" << rfio_serror ()
00185 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00186 ex.addContext("Calling RFIOFile::open()");
00187 throw ex;
00188 }
00189 m_fd = newfd;
00190 m_close = true;
00191 m_curpos = 0;
00192
00193 edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
00194 }
00195
00196 void
00197 RFIOFile::close (void)
00198 {
00199 if (m_fd == EDM_IOFD_INVALID)
00200 {
00201 edm::LogError("RFIOFileError")
00202 << "RFIOFile::close(name='" << m_name
00203 << "') called but the file is not open";
00204 m_close = false;
00205 return;
00206 }
00207
00208 serrno = 0;
00209 if (rfio_close64 (m_fd) == -1)
00210 {
00211
00212 edm::LogWarning("RFIOFileWarning")
00213 << "rfio_close64(name='" << m_name
00214 << "') failed with error '" << rfio_serror()
00215 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00216
00217
00218
00219 int status = ::close(m_fd);
00220 if (status < 0)
00221 edm::LogWarning("RFIOFileWarning")
00222 << "RFIOFile::close(): system level close after a failed"
00223 << " rfio_close64 also failed with error '" << strerror (errno)
00224 << "' (error code " << errno << ")";
00225 else
00226 edm::LogWarning("RFIOFileWarning")
00227 << "RFIOFile::close(): system level close after a failed"
00228 << " rfio_close64 succeeded";
00229
00230 sleep(5);
00231 }
00232
00233 m_close = false;
00234 m_fd = EDM_IOFD_INVALID;
00235
00236
00237
00238 }
00239
00240 void
00241 RFIOFile::abort (void)
00242 {
00243 serrno = 0;
00244 if (m_fd != EDM_IOFD_INVALID)
00245 rfio_close64 (m_fd);
00246
00247 m_close = false;
00248 m_fd = EDM_IOFD_INVALID;
00249 }
00250
00251 void RFIOFile::reopen (void)
00252 {
00253
00254 IOOffset lastpos = m_curpos;
00255 close();
00256 sleep(5);
00257 open(m_name, m_flags, m_perms);
00258
00259
00260
00261 position(lastpos);
00262 }
00263
00264 ssize_t
00265 RFIOFile::retryRead (void *into, IOSize n, int maxRetry )
00266 {
00267
00268 ssize_t s;
00269 do
00270 {
00271 serrno = 0;
00272 s = rfio_read64 (m_fd, into, n);
00273 if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
00274 {
00275
00276 const char *sleepTimeMsg;
00277 int secondsToSleep = 5;
00278 switch (maxRetry)
00279 {
00280 case 1:
00281 sleepTimeMsg = "10 minutes";
00282 secondsToSleep = 600;
00283 break;
00284
00285 case 2:
00286 sleepTimeMsg = "5 minutes";
00287 secondsToSleep = 300;
00288 break;
00289
00290 default:
00291 sleepTimeMsg = "1 minute";
00292 secondsToSleep = 60;
00293 }
00294
00295 edm::LogWarning("RFIOFileRetry")
00296 << "RFIOFile retrying read\n"
00297 << " return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
00298 << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
00299 << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
00300 << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
00301 << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
00302 << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
00303 << " retries left before quitting = " << maxRetry << "\n"
00304 << " will close and reopen file " << m_name << "\n"
00305 << " will sleep for " << sleepTimeMsg << " before attempting retry";
00306 edm::FlushMessageLog();
00307 sleep(secondsToSleep);
00308
00309
00310
00311
00312 reopen();
00313 }
00314 else
00315 break;
00316 } while (--maxRetry > 0);
00317
00318 return s;
00319 }
00320
00321 IOSize
00322 RFIOFile::read (void *into, IOSize n)
00323 {
00324
00325
00326
00327 LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
00328 double start = realNanoSecs();
00329
00330 ssize_t s;
00331 serrno = 0;
00332 if ((s = retryRead (into, n, 3)) < 0) {
00333 edm::Exception ex(edm::errors::FileReadError);
00334 ex << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
00335 << " at position " << m_curpos << " with error '" << rfio_serror()
00336 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00337 ex.addContext("Calling RFIOFile::read()");
00338 throw ex;
00339 }
00340 m_curpos += s;
00341
00342 double end = realNanoSecs();
00343 LogDebug("RFIOFileDebug")
00344 << "Exiting RFIOFile read(), elapsed time = " << end - start
00345 << " ns, bytes read = " << s << ", file position = " << m_curpos;
00346
00347 return s;
00348 }
00349
00350 IOSize
00351 RFIOFile::readv (IOPosBuffer *into, IOSize buffers)
00352 {
00353 if (! (m_flags & IOFlags::OpenUnbuffered))
00354 prefetch(into, buffers);
00355 return Storage::readv(into, buffers);
00356 }
00357
00358 IOSize
00359 RFIOFile::write (const void *from, IOSize n)
00360 {
00361 serrno = 0;
00362 ssize_t s = rfio_write64 (m_fd, from, n);
00363 if (s < 0) {
00364 cms::Exception ex("FileWriteError");
00365 ex << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
00366 << " at position " << m_curpos << " with error '" << rfio_serror()
00367 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00368 ex.addContext("Calling RFIOFile::write()");
00369 throw ex;
00370 }
00371 return s;
00372 }
00373
00377 IOOffset
00378 RFIOFile::position (IOOffset offset, Relative whence )
00379 {
00380 if (m_fd == EDM_IOFD_INVALID) {
00381 cms::Exception ex("FilePositionError");
00382 ex << "RFIOFile::position() called on a closed file";
00383 throw ex;
00384 }
00385 if (whence != CURRENT && whence != SET && whence != END) {
00386 cms::Exception ex("FilePositionError");
00387 ex << "RFIOFile::position() called with incorrect 'whence' parameter";
00388 throw ex;
00389 }
00390 IOOffset result;
00391 int mywhence = (whence == SET ? SEEK_SET
00392 : whence == CURRENT ? SEEK_CUR
00393 : SEEK_END);
00394
00395 serrno = 0;
00396 if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1) {
00397 cms::Exception ex("FilePositionError");
00398 ex << "rfio_lseek(name='" << m_name << "', offset=" << offset
00399 << ", whence=" << mywhence << ") failed at position "
00400 << m_curpos << " with error '" << rfio_serror()
00401 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
00402 ex.addContext("Calling RFIOFile::position()");
00403 throw ex;
00404 }
00405 m_curpos = result;
00406 return result;
00407 }
00408
00409 void
00410 RFIOFile::resize (IOOffset )
00411 {
00412 cms::Exception ex("FileResizeError");
00413 ex << "RFIOFile::resize(name='" << m_name << "') not implemented";
00414 throw ex;
00415 }
00416
00417 bool
00418 RFIOFile::prefetch (const IOPosBuffer *what, IOSize n)
00419 {
00420 if (rfioreadopt (RFIO_READOPT) != 1) {
00421 cms::Exception ex("FilePrefetchError");
00422 ex << "RFIOFile::prefetch() called but RFIO_READOPT="
00423 << rfioreadopt (RFIO_READOPT) << " (must be 1)";
00424 throw ex;
00425 }
00426 std::vector<iovec64> iov (n);
00427 for (IOSize i = 0; i < n; ++i)
00428 {
00429 iov[i].iov_base = what[i].offset();
00430 iov[i].iov_len = what[i].size();
00431 }
00432
00433 serrno = 0;
00434 int retry = 5;
00435 int result;
00436 while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
00437 {
00438 if (--retry <= 0)
00439 {
00440 edm::LogError("RFIOFile::prefetch")
00441 << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
00442 << rfio_serror() << "' (rfio_errno=" << rfio_errno
00443 << ", serrno=" << serrno << ")";
00444 return false;
00445 }
00446 else
00447 {
00448 edm::LogWarning("RFIOFileRetry")
00449 << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
00450 << m_curpos << " with error '" << rfio_serror()
00451 << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
00452 << "); retrying " << (retry+1) << " times";
00453 serrno = 0;
00454 sleep(5);
00455 }
00456 }
00457
00458 return true;
00459 }