CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RFIOFile.cc
Go to the documentation of this file.
1 #define __STDC_LIMIT_MACROS 1
6 #include <cerrno>
7 #include <unistd.h>
8 #include <stdint.h>
9 #include <time.h>
10 #include <sys/time.h>
11 
12 #include <cstring>
13 #include <vector>
14 
15 static double realNanoSecs (void)
16 {
17 #if _POSIX_TIMERS > 0
18  struct timespec tm;
19  if (clock_gettime(CLOCK_REALTIME, &tm) == 0)
20  return tm.tv_sec * 1e9 + tm.tv_nsec;
21 #else
22  struct timeval tm;
23  if (gettimeofday(&tm, 0) == 0)
24  return tm.tv_sec * 1e9 + tm.tv_usec * 1e3;
25 #endif
26  return 0;
27 }
28 
30  : m_fd (EDM_IOFD_INVALID),
31  m_close (false),
32  m_flags (0),
33  m_perms (0),
34  m_curpos (0)
35 {}
36 
38  : m_fd (fd),
39  m_close (true),
40  m_flags (0),
41  m_perms (0),
42  m_curpos (0)
43 {}
44 
45 RFIOFile::RFIOFile (const char *name,
46  int flags /* = IOFlags::OpenRead */,
47  int perms /* = 066 */)
48  : m_fd (EDM_IOFD_INVALID),
49  m_close (false),
50  m_flags (0),
51  m_perms (0),
52  m_curpos (0)
53 { open (name, flags, perms); }
54 
55 RFIOFile::RFIOFile (const std::string &name,
56  int flags /* = IOFlags::OpenRead */,
57  int perms /* = 066 */)
58  : m_fd (EDM_IOFD_INVALID),
59  m_close (false),
60  m_flags (0),
61  m_perms (0),
62  m_curpos (0)
63 { open (name.c_str (), flags, perms); }
64 
66 {
67  if (m_close)
68  edm::LogError("RFIOFileError")
69  << "Destructor called on RFIO file '" << m_name
70  << "' but the file is still open";
71 }
72 
74 
75 void
76 RFIOFile::create (const char *name,
77  bool exclusive /* = false */,
78  int perms /* = 066 */)
79 {
80  open (name,
82  | (exclusive ? IOFlags::OpenExclusive : 0)),
83  perms);
84 }
85 
86 void
87 RFIOFile::create (const std::string &name,
88  bool exclusive /* = false */,
89  int perms /* = 066 */)
90 {
91  open (name.c_str (),
93  | (exclusive ? IOFlags::OpenExclusive : 0)),
94  perms);
95 }
96 
97 void
98 RFIOFile::open (const std::string &name,
99  int flags /* = IOFlags::OpenRead */,
100  int perms /* = 066 */)
101 { open (name.c_str (), flags, perms); }
102 
103 void
104 RFIOFile::open (const char *name,
105  int flags /* = IOFlags::OpenRead */,
106  int perms /* = 066 */)
107 {
108  // Save parameters for error recovery.
109  m_name = name;
110  m_flags = flags;
111  m_perms = perms;
112 
113  // Reset RFIO error code.
114  serrno = 0;
115 
116  // Disable buffering in rfio library? Note that doing this on
117  // one file disables it for everything. Not much we can do...
118  // but it does make a significant performance difference to the
119  // clients. Note also that docs say the flag turns off write
120  // buffering -- this turns off all buffering.
121  if (flags & IOFlags::OpenUnbuffered)
122  {
123  int readopt = 0;
124  rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
125  }
126  else
127  {
128  int readopt = 1;
129  rfiosetopt (RFIO_READOPT, &readopt, sizeof (readopt));
130  }
131 
132  if ((name == 0) || (*name == 0))
133  throw cms::Exception("RFIOFile::open()")
134  << "Cannot open a file without a name";
135 
136  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0)
137  throw cms::Exception("RFIOFile::open()")
138  << "Must open file '" << name << "' at least for read or write";
139 
140  std::string lname (name);
141  if (lname.find ("//") == 0)
142  lname.erase(0, 1);
143 
144  // If I am already open, close old file first
145  if (m_fd != EDM_IOFD_INVALID && m_close)
146  close ();
147 
148  // Translate our flags to system flags
149  int openflags = 0;
150 
151  if ((flags & IOFlags::OpenRead) && (flags & IOFlags::OpenWrite))
152  openflags |= O_RDWR;
153  else if (flags & IOFlags::OpenRead)
154  openflags |= O_RDONLY;
155  else if (flags & IOFlags::OpenWrite)
156  openflags |= O_WRONLY;
157 
158  if (flags & IOFlags::OpenNonBlock)
159  openflags |= O_NONBLOCK;
160 
161  if (flags & IOFlags::OpenAppend)
162  openflags |= O_APPEND;
163 
164  if (flags & IOFlags::OpenCreate)
165  openflags |= O_CREAT;
166 
167  if (flags & IOFlags::OpenExclusive)
168  openflags |= O_EXCL;
169 
170  if (flags & IOFlags::OpenTruncate)
171  openflags |= O_TRUNC;
172 
173  IOFD newfd = EDM_IOFD_INVALID;
174  if ((newfd = rfio_open64 (lname.c_str(), openflags, perms)) == -1)
175  throw cms::Exception("RFIOFile::open()")
176  << "rfio_open(name='" << lname
177  << "', flags=0x" << std::hex << openflags
178  << ", permissions=0" << std::oct << perms << std::dec
179  << ") => error '" << rfio_serror ()
180  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
181 
182  m_fd = newfd;
183  m_close = true;
184  m_curpos = 0;
185 
186  edm::LogInfo("RFIOFileInfo") << "Opened " << lname;
187 }
188 
189 void
191 {
192  if (m_fd == EDM_IOFD_INVALID)
193  {
194  edm::LogError("RFIOFileError")
195  << "RFIOFile::close(name='" << m_name
196  << "') called but the file is not open";
197  m_close = false;
198  return;
199  }
200 
201  serrno = 0;
202  if (rfio_close64 (m_fd) == -1)
203  {
204  // If we fail to close the file, report a warning.
205  edm::LogWarning("RFIOFileWarning")
206  << "rfio_close64(name='" << m_name
207  << "') failed with error '" << rfio_serror()
208  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
209 
210  // When rfio_close64 fails then try the system close function as
211  // per the advice from Olof Barring from the Castor operations.
212  int status = ::close(m_fd);
213  if (status < 0)
214  edm::LogWarning("RFIOFileWarning")
215  << "RFIOFile::close(): system level close after a failed"
216  << " rfio_close64 also failed with error '" << strerror (errno)
217  << "' (error code " << errno << ")";
218  else
219  edm::LogWarning("RFIOFileWarning")
220  << "RFIOFile::close(): system level close after a failed"
221  << " rfio_close64 succeeded";
222 
223  sleep(5);
224  }
225 
226  m_close = false;
228 
229  // Caused hang. Will be added back after problem is fix
230  // edm::LogInfo("RFIOFileInfo") << "Closed " << m_name;
231 }
232 
233 void
235 {
236  serrno = 0;
237  if (m_fd != EDM_IOFD_INVALID)
238  rfio_close64 (m_fd);
239 
240  m_close = false;
242 }
243 
244 void RFIOFile::reopen (void)
245 {
246  // Remember the current position in the file
247  IOOffset lastpos = m_curpos;
248  close();
249  sleep(5);
251 
252  // Set the position back to the same place it was
253  // before the file closed and opened.
254  position(lastpos);
255 }
256 
257 ssize_t
258 RFIOFile::retryRead (void *into, IOSize n, int maxRetry /* = 10 */)
259 {
260  // Attempt to read up to maxRetry times.
261  ssize_t s;
262  do
263  {
264  serrno = 0;
265  s = rfio_read64 (m_fd, into, n);
266  if ((s == -1 && serrno == 1004) || (s > ssize_t (n)))
267  {
268  // Wait a little while to allow Castor to recover from the timeout.
269  const char *sleepTimeMsg;
270  int secondsToSleep = 5;
271  switch (maxRetry)
272  {
273  case 1:
274  sleepTimeMsg = "10 minutes";
275  secondsToSleep = 600;
276  break;
277 
278  case 2:
279  sleepTimeMsg = "5 minutes";
280  secondsToSleep = 300;
281  break;
282 
283  default:
284  sleepTimeMsg = "1 minute";
285  secondsToSleep = 60;
286  }
287 
288  edm::LogWarning("RFIOFileRetry")
289  << "RFIOFile retrying read\n"
290  << " return value from rfio_read64 = " << s << " (normally this is bytes read, -1 for error)\n"
291  << " bytes requested = " << n << " (this and bytes read are equal unless error or EOF)\n"
292  << " rfio error message = " << rfio_serror() << " (explanation from server, if possible)\n"
293  << " serrno = " << serrno << " (rfio server error code, 0 = OK, 1004 = timeout, ...)\n"
294  << " rfio_errno = " << rfio_errno << " (rfio error from actually accessing the file)\n"
295  << " current position = " << m_curpos << " (in bytes, beginning of file is 0)\n"
296  << " retries left before quitting = " << maxRetry << "\n"
297  << " will close and reopen file " << m_name << "\n"
298  << " will sleep for " << sleepTimeMsg << " before attempting retry";
300  sleep(secondsToSleep);
301 
302  // Improve the chances of success by closing and reopening
303  // the file before retrying the read. This also resets
304  // the position in the file to the correct place.
305  reopen();
306  }
307  else
308  break;
309  } while (--maxRetry > 0);
310 
311  return s;
312 }
313 
314 IOSize
315 RFIOFile::read (void *into, IOSize n)
316 {
317  // Be aware that when enabled these LogDebug prints
318  // will take more time than the read itself unless the reads
319  // are proceeding slower than under optimal conditions.
320  LogDebug("RFIOFileDebug") << "Entering RFIOFile read()";
321  double start = realNanoSecs();
322 
323  ssize_t s;
324  serrno = 0;
325  if ((s = retryRead (into, n, 3)) < 0)
326  throw cms::Exception("RFIOFile::read()")
327  << "rfio_read(name='" << m_name << "', n=" << n << ") failed"
328  << " at position " << m_curpos << " with error '" << rfio_serror()
329  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
330 
331  m_curpos += s;
332 
333  double end = realNanoSecs();
334  LogDebug("RFIOFileDebug")
335  << "Exiting RFIOFile read(), elapsed time = " << end - start
336  << " ns, bytes read = " << s << ", file position = " << m_curpos;
337 
338  return s;
339 }
340 
341 IOSize
343 {
345  prefetch(into, buffers);
346  return Storage::readv(into, buffers);
347 }
348 
349 IOSize
350 RFIOFile::write (const void *from, IOSize n)
351 {
352  serrno = 0;
353  ssize_t s = rfio_write64 (m_fd, from, n);
354  if (s < 0)
355  throw cms::Exception("RFIOFile::write()")
356  << "rfio_write(name='" << m_name << "', n=" << n << ") failed"
357  << " at position " << m_curpos << " with error '" << rfio_serror()
358  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
359  return s;
360 }
361 
365 IOOffset
367 {
368  if (m_fd == EDM_IOFD_INVALID)
369  throw cms::Exception("RFIOFile::position()")
370  << "RFIOFile::position() called on a closed file";
371  if (whence != CURRENT && whence != SET && whence != END)
372  throw cms::Exception("RFIOFile::position()")
373  << "RFIOFile::position() called with incorrect 'whence' parameter";
374 
376  int mywhence = (whence == SET ? SEEK_SET
377  : whence == CURRENT ? SEEK_CUR
378  : SEEK_END);
379 
380  serrno = 0;
381  if ((result = rfio_lseek64 (m_fd, offset, mywhence)) == -1)
382  throw cms::Exception("RFIOFile::position()")
383  << "rfio_lseek(name='" << m_name << "', offset=" << offset
384  << ", whence=" << mywhence << ") failed at position "
385  << m_curpos << " with error '" << rfio_serror()
386  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno << ")";
387 
388  m_curpos = result;
389  return result;
390 }
391 
392 void
394 {
395  throw cms::Exception("RFIOFile::resize()")
396  << "RFIOFile::resize(name='" << m_name << "') not implemented";
397 }
398 
399 bool
401 {
402  if (rfioreadopt (RFIO_READOPT) != 1)
403  throw cms::Exception("RFIOFile::preseek()")
404  << "RFIOFile::prefetch() called but RFIO_READOPT="
405  << rfioreadopt (RFIO_READOPT) << " (must be 1)";
406 
407  std::vector<iovec64> iov (n);
408  for (IOSize i = 0; i < n; ++i)
409  {
410  iov[i].iov_base = what[i].offset();
411  iov[i].iov_len = what[i].size();
412  }
413 
414  serrno = 0;
415  int retry = 5;
416  int result;
417  while ((result = rfio_preseek64(m_fd, &iov[0], n)) == -1)
418  {
419  if (--retry <= 0)
420  {
421  edm::LogError("RFIOFile::prefetch")
422  << "RFIOFile::prefetch(name='" << m_name << "') failed with error '"
423  << rfio_serror() << "' (rfio_errno=" << rfio_errno
424  << ", serrno=" << serrno << ")";
425  return false;
426  }
427  else
428  {
429  edm::LogWarning("RFIOFileRetry")
430  << "RFIOFile::prefetch(name='" << m_name << "') failed at position "
431  << m_curpos << " with error '" << rfio_serror()
432  << "' (rfio_errno=" << rfio_errno << ", serrno=" << serrno
433  << "); retrying " << (retry+1) << " times";
434  serrno = 0;
435  sleep(5);
436  }
437  }
438 
439  return true;
440 }
#define LogDebug(id)
#define RFIO_READOPT
Definition: RFIO.h:9
bool m_close
Definition: RFIOFile.h:51
int i
Definition: DBlmapReader.cc:9
virtual void abort(void)
Definition: RFIOFile.cc:234
virtual IOSize write(const void *from, IOSize n)
Definition: RFIOFile.cc:350
int rfio_write64(int s, const void *ptr, int size)
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: Storage.cc:31
void FlushMessageLog()
std::string m_name
Definition: RFIOFile.h:52
#define rfio_errno
Definition: RFIO.h:11
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
int m_flags
Definition: RFIOFile.h:53
void sleep(Duration_t)
Definition: Utils.h:163
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: RFIOFile.cc:342
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: RFIOFile.cc:104
Relative
Definition: Storage.h:11
RFIOFile(void)
Definition: RFIOFile.cc:29
int rfiosetopt(int opt, int *pval, int len)
int rfio_read64(int s, void *ptr, int size)
virtual void resize(IOOffset size)
Definition: RFIOFile.cc:393
tuple iov
Definition: o2o.py:307
#define serrno
Definition: RFIO.h:10
virtual IOOffset position(void) const
Definition: Storage.cc:95
tuple result
Definition: query.py:137
virtual void create(const char *name, bool exclusive=false, int perms=0666)
Definition: RFIOFile.cc:76
virtual void close(void)
Definition: RFIOFile.cc:190
#define end
Definition: vmac.h:38
static double realNanoSecs(void)
Definition: RFIOFile.cc:15
int read(void)
Definition: IOInput.cc:54
unsigned int offset(bool)
int rfio_open64(const char *filepath, int flags, int mode)
IOOffset offset(void) const
Definition: IOPosBuffer.h:50
ssize_t retryRead(void *into, IOSize n, int maxRetry=10)
Definition: RFIOFile.cc:258
static std::string from(" from ")
char * rfio_serror()
void reopen()
Definition: RFIOFile.cc:244
IOSize size(void) const
Definition: IOPosBuffer.h:60
int64_t IOOffset
Definition: IOTypes.h:19
int IOFD
Definition: IOTypes.h:22
IOOffset m_curpos
Definition: RFIOFile.h:55
#define EDM_IOFD_INVALID
Definition: IOTypes.h:8
int rfio_close64(int s)
Definition: RFIO.h:26
int m_perms
Definition: RFIOFile.h:54
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
string s
Definition: asciidump.py:422
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: RFIOFile.cc:400
tuple status
Definition: ntuplemaker.py:245
int rfioreadopt(int opt)
IOFD m_fd
Definition: RFIOFile.h:50
~RFIOFile(void)
Definition: RFIOFile.cc:65
off64_t rfio_lseek64(int s, off64_t offset, int how)
int rfio_preseek64(int, struct iovec64 *, int)