00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "FWCore/Utilities/interface/EDMException.h"
00006 #include "ReadRepacker.h"
00007 #include "TFileCacheRead.h"
00008 #include "TSystem.h"
00009 #include "TROOT.h"
00010 #include "TEnv.h"
00011 #include <errno.h>
00012 #include <sys/stat.h>
00013 #include <unistd.h>
00014 #include <fcntl.h>
00015 #include <iostream>
00016
00017 #if 0
00018 #include "TTreeCache.h"
00019 #include "TTree.h"
00020
00021 class TTreeCacheDebug : public TTreeCache {
00022 public:
00023 void dump(const char *label, const char *trailer)
00024 {
00025 Long64_t entry = fOwner->GetReadEntry();
00026 std::cerr
00027 << label << ": " << entry << " "
00028 << "{ fEntryMin=" << fEntryMin
00029 << ", fEntryMax=" << fEntryMax
00030 << ", fEntryNext=" << fEntryNext
00031 << ", fZipBytes=" << fZipBytes
00032 << ", fNbranches=" << fNbranches
00033 << ", fNReadOk=" << fNReadOk
00034 << ", fNReadMiss=" << fNReadMiss
00035 << ", fNReadPref=" << fNReadPref
00036 << ", fBranches=" << fBranches
00037 << ", fBrNames=" << fBrNames
00038 << ", fOwner=" << fOwner
00039 << ", fTree=" << fTree
00040 << ", fIsLearning=" << fIsLearning
00041 << ", fIsManual=" << fIsManual
00042 << "; fBufferSizeMin=" << fBufferSizeMin
00043 << ", fBufferSize=" << fBufferSize
00044 << ", fBufferLen=" << fBufferLen
00045 << ", fBytesToPrefetch=" << fBytesToPrefetch
00046 << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
00047 << ", fAsyncReading=" << fAsyncReading
00048 << ", fNseek=" << fNseek
00049 << ", fNtot=" << fNtot
00050 << ", fNb=" << fNb
00051 << ", fSeekSize=" << fSeekSize
00052 << ", fSeek=" << fSeek
00053 << ", fSeekIndex=" << fSeekIndex
00054 << ", fSeekSort=" << fSeekSort
00055 << ", fPos=" << fPos
00056 << ", fSeekLen=" << fSeekLen
00057 << ", fSeekSortLen=" << fSeekSortLen
00058 << ", fSeekPos=" << fSeekPos
00059 << ", fLen=" << fLen
00060 << ", fFile=" << fFile
00061 << ", fBuffer=" << (void *) fBuffer
00062 << ", fIsSorted=" << fIsSorted
00063 << " }\n" << trailer;
00064 }
00065 };
00066 #endif
00067
00068 ClassImp(TStorageFactoryFile)
00069 static StorageAccount::Counter *s_statsCtor = 0;
00070 static StorageAccount::Counter *s_statsOpen = 0;
00071 static StorageAccount::Counter *s_statsClose = 0;
00072 static StorageAccount::Counter *s_statsFlush = 0;
00073 static StorageAccount::Counter *s_statsStat = 0;
00074 static StorageAccount::Counter *s_statsSeek = 0;
00075 static StorageAccount::Counter *s_statsRead = 0;
00076 static StorageAccount::Counter *s_statsCRead = 0;
00077 static StorageAccount::Counter *s_statsCPrefetch = 0;
00078 static StorageAccount::Counter *s_statsARead = 0;
00079 static StorageAccount::Counter *s_statsXRead = 0;
00080 static StorageAccount::Counter *s_statsWrite = 0;
00081 static StorageAccount::Counter *s_statsCWrite = 0;
00082 static StorageAccount::Counter *s_statsXWrite = 0;
00083
00084 static inline StorageAccount::Counter &
00085 storageCounter(StorageAccount::Counter *&c, const char *label)
00086 {
00087 if (! c) c = &StorageAccount::counter("tstoragefile", label);
00088 return *c;
00089 }
00090
00091 TStorageFactoryFile::TStorageFactoryFile(void)
00092 : storage_(0)
00093 {
00094 StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00095 stats.tick(0);
00096 }
00097
00098
00099
00100
00101
00102 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00103 Option_t *option,
00104 const char *ftitle,
00105 Int_t compress,
00106 Int_t netopt,
00107 Bool_t parallelopen )
00108 : TFile(path, "NET", ftitle, compress),
00109 storage_(0)
00110 {
00111 Initialize(path, option);
00112 }
00113
00114 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00115 Option_t *option ,
00116 const char *ftitle ,
00117 Int_t compress )
00118 : TFile(path, "NET", ftitle, compress),
00119 storage_(0)
00120 {
00121 Initialize(path, option);
00122 }
00123
00124 void
00125 TStorageFactoryFile::Initialize(const char *path,
00126 Option_t *option )
00127 {
00128 StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00129
00130
00131
00132
00133
00134
00135 gEnv->SetValue("TFile.AsyncReading", 1);
00136
00137
00138 fOption = option;
00139 fOption.ToUpper();
00140
00141 if (fOption == "NEW")
00142 fOption = "CREATE";
00143
00144 Bool_t create = (fOption == "CREATE");
00145 Bool_t recreate = (fOption == "RECREATE");
00146 Bool_t update = (fOption == "UPDATE");
00147 Bool_t read = (fOption == "READ");
00148
00149 if (!create && !recreate && !update && !read)
00150 {
00151 read = true;
00152 fOption = "READ";
00153 }
00154
00155 if (recreate)
00156 {
00157 if (!gSystem->AccessPathName(path, kFileExists))
00158 gSystem->Unlink(path);
00159
00160 recreate = false;
00161 create = true;
00162 fOption = "CREATE";
00163 }
00164 assert(!recreate);
00165
00166 if (update && gSystem->AccessPathName(path, kFileExists))
00167 {
00168 update = kFALSE;
00169 create = kTRUE;
00170 }
00171
00172 int openFlags = IOFlags::OpenRead;
00173 if (!read) openFlags |= IOFlags::OpenWrite;
00174 if (create) openFlags |= IOFlags::OpenCreate;
00175
00176
00177
00178 if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00179 {
00180 MakeZombie();
00181 gDirectory = gROOT;
00182 throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00183 << "Cannot open file '" << path << "'";
00184 }
00185
00186 fRealName = path;
00187 fD = 0;
00188 fWritable = read ? kFALSE : kTRUE;
00189
00190 Init(create);
00191
00192 stats.tick(0);
00193 }
00194
00195 TStorageFactoryFile::~TStorageFactoryFile(void)
00196 {
00197 Close();
00198 delete storage_;
00199 }
00200
00204
00205 Bool_t
00206 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00207 {
00208
00209
00210 Seek(pos);
00211 return ReadBuffer(buf, len);
00212 }
00213
00214 Bool_t
00215 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00216 {
00217
00218 if (IsZombie())
00219 {
00220 Error("ReadBuffer", "Cannot read from a zombie file");
00221 return kTRUE;
00222 }
00223
00224 if (! IsOpen())
00225 {
00226 Error("ReadBuffer", "Cannot read from a file that is not open");
00227 return kTRUE;
00228 }
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238 StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00239
00240
00241
00242
00243 if (TFileCacheRead *c = GetCacheRead())
00244 {
00245 Long64_t here = GetRelOffset();
00246 Bool_t async = c->IsAsyncReading();
00247
00248 StorageAccount::Stamp cstats(async
00249 ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00250 : storageCounter(s_statsCRead, "readViaCache"));
00251
00252 Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00253
00254 if (st == 2) {
00255 return kTRUE;
00256 }
00257
00258 if (st == 1) {
00259 if (async) {
00260 cstats.tick(len);
00261 Seek(here);
00262 } else {
00263 cstats.tick(len);
00264 stats.tick(len);
00265 return kFALSE;
00266 }
00267 }
00268 }
00269
00270
00271
00272
00273
00274 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00275 IOSize n = storage_->xread(buf, len);
00276 xstats.tick(n);
00277 stats.tick(n);
00278 return n ? kFALSE : kTRUE;
00279 }
00280
00281 Bool_t
00282 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00283 {
00284
00285 if (IsZombie())
00286 {
00287 Error("ReadBufferAsync", "Cannot read from a zombie file");
00288 return kTRUE;
00289 }
00290
00291 if (! IsOpen())
00292 {
00293 Error("ReadBufferAsync", "Cannot read from a file that is not open");
00294 return kTRUE;
00295 }
00296
00297 StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00298
00299
00300
00301
00302
00303 StorageFactory *f = StorageFactory::get();
00304
00305
00306 if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00307 return kTRUE;
00308
00309
00310
00311
00312
00313 if (len) {
00314
00315
00316 ;
00317 }
00318
00319 IOPosBuffer iov(off, (void *) 0, len ? len : 4096);
00320 if (storage_->prefetch(&iov, 1))
00321 {
00322 stats.tick(len);
00323 return kFALSE;
00324 }
00325
00326
00327
00328 if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00329 return kFALSE;
00330
00331
00332 return kTRUE;
00333 }
00334
00335 Bool_t
00336 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00337 {
00358 Int_t remaining = nbuf;
00359 Int_t pack_count;
00360
00361 IOSize remaining_buffer_size=0;
00362
00363
00364 for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
00365
00366 char *current_buffer = buf;
00367 Long64_t *current_pos = pos;
00368 Int_t *current_len = len;
00369
00370 ReadRepacker repacker;
00371
00372 while (remaining > 0) {
00373
00374 pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
00375
00376 int real_bytes_processed = repacker.realBytesProcessed();
00377 IOSize io_buffer_used = repacker.bufferUsed();
00378
00379
00380 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00381 std::vector<IOPosBuffer> &iov = repacker.iov();
00382 IOSize result = storage_->readv(&iov[0], iov.size());
00383 if (result != io_buffer_used) {
00384 return kTRUE;
00385 }
00386 xstats.tick(io_buffer_used);
00387 repacker.unpack(current_buffer);
00388
00389
00390 remaining_buffer_size -= real_bytes_processed;
00391 current_buffer += real_bytes_processed;
00392
00393 current_pos += pack_count;
00394 current_len += pack_count;
00395 remaining -= pack_count;
00396
00397 }
00398 assert(remaining_buffer_size == 0);
00399 return kFALSE;
00400 }
00401
00402 Bool_t
00403 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00404 {
00405
00406 if (IsZombie())
00407 {
00408 Error("ReadBuffers", "Cannot read from a zombie file");
00409 return kTRUE;
00410 }
00411
00412 if (! IsOpen())
00413 {
00414 Error("ReadBuffers", "Cannot read from a file that is not open");
00415 return kTRUE;
00416 }
00417
00418
00419
00420 if (buf)
00421 {
00422 return ReadBuffersSync(buf, pos, len, nbuf);
00423 }
00424
00425
00426
00427
00428 void* const nobuf = 0;
00429 Int_t total = 0;
00430 std::vector<IOPosBuffer> iov;
00431 iov.reserve(nbuf);
00432 for (Int_t i = 0; i < nbuf; ++i)
00433 {
00434 iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
00435 total += len[i];
00436 }
00437
00438
00439 bool success;
00440 StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00441
00442
00443 success = storage_->prefetch(&iov[0], nbuf);
00444 astats.tick(total);
00445
00446
00447 return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00448 }
00449
00450 Bool_t
00451 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00452 {
00453
00454 if (IsZombie())
00455 {
00456 Error("WriteBuffer", "Cannot write to a zombie file");
00457 return kTRUE;
00458 }
00459
00460 if (! IsOpen())
00461 {
00462 Error("WriteBuffer", "Cannot write to a file that is not open");
00463 return kTRUE;
00464 }
00465
00466 if (! fWritable)
00467 {
00468 Error("WriteBuffer", "File is not writable");
00469 return kTRUE;
00470 }
00471
00472 StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00473 StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00474
00475
00476 Int_t st;
00477 switch ((st = WriteBufferViaCache(buf, len)))
00478 {
00479 case 0:
00480
00481 {
00482 StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00483 IOSize n = storage_->xwrite(buf, len);
00484 xstats.tick(n);
00485 stats.tick(n);
00486
00487
00488 return n > 0 ? kFALSE : kTRUE;
00489 }
00490
00491 case 1:
00492 cstats.tick(len);
00493 stats.tick(len);
00494 return kFALSE;
00495
00496 case 2:
00497 default:
00498 Error("WriteBuffer", "Error writing to cache");
00499 return kTRUE;
00500 }
00501 }
00502
00506
00507
00508
00509
00513 Int_t
00514 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t )
00515 {
00516 StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00517
00518 if (storage_)
00519 {
00520 storage_->close();
00521 delete storage_;
00522 storage_ = 0;
00523 }
00524
00525 int openFlags = IOFlags::OpenRead;
00526 if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
00527 else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00528 if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
00529 if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
00530 if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
00531 if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
00532 if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
00533
00534 if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00535 {
00536 MakeZombie();
00537 gDirectory = gROOT;
00538 throw cms::Exception("TStorageFactoryFile::SysOpen()")
00539 << "Cannot open file '" << pathname << "'";
00540 }
00541
00542 stats.tick();
00543 return 0;
00544 }
00545
00546 Int_t
00547 TStorageFactoryFile::SysClose(Int_t )
00548 {
00549 StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00550
00551 if (storage_)
00552 {
00553 storage_->close();
00554 delete storage_;
00555 storage_ = 0;
00556 }
00557
00558 stats.tick();
00559 return 0;
00560 }
00561
00562 Long64_t
00563 TStorageFactoryFile::SysSeek(Int_t , Long64_t offset, Int_t whence)
00564 {
00565 StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00566 Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00567 : whence == SEEK_CUR ? Storage::CURRENT
00568 : Storage::END);
00569
00570 offset = storage_->position(offset, rel);
00571 stats.tick();
00572 return offset;
00573 }
00574
00575 Int_t
00576 TStorageFactoryFile::SysSync(Int_t )
00577 {
00578 StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00579 storage_->flush();
00580 stats.tick();
00581 return 0;
00582 }
00583
00584 Int_t
00585 TStorageFactoryFile::SysStat(Int_t , Long_t *id, Long64_t *size,
00586 Long_t *flags, Long_t *modtime)
00587 {
00588 StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00589
00590 *id = ::Hash(fRealName);
00591 *size = storage_->size();
00592 *flags = 0;
00593 *modtime = 0;
00594 stats.tick();
00595 return 0;
00596 }
00597
00598 void
00599 TStorageFactoryFile::ResetErrno(void) const
00600 {
00601 TSystem::ResetErrno();
00602 }