CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
TStorageFactoryFile.cc
Go to the documentation of this file.
8 #include "ReadRepacker.h"
9 #include "TFileCacheRead.h"
10 #include "TSystem.h"
11 #include "TROOT.h"
12 #include "TEnv.h"
13 #include <errno.h>
14 #include <sys/stat.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <iostream>
18 #include <cassert>
19 
20 #if 0
21 #include "TTreeCache.h"
22 #include "TTree.h"
23 
24 class TTreeCacheDebug : public TTreeCache {
25 public:
26  void dump(const char *label, const char *trailer)
27  {
28  Long64_t entry = fOwner->GetReadEntry();
29  std::cerr
30  << label << ": " << entry << " "
31  << "{ fEntryMin=" << fEntryMin
32  << ", fEntryMax=" << fEntryMax
33  << ", fEntryNext=" << fEntryNext
34  << ", fZipBytes=" << fZipBytes
35  << ", fNbranches=" << fNbranches
36  << ", fNReadOk=" << fNReadOk
37  << ", fNReadMiss=" << fNReadMiss
38  << ", fNReadPref=" << fNReadPref
39  << ", fBranches=" << fBranches
40  << ", fBrNames=" << fBrNames
41  << ", fOwner=" << fOwner
42  << ", fTree=" << fTree
43  << ", fIsLearning=" << fIsLearning
44  << ", fIsManual=" << fIsManual
45  << "; fBufferSizeMin=" << fBufferSizeMin
46  << ", fBufferSize=" << fBufferSize
47  << ", fBufferLen=" << fBufferLen
48  << ", fBytesToPrefetch=" << fBytesToPrefetch
49  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
50  << ", fAsyncReading=" << fAsyncReading
51  << ", fNseek=" << fNseek
52  << ", fNtot=" << fNtot
53  << ", fNb=" << fNb
54  << ", fSeekSize=" << fSeekSize
55  << ", fSeek=" << fSeek
56  << ", fSeekIndex=" << fSeekIndex
57  << ", fSeekSort=" << fSeekSort
58  << ", fPos=" << fPos
59  << ", fSeekLen=" << fSeekLen
60  << ", fSeekSortLen=" << fSeekSortLen
61  << ", fSeekPos=" << fSeekPos
62  << ", fLen=" << fLen
63  << ", fFile=" << fFile
64  << ", fBuffer=" << (void *) fBuffer
65  << ", fIsSorted=" << fIsSorted
66  << " }\n" << trailer;
67  }
68 };
69 #endif
70 
71 ClassImp(TStorageFactoryFile)
86 
87 static inline StorageAccount::Counter &
89 {
90  if (! c) c = &StorageAccount::counter("tstoragefile", label);
91  return *c;
92 }
93 
95  : storage_(0)
96 {
98  stats.tick(0);
99 }
100 
101 // This constructor must be compatible with *all* the various built-in TFile plugins,
102 // including TXNetFile. This is why some arguments in the constructor is ignored.
103 // If there's a future T*File that is incompatible with this constructor, a new
104 // constructor will have to be added.
106  Option_t *option,
107  const char *ftitle,
108  Int_t compress,
109  Int_t netopt,
110  Bool_t parallelopen /* = kFALSE */)
111  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
112  storage_(0)
113 {
114  Initialize(path, option);
115 }
116 
118  Option_t *option /* = "" */,
119  const char *ftitle /* = "" */,
120  Int_t compress /* = 1 */)
121  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
122  storage_(0)
123 {
124  Initialize(path, option);
125 }
126 
127 void
129  Option_t *option /* = "" */)
130 {
131  StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
132 
133  // Enable AsyncReading.
134  // This was the default for 5.27, but turned off by default for 5.32.
135  // In our testing, AsyncReading is the fastest mechanism available.
136  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
137  // performance hit in our "average case" tests.
138  gEnv->SetValue("TFile.AsyncReading", 1);
139 
140  // Parse options; at the moment we only accept read!
141  fOption = option;
142  fOption.ToUpper();
143 
144  if (fOption == "NEW")
145  fOption = "CREATE";
146 
147  Bool_t create = (fOption == "CREATE");
148  Bool_t recreate = (fOption == "RECREATE");
149  Bool_t update = (fOption == "UPDATE");
150  Bool_t read = (fOption == "READ");
151 
152  if (!create && !recreate && !update && !read)
153  {
154  read = true;
155  fOption = "READ";
156  }
157 
158  if (recreate)
159  {
160  if (!gSystem->AccessPathName(path, kFileExists))
161  gSystem->Unlink(path);
162 
163  recreate = false;
164  create = true;
165  fOption = "CREATE";
166  }
167  assert(!recreate);
168 
169  if (update && gSystem->AccessPathName(path, kFileExists))
170  {
171  update = kFALSE;
172  create = kTRUE;
173  }
174 
175  assert(read || update || create);
176 
177  int openFlags = IOFlags::OpenRead;
178  if (!read) openFlags |= IOFlags::OpenWrite;
179  if (create) openFlags |= IOFlags::OpenCreate;
180  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
181 
182  // Open storage
183  if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
184  {
185  MakeZombie();
186  gDirectory = gROOT;
187  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
188  << "Cannot open file '" << path << "'";
189  }
190 
191  // Record the statistics.
192  try {
194  if (statsService.isAvailable()) {
195  statsService->setSize(storage_->size());
196  }
197  } catch (edm::Exception e) {
198  if (e.categoryCode() != edm::errors::NotFound) {
199  throw;
200  }
201  }
202 
203  fRealName = path;
204  fD = 0; // sorry, meaningless
205  fWritable = read ? kFALSE : kTRUE;
206 
207  Init(create);
208 
209  stats.tick(0);
210 }
211 
213 {
214  Close();
215  delete storage_;
216 }
217 
221 
222 Bool_t
223 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
224 {
225  // This function needs to be optimized to minimize seeks.
226  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
227  Seek(pos);
228  return ReadBuffer(buf, len);
229 }
230 
231 Bool_t
232 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
233 {
234  // Check that it's valid to access this file.
235  if (IsZombie())
236  {
237  Error("ReadBuffer", "Cannot read from a zombie file");
238  return kTRUE;
239  }
240 
241  if (! IsOpen())
242  {
243  Error("ReadBuffer", "Cannot read from a file that is not open");
244  return kTRUE;
245  }
246 
247  // Read specified byte range from the storage. Returns kTRUE in
248  // case of error. Note that ROOT uses this function recursively
249  // to fill the cache; we use a flag to make sure our accounting
250  // is reflected in a comprehensible manner. The "read" counter
251  // will include both, "readc" indicates how much read from the
252  // cache, "readu" indicates how much we failed to read from the
253  // cache (excluding those recursive reads), and "readx" counts
254  // the amount actually passed to read from the storage object.
256 
257  // If we have a cache, read from there first. This returns 0
258  // if the block hasn't been prefetched, 1 if it was in cache,
259  // and 2 if there was an error.
260  if (TFileCacheRead *c = GetCacheRead())
261  {
262  Long64_t here = GetRelOffset();
263  Bool_t async = c->IsAsyncReading();
264 
265  StorageAccount::Stamp cstats(async
266  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
267  : storageCounter(s_statsCRead, "readViaCache"));
268 
269  Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
270 
271  if (st == 2) {
272  return kTRUE;
273  }
274 
275  if (st == 1) {
276  if (async) {
277  cstats.tick(len);
278  Seek(here);
279  } else {
280  cstats.tick(len);
281  stats.tick(len);
282  return kFALSE;
283  }
284  }
285  }
286 
287  // FIXME: Re-enable read-ahead if the data wasn't in cache.
288  // if (! st) storage_->caching(true, -1, s_readahead);
289 
290  // A real read
291  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
292  IOSize n = storage_->xread(buf, len);
293  xstats.tick(n);
294  stats.tick(n);
295  return n ? kFALSE : kTRUE;
296 }
297 
298 Bool_t
299 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
300 {
301  // Check that it's valid to access this file.
302  if (IsZombie())
303  {
304  Error("ReadBufferAsync", "Cannot read from a zombie file");
305  return kTRUE;
306  }
307 
308  if (! IsOpen())
309  {
310  Error("ReadBufferAsync", "Cannot read from a file that is not open");
311  return kTRUE;
312  }
313 
315 
316  // If asynchronous reading is disabled, bail out now, regardless
317  // whether the underlying storage supports prefetching. If it is
318  // forced on, pretend it's on, even if the storage doesn't support
319  // it, as this turns off the caching in ROOT's side.
321 
322  // Verify that we never using async reads in app-only mode
324  return kTRUE;
325 
326  // Let the I/O method indicate if it can do client-side prefetch.
327  // If it does, then for example TTreeCache will drop its own cache
328  // and will use the client-side cache of the actual I/O layer.
329  // If len is zero ROOT is probing for prefetch support.
330  if (len) {
331  // FIXME: Synchronise caching.
332  // storage_->caching(true, -1, 0);
333  ;
334  }
335 
336  IOPosBuffer iov(off, (void *) 0, len ? len : PREFETCH_PROBE_LENGTH);
337  if (storage_->prefetch(&iov, 1))
338  {
339  stats.tick(len);
340  return kFALSE;
341  }
342 
343  // Always ask ROOT to use async reads in storage-only mode,
344  // regardless of whether the storage system supports it.
346  return kFALSE;
347 
348  // Prefetching not available right now.
349  return kTRUE;
350 }
351 
352 Bool_t
353 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
354 {
375  Int_t remaining = nbuf; // Number of read requests left to process.
376  Int_t pack_count; // Number of read requests processed by this iteration.
377 
378  IOSize remaining_buffer_size=0;
379  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
380  // the size of the various requests.
381  for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
382 
383  char *current_buffer = buf;
384  Long64_t *current_pos = pos;
385  Int_t *current_len = len;
386 
387  ReadRepacker repacker;
388 
389  while (remaining > 0) {
390 
391  pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
392 
393  int real_bytes_processed = repacker.realBytesProcessed();
394  IOSize io_buffer_used = repacker.bufferUsed();
395 
396  // Issue readv, then unpack buffers.
397  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
398  std::vector<IOPosBuffer> &iov = repacker.iov();
399  IOSize result = storage_->readv(&iov[0], iov.size());
400  if (result != io_buffer_used) {
401  return kTRUE;
402  }
403  xstats.tick(io_buffer_used);
404  repacker.unpack(current_buffer);
405 
406  // Update the location of the unused part of the input buffer.
407  remaining_buffer_size -= real_bytes_processed;
408  current_buffer += real_bytes_processed;
409 
410  current_pos += pack_count;
411  current_len += pack_count;
412  remaining -= pack_count;
413 
414  }
415  assert(remaining_buffer_size == 0);
416  return kFALSE;
417 }
418 
419 Bool_t
420 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
421 {
422  // Check that it's valid to access this file.
423  if (IsZombie())
424  {
425  Error("ReadBuffers", "Cannot read from a zombie file");
426  return kTRUE;
427  }
428 
429  if (! IsOpen())
430  {
431  Error("ReadBuffers", "Cannot read from a file that is not open");
432  return kTRUE;
433  }
434 
435  // For synchronous reads, we have special logic to optimize the I/O requests
436  // from ROOT before handing it to the storage.
437  if (buf)
438  {
439  return ReadBuffersSync(buf, pos, len, nbuf);
440  }
441  // For an async read, we assume the storage system is smart enough to do the
442  // optimization itself.
443 
444  // Read from underlying storage.
445  void* const nobuf = 0;
446  Int_t total = 0;
447  std::vector<IOPosBuffer> iov;
448  iov.reserve(nbuf);
449  for (Int_t i = 0; i < nbuf; ++i)
450  {
451  iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
452  total += len[i];
453  }
454 
455  // Null buffer means asynchronous reads into I/O system's cache.
456  bool success;
457  StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
458  // Synchronise low-level cache with the supposed cache in TFile.
459  // storage_->caching(true, -1, 0);
460  success = storage_->prefetch(&iov[0], nbuf);
461  astats.tick(total);
462 
463  // If it didn't suceeed, pass down to the base class.
464  return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
465 }
466 
467 Bool_t
468 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
469 {
470  // Check that it's valid to access this file.
471  if (IsZombie())
472  {
473  Error("WriteBuffer", "Cannot write to a zombie file");
474  return kTRUE;
475  }
476 
477  if (! IsOpen())
478  {
479  Error("WriteBuffer", "Cannot write to a file that is not open");
480  return kTRUE;
481  }
482 
483  if (! fWritable)
484  {
485  Error("WriteBuffer", "File is not writable");
486  return kTRUE;
487  }
488 
490  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
491 
492  // Try first writing via a cache, and if that's not possible, directly.
493  Int_t st;
494  switch ((st = WriteBufferViaCache(buf, len)))
495  {
496  case 0:
497  // Actual write.
498  {
499  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
500  IOSize n = storage_->xwrite(buf, len);
501  xstats.tick(n);
502  stats.tick(n);
503 
504  // FIXME: What if it's a short write?
505  return n > 0 ? kFALSE : kTRUE;
506  }
507 
508  case 1:
509  cstats.tick(len);
510  stats.tick(len);
511  return kFALSE;
512 
513  case 2:
514  default:
515  Error("WriteBuffer", "Error writing to cache");
516  return kTRUE;
517  }
518 }
519 
523 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
524 // large a prefetch cache to use.
525 // FIXME: Asynchronous open support?
526 
530 Int_t
531 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
532 {
534 
535  if (storage_)
536  {
537  storage_->close();
538  delete storage_;
539  storage_ = 0;
540  }
541 
542  int openFlags = IOFlags::OpenRead;
543  if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
544  else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
545  if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
546  if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
547  if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
548  if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
549  if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
550 
551  if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
552  {
553  MakeZombie();
554  gDirectory = gROOT;
555  throw cms::Exception("TStorageFactoryFile::SysOpen()")
556  << "Cannot open file '" << pathname << "'";
557  }
558 
559  stats.tick();
560  return 0;
561 }
562 
563 Int_t
565 {
567 
568  if (storage_)
569  {
570  storage_->close();
571  delete storage_;
572  storage_ = 0;
573  }
574 
575  stats.tick();
576  return 0;
577 }
578 
579 Long64_t
580 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
581 {
583  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
584  : whence == SEEK_CUR ? Storage::CURRENT
585  : Storage::END);
586 
587  offset = storage_->position(offset, rel);
588  stats.tick();
589  return offset;
590 }
591 
592 Int_t
594 {
596  storage_->flush();
597  stats.tick();
598  return 0;
599 }
600 
601 Int_t
602 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
603  Long_t *flags, Long_t *modtime)
604 {
606  // FIXME: Most of this is unsupported or makes no sense with Storage
607  *id = ::Hash(fRealName);
608  *size = storage_->size();
609  *flags = 0;
610  *modtime = 0;
611  stats.tick();
612  return 0;
613 }
614 
615 void
617 {
618  TSystem::ResetErrno();
619 }
Code categoryCode() const
Definition: EDMException.h:96
IOSize xwrite(const void *from, IOSize n)
Definition: IOOutput.cc:176
int i
Definition: DBlmapReader.cc:9
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
#define PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
virtual Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence)
void tick(double amount=0., int64_t tick=0) const
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: Storage.cc:31
virtual Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
IOSize xread(IOBuffer into)
Definition: IOInput.cc:166
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Relative
Definition: Storage.h:23
static StorageFactory * get(void)
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, const char *label)
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:52
static StorageAccount::Counter * s_statsARead
tuple iov
Definition: o2o.py:307
virtual Bool_t ReadBufferAsync(Long64_t off, Int_t len)
unsigned int(* Counter)(align::ID, const TrackerTopology *)
Definition: Counters.h:27
virtual void close(void)
Definition: Storage.cc:128
std::pair< std::string, MonitorElement * > entry
Definition: ME_MAP.h:8
virtual IOOffset position(void) const
Definition: Storage.cc:95
static StorageAccount::Counter * s_statsCtor
tuple result
Definition: query.py:137
virtual void flush(void)
Definition: Storage.cc:124
virtual Bool_t ReadBuffer(char *buf, Int_t len)
bool isAvailable() const
Definition: Service.h:47
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:22
unsigned int offset(bool)
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: Storage.cc:119
unsigned int UInt_t
Definition: FUTypes.h:12
static StorageAccount::Counter * s_statsCWrite
static Counter & counter(const std::string &storageClass, const std::string &operation)
IOSize bufferUsed() const
Definition: ReadRepacker.h:49
virtual IOOffset size(void) const
Definition: Storage.cc:102
string const
Definition: compareJSON.py:14
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:47
void ResetErrno(void) const
#define update(a, b)
virtual Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
virtual Int_t SysClose(Int_t fd)
static StorageAccount::Counter * s_statsSeek
static StorageAccount::Counter * s_statsOpen
virtual Int_t SysSync(Int_t fd)
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
static StorageAccount::Counter * s_statsCPrefetch
static StorageAccount::Counter * s_statsCRead