CMS 3D CMS Logo

TStorageFactoryFile.cc
Go to the documentation of this file.
9 #include "ReadRepacker.h"
10 #include "TFileCacheRead.h"
11 #include "TSystem.h"
12 #include "TROOT.h"
13 #include "TEnv.h"
14 #include <errno.h>
15 #include <sys/stat.h>
16 #include <unistd.h>
17 #include <fcntl.h>
18 #include <iostream>
19 #include <cassert>
20 
21 #if 0
22 #include "TTreeCache.h"
23 #include "TTree.h"
24 
25 class TTreeCacheDebug : public TTreeCache {
26 public:
27  void dump(const char *label, const char *trailer)
28  {
29  Long64_t entry = fOwner->GetReadEntry();
30  std::cerr
31  << label << ": " << entry << " "
32  << "{ fEntryMin=" << fEntryMin
33  << ", fEntryMax=" << fEntryMax
34  << ", fEntryNext=" << fEntryNext
35  << ", fZipBytes=" << fZipBytes
36  << ", fNbranches=" << fNbranches
37  << ", fNReadOk=" << fNReadOk
38  << ", fNReadMiss=" << fNReadMiss
39  << ", fNReadPref=" << fNReadPref
40  << ", fBranches=" << fBranches
41  << ", fBrNames=" << fBrNames
42  << ", fOwner=" << fOwner
43  << ", fTree=" << fTree
44  << ", fIsLearning=" << fIsLearning
45  << ", fIsManual=" << fIsManual
46  << "; fBufferSizeMin=" << fBufferSizeMin
47  << ", fBufferSize=" << fBufferSize
48  << ", fBufferLen=" << fBufferLen
49  << ", fBytesToPrefetch=" << fBytesToPrefetch
50  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
51  << ", fAsyncReading=" << fAsyncReading
52  << ", fNseek=" << fNseek
53  << ", fNtot=" << fNtot
54  << ", fNb=" << fNb
55  << ", fSeekSize=" << fSeekSize
56  << ", fSeek=" << fSeek
57  << ", fSeekIndex=" << fSeekIndex
58  << ", fSeekSort=" << fSeekSort
59  << ", fPos=" << fPos
60  << ", fSeekLen=" << fSeekLen
61  << ", fSeekSortLen=" << fSeekSortLen
62  << ", fSeekPos=" << fSeekPos
63  << ", fLen=" << fLen
64  << ", fFile=" << fFile
65  << ", fBuffer=" << (void *) fBuffer
66  << ", fIsSorted=" << fIsSorted
67  << " }\n" << trailer;
68  }
69 };
70 #endif
71 
73 static StorageAccount::Counter *s_statsCtor = 0;
87 
88 
89 static inline StorageAccount::Counter &
91 {
92  static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
93  if (! c) c = &StorageAccount::counter(token, operation);
94  return *c;
95 }
96 
98  : storage_()
99 {
101  stats.tick(0);
102 }
103 
104 // This constructor must be compatible with *all* the various built-in TFile plugins,
105 // including TXNetFile. This is why some arguments in the constructor is ignored.
106 // If there's a future T*File that is incompatible with this constructor, a new
107 // constructor will have to be added.
109  Option_t *option,
110  const char *ftitle,
111  Int_t compress,
112  Int_t netopt,
113  Bool_t parallelopen /* = kFALSE */)
114  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
115  storage_()
116 {
117  try {
118  Initialize(path, option);
119  } catch (...) {
120  edm::threadLocalException::setException(std::current_exception()); // capture
121  }
122 }
123 
125  Option_t *option /* = "" */,
126  const char *ftitle /* = "" */,
127  Int_t compress /* = 1 */)
128  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
129  storage_()
130 {
131  try {
132  Initialize(path, option);
133  } catch (...) {
134  edm::threadLocalException::setException(std::current_exception()); // capture
135  }
136 }
137 
138 void
140  Option_t *option /* = "" */)
141 {
143 
144  // Enable AsyncReading.
145  // This was the default for 5.27, but turned off by default for 5.32.
146  // In our testing, AsyncReading is the fastest mechanism available.
147  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
148  // performance hit in our "average case" tests.
149  gEnv->SetValue("TFile.AsyncReading", 1);
150 
151  // Parse options; at the moment we only accept read!
152  fOption = option;
153  fOption.ToUpper();
154 
155  if (fOption == "NEW")
156  fOption = "CREATE";
157 
158  Bool_t create = (fOption == "CREATE");
159  Bool_t recreate = (fOption == "RECREATE");
160  Bool_t update = (fOption == "UPDATE");
161  Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
162  Bool_t readwrap = (fOption == "READWRAP");
163 
164  if (!create && !recreate && !update && !read)
165  {
166  read = true;
167  fOption = "READ";
168  }
169 
170  if (recreate)
171  {
172  if (!gSystem->AccessPathName(path, kFileExists))
173  gSystem->Unlink(path);
174 
175  recreate = false;
176  create = true;
177  fOption = "CREATE";
178  }
179  assert(!recreate);
180 
181  if (update && gSystem->AccessPathName(path, kFileExists))
182  {
183  update = kFALSE;
184  create = kTRUE;
185  }
186 
187  assert(read || update || create);
188 
189  int openFlags = IOFlags::OpenRead;
190  if (!read) openFlags |= IOFlags::OpenWrite;
191  if (create) openFlags |= IOFlags::OpenCreate;
192  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
193  if (readwrap) openFlags |= IOFlags::OpenWrap;
194 
195  // Open storage
196  if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
197  {
198  MakeZombie();
199  gDirectory = gROOT;
200  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
201  << "Cannot open file '" << path << "'";
202  }
203 
204  // Record the statistics.
205  try {
207  if (statsService.isAvailable()) {
208  statsService->setSize(storage_->size());
209  }
210  } catch (edm::Exception e) {
211  if (e.categoryCode() != edm::errors::NotFound) {
212  throw;
213  }
214  }
215 
216  fRealName = path;
217  fD = 0; // sorry, meaningless
218  fWritable = read ? kFALSE : kTRUE;
219 
220  Init(create);
221 
222  stats.tick(0);
223 }
224 
226 {
227  Close();
228 }
229 
233 
234 Bool_t
235 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
236 {
237  // This function needs to be optimized to minimize seeks.
238  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
239  Seek(pos);
240  return ReadBuffer(buf, len);
241 }
242 
243 Bool_t
244 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
245 {
246  // Check that it's valid to access this file.
247  if (IsZombie())
248  {
249  Error("ReadBuffer", "Cannot read from a zombie file");
250  return kTRUE;
251  }
252 
253  if (! IsOpen())
254  {
255  Error("ReadBuffer", "Cannot read from a file that is not open");
256  return kTRUE;
257  }
258 
259  // Read specified byte range from the storage. Returns kTRUE in
260  // case of error. Note that ROOT uses this function recursively
261  // to fill the cache; we use a flag to make sure our accounting
262  // is reflected in a comprehensible manner. The "read" counter
263  // will include both, "readc" indicates how much read from the
264  // cache, "readu" indicates how much we failed to read from the
265  // cache (excluding those recursive reads), and "readx" counts
266  // the amount actually passed to read from the storage object.
268 
269  // If we have a cache, read from there first. This returns 0
270  // if the block hasn't been prefetched, 1 if it was in cache,
271  // and 2 if there was an error.
272  if (TFileCacheRead *c = GetCacheRead())
273  {
274  Long64_t here = GetRelOffset();
275  Bool_t async = c->IsAsyncReading();
276 
277  StorageAccount::Stamp cstats(async
280 
281  Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
282 
283  if (st == 2) {
284  Error("ReadBuffer","ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",len, here, GetSize());
285  return kTRUE;
286  }
287 
288  if (st == 1) {
289  if (async) {
290  cstats.tick(len);
291  Seek(here);
292  } else {
293  cstats.tick(len);
294  stats.tick(len);
295  return kFALSE;
296  }
297  }
298  }
299 
300  // FIXME: Re-enable read-ahead if the data wasn't in cache.
301  // if (! st) storage_->caching(true, -1, s_readahead);
302 
303  // A real read
305  IOSize n = storage_->xread(buf, len);
306  xstats.tick(n);
307  stats.tick(n);
308  if(n < static_cast<IOSize>(len)) {
309  Error("ReadBuffer", "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",n, len, GetRelOffset(), GetSize());
310  }
311  return n ? kFALSE : kTRUE;
312 }
313 
314 Bool_t
315 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
316 {
317  // Check that it's valid to access this file.
318  if (IsZombie())
319  {
320  Error("ReadBufferAsync", "Cannot read from a zombie file");
321  return kTRUE;
322  }
323 
324  if (! IsOpen())
325  {
326  Error("ReadBufferAsync", "Cannot read from a file that is not open");
327  return kTRUE;
328  }
329 
331 
332  // If asynchronous reading is disabled, bail out now, regardless
333  // whether the underlying storage supports prefetching. If it is
334  // forced on, pretend it's on, even if the storage doesn't support
335  // it, as this turns off the caching in ROOT's side.
337 
338  // Verify that we never using async reads in app-only mode
340  return kTRUE;
341 
342  // Let the I/O method indicate if it can do client-side prefetch.
343  // If it does, then for example TTreeCache will drop its own cache
344  // and will use the client-side cache of the actual I/O layer.
345  // If len is zero ROOT is probing for prefetch support.
346  if (len) {
347  // FIXME: Synchronise caching.
348  // storage_->caching(true, -1, 0);
349  ;
350  }
351 
352  IOPosBuffer iov(off, (void *) 0, len ? len : PREFETCH_PROBE_LENGTH);
353  if (storage_->prefetch(&iov, 1))
354  {
355  stats.tick(len);
356  return kFALSE;
357  }
358 
359  // Always ask ROOT to use async reads in storage-only mode,
360  // regardless of whether the storage system supports it.
362  return kFALSE;
363 
364  // Prefetching not available right now.
365  return kTRUE;
366 }
367 
368 Bool_t
369 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
370 {
391  Int_t remaining = nbuf; // Number of read requests left to process.
392  Int_t pack_count; // Number of read requests processed by this iteration.
393 
394  IOSize remaining_buffer_size=0;
395  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
396  // the size of the various requests.
397  for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
398 
399  char *current_buffer = buf;
400  Long64_t *current_pos = pos;
401  Int_t *current_len = len;
402 
403  ReadRepacker repacker;
404 
405  while (remaining > 0) {
406 
407  pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
408 
409  int real_bytes_processed = repacker.realBytesProcessed();
410  IOSize io_buffer_used = repacker.bufferUsed();
411 
412  // Issue readv, then unpack buffers.
414  std::vector<IOPosBuffer> &iov = repacker.iov();
415  IOSize result = storage_->readv(&iov[0], iov.size());
416  if (result != io_buffer_used) {
417  Error("ReadBuffersSync","Storage::readv returned different size result=%ld expected=%ld",result,io_buffer_used);
418  return kTRUE;
419  }
420  xstats.tick(io_buffer_used);
421  repacker.unpack(current_buffer);
422 
423  // Update the location of the unused part of the input buffer.
424  remaining_buffer_size -= real_bytes_processed;
425  current_buffer += real_bytes_processed;
426 
427  current_pos += pack_count;
428  current_len += pack_count;
429  remaining -= pack_count;
430 
431  }
432  assert(remaining_buffer_size == 0);
433  return kFALSE;
434 }
435 
436 Bool_t
437 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
438 {
439  // Check that it's valid to access this file.
440  if (IsZombie())
441  {
442  Error("ReadBuffers", "Cannot read from a zombie file");
443  return kTRUE;
444  }
445 
446  if (! IsOpen())
447  {
448  Error("ReadBuffers", "Cannot read from a file that is not open");
449  return kTRUE;
450  }
451 
452  // For synchronous reads, we have special logic to optimize the I/O requests
453  // from ROOT before handing it to the storage.
454  if (buf)
455  {
456  return ReadBuffersSync(buf, pos, len, nbuf);
457  }
458  // For an async read, we assume the storage system is smart enough to do the
459  // optimization itself.
460 
461  // Read from underlying storage.
462  void* const nobuf = 0;
463  Int_t total = 0;
464  std::vector<IOPosBuffer> iov;
465  iov.reserve(nbuf);
466  for (Int_t i = 0; i < nbuf; ++i)
467  {
468  iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
469  total += len[i];
470  }
471 
472  // Null buffer means asynchronous reads into I/O system's cache.
473  bool success;
475  // Synchronise low-level cache with the supposed cache in TFile.
476  // storage_->caching(true, -1, 0);
477  success = storage_->prefetch(&iov[0], nbuf);
478  astats.tick(total);
479 
480  // If it didn't suceeed, pass down to the base class.
481  if(not success) {
482  if(TFile::ReadBuffers(buf, pos, len, nbuf)) {
483  Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
484  return kTRUE;
485  }
486  }
487  return kFALSE;
488 }
489 
490 Bool_t
491 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
492 {
493  // Check that it's valid to access this file.
494  if (IsZombie())
495  {
496  Error("WriteBuffer", "Cannot write to a zombie file");
497  return kTRUE;
498  }
499 
500  if (! IsOpen())
501  {
502  Error("WriteBuffer", "Cannot write to a file that is not open");
503  return kTRUE;
504  }
505 
506  if (! fWritable)
507  {
508  Error("WriteBuffer", "File is not writable");
509  return kTRUE;
510  }
511 
514 
515  // Try first writing via a cache, and if that's not possible, directly.
516  Int_t st;
517  switch ((st = WriteBufferViaCache(buf, len)))
518  {
519  case 0:
520  // Actual write.
521  {
523  IOSize n = storage_->xwrite(buf, len);
524  xstats.tick(n);
525  stats.tick(n);
526 
527  // FIXME: What if it's a short write?
528  return n > 0 ? kFALSE : kTRUE;
529  }
530 
531  case 1:
532  cstats.tick(len);
533  stats.tick(len);
534  return kFALSE;
535 
536  case 2:
537  default:
538  Error("WriteBuffer", "Error writing to cache");
539  return kTRUE;
540  }
541 }
542 
546 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
547 // large a prefetch cache to use.
548 // FIXME: Asynchronous open support?
549 
553 Int_t
554 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
555 {
557 
558  if (storage_)
559  {
560  storage_->close();
561  }
562 
563  int openFlags = IOFlags::OpenRead;
564  if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
565  else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
566  if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
567  if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
568  if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
569  if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
570  if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
571 
572  if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
573  {
574  MakeZombie();
575  gDirectory = gROOT;
576  throw cms::Exception("TStorageFactoryFile::SysOpen()")
577  << "Cannot open file '" << pathname << "'";
578  }
579 
580  stats.tick();
581  return 0;
582 }
583 
584 Int_t
586 {
588 
589  if (storage_)
590  {
591  storage_->close();
592  releaseStorage();
593  }
594 
595  stats.tick();
596  return 0;
597 }
598 
599 Long64_t
600 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
601 {
603  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
604  : whence == SEEK_CUR ? Storage::CURRENT
605  : Storage::END);
606 
607  offset = storage_->position(offset, rel);
608  stats.tick();
609  return offset;
610 }
611 
612 Int_t
614 {
616  storage_->flush();
617  stats.tick();
618  return 0;
619 }
620 
621 Int_t
622 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
623  Long_t *flags, Long_t *modtime)
624 {
626  // FIXME: Most of this is unsupported or makes no sense with Storage
627  *id = ::Hash(fRealName);
628  *size = storage_->size();
629  *flags = 0;
630  *modtime = 0;
631  stats.tick();
632  return 0;
633 }
634 
635 void
637 {
638  TSystem::ResetErrno();
639 }
Code categoryCode() const
Definition: EDMException.h:93
size
Write out results.
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
#define PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
virtual Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence)
def create(alignables, pedeDump, additionalData, outputFile, config)
virtual Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
edm::propagate_const< std::unique_ptr< Storage > > storage_
Relative
Definition: Storage.h:23
static const StorageFactory * get(void)
std::function< unsigned int(align::ID)> Counter
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:53
ClassImp(AliDaqEventHeader) ClassImp(AliDaqPosition2D) ClassImp(AliDaqPositionCOPS) ClassImp(AliDaqTilt) ClassImp(AliDaqDistance) ClassImp(AliDaqTemperature) ClassImp(CocoaDaqRootEvent) CocoaDaqRootEvent
static StorageAccount::Counter * s_statsARead
static StorageClassToken tokenForStorageClassName(std::string const &iName)
static ClassImp(TStorageFactoryFile) static StorageAccount StorageAccount::Counter * s_statsOpen
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, StorageAccount::Operation operation)
virtual Bool_t ReadBufferAsync(Long64_t off, Int_t len)
virtual Bool_t ReadBuffer(char *buf, Int_t len)
bool isAvailable() const
Definition: Service.h:46
std::string Hash
Definition: Types.h:45
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:22
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
static Counter & counter(StorageClassToken token, Operation operation)
static StorageAccount::Counter * s_statsCWrite
IOSize bufferUsed() const
Definition: ReadRepacker.h:50
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:48
void setException(std::exception_ptr e)
void ResetErrno(void) const
void tick(uint64_t amount=0, int64_t tick=0) const
#define update(a, b)
virtual Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
virtual Int_t SysClose(Int_t fd)
static StorageAccount::Counter * s_statsSeek
virtual Int_t SysSync(Int_t fd)
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
static StorageAccount::Counter * s_statsCPrefetch
static StorageAccount::Counter * s_statsCRead