CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
TStorageFactoryFile.cc
Go to the documentation of this file.
6 #include "TFileCacheRead.h"
7 #include "TSystem.h"
8 #include "TROOT.h"
9 #include <errno.h>
10 #include <sys/stat.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13 #include <iostream>
14 
#if 0
#include "TTreeCache.h"
#include "TTree.h"

// Debugging aid, compiled out by the surrounding #if 0. It derives from
// TTreeCache only so that it can read TTreeCache's non-public state and
// print it.
// NOTE(review): the member names below are tied to the ROOT version this was
// written against; verify they still exist before re-enabling.
class TTreeCacheDebug : public TTreeCache {
public:
  // Print "<label>: <entry> { name=value, ... }\n" followed by `trailer` to
  // std::cerr, where <entry> is fOwner->GetReadEntry().
  void dump(const char *label, const char *trailer)
  {
    std::ostream &out = std::cerr;
    const Long64_t readEntry = fOwner->GetReadEntry();

    out << label << ": " << readEntry << " ";

    // Tree-cache bookkeeping.
    out << "{ fEntryMin=" << fEntryMin
        << ", fEntryMax=" << fEntryMax
        << ", fEntryNext=" << fEntryNext
        << ", fZipBytes=" << fZipBytes
        << ", fNbranches=" << fNbranches
        << ", fNReadOk=" << fNReadOk
        << ", fNReadMiss=" << fNReadMiss
        << ", fNReadPref=" << fNReadPref
        << ", fBranches=" << fBranches
        << ", fBrNames=" << fBrNames
        << ", fOwner=" << fOwner
        << ", fTree=" << fTree
        << ", fIsLearning=" << fIsLearning
        << ", fIsManual=" << fIsManual;

    // Buffer / seek-list state (presumably inherited from TFileCacheRead).
    out << "; fBufferSizeMin=" << fBufferSizeMin
        << ", fBufferSize=" << fBufferSize
        << ", fBufferLen=" << fBufferLen
        << ", fBytesToPrefetch=" << fBytesToPrefetch
        << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
        << ", fAsyncReading=" << fAsyncReading
        << ", fNseek=" << fNseek
        << ", fNtot=" << fNtot
        << ", fNb=" << fNb
        << ", fSeekSize=" << fSeekSize
        << ", fSeek=" << fSeek
        << ", fSeekIndex=" << fSeekIndex
        << ", fSeekSort=" << fSeekSort
        << ", fPos=" << fPos
        << ", fSeekLen=" << fSeekLen
        << ", fSeekSortLen=" << fSeekSortLen
        << ", fSeekPos=" << fSeekPos
        << ", fLen=" << fLen
        << ", fFile=" << fFile
        << ", fBuffer=" << static_cast<const void *>(fBuffer)
        << ", fIsSorted=" << fIsSorted;

    out << " }\n" << trailer;
  }
};
#endif
65 
// ROOT class-implementation macro for TStorageFactoryFile (presumably paired
// with a ClassDef in the class header, which is not visible here).
ClassImp(TStorageFactoryFile)
81 
82 static inline StorageAccount::Counter &
84 {
85  if (! c) c = &StorageAccount::counter("tstoragefile", label);
86  return *c;
87 }
88 
90  : storage_(0)
91 {
93  stats.tick(0);
94 }
95 
96 
98  Option_t *option /* = "" */,
99  const char *ftitle /* = "" */,
100  Int_t compress /* = 1 */)
101  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
102  storage_(0)
103 {
104  StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
105 
106  // Parse options; at the moment we only accept read!
107  fOption = option;
108  fOption.ToUpper();
109 
110  if (fOption == "NEW")
111  fOption = "CREATE";
112 
113  Bool_t create = (fOption == "CREATE");
114  Bool_t recreate = (fOption == "RECREATE");
115  Bool_t update = (fOption == "UPDATE");
116  Bool_t read = (fOption == "READ");
117 
118  if (!create && !recreate && !update && !read)
119  {
120  read = true;
121  fOption = "READ";
122  }
123 
124  if (recreate)
125  {
126  if (!gSystem->AccessPathName(path, kFileExists))
127  gSystem->Unlink(path);
128 
129  recreate = false;
130  create = true;
131  fOption = "CREATE";
132  }
133 
134  if (update && gSystem->AccessPathName(path, kFileExists))
135  {
136  update = kFALSE;
137  create = kTRUE;
138  }
139 
140  int openFlags = IOFlags::OpenRead;
141  if (!read) openFlags |= IOFlags::OpenWrite;
142  if (create) openFlags |= IOFlags::OpenCreate;
143  if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
144 
145  // Open storage
146  if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
147  {
148  MakeZombie();
149  gDirectory = gROOT;
150  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
151  << "Cannot open file '" << path << "'";
152  }
153 
154  fRealName = path;
155  fD = 0; // sorry, meaningless
156  fWritable = read ? kFALSE : kTRUE;
157 
158  Init(create);
159 
160  stats.tick(0);
161 }
162 
164 {
165  Close();
166  delete storage_;
167 }
168 
172 
173 Bool_t
174 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
175 {
176  // This function needs to be optimized to minimize seeks.
177  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
178  Seek(pos);
179  return ReadBuffer(buf, len);
180 }
181 
182 Bool_t
183 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
184 {
185  // Check that it's valid to access this file.
186  if (IsZombie())
187  {
188  Error("ReadBuffer", "Cannot read from a zombie file");
189  return kTRUE;
190  }
191 
192  if (! IsOpen())
193  {
194  Error("ReadBuffer", "Cannot read from a file that is not open");
195  return kTRUE;
196  }
197 
198  // Read specified byte range from the storage. Returns kTRUE in
199  // case of error. Note that ROOT uses this function recursively
200  // to fill the cache; we use a flag to make sure our accounting
201  // is reflected in a comprehensible manner. The "read" counter
202  // will include both, "readc" indicates how much read from the
203  // cache, "readu" indicates how much we failed to read from the
204  // cache (excluding those recursive reads), and "readx" counts
205  // the amount actually passed to read from the storage object.
207 
208  // If we have a cache, read from there first. This returns 0
209  // if the block hasn't been prefetched, 1 if it was in cache,
210  // and 2 if there was an error.
211  if (TFileCacheRead *c = GetCacheRead())
212  {
213  Long64_t here = GetRelOffset();
214  Bool_t async = c->IsAsyncReading();
215 
216  StorageAccount::Stamp cstats(async
217  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
218  : storageCounter(s_statsCRead, "readViaCache"));
219 
220  Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
221 
222  if (st == 2) {
223  return kTRUE;
224  }
225 
226  if (st == 1) {
227  if (async) {
228  cstats.tick(len);
229  Seek(here);
230  } else {
231  cstats.tick(len);
232  stats.tick(len);
233  return kFALSE;
234  }
235  }
236  }
237 
238  // FIXME: Re-enable read-ahead if the data wasn't in cache.
239  // if (! st) storage_->caching(true, -1, s_readahead);
240 
241  // A real read
242  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
243  IOSize n = storage_->xread(buf, len);
244  xstats.tick(n);
245  stats.tick(n);
246  return n ? kFALSE : kTRUE;
247 }
248 
249 Bool_t
250 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
251 {
252  // Check that it's valid to access this file.
253  if (IsZombie())
254  {
255  Error("ReadBufferAsync", "Cannot read from a zombie file");
256  return kTRUE;
257  }
258 
259  if (! IsOpen())
260  {
261  Error("ReadBufferAsync", "Cannot read from a file that is not open");
262  return kTRUE;
263  }
264 
266 
267  // If asynchronous reading is disabled, bail out now, regardless
268  // whether the underlying storage supports prefetching. If it is
269  // forced on, pretend it's on, even if the storage doesn't support
270  // it, as this turns off the caching in ROOT's side.
272 
273  // Verify that we never using async reads in app-only mode
275  return kTRUE;
276 
277  // Let the I/O method indicate if it can do client-side prefetch.
278  // If it does, then for example TTreeCache will drop its own cache
279  // and will use the client-side cache of the actual I/O layer.
280  // If len is zero ROOT is probing for prefetch support.
281  if (len) {
282  // FIXME: Synchronise caching.
283  // storage_->caching(true, -1, 0);
284  ;
285  }
286 
287  IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
288  if (storage_->prefetch(&iov, 1))
289  {
290  stats.tick(len);
291  return kFALSE;
292  }
293 
294  // Always ask ROOT to use async reads in storage-only mode,
295  // regardless of whether the storage system supports it.
297  return kFALSE;
298 
299  // Prefetching not available right now.
300  return kTRUE;
301 }
302 
303 Bool_t
304 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
305 {
306  // Check that it's valid to access this file.
307  if (IsZombie())
308  {
309  Error("ReadBuffers", "Cannot read from a zombie file");
310  return kTRUE;
311  }
312 
313  if (! IsOpen())
314  {
315  Error("ReadBuffers", "Cannot read from a file that is not open");
316  return kTRUE;
317  }
318 
319  // This should coalesce reads into a smaller number of large reads.
320  // If the list of buffers to read has two or more buffers within 256KB of
321  // each other, we collapse them into a single read from the storage system.
322 
323  // buf == 0 implies an async read; in this case, we skip the read coalescing logic.
324  // Note that this code will cause CMSSW to never call readv().
325  if (buf)
326  {
327  // Code ported from ROOT v5.26 trunk by Brian Bockelman.
328  Int_t k = 0;
329  Bool_t result = kTRUE;
330  TFileCacheRead *old = fCacheRead;
331  fCacheRead = 0;
332  IOOffset curbegin = pos[0];
333  IOOffset cur;
334  std::vector<char> buf2(0);
335  Int_t i = 0; // Position in the buffer.
336  Int_t n = 0; // Number of reads we have coalesced.
337 
338  // Size of our coalesce window. In ROOT 5.26, this is actually a variable
339  // you can tweak, but it's not exposed in CMSSW.
340 
341  // Iterate over all the requests we have been given. We either read each
342  // individually or coalesce them into a big read.
343 
344  // Loop over all the requests we have been given. Only trigger a read if the
345  // request at pos[i] wouldn't completely fit into the read coalesce buffer.
346  // If we trigger, then we do a single read for requests i-n to i-1, inclusive.
347  // If n==0, we have special logic.
348  while (i < nbuf)
349  {
350  cur = pos[i]+len[i];
351  Bool_t bigRead = kTRUE;
352  if (cur -curbegin < READ_COALESCE_SIZE)
353  {
354  // Add the current request into the set of buffers we will coalesce
355  n++; // Record we have a new request we will coalesce.
356  i++; // Examine the next request in the next loop.
357  bigRead = kFALSE;
358  }
359  // Only perform a read if one of the following holds:
360  // 1) bigRead=TRUE; i.e., we can't fit any more requests into the window.
361  // 2) i>=nbuf; if i pointed to nbuf-1 at the beginning of the while loop,
362  // then the above logic will either set bigRead=TRUE (making case #1
363  // true) or increment i, making i == nbuf.
364  if (bigRead || (i>=nbuf))
365  {
366  // If n == 0, no read requests could be coalesced. Simple read.
367  if (n == 0)
368  {
369  //if the block to read is about the same size as the read-ahead buffer
370  //we read the block directly
371  Seek(pos[i]);
372 
373  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
374  // if xread returns short, then we have an error. Break from the loop
375  // and return kTRUE - signaling an error.
376  result = ((IOSize)len[i] == storage_->xread(&buf[k], len[i])) ? kFALSE : kTRUE;
377  xstats.tick(len[i]);
378 
379  if (result)
380  break;
381  k += len[i];
382  i++;
383  }
384  else
385  {
386  //otherwise we read all blocks that fit in the read-ahead buffer
387  Seek(curbegin);
388  // Only allocate buf2 once; use std::vector to make sure the memory
389  // gets cleaned up, as xread can toss an exception.
390  if (buf2.capacity() < READ_COALESCE_SIZE)
391  buf2.resize(READ_COALESCE_SIZE);
392  //we read ahead
393  assert(len[i-1] >= 0);
394  assert(pos[i-1] >= curbegin);
395  assert(pos[i-1]-curbegin+len[i-1] <= READ_COALESCE_SIZE);
396  IOSize nahead = IOSized(pos[i-1]-curbegin+len[i-1]);
397 
398  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
399  result = ( nahead == storage_->xread(&buf2[0], nahead)) ? kFALSE : kTRUE;
400  xstats.tick(nahead);
401 
402  if (result)
403  break;
404 
405  // Now, copy the data from the read to the appropriate buffer in
406  // order to fulfill the request.
407  for (Int_t j=0;j<n;j++) {
408  memcpy(&buf[k],&buf2[pos[i-n+j]-curbegin],len[i-n+j]);
409  k += len[i-n+j];
410  }
411  n = 0;
412  }
413  curbegin = pos[i];
414  }
415  }
416  fCacheRead = old;
417  return result;
418  }
419 
420  // Read from underlying storage.
421  Int_t total = 0;
422  std::vector<IOPosBuffer> iov;
423  iov.reserve(nbuf);
424  for (Int_t i = 0; i < nbuf; ++i)
425  {
426  iov.push_back(IOPosBuffer(pos[i], buf ? buf + total : 0, len[i]));
427  total += len[i];
428  }
429 
430  // Null buffer means asynchronous reads into I/O system's cache.
431  bool success;
432  StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
433  // Synchronise low-level cache with the supposed cache in TFile.
434  // storage_->caching(true, -1, 0);
435  success = storage_->prefetch(&iov[0], nbuf);
436  astats.tick(total);
437 
438  // If it didn't suceeed, pass down to the base class.
439  return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
440 }
441 
442 Bool_t
443 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
444 {
445  // Check that it's valid to access this file.
446  if (IsZombie())
447  {
448  Error("WriteBuffer", "Cannot write to a zombie file");
449  return kTRUE;
450  }
451 
452  if (! IsOpen())
453  {
454  Error("WriteBuffer", "Cannot write to a file that is not open");
455  return kTRUE;
456  }
457 
458  if (! fWritable)
459  {
460  Error("WriteBuffer", "File is not writable");
461  return kTRUE;
462  }
463 
465  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
466 
467  // Try first writing via a cache, and if that's not possible, directly.
468  Int_t st;
469  switch ((st = WriteBufferViaCache(buf, len)))
470  {
471  case 0:
472  // Actual write.
473  {
474  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
475  IOSize n = storage_->xwrite(buf, len);
476  xstats.tick(n);
477  stats.tick(n);
478 
479  // FIXME: What if it's a short write?
480  return n > 0 ? kFALSE : kTRUE;
481  }
482 
483  case 1:
484  cstats.tick(len);
485  stats.tick(len);
486  return kFALSE;
487 
488  case 2:
489  default:
490  Error("WriteBuffer", "Error writing to cache");
491  return kTRUE;
492  }
493 }
494 
498 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
499 // large a prefetch cache to use.
500 // FIXME: Asynchronous open support?
501 
505 Int_t
506 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
507 {
509 
510  if (storage_)
511  {
512  storage_->close();
513  delete storage_;
514  storage_ = 0;
515  }
516 
517  int openFlags = IOFlags::OpenRead;
518  if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
519  else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
520  if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
521  if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
522  if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
523  if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
524  if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
525 
526  if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
527  {
528  MakeZombie();
529  gDirectory = gROOT;
530  throw cms::Exception("TStorageFactoryFile::SysOpen()")
531  << "Cannot open file '" << pathname << "'";
532  }
533 
534  stats.tick();
535  return 0;
536 }
537 
538 Int_t
540 {
542 
543  if (storage_)
544  {
545  storage_->close();
546  delete storage_;
547  storage_ = 0;
548  }
549 
550  stats.tick();
551  return 0;
552 }
553 
554 Long64_t
555 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
556 {
558  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
559  : whence == SEEK_CUR ? Storage::CURRENT
560  : Storage::END);
561 
562  offset = storage_->position(offset, rel);
563  stats.tick();
564  return offset;
565 }
566 
567 Int_t
569 {
571  storage_->flush();
572  stats.tick();
573  return 0;
574 }
575 
576 Int_t
577 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
578  Long_t *flags, Long_t *modtime)
579 {
581  // FIXME: Most of this is unsupported or makes no sense with Storage
582  *id = ::Hash(fRealName);
583  *size = storage_->size();
584  *flags = 0;
585  *modtime = 0;
586  stats.tick();
587  return 0;
588 }
589 
590 void
592 {
593  TSystem::ResetErrno();
594 }
IOSize xwrite(const void *from, IOSize n)
Definition: IOOutput.cc:176
int i
Definition: DBlmapReader.cc:9
CacheHint cacheHint(void) const
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
virtual Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence)
const std::string & label
Definition: MVAComputer.cc:186
virtual Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode)
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
std::vector< Variable::Flags > flags
Definition: MVATrainer.cc:135
IOSize xread(IOBuffer into)
Definition: IOInput.cc:166
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
int path() const
Definition: HLTadd.h:3
Relative
Definition: Storage.h:11
static StorageFactory * get(void)
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, const char *label)
static StorageAccount::Counter * s_statsARead
tuple iov
Definition: o2o.py:307
virtual Bool_t ReadBufferAsync(Long64_t off, Int_t len)
unsigned int(* Counter)(align::ID)
Definition: Counters.h:26
virtual void close(void)
Definition: Storage.cc:128
std::pair< std::string, MonitorElement * > entry
Definition: ME_MAP.h:8
virtual IOOffset position(void) const
Definition: Storage.cc:95
static StorageAccount::Counter * s_statsCtor
tuple result
Definition: query.py:137
virtual void flush(void)
Definition: Storage.cc:124
virtual Bool_t ReadBuffer(char *buf, Int_t len)
int j
Definition: DBlmapReader.cc:9
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
double f[11][100]
unsigned int offset(bool)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
int k[5][pyjets_maxn]
static StorageAccount::Counter * s_statsFlush
void tick(double amount=0.) const
virtual bool prefetch(const IOPosBuffer *what, IOSize n)
Definition: Storage.cc:119
unsigned int UInt_t
Definition: FUTypes.h:12
static StorageAccount::Counter * s_statsCWrite
static Counter & counter(const std::string &storageClass, const std::string &operation)
virtual IOOffset size(void) const
Definition: Storage.cc:102
string const
Definition: compareJSON.py:14
int64_t IOOffset
Definition: IOTypes.h:19
void ResetErrno(void) const
IOSize IOSized(IOOffset n)
Definition: IOTypes.h:37
#define update(a, b)
virtual Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
#define O_NONBLOCK
Definition: SysFile.h:21
size_t IOSize
Definition: IOTypes.h:14
virtual Int_t SysClose(Int_t fd)
static StorageAccount::Counter * s_statsSeek
static StorageAccount::Counter * s_statsOpen
virtual Int_t SysSync(Int_t fd)
SurfaceDeformation * create(int type, const std::vector< double > &params)
tuple size
Write out results.
static StorageAccount::Counter * s_statsCPrefetch
#define READ_COALESCE_SIZE
static StorageAccount::Counter * s_statsCRead