CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc

Go to the documentation of this file.
00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
00006 #include "FWCore/ServiceRegistry/interface/Service.h"
00007 #include "FWCore/Utilities/interface/EDMException.h"
00008 #include "ReadRepacker.h"
00009 #include "TFileCacheRead.h"
00010 #include "TSystem.h"
00011 #include "TROOT.h"
00012 #include "TEnv.h"
00013 #include <errno.h>
00014 #include <sys/stat.h>
00015 #include <unistd.h>
00016 #include <fcntl.h>
00017 #include <iostream>
00018 #include <cassert>
00019 
00020 #if 0
00021 #include "TTreeCache.h"
00022 #include "TTree.h"
00023 
00024 class TTreeCacheDebug : public TTreeCache {
00025 public:
00026   void dump(const char *label, const char *trailer)
00027   {
00028     Long64_t entry = fOwner->GetReadEntry();
00029     std::cerr
00030       << label << ": " << entry << " "
00031       << "{ fEntryMin=" << fEntryMin
00032       << ", fEntryMax=" << fEntryMax
00033       << ", fEntryNext=" << fEntryNext
00034       << ", fZipBytes=" << fZipBytes
00035       << ", fNbranches=" << fNbranches
00036       << ", fNReadOk=" << fNReadOk
00037       << ", fNReadMiss=" << fNReadMiss
00038       << ", fNReadPref=" << fNReadPref
00039       << ", fBranches=" << fBranches
00040       << ", fBrNames=" << fBrNames
00041       << ", fOwner=" << fOwner
00042       << ", fTree=" << fTree
00043       << ", fIsLearning=" << fIsLearning
00044       << ", fIsManual=" << fIsManual
00045       << "; fBufferSizeMin=" << fBufferSizeMin
00046       << ", fBufferSize=" << fBufferSize
00047       << ", fBufferLen=" << fBufferLen
00048       << ", fBytesToPrefetch=" << fBytesToPrefetch
00049       << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
00050       << ", fAsyncReading=" << fAsyncReading
00051       << ", fNseek=" << fNseek
00052       << ", fNtot=" << fNtot
00053       << ", fNb=" << fNb
00054       << ", fSeekSize=" << fSeekSize
00055       << ", fSeek=" << fSeek
00056       << ", fSeekIndex=" << fSeekIndex
00057       << ", fSeekSort=" << fSeekSort
00058       << ", fPos=" << fPos
00059       << ", fSeekLen=" << fSeekLen
00060       << ", fSeekSortLen=" << fSeekSortLen
00061       << ", fSeekPos=" << fSeekPos
00062       << ", fLen=" << fLen
00063       << ", fFile=" << fFile
00064       << ", fBuffer=" << (void *) fBuffer
00065       << ", fIsSorted=" << fIsSorted
00066       << " }\n" << trailer;
00067   }
00068 };
00069 #endif
00070 
00071 ClassImp(TStorageFactoryFile)
00072 static StorageAccount::Counter *s_statsCtor = 0;
00073 static StorageAccount::Counter *s_statsOpen = 0;
00074 static StorageAccount::Counter *s_statsClose = 0;
00075 static StorageAccount::Counter *s_statsFlush = 0;
00076 static StorageAccount::Counter *s_statsStat = 0;
00077 static StorageAccount::Counter *s_statsSeek = 0;
00078 static StorageAccount::Counter *s_statsRead = 0;
00079 static StorageAccount::Counter *s_statsCRead = 0;
00080 static StorageAccount::Counter *s_statsCPrefetch = 0;
00081 static StorageAccount::Counter *s_statsARead = 0;
00082 static StorageAccount::Counter *s_statsXRead = 0;
00083 static StorageAccount::Counter *s_statsWrite = 0;
00084 static StorageAccount::Counter *s_statsCWrite = 0;
00085 static StorageAccount::Counter *s_statsXWrite = 0;
00086 
00087 static inline StorageAccount::Counter &
00088 storageCounter(StorageAccount::Counter *&c, const char *label)
00089 {
00090   if (! c) c = &StorageAccount::counter("tstoragefile", label);
00091   return *c;
00092 }
00093 
00094 TStorageFactoryFile::TStorageFactoryFile(void)
00095   : storage_(0)
00096 {
00097   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00098   stats.tick(0);
00099 }
00100 
00101 // This constructor must be compatible with *all* the various built-in TFile plugins,
00102 // including TXNetFile.  This is why some arguments in the constructor is ignored.
00103 // If there's a future T*File that is incompatible with this constructor, a new
00104 // constructor will have to be added.
00105 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00106                                          Option_t *option,
00107                                          const char *ftitle,
00108                                          Int_t compress,
00109                                          Int_t netopt,
00110                                          Bool_t parallelopen /* = kFALSE */)
00111   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00112     storage_(0)
00113 {
00114   Initialize(path, option);
00115 }
00116 
00117 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00118                                          Option_t *option /* = "" */,
00119                                          const char *ftitle /* = "" */,
00120                                          Int_t compress /* = 1 */)
00121   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00122     storage_(0)
00123 {
00124   Initialize(path, option);
00125 }
00126 
00127 void
00128 TStorageFactoryFile::Initialize(const char *path,
00129                                 Option_t *option /* = "" */)
00130 {
00131   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00132 
00133   // Enable AsyncReading.
00134   // This was the default for 5.27, but turned off by default for 5.32.
00135   // In our testing, AsyncReading is the fastest mechanism available.
00136   // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
00137   // performance hit in our "average case" tests.
00138   gEnv->SetValue("TFile.AsyncReading", 1);
00139 
00140   // Parse options; at the moment we only accept read!
00141   fOption = option;
00142   fOption.ToUpper();
00143 
00144   if (fOption == "NEW")
00145     fOption = "CREATE";
00146 
00147   Bool_t create   = (fOption == "CREATE");
00148   Bool_t recreate = (fOption == "RECREATE");
00149   Bool_t update   = (fOption == "UPDATE");
00150   Bool_t read     = (fOption == "READ");
00151 
00152   if (!create && !recreate && !update && !read)
00153   {
00154     read = true;
00155     fOption = "READ";
00156   }
00157 
00158   if (recreate)
00159   {
00160     if (!gSystem->AccessPathName(path, kFileExists))
00161       gSystem->Unlink(path);
00162 
00163     recreate = false;
00164     create   = true;
00165     fOption  = "CREATE";
00166   }
00167   assert(!recreate);
00168 
00169   if (update && gSystem->AccessPathName(path, kFileExists))
00170   {
00171     update = kFALSE;
00172     create = kTRUE;
00173   }
00174 
00175   assert(read || update || create);
00176 
00177   int           openFlags = IOFlags::OpenRead;
00178   if (!read)    openFlags |= IOFlags::OpenWrite;
00179   if (create)   openFlags |= IOFlags::OpenCreate;
00180   //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
00181 
00182   // Open storage
00183   if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00184   {
00185      MakeZombie();
00186      gDirectory = gROOT;
00187      throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00188        << "Cannot open file '" << path << "'";
00189   }
00190 
00191   // Record the statistics.
00192   try {
00193     edm::Service<edm::storage::StatisticsSenderService> statsService;
00194     if (statsService.isAvailable()) {
00195       statsService->setSize(storage_->size());
00196     }
00197   } catch (edm::Exception e) {
00198     if (e.categoryCode() != edm::errors::NotFound) {
00199       throw;
00200     }
00201   }
00202 
00203   fRealName = path;
00204   fD = 0; // sorry, meaningless
00205   fWritable = read ? kFALSE : kTRUE;
00206 
00207   Init(create);
00208 
00209   stats.tick(0);
00210 }
00211 
00212 TStorageFactoryFile::~TStorageFactoryFile(void)
00213 {
00214   Close();
00215   delete storage_;
00216 }
00217 
00221 
00222 Bool_t
00223 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00224 {
00225   // This function needs to be optimized to minimize seeks.
00226   // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
00227   Seek(pos);
00228   return ReadBuffer(buf, len);
00229 }
00230 
00231 Bool_t
00232 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00233 {
00234   // Check that it's valid to access this file.
00235   if (IsZombie())
00236   {
00237     Error("ReadBuffer", "Cannot read from a zombie file");
00238     return kTRUE;
00239   }
00240 
00241   if (! IsOpen())
00242   {
00243     Error("ReadBuffer", "Cannot read from a file that is not open");
00244     return kTRUE;
00245   }
00246 
00247   // Read specified byte range from the storage.  Returns kTRUE in
00248   // case of error.  Note that ROOT uses this function recursively
00249   // to fill the cache; we use a flag to make sure our accounting
00250   // is reflected in a comprehensible manner.  The "read" counter
00251   // will include both, "readc" indicates how much read from the
00252   // cache, "readu" indicates how much we failed to read from the
00253   // cache (excluding those recursive reads), and "readx" counts
00254   // the amount actually passed to read from the storage object.
00255   StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00256 
00257   // If we have a cache, read from there first.  This returns 0
00258   // if the block hasn't been prefetched, 1 if it was in cache,
00259   // and 2 if there was an error.
00260   if (TFileCacheRead *c = GetCacheRead())
00261   {
00262     Long64_t here = GetRelOffset();
00263     Bool_t   async = c->IsAsyncReading();
00264 
00265     StorageAccount::Stamp cstats(async
00266                                  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00267                                  : storageCounter(s_statsCRead, "readViaCache"));
00268 
00269     Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00270 
00271     if (st == 2) {
00272       return kTRUE;
00273     }
00274 
00275     if (st == 1) {
00276       if (async) {
00277         cstats.tick(len);
00278         Seek(here);
00279       } else {
00280         cstats.tick(len);
00281         stats.tick(len);
00282         return kFALSE;
00283       }
00284     }
00285   }
00286 
00287   // FIXME: Re-enable read-ahead if the data wasn't in cache.
00288   // if (! st) storage_->caching(true, -1, s_readahead);
00289 
00290   // A real read
00291   StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00292   IOSize n = storage_->xread(buf, len);
00293   xstats.tick(n);
00294   stats.tick(n);
00295   return n ? kFALSE : kTRUE;
00296 }
00297 
00298 Bool_t
00299 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00300 {
00301   // Check that it's valid to access this file.
00302   if (IsZombie())
00303   {
00304     Error("ReadBufferAsync", "Cannot read from a zombie file");
00305     return kTRUE;
00306   }
00307 
00308   if (! IsOpen())
00309   {
00310     Error("ReadBufferAsync", "Cannot read from a file that is not open");
00311     return kTRUE;
00312   }
00313 
00314   StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00315 
00316   // If asynchronous reading is disabled, bail out now, regardless
00317   // whether the underlying storage supports prefetching.  If it is
00318   // forced on, pretend it's on, even if the storage doesn't support
00319   // it, as this turns off the caching in ROOT's side.
00320   StorageFactory *f = StorageFactory::get();
00321 
00322   // Verify that we never using async reads in app-only mode
00323   if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00324     return kTRUE;
00325 
00326   // Let the I/O method indicate if it can do client-side prefetch.
00327   // If it does, then for example TTreeCache will drop its own cache
00328   // and will use the client-side cache of the actual I/O layer.
00329   // If len is zero ROOT is probing for prefetch support.
00330   if (len) {
00331     // FIXME: Synchronise caching.
00332     // storage_->caching(true, -1, 0);
00333     ;
00334   }
00335 
00336   IOPosBuffer iov(off, (void *) 0, len ? len : PREFETCH_PROBE_LENGTH);
00337   if (storage_->prefetch(&iov, 1))
00338   {
00339     stats.tick(len);
00340     return kFALSE;
00341   }
00342 
00343   // Always ask ROOT to use async reads in storage-only mode,
00344   // regardless of whether the storage system supports it.
00345   if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00346     return kFALSE;
00347 
00348   // Prefetching not available right now.
00349   return kTRUE;
00350 }
00351 
00352 Bool_t
00353 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00354 {
00375   Int_t remaining = nbuf; // Number of read requests left to process.
00376   Int_t pack_count; // Number of read requests processed by this iteration.
00377     
00378   IOSize remaining_buffer_size=0;
00379   // Calculate the remaining buffer size for the ROOT-owned buffer by adding
00380   // the size of the various requests.
00381   for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
00382 
00383   char     *current_buffer = buf;
00384   Long64_t *current_pos    = pos;
00385   Int_t    *current_len    = len;
00386 
00387   ReadRepacker repacker;
00388 
00389   while (remaining > 0) {
00390 
00391     pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
00392 
00393     int real_bytes_processed = repacker.realBytesProcessed();
00394     IOSize io_buffer_used = repacker.bufferUsed();
00395 
00396     // Issue readv, then unpack buffers.
00397     StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00398     std::vector<IOPosBuffer> &iov = repacker.iov();
00399     IOSize result = storage_->readv(&iov[0], iov.size());
00400     if (result != io_buffer_used) {
00401       return kTRUE;
00402     }
00403     xstats.tick(io_buffer_used);
00404     repacker.unpack(current_buffer);
00405 
00406     // Update the location of the unused part of the input buffer.
00407     remaining_buffer_size -= real_bytes_processed;
00408     current_buffer += real_bytes_processed;
00409 
00410     current_pos += pack_count;
00411     current_len += pack_count;
00412     remaining   -= pack_count; 
00413 
00414   }
00415   assert(remaining_buffer_size == 0);
00416   return kFALSE;
00417 }
00418 
00419 Bool_t
00420 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00421 {
00422   // Check that it's valid to access this file.
00423   if (IsZombie())
00424   {
00425     Error("ReadBuffers", "Cannot read from a zombie file");
00426     return kTRUE;
00427   }
00428 
00429   if (! IsOpen())
00430   {
00431     Error("ReadBuffers", "Cannot read from a file that is not open");
00432     return kTRUE;
00433   }
00434 
00435   // For synchronous reads, we have special logic to optimize the I/O requests
00436   // from ROOT before handing it to the storage.
00437   if (buf)
00438   {
00439     return ReadBuffersSync(buf, pos, len, nbuf);
00440   }
00441   // For an async read, we assume the storage system is smart enough to do the
00442   // optimization itself.
00443 
00444   // Read from underlying storage.
00445   void* const nobuf = 0;
00446   Int_t total = 0;
00447   std::vector<IOPosBuffer> iov;
00448   iov.reserve(nbuf);
00449   for (Int_t i = 0; i < nbuf; ++i)
00450   {
00451     iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
00452     total += len[i];
00453   }
00454 
00455   // Null buffer means asynchronous reads into I/O system's cache.
00456   bool success;
00457   StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00458   // Synchronise low-level cache with the supposed cache in TFile.
00459   // storage_->caching(true, -1, 0);
00460   success = storage_->prefetch(&iov[0], nbuf);
00461   astats.tick(total);
00462 
00463   // If it didn't suceeed, pass down to the base class.
00464   return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00465 }
00466 
00467 Bool_t
00468 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00469 {
00470   // Check that it's valid to access this file.
00471   if (IsZombie())
00472   {
00473     Error("WriteBuffer", "Cannot write to a zombie file");
00474     return kTRUE;
00475   }
00476 
00477   if (! IsOpen())
00478   {
00479     Error("WriteBuffer", "Cannot write to a file that is not open");
00480     return kTRUE;
00481   }
00482 
00483   if (! fWritable)
00484   {
00485     Error("WriteBuffer", "File is not writable");
00486     return kTRUE;
00487   }
00488 
00489   StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00490   StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00491 
00492   // Try first writing via a cache, and if that's not possible, directly.
00493   Int_t st;
00494   switch ((st = WriteBufferViaCache(buf, len)))
00495   {
00496   case 0:
00497     // Actual write.
00498     {
00499       StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00500       IOSize n = storage_->xwrite(buf, len);
00501       xstats.tick(n);
00502       stats.tick(n);
00503 
00504       // FIXME: What if it's a short write?
00505       return n > 0 ? kFALSE : kTRUE;
00506     }
00507 
00508   case 1:
00509     cstats.tick(len);
00510     stats.tick(len);
00511     return kFALSE;
00512 
00513   case 2:
00514   default:
00515     Error("WriteBuffer", "Error writing to cache");
00516     return kTRUE;
00517   }
00518 }
00519 
00523 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
00524 // large a prefetch cache to use.
00525 // FIXME: Asynchronous open support?
00526 
00530 Int_t
00531 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
00532 {
00533   StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00534 
00535   if (storage_)
00536   {
00537     storage_->close();
00538     delete storage_;
00539     storage_ = 0;
00540   }
00541 
00542   int                      openFlags = IOFlags::OpenRead;
00543   if (flags & O_WRONLY)    openFlags = IOFlags::OpenWrite;
00544   else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00545   if (flags & O_CREAT)     openFlags |= IOFlags::OpenCreate;
00546   if (flags & O_APPEND)    openFlags |= IOFlags::OpenAppend;
00547   if (flags & O_EXCL)      openFlags |= IOFlags::OpenExclusive;
00548   if (flags & O_TRUNC)     openFlags |= IOFlags::OpenTruncate;
00549   if (flags & O_NONBLOCK)  openFlags |= IOFlags::OpenNonBlock;
00550 
00551   if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00552   {
00553      MakeZombie();
00554      gDirectory = gROOT;
00555      throw cms::Exception("TStorageFactoryFile::SysOpen()")
00556        << "Cannot open file '" << pathname << "'";
00557   }
00558 
00559   stats.tick();
00560   return 0;
00561 }
00562 
00563 Int_t
00564 TStorageFactoryFile::SysClose(Int_t /* fd */)
00565 {
00566   StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00567 
00568   if (storage_)
00569   {
00570     storage_->close();
00571     delete storage_;
00572     storage_ = 0;
00573   }
00574 
00575   stats.tick();
00576   return 0;
00577 }
00578 
00579 Long64_t
00580 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
00581 {
00582   StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00583   Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00584                                : whence == SEEK_CUR ? Storage::CURRENT
00585                                : Storage::END);
00586 
00587   offset = storage_->position(offset, rel);
00588   stats.tick();
00589   return offset;
00590 }
00591 
00592 Int_t
00593 TStorageFactoryFile::SysSync(Int_t /* fd */)
00594 {
00595   StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00596   storage_->flush();
00597   stats.tick();
00598   return 0;
00599 }
00600 
00601 Int_t
00602 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
00603                              Long_t *flags, Long_t *modtime)
00604 {
00605   StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00606   // FIXME: Most of this is unsupported or makes no sense with Storage
00607   *id = ::Hash(fRealName);
00608   *size = storage_->size();
00609   *flags = 0;
00610   *modtime = 0;
00611   stats.tick();
00612   return 0;
00613 }
00614 
00615 void
00616 TStorageFactoryFile::ResetErrno(void) const
00617 {
00618   TSystem::ResetErrno();
00619 }