CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
TStorageFactoryFile.cc
Go to the documentation of this file.
6 #include "ReadRepacker.h"
7 #include "TFileCacheRead.h"
8 #include "TSystem.h"
9 #include "TROOT.h"
10 #include "TEnv.h"
11 #include <errno.h>
12 #include <sys/stat.h>
13 #include <unistd.h>
14 #include <fcntl.h>
15 #include <iostream>
16 
17 #if 0
18 #include "TTreeCache.h"
19 #include "TTree.h"
20 
21 class TTreeCacheDebug : public TTreeCache {
22 public:
23  void dump(const char *label, const char *trailer)
24  {
25  Long64_t entry = fOwner->GetReadEntry();
26  std::cerr
27  << label << ": " << entry << " "
28  << "{ fEntryMin=" << fEntryMin
29  << ", fEntryMax=" << fEntryMax
30  << ", fEntryNext=" << fEntryNext
31  << ", fZipBytes=" << fZipBytes
32  << ", fNbranches=" << fNbranches
33  << ", fNReadOk=" << fNReadOk
34  << ", fNReadMiss=" << fNReadMiss
35  << ", fNReadPref=" << fNReadPref
36  << ", fBranches=" << fBranches
37  << ", fBrNames=" << fBrNames
38  << ", fOwner=" << fOwner
39  << ", fTree=" << fTree
40  << ", fIsLearning=" << fIsLearning
41  << ", fIsManual=" << fIsManual
42  << "; fBufferSizeMin=" << fBufferSizeMin
43  << ", fBufferSize=" << fBufferSize
44  << ", fBufferLen=" << fBufferLen
45  << ", fBytesToPrefetch=" << fBytesToPrefetch
46  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
47  << ", fAsyncReading=" << fAsyncReading
48  << ", fNseek=" << fNseek
49  << ", fNtot=" << fNtot
50  << ", fNb=" << fNb
51  << ", fSeekSize=" << fSeekSize
52  << ", fSeek=" << fSeek
53  << ", fSeekIndex=" << fSeekIndex
54  << ", fSeekSort=" << fSeekSort
55  << ", fPos=" << fPos
56  << ", fSeekLen=" << fSeekLen
57  << ", fSeekSortLen=" << fSeekSortLen
58  << ", fSeekPos=" << fSeekPos
59  << ", fLen=" << fLen
60  << ", fFile=" << fFile
61  << ", fBuffer=" << (void *) fBuffer
62  << ", fIsSorted=" << fIsSorted
63  << " }\n" << trailer;
64  }
65 };
66 #endif
67 
68 ClassImp(TStorageFactoryFile)
83 
84 static inline StorageAccount::Counter &
86 {
87  if (! c) c = &StorageAccount::counter("tstoragefile", label);
88  return *c;
89 }
90 
92  : storage_(0)
93 {
95  stats.tick(0);
96 }
97 
98 // This constructor must be compatible with *all* the various built-in TFile plugins,
99 // including TXNetFile. This is why some arguments in the constructor is ignored.
100 // If there's a future T*File that is incompatible with this constructor, a new
101 // constructor will have to be added.
103  Option_t *option,
104  const char *ftitle,
105  Int_t compress,
106  Int_t netopt,
107  Bool_t parallelopen /* = kFALSE */)
108  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
109  storage_(0)
110 {
111  Initialize(path, option);
112 }
113 
115  Option_t *option /* = "" */,
116  const char *ftitle /* = "" */,
117  Int_t compress /* = 1 */)
118  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
119  storage_(0)
120 {
121  Initialize(path, option);
122 }
123 
124 void
126  Option_t *option /* = "" */)
127 {
128  StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
129 
130  // Enable AsyncReading.
131  // This was the default for 5.27, but turned off by default for 5.32.
132  // In our testing, AsyncReading is the fastest mechanism available.
133  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
134  // performance hit in our "average case" tests.
135  gEnv->SetValue("TFile.AsyncReading", 1);
136 
137  // Parse options; at the moment we only accept read!
138  fOption = option;
139  fOption.ToUpper();
140 
141  if (fOption == "NEW")
142  fOption = "CREATE";
143 
144  Bool_t create = (fOption == "CREATE");
145  Bool_t recreate = (fOption == "RECREATE");
146  Bool_t update = (fOption == "UPDATE");
147  Bool_t read = (fOption == "READ");
148 
149  if (!create && !recreate && !update && !read)
150  {
151  read = true;
152  fOption = "READ";
153  }
154 
155  if (recreate)
156  {
157  if (!gSystem->AccessPathName(path, kFileExists))
158  gSystem->Unlink(path);
159 
160  recreate = false;
161  create = true;
162  fOption = "CREATE";
163  }
164  assert(!recreate);
165 
166  if (update && gSystem->AccessPathName(path, kFileExists))
167  {
168  update = kFALSE;
169  create = kTRUE;
170  }
171 
172  int openFlags = IOFlags::OpenRead;
173  if (!read) openFlags |= IOFlags::OpenWrite;
174  if (create) openFlags |= IOFlags::OpenCreate;
175  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
176 
177  // Open storage
178  if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
179  {
180  MakeZombie();
181  gDirectory = gROOT;
182  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
183  << "Cannot open file '" << path << "'";
184  }
185 
186  fRealName = path;
187  fD = 0; // sorry, meaningless
188  fWritable = read ? kFALSE : kTRUE;
189 
190  Init(create);
191 
192  stats.tick(0);
193 }
194 
196 {
197  Close();
198  delete storage_;
199 }
200 
204 
205 Bool_t
206 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
207 {
208  // This function needs to be optimized to minimize seeks.
209  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
210  Seek(pos);
211  return ReadBuffer(buf, len);
212 }
213 
214 Bool_t
215 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
216 {
217  // Check that it's valid to access this file.
218  if (IsZombie())
219  {
220  Error("ReadBuffer", "Cannot read from a zombie file");
221  return kTRUE;
222  }
223 
224  if (! IsOpen())
225  {
226  Error("ReadBuffer", "Cannot read from a file that is not open");
227  return kTRUE;
228  }
229 
230  // Read specified byte range from the storage. Returns kTRUE in
231  // case of error. Note that ROOT uses this function recursively
232  // to fill the cache; we use a flag to make sure our accounting
233  // is reflected in a comprehensible manner. The "read" counter
234  // will include both, "readc" indicates how much read from the
235  // cache, "readu" indicates how much we failed to read from the
236  // cache (excluding those recursive reads), and "readx" counts
237  // the amount actually passed to read from the storage object.
239 
240  // If we have a cache, read from there first. This returns 0
241  // if the block hasn't been prefetched, 1 if it was in cache,
242  // and 2 if there was an error.
243  if (TFileCacheRead *c = GetCacheRead())
244  {
245  Long64_t here = GetRelOffset();
246  Bool_t async = c->IsAsyncReading();
247 
248  StorageAccount::Stamp cstats(async
249  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
250  : storageCounter(s_statsCRead, "readViaCache"));
251 
252  Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
253 
254  if (st == 2) {
255  return kTRUE;
256  }
257 
258  if (st == 1) {
259  if (async) {
260  cstats.tick(len);
261  Seek(here);
262  } else {
263  cstats.tick(len);
264  stats.tick(len);
265  return kFALSE;
266  }
267  }
268  }
269 
270  // FIXME: Re-enable read-ahead if the data wasn't in cache.
271  // if (! st) storage_->caching(true, -1, s_readahead);
272 
273  // A real read
274  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
275  IOSize n = storage_->xread(buf, len);
276  xstats.tick(n);
277  stats.tick(n);
278  return n ? kFALSE : kTRUE;
279 }
280 
281 Bool_t
282 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
283 {
284  // Check that it's valid to access this file.
285  if (IsZombie())
286  {
287  Error("ReadBufferAsync", "Cannot read from a zombie file");
288  return kTRUE;
289  }
290 
291  if (! IsOpen())
292  {
293  Error("ReadBufferAsync", "Cannot read from a file that is not open");
294  return kTRUE;
295  }
296 
298 
299  // If asynchronous reading is disabled, bail out now, regardless
300  // whether the underlying storage supports prefetching. If it is
301  // forced on, pretend it's on, even if the storage doesn't support
302  // it, as this turns off the caching in ROOT's side.
304 
305  // Verify that we never using async reads in app-only mode
307  return kTRUE;
308 
309  // Let the I/O method indicate if it can do client-side prefetch.
310  // If it does, then for example TTreeCache will drop its own cache
311  // and will use the client-side cache of the actual I/O layer.
312  // If len is zero ROOT is probing for prefetch support.
313  if (len) {
314  // FIXME: Synchronise caching.
315  // storage_->caching(true, -1, 0);
316  ;
317  }
318 
319  IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
320  if (storage_->prefetch(&iov, 1))
321  {
322  stats.tick(len);
323  return kFALSE;
324  }
325 
326  // Always ask ROOT to use async reads in storage-only mode,
327  // regardless of whether the storage system supports it.
329  return kFALSE;
330 
331  // Prefetching not available right now.
332  return kTRUE;
333 }
334 
335 Bool_t
336 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
337 {
358  Int_t remaining = nbuf; // Number of read requests left to process.
359  Int_t pack_count; // Number of read requests processed by this iteration.
360 
361  IOSize remaining_buffer_size=0;
362  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
363  // the size of the various requests.
364  for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
365 
366  char *current_buffer = buf;
367  Long64_t *current_pos = pos;
368  Int_t *current_len = len;
369 
370  ReadRepacker repacker;
371 
372  while (remaining > 0) {
373 
374  pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
375 
376  int real_bytes_processed = repacker.realBytesProcessed();
377  IOSize io_buffer_used = repacker.bufferUsed();
378 
379  // Issue readv, then unpack buffers.
380  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
381  std::vector<IOPosBuffer> &iov = repacker.iov();
382  IOSize result = storage_->readv(&iov[0], iov.size());
383  if (result != io_buffer_used) {
384  return kTRUE;
385  }
386  xstats.tick(io_buffer_used);
387  repacker.unpack(current_buffer);
388 
389  // Update the location of the unused part of the input buffer.
390  remaining_buffer_size -= real_bytes_processed;
391  current_buffer += real_bytes_processed;
392 
393  current_pos += pack_count;
394  current_len += pack_count;
395  remaining -= pack_count;
396 
397  }
398  assert(remaining_buffer_size == 0);
399  return kFALSE;
400 }
401 
402 Bool_t
403 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
404 {
405  // Check that it's valid to access this file.
406  if (IsZombie())
407  {
408  Error("ReadBuffers", "Cannot read from a zombie file");
409  return kTRUE;
410  }
411 
412  if (! IsOpen())
413  {
414  Error("ReadBuffers", "Cannot read from a file that is not open");
415  return kTRUE;
416  }
417 
418  // For synchronous reads, we have special logic to optimize the I/O requests
419  // from ROOT before handing it to the storage.
420  if (buf)
421  {
422  return ReadBuffersSync(buf, pos, len, nbuf);
423  }
424  // For an async read, we assume the storage system is smart enough to do the
425  // optimization itself.
426 
427  // Read from underlying storage.
428  void* const nobuf = 0;
429  Int_t total = 0;
430  std::vector<IOPosBuffer> iov;
431  iov.reserve(nbuf);
432  for (Int_t i = 0; i < nbuf; ++i)
433  {
434  iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
435  total += len[i];
436  }
437 
438  // Null buffer means asynchronous reads into I/O system's cache.
439  bool success;
440  StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
441  // Synchronise low-level cache with the supposed cache in TFile.
442  // storage_->caching(true, -1, 0);
443  success = storage_->prefetch(&iov[0], nbuf);
444  astats.tick(total);
445 
446  // If it didn't suceeed, pass down to the base class.
447  return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
448 }
449 
450 Bool_t
451 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
452 {
453  // Check that it's valid to access this file.
454  if (IsZombie())
455  {
456  Error("WriteBuffer", "Cannot write to a zombie file");
457  return kTRUE;
458  }
459 
460  if (! IsOpen())
461  {
462  Error("WriteBuffer", "Cannot write to a file that is not open");
463  return kTRUE;
464  }
465 
466  if (! fWritable)
467  {
468  Error("WriteBuffer", "File is not writable");
469  return kTRUE;
470  }
471 
473  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
474 
475  // Try first writing via a cache, and if that's not possible, directly.
476  Int_t st;
477  switch ((st = WriteBufferViaCache(buf, len)))
478  {
479  case 0:
480  // Actual write.
481  {
482  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
483  IOSize n = storage_->xwrite(buf, len);
484  xstats.tick(n);
485  stats.tick(n);
486 
487  // FIXME: What if it's a short write?
488  return n > 0 ? kFALSE : kTRUE;
489  }
490 
491  case 1:
492  cstats.tick(len);
493  stats.tick(len);
494  return kFALSE;
495 
496  case 2:
497  default:
498  Error("WriteBuffer", "Error writing to cache");
499  return kTRUE;
500  }
501 }
502 
506 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
507 // large a prefetch cache to use.
508 // FIXME: Asynchronous open support?
509 
513 Int_t
514 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
515 {
517 
518  if (storage_)
519  {
520  storage_->close();
521  delete storage_;
522  storage_ = 0;
523  }
524 
525  int openFlags = IOFlags::OpenRead;
526  if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
527  else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
528  if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
529  if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
530  if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
531  if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
532  if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
533 
534  if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
535  {
536  MakeZombie();
537  gDirectory = gROOT;
538  throw cms::Exception("TStorageFactoryFile::SysOpen()")
539  << "Cannot open file '" << pathname << "'";
540  }
541 
542  stats.tick();
543  return 0;
544 }
545 
546 Int_t
548 {
550 
551  if (storage_)
552  {
553  storage_->close();
554  delete storage_;
555  storage_ = 0;
556  }
557 
558  stats.tick();
559  return 0;
560 }
561 
562 Long64_t
563 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
564 {
566  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
567  : whence == SEEK_CUR ? Storage::CURRENT
568  : Storage::END);
569 
570  offset = storage_->position(offset, rel);
571  stats.tick();
572  return offset;
573 }
574 
575 Int_t
577 {
579  storage_->flush();
580  stats.tick();
581  return 0;
582 }
583 
584 Int_t
585 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
586  Long_t *flags, Long_t *modtime)
587 {
589  // FIXME: Most of this is unsupported or makes no sense with Storage
590  *id = ::Hash(fRealName);
591  *size = storage_->size();
592  *flags = 0;
593  *modtime = 0;
594  stats.tick();
595  return 0;
596 }
597 
598 void
600 {
601  TSystem::ResetErrno();
602 }
IOSize xwrite(const void *from, IOSize n)
Definition: IOOutput.cc:176
int i
Definition: DBlmapReader.cc:9
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
virtual Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence)
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: Storage.cc:31
virtual Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
IOSize xread(IOBuffer into)
Definition: IOInput.cc:166
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Relative
Definition: Storage.h:11
static StorageFactory * get(void)
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, const char *label)
list path
Definition: scaleCards.py:51
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:52
static StorageAccount::Counter * s_statsARead
tuple iov
Definition: o2o.py:307
virtual Bool_t ReadBufferAsync(Long64_t off, Int_t len)
unsigned int(* Counter)(align::ID)
Definition: Counters.h:26
virtual void close(void)
Definition: Storage.cc:128
std::pair< std::string, MonitorElement * > entry
Definition: ME_MAP.h:8
virtual IOOffset position(void) const
Definition: Storage.cc:95
static StorageAccount::Counter * s_statsCtor
tuple result
Definition: query.py:137
virtual void flush(void)
Definition: Storage.cc:124
virtual Bool_t ReadBuffer(char *buf, Int_t len)
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:22
unsigned int offset(bool)
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
void tick(double amount=0.) const
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: Storage.cc:119
unsigned int UInt_t
Definition: FUTypes.h:12
static StorageAccount::Counter * s_statsCWrite
static Counter & counter(const std::string &storageClass, const std::string &operation)
IOSize bufferUsed() const
Definition: ReadRepacker.h:49
virtual IOOffset size(void) const
Definition: Storage.cc:102
string const
Definition: compareJSON.py:14
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:47
void ResetErrno(void) const
#define update(a, b)
virtual Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
virtual Int_t SysClose(Int_t fd)
static StorageAccount::Counter * s_statsSeek
static StorageAccount::Counter * s_statsOpen
virtual Int_t SysSync(Int_t fd)
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
static StorageAccount::Counter * s_statsCPrefetch
static StorageAccount::Counter * s_statsCRead