CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TStorageFactoryFile.cc
Go to the documentation of this file.
9 #include "ReadRepacker.h"
10 #include "TFileCacheRead.h"
11 #include "TSystem.h"
12 #include "TROOT.h"
13 #include "TEnv.h"
14 #include <cerrno>
15 #include <sys/stat.h>
16 #include <unistd.h>
17 #include <fcntl.h>
18 #include <iostream>
19 #include <cassert>
20 
21 #if 0
22 #include "TTreeCache.h"
23 #include "TTree.h"
24 
25 class TTreeCacheDebug : public TTreeCache {
26 public:
27  void dump(const char *label, const char *trailer)
28  {
29  Long64_t entry = fOwner->GetReadEntry();
30  std::cerr
31  << label << ": " << entry << " "
32  << "{ fEntryMin=" << fEntryMin
33  << ", fEntryMax=" << fEntryMax
34  << ", fEntryNext=" << fEntryNext
35  << ", fZipBytes=" << fZipBytes
36  << ", fNbranches=" << fNbranches
37  << ", fNReadOk=" << fNReadOk
38  << ", fNReadMiss=" << fNReadMiss
39  << ", fNReadPref=" << fNReadPref
40  << ", fBranches=" << fBranches
41  << ", fBrNames=" << fBrNames
42  << ", fOwner=" << fOwner
43  << ", fTree=" << fTree
44  << ", fIsLearning=" << fIsLearning
45  << ", fIsManual=" << fIsManual
46  << "; fBufferSizeMin=" << fBufferSizeMin
47  << ", fBufferSize=" << fBufferSize
48  << ", fBufferLen=" << fBufferLen
49  << ", fBytesToPrefetch=" << fBytesToPrefetch
50  << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
51  << ", fAsyncReading=" << fAsyncReading
52  << ", fNseek=" << fNseek
53  << ", fNtot=" << fNtot
54  << ", fNb=" << fNb
55  << ", fSeekSize=" << fSeekSize
56  << ", fSeek=" << fSeek
57  << ", fSeekIndex=" << fSeekIndex
58  << ", fSeekSort=" << fSeekSort
59  << ", fPos=" << fPos
60  << ", fSeekLen=" << fSeekLen
61  << ", fSeekSortLen=" << fSeekSortLen
62  << ", fSeekPos=" << fSeekPos
63  << ", fLen=" << fLen
64  << ", fFile=" << fFile
65  << ", fBuffer=" << (void *) fBuffer
66  << ", fIsSorted=" << fIsSorted
67  << " }\n" << trailer;
68  }
69 };
70 #endif
71 
72 using namespace edm::storage;
73 
89 
91  StorageAccount::Operation operation) {
92  static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
93  if (!c)
94  c = &StorageAccount::counter(token, operation);
95  return *c;
96 }
97 
100  stats.tick(0);
101 }
102 
103 // This constructor must be compatible with *all* the various built-in TFile plugins,
104 // including TXNetFile. This is why some arguments in the constructor are ignored.
105 // If there's a future T*File that is incompatible with this constructor, a new
106 // constructor will have to be added.
108  Option_t *option,
109  const char *ftitle,
110  Int_t compress,
111  Int_t netopt,
112  Bool_t parallelopen /* = kFALSE */)
113  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
114  storage_() {
115  try {
116  Initialize(path, option);
117  } catch (...) {
118  edm::threadLocalException::setException(std::current_exception()); // capture
119  }
120 }
121 
123  Option_t *option /* = "" */,
124  const char *ftitle /* = "" */,
125  Int_t compress /* = 1 */)
126  : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
127  storage_() {
128  try {
129  Initialize(path, option);
130  } catch (...) {
131  edm::threadLocalException::setException(std::current_exception()); // capture
132  }
133 }
134 
// Shared body of the constructors: parse `option`, open `path` through
// StorageFactory and complete the TFile set-up with Init().
//
// `option` is upper-cased before use. "NEW" is treated as "CREATE"; "RECREATE"
// removes any existing file and then proceeds as "CREATE"; "UPDATE" on a path
// that cannot be accessed is turned into a create; any value other than
// CREATE/RECREATE/UPDATE/READ/READWRAP falls back to "READ".
//
// Throws cms::Exception, after MakeZombie(), when the storage cannot be opened.
// An edm::Exception raised while recording the file size is swallowed only
// when its category is edm::errors::NotFound; anything else is rethrown.
//
// NOTE(review): `stats` and `statsService` are used below, but their
// declarations are not present in this listing (the embedded line numbers skip
// 136 and 198) - confirm against the original source file.
135 void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" */) {
137 
138  // Enable AsyncReading.
139  // This was the default for 5.27, but turned off by default for 5.32.
140  // In our testing, AsyncReading is the fastest mechanism available.
141  // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
142  // performance hit in our "average case" tests.
143  gEnv->SetValue("TFile.AsyncReading", 1);
144 
145  // Parse options; at the moment we only accept read!
146  fOption = option;
147  fOption.ToUpper();
148 
149  if (fOption == "NEW")
150  fOption = "CREATE";
151 
152  Bool_t create = (fOption == "CREATE");
153  Bool_t recreate = (fOption == "RECREATE");
154  Bool_t update = (fOption == "UPDATE");
155  Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
156  Bool_t readwrap = (fOption == "READWRAP");
157 
158  if (!create && !recreate && !update && !read) {
159  read = true;
160  fOption = "READ";
161  }
162 
163  if (recreate) {
// NOTE(review): ROOT documents TSystem::AccessPathName() as returning false
// when the path IS accessible, so this branch unlinks an existing file.
164  if (!gSystem->AccessPathName(path, kFileExists))
165  gSystem->Unlink(path);
166 
167  recreate = false;
168  create = true;
169  fOption = "CREATE";
170  }
171  assert(!recreate);
172 
173  if (update && gSystem->AccessPathName(path, kFileExists)) {
174  update = kFALSE;
175  create = kTRUE;
176  }
177 
178  assert(read || update || create);
179 
// Translate the parsed mode into StorageFactory open flags. OpenRead is always
// set; OpenWrite is added for every mode except read.
180  int openFlags = IOFlags::OpenRead;
181  if (!read)
182  openFlags |= IOFlags::OpenWrite;
183  if (create)
184  openFlags |= IOFlags::OpenCreate;
185  //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
186  if (readwrap)
187  openFlags |= IOFlags::OpenWrap;
188 
189  // Open storage
190  if (!(storage_ = StorageFactory::get()->open(path, openFlags))) {
191  MakeZombie();
192  gDirectory = gROOT;
193  throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()") << "Cannot open file '" << path << "'";
194  }
195 
196  // Record the statistics.
197  try {
199  if (statsService.isAvailable()) {
200  statsService->setSize(path, storage_->size());
201  }
202  } catch (edm::Exception const &e) {
203  if (e.categoryCode() != edm::errors::NotFound) {
204  throw;
205  }
206  }
207 
208  fRealName = path;
209  fD = 0; // sorry, meaningless
210  fWritable = read ? kFALSE : kTRUE;
211 
212  Init(create);
213 
214  stats.tick(0);
215 }
216 
218 
222 
223 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) {
224  // This function needs to be optimized to minimize seeks.
225  // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
226  Seek(pos);
227  return ReadBuffer(buf, len);
228 }
229 
230 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Int_t len) {
231  // Check that it's valid to access this file.
232  if (IsZombie()) {
233  Error("ReadBuffer", "Cannot read from a zombie file");
234  return kTRUE;
235  }
236 
237  if (!IsOpen()) {
238  Error("ReadBuffer", "Cannot read from a file that is not open");
239  return kTRUE;
240  }
241 
242  // Read specified byte range from the storage. Returns kTRUE in
243  // case of error. Note that ROOT uses this function recursively
244  // to fill the cache; we use a flag to make sure our accounting
245  // is reflected in a comprehensible manner. The "read" counter
246  // will include both, "readc" indicates how much read from the
247  // cache, "readu" indicates how much we failed to read from the
248  // cache (excluding those recursive reads), and "readx" counts
249  // the amount actually passed to read from the storage object.
251 
252  // If we have a cache, read from there first. This returns 0
253  // if the block hasn't been prefetched, 1 if it was in cache,
254  // and 2 if there was an error.
255  if (TFileCacheRead *c = GetCacheRead()) {
256  Long64_t here = GetRelOffset();
257  Bool_t async = c->IsAsyncReading();
258 
259  StorageAccount::Stamp cstats(async
260  ? storageCounter(s_statsCPrefetch, StorageAccount::Operation::readPrefetchToCache)
261  : storageCounter(s_statsCRead, StorageAccount::Operation::readViaCache));
262 
263  Int_t st = ReadBufferViaCache(async ? nullptr : buf, len);
264 
265  if (st == 2) {
266  Error("ReadBuffer",
267  "ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",
268  len,
269  here,
270  GetSize());
271  return kTRUE;
272  }
273 
274  if (st == 1) {
275  if (async) {
276  cstats.tick(len);
277  Seek(here);
278  } else {
279  cstats.tick(len);
280  stats.tick(len);
281  return kFALSE;
282  }
283  }
284  }
285 
286  // FIXME: Re-enable read-ahead if the data wasn't in cache.
287  // if (! st) storage_->caching(true, -1, s_readahead);
288 
289  // A real read
290  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
291  IOSize n = storage_->xread(buf, len);
292  xstats.tick(n);
293  stats.tick(n);
294  if (n < static_cast<IOSize>(len)) {
295  Error("ReadBuffer",
296  "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",
297  n,
298  len,
299  GetRelOffset(),
300  GetSize());
301  }
302  return n ? kFALSE : kTRUE;
303 }
304 
// Ask the storage layer to prefetch `len` bytes at offset `off`.
//
// Returns kFALSE when the prefetch request was accepted by the storage, or when
// the cache hint is CACHE_HINT_STORAGE (async reads are always reported as
// available in that mode). Returns kTRUE for a zombie or closed file, when the
// cache hint is CACHE_HINT_APPLICATION, or when the storage declined the
// prefetch. A `len` of zero is a probe; PREFETCH_PROBE_LENGTH is requested
// instead.
//
// NOTE(review): `f`, used below, is declared on a line that is not present in
// this listing (embedded line 323) - confirm against the original source file.
305 Bool_t TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len) {
306  // Check that it's valid to access this file.
307  if (IsZombie()) {
308  Error("ReadBufferAsync", "Cannot read from a zombie file");
309  return kTRUE;
310  }
311 
312  if (!IsOpen()) {
313  Error("ReadBufferAsync", "Cannot read from a file that is not open");
314  return kTRUE;
315  }
316 
317  StorageAccount::Stamp stats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
318 
319  // If asynchronous reading is disabled, bail out now, regardless
320  // whether the underlying storage supports prefetching. If it is
321  // forced on, pretend it's on, even if the storage doesn't support
322  // it, as this turns off the caching in ROOT's side.
324 
325  // Make sure we never use async reads in app-only mode
326  if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
327  return kTRUE;
328 
329  // Let the I/O method indicate if it can do client-side prefetch.
330  // If it does, then for example TTreeCache will drop its own cache
331  // and will use the client-side cache of the actual I/O layer.
332  // If len is zero ROOT is probing for prefetch support.
333  if (len) {
334  // FIXME: Synchronise caching.
335  // storage_->caching(true, -1, 0);
336  ;
337  }
338 
339  IOPosBuffer iov(off, (void *)nullptr, len ? len : PREFETCH_PROBE_LENGTH);
340  if (storage_->prefetch(&iov, 1)) {
341  stats.tick(len);
342  return kFALSE;
343  }
344 
345  // Always ask ROOT to use async reads in storage-only mode,
346  // regardless of whether the storage system supports it.
347  if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
348  return kFALSE;
349 
350  // Prefetching not available right now.
351  return kTRUE;
352 }
353 
354 Bool_t TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
375  Int_t remaining = nbuf; // Number of read requests left to process.
376  Int_t pack_count; // Number of read requests processed by this iteration.
377 
378  IOSize remaining_buffer_size = 0;
379  // Calculate the remaining buffer size for the ROOT-owned buffer by adding
380  // the size of the various requests.
381  for (Int_t i = 0; i < nbuf; i++)
382  remaining_buffer_size += len[i];
383 
384  char *current_buffer = buf;
385  Long64_t *current_pos = pos;
386  Int_t *current_len = len;
387 
388  ReadRepacker repacker;
389 
390  while (remaining > 0) {
391  pack_count = repacker.pack(
392  static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
393 
394  int real_bytes_processed = repacker.realBytesProcessed();
395  IOSize io_buffer_used = repacker.bufferUsed();
396 
397  // Issue readv, then unpack buffers.
398  StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
399  std::vector<IOPosBuffer> &iov = repacker.iov();
400  IOSize result = storage_->readv(&iov[0], iov.size());
401  if (result != io_buffer_used) {
402  Error(
403  "ReadBuffersSync", "Storage::readv returned different size result=%ld expected=%ld", result, io_buffer_used);
404  return kTRUE;
405  }
406  xstats.tick(io_buffer_used);
407  repacker.unpack(current_buffer);
408 
409  // Update the location of the unused part of the input buffer.
410  remaining_buffer_size -= real_bytes_processed;
411  current_buffer += real_bytes_processed;
412 
413  current_pos += pack_count;
414  current_len += pack_count;
415  remaining -= pack_count;
416  }
417  assert(remaining_buffer_size == 0);
418  return kFALSE;
419 }
420 
421 Bool_t TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
422  // Check that it's valid to access this file.
423  if (IsZombie()) {
424  Error("ReadBuffers", "Cannot read from a zombie file");
425  return kTRUE;
426  }
427 
428  if (!IsOpen()) {
429  Error("ReadBuffers", "Cannot read from a file that is not open");
430  return kTRUE;
431  }
432 
433  // For synchronous reads, we have special logic to optimize the I/O requests
434  // from ROOT before handing it to the storage.
435  if (buf) {
436  return ReadBuffersSync(buf, pos, len, nbuf);
437  }
438  // For an async read, we assume the storage system is smart enough to do the
439  // optimization itself.
440 
441  // Read from underlying storage.
442  void *const nobuf = nullptr;
443  Int_t total = 0;
444  std::vector<IOPosBuffer> iov;
445  iov.reserve(nbuf);
446  for (Int_t i = 0; i < nbuf; ++i) {
447  iov.emplace_back(pos[i], nobuf, len[i]);
448  total += len[i];
449  }
450 
451  // Null buffer means asynchronous reads into I/O system's cache.
452  bool success;
453  StorageAccount::Stamp astats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
454  // Synchronise low-level cache with the supposed cache in TFile.
455  // storage_->caching(true, -1, 0);
456  success = storage_->prefetch(iov.data(), nbuf);
457  astats.tick(total);
458 
459  // If it didn't suceeed, pass down to the base class.
460  if (not success) {
461  if (TFile::ReadBuffers(buf, pos, len, nbuf)) {
462  Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
463  return kTRUE;
464  }
465  }
466  return kFALSE;
467 }
468 
// Write `len` bytes from `buf` at the current position.
//
// The write is first offered to WriteBufferViaCache(). A result of 0 means the
// cache did not take it and the data is written with storage_->xwrite(); 1
// means the cache accepted it; anything else is treated as a cache error.
//
// Returns kFALSE on success and kTRUE on error (zombie file, file not open,
// file not writable, cache error, or xwrite() writing zero bytes).
// NOTE(review): a short but non-empty write is reported as success (see the
// FIXME below).
//
// NOTE(review): `stats`, used below, is declared on a line that is not present
// in this listing (embedded line 486) - confirm against the original source.
469 Bool_t TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len) {
470  // Check that it's valid to access this file.
471  if (IsZombie()) {
472  Error("WriteBuffer", "Cannot write to a zombie file");
473  return kTRUE;
474  }
475 
476  if (!IsOpen()) {
477  Error("WriteBuffer", "Cannot write to a file that is not open");
478  return kTRUE;
479  }
480 
481  if (!fWritable) {
482  Error("WriteBuffer", "File is not writable");
483  return kTRUE;
484  }
485 
487  StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, StorageAccount::Operation::writeViaCache));
488 
489  // Try first writing via a cache, and if that's not possible, directly.
490  Int_t st;
491  switch ((st = WriteBufferViaCache(buf, len))) {
492  case 0:
493  // Actual write.
494  {
495  StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, StorageAccount::Operation::writeActual));
496  IOSize n = storage_->xwrite(buf, len);
497  xstats.tick(n);
498  stats.tick(n);
499 
500  // FIXME: What if it's a short write?
501  return n > 0 ? kFALSE : kTRUE;
502  }
503 
504  case 1:
505  cstats.tick(len);
506  stats.tick(len);
507  return kFALSE;
508 
509  case 2:
510  default:
511  Error("WriteBuffer", "Error writing to cache");
512  return kTRUE;
513  }
514 }
515 
519 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
520 // large a prefetch cache to use.
521 // FIXME: Asynchronous open support?
522 
526 Int_t TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */) {
527  StorageAccount::Stamp stats(storageCounter(s_statsOpen, StorageAccount::Operation::open));
528 
529  if (storage_) {
530  storage_->close();
531  }
532 
533  int openFlags = IOFlags::OpenRead;
534  if (flags & O_WRONLY)
535  openFlags = IOFlags::OpenWrite;
536  else if (flags & O_RDWR)
537  openFlags |= IOFlags::OpenWrite;
538  if (flags & O_CREAT)
539  openFlags |= IOFlags::OpenCreate;
540  if (flags & O_APPEND)
541  openFlags |= IOFlags::OpenAppend;
542  if (flags & O_EXCL)
543  openFlags |= IOFlags::OpenExclusive;
544  if (flags & O_TRUNC)
545  openFlags |= IOFlags::OpenTruncate;
546  if (flags & O_NONBLOCK)
547  openFlags |= IOFlags::OpenNonBlock;
548 
549  if (!(storage_ = StorageFactory::get()->open(pathname, openFlags))) {
550  MakeZombie();
551  gDirectory = gROOT;
552  throw cms::Exception("TStorageFactoryFile::SysOpen()") << "Cannot open file '" << pathname << "'";
553  }
554 
555  stats.tick();
556  return 0;
557 }
558 
559 Int_t TStorageFactoryFile::SysClose(Int_t /* fd */) {
560  StorageAccount::Stamp stats(storageCounter(s_statsClose, StorageAccount::Operation::close));
561 
562  if (storage_) {
563  storage_->close();
564  releaseStorage();
565  }
566 
567  stats.tick();
568  return 0;
569 }
570 
571 Long64_t TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence) {
572  StorageAccount::Stamp stats(storageCounter(s_statsSeek, StorageAccount::Operation::seek));
573  Storage::Relative rel = (whence == SEEK_SET ? Storage::SET : whence == SEEK_CUR ? Storage::CURRENT : Storage::END);
574 
575  offset = storage_->position(offset, rel);
576  stats.tick();
577  return offset;
578 }
579 
580 Int_t TStorageFactoryFile::SysSync(Int_t /* fd */) {
581  StorageAccount::Stamp stats(storageCounter(s_statsFlush, StorageAccount::Operation::flush));
582  storage_->flush();
583  stats.tick();
584  return 0;
585 }
586 
// Fill in the stat-like output parameters for this file. Only two values are
// real: `*id` is a hash of fRealName and `*size` is the size reported by the
// storage. `*flags` and `*modtime` are always set to 0. The descriptor
// argument is ignored. Always returns 0.
//
// NOTE(review): `stats`, used below, is declared on a line that is not present
// in this listing (embedded line 588) - confirm against the original source.
587 Int_t TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) {
589  // FIXME: Most of this is unsupported or makes no sense with Storage
590  *id = ::Hash(fRealName);
591  *size = storage_->size();
592  *flags = 0;
593  *modtime = 0;
594  stats.tick();
595  return 0;
596 }
597 
598 void TStorageFactoryFile::ResetErrno(void) const { TSystem::ResetErrno(); }
Code categoryCode() const
Definition: EDMException.h:99
edm::ErrorSummaryEntry Error
static StorageAccount::Counter * s_statsWrite
static StorageAccount::Counter * s_statsStat
const edm::EventSetup & c
static StorageClassToken tokenForStorageClassName(std::string const &iName)
void setSize(const std::string &urlOrLfn, size_t size)
Long64_t SysSeek(Int_t fd, Long64_t offset, Int_t whence) override
static StorageAccount::Counter * s_statsXWrite
static StorageAccount::Counter * s_statsClose
Int_t SysClose(Int_t fd) override
static Counter & counter(StorageClassToken token, Operation operation)
Bool_t ReadBufferAsync(Long64_t off, Int_t len) override
assert(be >=bs)
Bool_t ReadBuffer(char *buf, Int_t len) override
CacheHint cacheHint(void) const
tuple result
Definition: mps_fire.py:311
edm::propagate_const< std::unique_ptr< edm::storage::Storage > > storage_
char const * label
IOSize realBytesProcessed() const
Definition: ReadRepacker.h:54
static StorageAccount::Counter * s_statsARead
static StorageAccount::Counter & storageCounter(StorageAccount::Counter *&c, StorageAccount::Operation operation)
constexpr int PREFETCH_PROBE_LENGTH
Definition: Storage.h:18
static StorageAccount::Counter * s_statsCtor
ClassImp(AliDaqEventHeader)
bool isAvailable() const
Definition: Service.h:40
XrdSiteStatisticsInformation * statsService
Definition: XrdSource.cc:215
std::string Hash
Definition: Types.h:43
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:21
void tick(uint64_t amount=0, int64_t tick=0) const
void unpack(char *buf)
static StorageAccount::Counter * s_statsRead
static StorageAccount::Counter * s_statsXRead
static StorageAccount::Counter * s_statsFlush
Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode) override
size_t IOSize
Definition: IOTypes.h:15
static StorageAccount::Counter * s_statsCWrite
IOSize bufferUsed() const
Definition: ReadRepacker.h:49
std::vector< IOPosBuffer > & iov()
Definition: ReadRepacker.h:47
void setException(std::exception_ptr e)
void ResetErrno(void) const override
#define update(a, b)
list entry
Definition: mps_splice.py:68
~TStorageFactoryFile(void) override
#define O_NONBLOCK
Definition: SysFile.h:23
static StorageAccount::Counter * s_statsSeek
Int_t SysSync(Int_t fd) override
#define get
static StorageAccount::Counter * s_statsOpen
Bool_t WriteBuffer(const char *buf, Int_t len) override
std::unique_ptr< GeometricDet > construct(DDCompactView const &cpv, std::vector< int > const &detidShifts)
Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) override
Bool_t ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
void Initialize(const char *name, Option_t *option="")
tuple size
Write out results.
tuple dump
OutputFilePath = cms.string('/tmp/zhokin/'), OutputFileExt = cms.string(''),.
static StorageAccount::Counter * s_statsCPrefetch
Int_t SysStat(Int_t fd, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) override
static StorageAccount::Counter * s_statsCRead