test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdFile.cc
Go to the documentation of this file.
6 #include <vector>
7 #include <sstream>
8 #include <iostream>
9 #include <assert.h>
10 #include <chrono>
11 
12 using namespace XrdAdaptor;
13 
14 // To be re-enabled when the monitoring interface is back.
15 //static const char *kCrabJobIdEnv = "CRAB_UNIQUE_JOB_ID";
16 
// Largest size, in bytes, of a single chunk placed in a vectored-read
// request; readv() splits longer buffers into pieces of this size.
// Parenthesised so the macro expands safely inside larger expressions.
#define XRD_CL_MAX_CHUNK (512*1024)
// Upper bound on the number of chunks Xrootd accepts in one vectored read.
#define XRD_CL_MAX_SIZE 1024
19 
21  : m_offset (0),
22  m_size(-1),
23  m_close (false),
24  m_name(),
25  m_op_count(0)
26 {
27 }
28 
29 XrdFile::XrdFile (const char *name,
30  int flags /* = IOFlags::OpenRead */,
31  int perms /* = 066 */)
32  : m_offset (0),
33  m_size(-1),
34  m_close (false),
35  m_name(),
36  m_op_count(0)
37 {
38  open (name, flags, perms);
39 }
40 
42  int flags /* = IOFlags::OpenRead */,
43  int perms /* = 066 */)
44  : m_offset (0),
45  m_size(-1),
46  m_close (false),
47  m_name(),
48  m_op_count(0)
49 {
50  open (name.c_str (), flags, perms);
51 }
52 
54 {
55  if (m_close)
56  edm::LogError("XrdFileError")
57  << "Destructor called on XROOTD file '" << m_name
58  << "' but the file is still open";
59 }
60 
62 void
63 XrdFile::create (const char *name,
64  bool exclusive /* = false */,
65  int perms /* = 066 */)
66 {
67  open (name,
69  | (exclusive ? IOFlags::OpenExclusive : 0)),
70  perms);
71 }
72 
73 void
75  bool exclusive /* = false */,
76  int perms /* = 066 */)
77 {
78  open (name.c_str (),
80  | (exclusive ? IOFlags::OpenExclusive : 0)),
81  perms);
82 }
83 
84 void
86  int flags /* = IOFlags::OpenRead */,
87  int perms /* = 066 */)
88 { open (name.c_str (), flags, perms); }
89 
90 void
91 XrdFile::open (const char *name,
92  int flags /* = IOFlags::OpenRead */,
93  int perms /* = 066 */)
94 {
95  // Actual open
96  if ((name == 0) || (*name == 0)) {
98  ex << "Cannot open a file without a name";
99  ex.addContext("Calling XrdFile::open()");
100  throw ex;
101  }
102  if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
104  ex << "Must open file '" << name << "' at least for read or write";
105  ex.addContext("Calling XrdFile::open()");
106  throw ex;
107  }
108 
109  // Translate our flags to system flags
110  XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
111 
112  if (flags & IOFlags::OpenWrite)
113  openflags |= XrdCl::OpenFlags::Update;
114  else if (flags & IOFlags::OpenRead)
115  openflags |= XrdCl::OpenFlags::Read;
116 
117  if (flags & IOFlags::OpenAppend) {
119  ex << "Opening file '" << name << "' in append mode not supported";
120  ex.addContext("Calling XrdFile::open()");
121  throw ex;
122  }
123 
124  if (flags & IOFlags::OpenCreate)
125  {
126  if (! (flags & IOFlags::OpenExclusive))
127  openflags |= XrdCl::OpenFlags::Delete;
128  openflags |= XrdCl::OpenFlags::New;
129  openflags |= XrdCl::OpenFlags::MakePath;
130  }
131 
132  if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
133  openflags |= XrdCl::OpenFlags::Delete;
134 
135  // Translate mode flags
137  modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
138  modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
139  modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
140  modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
141  modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
142  modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
143  modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
144  modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
145  modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
146 
147  m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
148  m_name = name;
149 
150  // Stat the file so we can keep track of the offset better.
151  auto file = getActiveFile();
152  XrdCl::XRootDStatus status;
153  XrdCl::StatInfo *statInfo = NULL;
154  if (! (status = file->Stat(false, statInfo)).IsOK()) {
156  ex << "XrdCl::File::Stat(name='" << name
157  << ") => error '" << status.ToStr()
158  << "' (errno=" << status.errNo << ", code=" << status.code << ")";
159  ex.addContext("Calling XrdFile::open()");
160  addConnection(ex);
161  throw ex;
162  }
163  assert(statInfo);
164  m_size = statInfo->GetSize();
165  delete(statInfo);
166 
167  m_offset = 0;
168  m_close = true;
169 
170  // Send the monitoring info, if available.
171  // Note: getenv is not reentrant.
172  // Commenting out until this is available in the new client.
173 /*
174  char * crabJobId = getenv(kCrabJobIdEnv);
175  if (crabJobId) {
176  kXR_unt32 dictId;
177  m_file->SendMonitoringInfo(crabJobId, &dictId);
178  edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
179  }
180 */
181 
182  edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
183 
184  std::vector<std::string> sources;
185  m_requestmanager->getActiveSourceNames(sources);
186  std::stringstream ss;
187  ss << "Active sources: ";
188  for (auto const& it : sources)
189  ss << it << ", ";
190  edm::LogInfo("XrdFileInfo") << ss.str();
191 }
192 
193 void
195 {
196  if (! m_requestmanager.get())
197  {
198  edm::LogError("XrdFileError")
199  << "XrdFile::close(name='" << m_name
200  << "') called but the file is not open";
201  m_close = false;
202  return;
203  }
204 
205  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
206 
207  m_close = false;
208  m_offset = 0;
209  m_size = -1;
210  edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
211 }
212 
213 void
215 {
216  m_requestmanager = nullptr; // propagate_const<T> has no reset() function
217  m_close = false;
218  m_offset = 0;
219  m_size = -1;
220 }
221 
223 IOSize
224 XrdFile::read (void *into, IOSize n)
225 {
226  if (n > 0x7fffffff) {
228  ex << "XrdFile::read(name='" << m_name << "', n=" << n
229  << ") too many bytes, limit is 0x7fffffff";
230  ex.addContext("Calling XrdFile::read()");
231  addConnection(ex);
232  throw ex;
233  }
234 
235  uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
236  m_offset += bytesRead;
237  return bytesRead;
238 }
239 
240 IOSize
241 XrdFile::read (void *into, IOSize n, IOOffset pos)
242 {
243  if (n > 0x7fffffff) {
245  ex << "XrdFile::read(name='" << m_name << "', n=" << n
246  << ") exceeds read size limit 0x7fffffff";
247  ex.addContext("Calling XrdFile::read()");
248  addConnection(ex);
249  throw ex;
250  }
251 
252  uint32_t bytesRead = m_requestmanager->handle(into, n, pos).get();
253 
254  return bytesRead;
255 }
256 
257 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
258 IOSize
260 {
261  std::vector<IOPosBuffer> new_buf;
262  new_buf.reserve(n);
263  IOOffset off = 0;
264  for (IOSize i=0; i<n; i++) {
265  IOSize size = into[i].size();
266  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
267  off += size;
268  }
269  return readv(&(new_buf[0]), n);
270 }
271 
272 /*
273  * A vectored scatter-gather read.
274  * Returns the total number of bytes successfully read.
275  */
276 IOSize
278 {
279  // A trivial vector read - unlikely, considering ROOT data format.
280  if (unlikely(n == 0)) {
281  return 0;
282  }
283  if (unlikely(n == 1)) {
284  return read(into[0].data(), into[0].size(), into[0].offset());
285  }
286 
287  std::shared_ptr<std::vector<IOPosBuffer> >cl(new std::vector<IOPosBuffer>);
288 
289  // CMSSW may issue large readv's; Xrootd is only able to handle
290  // 1024. Further, the splitting algorithm may slightly increase
291  // the number of buffers.
292  IOSize adjust = XRD_CL_MAX_SIZE - 2;
293  cl->reserve(n > adjust ? adjust : n);
294  IOSize idx = 0, last_idx = 0;
295  IOSize final_result = 0;
296  std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
297  while (idx < n)
298  {
299  cl->clear();
300  IOSize size = 0;
301  while (idx < n) {
302  unsigned rollback_count = 1;
303  IOSize current_size = size;
304  IOOffset offset = into[idx].offset();
305  IOSize length = into[idx].size();
306  size += length;
307  char * buffer = static_cast<char *>(into[idx].data());
308  while (length > XRD_CL_MAX_CHUNK) {
309  IOPosBuffer ci;
311  length -= XRD_CL_MAX_CHUNK;
312  ci.set_offset(offset);
313  offset += XRD_CL_MAX_CHUNK;
314  ci.set_data(buffer);
315  buffer += XRD_CL_MAX_CHUNK;
316  cl->emplace_back(ci);
317  rollback_count ++;
318  }
319  IOPosBuffer ci;
320  ci.set_size(length);
321  ci.set_offset(offset);
322  ci.set_data(buffer);
323  cl->emplace_back(ci);
324 
325  if (cl->size() > adjust)
326  {
327  while (rollback_count--) cl->pop_back();
328  size = current_size;
329  break;
330  }
331  else
332  {
333  idx++;
334  }
335  }
336  try
337  {
338  readv_futures.emplace_back(m_requestmanager->handle(cl), size);
339  }
340  catch (edm::Exception& ex)
341  {
342  ex.addContext("Calling XrdFile::readv()");
343  throw;
344  }
345 
346  // Assure that we have made some progress.
347  assert(last_idx < idx);
348  last_idx = idx;
349 
350  }
351  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
353 
354  // If there are multiple readv calls, wait until all return until looking
355  // at the results of any. This guarantees that all readv's have finished
356  // by time we call .get() for the first time (in case one of the readv's
357  // result in an exception).
358  //
359  // We cannot have outstanding readv's on function exit as the XrdCl may
360  // write into the corresponding buffer at the same time as ROOT.
361  if (readv_futures.size() > 1)
362  {
363  for (auto & readv_result : readv_futures)
364  {
365  if (readv_result.first.valid())
366  {
367  readv_result.first.wait();
368  }
369  }
370  }
371 
372  for (auto & readv_result : readv_futures)
373  {
374  IOSize result = 0;
375  try
376  {
377  const int retry_count = 5;
378  for (int retries=0; retries<retry_count; retries++)
379  {
380  try
381  {
382  if (readv_result.first.valid())
383  {
384  result = readv_result.first.get();
385  }
386  }
387  catch (XrootdException& ex)
388  {
389  if ((retries != retry_count-1) && (ex.getCode() == XrdCl::errInvalidResponse))
390  {
391  edm::LogWarning("XrdAdaptorInternal") << "Got an invalid response from Xrootd server; retrying" << std::endl;
392  result = m_requestmanager->handle(cl).get();
393  }
394  else
395  {
396  throw;
397  }
398  }
399  assert(result == readv_result.second);
400  }
401  }
402  catch (edm::Exception& ex)
403  {
404  ex.addContext("Calling XrdFile::readv()");
405  throw;
406  }
407  catch (std::exception& ex)
408  {
410  newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
411  newex.addContext("Calling XrdFile::readv()");
412  throw newex;
413  }
414  final_result += result;
415  }
417 
418  edm::LogVerbatim("XrdAdaptorInternal") << "[" << m_op_count.fetch_add(1) << "] Time for readv: " << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()) << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
419 
420  return final_result;
421 }
422 
423 IOSize
424 XrdFile::write (const void *from, IOSize n)
425 {
426  if (n > 0x7fffffff) {
427  cms::Exception ex("FileWriteError");
428  ex << "XrdFile::write(name='" << m_name << "', n=" << n
429  << ") too many bytes, limit is 0x7fffffff";
430  ex.addContext("Calling XrdFile::write()");
431  addConnection(ex);
432  throw ex;
433  }
434  auto file = getActiveFile();
435 
436  XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
437  if (!s.IsOK()) {
438  cms::Exception ex("FileWriteError");
439  ex << "XrdFile::write(name='" << m_name << "', n=" << n
440  << ") failed with error '" << s.ToStr()
441  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
442  ex.addContext("Calling XrdFile::write()");
443  addConnection(ex);
444  throw ex;
445  }
446  m_offset += n;
447  assert(m_size != -1);
448  if (m_offset > m_size)
449  m_size = m_offset;
450 
451  return n;
452 }
453 
454 IOSize
455 XrdFile::write (const void *from, IOSize n, IOOffset pos)
456 {
457  if (n > 0x7fffffff) {
458  cms::Exception ex("FileWriteError");
459  ex << "XrdFile::write(name='" << m_name << "', n=" << n
460  << ") too many bytes, limit is 0x7fffffff";
461  ex.addContext("Calling XrdFile::write()");
462  addConnection(ex);
463  throw ex;
464  }
465  auto file = getActiveFile();
466 
467  XrdCl::XRootDStatus s = file->Write(pos, n, from);
468  if (!s.IsOK()) {
469  cms::Exception ex("FileWriteError");
470  ex << "XrdFile::write(name='" << m_name << "', n=" << n
471  << ") failed with error '" << s.ToStr()
472  << "' (errno=" << s.errNo << ", code=" << s.code << ")";
473  ex.addContext("Calling XrdFile::write()");
474  addConnection(ex);
475  throw ex;
476  }
477  assert (m_size != -1);
478  if (static_cast<IOOffset>(pos + n) > m_size)
479  m_size = pos + n;
480 
481  return n;
482 }
483 
484 bool
486 {
487  // The new Xrootd client does not contain any internal buffers.
488  // Hence, prefetching is disabled completely.
489  return false;
490 }
491 
495 IOOffset
497 {
498  if (! m_requestmanager.get()) {
499  cms::Exception ex("FilePositionError");
500  ex << "XrdFile::position() called on a closed file";
501  ex.addContext("Calling XrdFile::position()");
502  addConnection(ex);
503  throw ex;
504  }
505  switch (whence)
506  {
507  case SET:
508  m_offset = offset;
509  break;
510 
511  case CURRENT:
512  m_offset += offset;
513  break;
514 
515  // TODO: None of this works with concurrent writers to the file.
516  case END:
517  assert(m_size != -1);
518  m_offset = m_size + offset;
519  break;
520 
521  default:
522  cms::Exception ex("FilePositionError");
523  ex << "XrdFile::position() called with incorrect 'whence' parameter";
524  ex.addContext("Calling XrdFile::position()");
525  addConnection(ex);
526  throw ex;
527  }
528 
529  if (m_offset < 0)
530  m_offset = 0;
531  assert(m_size != -1);
532  if (m_offset > m_size)
533  m_size = m_offset;
534 
535  return m_offset;
536 }
537 
538 void
540 {
541  cms::Exception ex("FileResizeError");
542  ex << "XrdFile::resize(name='" << m_name << "') not implemented";
543  ex.addContext("Calling XrdFile::resize()");
544  addConnection(ex);
545  throw ex;
546 }
547 
548 std::shared_ptr<XrdCl::File>
550 {
551  if (!m_requestmanager.get())
552  {
553  cms::Exception ex("XrdFileLogicError");
554  ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
555  ex.addContext("Calling XrdFile::getActiveFile()");
556  m_requestmanager->addConnections(ex);
557  m_close = false;
558  throw ex;
559  }
560  return m_requestmanager->getActiveFile();
561 }
562 
563 void
565 {
566  if (m_requestmanager.get())
567  {
568  m_requestmanager->addConnections(ex);
569  }
570 }
571 
edm::propagate_const< std::shared_ptr< XrdAdaptor::RequestManager > > m_requestmanager
Definition: XrdFile.h:68
virtual char const * what() const
Definition: Exception.cc:141
virtual void resize(IOOffset size)
Definition: XrdFile.cc:539
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void close(void)
Definition: XrdFile.cc:194
assert(m_qm.get())
virtual void create(const char *name, bool exclusive=false, int perms=0666)
Definition: XrdFile.cc:63
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
#define NULL
Definition: scimark2.h:8
bool m_close
Definition: XrdFile.h:71
IOOffset m_offset
Definition: XrdFile.h:69
tuple result
Definition: mps_fire.py:84
Relative
Definition: Storage.h:23
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
#define unlikely(x)
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: XrdFile.cc:485
XrdFile(void)
Definition: XrdFile.cc:20
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
virtual IOOffset position(void) const
Definition: Storage.cc:95
#define XRD_CL_MAX_CHUNK
Definition: XrdFile.cc:17
std::atomic< unsigned int > m_op_count
Definition: XrdFile.h:73
virtual IOSize readv(IOBuffer *into, IOSize n)
Definition: XrdFile.cc:259
#define end
Definition: vmac.h:37
int read(void)
Definition: IOInput.cc:54
std::shared_ptr< XrdCl::File > getActiveFile()
Definition: XrdFile.cc:549
element_type const * get() const
IOOffset m_size
Definition: XrdFile.h:70
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOBuffer.h:50
std::string m_name
Definition: XrdFile.h:72
IOSize size(void) const
Definition: IOPosBuffer.h:64
~XrdFile(void)
Definition: XrdFile.cc:53
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
virtual IOOffset size(void) const
Definition: Storage.cc:102
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
volatile std::atomic< bool > shutdown_flag false
void addConnection(cms::Exception &)
Definition: XrdFile.cc:564
virtual void open(const char *name, int flags=IOFlags::OpenRead, int perms=0666)
Definition: XrdFile.cc:91
virtual IOSize write(const void *from, IOSize n)
Definition: XrdFile.cc:424
virtual void abort(void)
Definition: XrdFile.cc:214
#define XRD_CL_MAX_SIZE
Definition: XrdFile.cc:18
tuple status
Definition: mps_update.py:57