CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
TStorageFactoryFile.cc
Go to the documentation of this file.
9 #include "ReadRepacker.h"
10 #include "TFileCacheRead.h"
11 #include "TSystem.h"
12 #include "TROOT.h"
13 #include "TEnv.h"
14 #include <errno.h>
15 #include <sys/stat.h>
16 #include <unistd.h>
17 #include <fcntl.h>
18 #include <iostream>
19 #include <cassert>
20 
// Disabled debugging aid: everything up to the matching #endif is compiled
// out by `#if 0`. When enabled, it declares TTreeCacheDebug, a TTreeCache
// subclass whose dump() writes the cache's internal state to std::cerr.
21 #if 0
22 #include "TTreeCache.h"
23 #include "TTree.h"
24 
25 class TTreeCacheDebug : public TTreeCache {
26 public:
// dump(label, trailer): prints `label`, the current read entry obtained from
// fOwner->GetReadEntry(), then the cache's data members (fEntryMin ...
// fIsSorted) inside "{ ... }", followed by a newline and `trailer`.
// fBuffer is cast to void* so the pointer value is printed, not the bytes.
27  void dump(const char *label, const char *trailer)
28  {
29  Long64_t entry = fOwner->GetReadEntry();
30  std::cerr
31  << label << ": " << entry << " "
32  << "{ fEntryMin=" << fEntryMin
33  << ", fEntryMax=" << fEntryMax
34  << ", fEntryNext=" << fEntryNext
35  << ", fZipBytes=" << fZipBytes
36  << ", fNbranches=" << fNbranches
37  << ", fNReadOk=" << fNReadOk
38  << ", fNReadMiss=" << fNReadMiss
39  << ", fNReadPref=" << fNReadPref
40  << ", fBranches=" << fBranches
41  << ", fBrNames=" << fBrNames
42  << ", fOwner=" << fOwner
43  << ", fTree=" << fTree
44  << ", fIsLearning=" << fIsLearning
45  << ", fIsManual=" << fIsManual
46  << "; fBufferSizeMin=" << fBufferSizeMin
47  << ", fBufferSize=" << fBufferSize
48  << ", fBufferLen=" << fBufferLen
49  << ", fBytesToPrefetch=" << fBytesToPrefetch
50  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
51  << ", fAsyncReading=" << fAsyncReading
52  << ", fNseek=" << fNseek
53  << ", fNtot=" << fNtot
54  << ", fNb=" << fNb
55  << ", fSeekSize=" << fSeekSize
56  << ", fSeek=" << fSeek
57  << ", fSeekIndex=" << fSeekIndex
58  << ", fSeekSort=" << fSeekSort
59  << ", fPos=" << fPos
60  << ", fSeekLen=" << fSeekLen
61  << ", fSeekSortLen=" << fSeekSortLen
62  << ", fSeekPos=" << fSeekPos
63  << ", fLen=" << fLen
64  << ", fFile=" << fFile
65  << ", fBuffer=" << (void *) fBuffer
66  << ", fIsSorted=" << fIsSorted
67  << " }\n" << trailer;
68  }
69 };
70 #endif
71 
// ROOT class-implementation macro for TStorageFactoryFile (presumably paired
// with a ClassDef in the class header, which is not visible here).
72 ClassImp(TStorageFactoryFile)
87 
88 
89 static inline StorageAccount::Counter &
91 {
92  if (! c) c = &StorageAccount::counter("tstoragefile", label);
93  return *c;
94 }
95 
97  : storage_(0)
98 {
100  stats.tick(0);
101 }
102 
103 // This constructor must be compatible with *all* the various built-in TFile plugins,
104 // including TXNetFile. This is why some arguments in the constructor are ignored.
105 // If there's a future T*File that is incompatible with this constructor, a new
106 // constructor will have to be added.
108  Option_t *option,
109  const char *ftitle,
110  Int_t compress,
111  Int_t netopt,
112  Bool_t parallelopen /* = kFALSE */)
113  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
114  storage_(0)
115 {
116  try {
117  Initialize(path, option);
118  } catch (...) {
119  edm::threadLocalException::setException(std::current_exception()); // capture
120  }
121 }
122 
124  Option_t *option /* = "" */,
125  const char *ftitle /* = "" */,
126  Int_t compress /* = 1 */)
127  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
128  storage_(0)
129 {
130  try {
131  Initialize(path, option);
132  } catch (...) {
133  edm::threadLocalException::setException(std::current_exception()); // capture
134  }
135 }
136 
137 void
139  Option_t *option /* = "" */)
140 {
141  StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
142 
143  // Enable AsyncReading.
144  // This was the default for 5.27, but turned off by default for 5.32.
145  // In our testing, AsyncReading is the fastest mechanism available.
146  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
147  // performance hit in our "average case" tests.
148  gEnv->SetValue("TFile.AsyncReading", 1);
149 
150  // Parse options; at the moment we only accept read!
151  fOption = option;
152  fOption.ToUpper();
153 
154  if (fOption == "NEW")
155  fOption = "CREATE";
156 
157  Bool_t create = (fOption == "CREATE");
158  Bool_t recreate = (fOption == "RECREATE");
159  Bool_t update = (fOption == "UPDATE");
160  Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
161  Bool_t readwrap = (fOption == "READWRAP");
162 
163  if (!create && !recreate && !update && !read)
164  {
165  read = true;
166  fOption = "READ";
167  }
168 
169  if (recreate)
170  {
171  if (!gSystem->AccessPathName(path, kFileExists))
172  gSystem->Unlink(path);
173 
174  recreate = false;
175  create = true;
176  fOption = "CREATE";
177  }
178  assert(!recreate);
179 
180  if (update && gSystem->AccessPathName(path, kFileExists))
181  {
182  update = kFALSE;
183  create = kTRUE;
184  }
185 
186  assert(read || update || create);
187 
188  int openFlags = IOFlags::OpenRead;
189  if (!read) openFlags |= IOFlags::OpenWrite;
190  if (create) openFlags |= IOFlags::OpenCreate;
191  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
192  if (readwrap) openFlags |= IOFlags::OpenWrap;
193 
194  // Open storage
195  if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
196  {
197  MakeZombie();
198  gDirectory = gROOT;
199  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
200  << "Cannot open file '" << path << "'";
201  }
202 
203  // Record the statistics.
204  try {
206  if (statsService.isAvailable()) {
207  statsService->setSize(storage_->size());
208  }
209  } catch (edm::Exception e) {
210  if (e.categoryCode() != edm::errors::NotFound) {
211  throw;
212  }
213  }
214 
215  fRealName = path;
216  fD = 0; // sorry, meaningless
217  fWritable = read ? kFALSE : kTRUE;
218 
219  Init(create);
220 
221  stats.tick(0);
222 }
223 
225 {
226  Close();
227  delete storage_;
228 }
229 
233 
234 Bool_t
235 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
236 {
237  // This function needs to be optimized to minimize seeks.
238  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
239  Seek(pos);
240  return ReadBuffer(buf, len);
241 }
242 
243 Bool_t
244 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
245 {
246  // Check that it's valid to access this file.
247  if (IsZombie())
248  {
249  Error("ReadBuffer", "Cannot read from a zombie file");
250  return kTRUE;
251  }
252 
253  if (! IsOpen())
254  {
255  Error("ReadBuffer", "Cannot read from a file that is not open");
256  return kTRUE;
257  }
258 
259  // Read specified byte range from the storage. Returns kTRUE in
260  // case of error. Note that ROOT uses this function recursively
261  // to fill the cache; we use a flag to make sure our accounting
262  // is reflected in a comprehensible manner. The "read" counter
263  // will include both, "readc" indicates how much read from the
264  // cache, "readu" indicates how much we failed to read from the
265  // cache (excluding those recursive reads), and "readx" counts
266  // the amount actually passed to read from the storage object.
268 
269  // If we have a cache, read from there first. This returns 0
270  // if the block hasn't been prefetched, 1 if it was in cache,
271  // and 2 if there was an error.
272  if (TFileCacheRead *c = GetCacheRead())
273  {
274  Long64_t here = GetRelOffset();
275  Bool_t async = c->IsAsyncReading();
276 
277  StorageAccount::Stamp cstats(async
278  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
279  : storageCounter(s_statsCRead, "readViaCache"));
280 
281  Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
282 
283  if (st == 2) {
284  return kTRUE;
285  }
286 
287  if (st == 1) {
288  if (async) {
289  cstats.tick(len);
290  Seek(here);
291  } else {
292  cstats.tick(len);
293  stats.tick(len);
294  return kFALSE;
295  }
296  }
297  }
298 
299  // FIXME: Re-enable read-ahead if the data wasn't in cache.
300  // if (! st) storage_->caching(true, -1, s_readahead);
301 
302  // A real read
303  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
304  IOSize n = storage_->xread(buf, len);
305  xstats.tick(n);
306  stats.tick(n);
307  return n ? kFALSE : kTRUE;
308 }
309 
310 Bool_t
311 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
312 {
313  // Check that it's valid to access this file.
314  if (IsZombie())
315  {
316  Error("ReadBufferAsync", "Cannot read from a zombie file");
317  return kTRUE;
318  }
319 
320  if (! IsOpen())
321  {
322  Error("ReadBufferAsync", "Cannot read from a file that is not open");
323  return kTRUE;
324  }
325 
327 
328  // If asynchronous reading is disabled, bail out now, regardless
329  // whether the underlying storage supports prefetching. If it is
330  // forced on, pretend it's on, even if the storage doesn't support
331  // it, as this turns off the caching in ROOT's side.
333 
334  // Verify that we never using async reads in app-only mode
336  return kTRUE;
337 
338  // Let the I/O method indicate if it can do client-side prefetch.
339  // If it does, then for example TTreeCache will drop its own cache
340  // and will use the client-side cache of the actual I/O layer.
341  // If len is zero ROOT is probing for prefetch support.
342  if (len) {
343  // FIXME: Synchronise caching.
344  // storage_->caching(true, -1, 0);
345  ;
346  }
347 
348  IOPosBuffer iov(off, (void *) 0, len ? len : PREFETCH_PROBE_LENGTH);
349  if (storage_->prefetch(&iov, 1))
350  {
351  stats.tick(len);
352  return kFALSE;
353  }
354 
355  // Always ask ROOT to use async reads in storage-only mode,
356  // regardless of whether the storage system supports it.
358  return kFALSE;
359 
360  // Prefetching not available right now.
361  return kTRUE;
362 }
363 
364 Bool_t
365 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
366 {
387  Int_t remaining = nbuf; // Number of read requests left to process.
388  Int_t pack_count; // Number of read requests processed by this iteration.
389 
390  IOSize remaining_buffer_size=0;
391  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
392  // the size of the various requests.
393  for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
394 
395  char *current_buffer = buf;
396  Long64_t *current_pos = pos;
397  Int_t *current_len = len;
398 
399  ReadRepacker repacker;
400 
401  while (remaining > 0) {
402 
403  pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
404 
405  int real_bytes_processed = repacker.realBytesProcessed();
406  IOSize io_buffer_used = repacker.bufferUsed();
407 
408  // Issue readv, then unpack buffers.
409  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
410  std::vector<IOPosBuffer> &iov = repacker.iov();
411  IOSize result = storage_->readv(&iov[0], iov.size());
412  if (result != io_buffer_used) {
413  return kTRUE;
414  }
415  xstats.tick(io_buffer_used);
416  repacker.unpack(current_buffer);
417 
418  // Update the location of the unused part of the input buffer.
419  remaining_buffer_size -= real_bytes_processed;
420  current_buffer += real_bytes_processed;
421 
422  current_pos += pack_count;
423  current_len += pack_count;
424  remaining -= pack_count;
425 
426  }
427  assert(remaining_buffer_size == 0);
428  return kFALSE;
429 }
430 
431 Bool_t
432 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
433 {
434  // Check that it's valid to access this file.
435  if (IsZombie())
436  {
437  Error("ReadBuffers", "Cannot read from a zombie file");
438  return kTRUE;
439  }
440 
441  if (! IsOpen())
442  {
443  Error("ReadBuffers", "Cannot read from a file that is not open");
444  return kTRUE;
445  }
446 
447  // For synchronous reads, we have special logic to optimize the I/O requests
448  // from ROOT before handing it to the storage.
449  if (buf)
450  {
451  return ReadBuffersSync(buf, pos, len, nbuf);
452  }
453  // For an async read, we assume the storage system is smart enough to do the
454  // optimization itself.
455 
456  // Read from underlying storage.
457  void* const nobuf = 0;
458  Int_t total = 0;
459  std::vector<IOPosBuffer> iov;
460  iov.reserve(nbuf);
461  for (Int_t i = 0; i < nbuf; ++i)
462  {
463  iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
464  total += len[i];
465  }
466 
467  // Null buffer means asynchronous reads into I/O system's cache.
468  bool success;
469  StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
470  // Synchronise low-level cache with the supposed cache in TFile.
471  // storage_->caching(true, -1, 0);
472  success = storage_->prefetch(&iov[0], nbuf);
473  astats.tick(total);
474 
475  // If it didn't suceeed, pass down to the base class.
476  return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
477 }
478 
479 Bool_t
480 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
481 {
482  // Check that it's valid to access this file.
483  if (IsZombie())
484  {
485  Error("WriteBuffer", "Cannot write to a zombie file");
486  return kTRUE;
487  }
488 
489  if (! IsOpen())
490  {
491  Error("WriteBuffer", "Cannot write to a file that is not open");
492  return kTRUE;
493  }
494 
495  if (! fWritable)
496  {
497  Error("WriteBuffer", "File is not writable");
498  return kTRUE;
499  }
500 
502  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
503 
504  // Try first writing via a cache, and if that's not possible, directly.
505  Int_t st;
506  switch ((st = WriteBufferViaCache(buf, len)))
507  {
508  case 0:
509  // Actual write.
510  {
511  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
512  IOSize n = storage_->xwrite(buf, len);
513  xstats.tick(n);
514  stats.tick(n);
515 
516  // FIXME: What if it's a short write?
517  return n > 0 ? kFALSE : kTRUE;
518  }
519 
520  case 1:
521  cstats.tick(len);
522  stats.tick(len);
523  return kFALSE;
524 
525  case 2:
526  default:
527  Error("WriteBuffer", "Error writing to cache");
528  return kTRUE;
529  }
530 }
531 
535 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
536 // large a prefetch cache to use.
537 // FIXME: Asynchronous open support?
538 
542 Int_t
543 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
544 {
546 
547  if (storage_)
548  {
549  storage_->close();
550  delete storage_;
551  storage_ = 0;
552  }
553 
554  int openFlags = IOFlags::OpenRead;
555  if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
556  else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
557  if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
558  if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
559  if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
560  if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
561  if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
562 
563  if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
564  {
565  MakeZombie();
566  gDirectory = gROOT;
567  throw cms::Exception("TStorageFactoryFile::SysOpen()")
568  << "Cannot open file '" << pathname << "'";
569  }
570 
571  stats.tick();
572  return 0;
573 }
574 
575 Int_t
577 {
579 
580  if (storage_)
581  {
582  storage_->close();
583  delete storage_;
584  storage_ = 0;
585  }
586 
587  stats.tick();
588  return 0;
589 }
590 
591 Long64_t
592 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
593 {
595  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
596  : whence == SEEK_CUR ? Storage::CURRENT
597  : Storage::END);
598 
599  offset = storage_->position(offset, rel);
600  stats.tick();
601  return offset;
602 }
603 
604 Int_t
606 {
608  storage_->flush();
609  stats.tick();
610  return 0;
611 }
612 
613 Int_t
614 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
615  Long_t *flags, Long_t *modtime)
616 {
618  // FIXME: Most of this is unsupported or makes no sense with Storage
619  *id = ::Hash(fRealName);
620  *size = storage_->size();
621  *flags = 0;
622  *modtime = 0;
623  stats.tick();
624  return 0;
625 }
626 
627 void
629 {
630  TSystem::ResetErrno();
631 }
Code categoryCode() const
Definition: EDMException.h:93
IOSize xwrite(const void *from, IOSize n)
Definition: IOOutput.cc:176
int i
Definition: DBlmapReader.cc:9
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
#define PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
virtual Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence)
void tick(double amount=0., int64_t tick=0) const
virtual IOSize readv(IOPosBuffer *into, IOSize buffers)
Definition: Storage.cc:31
virtual Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
assert(m_qm.get())
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
IOSize xread(IOBuffer into)
Definition: IOInput.cc:166
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Relative
Definition: Storage.h:23
static StorageFactory * get(void)
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, const char *label)
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:52
static StorageAccount::Counter * s_statsARead
tuple path
else: Piece not in the list, fine.
tuple iov
Definition: o2o.py:307
virtual Bool_t ReadBufferAsync(Long64_t off, Int_t len)
unsigned int(* Counter)(align::ID, const TrackerTopology *)
Definition: Counters.h:27
virtual void close(void)
Definition: Storage.cc:128
virtual IOOffset position(void) const
Definition: Storage.cc:95
static StorageAccount::Counter * s_statsCtor
tuple result
Definition: query.py:137
virtual void flush(void)
Definition: Storage.cc:124
virtual Bool_t ReadBuffer(char *buf, Int_t len)
bool isAvailable() const
Definition: Service.h:46
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:108
std::string Hash
Definition: Types.h:43
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
double f[11][100]
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:22
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: Storage.cc:119
static StorageAccount::Counter * s_statsCWrite
static Counter & counter(const std::string &storageClass, const std::string &operation)
IOSize bufferUsed() const
Definition: ReadRepacker.h:49
virtual IOOffset size(void) const
Definition: Storage.cc:102
string const
Definition: compareJSON.py:14
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:47
void setException(std::exception_ptr e)
void ResetErrno(void) const
#define update(a, b)
virtual Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
virtual Int_t SysClose(Int_t fd)
static StorageAccount::Counter * s_statsSeek
static StorageAccount::Counter * s_statsOpen
virtual Int_t SysSync(Int_t fd)
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
static StorageAccount::Counter * s_statsCPrefetch
static StorageAccount::Counter * s_statsCRead