CMS 3D CMS Logo

TStorageFactoryFile.cc
Go to the documentation of this file.
9 #include "ReadRepacker.h"
10 #include "TFileCacheRead.h"
11 #include "TSystem.h"
12 #include "TROOT.h"
13 #include "TEnv.h"
14 #include <cerrno>
15 #include <sys/stat.h>
16 #include <unistd.h>
17 #include <fcntl.h>
18 #include <iostream>
19 #include <cassert>
20 #include <atomic>
21 
22 #if 0
23 #include "TTreeCache.h"
24 #include "TTree.h"
25 
26 class TTreeCacheDebug : public TTreeCache {
27 public:
28  void dump(const char *label, const char *trailer)
29  {
30  Long64_t entry = fOwner->GetReadEntry();
31  std::cerr
32  << label << ": " << entry << " "
33  << "{ fEntryMin=" << fEntryMin
34  << ", fEntryMax=" << fEntryMax
35  << ", fEntryNext=" << fEntryNext
36  << ", fZipBytes=" << fZipBytes
37  << ", fNbranches=" << fNbranches
38  << ", fNReadOk=" << fNReadOk
39  << ", fNReadMiss=" << fNReadMiss
40  << ", fNReadPref=" << fNReadPref
41  << ", fBranches=" << fBranches
42  << ", fBrNames=" << fBrNames
43  << ", fOwner=" << fOwner
44  << ", fTree=" << fTree
45  << ", fIsLearning=" << fIsLearning
46  << ", fIsManual=" << fIsManual
47  << "; fBufferSizeMin=" << fBufferSizeMin
48  << ", fBufferSize=" << fBufferSize
49  << ", fBufferLen=" << fBufferLen
50  << ", fBytesToPrefetch=" << fBytesToPrefetch
51  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
52  << ", fAsyncReading=" << fAsyncReading
53  << ", fNseek=" << fNseek
54  << ", fNtot=" << fNtot
55  << ", fNb=" << fNb
56  << ", fSeekSize=" << fSeekSize
57  << ", fSeek=" << fSeek
58  << ", fSeekIndex=" << fSeekIndex
59  << ", fSeekSort=" << fSeekSort
60  << ", fPos=" << fPos
61  << ", fSeekLen=" << fSeekLen
62  << ", fSeekSortLen=" << fSeekSortLen
63  << ", fSeekPos=" << fSeekPos
64  << ", fLen=" << fLen
65  << ", fFile=" << fFile
66  << ", fBuffer=" << (void *) fBuffer
67  << ", fIsSorted=" << fIsSorted
68  << " }\n" << trailer;
69  }
70 };
71 #endif
72 
73 using namespace edm::storage;
74 
76 static std::atomic<StorageAccount::Counter *> s_statsCtor = nullptr;
77 static std::atomic<StorageAccount::Counter *> s_statsOpen = nullptr;
78 static std::atomic<StorageAccount::Counter *> s_statsClose = nullptr;
79 static std::atomic<StorageAccount::Counter *> s_statsFlush = nullptr;
80 static std::atomic<StorageAccount::Counter *> s_statsStat = nullptr;
81 static std::atomic<StorageAccount::Counter *> s_statsSeek = nullptr;
82 static std::atomic<StorageAccount::Counter *> s_statsRead = nullptr;
83 static std::atomic<StorageAccount::Counter *> s_statsCRead = nullptr;
84 static std::atomic<StorageAccount::Counter *> s_statsCPrefetch = nullptr;
85 static std::atomic<StorageAccount::Counter *> s_statsARead = nullptr;
86 static std::atomic<StorageAccount::Counter *> s_statsXRead = nullptr;
87 static std::atomic<StorageAccount::Counter *> s_statsWrite = nullptr;
88 static std::atomic<StorageAccount::Counter *> s_statsCWrite = nullptr;
89 static std::atomic<StorageAccount::Counter *> s_statsXWrite = nullptr;
90 
91 static inline StorageAccount::Counter &storageCounter(std::atomic<StorageAccount::Counter *> &c,
92  StorageAccount::Operation operation) {
93  static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
94  if (!c) {
95  auto v = &StorageAccount::counter(token, operation);
96  StorageAccount::Counter *expected = nullptr;
97  c.compare_exchange_strong(expected, v);
98  }
99  return *c.load();
100 }
101 
104  stats.tick(0);
105 }
106 
107 // This constructor must be compatible with *all* the various built-in TFile plugins,
108 // including TXNetFile. This is why some arguments in the constructor are ignored.
109 // If there's a future T*File that is incompatible with this constructor, a new
110 // constructor will have to be added.
112  Option_t *option,
113  const char *ftitle,
114  Int_t compress,
115  Int_t netopt,
116  Bool_t parallelopen /* = kFALSE */)
117  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
118  storage_() {
119  try {
121  } catch (...) {
122  edm::threadLocalException::setException(std::current_exception()); // capture
123  }
124 }
125 
127  Option_t *option /* = "" */,
128  const char *ftitle /* = "" */,
129  Int_t compress /* = 1 */)
130  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
131  storage_() {
132  try {
134  } catch (...) {
135  edm::threadLocalException::setException(std::current_exception()); // capture
136  }
137 }
138 
// Opens `path` through StorageFactory and finishes setting up this TFile.
//
// `option` is upper-cased and matched against NEW/CREATE, RECREATE, UPDATE,
// READ and READWRAP; any other value is treated as READ. When the storage
// cannot be opened the file is made a zombie and a cms::Exception is thrown.
139 void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" */) {
// NOTE(review): listing line 140 is missing from this extract. `stats`, which is
// ticked at the end of this function, is presumably declared there -- confirm
// against the complete file.
141 
142  // Enable AsyncReading.
143  // This was the default for 5.27, but turned off by default for 5.32.
144  // In our testing, AsyncReading is the fastest mechanism available.
145  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
146  // performance hit in our "average case" tests.
147  gEnv->SetValue("TFile.AsyncReading", 1);
148 
149  // Parse options; unrecognised values fall back to READ further down.
150  fOption = option;
151  fOption.ToUpper();
152 
// "NEW" is accepted as a synonym for "CREATE".
153  if (fOption == "NEW")
154  fOption = "CREATE";
155 
156  Bool_t create = (fOption == "CREATE");
157  Bool_t recreate = (fOption == "RECREATE");
158  Bool_t update = (fOption == "UPDATE");
// READWRAP counts as a read mode and additionally sets `readwrap`.
159  Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
160  Bool_t readwrap = (fOption == "READWRAP");
161 
162  if (!create && !recreate && !update && !read) {
163  read = true;
164  fOption = "READ";
165  }
166 
// RECREATE is carried out as "unlink the path if the access check passes,
// then proceed exactly like CREATE".
167  if (recreate) {
168  if (!gSystem->AccessPathName(path, kFileExists))
169  gSystem->Unlink(path);
170 
171  recreate = false;
172  create = true;
173  fOption = "CREATE";
174  }
175  assert(!recreate);
176 
// UPDATE switches to creating the file when the access check on `path`
// returns true. fOption is left as "UPDATE" in that case.
177  if (update && gSystem->AccessPathName(path, kFileExists)) {
178  update = kFALSE;
179  create = kTRUE;
180  }
181 
182  assert(read || update || create);
183 
// Translate the parsed mode into IOFlags bits: read access is always
// requested, write access for every non-read mode.
184  int openFlags = IOFlags::OpenRead;
185  if (!read)
186  openFlags |= IOFlags::OpenWrite;
187  if (create)
188  openFlags |= IOFlags::OpenCreate;
189  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
190  if (readwrap)
191  openFlags |= IOFlags::OpenWrap;
192 
193  // Open storage
194  if (!(storage_ = StorageFactory::get()->open(path, openFlags))) {
195  MakeZombie();
196  gDirectory = gROOT;
197  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()") << "Cannot open file '" << path << "'";
198  }
199 
200  // Record the statistics.
// NOTE(review): listing line 202 is missing from this extract; `statsService`
// is presumably declared there -- confirm against the complete file.
// An edm::Exception whose category is NotFound is swallowed here; any other
// edm::Exception is rethrown.
201  try {
203  if (statsService.isAvailable()) {
204  statsService->setSize(path, storage_->size());
205  }
206  } catch (edm::Exception const &e) {
207  if (e.categoryCode() != edm::errors::NotFound) {
208  throw;
209  }
210  }
211 
212  fRealName = path;
213  fD = 0; // sorry, meaningless
214  fWritable = read ? kFALSE : kTRUE;
215 
// Hand over to the base-class Init(), telling it whether the file is being created.
216  Init(create);
217 
218  stats.tick(0);
219 }
220 
222 
226 
227 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) {
228  // This function needs to be optimized to minimize seeks.
229  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
230  Seek(pos);
231  return ReadBuffer(buf, len);
232 }
233 
234 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Int_t len) {
235  // Check that it's valid to access this file.
236  if (IsZombie()) {
237  Error("ReadBuffer", "Cannot read from a zombie file");
238  return kTRUE;
239  }
240 
241  if (!IsOpen()) {
242  Error("ReadBuffer", "Cannot read from a file that is not open");
243  return kTRUE;
244  }
245 
246  // Read specified byte range from the storage. Returns kTRUE in
247  // case of error. Note that ROOT uses this function recursively
248  // to fill the cache; we use a flag to make sure our accounting
249  // is reflected in a comprehensible manner. The "read" counter
250  // will include both, "readc" indicates how much read from the
251  // cache, "readu" indicates how much we failed to read from the
252  // cache (excluding those recursive reads), and "readx" counts
253  // the amount actually passed to read from the storage object.
255 
256  // If we have a cache, read from there first. This returns 0
257  // if the block hasn't been prefetched, 1 if it was in cache,
258  // and 2 if there was an error.
259  if (TFileCacheRead *c = GetCacheRead()) {
260  Long64_t here = GetRelOffset();
261  Bool_t async = c->IsAsyncReading();
262 
263  StorageAccount::Stamp cstats(async
264  ? storageCounter(s_statsCPrefetch, StorageAccount::Operation::readPrefetchToCache)
265  : storageCounter(s_statsCRead, StorageAccount::Operation::readViaCache));
266 
267  Int_t st = ReadBufferViaCache(async ? nullptr : buf, len);
268 
269  if (st == 2) {
270  Error("ReadBuffer",
271  "ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",
272  len,
273  here,
274  GetSize());
275  return kTRUE;
276  }
277 
278  if (st == 1) {
279  if (async) {
280  cstats.tick(len);
281  Seek(here);
282  } else {
283  cstats.tick(len);
284  stats.tick(len);
285  return kFALSE;
286  }
287  }
288  }
289 
290  // FIXME: Re-enable read-ahead if the data wasn't in cache.
291  // if (! st) storage_->caching(true, -1, s_readahead);
292 
293  // A real read
294  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
295  IOSize n = storage_->xread(buf, len);
296  xstats.tick(n);
297  stats.tick(n);
298  if (n < static_cast<IOSize>(len)) {
299  Error("ReadBuffer",
300  "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",
301  n,
302  len,
303  GetRelOffset(),
304  GetSize());
305  }
306  return n ? kFALSE : kTRUE;
307 }
308 
// Asks the storage layer to prefetch `len` bytes at offset `off`.
//
// Returns kFALSE when the prefetch request was accepted, or when the cache
// hint is CACHE_HINT_STORAGE; returns kTRUE when the file is unusable, when
// the cache hint is CACHE_HINT_APPLICATION, or when the storage declined the
// prefetch. A `len` of zero is ROOT probing for prefetch support, in which
// case PREFETCH_PROBE_LENGTH bytes are requested instead.
309 Bool_t TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len) {
310  // Check that it's valid to access this file.
311  if (IsZombie()) {
312  Error("ReadBufferAsync", "Cannot read from a zombie file");
313  return kTRUE;
314  }
315 
316  if (!IsOpen()) {
317  Error("ReadBufferAsync", "Cannot read from a file that is not open");
318  return kTRUE;
319  }
320 
321  StorageAccount::Stamp stats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
322 
323  // If asynchronous reading is disabled, bail out now, regardless
324  // whether the underlying storage supports prefetching. If it is
325  // forced on, pretend it's on, even if the storage doesn't support
326  // it, as this turns off the caching in ROOT's side.
// NOTE(review): listing line 327 is missing from this extract; `f`, whose
// cacheHint() is consulted below, is presumably declared there -- confirm
// against the complete file.
328 
329  // Make sure we never use async reads in app-only mode
330  if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
331  return kTRUE;
332 
333  // Let the I/O method indicate if it can do client-side prefetch.
334  // If it does, then for example TTreeCache will drop its own cache
335  // and will use the client-side cache of the actual I/O layer.
336  // If len is zero ROOT is probing for prefetch support.
// The branch below is currently a no-op: its only statement is commented out.
337  if (len) {
338  // FIXME: Synchronise caching.
339  // storage_->caching(true, -1, 0);
340  ;
341  }
342 
// A null buffer is passed: nothing is copied back to the caller here.
343  IOPosBuffer iov(off, (void *)nullptr, len ? len : PREFETCH_PROBE_LENGTH);
344  if (storage_->prefetch(&iov, 1)) {
// The stamp is only ticked when the prefetch was accepted.
345  stats.tick(len);
346  return kFALSE;
347  }
348 
349  // Always ask ROOT to use async reads in storage-only mode,
350  // regardless of whether the storage system supports it.
351  if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
352  return kFALSE;
353 
354  // Prefetching not available right now.
355  return kTRUE;
356 }
357 
358 Bool_t TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
379  Int_t remaining = nbuf; // Number of read requests left to process.
380  Int_t pack_count; // Number of read requests processed by this iteration.
381 
382  IOSize remaining_buffer_size = 0;
383  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
384  // the size of the various requests.
385  for (Int_t i = 0; i < nbuf; i++)
386  remaining_buffer_size += len[i];
387 
388  char *current_buffer = buf;
389  Long64_t *current_pos = pos;
390  Int_t *current_len = len;
391 
392  ReadRepacker repacker;
393 
394  while (remaining > 0) {
395  pack_count = repacker.pack(
396  static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
397 
398  int real_bytes_processed = repacker.realBytesProcessed();
399  IOSize io_buffer_used = repacker.bufferUsed();
400 
401  // Issue readv, then unpack buffers.
402  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
403  std::vector<IOPosBuffer> &iov = repacker.iov();
404  IOSize result = storage_->readv(&iov[0], iov.size());
405  if (result != io_buffer_used) {
406  Error(
407  "ReadBuffersSync", "Storage::readv returned different size result=%ld expected=%ld", result, io_buffer_used);
408  return kTRUE;
409  }
410  xstats.tick(io_buffer_used);
411  repacker.unpack(current_buffer);
412 
413  // Update the location of the unused part of the input buffer.
414  remaining_buffer_size -= real_bytes_processed;
415  current_buffer += real_bytes_processed;
416 
417  current_pos += pack_count;
418  current_len += pack_count;
419  remaining -= pack_count;
420  }
421  assert(remaining_buffer_size == 0);
422  return kFALSE;
423 }
424 
425 Bool_t TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
426  // Check that it's valid to access this file.
427  if (IsZombie()) {
428  Error("ReadBuffers", "Cannot read from a zombie file");
429  return kTRUE;
430  }
431 
432  if (!IsOpen()) {
433  Error("ReadBuffers", "Cannot read from a file that is not open");
434  return kTRUE;
435  }
436 
437  // For synchronous reads, we have special logic to optimize the I/O requests
438  // from ROOT before handing it to the storage.
439  if (buf) {
440  return ReadBuffersSync(buf, pos, len, nbuf);
441  }
442  // For an async read, we assume the storage system is smart enough to do the
443  // optimization itself.
444 
445  // Read from underlying storage.
446  void *const nobuf = nullptr;
447  Int_t total = 0;
448  std::vector<IOPosBuffer> iov;
449  iov.reserve(nbuf);
450  for (Int_t i = 0; i < nbuf; ++i) {
451  iov.emplace_back(pos[i], nobuf, len[i]);
452  total += len[i];
453  }
454 
455  // Null buffer means asynchronous reads into I/O system's cache.
456  bool success;
457  StorageAccount::Stamp astats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
458  // Synchronise low-level cache with the supposed cache in TFile.
459  // storage_->caching(true, -1, 0);
460  success = storage_->prefetch(iov.data(), nbuf);
461  astats.tick(total);
462 
463  // If it didn't suceeed, pass down to the base class.
464  if (not success) {
465  if (TFile::ReadBuffers(buf, pos, len, nbuf)) {
466  Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
467  return kTRUE;
468  }
469  }
470  return kFALSE;
471 }
472 
473 Bool_t TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len) {
474  // Check that it's valid to access this file.
475  if (IsZombie()) {
476  Error("WriteBuffer", "Cannot write to a zombie file");
477  return kTRUE;
478  }
479 
480  if (!IsOpen()) {
481  Error("WriteBuffer", "Cannot write to a file that is not open");
482  return kTRUE;
483  }
484 
485  if (!fWritable) {
486  Error("WriteBuffer", "File is not writable");
487  return kTRUE;
488  }
489 
491  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, StorageAccount::Operation::writeViaCache));
492 
493  // Try first writing via a cache, and if that's not possible, directly.
494  Int_t st;
495  switch ((st = WriteBufferViaCache(buf, len))) {
496  case 0:
497  // Actual write.
498  {
499  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, StorageAccount::Operation::writeActual));
500  IOSize n = storage_->xwrite(buf, len);
501  xstats.tick(n);
502  stats.tick(n);
503 
504  // FIXME: What if it's a short write?
505  return n > 0 ? kFALSE : kTRUE;
506  }
507 
508  case 1:
509  cstats.tick(len);
510  stats.tick(len);
511  return kFALSE;
512 
513  case 2:
514  default:
515  Error("WriteBuffer", "Error writing to cache");
516  return kTRUE;
517  }
518 }
519 
523 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
524 // large a prefetch cache to use.
525 // FIXME: Asynchronous open support?
526 
530 Int_t TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */) {
531  StorageAccount::Stamp stats(storageCounter(s_statsOpen, StorageAccount::Operation::open));
532 
533  if (storage_) {
534  storage_->close();
535  }
536 
537  int openFlags = IOFlags::OpenRead;
538  if (flags & O_WRONLY)
539  openFlags = IOFlags::OpenWrite;
540  else if (flags & O_RDWR)
541  openFlags |= IOFlags::OpenWrite;
542  if (flags & O_CREAT)
543  openFlags |= IOFlags::OpenCreate;
544  if (flags & O_APPEND)
545  openFlags |= IOFlags::OpenAppend;
546  if (flags & O_EXCL)
547  openFlags |= IOFlags::OpenExclusive;
548  if (flags & O_TRUNC)
549  openFlags |= IOFlags::OpenTruncate;
550  if (flags & O_NONBLOCK)
551  openFlags |= IOFlags::OpenNonBlock;
552 
553  if (!(storage_ = StorageFactory::get()->open(pathname, openFlags))) {
554  MakeZombie();
555  gDirectory = gROOT;
556  throw cms::Exception("TStorageFactoryFile::SysOpen()") << "Cannot open file '" << pathname << "'";
557  }
558 
559  stats.tick();
560  return 0;
561 }
562 
563 Int_t TStorageFactoryFile::SysClose(Int_t /* fd */) {
564  StorageAccount::Stamp stats(storageCounter(s_statsClose, StorageAccount::Operation::close));
565 
566  if (storage_) {
567  storage_->close();
568  releaseStorage();
569  }
570 
571  stats.tick();
572  return 0;
573 }
574 
575 Long64_t TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence) {
576  StorageAccount::Stamp stats(storageCounter(s_statsSeek, StorageAccount::Operation::seek));
577  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET : whence == SEEK_CUR ? Storage::CURRENT : Storage::END);
578 
579  offset = storage_->position(offset, rel);
580  stats.tick();
581  return offset;
582 }
583 
584 Int_t TStorageFactoryFile::SysSync(Int_t /* fd */) {
585  StorageAccount::Stamp stats(storageCounter(s_statsFlush, StorageAccount::Operation::flush));
586  storage_->flush();
587  stats.tick();
588  return 0;
589 }
590 
// Fills in the stat-style outputs for this file. The file descriptor argument
// is ignored. `*id` receives a hash of fRealName, `*size` the size reported by
// the storage, and `*flags` / `*modtime` are set to 0. Always returns 0.
// All four output pointers are dereferenced without a null check.
591 Int_t TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) {
// NOTE(review): listing line 592 is missing from this extract; `stats`, ticked
// below, is presumably declared there -- confirm against the complete file.
593  // FIXME: Most of this is unsupported or makes no sense with Storage
594  *id = ::Hash(fRealName);
595  *size = storage_->size();
596  *flags = 0;
597  *modtime = 0;
598  stats.tick();
599  return 0;
600 }
601 
602 void TStorageFactoryFile::ResetErrno(void) const { TSystem::ResetErrno(); }
size
Write out results.
edm::ErrorSummaryEntry Error
static std::atomic< StorageAccount::Counter * > s_statsXRead
static StorageClassToken tokenForStorageClassName(std::string const &iName)
void setSize(const std::string &urlOrLfn, size_t size)
static StorageAccount::Counter & storageCounter(std::atomic< StorageAccount::Counter *> &c, StorageAccount::Operation operation)
Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence) override
def create(alignables, pedeDump, additionalData, outputFile, config)
Int_t SysClose(Int_t fd) override
static Counter & counter(StorageClassToken token, Operation operation)
static std::atomic< StorageAccount::Counter * > s_statsFlush
static std::atomic< StorageAccount::Counter * > s_statsSeek
void tick(uint64_t amount=0, int64_t tick=0) const
Bool_t ReadBufferAsync(Long64_t off, Int_t len) override
static std::atomic< StorageAccount::Counter * > s_statsCPrefetch
static std::atomic< StorageAccount::Counter * > s_statsWrite
static std::atomic< StorageAccount::Counter * > s_statsXWrite
assert(be >=bs)
Bool_t ReadBuffer(char *buf, Int_t len) override
static std::atomic< StorageAccount::Counter * > s_statsOpen
edm::propagate_const< std::unique_ptr< edm::storage::Storage > > storage_
char const * label
static std::atomic< StorageAccount::Counter * > s_statsRead
constexpr int PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
static std::atomic< StorageAccount::Counter * > s_statsCtor
std::string Hash
Definition: Types.h:43
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:21
void unpack(char *buf)
Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode) override
static std::atomic< StorageAccount::Counter * > s_statsARead
size_t IOSize
Definition: IOTypes.h:15
static std::atomic< StorageAccount::Counter * > s_statsCRead
static std::atomic< StorageAccount::Counter * > s_statsClose
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:47
void setException(std::exception_ptr e)
void ResetErrno(void) const override
IOSize bufferUsed() const
Definition: ReadRepacker.h:49
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:54
#define update(a, b)
static std::atomic< StorageAccount::Counter * > s_statsCWrite
~TStorageFactoryFile(void) override
bool isAvailable() const
Definition: Service.h:40
#define O_NONBLOCK
Definition: SysFile.h:23
Int_t SysSync(Int_t fd) override
#define get
Bool_t WriteBuffer(const char *buf, Int_t len) override
std::unique_ptr< GeometricDet > construct(DDCompactView const &cpv, std::vector< int > const &detidShifts)
ClassImp(TStorageFactoryFile)
Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) override
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
static std::atomic< StorageAccount::Counter * > s_statsStat
void Initialize(const char *name, Option_t *option="")
Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) override