CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_6/src/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc

Go to the documentation of this file.
00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "ReadRepacker.h"
00007 #include "TFileCacheRead.h"
00008 #include "TSystem.h"
00009 #include "TROOT.h"
00010 #include "TEnv.h"
00011 #include <errno.h>
00012 #include <sys/stat.h>
00013 #include <unistd.h>
00014 #include <fcntl.h>
00015 #include <iostream>
00016 
00017 #if 0
00018 #include "TTreeCache.h"
00019 #include "TTree.h"
00020 
00021 class TTreeCacheDebug : public TTreeCache {
00022 public:
00023   void dump(const char *label, const char *trailer)
00024   {
00025     Long64_t entry = fOwner->GetReadEntry();
00026     std::cerr
00027       << label << ": " << entry << " "
00028       << "{ fEntryMin=" << fEntryMin
00029       << ", fEntryMax=" << fEntryMax
00030       << ", fEntryNext=" << fEntryNext
00031       << ", fZipBytes=" << fZipBytes
00032       << ", fNbranches=" << fNbranches
00033       << ", fNReadOk=" << fNReadOk
00034       << ", fNReadMiss=" << fNReadMiss
00035       << ", fNReadPref=" << fNReadPref
00036       << ", fBranches=" << fBranches
00037       << ", fBrNames=" << fBrNames
00038       << ", fOwner=" << fOwner
00039       << ", fTree=" << fTree
00040       << ", fIsLearning=" << fIsLearning
00041       << ", fIsManual=" << fIsManual
00042       << "; fBufferSizeMin=" << fBufferSizeMin
00043       << ", fBufferSize=" << fBufferSize
00044       << ", fBufferLen=" << fBufferLen
00045       << ", fBytesToPrefetch=" << fBytesToPrefetch
00046       << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
00047       << ", fAsyncReading=" << fAsyncReading
00048       << ", fNseek=" << fNseek
00049       << ", fNtot=" << fNtot
00050       << ", fNb=" << fNb
00051       << ", fSeekSize=" << fSeekSize
00052       << ", fSeek=" << fSeek
00053       << ", fSeekIndex=" << fSeekIndex
00054       << ", fSeekSort=" << fSeekSort
00055       << ", fPos=" << fPos
00056       << ", fSeekLen=" << fSeekLen
00057       << ", fSeekSortLen=" << fSeekSortLen
00058       << ", fSeekPos=" << fSeekPos
00059       << ", fLen=" << fLen
00060       << ", fFile=" << fFile
00061       << ", fBuffer=" << (void *) fBuffer
00062       << ", fIsSorted=" << fIsSorted
00063       << " }\n" << trailer;
00064   }
00065 };
00066 #endif
00067 
00068 ClassImp(TStorageFactoryFile)
00069 static StorageAccount::Counter *s_statsCtor = 0;
00070 static StorageAccount::Counter *s_statsOpen = 0;
00071 static StorageAccount::Counter *s_statsClose = 0;
00072 static StorageAccount::Counter *s_statsFlush = 0;
00073 static StorageAccount::Counter *s_statsStat = 0;
00074 static StorageAccount::Counter *s_statsSeek = 0;
00075 static StorageAccount::Counter *s_statsRead = 0;
00076 static StorageAccount::Counter *s_statsCRead = 0;
00077 static StorageAccount::Counter *s_statsCPrefetch = 0;
00078 static StorageAccount::Counter *s_statsARead = 0;
00079 static StorageAccount::Counter *s_statsXRead = 0;
00080 static StorageAccount::Counter *s_statsWrite = 0;
00081 static StorageAccount::Counter *s_statsCWrite = 0;
00082 static StorageAccount::Counter *s_statsXWrite = 0;
00083 
00084 static inline StorageAccount::Counter &
00085 storageCounter(StorageAccount::Counter *&c, const char *label)
00086 {
00087   if (! c) c = &StorageAccount::counter("tstoragefile", label);
00088   return *c;
00089 }
00090 
00091 TStorageFactoryFile::TStorageFactoryFile(void)
00092   : storage_(0)
00093 {
00094   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00095   stats.tick(0);
00096 }
00097 
00098 // This constructor must be compatible with *all* the various built-in TFile plugins,
00099 // including TXNetFile.  This is why some arguments in the constructor is ignored.
00100 // If there's a future T*File that is incompatible with this constructor, a new
00101 // constructor will have to be added.
00102 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00103                                          Option_t *option,
00104                                          const char *ftitle,
00105                                          Int_t compress,
00106                                          Int_t netopt,
00107                                          Bool_t parallelopen /* = kFALSE */)
00108   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00109     storage_(0)
00110 {
00111   Initialize(path, option);
00112 }
00113 
00114 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00115                                          Option_t *option /* = "" */,
00116                                          const char *ftitle /* = "" */,
00117                                          Int_t compress /* = 1 */)
00118   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00119     storage_(0)
00120 {
00121   Initialize(path, option);
00122 }
00123 
00124 void
00125 TStorageFactoryFile::Initialize(const char *path,
00126                                 Option_t *option /* = "" */)
00127 {
00128   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00129 
00130   // Enable AsyncReading.
00131   // This was the default for 5.27, but turned off by default for 5.32.
00132   // In our testing, AsyncReading is the fastest mechanism available.
00133   // In 5.32, the AsyncPrefetching mechanism is preferred, but has been a
00134   // performance hit in our "average case" tests.
00135   gEnv->SetValue("TFile.AsyncReading", 1);
00136 
00137   // Parse options; at the moment we only accept read!
00138   fOption = option;
00139   fOption.ToUpper();
00140 
00141   if (fOption == "NEW")
00142     fOption = "CREATE";
00143 
00144   Bool_t create   = (fOption == "CREATE");
00145   Bool_t recreate = (fOption == "RECREATE");
00146   Bool_t update   = (fOption == "UPDATE");
00147   Bool_t read     = (fOption == "READ");
00148 
00149   if (!create && !recreate && !update && !read)
00150   {
00151     read = true;
00152     fOption = "READ";
00153   }
00154 
00155   if (recreate)
00156   {
00157     if (!gSystem->AccessPathName(path, kFileExists))
00158       gSystem->Unlink(path);
00159 
00160     recreate = false;
00161     create   = true;
00162     fOption  = "CREATE";
00163   }
00164   assert(!recreate);
00165 
00166   if (update && gSystem->AccessPathName(path, kFileExists))
00167   {
00168     update = kFALSE;
00169     create = kTRUE;
00170   }
00171 
00172   int           openFlags = IOFlags::OpenRead;
00173   if (!read)    openFlags |= IOFlags::OpenWrite;
00174   if (create)   openFlags |= IOFlags::OpenCreate;
00175   //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
00176 
00177   // Open storage
00178   if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00179   {
00180      MakeZombie();
00181      gDirectory = gROOT;
00182      throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00183        << "Cannot open file '" << path << "'";
00184   }
00185 
00186   fRealName = path;
00187   fD = 0; // sorry, meaningless
00188   fWritable = read ? kFALSE : kTRUE;
00189 
00190   Init(create);
00191 
00192   stats.tick(0);
00193 }
00194 
00195 TStorageFactoryFile::~TStorageFactoryFile(void)
00196 {
00197   Close();
00198   delete storage_;
00199 }
00200 
00204 
00205 Bool_t
00206 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00207 {
00208   // This function needs to be optimized to minimize seeks.
00209   // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
00210   Seek(pos);
00211   return ReadBuffer(buf, len);
00212 }
00213 
00214 Bool_t
00215 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00216 {
00217   // Check that it's valid to access this file.
00218   if (IsZombie())
00219   {
00220     Error("ReadBuffer", "Cannot read from a zombie file");
00221     return kTRUE;
00222   }
00223 
00224   if (! IsOpen())
00225   {
00226     Error("ReadBuffer", "Cannot read from a file that is not open");
00227     return kTRUE;
00228   }
00229 
00230   // Read specified byte range from the storage.  Returns kTRUE in
00231   // case of error.  Note that ROOT uses this function recursively
00232   // to fill the cache; we use a flag to make sure our accounting
00233   // is reflected in a comprehensible manner.  The "read" counter
00234   // will include both, "readc" indicates how much read from the
00235   // cache, "readu" indicates how much we failed to read from the
00236   // cache (excluding those recursive reads), and "readx" counts
00237   // the amount actually passed to read from the storage object.
00238   StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00239 
00240   // If we have a cache, read from there first.  This returns 0
00241   // if the block hasn't been prefetched, 1 if it was in cache,
00242   // and 2 if there was an error.
00243   if (TFileCacheRead *c = GetCacheRead())
00244   {
00245     Long64_t here = GetRelOffset();
00246     Bool_t   async = c->IsAsyncReading();
00247 
00248     StorageAccount::Stamp cstats(async
00249                                  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00250                                  : storageCounter(s_statsCRead, "readViaCache"));
00251 
00252     Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00253 
00254     if (st == 2) {
00255       return kTRUE;
00256     }
00257 
00258     if (st == 1) {
00259       if (async) {
00260         cstats.tick(len);
00261         Seek(here);
00262       } else {
00263         cstats.tick(len);
00264         stats.tick(len);
00265         return kFALSE;
00266       }
00267     }
00268   }
00269 
00270   // FIXME: Re-enable read-ahead if the data wasn't in cache.
00271   // if (! st) storage_->caching(true, -1, s_readahead);
00272 
00273   // A real read
00274   StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00275   IOSize n = storage_->xread(buf, len);
00276   xstats.tick(n);
00277   stats.tick(n);
00278   return n ? kFALSE : kTRUE;
00279 }
00280 
00281 Bool_t
00282 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00283 {
00284   // Check that it's valid to access this file.
00285   if (IsZombie())
00286   {
00287     Error("ReadBufferAsync", "Cannot read from a zombie file");
00288     return kTRUE;
00289   }
00290 
00291   if (! IsOpen())
00292   {
00293     Error("ReadBufferAsync", "Cannot read from a file that is not open");
00294     return kTRUE;
00295   }
00296 
00297   StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00298 
00299   // If asynchronous reading is disabled, bail out now, regardless
00300   // whether the underlying storage supports prefetching.  If it is
00301   // forced on, pretend it's on, even if the storage doesn't support
00302   // it, as this turns off the caching in ROOT's side.
00303   StorageFactory *f = StorageFactory::get();
00304 
00305   // Verify that we never using async reads in app-only mode
00306   if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00307     return kTRUE;
00308 
00309   // Let the I/O method indicate if it can do client-side prefetch.
00310   // If it does, then for example TTreeCache will drop its own cache
00311   // and will use the client-side cache of the actual I/O layer.
00312   // If len is zero ROOT is probing for prefetch support.
00313   if (len) {
00314     // FIXME: Synchronise caching.
00315     // storage_->caching(true, -1, 0);
00316     ;
00317   }
00318 
00319   IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
00320   if (storage_->prefetch(&iov, 1))
00321   {
00322     stats.tick(len);
00323     return kFALSE;
00324   }
00325 
00326   // Always ask ROOT to use async reads in storage-only mode,
00327   // regardless of whether the storage system supports it.
00328   if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00329     return kFALSE;
00330 
00331   // Prefetching not available right now.
00332   return kTRUE;
00333 }
00334 
00335 Bool_t
00336 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00337 {
00358   Int_t remaining = nbuf; // Number of read requests left to process.
00359   Int_t pack_count; // Number of read requests processed by this iteration.
00360     
00361   IOSize remaining_buffer_size=0;
00362   // Calculate the remaining buffer size for the ROOT-owned buffer by adding
00363   // the size of the various requests.
00364   for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
00365 
00366   char     *current_buffer = buf;
00367   Long64_t *current_pos    = pos;
00368   Int_t    *current_len    = len;
00369 
00370   ReadRepacker repacker;
00371 
00372   while (remaining > 0) {
00373 
00374     pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
00375 
00376     int real_bytes_processed = repacker.realBytesProcessed();
00377     IOSize io_buffer_used = repacker.bufferUsed();
00378 
00379     // Issue readv, then unpack buffers.
00380     StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00381     std::vector<IOPosBuffer> &iov = repacker.iov();
00382     IOSize result = storage_->readv(&iov[0], iov.size());
00383     if (result != io_buffer_used) {
00384       return kTRUE;
00385     }
00386     xstats.tick(io_buffer_used);
00387     repacker.unpack(current_buffer);
00388 
00389     // Update the location of the unused part of the input buffer.
00390     remaining_buffer_size -= real_bytes_processed;
00391     current_buffer += real_bytes_processed;
00392 
00393     current_pos += pack_count;
00394     current_len += pack_count;
00395     remaining   -= pack_count; 
00396 
00397   }
00398   assert(remaining_buffer_size == 0);
00399   return kFALSE;
00400 }
00401 
00402 Bool_t
00403 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00404 {
00405   // Check that it's valid to access this file.
00406   if (IsZombie())
00407   {
00408     Error("ReadBuffers", "Cannot read from a zombie file");
00409     return kTRUE;
00410   }
00411 
00412   if (! IsOpen())
00413   {
00414     Error("ReadBuffers", "Cannot read from a file that is not open");
00415     return kTRUE;
00416   }
00417 
00418   // For synchronous reads, we have special logic to optimize the I/O requests
00419   // from ROOT before handing it to the storage.
00420   if (buf)
00421   {
00422     return ReadBuffersSync(buf, pos, len, nbuf);
00423   }
00424   // For an async read, we assume the storage system is smart enough to do the
00425   // optimization itself.
00426 
00427   // Read from underlying storage.
00428   void* const nobuf = 0;
00429   Int_t total = 0;
00430   std::vector<IOPosBuffer> iov;
00431   iov.reserve(nbuf);
00432   for (Int_t i = 0; i < nbuf; ++i)
00433   {
00434     iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
00435     total += len[i];
00436   }
00437 
00438   // Null buffer means asynchronous reads into I/O system's cache.
00439   bool success;
00440   StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00441   // Synchronise low-level cache with the supposed cache in TFile.
00442   // storage_->caching(true, -1, 0);
00443   success = storage_->prefetch(&iov[0], nbuf);
00444   astats.tick(total);
00445 
00446   // If it didn't suceeed, pass down to the base class.
00447   return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00448 }
00449 
00450 Bool_t
00451 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00452 {
00453   // Check that it's valid to access this file.
00454   if (IsZombie())
00455   {
00456     Error("WriteBuffer", "Cannot write to a zombie file");
00457     return kTRUE;
00458   }
00459 
00460   if (! IsOpen())
00461   {
00462     Error("WriteBuffer", "Cannot write to a file that is not open");
00463     return kTRUE;
00464   }
00465 
00466   if (! fWritable)
00467   {
00468     Error("WriteBuffer", "File is not writable");
00469     return kTRUE;
00470   }
00471 
00472   StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00473   StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00474 
00475   // Try first writing via a cache, and if that's not possible, directly.
00476   Int_t st;
00477   switch ((st = WriteBufferViaCache(buf, len)))
00478   {
00479   case 0:
00480     // Actual write.
00481     {
00482       StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00483       IOSize n = storage_->xwrite(buf, len);
00484       xstats.tick(n);
00485       stats.tick(n);
00486 
00487       // FIXME: What if it's a short write?
00488       return n > 0 ? kFALSE : kTRUE;
00489     }
00490 
00491   case 1:
00492     cstats.tick(len);
00493     stats.tick(len);
00494     return kFALSE;
00495 
00496   case 2:
00497   default:
00498     Error("WriteBuffer", "Error writing to cache");
00499     return kTRUE;
00500   }
00501 }
00502 
00506 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
00507 // large a prefetch cache to use.
00508 // FIXME: Asynchronous open support?
00509 
00513 Int_t
00514 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
00515 {
00516   StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00517 
00518   if (storage_)
00519   {
00520     storage_->close();
00521     delete storage_;
00522     storage_ = 0;
00523   }
00524 
00525   int                      openFlags = IOFlags::OpenRead;
00526   if (flags & O_WRONLY)    openFlags = IOFlags::OpenWrite;
00527   else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00528   if (flags & O_CREAT)     openFlags |= IOFlags::OpenCreate;
00529   if (flags & O_APPEND)    openFlags |= IOFlags::OpenAppend;
00530   if (flags & O_EXCL)      openFlags |= IOFlags::OpenExclusive;
00531   if (flags & O_TRUNC)     openFlags |= IOFlags::OpenTruncate;
00532   if (flags & O_NONBLOCK)  openFlags |= IOFlags::OpenNonBlock;
00533 
00534   if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00535   {
00536      MakeZombie();
00537      gDirectory = gROOT;
00538      throw cms::Exception("TStorageFactoryFile::SysOpen()")
00539        << "Cannot open file '" << pathname << "'";
00540   }
00541 
00542   stats.tick();
00543   return 0;
00544 }
00545 
00546 Int_t
00547 TStorageFactoryFile::SysClose(Int_t /* fd */)
00548 {
00549   StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00550 
00551   if (storage_)
00552   {
00553     storage_->close();
00554     delete storage_;
00555     storage_ = 0;
00556   }
00557 
00558   stats.tick();
00559   return 0;
00560 }
00561 
00562 Long64_t
00563 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
00564 {
00565   StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00566   Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00567                                : whence == SEEK_CUR ? Storage::CURRENT
00568                                : Storage::END);
00569 
00570   offset = storage_->position(offset, rel);
00571   stats.tick();
00572   return offset;
00573 }
00574 
00575 Int_t
00576 TStorageFactoryFile::SysSync(Int_t /* fd */)
00577 {
00578   StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00579   storage_->flush();
00580   stats.tick();
00581   return 0;
00582 }
00583 
00584 Int_t
00585 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
00586                              Long_t *flags, Long_t *modtime)
00587 {
00588   StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00589   // FIXME: Most of this is unsupported or makes no sense with Storage
00590   *id = ::Hash(fRealName);
00591   *size = storage_->size();
00592   *flags = 0;
00593   *modtime = 0;
00594   stats.tick();
00595   return 0;
00596 }
00597 
00598 void
00599 TStorageFactoryFile::ResetErrno(void) const
00600 {
00601   TSystem::ResetErrno();
00602 }