CMS 3D CMS Logo

TStorageFactoryFile.cc
Go to the documentation of this file.
9 #include "ReadRepacker.h"
10 #include "TFileCacheRead.h"
11 #include "TSystem.h"
12 #include "TROOT.h"
13 #include "TEnv.h"
14 #include <cerrno>
15 #include <sys/stat.h>
16 #include <unistd.h>
17 #include <fcntl.h>
18 #include <iostream>
19 #include <cassert>
20 
21 #if 0
22 #include "TTreeCache.h"
23 #include "TTree.h"
24 
25 class TTreeCacheDebug : public TTreeCache {
26 public:
27  void dump(const char *label, const char *trailer)
28  {
29  Long64_t entry = fOwner->GetReadEntry();
30  std::cerr
31  << label << ": " << entry << " "
32  << "{ fEntryMin=" << fEntryMin
33  << ", fEntryMax=" << fEntryMax
34  << ", fEntryNext=" << fEntryNext
35  << ", fZipBytes=" << fZipBytes
36  << ", fNbranches=" << fNbranches
37  << ", fNReadOk=" << fNReadOk
38  << ", fNReadMiss=" << fNReadMiss
39  << ", fNReadPref=" << fNReadPref
40  << ", fBranches=" << fBranches
41  << ", fBrNames=" << fBrNames
42  << ", fOwner=" << fOwner
43  << ", fTree=" << fTree
44  << ", fIsLearning=" << fIsLearning
45  << ", fIsManual=" << fIsManual
46  << "; fBufferSizeMin=" << fBufferSizeMin
47  << ", fBufferSize=" << fBufferSize
48  << ", fBufferLen=" << fBufferLen
49  << ", fBytesToPrefetch=" << fBytesToPrefetch
50  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
51  << ", fAsyncReading=" << fAsyncReading
52  << ", fNseek=" << fNseek
53  << ", fNtot=" << fNtot
54  << ", fNb=" << fNb
55  << ", fSeekSize=" << fSeekSize
56  << ", fSeek=" << fSeek
57  << ", fSeekIndex=" << fSeekIndex
58  << ", fSeekSort=" << fSeekSort
59  << ", fPos=" << fPos
60  << ", fSeekLen=" << fSeekLen
61  << ", fSeekSortLen=" << fSeekSortLen
62  << ", fSeekPos=" << fSeekPos
63  << ", fLen=" << fLen
64  << ", fFile=" << fFile
65  << ", fBuffer=" << (void *) fBuffer
66  << ", fIsSorted=" << fIsSorted
67  << " }\n" << trailer;
68  }
69 };
70 #endif
71 
87 
89  StorageAccount::Operation operation) {
    // NOTE(review): the first line of this signature (return type, name and
    // the `c` parameter) is not visible in this listing.
    //
    // Returns the counter referenced by `c`, creating the binding on first use.
    // The storage-class token for "tstoragefile" is looked up once and kept in
    // a function-local static.
90  static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
    // First call for this slot: fetch the counter for (token, operation) and
    // remember its address in the caller's pointer.
91  if (!c)
92  c = &StorageAccount::counter(token, operation);
93  return *c;
94 }
95 
98  stats.tick(0);
99 }
100 
101 // This constructor must be compatible with *all* the various built-in TFile plugins,
102 // including TXNetFile. This is why some arguments in the constructor are ignored.
103 // If there's a future T*File that is incompatible with this constructor, a new
104 // constructor will have to be added.
106  Option_t *option,
107  const char *ftitle,
108  Int_t compress,
109  Int_t netopt,
110  Bool_t parallelopen /* = kFALSE */)
111  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
112  storage_() {
     // NOTE(review): the first line of this signature (which declares `path`)
     // is not visible in this listing.
     //
     // `netopt` and `parallelopen` are accepted but not used in this body.
     // All real work happens in Initialize(). Anything it throws is not
     // propagated out of the constructor; it is captured and handed to
     // edm::threadLocalException::setException().
113  try {
114  Initialize(path, option);
115  } catch (...) {
116  edm::threadLocalException::setException(std::current_exception()); // capture
117  }
118 }
119 
121  Option_t *option /* = "" */,
122  const char *ftitle /* = "" */,
123  Int_t compress /* = 1 */)
124  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
125  storage_() {
     // NOTE(review): the first line of this signature (which declares `path`)
     // is not visible in this listing.
     //
     // Same body as the constructor that also takes netopt/parallelopen:
     // delegate to Initialize(), and capture any exception via
     // edm::threadLocalException::setException() instead of letting it escape.
126  try {
127  Initialize(path, option);
128  } catch (...) {
129  edm::threadLocalException::setException(std::current_exception()); // capture
130  }
131 }
132 
// Opens `path` through StorageFactory according to a TFile-style `option`
// string, then completes setup with Init().
//
// Option handling (the option is upper-cased first):
//   - "NEW" is treated as "CREATE".
//   - "RECREATE" unlinks an existing `path` and then proceeds as "CREATE".
//   - "UPDATE" on a path that does not exist proceeds as a create
//     (fOption itself is left unchanged in that case).
//   - "READWRAP" is a read that additionally sets IOFlags::OpenWrap.
//   - Anything else is treated as "READ".
//
// Throws cms::Exception if the storage cannot be opened; before throwing, the
// object is marked as a zombie and gDirectory is reset to gROOT.
//
// NOTE(review): listing lines 134 and 196 are missing here. They presumably
// declare `stats` and `statsService`, which are used below. Confirm against
// the real source.
133 void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" */) {
135 
136  // Enable AsyncReading.
137  // This was the default for 5.27, but turned off by default for 5.32.
138  // In our testing, AsyncReading is the fastest mechanism available.
139  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
140  // performance hit in our "average case" tests.
141  gEnv->SetValue("TFile.AsyncReading", 1);
142 
143  // Parse options; at the moment we only accept read!
     // NOTE(review): despite the remark above, the code below also maps
     // CREATE/RECREATE/UPDATE onto write/create open flags.
