00001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
00002 #include "Utilities/StorageFactory/interface/Storage.h"
00003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00005 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
00006 #include "FWCore/ServiceRegistry/interface/Service.h"
00007 #include "FWCore/Utilities/interface/EDMException.h"
00008 #include "ReadRepacker.h"
00009 #include "TFileCacheRead.h"
00010 #include "TSystem.h"
00011 #include "TROOT.h"
00012 #include "TEnv.h"
00013 #include <errno.h>
00014 #include <sys/stat.h>
00015 #include <unistd.h>
00016 #include <fcntl.h>
00017 #include <iostream>
00018 #include <cassert>
00019
00020 #if 0
00021 #include "TTreeCache.h"
00022 #include "TTree.h"
00023
00024 class TTreeCacheDebug : public TTreeCache {
00025 public:
00026 void dump(const char *label, const char *trailer)
00027 {
00028 Long64_t entry = fOwner->GetReadEntry();
00029 std::cerr
00030 << label << ": " << entry << " "
00031 << "{ fEntryMin=" << fEntryMin
00032 << ", fEntryMax=" << fEntryMax
00033 << ", fEntryNext=" << fEntryNext
00034 << ", fZipBytes=" << fZipBytes
00035 << ", fNbranches=" << fNbranches
00036 << ", fNReadOk=" << fNReadOk
00037 << ", fNReadMiss=" << fNReadMiss
00038 << ", fNReadPref=" << fNReadPref
00039 << ", fBranches=" << fBranches
00040 << ", fBrNames=" << fBrNames
00041 << ", fOwner=" << fOwner
00042 << ", fTree=" << fTree
00043 << ", fIsLearning=" << fIsLearning
00044 << ", fIsManual=" << fIsManual
00045 << "; fBufferSizeMin=" << fBufferSizeMin
00046 << ", fBufferSize=" << fBufferSize
00047 << ", fBufferLen=" << fBufferLen
00048 << ", fBytesToPrefetch=" << fBytesToPrefetch
00049 << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
00050 << ", fAsyncReading=" << fAsyncReading
00051 << ", fNseek=" << fNseek
00052 << ", fNtot=" << fNtot
00053 << ", fNb=" << fNb
00054 << ", fSeekSize=" << fSeekSize
00055 << ", fSeek=" << fSeek
00056 << ", fSeekIndex=" << fSeekIndex
00057 << ", fSeekSort=" << fSeekSort
00058 << ", fPos=" << fPos
00059 << ", fSeekLen=" << fSeekLen
00060 << ", fSeekSortLen=" << fSeekSortLen
00061 << ", fSeekPos=" << fSeekPos
00062 << ", fLen=" << fLen
00063 << ", fFile=" << fFile
00064 << ", fBuffer=" << (void *) fBuffer
00065 << ", fIsSorted=" << fIsSorted
00066 << " }\n" << trailer;
00067 }
00068 };
00069 #endif
00070
00071 ClassImp(TStorageFactoryFile)
00072 static StorageAccount::Counter *s_statsCtor = 0;
00073 static StorageAccount::Counter *s_statsOpen = 0;
00074 static StorageAccount::Counter *s_statsClose = 0;
00075 static StorageAccount::Counter *s_statsFlush = 0;
00076 static StorageAccount::Counter *s_statsStat = 0;
00077 static StorageAccount::Counter *s_statsSeek = 0;
00078 static StorageAccount::Counter *s_statsRead = 0;
00079 static StorageAccount::Counter *s_statsCRead = 0;
00080 static StorageAccount::Counter *s_statsCPrefetch = 0;
00081 static StorageAccount::Counter *s_statsARead = 0;
00082 static StorageAccount::Counter *s_statsXRead = 0;
00083 static StorageAccount::Counter *s_statsWrite = 0;
00084 static StorageAccount::Counter *s_statsCWrite = 0;
00085 static StorageAccount::Counter *s_statsXWrite = 0;
00086
00087 static inline StorageAccount::Counter &
00088 storageCounter(StorageAccount::Counter *&c, const char *label)
00089 {
00090 if (! c) c = &StorageAccount::counter("tstoragefile", label);
00091 return *c;
00092 }
00093
00094 TStorageFactoryFile::TStorageFactoryFile(void)
00095 : storage_(0)
00096 {
00097 StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00098 stats.tick(0);
00099 }
00100
00101
00102
00103
00104
00105 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00106 Option_t *option,
00107 const char *ftitle,
00108 Int_t compress,
00109 Int_t netopt,
00110 Bool_t parallelopen )
00111 : TFile(path, "NET", ftitle, compress),
00112 storage_(0)
00113 {
00114 Initialize(path, option);
00115 }
00116
00117 TStorageFactoryFile::TStorageFactoryFile(const char *path,
00118 Option_t *option ,
00119 const char *ftitle ,
00120 Int_t compress )
00121 : TFile(path, "NET", ftitle, compress),
00122 storage_(0)
00123 {
00124 Initialize(path, option);
00125 }
00126
00127 void
00128 TStorageFactoryFile::Initialize(const char *path,
00129 Option_t *option )
00130 {
00131 StorageAccount::Stamp stats(storageCounter(s_statsCtor, "construct"));
00132
00133
00134
00135
00136
00137
00138 gEnv->SetValue("TFile.AsyncReading", 1);
00139
00140
00141 fOption = option;
00142 fOption.ToUpper();
00143
00144 if (fOption == "NEW")
00145 fOption = "CREATE";
00146
00147 Bool_t create = (fOption == "CREATE");
00148 Bool_t recreate = (fOption == "RECREATE");
00149 Bool_t update = (fOption == "UPDATE");
00150 Bool_t read = (fOption == "READ");
00151
00152 if (!create && !recreate && !update && !read)
00153 {
00154 read = true;
00155 fOption = "READ";
00156 }
00157
00158 if (recreate)
00159 {
00160 if (!gSystem->AccessPathName(path, kFileExists))
00161 gSystem->Unlink(path);
00162
00163 recreate = false;
00164 create = true;
00165 fOption = "CREATE";
00166 }
00167 assert(!recreate);
00168
00169 if (update && gSystem->AccessPathName(path, kFileExists))
00170 {
00171 update = kFALSE;
00172 create = kTRUE;
00173 }
00174
00175 assert(read || update || create);
00176
00177 int openFlags = IOFlags::OpenRead;
00178 if (!read) openFlags |= IOFlags::OpenWrite;
00179 if (create) openFlags |= IOFlags::OpenCreate;
00180
00181
00182
00183 if (! (storage_ = StorageFactory::get()->open(path, openFlags)))
00184 {
00185 MakeZombie();
00186 gDirectory = gROOT;
00187 throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()")
00188 << "Cannot open file '" << path << "'";
00189 }
00190
00191
00192 try {
00193 edm::Service<edm::storage::StatisticsSenderService> statsService;
00194 if (statsService.isAvailable()) {
00195 statsService->setSize(storage_->size());
00196 }
00197 } catch (edm::Exception e) {
00198 if (e.categoryCode() != edm::errors::NotFound) {
00199 throw;
00200 }
00201 }
00202
00203 fRealName = path;
00204 fD = 0;
00205 fWritable = read ? kFALSE : kTRUE;
00206
00207 Init(create);
00208
00209 stats.tick(0);
00210 }
00211
00212 TStorageFactoryFile::~TStorageFactoryFile(void)
00213 {
00214 Close();
00215 delete storage_;
00216 }
00217
00221
00222 Bool_t
00223 TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len)
00224 {
00225
00226
00227 Seek(pos);
00228 return ReadBuffer(buf, len);
00229 }
00230
00231 Bool_t
00232 TStorageFactoryFile::ReadBuffer(char *buf, Int_t len)
00233 {
00234
00235 if (IsZombie())
00236 {
00237 Error("ReadBuffer", "Cannot read from a zombie file");
00238 return kTRUE;
00239 }
00240
00241 if (! IsOpen())
00242 {
00243 Error("ReadBuffer", "Cannot read from a file that is not open");
00244 return kTRUE;
00245 }
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255 StorageAccount::Stamp stats(storageCounter(s_statsRead, "read"));
00256
00257
00258
00259
00260 if (TFileCacheRead *c = GetCacheRead())
00261 {
00262 Long64_t here = GetRelOffset();
00263 Bool_t async = c->IsAsyncReading();
00264
00265 StorageAccount::Stamp cstats(async
00266 ? storageCounter(s_statsCPrefetch, "readPrefetchToCache")
00267 : storageCounter(s_statsCRead, "readViaCache"));
00268
00269 Int_t st = ReadBufferViaCache(async ? 0 : buf, len);
00270
00271 if (st == 2) {
00272 return kTRUE;
00273 }
00274
00275 if (st == 1) {
00276 if (async) {
00277 cstats.tick(len);
00278 Seek(here);
00279 } else {
00280 cstats.tick(len);
00281 stats.tick(len);
00282 return kFALSE;
00283 }
00284 }
00285 }
00286
00287
00288
00289
00290
00291 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00292 IOSize n = storage_->xread(buf, len);
00293 xstats.tick(n);
00294 stats.tick(n);
00295 return n ? kFALSE : kTRUE;
00296 }
00297
00298 Bool_t
00299 TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len)
00300 {
00301
00302 if (IsZombie())
00303 {
00304 Error("ReadBufferAsync", "Cannot read from a zombie file");
00305 return kTRUE;
00306 }
00307
00308 if (! IsOpen())
00309 {
00310 Error("ReadBufferAsync", "Cannot read from a file that is not open");
00311 return kTRUE;
00312 }
00313
00314 StorageAccount::Stamp stats(storageCounter(s_statsARead, "readAsync"));
00315
00316
00317
00318
00319
00320 StorageFactory *f = StorageFactory::get();
00321
00322
00323 if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
00324 return kTRUE;
00325
00326
00327
00328
00329
00330 if (len) {
00331
00332
00333 ;
00334 }
00335
00336 IOPosBuffer iov(off, (void *) 0, len ? len : PREFETCH_PROBE_LENGTH);
00337 if (storage_->prefetch(&iov, 1))
00338 {
00339 stats.tick(len);
00340 return kFALSE;
00341 }
00342
00343
00344
00345 if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
00346 return kFALSE;
00347
00348
00349 return kTRUE;
00350 }
00351
00352 Bool_t
00353 TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00354 {
00375 Int_t remaining = nbuf;
00376 Int_t pack_count;
00377
00378 IOSize remaining_buffer_size=0;
00379
00380
00381 for (Int_t i=0; i<nbuf; i++) remaining_buffer_size+=len[i];
00382
00383 char *current_buffer = buf;
00384 Long64_t *current_pos = pos;
00385 Int_t *current_len = len;
00386
00387 ReadRepacker repacker;
00388
00389 while (remaining > 0) {
00390
00391 pack_count = repacker.pack(static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
00392
00393 int real_bytes_processed = repacker.realBytesProcessed();
00394 IOSize io_buffer_used = repacker.bufferUsed();
00395
00396
00397 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, "readActual"));
00398 std::vector<IOPosBuffer> &iov = repacker.iov();
00399 IOSize result = storage_->readv(&iov[0], iov.size());
00400 if (result != io_buffer_used) {
00401 return kTRUE;
00402 }
00403 xstats.tick(io_buffer_used);
00404 repacker.unpack(current_buffer);
00405
00406
00407 remaining_buffer_size -= real_bytes_processed;
00408 current_buffer += real_bytes_processed;
00409
00410 current_pos += pack_count;
00411 current_len += pack_count;
00412 remaining -= pack_count;
00413
00414 }
00415 assert(remaining_buffer_size == 0);
00416 return kFALSE;
00417 }
00418
00419 Bool_t
00420 TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
00421 {
00422
00423 if (IsZombie())
00424 {
00425 Error("ReadBuffers", "Cannot read from a zombie file");
00426 return kTRUE;
00427 }
00428
00429 if (! IsOpen())
00430 {
00431 Error("ReadBuffers", "Cannot read from a file that is not open");
00432 return kTRUE;
00433 }
00434
00435
00436
00437 if (buf)
00438 {
00439 return ReadBuffersSync(buf, pos, len, nbuf);
00440 }
00441
00442
00443
00444
00445 void* const nobuf = 0;
00446 Int_t total = 0;
00447 std::vector<IOPosBuffer> iov;
00448 iov.reserve(nbuf);
00449 for (Int_t i = 0; i < nbuf; ++i)
00450 {
00451 iov.push_back(IOPosBuffer(pos[i], nobuf, len[i]));
00452 total += len[i];
00453 }
00454
00455
00456 bool success;
00457 StorageAccount::Stamp astats(storageCounter(s_statsARead, "readAsync"));
00458
00459
00460 success = storage_->prefetch(&iov[0], nbuf);
00461 astats.tick(total);
00462
00463
00464 return success ? kFALSE : TFile::ReadBuffers(buf, pos, len, nbuf);
00465 }
00466
00467 Bool_t
00468 TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len)
00469 {
00470
00471 if (IsZombie())
00472 {
00473 Error("WriteBuffer", "Cannot write to a zombie file");
00474 return kTRUE;
00475 }
00476
00477 if (! IsOpen())
00478 {
00479 Error("WriteBuffer", "Cannot write to a file that is not open");
00480 return kTRUE;
00481 }
00482
00483 if (! fWritable)
00484 {
00485 Error("WriteBuffer", "File is not writable");
00486 return kTRUE;
00487 }
00488
00489 StorageAccount::Stamp stats(storageCounter(s_statsWrite, "write"));
00490 StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, "writeViaCache"));
00491
00492
00493 Int_t st;
00494 switch ((st = WriteBufferViaCache(buf, len)))
00495 {
00496 case 0:
00497
00498 {
00499 StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, "writeActual"));
00500 IOSize n = storage_->xwrite(buf, len);
00501 xstats.tick(n);
00502 stats.tick(n);
00503
00504
00505 return n > 0 ? kFALSE : kTRUE;
00506 }
00507
00508 case 1:
00509 cstats.tick(len);
00510 stats.tick(len);
00511 return kFALSE;
00512
00513 case 2:
00514 default:
00515 Error("WriteBuffer", "Error writing to cache");
00516 return kTRUE;
00517 }
00518 }
00519
00523
00524
00525
00526
00530 Int_t
00531 TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t )
00532 {
00533 StorageAccount::Stamp stats(storageCounter(s_statsOpen, "open"));
00534
00535 if (storage_)
00536 {
00537 storage_->close();
00538 delete storage_;
00539 storage_ = 0;
00540 }
00541
00542 int openFlags = IOFlags::OpenRead;
00543 if (flags & O_WRONLY) openFlags = IOFlags::OpenWrite;
00544 else if (flags & O_RDWR) openFlags |= IOFlags::OpenWrite;
00545 if (flags & O_CREAT) openFlags |= IOFlags::OpenCreate;
00546 if (flags & O_APPEND) openFlags |= IOFlags::OpenAppend;
00547 if (flags & O_EXCL) openFlags |= IOFlags::OpenExclusive;
00548 if (flags & O_TRUNC) openFlags |= IOFlags::OpenTruncate;
00549 if (flags & O_NONBLOCK) openFlags |= IOFlags::OpenNonBlock;
00550
00551 if (! (storage_ = StorageFactory::get()->open(pathname, openFlags)))
00552 {
00553 MakeZombie();
00554 gDirectory = gROOT;
00555 throw cms::Exception("TStorageFactoryFile::SysOpen()")
00556 << "Cannot open file '" << pathname << "'";
00557 }
00558
00559 stats.tick();
00560 return 0;
00561 }
00562
00563 Int_t
00564 TStorageFactoryFile::SysClose(Int_t )
00565 {
00566 StorageAccount::Stamp stats(storageCounter(s_statsClose, "close"));
00567
00568 if (storage_)
00569 {
00570 storage_->close();
00571 delete storage_;
00572 storage_ = 0;
00573 }
00574
00575 stats.tick();
00576 return 0;
00577 }
00578
00579 Long64_t
00580 TStorageFactoryFile::SysSeek(Int_t , Long64_t offset, Int_t whence)
00581 {
00582 StorageAccount::Stamp stats(storageCounter(s_statsSeek, "seek"));
00583 Storage::Relative rel = (whence == SEEK_SET ? Storage::SET
00584 : whence == SEEK_CUR ? Storage::CURRENT
00585 : Storage::END);
00586
00587 offset = storage_->position(offset, rel);
00588 stats.tick();
00589 return offset;
00590 }
00591
00592 Int_t
00593 TStorageFactoryFile::SysSync(Int_t )
00594 {
00595 StorageAccount::Stamp stats(storageCounter(s_statsFlush, "flush"));
00596 storage_->flush();
00597 stats.tick();
00598 return 0;
00599 }
00600
00601 Int_t
00602 TStorageFactoryFile::SysStat(Int_t , Long_t *id, Long64_t *size,
00603 Long_t *flags, Long_t *modtime)
00604 {
00605 StorageAccount::Stamp stats(storageCounter(s_statsStat, "stat"));
00606
00607 *id = ::Hash(fRealName);
00608 *size = storage_->size();
00609 *flags = 0;
00610 *modtime = 0;
00611 stats.tick();
00612 return 0;
00613 }
00614
00615 void
00616 TStorageFactoryFile::ResetErrno(void) const
00617 {
00618 TSystem::ResetErrno();
00619 }