CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
TStorageFactoryFile.cc
Go to the documentation of this file.
8 #include "ReadRepacker.h"
9 #include "TFileCacheRead.h"
10 #include "TSystem.h"
11 #include "TROOT.h"
12 #include "TEnv.h"
13 #include <errno.h>
14 #include <sys/stat.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <iostream>
18 #include <cassert>
19 
// Debugging aid, compiled out by the `#if 0` below: a TTreeCache subclass
// whose dump() writes the cache's internal state to std::cerr.
// NOTE(review): it reads protected members by name, so it is tied to the
// ROOT version it was written against; re-check the member list before
// re-enabling.
20 #if 0
21 #include "TTreeCache.h"
22 #include "TTree.h"
23 
24 class TTreeCacheDebug : public TTreeCache {
25 public:
// Print `label`, the current read entry of fOwner, every listed member
// inside "{ ... }", and finally `trailer`.
26  void dump(const char *label, const char *trailer)
27  {
28  Long64_t entry = fOwner->GetReadEntry();
29  std::cerr
30  << label << ": " << entry << " "
31  << "{ fEntryMin=" << fEntryMin
32  << ", fEntryMax=" << fEntryMax
33  << ", fEntryNext=" << fEntryNext
34  << ", fZipBytes=" << fZipBytes
35  << ", fNbranches=" << fNbranches
36  << ", fNReadOk=" << fNReadOk
37  << ", fNReadMiss=" << fNReadMiss
38  << ", fNReadPref=" << fNReadPref
39  << ", fBranches=" << fBranches
40  << ", fBrNames=" << fBrNames
41  << ", fOwner=" << fOwner
42  << ", fTree=" << fTree
43  << ", fIsLearning=" << fIsLearning
44  << ", fIsManual=" << fIsManual
45  << "; fBufferSizeMin=" << fBufferSizeMin
46  << ", fBufferSize=" << fBufferSize
47  << ", fBufferLen=" << fBufferLen
48  << ", fBytesToPrefetch=" << fBytesToPrefetch
49  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
50  << ", fAsyncReading=" << fAsyncReading
51  << ", fNseek=" << fNseek
52  << ", fNtot=" << fNtot
53  << ", fNb=" << fNb
54  << ", fSeekSize=" << fSeekSize
55  << ", fSeek=" << fSeek
56  << ", fSeekIndex=" << fSeekIndex
57  << ", fSeekSort=" << fSeekSort
58  << ", fPos=" << fPos
59  << ", fSeekLen=" << fSeekLen
60  << ", fSeekSortLen=" << fSeekSortLen
61  << ", fSeekPos=" << fSeekPos
62  << ", fLen=" << fLen
63  << ", fFile=" << fFile
// fBuffer is cast to void* so the pointer value is printed, not the bytes.
64  << ", fBuffer=" << (void *) fBuffer
65  << ", fIsSorted=" << fIsSorted
66  << " }\n" << trailer;
67  }
68 };
69 #endif
70 
// ROOT ClassImp macro for TStorageFactoryFile.
71 ClassImp(TStorageFactoryFile)
86 
// Return the StorageAccount counter cached in `c`. On first use (c null),
// look it up for storage class "tstoragefile" and operation `label` and
// remember it in `c`, so later calls skip the lookup.
// NOTE(review): the check-then-assign on `c` is not synchronized; confirm
// callers never race on the same cached pointer.
87 static inline StorageAccount::Counter &
89 {
90  if (! c) c = &StorageAccount::counter("tstoragefile", label);
91  return *c;
92 }
93 
// Default constructor (presumably for ROOT I/O/dictionary use): no storage
// is attached (storage_ is null); only a zero-amount tick is recorded on
// `stats`.
95  : storage_(0)
96 {
98  stats.tick(0);
99 }
100 
101 // This constructor must be compatible with *all* the various built-in TFile plugins,
102 // including TXNetFile. This is why some arguments (netopt, parallelopen) are ignored.
103 // If there's a future T*File that is incompatible with this constructor, a new
104 // constructor will have to be added.
// The actual open (option parsing, storage creation) happens in Initialize().
106  Option_t *option,
107  const char *ftitle,
108  Int_t compress,
109  Int_t netopt,
110  Bool_t parallelopen /* = kFALSE */)
111  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
112  storage_(0)
113 {
114  Initialize(path, option);
115 }
116 
// Construct from path, option, title and compression level. As with the
// plugin-compatible constructor, the base TFile gets "NET" and the real
// open is delegated to Initialize().
118  Option_t *option /* = "" */,
119  const char *ftitle /* = "" */,
120  Int_t compress /* = 1 */)
121  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
122  storage_(0)
123 {
124  Initialize(path, option);
125 }
126 
// Common constructor body: parse the ROOT open option, open `path` through
// StorageFactory, record the file size in the statistics service (if any)
// and initialize the TFile base.
// Throws cms::Exception (after MakeZombie()) if the storage cannot be opened.
127 void
129  Option_t *option /* = "" */)
130 {
131  StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
132 
133  // Enable AsyncReading.
134  // This was the default for 5.27, but turned off by default for 5.32.
135  // In our testing, AsyncReading is the fastest mechanism available.
136  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
137  // performance hit in our "average case" tests.
// Note: this changes the process-wide gEnv setting every time a
// TStorageFactoryFile is initialized, not only for this file.
138  gEnv->SetValue("TFile.AsyncReading", 1);
139 
140  // Parse options (case-insensitive): NEW is treated as CREATE, RECREATE
// removes an existing file and then creates, UPDATE of a missing file
// becomes a create, and anything unrecognized falls back to READ.
141  fOption = option;
142  fOption.ToUpper();
143 
144  if (fOption == "NEW")
145  fOption = "CREATE";
146 
147  Bool_t create = (fOption == "CREATE");
148  Bool_t recreate = (fOption == "RECREATE");
149  Bool_t update = (fOption == "UPDATE");
150  Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
151  Bool_t readwrap = (fOption == "READWRAP");
152 
153  if (!create && !recreate && !update && !read)
154  {
155  read = true;
156  fOption = "READ";
157  }
158 
// TSystem::AccessPathName() returns kTRUE when the path is NOT accessible,
// so the negated test below means "the file already exists".
159  if (recreate)
160  {
161  if (!gSystem->AccessPathName(path, kFileExists))
162  gSystem->Unlink(path);
163 
164  recreate = false;
165  create = true;
166  fOption = "CREATE";
167  }
168  assert(!recreate);
169 
// UPDATE on a file that does not exist yet degrades to CREATE
// (fOption itself is left as "UPDATE").
170  if (update && gSystem->AccessPathName(path, kFileExists))
171  {
172  update = kFALSE;
173  create = kTRUE;
174  }
175 
176  assert(read || update || create);
177 
178  int openFlags = IOFlags::OpenRead;
179  if (!read) openFlags |= IOFlags::OpenWrite;
180  if (create) openFlags |= IOFlags::OpenCreate;
181  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
182  if (readwrap) openFlags |= IOFlags::OpenWrap;
183 
184  // Open storage
185  if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
186  {
187  MakeZombie();
188  gDirectory = gROOT;
189  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
190  << "Cannot open file '" << path << "'";
191  }
192 
193  // Record the statistics.
// A NotFound edm::Exception is swallowed; any other category is rethrown.
// NOTE(review): this catches edm::Exception by value (a copy);
// `const edm::Exception &` would avoid it. `throw;` still rethrows the
// original exception object.
194  try {
196  if (statsService.isAvailable()) {
197  statsService->setSize(storage_->size());
198  }
199  } catch (edm::Exception e) {
200  if (e.categoryCode() != edm::errors::NotFound) {
201  throw;
202  }
203  }
204 
205  fRealName = path;
206  fD = 0; // sorry, meaningless
207  fWritable = read ? kFALSE : kTRUE;
208 
209  Init(create);
210 
211  stats.tick(0);
212 }
213 
// Destructor: close the file via TFile::Close(), then delete the Storage
// object held in storage_.
215 {
216  Close();
217  delete storage_;
218 }
219 
223 
224 Bool_t
225 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
226 {
227  // This function needs to be optimized to minimize seeks.
228  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
229  Seek(pos);
230  return ReadBuffer(buf, len);
231 }
232 
// Sequential read of `len` bytes at the current offset into `buf`.
// Tries ROOT's read cache first (if one is attached), otherwise (or after
// an async prefetch) reads from storage_. Returns kTRUE on error, kFALSE on
// success.
233 Bool_t
234 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
235 {
236  // Check that it's valid to access this file.
237  if (IsZombie())
238  {
239  Error("ReadBuffer", "Cannot read from a zombie file");
240  return kTRUE;
241  }
242 
243  if (! IsOpen())
244  {
245  Error("ReadBuffer", "Cannot read from a file that is not open");
246  return kTRUE;
247  }
248 
249  // Read specified byte range from the storage. Returns kTRUE in
250  // case of error. Note that ROOT uses this function recursively
251  // to fill the cache; we use a flag to make sure our accounting
252  // is reflected in a comprehensible manner. The "read" counter
253  // will include both, "readc" indicates how much read from the
254  // cache, "readu" indicates how much we failed to read from the
255  // cache (excluding those recursive reads), and "readx" counts
256  // the amount actually passed to read from the storage object.
// NOTE(review): the counter labels used below are "readViaCache",
// "readPrefetchToCache" and "readActual"; the names in the comment above
// look stale.
258 
259  // If we have a cache, read from there first. This returns 0
260  // if the block hasn't been prefetched, 1 if it was in cache,
261  // and 2 if there was an error.
262  if (TFileCacheRead *c = GetCacheRead())
263  {
264  Long64_t here = GetRelOffset();
265  Bool_t async = c->IsAsyncReading();
266 
267  StorageAccount::Stamp cstats(async
268  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
269  : storageCounter(s_statsCRead, "readViaCache"));
270 
// In async mode no destination buffer is passed, so the cache call does
// not deliver data into `buf`; the real read below still has to happen.
271  Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
272 
273  if (st == 2) {
274  return kTRUE;
275  }
276 
277  if (st == 1) {
278  if (async) {
// Restore the offset saved before the cache call, then fall through
// to the real read.
279  cstats.tick(len);
280  Seek(here);
281  } else {
282  cstats.tick(len);
283  stats.tick(len);
284  return kFALSE;
285  }
286  }
287  }
288 
// NOTE(review): `st` in the disabled line below is scoped to the cache
// block above and would not compile here if re-enabled as written.
289  // FIXME: Re-enable read-ahead if the data wasn't in cache.
290  // if (! st) storage_->caching(true, -1, s_readahead);
291 
292  // A real read
293  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
294  IOSize n = storage_->xread(buf, len);
295  xstats.tick(n);
296  stats.tick(n);
// NOTE(review): any non-zero byte count is reported as success, so a
// short read (0 < n < len) leaves part of `buf` unfilled without an error.
297  return n ? kFALSE : kTRUE;
298 }
299 
// ROOT hook for asynchronous prefetch of `len` bytes at offset `off`
// (len == 0 is a probe for prefetch support). Returns kFALSE when the
// request is accepted/async reading should be used, kTRUE otherwise.
// NOTE(review): in this listing the conditions guarding the early
// `return kTRUE;` and `return kFALSE;` statements below (original lines
// 322/325/347) are not visible; as displayed, later code would look
// unreachable. Consult the full source before changing this function.
300 Bool_t
301 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
302 {
303  // Check that it's valid to access this file.
304  if (IsZombie())
305  {
306  Error("ReadBufferAsync", "Cannot read from a zombie file");
307  return kTRUE;
308  }
309 
310  if (! IsOpen())
311  {
312  Error("ReadBufferAsync", "Cannot read from a file that is not open");
313  return kTRUE;
314  }
315 
317 
318  // If asynchronous reading is disabled, bail out now, regardless
319  // whether the underlying storage supports prefetching. If it is
320  // forced on, pretend it's on, even if the storage doesn't support
321  // it, as this turns off the caching in ROOT's side.
323 
324  // Verify that we never use async reads in app-only mode
326  return kTRUE;
327 
328  // Let the I/O method indicate if it can do client-side prefetch.
329  // If it does, then for example TTreeCache will drop its own cache
330  // and will use the client-side cache of the actual I/O layer.
331  // If len is zero ROOT is probing for prefetch support.
332  if (len) {
333  // FIXME: Synchronise caching.
334  // storage_->caching(true, -1, 0);
335  ;
336  }
337 
// A probe (len == 0) asks the storage to prefetch PREFETCH_PROBE_LENGTH
// bytes instead of zero, with no destination buffer.
338  IOPosBuffer iov(off, (void *) 0, len ? len : PREFETCH_PROBE_LENGTH);
339  if (storage_->prefetch(&iov, 1))
340  {
341  stats.tick(len);
342  return kFALSE;
343  }
344 
345  // Always ask ROOT to use async reads in storage-only mode,
346  // regardless of whether the storage system supports it.
348  return kFALSE;
349 
350  // Prefetching not available right now.
351  return kTRUE;
352 }
353 
// Synchronous vectored read: fill `buf` back-to-back with the `nbuf` ranges
// given by `pos`/`len`. Requests are grouped by ReadRepacker, issued with
// storage_->readv(), then unpacked into `buf`. Returns kTRUE on error,
// kFALSE on success.
354 Bool_t
355 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
356 {
377  Int_t remaining = nbuf; // Number of read requests left to process.
378  Int_t pack_count; // Number of read requests processed by this iteration.
379 
380  IOSize remaining_buffer_size=0;
381  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
382  // the size of the various requests.
383  for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
384 
385  char *current_buffer = buf;
386  Long64_t *current_pos = pos;
387  Int_t *current_len = len;
388 
389  ReadRepacker repacker;
390 
// NOTE(review): progress depends on pack() returning a positive count;
// if it ever returned 0 this loop would not terminate.
391  while (remaining > 0) {
392 
393  pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
394 
// NOTE(review): realBytesProcessed() returns IOSize; storing it in an
// int narrows the value.
395  int real_bytes_processed = repacker.realBytesProcessed();
396  IOSize io_buffer_used = repacker.bufferUsed();
397 
398  // Issue readv, then unpack buffers.
399  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
400  std::vector<IOPosBuffer> &iov = repacker.iov();
// NOTE(review): &iov[0] is undefined if the repacker produced no iov
// entries; confirm pack() always emits at least one.
401  IOSize result = storage_->readv(&iov[0], iov.size());
// A short vectored read fails the whole request; no Error() is reported.
402  if (result != io_buffer_used) {
403  return kTRUE;
404  }
405  xstats.tick(io_buffer_used);
406  repacker.unpack(current_buffer);
407 
408  // Update the location of the unused part of the input buffer.
409  remaining_buffer_size -= real_bytes_processed;
410  current_buffer += real_bytes_processed;
411 
412  current_pos += pack_count;
413  current_len += pack_count;
414  remaining -= pack_count;
415 
416  }
417  assert(remaining_buffer_size == 0);
418  return kFALSE;
419 }
420 
421 Bool_t
422 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
423 {
424  // Check that it's valid to access this file.
425  if (IsZombie())
426  {
427  Error("ReadBuffers", "Cannot read from a zombie file");
428  return kTRUE;
429  }
430 
431  if (! IsOpen())
432  {
433  Error("ReadBuffers", "Cannot read from a file that is not open");
434  return kTRUE;
435  }
436 
437  // For synchronous reads, we have special logic to optimize the I/O requests
438  // from ROOT before handing it to the storage.
439  if (buf)
440  {
441  return ReadBuffersSync(buf, pos, len, nbuf);
442  }
443  // For an async read, we assume the storage system is smart enough to do the
444  // optimization itself.
445 
446  // Read from underlying storage.
447  void* const nobuf = 0;
448  Int_t total = 0;
449  std::vector<IOPosBuffer> iov;
450  iov.reserve(nbuf);
451  for (Int_t i = 0; i < nbuf; ++i)
452  {
453  iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
454  total += len[i];
455  }
456 
457  // Null buffer means asynchronous reads into I/O system's cache.
458  bool success;
459  StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
460  // Synchronise low-level cache with the supposed cache in TFile.
461  // storage_->caching(true, -1, 0);
462  success = storage_->prefetch(&iov[0], nbuf);
463  astats.tick(total);
464 
465  // If it didn't suceeed, pass down to the base class.
466  return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
467 }
468 
// Write `len` bytes from `buf` at the current offset. Tries ROOT's write
// cache first and writes directly to storage_ when the cache does not take
// the data. Returns kTRUE on error, kFALSE on success.
469 Bool_t
470 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
471 {
472  // Check that it's valid to access this file.
473  if (IsZombie())
474  {
475  Error("WriteBuffer", "Cannot write to a zombie file");
476  return kTRUE;
477  }
478 
479  if (! IsOpen())
480  {
481  Error("WriteBuffer", "Cannot write to a file that is not open");
482  return kTRUE;
483  }
484 
485  if (! fWritable)
486  {
487  Error("WriteBuffer", "File is not writable");
488  return kTRUE;
489  }
490 
492  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
493 
494  // Try first writing via a cache, and if that's not possible, directly.
// WriteBufferViaCache() result: 0 = not cached (write directly),
// 1 = written to the cache, anything else = error (handled by the
// `case 2:`/`default:` branch).
495  Int_t st;
496  switch ((st = WriteBufferViaCache(buf, len)))
497  {
498  case 0:
499  // Actual write.
500  {
501  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
502  IOSize n = storage_->xwrite(buf, len);
503  xstats.tick(n);
504  stats.tick(n);
505 
506  // FIXME: What if it's a short write?
// As written, any n > 0 (even n < len) is reported as success.
507  return n > 0 ? kFALSE : kTRUE;
508  }
509 
510  case 1:
511  cstats.tick(len);
512  stats.tick(len);
513  return kFALSE;
514 
515  case 2:
516  default:
517  Error("WriteBuffer", "Error writing to cache");
518  return kTRUE;
519  }
520 }
521 
525 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
526 // large a prefetch cache to use.
527 // FIXME: Asynchronous open support?
528 
// ROOT low-level open hook: (re)open `pathname` through StorageFactory,
// translating POSIX open(2) flags into IOFlags. Any storage already held is
// closed and deleted first. The `mode` argument is ignored. Always returns
// 0 as the "file descriptor" (the real handle lives in storage_).
// Throws cms::Exception (after MakeZombie()) if the open fails.
532 Int_t
533 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
534 {
536 
537  if (storage_)
538  {
539  storage_->close();
540  delete storage_;
541  storage_ = 0;
542  }
543 
// O_WRONLY replaces the default OpenRead flag (write-only), whereas
// O_RDWR adds OpenWrite on top of OpenRead.
544  int openFlags = IOFlags::OpenRead;
545  if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
546  else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
547  if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
548  if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
549  if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
550  if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
551  if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
552 
553  if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
554  {
555  MakeZombie();
556  gDirectory = gROOT;
557  throw cms::Exception("TStorageFactoryFile::SysOpen()")
558  << "Cannot open file '" << pathname << "'";
559  }
560 
561  stats.tick();
562  return 0;
563 }
564 
// ROOT low-level close hook: close and delete the storage (if any) and
// reset storage_ to null. Always returns 0.
565 Int_t
567 {
569 
570  if (storage_)
571  {
572  storage_->close();
573  delete storage_;
574  storage_ = 0;
575  }
576 
577  stats.tick();
578  return 0;
579 }
580 
// ROOT low-level seek hook: translate a POSIX `whence` into
// Storage::Relative and reposition storage_. Returns the offset reported by
// Storage::position(). The fd argument is ignored.
// NOTE(review): storage_ is not null-checked here, although SysClose()
// can reset it to null.
581 Long64_t
582 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
583 {
// Any whence other than SEEK_SET/SEEK_CUR is treated as SEEK_END.
585  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
586  : whence == SEEK_CUR ? Storage::CURRENT
587  : Storage::END);
588 
589  offset = storage_->position(offset, rel);
590  stats.tick();
591  return offset;
592 }
593 
// ROOT low-level sync hook: flush storage_. Always returns 0.
594 Int_t
596 {
598  storage_->flush();
599  stats.tick();
600  return 0;
601 }
602 
// ROOT low-level stat hook. Only the size is real (from storage_->size());
// `id` is a hash of fRealName, and `flags`/`modtime` are always 0.
// The fd argument is ignored. Always returns 0.
603 Int_t
604 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
605  Long_t *flags, Long_t *modtime)
606 {
608  // FIXME: Most of this is unsupported or makes no sense with Storage
609  *id = ::Hash(fRealName);
610  *size = storage_->size();
611  *flags = 0;
612  *modtime = 0;
613  stats.tick();
614  return 0;
615 }
616 
// Forwards to TSystem::ResetErrno() (presumably this is the class's
// ResetErrno() override; the signature line is not shown here).
617 void
619 {
620  TSystem::ResetErrno();
621 }
Code categoryCode() const
Definition: EDMException.h:93
IOSize xwrite(const void *from, IOSize n)
Definition: IOOutput.cc:176
int i
Definition: DBlmapReader.cc:9
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
#define PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
virtual Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence)
void tick(double amount=0., int64_t tick=0) const
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: Storage.cc:31
virtual Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
IOSize xread(IOBuffer into)
Definition: IOInput.cc:166
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Relative
Definition: Storage.h:23
static StorageFactory * get(void)
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, const char *label)
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:52
static StorageAccount::Counter * s_statsARead
tuple path
else: Piece not in the list, fine.
tuple iov
Definition: o2o.py:307
virtual Bool_t ReadBufferAsync(Long64_t off, Int_t len)
unsigned int(* Counter)(align::ID, const TrackerTopology *)
Definition: Counters.h:27
virtual void close(void)
Definition: Storage.cc:128
virtual IOOffset position(void) const
Definition: Storage.cc:95
static StorageAccount::Counter * s_statsCtor
tuple result
Definition: query.py:137
virtual void flush(void)
Definition: Storage.cc:124
virtual Bool_t ReadBuffer(char *buf, Int_t len)
bool isAvailable() const
Definition: Service.h:46
std::string Hash
Definition: Types.h:43
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:22
unsigned int offset(bool)
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: Storage.cc:119
static StorageAccount::Counter * s_statsCWrite
static Counter & counter(const std::string &storageClass, const std::string &operation)
IOSize bufferUsed() const
Definition: ReadRepacker.h:49
virtual IOOffset size(void) const
Definition: Storage.cc:102
string const
Definition: compareJSON.py:14
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:47
void ResetErrno(void) const
#define update(a, b)
virtual Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
virtual Int_t SysClose(Int_t fd)
static StorageAccount::Counter * s_statsSeek
static StorageAccount::Counter * s_statsOpen
virtual Int_t SysSync(Int_t fd)
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
static StorageAccount::Counter * s_statsCPrefetch
static StorageAccount::Counter * s_statsCRead