144  fOption = option;
145  fOption.ToUpper();
146 
147  if (fOption == "NEW")
148  fOption = "CREATE";
149 
150  Bool_t create = (fOption == "CREATE");
151  Bool_t recreate = (fOption == "RECREATE");
152  Bool_t update = (fOption == "UPDATE");
153  Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
154  Bool_t readwrap = (fOption == "READWRAP");
155 
     // Unrecognised option strings fall back to plain read.
156  if (!create && !recreate && !update && !read) {
157  read = true;
158  fOption = "READ";
159  }
160 
     // RECREATE is implemented as "delete if present, then CREATE".
     // A false result from AccessPathName() is treated as "path exists" here.
161  if (recreate) {
162  if (!gSystem->AccessPathName(path, kFileExists))
163  gSystem->Unlink(path);
164 
165  recreate = false;
166  create = true;
167  fOption = "CREATE";
168  }
169  assert(!recreate);
170 
     // UPDATE of a path that is not there becomes a create.
171  if (update && gSystem->AccessPathName(path, kFileExists)) {
172  update = kFALSE;
173  create = kTRUE;
174  }
175 
176  assert(read || update || create);
177 
     // Translate the resolved mode into StorageFactory open flags. Read access
     // is always requested; write is added for every non-read mode.
178  int openFlags = IOFlags::OpenRead;
179  if (!read)
180  openFlags |= IOFlags::OpenWrite;
181  if (create)
182  openFlags |= IOFlags::OpenCreate;
183  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
184  if (readwrap)
185  openFlags |= IOFlags::OpenWrap;
186 
187  // Open storage
188  if (!(storage_ = StorageFactory::get()->open(path, openFlags))) {
189  MakeZombie();
190  gDirectory = gROOT;
191  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()") << "Cannot open file '" << path << "'";
192  }
193 
194  // Record the statistics.
     // The file size is reported only when the statistics service is available.
     // An edm::Exception in category edm::errors::NotFound is swallowed; any
     // other edm::Exception is rethrown.
