CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc

Go to the documentation of this file.
00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "TFileCacheRead.h"
00007 #include "TSystem.h"
00008 #include "TROOT.h"
00009 #include <errno.h>
00010 #include <sys/stat.h>
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <iostream>
00014 
00015 #if 0
00016 #include "TTreeCache.h"
00017 #include "TTree.h"
00018 
00019 class TTreeCacheDebug : public TTreeCache {
00020 public:
00021   void dump(const char *label, const char *trailer)
00022   {
00023     Long64_t entry = fOwner->GetReadEntry();
00024     std::cerr
00025       << label << ": " << entry << " "
00026       << "{ fEntryMin=" << fEntryMin
00027       << ", fEntryMax=" << fEntryMax
00028       << ", fEntryNext=" << fEntryNext
00029       << ", fZipBytes=" << fZipBytes
00030       << ", fNbranches=" << fNbranches
00031       << ", fNReadOk=" << fNReadOk
00032       << ", fNReadMiss=" << fNReadMiss
00033       << ", fNReadPref=" << fNReadPref
00034       << ", fBranches=" << fBranches
00035       << ", fBrNames=" << fBrNames
00036       << ", fOwner=" << fOwner
00037       << ", fTree=" << fTree
00038       << ", fIsLearning=" << fIsLearning
00039       << ", fIsManual=" << fIsManual
00040       << "; fBufferSizeMin=" << fBufferSizeMin
00041       << ", fBufferSize=" << fBufferSize
00042       << ", fBufferLen=" << fBufferLen
00043       << ", fBytesToPrefetch=" << fBytesToPrefetch
00044       << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
00045       << ", fAsyncReading=" << fAsyncReading
00046       << ", fNseek=" << fNseek
00047       << ", fNtot=" << fNtot
00048       << ", fNb=" << fNb
00049       << ", fSeekSize=" << fSeekSize
00050       << ", fSeek=" << fSeek
00051       << ", fSeekIndex=" << fSeekIndex
00052       << ", fSeekSort=" << fSeekSort
00053       << ", fPos=" << fPos
00054       << ", fSeekLen=" << fSeekLen
00055       << ", fSeekSortLen=" << fSeekSortLen
00056       << ", fSeekPos=" << fSeekPos
00057       << ", fLen=" << fLen
00058       << ", fFile=" << fFile
00059       << ", fBuffer=" << (void *) fBuffer
00060       << ", fIsSorted=" << fIsSorted
00061       << " }\n" << trailer;
00062   }
00063 };
00064 #endif
00065 
00066 ClassImp(TStorageFactoryFile)
00067 static StorageAccount::Counter *s_statsCtor = 0;
00068 static StorageAccount::Counter *s_statsOpen = 0;
00069 static StorageAccount::Counter *s_statsClose = 0;
00070 static StorageAccount::Counter *s_statsFlush = 0;
00071 static StorageAccount::Counter *s_statsStat = 0;
00072 static StorageAccount::Counter *s_statsSeek = 0;
00073 static StorageAccount::Counter *s_statsRead = 0;
00074 static StorageAccount::Counter *s_statsCRead = 0;
00075 static StorageAccount::Counter *s_statsCPrefetch = 0;
00076 static StorageAccount::Counter *s_statsARead = 0;
00077 static StorageAccount::Counter *s_statsXRead = 0;
00078 static StorageAccount::Counter *s_statsWrite = 0;
00079 static StorageAccount::Counter *s_statsCWrite = 0;
00080 static StorageAccount::Counter *s_statsXWrite = 0;
00081 
00082 static inline StorageAccount::Counter &
00083 storageCounter(StorageAccount::Counter *&c, const char *label)
00084 {
00085   if (! c) c = &StorageAccount::counter("tstoragefile", label);
00086   return *c;
00087 }
00088 
00089 TStorageFactoryFile::TStorageFactoryFile(void)
00090   : storage_(0)
00091 {
00092   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00093   stats.tick(0);
00094 }
00095 
00096 
00097 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00098                                          Option_t *option /* = "" */,
00099                                          const char *ftitle /* = "" */,
00100                                          Int_t compress /* = 1 */)
00101   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00102     storage_(0)
00103 {
00104   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00105 
00106   // Parse options; at the moment we only accept read!
00107   fOption = option;
00108   fOption.ToUpper();
00109 
00110   if (fOption == "NEW")
00111     fOption = "CREATE";
00112 
00113   Bool_t create   = (fOption == "CREATE");
00114   Bool_t recreate = (fOption == "RECREATE");
00115   Bool_t update   = (fOption == "UPDATE");
00116   Bool_t read     = (fOption == "READ");
00117 
00118   if (!create && !recreate && !update && !read)
00119   {
00120     read = true;
00121     fOption = "READ";
00122   }
00123 
00124   if (recreate)
00125   {
00126     if (!gSystem->AccessPathName(path, kFileExists))
00127       gSystem->Unlink(path);
00128 
00129     recreate = false;
00130     create   = true;
00131     fOption  = "CREATE";
00132   }
00133 
00134   if (update && gSystem->AccessPathName(path, kFileExists))
00135   {
00136     update = kFALSE;
00137     create = kTRUE;
00138   }
00139 
00140   int           openFlags = IOFlags::OpenRead;
00141   if (!read)    openFlags |= IOFlags::OpenWrite;
00142   if (create)   openFlags |= IOFlags::OpenCreate;
00143   if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
00144 
00145   // Open storage
00146   if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00147   {
00148      MakeZombie();
00149      gDirectory = gROOT;
00150      throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00151        << "Cannot open file '" << path << "'";
00152   }
00153 
00154   fRealName = path;
00155   fD = 0; // sorry, meaningless
00156   fWritable = read ? kFALSE : kTRUE;
00157 
00158   Init(create);
00159 
00160   stats.tick(0);
00161 }
00162 
00163 TStorageFactoryFile::~TStorageFactoryFile(void)
00164 {
00165   Close();
00166   delete storage_;
00167 }
00168 
00172 
00173 Bool_t
00174 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00175 {
00176   // This function needs to be optimized to minimize seeks.
00177   // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
00178   Seek(pos);
00179   return ReadBuffer(buf, len);
00180 }
00181 
00182 Bool_t
00183 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00184 {
00185   // Check that it's valid to access this file.
00186   if (IsZombie())
00187   {
00188     Error("ReadBuffer", "Cannot read from a zombie file");
00189     return kTRUE;
00190   }
00191 
00192   if (! IsOpen())
00193   {
00194     Error("ReadBuffer", "Cannot read from a file that is not open");
00195     return kTRUE;
00196   }
00197 
00198   // Read specified byte range from the storage.  Returns kTRUE in
00199   // case of error.  Note that ROOT uses this function recursively
00200   // to fill the cache; we use a flag to make sure our accounting
00201   // is reflected in a comprehensible manner.  The "read" counter
00202   // will include both, "readc" indicates how much read from the
00203   // cache, "readu" indicates how much we failed to read from the
00204   // cache (excluding those recursive reads), and "readx" counts
00205   // the amount actually passed to read from the storage object.
00206   StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00207 
00208   // If we have a cache, read from there first.  This returns 0
00209   // if the block hasn't been prefetched, 1 if it was in cache,
00210   // and 2 if there was an error.
00211   if (TFileCacheRead *c = GetCacheRead())
00212   {
00213     Long64_t here = GetRelOffset();
00214     Bool_t   async = c->IsAsyncReading();
00215 
00216     StorageAccount::Stamp cstats(async
00217                                  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00218                                  : storageCounter(s_statsCRead, "readViaCache"));
00219 
00220     Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00221 
00222     if (st == 2) {
00223       return kTRUE;
00224     }
00225 
00226     if (st == 1) {
00227       if (async) {
00228         cstats.tick(len);
00229         Seek(here);
00230       } else {
00231         cstats.tick(len);
00232         stats.tick(len);
00233         return kFALSE;
00234       }
00235     }
00236   }
00237 
00238   // FIXME: Re-enable read-ahead if the data wasn't in cache.
00239   // if (! st) storage_->caching(true, -1, s_readahead);
00240 
00241   // A real read
00242   StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00243   IOSize n = storage_->xread(buf, len);
00244   xstats.tick(n);
00245   stats.tick(n);
00246   return n ? kFALSE : kTRUE;
00247 }
00248 
00249 Bool_t
00250 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00251 {
00252   // Check that it's valid to access this file.
00253   if (IsZombie())
00254   {
00255     Error("ReadBufferAsync", "Cannot read from a zombie file");
00256     return kTRUE;
00257   }
00258 
00259   if (! IsOpen())
00260   {
00261     Error("ReadBufferAsync", "Cannot read from a file that is not open");
00262     return kTRUE;
00263   }
00264 
00265   StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00266 
00267   // If asynchronous reading is disabled, bail out now, regardless
00268   // whether the underlying storage supports prefetching.  If it is
00269   // forced on, pretend it's on, even if the storage doesn't support
00270   // it, as this turns off the caching in ROOT's side.
00271   StorageFactory *f = StorageFactory::get();
00272 
00273   // Verify that we never using async reads in app-only mode
00274   if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00275     return kTRUE;
00276 
00277   // Let the I/O method indicate if it can do client-side prefetch.
00278   // If it does, then for example TTreeCache will drop its own cache
00279   // and will use the client-side cache of the actual I/O layer.
00280   // If len is zero ROOT is probing for prefetch support.
00281   if (len) {
00282     // FIXME: Synchronise caching.
00283     // storage_->caching(true, -1, 0);
00284     ;
00285   }
00286 
00287   IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
00288   if (storage_->prefetch(&iov, 1))
00289   {
00290     stats.tick(len);
00291     return kFALSE;
00292   }
00293 
00294   // Always ask ROOT to use async reads in storage-only mode,
00295   // regardless of whether the storage system supports it.
00296   if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00297     return kFALSE;
00298 
00299   // Prefetching not available right now.
00300   return kTRUE;
00301 }
00302 
00303 Bool_t
00304 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00305 {
00306   // Check that it's valid to access this file.
00307   if (IsZombie())
00308   {
00309     Error("ReadBuffers", "Cannot read from a zombie file");
00310     return kTRUE;
00311   }
00312 
00313   if (! IsOpen())
00314   {
00315     Error("ReadBuffers", "Cannot read from a file that is not open");
00316     return kTRUE;
00317   }
00318 
00319   // This should coalesce reads into a smaller number of large reads.
00320   // If the list of buffers to read has two or more buffers within 256KB of
00321   // each other, we collapse them into a single read from the storage system.
00322 
00323   // buf == 0 implies an async read; in this case, we skip the read coalescing logic.
00324   // Note that this code will cause CMSSW to never call readv().
00325   if (buf)
00326   {
00327     // Code ported from ROOT v5.26 trunk by Brian Bockelman.
00328     Int_t k = 0;
00329     Bool_t result = kTRUE;
00330     TFileCacheRead *old = fCacheRead;
00331     fCacheRead = 0;
00332     IOOffset curbegin = pos[0];
00333     IOOffset cur;
00334     std::vector<char> buf2(0);
00335     Int_t i = 0; // Position in the buffer.
00336     Int_t n = 0; // Number of reads we have coalesced.
00337 
00338     // Size of our coalesce window.  In ROOT 5.26, this is actually a variable
00339     // you can tweak, but it's not exposed in CMSSW.
00340 
00341     // Iterate over all the requests we have been given.  We either read each
00342     // individually or coalesce them into a big read.
00343 
00344     // Loop over all the requests we have been given.  Only trigger a read if the
00345     // request at pos[i] wouldn't completely fit into the read coalesce buffer.
00346     // If we trigger, then we do a single read for requests i-n to i-1, inclusive.
00347     // If n==0, we have special logic.
00348     while (i < nbuf)
00349     {
00350       cur = pos[i]+len[i];
00351       Bool_t bigRead = kTRUE;
00352       if (cur -curbegin < READ_COALESCE_SIZE)
00353       {
00354         // Add the current request into the set of buffers we will coalesce
00355         n++; // Record we have a new request we will coalesce.
00356         i++; // Examine the next request in the next loop.
00357         bigRead = kFALSE;
00358       }
00359       // Only perform a read if one of the following holds:
00360       // 1) bigRead=TRUE; i.e., we can't fit any more requests into the window.
00361       // 2) i>=nbuf; if i pointed to nbuf-1 at the beginning of the while loop,
00362       //    then the above logic will either set bigRead=TRUE (making case #1
00363       //    true) or increment i, making i == nbuf.
00364       if (bigRead || (i>=nbuf))
00365       {
00366         // If n == 0, no read requests could be coalesced.  Simple read.
00367         if (n == 0)
00368         {
00369           //if the block to read is about the same size as the read-ahead buffer
00370           //we read the block directly
00371           Seek(pos[i]);
00372 
00373           StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00374           // if xread returns short, then we have an error.  Break from the loop
00375           // and return kTRUE - signaling an error.
00376           result = ((IOSize)len[i] == storage_->xread(&buf[k], len[i])) ? kFALSE : kTRUE;
00377           xstats.tick(len[i]);
00378              
00379           if (result)
00380             break;
00381           k += len[i];
00382           i++;
00383         }
00384         else
00385         {
00386           //otherwise we read all blocks that fit in the read-ahead buffer
00387           Seek(curbegin);
00388           // Only allocate buf2 once; use std::vector to make sure the memory
00389           // gets cleaned up, as xread can toss an exception.
00390           if (buf2.capacity() < READ_COALESCE_SIZE)
00391             buf2.resize(READ_COALESCE_SIZE);
00392           //we read ahead
00393           assert(len[i-1] >= 0);
00394           assert(pos[i-1] >= curbegin);
00395           assert(pos[i-1]-curbegin+len[i-1] <= READ_COALESCE_SIZE);
00396           IOSize nahead = IOSized(pos[i-1]-curbegin+len[i-1]);
00397 
00398           StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00399           result = ( nahead == storage_->xread(&buf2[0], nahead)) ? kFALSE : kTRUE;
00400           xstats.tick(nahead);
00401 
00402           if (result)
00403             break;
00404 
00405           // Now, copy the data from the read to the appropriate buffer in
00406           // order to fulfill the request.
00407           for (Int_t j=0;j<n;j++) {
00408                 memcpy(&buf[k],&buf2[pos[i-n+j]-curbegin],len[i-n+j]);
00409                 k += len[i-n+j];
00410           }
00411           n = 0;
00412         }
00413         curbegin = pos[i];
00414       }
00415     }
00416     fCacheRead = old;
00417     return result;
00418   }
00419 
00420   // Read from underlying storage.
00421   Int_t total = 0;
00422   std::vector<IOPosBuffer> iov;
00423   iov.reserve(nbuf);
00424   for (Int_t i = 0; i < nbuf; ++i)
00425   {
00426     iov.push_back(IOPosBuffer(pos[i], buf ? buf + total : 0, len[i]));
00427     total += len[i];
00428   }
00429 
00430   // Null buffer means asynchronous reads into I/O system's cache.
00431   bool success;
00432   StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00433   // Synchronise low-level cache with the supposed cache in TFile.
00434   // storage_->caching(true, -1, 0);
00435   success = storage_->prefetch(&iov[0], nbuf);
00436   astats.tick(total);
00437 
00438   // If it didn't suceeed, pass down to the base class.
00439   return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00440 }
00441 
00442 Bool_t
00443 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00444 {
00445   // Check that it's valid to access this file.
00446   if (IsZombie())
00447   {
00448     Error("WriteBuffer", "Cannot write to a zombie file");
00449     return kTRUE;
00450   }
00451 
00452   if (! IsOpen())
00453   {
00454     Error("WriteBuffer", "Cannot write to a file that is not open");
00455     return kTRUE;
00456   }
00457 
00458   if (! fWritable)
00459   {
00460     Error("WriteBuffer", "File is not writable");
00461     return kTRUE;
00462   }
00463 
00464   StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00465   StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00466 
00467   // Try first writing via a cache, and if that's not possible, directly.
00468   Int_t st;
00469   switch ((st = WriteBufferViaCache(buf, len)))
00470   {
00471   case 0:
00472     // Actual write.
00473     {
00474       StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00475       IOSize n = storage_->xwrite(buf, len);
00476       xstats.tick(n);
00477       stats.tick(n);
00478 
00479       // FIXME: What if it's a short write?
00480       return n > 0 ? kFALSE : kTRUE;
00481     }
00482 
00483   case 1:
00484     cstats.tick(len);
00485     stats.tick(len);
00486     return kFALSE;
00487 
00488   case 2:
00489   default:
00490     Error("WriteBuffer", "Error writing to cache");
00491     return kTRUE;
00492   }
00493 }
00494 
00498 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
00499 // large a prefetch cache to use.
00500 // FIXME: Asynchronous open support?
00501 
00505 Int_t
00506 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
00507 {
00508   StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00509 
00510   if (storage_)
00511   {
00512     storage_->close();
00513     delete storage_;
00514     storage_ = 0;
00515   }
00516 
00517   int                      openFlags = IOFlags::OpenRead;
00518   if (flags & O_WRONLY)    openFlags = IOFlags::OpenWrite;
00519   else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00520   if (flags & O_CREAT)     openFlags |= IOFlags::OpenCreate;
00521   if (flags & O_APPEND)    openFlags |= IOFlags::OpenAppend;
00522   if (flags & O_EXCL)      openFlags |= IOFlags::OpenExclusive;
00523   if (flags & O_TRUNC)     openFlags |= IOFlags::OpenTruncate;
00524   if (flags & O_NONBLOCK)  openFlags |= IOFlags::OpenNonBlock;
00525 
00526   if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00527   {
00528      MakeZombie();
00529      gDirectory = gROOT;
00530      throw cms::Exception("TStorageFactoryFile::SysOpen()")
00531        << "Cannot open file '" << pathname << "'";
00532   }
00533 
00534   stats.tick();
00535   return 0;
00536 }
00537 
00538 Int_t
00539 TStorageFactoryFile::SysClose(Int_t /* fd */)
00540 {
00541   StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00542 
00543   if (storage_)
00544   {
00545     storage_->close();
00546     delete storage_;
00547     storage_ = 0;
00548   }
00549 
00550   stats.tick();
00551   return 0;
00552 }
00553 
00554 Long64_t
00555 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
00556 {
00557   StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00558   Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00559                            : whence == SEEK_CUR ? Storage::CURRENT
00560                            : Storage::END);
00561 
00562   offset = storage_->position(offset, rel);
00563   stats.tick();
00564   return offset;
00565 }
00566 
00567 Int_t
00568 TStorageFactoryFile::SysSync(Int_t /* fd */)
00569 {
00570   StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00571   storage_->flush();
00572   stats.tick();
00573   return 0;
00574 }
00575 
00576 Int_t
00577 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
00578                       Long_t *flags, Long_t *modtime)
00579 {
00580   StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00581   // FIXME: Most of this is unsupported or makes no sense with Storage
00582   *id = ::Hash(fRealName);
00583   *size = storage_->size();
00584   *flags = 0;
00585   *modtime = 0;
00586   stats.tick();
00587   return 0;
00588 }
00589 
00590 void
00591 TStorageFactoryFile::ResetErrno(void) const
00592 {
00593   TSystem::ResetErrno();
00594 }