00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "TFileCacheRead.h"
00007 #include "TSystem.h"
00008 #include "TROOT.h"
00009 #include <errno.h>
00010 #include <sys/stat.h>
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <iostream>
00014
00015 #if 0
00016 #include "TTreeCache.h"
00017 #include "TTree.h"
00018
00019 class TTreeCacheDebug : public TTreeCache {
00020 public:
00021 void dump(const char *label, const char *trailer)
00022 {
00023 Long64_t entry = fOwner->GetReadEntry();
00024 std::cerr
00025 << label << ": " << entry << " "
00026 << "{ fEntryMin=" << fEntryMin
00027 << ", fEntryMax=" << fEntryMax
00028 << ", fEntryNext=" << fEntryNext
00029 << ", fZipBytes=" << fZipBytes
00030 << ", fNbranches=" << fNbranches
00031 << ", fNReadOk=" << fNReadOk
00032 << ", fNReadMiss=" << fNReadMiss
00033 << ", fNReadPref=" << fNReadPref
00034 << ", fBranches=" << fBranches
00035 << ", fBrNames=" << fBrNames
00036 << ", fOwner=" << fOwner
00037 << ", fTree=" << fTree
00038 << ", fIsLearning=" << fIsLearning
00039 << ", fIsManual=" << fIsManual
00040 << "; fBufferSizeMin=" << fBufferSizeMin
00041 << ", fBufferSize=" << fBufferSize
00042 << ", fBufferLen=" << fBufferLen
00043 << ", fBytesToPrefetch=" << fBytesToPrefetch
00044 << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
00045 << ", fAsyncReading=" << fAsyncReading
00046 << ", fNseek=" << fNseek
00047 << ", fNtot=" << fNtot
00048 << ", fNb=" << fNb
00049 << ", fSeekSize=" << fSeekSize
00050 << ", fSeek=" << fSeek
00051 << ", fSeekIndex=" << fSeekIndex
00052 << ", fSeekSort=" << fSeekSort
00053 << ", fPos=" << fPos
00054 << ", fSeekLen=" << fSeekLen
00055 << ", fSeekSortLen=" << fSeekSortLen
00056 << ", fSeekPos=" << fSeekPos
00057 << ", fLen=" << fLen
00058 << ", fFile=" << fFile
00059 << ", fBuffer=" << (void *) fBuffer
00060 << ", fIsSorted=" << fIsSorted
00061 << " }\n" << trailer;
00062 }
00063 };
00064 #endif
00065
00066 ClassImp(TStorageFactoryFile)
00067 static StorageAccount::Counter *s_statsCtor = 0;
00068 static StorageAccount::Counter *s_statsOpen = 0;
00069 static StorageAccount::Counter *s_statsClose = 0;
00070 static StorageAccount::Counter *s_statsFlush = 0;
00071 static StorageAccount::Counter *s_statsStat = 0;
00072 static StorageAccount::Counter *s_statsSeek = 0;
00073 static StorageAccount::Counter *s_statsRead = 0;
00074 static StorageAccount::Counter *s_statsCRead = 0;
00075 static StorageAccount::Counter *s_statsCPrefetch = 0;
00076 static StorageAccount::Counter *s_statsARead = 0;
00077 static StorageAccount::Counter *s_statsXRead = 0;
00078 static StorageAccount::Counter *s_statsWrite = 0;
00079 static StorageAccount::Counter *s_statsCWrite = 0;
00080 static StorageAccount::Counter *s_statsXWrite = 0;
00081
00082 static inline StorageAccount::Counter &
00083 storageCounter(StorageAccount::Counter *&c, const char *label)
00084 {
00085 if (! c) c = &StorageAccount::counter("tstoragefile", label);
00086 return *c;
00087 }
00088
00089 TStorageFactoryFile::TStorageFactoryFile(void)
00090 : storage_(0)
00091 {
00092 StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00093 stats.tick(0);
00094 }
00095
00096
00097 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00098 Option_t *option ,
00099 const char *ftitle ,
00100 Int_t compress )
00101 : TFile(path, "NET", ftitle, compress),
00102 storage_(0)
00103 {
00104 StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00105
00106
00107 fOption = option;
00108 fOption.ToUpper();
00109
00110 if (fOption == "NEW")
00111 fOption = "CREATE";
00112
00113 Bool_t create = (fOption == "CREATE");
00114 Bool_t recreate = (fOption == "RECREATE");
00115 Bool_t update = (fOption == "UPDATE");
00116 Bool_t read = (fOption == "READ");
00117
00118 if (!create && !recreate && !update && !read)
00119 {
00120 read = true;
00121 fOption = "READ";
00122 }
00123
00124 if (recreate)
00125 {
00126 if (!gSystem->AccessPathName(path, kFileExists))
00127 gSystem->Unlink(path);
00128
00129 recreate = false;
00130 create = true;
00131 fOption = "CREATE";
00132 }
00133
00134 if (update && gSystem->AccessPathName(path, kFileExists))
00135 {
00136 update = kFALSE;
00137 create = kTRUE;
00138 }
00139
00140 int openFlags = IOFlags::OpenRead;
00141 if (!read) openFlags |= IOFlags::OpenWrite;
00142 if (create) openFlags |= IOFlags::OpenCreate;
00143 if (recreate) openFlags |= IOFlags::OpenCreate | IOFlags::OpenTruncate;
00144
00145
00146 if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00147 {
00148 MakeZombie();
00149 gDirectory = gROOT;
00150 throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00151 << "Cannot open file '" << path << "'";
00152 }
00153
00154 fRealName = path;
00155 fD = 0;
00156 fWritable = read ? kFALSE : kTRUE;
00157
00158 Init(create);
00159
00160 stats.tick(0);
00161 }
00162
00163 TStorageFactoryFile::~TStorageFactoryFile(void)
00164 {
00165 Close();
00166 delete storage_;
00167 }
00168
00172
00173 Bool_t
00174 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00175 {
00176
00177
00178 Seek(pos);
00179 return ReadBuffer(buf, len);
00180 }
00181
00182 Bool_t
00183 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00184 {
00185
00186 if (IsZombie())
00187 {
00188 Error("ReadBuffer", "Cannot read from a zombie file");
00189 return kTRUE;
00190 }
00191
00192 if (! IsOpen())
00193 {
00194 Error("ReadBuffer", "Cannot read from a file that is not open");
00195 return kTRUE;
00196 }
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206 StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00207
00208
00209
00210
00211 if (TFileCacheRead *c = GetCacheRead())
00212 {
00213 Long64_t here = GetRelOffset();
00214 Bool_t async = c->IsAsyncReading();
00215
00216 StorageAccount::Stamp cstats(async
00217 ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00218 : storageCounter(s_statsCRead, "readViaCache"));
00219
00220 Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00221
00222 if (st == 2) {
00223 return kTRUE;
00224 }
00225
00226 if (st == 1) {
00227 if (async) {
00228 cstats.tick(len);
00229 Seek(here);
00230 } else {
00231 cstats.tick(len);
00232 stats.tick(len);
00233 return kFALSE;
00234 }
00235 }
00236 }
00237
00238
00239
00240
00241
00242 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00243 IOSize n = storage_->xread(buf, len);
00244 xstats.tick(n);
00245 stats.tick(n);
00246 return n ? kFALSE : kTRUE;
00247 }
00248
00249 Bool_t
00250 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00251 {
00252
00253 if (IsZombie())
00254 {
00255 Error("ReadBufferAsync", "Cannot read from a zombie file");
00256 return kTRUE;
00257 }
00258
00259 if (! IsOpen())
00260 {
00261 Error("ReadBufferAsync", "Cannot read from a file that is not open");
00262 return kTRUE;
00263 }
00264
00265 StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00266
00267
00268
00269
00270
00271 StorageFactory *f = StorageFactory::get();
00272
00273
00274 if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00275 return kTRUE;
00276
00277
00278
00279
00280
00281 if (len) {
00282
00283
00284 ;
00285 }
00286
00287 IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
00288 if (storage_->prefetch(&iov, 1))
00289 {
00290 stats.tick(len);
00291 return kFALSE;
00292 }
00293
00294
00295
00296 if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00297 return kFALSE;
00298
00299
00300 return kTRUE;
00301 }
00302
00303 Bool_t
00304 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00305 {
00306
00307 if (IsZombie())
00308 {
00309 Error("ReadBuffers", "Cannot read from a zombie file");
00310 return kTRUE;
00311 }
00312
00313 if (! IsOpen())
00314 {
00315 Error("ReadBuffers", "Cannot read from a file that is not open");
00316 return kTRUE;
00317 }
00318
00319
00320
00321
00322
00323
00324
00325 if (buf)
00326 {
00327
00328 Int_t k = 0;
00329 Bool_t result = kTRUE;
00330 TFileCacheRead *old = fCacheRead;
00331 fCacheRead = 0;
00332 IOOffset curbegin = pos[0];
00333 IOOffset cur;
00334 std::vector<char> buf2(0);
00335 Int_t i = 0;
00336 Int_t n = 0;
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348 while (i < nbuf)
00349 {
00350 cur = pos[i]+len[i];
00351 Bool_t bigRead = kTRUE;
00352 if (cur -curbegin < READ_COALESCE_SIZE)
00353 {
00354
00355 n++;
00356 i++;
00357 bigRead = kFALSE;
00358 }
00359
00360
00361
00362
00363
00364 if (bigRead || (i>=nbuf))
00365 {
00366
00367 if (n == 0)
00368 {
00369
00370
00371 Seek(pos[i]);
00372
00373 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00374
00375
00376 result = ((IOSize)len[i] == storage_->xread(&buf[k], len[i])) ? kFALSE : kTRUE;
00377 xstats.tick(len[i]);
00378
00379 if (result)
00380 break;
00381 k += len[i];
00382 i++;
00383 }
00384 else
00385 {
00386
00387 Seek(curbegin);
00388
00389
00390 if (buf2.capacity() < READ_COALESCE_SIZE)
00391 buf2.resize(READ_COALESCE_SIZE);
00392
00393 assert(len[i-1] >= 0);
00394 assert(pos[i-1] >= curbegin);
00395 assert(pos[i-1]-curbegin+len[i-1] <= READ_COALESCE_SIZE);
00396 IOSize nahead = IOSized(pos[i-1]-curbegin+len[i-1]);
00397
00398 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00399 result = ( nahead == storage_->xread(&buf2[0], nahead)) ? kFALSE : kTRUE;
00400 xstats.tick(nahead);
00401
00402 if (result)
00403 break;
00404
00405
00406
00407 for (Int_t j=0;j<n;j++) {
00408 memcpy(&buf[k],&buf2[pos[i-n+j]-curbegin],len[i-n+j]);
00409 k += len[i-n+j];
00410 }
00411 n = 0;
00412 }
00413 curbegin = pos[i];
00414 }
00415 }
00416 fCacheRead = old;
00417 return result;
00418 }
00419
00420
00421 Int_t total = 0;
00422 std::vector<IOPosBuffer> iov;
00423 iov.reserve(nbuf);
00424 for (Int_t i = 0; i < nbuf; ++i)
00425 {
00426 iov.push_back(IOPosBuffer(pos[i], buf ? buf + total : 0, len[i]));
00427 total += len[i];
00428 }
00429
00430
00431 bool success;
00432 StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00433
00434
00435 success = storage_->prefetch(&iov[0], nbuf);
00436 astats.tick(total);
00437
00438
00439 return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00440 }
00441
00442 Bool_t
00443 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00444 {
00445
00446 if (IsZombie())
00447 {
00448 Error("WriteBuffer", "Cannot write to a zombie file");
00449 return kTRUE;
00450 }
00451
00452 if (! IsOpen())
00453 {
00454 Error("WriteBuffer", "Cannot write to a file that is not open");
00455 return kTRUE;
00456 }
00457
00458 if (! fWritable)
00459 {
00460 Error("WriteBuffer", "File is not writable");
00461 return kTRUE;
00462 }
00463
00464 StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00465 StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00466
00467
00468 Int_t st;
00469 switch ((st = WriteBufferViaCache(buf, len)))
00470 {
00471 case 0:
00472
00473 {
00474 StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00475 IOSize n = storage_->xwrite(buf, len);
00476 xstats.tick(n);
00477 stats.tick(n);
00478
00479
00480 return n > 0 ? kFALSE : kTRUE;
00481 }
00482
00483 case 1:
00484 cstats.tick(len);
00485 stats.tick(len);
00486 return kFALSE;
00487
00488 case 2:
00489 default:
00490 Error("WriteBuffer", "Error writing to cache");
00491 return kTRUE;
00492 }
00493 }
00494
00498
00499
00500
00501
00505 Int_t
00506 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t )
00507 {
00508 StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00509
00510 if (storage_)
00511 {
00512 storage_->close();
00513 delete storage_;
00514 storage_ = 0;
00515 }
00516
00517 int openFlags = IOFlags::OpenRead;
00518 if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
00519 else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00520 if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
00521 if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
00522 if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
00523 if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
00524 if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
00525
00526 if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00527 {
00528 MakeZombie();
00529 gDirectory = gROOT;
00530 throw cms::Exception("TStorageFactoryFile::SysOpen()")
00531 << "Cannot open file '" << pathname << "'";
00532 }
00533
00534 stats.tick();
00535 return 0;
00536 }
00537
00538 Int_t
00539 TStorageFactoryFile::SysClose(Int_t )
00540 {
00541 StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00542
00543 if (storage_)
00544 {
00545 storage_->close();
00546 delete storage_;
00547 storage_ = 0;
00548 }
00549
00550 stats.tick();
00551 return 0;
00552 }
00553
00554 Long64_t
00555 TStorageFactoryFile::SysSeek(Int_t , Long64_t offset, Int_t whence)
00556 {
00557 StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00558 Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00559 : whence == SEEK_CUR ? Storage::CURRENT
00560 : Storage::END);
00561
00562 offset = storage_->position(offset, rel);
00563 stats.tick();
00564 return offset;
00565 }
00566
00567 Int_t
00568 TStorageFactoryFile::SysSync(Int_t )
00569 {
00570 StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00571 storage_->flush();
00572 stats.tick();
00573 return 0;
00574 }
00575
00576 Int_t
00577 TStorageFactoryFile::SysStat(Int_t , Long_t *id, Long64_t *size,
00578 Long_t *flags, Long_t *modtime)
00579 {
00580 StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00581
00582 *id = ::Hash(fRealName);
00583 *size = storage_->size();
00584 *flags = 0;
00585 *modtime = 0;
00586 stats.tick();
00587 return 0;
00588 }
00589
00590 void
00591 TStorageFactoryFile::ResetErrno(void) const
00592 {
00593 TSystem::ResetErrno();
00594 }