195  try {
197  if (statsService.isAvailable()) {
198  statsService->setSize(path, storage_->size());
199  }
200  } catch (edm::Exception const &e) {
201  if (e.categoryCode() != edm::errors::NotFound) {
202  throw;
203  }
204  }
205 
206  fRealName = path;
207  fD = 0; // sorry, meaningless
208  fWritable = read ? kFALSE : kTRUE;
209 
     // Hand over to Init(), telling it whether the file is being created.
210  Init(create);
211 
212  stats.tick(0);
213 }
214 
216 
220 
221 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) {
222  // This function needs to be optimized to minimize seeks.
223  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
224  Seek(pos);
225  return ReadBuffer(buf, len);
226 }
227 
// Reads `len` bytes at the current offset into `buf`, going through ROOT's
// read cache when one is attached and otherwise (or additionally, in async
// mode) reading from the storage object.
//
// Returns kFALSE on success and kTRUE on error, following the convention
// described in the comment block inside the function.
//
// NOTE(review): listing lines 248, 258-259 and 288 are missing. They
// presumably declare `stats` and `xstats` and complete the `cstats`
// declaration that starts on line 257. Confirm against the real source.
228 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Int_t len) {
229  // Check that it's valid to access this file.
230  if (IsZombie()) {
231  Error("ReadBuffer", "Cannot read from a zombie file");
232  return kTRUE;
233  }
234 
235  if (!IsOpen()) {
236  Error("ReadBuffer", "Cannot read from a file that is not open");
237  return kTRUE;
238  }
239 
240  // Read specified byte range from the storage. Returns kTRUE in
241  // case of error. Note that ROOT uses this function recursively
242  // to fill the cache; we use a flag to make sure our accounting
243  // is reflected in a comprehensible manner. The "read" counter
244  // will include both, "readc" indicates how much read from the
245  // cache, "readu" indicates how much we failed to read from the
246  // cache (excluding those recursive reads), and "readx" counts
247  // the amount actually passed to read from the storage object.
249 
250  // If we have a cache, read from there first. This returns 0
251  // if the block hasn't been prefetched, 1 if it was in cache,
252  // and 2 if there was an error.
253  if (TFileCacheRead *c = GetCacheRead()) {
     // Remember the offset so that it can be restored after an async cache hit.
254  Long64_t here = GetRelOffset();
255  Bool_t async = c->IsAsyncReading();
256 
     // NOTE(review): the statement below is truncated in this listing.
257  StorageAccount::Stamp cstats(async
260 
     // In async mode the cache is queried with a null destination buffer.
261  Int_t st = ReadBufferViaCache(async ? nullptr : buf, len);
262 
263  if (st == 2) {
264  Error("ReadBuffer",
265  "ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",
266  len,
267  here,
268  GetSize());
269  return kTRUE;
270  }
271 
     // Cache hit. In async mode the offset is restored and control falls
     // through to the real read below; otherwise the request is complete.
272  if (st == 1) {
273  if (async) {
274  cstats.tick(len);
275  Seek(here);
276  } else {
277  cstats.tick(len);
278  stats.tick(len);
279  return kFALSE;
280  }
281  }
282  }
283 
284  // FIXME: Re-enable read-ahead if the data wasn't in cache.
285  // if (! st) storage_->caching(true, -1, s_readahead);
286 
287  // A real read
289  IOSize n = storage_->xread(buf, len);
290  xstats.tick(n);
291  stats.tick(n);
     // NOTE(review): a short read (0 < n < len) is logged here, but the
     // function still returns kFALSE (success) below. Only n == 0 is reported
     // to the caller as an error. Confirm that this is intended.
     // NOTE(review): `n` is IOSize, but it is formatted with %ld; check the
     // format specifier against the definition of IOSize.
292  if (n < static_cast<IOSize>(len)) {
293  Error("ReadBuffer",
294  "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",
295  n,
296  len,
297  GetRelOffset(),
298  GetSize());
299  }
300  return n ? kFALSE : kTRUE;
301 }
302 
// Asks the storage layer to prefetch `len` bytes at offset `off`.
// A `len` of zero is ROOT probing for prefetch support; in that case the
// request is issued with PREFETCH_PROBE_LENGTH bytes instead.
//
// Returns kFALSE when the prefetch request is accepted and kTRUE on error or
// when prefetching is not available.
//
// NOTE(review): listing lines 315, 321, 324 and 345 are missing. They
// presumably declare `stats` and hold the conditions that guard the
// `return kTRUE;` on line 325 and the `return kFALSE;` on line 346. As shown
// here those returns look unconditional, which cannot be the real control
// flow. Confirm against the real source.
303 Bool_t TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len) {
304  // Check that it's valid to access this file.
305  if (IsZombie()) {
306  Error("ReadBufferAsync", "Cannot read from a zombie file");
307  return kTRUE;
308  }
309 
310  if (!IsOpen()) {
311  Error("ReadBufferAsync", "Cannot read from a file that is not open");
312  return kTRUE;
313  }
314 
316 
317  // If asynchronous reading is disabled, bail out now, regardless
318  // whether the underlying storage supports prefetching. If it is
319  // forced on, pretend it's on, even if the storage doesn't support
320  // it, as this turns off the caching in ROOT's side.
322 
323  // Verify that we never use async reads in app-only mode
325  return kTRUE;
326 
327  // Let the I/O method indicate if it can do client-side prefetch.
328  // If it does, then for example TTreeCache will drop its own cache
329  // and will use the client-side cache of the actual I/O layer.
330  // If len is zero ROOT is probing for prefetch support.
331  if (len) {
332  // FIXME: Synchronise caching.
333  // storage_->caching(true, -1, 0);
334  ;
335  }
336 
     // Single-element request with a null destination: the data is not copied
     // back to the caller by this call.
337  IOPosBuffer iov(off, (void *)nullptr, len ? len : PREFETCH_PROBE_LENGTH);
338  if (storage_->prefetch(&iov, 1)) {
339  stats.tick(len);
340  return kFALSE;
341  }
342 
343  // Always ask ROOT to use async reads in storage-only mode,
344  // regardless of whether the storage system supports it.
346  return kFALSE;
347 
348  // Prefetching not available right now.
349  return kTRUE;
350 }
351 
// Synchronous vectored read: services `nbuf` requests (offsets `pos[i]`,
// lengths `len[i]`) into the caller's buffer `buf`. ReadRepacker groups the
// requests, one Storage::readv() is issued per group, and the result is
// unpacked back into `buf`.
//
// Returns kFALSE on success. Returns kTRUE if readv() delivers a byte count
// different from what the repacker expected.
//
// NOTE(review): listing lines 353-372 and 396 are missing. The latter
// presumably declares `xstats`. Confirm against the real source.
352 Bool_t TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
373  Int_t remaining = nbuf; // Number of read requests left to process.
374  Int_t pack_count; // Number of read requests processed by this iteration.
375 
376  IOSize remaining_buffer_size = 0;
377  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
378  // the size of the various requests.
379  for (Int_t i = 0; i < nbuf; i++)
380  remaining_buffer_size += len[i];
381 
     // Cursors into the caller's buffer and request arrays. They advance as
     // groups of requests are completed.
