CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_9/src/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc

Go to the documentation of this file.
00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "TFileCacheRead.h"
00007 #include "TSystem.h"
00008 #include "TROOT.h"
00009 #include <errno.h>
00010 #include <sys/stat.h>
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <iostream>
00014 
#if 0
// Debugging helper, compiled out by default: a TTreeCache subclass whose only
// purpose is to print the cache's protected state.  Change the surrounding
// "#if 0" to "#if 1" to make it available.
#include "TTreeCache.h"
#include "TTree.h"

class TTreeCacheDebug : public TTreeCache {
public:
  // Write @a label, the owning tree's current read entry and a snapshot of
  // the cache's internal fields to std::cerr, then append @a trailer.
  void dump(const char *label, const char *trailer)
  {
    const Long64_t readEntry = fOwner->GetReadEntry();
    std::ostream &os = std::cerr;

    os << label << ": " << readEntry << " ";

    // Entry window and hit/miss statistics.
    os << "{ fEntryMin=" << fEntryMin
       << ", fEntryMax=" << fEntryMax
       << ", fEntryNext=" << fEntryNext
       << ", fZipBytes=" << fZipBytes
       << ", fNbranches=" << fNbranches
       << ", fNReadOk=" << fNReadOk
       << ", fNReadMiss=" << fNReadMiss
       << ", fNReadPref=" << fNReadPref;

    // Branch bookkeeping and learning-phase flags.
    os << ", fBranches=" << fBranches
       << ", fBrNames=" << fBrNames
       << ", fOwner=" << fOwner
       << ", fTree=" << fTree
       << ", fIsLearning=" << fIsLearning
       << ", fIsManual=" << fIsManual;

    // Prefetch buffer and seek-list state.
    os << "; fBufferSizeMin=" << fBufferSizeMin
       << ", fBufferSize=" << fBufferSize
       << ", fBufferLen=" << fBufferLen
       << ", fBytesToPrefetch=" << fBytesToPrefetch
       << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
       << ", fAsyncReading=" << fAsyncReading
       << ", fNseek=" << fNseek
       << ", fNtot=" << fNtot
       << ", fNb=" << fNb
       << ", fSeekSize=" << fSeekSize
       << ", fSeek=" << fSeek
       << ", fSeekIndex=" << fSeekIndex
       << ", fSeekSort=" << fSeekSort
       << ", fPos=" << fPos
       << ", fSeekLen=" << fSeekLen
       << ", fSeekSortLen=" << fSeekSortLen
       << ", fSeekPos=" << fSeekPos
       << ", fLen=" << fLen
       << ", fFile=" << fFile
       << ", fBuffer=" << static_cast<void *>(fBuffer)
       << ", fIsSorted=" << fIsSorted
       << " }\n" << trailer;
  }
};
#endif
00065 
00066 ClassImp(TStorageFactoryFile)
00067 static StorageAccount::Counter *s_statsCtor = 0;
00068 static StorageAccount::Counter *s_statsOpen = 0;
00069 static StorageAccount::Counter *s_statsClose = 0;
00070 static StorageAccount::Counter *s_statsFlush = 0;
00071 static StorageAccount::Counter *s_statsStat = 0;
00072 static StorageAccount::Counter *s_statsSeek = 0;
00073 static StorageAccount::Counter *s_statsRead = 0;
00074 static StorageAccount::Counter *s_statsCRead = 0;
00075 static StorageAccount::Counter *s_statsCPrefetch = 0;
00076 static StorageAccount::Counter *s_statsARead = 0;
00077 static StorageAccount::Counter *s_statsXRead = 0;
00078 static StorageAccount::Counter *s_statsWrite = 0;
00079 static StorageAccount::Counter *s_statsCWrite = 0;
00080 static StorageAccount::Counter *s_statsXWrite = 0;
00081 
00082 static inline StorageAccount::Counter &
00083 storageCounter(StorageAccount::Counter *&c, const char *label)
00084 {
00085   if (! c) c = &StorageAccount::counter("tstoragefile", label);
00086   return *c;
00087 }
00088 
00089 TStorageFactoryFile::TStorageFactoryFile(void)
00090   : storage_(0)
00091 {
00092   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00093   stats.tick(0);
00094 }
00095 
00096 // This constructor must be compatible with *all* the various built-in TFile plugins,
00097 // including TXNetFile.  This is why some arguments in the constructor is ignored.
00098 // If there's a future T*File that is incompatible with this constructor, a new
00099 // constructor will have to be added.
00100 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00101                                          Option_t *option,
00102                                          const char *ftitle,
00103                                          Int_t compress,
00104                                          Int_t netopt,
00105                                          Bool_t parallelopen)
00106   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00107     storage_(0)
00108 {
00109   Initialize(path, option);
00110 }
00111 
00112 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00113                                          Option_t *option /* = "" */,
00114                                          const char *ftitle /* = "" */,
00115                                          Int_t compress /* = 1 */)
00116   : TFile(path, "NET", ftitle, compress), // Pass "NET" to prevent local access in base class
00117     storage_(0)
00118 {
00119   Initialize(path, option);
00120 }
00121 
00122 void
00123 TStorageFactoryFile::Initialize(const char *path,
00124                                 Option_t *option /* = "" */)
00125 {
00126   StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00127 
00128   // Parse options; at the moment we only accept read!
00129   fOption = option;
00130   fOption.ToUpper();
00131 
00132   if (fOption == "NEW")
00133     fOption = "CREATE";
00134 
00135   Bool_t create   = (fOption == "CREATE");
00136   Bool_t recreate = (fOption == "RECREATE");
00137   Bool_t update   = (fOption == "UPDATE");
00138   Bool_t read     = (fOption == "READ");
00139 
00140   if (!create && !recreate && !update && !read)
00141   {
00142     read = true;
00143     fOption = "READ";
00144   }
00145 
00146   if (recreate)
00147   {
00148     if (!gSystem->AccessPathName(path, kFileExists))
00149       gSystem->Unlink(path);
00150 
00151     recreate = false;
00152     create   = true;
00153     fOption  = "CREATE";
00154   }
00155   assert(!recreate);
00156 
00157   if (update && gSystem->AccessPathName(path, kFileExists))
00158   {
00159     update = kFALSE;
00160     create = kTRUE;
00161   }
00162 
00163   int           openFlags = IOFlags::OpenRead;
00164   if (!read)    openFlags |= IOFlags::OpenWrite;
00165   if (create)   openFlags |= IOFlags::OpenCreate;
00166   //if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
00167 
00168   // Open storage
00169   if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00170   {
00171      MakeZombie();
00172      gDirectory = gROOT;
00173      throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00174        << "Cannot open file '" << path << "'";
00175   }
00176 
00177   fRealName = path;
00178   fD = 0; // sorry, meaningless
00179   fWritable = read ? kFALSE : kTRUE;
00180 
00181   Init(create);
00182 
00183   stats.tick(0);
00184 }
00185 
00186 TStorageFactoryFile::~TStorageFactoryFile(void)
00187 {
00188   Close();
00189   delete storage_;
00190 }
00191 
00195 
00196 Bool_t
00197 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00198 {
00199   // This function needs to be optimized to minimize seeks.
00200   // See TFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) in ROOT 5.27.06.
00201   Seek(pos);
00202   return ReadBuffer(buf, len);
00203 }
00204 
00205 Bool_t
00206 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00207 {
00208   // Check that it's valid to access this file.
00209   if (IsZombie())
00210   {
00211     Error("ReadBuffer", "Cannot read from a zombie file");
00212     return kTRUE;
00213   }
00214 
00215   if (! IsOpen())
00216   {
00217     Error("ReadBuffer", "Cannot read from a file that is not open");
00218     return kTRUE;
00219   }
00220 
00221   // Read specified byte range from the storage.  Returns kTRUE in
00222   // case of error.  Note that ROOT uses this function recursively
00223   // to fill the cache; we use a flag to make sure our accounting
00224   // is reflected in a comprehensible manner.  The "read" counter
00225   // will include both, "readc" indicates how much read from the
00226   // cache, "readu" indicates how much we failed to read from the
00227   // cache (excluding those recursive reads), and "readx" counts
00228   // the amount actually passed to read from the storage object.
00229   StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00230 
00231   // If we have a cache, read from there first.  This returns 0
00232   // if the block hasn't been prefetched, 1 if it was in cache,
00233   // and 2 if there was an error.
00234   if (TFileCacheRead *c = GetCacheRead())
00235   {
00236     Long64_t here = GetRelOffset();
00237     Bool_t   async = c->IsAsyncReading();
00238 
00239     StorageAccount::Stamp cstats(async
00240                                  ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00241                                  : storageCounter(s_statsCRead, "readViaCache"));
00242 
00243     Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00244 
00245     if (st == 2) {
00246       return kTRUE;
00247     }
00248 
00249     if (st == 1) {
00250       if (async) {
00251         cstats.tick(len);
00252         Seek(here);
00253       } else {
00254         cstats.tick(len);
00255         stats.tick(len);
00256         return kFALSE;
00257       }
00258     }
00259   }
00260 
00261   // FIXME: Re-enable read-ahead if the data wasn't in cache.
00262   // if (! st) storage_->caching(true, -1, s_readahead);
00263 
00264   // A real read
00265   StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00266   IOSize n = storage_->xread(buf, len);
00267   xstats.tick(n);
00268   stats.tick(n);
00269   return n ? kFALSE : kTRUE;
00270 }
00271 
00272 Bool_t
00273 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00274 {
00275   // Check that it's valid to access this file.
00276   if (IsZombie())
00277   {
00278     Error("ReadBufferAsync", "Cannot read from a zombie file");
00279     return kTRUE;
00280   }
00281 
00282   if (! IsOpen())
00283   {
00284     Error("ReadBufferAsync", "Cannot read from a file that is not open");
00285     return kTRUE;
00286   }
00287 
00288   StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00289 
00290   // If asynchronous reading is disabled, bail out now, regardless
00291   // whether the underlying storage supports prefetching.  If it is
00292   // forced on, pretend it's on, even if the storage doesn't support
00293   // it, as this turns off the caching in ROOT's side.
00294   StorageFactory *f = StorageFactory::get();
00295 
00296   // Verify that we never using async reads in app-only mode
00297   if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00298     return kTRUE;
00299 
00300   // Let the I/O method indicate if it can do client-side prefetch.
00301   // If it does, then for example TTreeCache will drop its own cache
00302   // and will use the client-side cache of the actual I/O layer.
00303   // If len is zero ROOT is probing for prefetch support.
00304   if (len) {
00305     // FIXME: Synchronise caching.
00306     // storage_->caching(true, -1, 0);
00307     ;
00308   }
00309 
00310   IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
00311   if (storage_->prefetch(&iov, 1))
00312   {
00313     stats.tick(len);
00314     return kFALSE;
00315   }
00316 
00317   // Always ask ROOT to use async reads in storage-only mode,
00318   // regardless of whether the storage system supports it.
00319   if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00320     return kFALSE;
00321 
00322   // Prefetching not available right now.
00323   return kTRUE;
00324 }
00325 
00326 Bool_t
00327 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00328 {
00329   // Check that it's valid to access this file.
00330   if (IsZombie())
00331   {
00332     Error("ReadBuffers", "Cannot read from a zombie file");
00333     return kTRUE;
00334   }
00335 
00336   if (! IsOpen())
00337   {
00338     Error("ReadBuffers", "Cannot read from a file that is not open");
00339     return kTRUE;
00340   }
00341 
00342   // This should coalesce reads into a smaller number of large reads.
00343   // If the list of buffers to read has two or more buffers within 256KB of
00344   // each other, we collapse them into a single read from the storage system.
00345 
00346   // buf == 0 implies an async read; in this case, we skip the read coalescing logic.
00347   // Note that this code will cause CMSSW to never call readv().
00348   if (buf)
00349   {
00350     // Code ported from ROOT v5.26 trunk by Brian Bockelman.
00351     Int_t k = 0;
00352     Bool_t result = kTRUE;
00353     TFileCacheRead *old = fCacheRead;
00354     fCacheRead = 0;
00355     IOOffset curbegin = pos[0];
00356     IOOffset cur;
00357     std::vector<char> buf2(0);
00358     Int_t i = 0; // Position in the buffer.
00359     UInt_t n = 0; // Number of reads we have coalesced.
00360 
00361     // Size of our coalesce window.  In ROOT 5.26, this is actually a variable
00362     // you can tweak, but it's not exposed in CMSSW.
00363 
00364     // Iterate over all the requests we have been given.  We either read each
00365     // individually or coalesce them into a big read.
00366 
00367     // Loop over all the requests we have been given.  Only trigger a read if the
00368     // request at pos[i] wouldn't completely fit into the read coalesce buffer.
00369     // If we trigger, then we do a single read for requests i-n to i-1, inclusive.
00370     // If n==0, we have special logic.
00371     while (i < nbuf)
00372     {
00373       cur = pos[i]+len[i];
00374       Bool_t bigRead = kTRUE;
00375       if (cur -curbegin < READ_COALESCE_SIZE)
00376       {
00377         // Add the current request into the set of buffers we will coalesce
00378         n++; // Record we have a new request we will coalesce.
00379         i++; // Examine the next request in the next loop.
00380         bigRead = kFALSE;
00381       }
00382       // Only perform a read if one of the following holds:
00383       // 1) bigRead=TRUE; i.e., we can't fit any more requests into the window.
00384       // 2) i>=nbuf; if i pointed to nbuf-1 at the beginning of the while loop,
00385       //    then the above logic will either set bigRead=TRUE (making case #1
00386       //    true) or increment i, making i == nbuf.
00387       if (bigRead || (i>=nbuf))
00388       {
00389         // If n == 0, no read requests could be coalesced.  Simple read.
00390         if (n == 0)
00391         {
00392           //if the block to read is about the same size as the read-ahead buffer
00393           //we read the block directly
00394           Seek(pos[i]);
00395 
00396           StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00397           // if xread returns short, then we have an error.  Break from the loop
00398           // and return kTRUE - signaling an error.
00399           result = ((IOSize)len[i] == storage_->xread(&buf[k], len[i])) ? kFALSE : kTRUE;
00400           xstats.tick(len[i]);
00401 
00402           if (result)
00403             break;
00404           k += len[i];
00405           i++;
00406         }
00407         else
00408         {
00409           //otherwise we read all blocks that fit in the read-ahead buffer
00410           Seek(curbegin);
00411           // Only allocate buf2 once; use std::vector to make sure the memory
00412           // gets cleaned up, as xread can toss an exception.
00413           if (buf2.capacity() < READ_COALESCE_SIZE)
00414             buf2.resize(READ_COALESCE_SIZE);
00415           //we read ahead
00416           assert(len[i-1] >= 0);
00417           assert(pos[i-1] >= curbegin);
00418           assert(pos[i-1]-curbegin+len[i-1] <= READ_COALESCE_SIZE);
00419           IOSize nahead = IOSized(pos[i-1]-curbegin+len[i-1]);
00420 
00421           StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00422           result = ( nahead == storage_->xread(&buf2[0], nahead)) ? kFALSE : kTRUE;
00423           xstats.tick(nahead);
00424 
00425           if (result)
00426             break;
00427 
00428           // Now, copy the data from the read to the appropriate buffer in
00429           // order to fulfill the request.
00430           for (UInt_t j=0;j<n;++j) {
00431                 memcpy(&buf[k],&buf2[pos[i-n+j]-curbegin],len[i-n+j]);
00432                 k += len[i-n+j];
00433           }
00434           n = 0;
00435         }
00436         curbegin = pos[i];
00437       }
00438     }
00439     fCacheRead = old;
00440     return result;
00441   }
00442   assert(!buf);
00443 
00444   // Read from underlying storage.
00445   void* const nobuf = 0;
00446   Int_t total = 0;
00447   std::vector<IOPosBuffer> iov;
00448   iov.reserve(nbuf);
00449   for (Int_t i = 0; i < nbuf; ++i)
00450   {
00451     // iov.push_back(IOPosBuffer(pos[i], buf ? buf + total : 0, len[i]));
00452     iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
00453     total += len[i];
00454   }
00455 
00456   // Null buffer means asynchronous reads into I/O system's cache.
00457   bool success;
00458   StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00459   // Synchronise low-level cache with the supposed cache in TFile.
00460   // storage_->caching(true, -1, 0);
00461   success = storage_->prefetch(&iov[0], nbuf);
00462   astats.tick(total);
00463 
00464   // If it didn't suceeed, pass down to the base class.
00465   return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00466 }
00467 
00468 Bool_t
00469 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00470 {
00471   // Check that it's valid to access this file.
00472   if (IsZombie())
00473   {
00474     Error("WriteBuffer", "Cannot write to a zombie file");
00475     return kTRUE;
00476   }
00477 
00478   if (! IsOpen())
00479   {
00480     Error("WriteBuffer", "Cannot write to a file that is not open");
00481     return kTRUE;
00482   }
00483 
00484   if (! fWritable)
00485   {
00486     Error("WriteBuffer", "File is not writable");
00487     return kTRUE;
00488   }
00489 
00490   StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00491   StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00492 
00493   // Try first writing via a cache, and if that's not possible, directly.
00494   Int_t st;
00495   switch ((st = WriteBufferViaCache(buf, len)))
00496   {
00497   case 0:
00498     // Actual write.
00499     {
00500       StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00501       IOSize n = storage_->xwrite(buf, len);
00502       xstats.tick(n);
00503       stats.tick(n);
00504 
00505       // FIXME: What if it's a short write?
00506       return n > 0 ? kFALSE : kTRUE;
00507     }
00508 
00509   case 1:
00510     cstats.tick(len);
00511     stats.tick(len);
00512     return kFALSE;
00513 
00514   case 2:
00515   default:
00516     Error("WriteBuffer", "Error writing to cache");
00517     return kTRUE;
00518   }
00519 }
00520 
00524 // FIXME: Override GetBytesToPrefetch() so XROOTD can suggest how
00525 // large a prefetch cache to use.
00526 // FIXME: Asynchronous open support?
00527 
00531 Int_t
00532 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t /* mode */)
00533 {
00534   StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00535 
00536   if (storage_)
00537   {
00538     storage_->close();
00539     delete storage_;
00540     storage_ = 0;
00541   }
00542 
00543   int                      openFlags = IOFlags::OpenRead;
00544   if (flags & O_WRONLY)    openFlags = IOFlags::OpenWrite;
00545   else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00546   if (flags & O_CREAT)     openFlags |= IOFlags::OpenCreate;
00547   if (flags & O_APPEND)    openFlags |= IOFlags::OpenAppend;
00548   if (flags & O_EXCL)      openFlags |= IOFlags::OpenExclusive;
00549   if (flags & O_TRUNC)     openFlags |= IOFlags::OpenTruncate;
00550   if (flags & O_NONBLOCK)  openFlags |= IOFlags::OpenNonBlock;
00551 
00552   if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00553   {
00554      MakeZombie();
00555      gDirectory = gROOT;
00556      throw cms::Exception("TStorageFactoryFile::SysOpen()")
00557        << "Cannot open file '" << pathname << "'";
00558   }
00559 
00560   stats.tick();
00561   return 0;
00562 }
00563 
00564 Int_t
00565 TStorageFactoryFile::SysClose(Int_t /* fd */)
00566 {
00567   StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00568 
00569   if (storage_)
00570   {
00571     storage_->close();
00572     delete storage_;
00573     storage_ = 0;
00574   }
00575 
00576   stats.tick();
00577   return 0;
00578 }
00579 
00580 Long64_t
00581 TStorageFactoryFile::SysSeek(Int_t /* fd */, Long64_t offset, Int_t whence)
00582 {
00583   StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00584   Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00585                                : whence == SEEK_CUR ? Storage::CURRENT
00586                                : Storage::END);
00587 
00588   offset = storage_->position(offset, rel);
00589   stats.tick();
00590   return offset;
00591 }
00592 
00593 Int_t
00594 TStorageFactoryFile::SysSync(Int_t /* fd */)
00595 {
00596   StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00597   storage_->flush();
00598   stats.tick();
00599   return 0;
00600 }
00601 
00602 Int_t
00603 TStorageFactoryFile::SysStat(Int_t /* fd */, Long_t *id, Long64_t *size,
00604                              Long_t *flags, Long_t *modtime)
00605 {
00606   StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00607   // FIXME: Most of this is unsupported or makes no sense with Storage
00608   *id = ::Hash(fRealName);
00609   *size = storage_->size();
00610   *flags = 0;
00611   *modtime = 0;
00612   stats.tick();
00613   return 0;
00614 }
00615 
00616 void
00617 TStorageFactoryFile::ResetErrno(void) const
00618 {
00619   TSystem::ResetErrno();
00620 }