4 #include "Alignment/Geners/interface/MultiFileArchive.hh"
6 #include "Alignment/Geners/interface/uriUtils.hh"
7 #include "Alignment/Geners/interface/ContiguousCatalog.hh"
8 #include "Alignment/Geners/interface/WriteOnlyCatalog.hh"
9 #include "Alignment/Geners/interface/IOException.hh"
10 #include "Alignment/Geners/interface/streamposIO.hh"
// Constructor of the multi-file archive.
//
// Parameters (as used by the visible code):
//   basename, mode         - forwarded unchanged to BinaryArchiveBase.
//   typicalFileSizeInMB    - multiplied by 1048576 to obtain maxpos_.
//   dataFileBufferSize     - if non-zero, size of the buffers allocated for
//                            writeStream_ and separateReadStream_.
//   catalogFileBufferSize  - if non-zero, size of the buffer allocated for
//                            catStream_.
//
// Throws IOOpeningFailure when the catalog stream cannot be opened.
//
// NOTE(review): this excerpt omits lines (braces, else/try/catch keywords
// and some initialisers), so the exact branch structure below must be
// confirmed against the complete file.
13 MultiFileArchive::MultiFileArchive(
const char* basename,
const char*
mode,
15 const unsigned typicalFileSizeInMB,
16 const unsigned dataFileBufferSize,
17 const unsigned catalogFileBufferSize)
18 : BinaryArchiveBase(basename, mode),
// The catalog file name is the archive name with the ".gsbmf" suffix.
23 catalogFileName_(AbsArchive::
name() +
".gsbmf"),
// Both URIs start out as the same odd slash/backslash string; presumably
// a placeholder that no real file URI can equal -- TODO confirm.
24 writeFileURI_(
"/ / / / / / /\\ \\ \\ \\"),
25 readFileURI_(writeFileURI_),
// 1048576 bytes per MB: maxpos_ is the size argument converted to a
// stream offset in bytes.
28 maxpos_(std::streamoff(1048576LL*typicalFileSizeInMB)),
30 catalogMergeLevel_(1),
31 annotationsMerged_(
false),
// Nothing else is set up when the base class reports an invalid mode.
34 if (!modeValid())
return;
// Give the write stream its own buffer (allocated only for a non-zero size).
39 if (dataFileBufferSize)
40 filebuf_ =
new char[dataFileBufferSize];
41 writeStream_.rdbuf()->pubsetbuf(filebuf_, dataFileBufferSize);
// Same buffer size for the separate read stream.
44 if (dataFileBufferSize)
45 readbuf_ =
new char[dataFileBufferSize];
46 separateReadStream_.rdbuf()->pubsetbuf(readbuf_,dataFileBufferSize);
// The catalog stream gets a buffer of its own size.
51 if (catalogFileBufferSize)
52 catabuf_ =
new char[catalogFileBufferSize];
53 catStream_.rdbuf()->pubsetbuf(catabuf_, catalogFileBufferSize);
// Open the catalog file with the archive open mode, the "app" bit cleared.
54 catStream_.open(catalogFileName_.c_str(),
55 openmode() & ~std::ios_base::app);
56 if (!catStream_.is_open())
57 throw IOOpeningFailure(
"gs::MultiFileArchive constructor",
// An empty catalog file gets a new ContiguousCatalog; readCatalog is
// presumably the non-empty (else) case -- the else is not visible here.
65 if (isEmptyFile(catStream_))
66 setCatalog(
new ContiguousCatalog());
68 readCatalog<ContiguousCatalog>();
// Second variant using WriteOnlyCatalog. The condition selecting between
// the ContiguousCatalog and WriteOnlyCatalog paths is not visible.
78 if (isEmptyFile(catStream_))
80 setCatalog(
new WriteOnlyCatalog(catStream_));
// The catalog stream is opened again here (the mode argument is not visible)
// before the existing catalog is read.
87 catStream_.open(catalogFileName_.c_str(),
89 if (!catStream_.is_open())
throw IOOpeningFailure(
90 "gs::MultiFileArchive constructor", catalogFileName_);
91 readCatalog<WriteOnlyCatalog>();
// Current put position of the write stream; its use is not visible here.
100 const std::streampos pos1 = writeStream_.tellp();
// Reports the message of a caught exception "e" on the archive error stream;
// the enclosing try/catch is not visible in this excerpt.
109 errorStream() << e.what();
// Closes whichever of the three streams are open, detaches the custom
// buffers from them and frees the buffer memory.
113 void MultiFileArchive::releaseBuffers()
// Close each stream only if it is currently open.
115 if (writeStream_.is_open()) writeStream_.close();
116 if (separateReadStream_.is_open()) separateReadStream_.close();
117 if (catStream_.is_open()) catStream_.close();
// Reset the stream buffers with pubsetbuf(0, 0) before the arrays they were
// given are deleted below.
118 catStream_.rdbuf()->pubsetbuf(0, 0);
119 writeStream_.rdbuf()->pubsetbuf(0, 0);
120 separateReadStream_.rdbuf()->pubsetbuf(0, 0);
// Free each array and zero the pointer.
121 delete [] catabuf_; catabuf_ = 0;
122 delete [] readbuf_; readbuf_ = 0;
123 delete [] filebuf_; filebuf_ = 0;
// Destructor. NOTE(review): the body is not visible in this excerpt.
126 MultiFileArchive::~MultiFileArchive()
// Writes the catalog, together with the accumulated annotations, and throws
// IOWriteFailure when writing fails.
//
// NOTE(review): the call that actually writes the catalog is truncated in
// this excerpt; only its last arguments (catalogAnnotations_, *catalog())
// are visible.
132 void MultiFileArchive::writeCatalog()
// Merge the archive annotation into catalogAnnotations_ at most once:
// annotationsMerged_ is set afterwards. A zero-size annotation is not
// appended.
136 if (!annotationsMerged_)
138 if (annotation_.size())
139 catalogAnnotations_.push_back(annotation_);
140 annotationsMerged_ =
true;
// Compression mode converted to unsigned; its use is in the truncated call.
142 const unsigned compress =
static_cast<unsigned>(compressionMode());
144 catalogAnnotations_, *
catalog()))
// Failure path: build a descriptive message and throw.
146 std::ostringstream os;
147 os <<
"In MultiFileArchive::writeCatalog: "
148 <<
"failed to write catalog data to file "
150 throw IOWriteFailure(os.str());
// Opens the data file used for writing. The write stream must not already
// be open (asserted).
//
// NOTE(review): the statements that fill "os" with the file name are not
// visible in this excerpt.
155 void MultiFileArchive::openWriteStream()
158 assert(!writeStream_.is_open());
// The write file name is assembled in a string stream and then stored.
160 std::ostringstream os;
162 writeFileName_ = os.str();
165 openDataFile(writeStream_, writeFileName_.c_str());
// Prepares the write stream for the next item and records the position at
// which that item will start (lastpos_).
//
// NOTE(review): braces, the return statement and some statements are missing
// from this excerpt; the nesting described below should be confirmed.
168 std::ostream& MultiFileArchive::plainOutputStream()
173 if (writeStream_.is_open())
176 lastpos_ = writeStream_.tellp();
// The current data file is closed once the write position exceeds maxpos_.
177 if (lastpos_ > maxpos_)
179 writeStream_.close();
// With metadata injection on, a zero std::streampos is written ahead of the
// item and lastpos_ is then re-read from the stream.
184 else if (injectMetadata())
187 std::streampos catpos(0);
188 write_pod(writeStream_, catpos);
189 lastpos_ = writeStream_.tellp();
// Write stream not open. Presumably a data file is opened here -- the call
// is not visible in this excerpt.
192 if (!writeStream_.is_open())
196 if (injectMetadata())
// jumppos_ records where the zero std::streampos is written.
198 jumppos_ = writeStream_.tellp();
199 std::streampos catpos(0);
200 write_pod(writeStream_, catpos);
202 lastpos_ = writeStream_.tellp();
// Mark the stream as not flushed.
204 streamFlushed_ =
false;
// Flushes the write stream and marks it as flushed.
//
// NOTE(review): the conditions guarding these statements are not visible in
// this excerpt.
209 void MultiFileArchive::flush()
215 writeStream_.flush();
216 streamFlushed_ =
true;
// Tests whether the current catalog is NOT a WriteOnlyCatalog; the action
// taken in that case is not visible here.
221 if (dynamic_cast<WriteOnlyCatalog*>(
catalog()) == 0)
// Determines writeFileNumber_, the number of the data file to write to.
//
// NOTE(review): loop bodies and the condition choosing between the two
// paths below are not visible in this excerpt.
228 void MultiFileArchive::setupWriteStream()
// First path: loop that runs while "removed" is true (its declaration and
// update are not visible), after which the file number is reset to 0.
233 for (
unsigned i=0; removed; ++
i)
235 std::ostringstream os;
240 writeFileNumber_ = 0;
// Second path: count upward, opening an ifstream on each candidate file name.
// Presumably the loop stops at the first name that cannot be opened --
// TODO confirm, the exit test is not visible.
244 unsigned long firstNonExistent = 0;
245 for (; ; ++firstNonExistent)
247 std::ostringstream os;
250 std::ifstream
f(fname.c_str());
// Use the number just below firstNonExistent, or 0 when it is itself 0.
254 writeFileNumber_ = firstNonExistent ? firstNonExistent - 1UL : 0UL;
// Positions an input stream at the start of the item with the given id.
//
// Parameters:
//   id              - item id; zero is rejected with IOInvalidArgument.
//   compressionCode - output, set to the item's compression code.
//   length          - output, set to the item's length.
//
// Throws gs::IOInvalidArgument for a zero id or when the item cannot be
// located in the catalog.
//
// NOTE(review): braces, the return statement and the condition selecting
// between the two lookup variants are not visible in this excerpt.
//
// NOTE(review): in both "failed to locate" messages below, id is streamed
// immediately before "in the catalog stored in file ", with no separating
// space, so the text reads e.g. "...with id 5in the catalog...".
259 std::istream& MultiFileArchive::plainInputStream(
260 const unsigned long long id,
261 unsigned* compressionCode,
262 unsigned long long* length)
// Read from the write stream by default; switched below to the separate read
// stream when the item's URI differs from writeFileURI_.
264 std::fstream* readStream = &writeStream_;
268 if (!
id)
throw gs::IOInvalidArgument(
269 "In gs::MultiFileArchive::plainInputStream: invalid item id");
274 std::streampos pos(0);
// Lookup variant 1: the catalog fills in compression code, length and stream
// position directly.
277 if (!
catalog()->retrieveStreampos(
278 id, compressionCode, length, &pos))
280 std::ostringstream os;
281 os <<
"In gs::MultiFileArchive::plainInputStream: "
282 <<
"failed to locate item with id " <<
id
283 <<
"in the catalog stored in file " << catalogFileName_;
284 throw gs::IOInvalidArgument(os.str());
// Lookup variant 2: obtain the whole catalog entry (the retrieving call is
// truncated) and read location, compression code and length from it.
290 CPP11_shared_ptr<const CatalogEntry> sptr =
292 const CatalogEntry* pe = sptr.get();
295 std::ostringstream os;
296 os <<
"In gs::MultiFileArchive::plainInputStream: "
297 <<
"failed to locate item with id " <<
id
298 <<
"in the catalog stored in file " << catalogFileName_;
299 throw gs::IOInvalidArgument(os.str());
301 pos = pe->location().streamPosition();
// An item stored under a URI other than the current write file is read
// through the separate read stream.
302 if (pe->location().URI() != writeFileURI_)
304 updateReadStream(pe->location().URI());
305 readStream = &separateReadStream_;
307 *compressionCode = pe->compressionCode();
308 *length = pe->itemLength();
// Reading back through the write stream: it must be open, and it is flushed
// here (the guard around the flush is not visible).
312 if (readStream == &writeStream_)
314 assert(writeStream_.is_open());
317 writeStream_.flush();
318 streamFlushed_ =
true;
// Move the get pointer of the chosen stream to the item position.
322 readStream->seekg(pos);
// Points separateReadStream_ at the data file identified by "uri".
//
// Throws IOOpeningFailure when the file cannot be opened.
//
// NOTE(review): the statement executed when uri equals readFileURI_, the
// derivation of readFileName_ and the tails of the open/throw calls are not
// visible in this excerpt.
327 void MultiFileArchive::updateReadStream(
const std::string& uri)
// Compare against the URI currently associated with the read stream;
// presumably an early return when they match -- TODO confirm.
329 if (uri == readFileURI_)
// Close any previously opened file and reset the stream state flags.
333 if (separateReadStream_.is_open())
335 separateReadStream_.close();
336 separateReadStream_.clear();
// Open the new file in binary mode (remaining mode flags are truncated).
344 separateReadStream_.open(readFileName_.c_str(), std::ios_base::binary |
346 if (!separateReadStream_.is_open())
347 throw IOOpeningFailure(
"gs::MultiFileArchive::updateReadStream",
// Registers a newly written item in the catalog. The item location passed to
// the catalog is (lastpos_, writeFileURI_).
//
// Parameters:
//   record          - the record describing the item.
//   compressionCode - compression code used for the item.
//   itemLength      - length of the item.
//
// NOTE(review): the catalog call that assigns "id", the return statement and
// the tail of the function are not visible in this excerpt.
352 unsigned long long MultiFileArchive::addToCatalog(
353 const AbsRecord&
record,
const unsigned compressionCode,
354 const unsigned long long itemLength)
356 unsigned long long id = 0;
// Trailing arguments of the truncated catalog call.
360 record, compressionCode, itemLength,
361 ItemLocation(lastpos_, writeFileURI_.c_str()));
// Metadata injection runs only for a non-zero id.
362 if (
id && injectMetadata())
364 const CatalogEntry* entry =
catalog()->lastEntryMade();
// Remember the current write position, write the catalog entry into the data
// stream, then seek back to jumppos_ and store the remembered position there.
367 std::streampos
now = writeStream_.tellp();
368 if (entry->write(writeStream_))
370 writeStream_.seekp(jumppos_);
371 write_pod(writeStream_, now);
std::string joinDir1WithName2(const char *fname1, const char *fname2)
bool writeBinaryCatalog(std::ostream &os, const unsigned compressionCode, const unsigned mergeLevel, const std::vector< std::string > &annotations, const AbsCatalog &catalog, const unsigned formatId)
std::string localFileURI(const char *filename)
volatile std::atomic< bool > shutdown_flag false