382  char *current_buffer = buf;
383  Long64_t *current_pos = pos;
384  Int_t *current_len = len;
385 
386  ReadRepacker repacker;
387 
     // NOTE(review): progress relies on pack() consuming at least one request
     // per call. Whether it can return 0 is not visible here.
388  while (remaining > 0) {
389  pack_count = repacker.pack(
390  static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
391 
     // NOTE(review): realBytesProcessed() is declared as returning IOSize, so
     // storing it in an int narrows the value.
392  int real_bytes_processed = repacker.realBytesProcessed();
393  IOSize io_buffer_used = repacker.bufferUsed();
394 
395  // Issue readv, then unpack buffers.
397  std::vector<IOPosBuffer> &iov = repacker.iov();
398  IOSize result = storage_->readv(&iov[0], iov.size());
     // NOTE(review): `result` and `io_buffer_used` are IOSize, but they are
     // formatted with %ld below; check the specifier against IOSize.
399  if (result != io_buffer_used) {
400  Error(
401  "ReadBuffersSync", "Storage::readv returned different size result=%ld expected=%ld", result, io_buffer_used);
402  return kTRUE;
403  }
404  xstats.tick(io_buffer_used);
405  repacker.unpack(current_buffer);
406 
407  // Update the location of the unused part of the input buffer.
408  remaining_buffer_size -= real_bytes_processed;
409  current_buffer += real_bytes_processed;
410 
411  current_pos += pack_count;
412  current_len += pack_count;
413  remaining -= pack_count;
414  }
     // Every requested byte must have been accounted for by now.
415  assert(remaining_buffer_size == 0);
416  return kFALSE;
417 }
418 
// Vectored read entry point.
//   - With a non-null `buf`, the requests are served synchronously via
//     ReadBuffersSync().
//   - With a null `buf`, the requests are passed to the storage layer as a
//     prefetch. If that fails, the call falls back to TFile::ReadBuffers().
//
// Returns kFALSE on success and kTRUE on error.
//
// NOTE(review): listing line 451 is missing. It presumably declares `astats`,
// which is used below. Confirm against the real source.
419 Bool_t TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
420  // Check that it's valid to access this file.
421  if (IsZombie()) {
422  Error("ReadBuffers", "Cannot read from a zombie file");
423  return kTRUE;
424  }
425 
426  if (!IsOpen()) {
427  Error("ReadBuffers", "Cannot read from a file that is not open");
428  return kTRUE;
429  }
430 
431  // For synchronous reads, we have special logic to optimize the I/O requests
432  // from ROOT before handing it to the storage.
433  if (buf) {
434  return ReadBuffersSync(buf, pos, len, nbuf);
435  }
436  // For an async read, we assume the storage system is smart enough to do the
437  // optimization itself.
438 
439  // Read from underlying storage.
     // Build one null-buffer request per (pos, len) pair and total the
     // requested bytes for the accounting below.
