test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RFIOFile.cc
Go to the documentation of this file.
1 #define __STDC_LIMIT_MACROS 1
7 #include <cerrno>
8 #include <unistd.h>
9 #include <stdint.h>
10 #include <time.h>
11 #include <sys/time.h>
12 
13 #include <cstring>
14 #include <vector>
15 
/// Return the current wall-clock time expressed in nanoseconds.
///
/// When POSIX timers are available (_POSIX_TIMERS > 0) the value comes from
/// clock_gettime(CLOCK_REALTIME); otherwise gettimeofday() is used and the
/// microsecond field is scaled up to nanoseconds.
///
/// @return seconds * 1e9 plus the sub-second part, as a double; 0 when the
///         clock call fails.
static double realNanoSecs (void)
{
#if _POSIX_TIMERS > 0
  struct timespec tm;
  if (clock_gettime(CLOCK_REALTIME, &tm) == 0)
    return tm.tv_sec * 1e9 + tm.tv_nsec;
#else
  struct timeval tm;
  if (gettimeofday(&tm, 0) == 0)
    return tm.tv_sec * 1e9 + tm.tv_usec * 1e3;
#endif
  // The clock could not be read.
  return 0;
}
29 
// Member initializers and empty body of a constructor.
// NOTE(review): the header line (listing line 30) is missing from this
// extract; the cross-reference index lists RFIOFile(void) as defined at
// RFIOFile.cc:30 -- confirm against the original file.
// Starts with an invalid descriptor, m_close false, and zeroed flags,
// permissions and position.
31  : m_fd (EDM_IOFD_INVALID),
32  m_close (false),
33  m_flags (0),
34  m_perms (0),
35  m_curpos (0)
36 {}
37 
// Member initializers and empty body of a constructor that stores a
// caller-supplied descriptor `fd` and sets m_close to true.
// NOTE(review): the header line (listing line 38) is missing from this
// extract; presumably this is the overload taking an IOFD -- confirm.
39  : m_fd (fd),
40  m_close (true),
41  m_flags (0),
42  m_perms (0),
43  m_curpos (0)
44 {}
45 
46 RFIOFile::RFIOFile (const char *name,
47  int flags /* = IOFlags::OpenRead */,
48  int perms /* = 066 */)
49  : m_fd (EDM_IOFD_INVALID),
50  m_close (false),
51  m_flags (0),
52  m_perms (0),
53  m_curpos (0)
54 { open (name, flags, perms); }
55 
// Tail of a constructor that initializes the members to the "no file" state
// and then calls open() with name.c_str(), flags and perms.
// NOTE(review): the header line (listing line 56) is missing from this
// extract; presumably the overload taking the name as std::string -- confirm.
57  int flags /* = IOFlags::OpenRead */,
58  int perms /* = 066 */)
59  : m_fd (EDM_IOFD_INVALID),
60  m_close (false),
61  m_flags (0),
62  m_perms (0),
63  m_curpos (0)
64 { open (name.c_str (), flags, perms); }
65 
// Body of the destructor (the index lists ~RFIOFile(void) at RFIOFile.cc:66;
// that header line is missing from this extract).
// Only reports the problem: if m_close is still set an error is logged, but
// no close call is made here.
67 {
68  if (m_close)
69  edm::LogError("RFIOFileError")
70  << "Destructor called on RFIO file '" << m_name
71  << "' but the file is still open";
72 }
73 
75 
// Create a file by delegating to open() with a computed flag mask.
//   name      - path handed to open().
//   exclusive - when true, IOFlags::OpenExclusive is OR-ed into the mask.
//   perms     - permission bits handed to open().
// NOTE(review): listing line 82, which holds the first part of the flag
// expression (the operand to the left of the `|` below), is missing from this
// extract -- restore it from the original file.
76 void
77 RFIOFile::create (const char *name,
78  bool exclusive /* = false */,
79  int perms /* = 0666 */)
80 {
81  open (name,
83  | (exclusive ? IOFlags::OpenExclusive : 0)),
84  perms);
85 }
86 
// Overload of create() that forwards name.c_str() to open().
// NOTE(review): the header line (listing line 88) and the first part of the
// flag expression (listing line 93) are missing from this extract; presumably
// this is the std::string overload -- confirm against the original file.
87 void
89  bool exclusive /* = false */,
90  int perms /* = 066 */)
91 {
92  open (name.c_str (),
94  | (exclusive ? IOFlags::OpenExclusive : 0)),
95  perms);
96 }
97 
// Overload of open() that forwards name.c_str(), flags and perms unchanged.
// NOTE(review): the header line (listing line 99) is missing from this
// extract; presumably the std::string overload -- confirm.
98 void
100  int flags /* = IOFlags::OpenRead */,
101  int perms /* = 066 */)
102 { open (name.c_str (), flags, perms); }
103 
// Open the RFIO file `name`.
//   name  - path to open; a null or empty name is rejected with an exception
//           (after m_name has been set to "" or the given name).
//   flags - IOFlags bit mask; must contain OpenRead and/or OpenWrite.
//   perms - permission bits passed to rfio_open64().
// Steps: remember name/flags/perms, set the rfio read option according to
// IOFlags::OpenUnbuffered, validate the arguments, drop one slash from a name
// starting with "//", close a file this object already has open, translate
// the IOFlags bits into O_* flags and call rfio_open64().  On success m_fd
// holds the new descriptor, m_close is true and m_curpos is 0.
// Throws `ex` on a missing name, on missing read/write flags and when
// rfio_open64() returns -1.
// NOTE(review): listing lines 138, 144 and 184 are missing from this extract;
// each presumably declares the exception object `ex` used right after it --
// confirm against the original file.
104 void
105 RFIOFile::open (const char *name,
106  int flags /* = IOFlags::OpenRead */,
107  int perms /* = 0666 */)
108 {
109  // Save parameters for error recovery.
110  if (name == 0) {
111  m_name = "";
112  } else {
113  m_name = name;
114  }
115  m_flags = flags;
116  m_perms = perms;
117 
118  // Reset RFIO error code.
119  serrno = 0;
120 
121  // Disable buffering in rfio library? Note that doing this on
122  // one file disables it for everything. Not much we can do...
123  // but it does make a significant performance difference to the
124  // clients. Note also that docs say the flag turns off write
125  // buffering -- this turns off all buffering.
126  if (flags & IOFlags::OpenUnbuffered)
127  {
128  int readopt = 0;
129  rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
130  }
131  else
132  {
133  int readopt = 1;
134  rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
135  }
136 
// Reject a null or empty file name.
137  if ((name == 0) || (*name == 0)) {
139  ex << "Cannot open a file without a name";
140  ex.addContext("Calling RFIOFile::open()");
141  throw ex;
142  }
// At least one of OpenRead / OpenWrite is required.
143  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
145  ex << "Must open file '" << name << "' at least for read or write";
146  ex.addContext("Calling RFIOFile::open()");
147  throw ex;
148  }
// Work on a copy of the name; a leading "//" loses its first slash.
149  std::string lname (name);
150  if (lname.find ("//") == 0)
151  lname.erase(0, 1);
152 
153  // If I am already open, close old file first
154  if (m_fd != EDM_IOFD_INVALID && m_close)
155  close ();
156 
157  // Translate our flags to system flags
158  int openflags = 0;
159 
160  if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
161  openflags |= O_RDWR;
162  else if (flags & IOFlags::OpenRead)
163  openflags |= O_RDONLY;
164  else if (flags & IOFlags::OpenWrite)
165  openflags |= O_WRONLY;
166 
167  if (flags & IOFlags::OpenNonBlock)
168  openflags |= O_NONBLOCK;
169 
170  if (flags & IOFlags::OpenAppend)
171  openflags |= O_APPEND;
172 
173  if (flags & IOFlags::OpenCreate)
174  openflags |= O_CREAT;
175 
176  if (flags & IOFlags::OpenExclusive)
177  openflags |= O_EXCL;
178 
179  if (flags & IOFlags::OpenTruncate)
180  openflags |= O_TRUNC;
181 
// Open through RFIO; -1 signals failure and is reported with the rfio
// error text and both error codes.
182  IOFD newfd = EDM_IOFD_INVALID;
183  if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1) {
185  ex << "rfio_open(name='" << lname
186  << "', flags=0x" << std::hex << openflags
187  << ", permissions=0" << std::oct << perms << std::dec
188  << ") => error '" << rfio_serror ()
189  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
190  ex.addContext("Calling RFIOFile::open()");
191  throw ex;
192  }
// Success: adopt the descriptor and start at offset 0.
193  m_fd = newfd;
194  m_close = true;
195  m_curpos = 0;
196 
197  edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
198 }
199 
// Close the file (the index lists close(void) at RFIOFile.cc:201; that header
// line is missing from this extract).
// If no descriptor is held, an error is logged, m_close is cleared and the
// function returns.  Otherwise rfio_close64() is called; when it fails a
// warning is logged, the system ::close() is tried as a fallback (its outcome
// is logged too) and the call sleeps for 5 seconds.  No exception is thrown
// on a failed close.
// NOTE(review): listing line 238, right after `m_close = false;`, is missing
// from this extract; presumably it resets m_fd -- confirm against the
// original file.
200 void
202 {
203  if (m_fd == EDM_IOFD_INVALID)
204  {
205  edm::LogError("RFIOFileError")
206  << "RFIOFile::close(name='" << m_name
207  << "') called but the file is not open";
208  m_close = false;
209  return;
210  }
211 
212  serrno = 0;
213  if (rfio_close64 (m_fd) == -1)
214  {
215  // If we fail to close the file, report a warning.
216  edm::LogWarning("RFIOFileWarning")
217  << "rfio_close64(name='" << m_name
218  << "') failed with error '" << rfio_serror()
219  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
220 
221  // When rfio_close64 fails then try the system close function as
222  // per the advice from Olof Barring from the Castor operations.
223  int status = ::close(m_fd);
224  if (status < 0)
225  edm::LogWarning("RFIOFileWarning")
226  << "RFIOFile::close(): system level close after a failed"
227  << " rfio_close64 also failed with error '" << strerror (errno)
228  << "' (error code " << errno << ")";
229  else
230  edm::LogWarning("RFIOFileWarning")
231  << "RFIOFile::close(): system level close after a failed"
232  << " rfio_close64 succeeded";
233 
234  sleep(5);
235  }
236 
237  m_close = false;
239 
240  // Caused hang. Will be added back after problem is fix
241  // edm::LogInfo("RFIOFileInfo") << "Closed " << m_name;
242 }
243 
// Close the descriptor without any error reporting (the index lists
// abort(void) at RFIOFile.cc:245; that header line is missing from this
// extract).  The return value of rfio_close64() is ignored.
// NOTE(review): listing line 252, after `m_close = false;`, is missing from
// this extract; presumably it resets m_fd -- confirm.
244 void
246 {
247  serrno = 0;
248  if (m_fd != EDM_IOFD_INVALID)
249  rfio_close64 (m_fd);
250 
251  m_close = false;
253 }
254 
// Close the file, wait 5 seconds, and seek back to the offset that was
// current before the close.
// NOTE(review): listing line 261, between the sleep and the comment below,
// is missing from this extract; presumably it re-opens the file with the
// saved name, flags and permissions -- confirm against the original file.
// Without it, position() would be called on a closed file.
255 void RFIOFile::reopen (void)
256 {
257  // Remember the current position in the file
258  IOOffset lastpos = m_curpos;
259  close();
260  sleep(5);
262 
263  // Set the position back to the same place it was
264  // before the file closed and opened.
265  position(lastpos);
266 }
267 
// Read up to `n` bytes into `into`, retrying after a timeout.
//   into     - destination buffer.
//   n        - number of bytes requested.
//   maxRetry - maximum number of read attempts.
// Returns the value of the last rfio_read64() call.
// An attempt counts as failed when rfio_read64() returns -1 with
// serrno == 1004, or returns more bytes than requested.  After a failed
// attempt a warning is logged, the call sleeps (10 minutes when one attempt
// is left, 5 minutes when two are left, 1 minute otherwise) and the file is
// closed and reopened via reopen().
// NOTE(review): the sleep and reopen() also run after the final failed
// attempt, even though no further read follows -- confirm this is intended.
// NOTE(review): listing line 310, between the log statement and the sleep,
// is missing from this extract.
268 ssize_t
269 RFIOFile::retryRead (void *into, IOSize n, int maxRetry /* = 10 */)
270 {
271  // Attempt to read up to maxRetry times.
272  ssize_t s;
273  do
274  {
275  serrno = 0;
276  s = rfio_read64 (m_fd, into, n);
277  if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
278  {
279  // Wait a little while to allow Castor to recover from the timeout.
280  const char *sleepTimeMsg;
// The initial value 5 is always overwritten by the switch below.
281  int secondsToSleep = 5;
282  switch (maxRetry)
283  {
284  case 1:
285  sleepTimeMsg = "10 minutes";
286  secondsToSleep = 600;
287  break;
288 
289  case 2:
290  sleepTimeMsg = "5 minutes";
291  secondsToSleep = 300;
292  break;
293 
294  default:
295  sleepTimeMsg = "1 minute";
296  secondsToSleep = 60;
297  }
298 
299  edm::LogWarning("RFIOFileRetry")
300  << "RFIOFile retrying read\n"
301  << " return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
302  << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
303  << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
304  << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
305  << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
306  << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
307  << " retries left before quitting = " << maxRetry << "\n"
308  << " will close and reopen file " << m_name << "\n"
309  << " will sleep for " << sleepTimeMsg << " before attempting retry";
311  sleep(secondsToSleep);
312 
313  // Improve the chances of success by closing and reopening
314  // the file before retrying the read. This also resets
315  // the position in the file to the correct place.
316  reopen();
317  }
318  else
319  break;
320  } while (--maxRetry > 0);
321 
322  return s;
323 }
324 
// Read up to `n` bytes into `into` at the current position.
//   into - destination buffer.
//   n    - number of bytes requested.
// Returns the number of bytes read and advances m_curpos by that amount.
// The read goes through retryRead() with at most 3 attempts; a negative
// result is turned into the exception `ex`.  Elapsed wall-clock time is
// measured with realNanoSecs() and reported through LogDebug.
// NOTE(review): listing line 337 is missing from this extract; presumably it
// declares the exception object `ex` used below -- confirm.
325 IOSize
326 RFIOFile::read (void *into, IOSize n)
327 {
328  // Be aware that when enabled these LogDebug prints
329  // will take more time than the read itself unless the reads
330  // are proceeding slower than under optimal conditions.
331  LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
332  double start = realNanoSecs();
333 
334  ssize_t s;
335  serrno = 0;
336  if ((s = retryRead (into, n, 3)) < 0) {
338  ex << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
339  << " at position " << m_curpos << " with error '" << rfio_serror()
340  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
341  ex.addContext("Calling RFIOFile::read()");
342  throw ex;
343  }
344  m_curpos += s;
345 
346  double end = realNanoSecs();
347  LogDebug("RFIOFileDebug")
348  << "Exiting RFIOFile read(), elapsed time = " << end - start
349  << " ns, bytes read = " << s << ", file position = " << m_curpos;
350 
351  return s;
352 }
353 
// Vector read (the index lists readv(IOPosBuffer *into, IOSize buffers) at
// RFIOFile.cc:355; that header line is missing from this extract).
// Calls prefetch() on the requested buffers and then delegates to
// Storage::readv(); the result of prefetch() is not checked.
// NOTE(review): listing line 357, at the top of the body, is also missing.
354 IOSize
356 {
358  prefetch(into, buffers);
359  return Storage::readv(into, buffers);
360 }
361 
362 IOSize
363 RFIOFile::write (const void *from, IOSize n)
364 {
365  serrno = 0;
366  ssize_t s = rfio_write64 (m_fd, from, n);
367  if (s < 0) {
368  cms::Exception ex("FileWriteError");
369  ex << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
370  << " at position " << m_curpos << " with error '" << rfio_serror()
371  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
372  ex.addContext("Calling RFIOFile::write()");
373  throw ex;
374  }
375  return s;
376 }
377 
// Seek within the file and return the resulting offset.
// Uses `offset` and `whence` (SET, CURRENT or END), maps `whence` to
// SEEK_SET / SEEK_CUR / SEEK_END and calls rfio_lseek64().  On success
// m_curpos is updated to the new offset.
// Throws cms::Exception ("FilePositionError") when the file is not open,
// when `whence` is not one of the three accepted values, or when
// rfio_lseek64() returns -1.
// NOTE(review): the header line (listing line 382) and listing line 394 are
// missing from this extract; the latter presumably declares `result`, which
// is assigned below without a visible declaration -- confirm.
381 IOOffset
383 {
384  if (m_fd == EDM_IOFD_INVALID) {
385  cms::Exception ex("FilePositionError");
386  ex << "RFIOFile::position() called on a closed file";
387  throw ex;
388  }
389  if (whence != CURRENT && whence != SET && whence != END) {
390  cms::Exception ex("FilePositionError");
391  ex << "RFIOFile::position() called with incorrect 'whence' parameter";
392  throw ex;
393  }
// Translate the relative-position selector into the lseek constant.
395  int mywhence = (whence == SET ? SEEK_SET
396  : whence == CURRENT ? SEEK_CUR
397  : SEEK_END);
398 
399  serrno = 0;
400  if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1) {
401  cms::Exception ex("FilePositionError");
402  ex << "rfio_lseek(name='" << m_name << "', offset=" << offset
403  << ", whence=" << mywhence << ") failed at position "
404  << m_curpos << " with error '" << rfio_serror()
405  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
406  ex.addContext("Calling RFIOFile::position()");
407  throw ex;
408  }
409  m_curpos = result;
410  return result;
411 }
412 
// Resizing is not supported: this always throws cms::Exception
// ("FileResizeError") naming the file.
// NOTE(review): the header line (listing line 414) is missing from this
// extract; the index lists resize(IOOffset size) at RFIOFile.cc:414.
413 void
415 {
416  cms::Exception ex("FileResizeError");
417  ex << "RFIOFile::resize(name='" << m_name << "') not implemented";
418  throw ex;
419 }
420 
#define LogDebug(id)
#define RFIO_READOPT
Definition: RFIO.h:9
bool m_close
Definition: RFIOFile.h:55
virtual void resize(IOOffset size) override
Definition: RFIOFile.cc:414
void abort(void)
Definition: RFIOFile.cc:245
int rfio_write64(int s, const void *ptr, int size)
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: Storage.cc:31
void FlushMessageLog()
virtual IOSize readv(IOPosBuffer *into, IOSize buffers) override
Definition: RFIOFile.cc:355
std::string m_name
Definition: RFIOFile.h:56
virtual IOSize write(const void *from, IOSize n) override
Definition: RFIOFile.cc:363
#define rfio_errno
Definition: RFIO.h:11
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
int m_flags
Definition: RFIOFile.h:57
void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: RFIOFile.cc:105
tuple result
Definition: mps_fire.py:84
Relative
Definition: Storage.h:23
RFIOFile(void)
Definition: RFIOFile.cc:30
int rfiosetopt(int opt, int *pval, int len)
int rfio_read64(int s, void *ptr, int size)
#define serrno
Definition: RFIO.h:10
virtual IOOffset position(void) const
Definition: Storage.cc:95
void create(const char *name, bool exclusive=false, int perms=0666)
Definition: RFIOFile.cc:77
tuple fd
Definition: ztee.py:136
#define end
Definition: vmac.h:37
static double realNanoSecs(void)
Definition: RFIOFile.cc:16
int read(void)
Definition: IOInput.cc:54
int rfio_open64(const char *filepath, int flags, int mode)
ssize_t retryRead(void *into, IOSize n, int maxRetry=10)
Definition: RFIOFile.cc:269
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: Storage.cc:119
char * rfio_serror()
void reopen()
Definition: RFIOFile.cc:255
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
int IOFD
Definition: IOTypes.h:22
IOOffset m_curpos
Definition: RFIOFile.h:59
virtual void close(void) override
Definition: RFIOFile.cc:201
#define EDM_IOFD_INVALID
Definition: IOTypes.h:8
int rfio_close64(int s)
Definition: RFIO.h:29
int m_perms
Definition: RFIOFile.h:58
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
volatile std::atomic< bool > shutdown_flag false
IOFD m_fd
Definition: RFIOFile.h:54
#define CLOCK_REALTIME
Definition: TimerService.h:32
~RFIOFile(void)
Definition: RFIOFile.cc:66
off64_t rfio_lseek64(int s, off64_t offset, int how)
tuple status
Definition: mps_update.py:57