440  void *const nobuf = nullptr;
441  Int_t total = 0;
442  std::vector<IOPosBuffer> iov;
443  iov.reserve(nbuf);
444  for (Int_t i = 0; i < nbuf; ++i) {
445  iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
446  total += len[i];
447  }
448 
449  // Null buffer means asynchronous reads into I/O system's cache.
450  bool success;
452  // Synchronise low-level cache with the supposed cache in TFile.
453  // storage_->caching(true, -1, 0);
     // NOTE(review): `&iov[0]` indexes an empty vector when nbuf == 0.
     // The tick below is recorded whether or not the prefetch succeeded.
454  success = storage_->prefetch(&iov[0], nbuf);
455  astats.tick(total);
456 
457  // If it didn't succeed, pass down to the base class.
     // `buf` is necessarily null on this path.
458  if (not success) {
459  if (TFile::ReadBuffers(buf, pos, len, nbuf)) {
460  Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
461  return kTRUE;
462  }
463  }
464  return kFALSE;
465 }
466 
// Writes `len` bytes from `buf`. The write cache is tried first, and the data
// goes directly to the storage object only if the cache did not take it.
//
// Returns kFALSE on success. Returns kTRUE if the file is a zombie, is not
// open, is not writable, the cache reports an error, or the direct write
// transfers zero bytes.
//
// NOTE(review): listing lines 484-485 and 493 are missing. They presumably
// declare `stats`, `cstats` and `xstats`. Confirm against the real source.
467 Bool_t TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len) {
468  // Check that it's valid to access this file.
469  if (IsZombie()) {
470  Error("WriteBuffer", "Cannot write to a zombie file");
471  return kTRUE;
472  }
473 
474  if (!IsOpen()) {
475  Error("WriteBuffer", "Cannot write to a file that is not open");
476  return kTRUE;
477  }
478 
479  if (!fWritable) {
480  Error("WriteBuffer", "File is not writable");
481  return kTRUE;
482  }
483 
486 
487  // Try first writing via a cache, and if that's not possible, directly.
     // Result codes as handled below: 0 = not handled by the cache, so write
     // directly; 1 = the cache accepted the data; anything else is an error.
488  Int_t st;
489  switch ((st = WriteBufferViaCache(buf, len))) {
490  case 0:
491  // Actual write.
492  {
494  IOSize n = storage_->xwrite(buf, len);
495  xstats.tick(n);
496  stats.tick(n);
497 
498  // FIXME: What if it's a short write?
     // As written, any non-zero byte count is reported as success.
499  return n > 0 ? kFALSE : kTRUE;
500  }
501 
502  case 1:
503  cstats.tick(len);
504  stats.tick(len);
505  return kFALSE;
506 
507  case 2:
508  default:
509  Error("WriteBuffer", "Error writing to cache");
510  return kTRUE;
511  }
512 }
513 
517 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
518 // large a prefetch cache to use.
519 // FIXME: Asynchronous open support?
520 
// (Re)opens `pathname` through StorageFactory, translating POSIX open(2)
// flags into IOFlags. Any storage that is already attached is closed first.
// The `mode` argument is ignored.
//
// Returns 0 on success. Throws cms::Exception if the storage cannot be
// opened; before throwing, the object is marked as a zombie and gDirectory is
// reset to gROOT.
//
// NOTE(review): listing line 525 is missing. It presumably declares `stats`.
// Confirm against the real source.
524 Int_t TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */) {
526 
527  if (storage_) {
528  storage_->close();
529  }
530 
     // Read access is the starting point. O_WRONLY replaces it with write-only,
     // while O_RDWR adds write on top of read.
531  int openFlags = IOFlags::OpenRead;
532  if (flags & O_WRONLY)
533  openFlags = IOFlags::OpenWrite;
534  else if (flags & O_RDWR)
535  openFlags |= IOFlags::OpenWrite;
     // The remaining POSIX flags map one-to-one onto IOFlags bits.
536  if (flags & O_CREAT)
537  openFlags |= IOFlags::OpenCreate;
538  if (flags & O_APPEND)
539  openFlags |= IOFlags::OpenAppend;
540  if (flags & O_EXCL)
541  openFlags |= IOFlags::OpenExclusive;
542  if (flags & O_TRUNC)
543  openFlags |= IOFlags::OpenTruncate;
544  if (flags & O_NONBLOCK)
545  openFlags |= IOFlags::OpenNonBlock;
546 
547  if (!(storage_ = StorageFactory::get()->open(pathname, openFlags))) {
548  MakeZombie();
549  gDirectory = gROOT;
550  throw cms::Exception("TStorageFactoryFile::SysOpen()") << "Cannot open file '" << pathname << "'";
551  }
552 
553  stats.tick();
554  return 0;
555 }
556 
// Closes and releases the attached storage, if there is one. The file
// descriptor argument is ignored. Always returns 0.
//
// NOTE(review): listing line 558 is missing. It presumably declares `stats`.
// Confirm against the real source.
557 Int_t TStorageFactoryFile::SysClose(Int_t /* fd */) {
559 
560  if (storage_) {
561  storage_->close();
562  releaseStorage();
563  }
564 
565  stats.tick();
566  return 0;
567 }
568 
// Repositions the storage object and returns the position reported by
// Storage::position(). The file descriptor argument is ignored.
//
// NOTE(review): listing line 570 is missing. It presumably declares `stats`.
// Confirm against the real source.
// NOTE(review): `storage_` is dereferenced without the null check that
// SysClose() performs.
569 Long64_t TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence) {
     // SEEK_SET and SEEK_CUR map to SET and CURRENT; every other `whence`
     // value is treated as END.
571  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET : whence == SEEK_CUR ? Storage::CURRENT : Storage::END);
572 
573  offset = storage_->position(offset, rel);
574  stats.tick();
575  return offset;
576 }
577 
// Flushes the storage object. The file descriptor argument is ignored.
// Always returns 0.
//
// NOTE(review): listing line 579 is missing. It presumably declares `stats`.
// Confirm against the real source.
// NOTE(review): `storage_` is dereferenced without the null check that
// SysClose() performs.
578 Int_t TStorageFactoryFile::SysSync(Int_t /* fd */) {
580  storage_->flush();
581  stats.tick();
582  return 0;
583 }
584 
// Fills in stat-like information for the file. The file descriptor argument
// is ignored.
//   *id      <- hash of fRealName
//   *size    <- size reported by the storage object
//   *flags   <- always 0
//   *modtime <- always 0
// Always returns 0. All four output pointers are written unconditionally, so
// callers must pass valid pointers.
//
// NOTE(review): listing line 586 is missing. It presumably declares `stats`.
// Confirm against the real source.
// NOTE(review): `storage_` is dereferenced without the null check that
// SysClose() performs.
585 Int_t TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) {
587  // FIXME: Most of this is unsupported or makes no sense with Storage
588  *id = ::Hash(fRealName);
589  *size = storage_->size();
590  *flags = 0;
591  *modtime = 0;
592  stats.tick();
593  return 0;
594 }
595 
596 void TStorageFactoryFile::ResetErrno(void) const { TSystem::ResetErrno(); }
Code categoryCode() const
Definition: EDMException.h:97
size
Write out results.
edm::ErrorSummaryEntry Error
void ResetErrno(void) const override
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
#define PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
void setSize(const std::string &urlOrLfn, size_t size)
Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence) override
def create(alignables, pedeDump, additionalData, outputFile, config)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
Int_t SysClose(Int_t fd) override
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
Bool_t ReadBufferAsync(Long64_t off, Int_t len) override
Bool_t ReadBuffer(char *buf, Int_t len) override
edm::propagate_const< std::unique_ptr< Storage > > storage_
Relative
Definition: Storage.h:23
char const * label
static const StorageFactory * get(void)
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:52
static StorageAccount::Counter * s_statsARead
static StorageClassToken tokenForStorageClassName(std::string const &iName)
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, StorageAccount::Operation operation)
static StorageAccount::Counter * s_statsCtor
bool isAvailable() const
Definition: Service.h:40
std::string Hash
Definition: Types.h:45
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:21
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode) override
static Counter & counter(StorageClassToken token, Operation operation)
static StorageAccount::Counter * s_statsCWrite
IOSize bufferUsed() const
Definition: ReadRepacker.h:47
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:45
void setException(std::exception_ptr e)
void tick(uint64_t amount=0, int64_t tick=0) const
#define update(a, b)
~TStorageFactoryFile(void) override
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
static StorageAccount::Counter * s_statsSeek
Int_t SysSync(Int_t fd) override
static StorageAccount::Counter * s_statsOpen
Bool_t WriteBuffer(const char *buf, Int_t len) override
ClassImp(TStorageFactoryFile)
Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) override
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
static StorageAccount::Counter * s_statsCPrefetch
Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) override
static StorageAccount::Counter * s_statsCRead