4 #include "Alignment/Geners/interface/MultiFileArchive.hh"
6 #include "Alignment/Geners/interface/ContiguousCatalog.hh"
7 #include "Alignment/Geners/interface/IOException.hh"
8 #include "Alignment/Geners/interface/WriteOnlyCatalog.hh"
9 #include "Alignment/Geners/interface/streamposIO.hh"
10 #include "Alignment/Geners/interface/uriUtils.hh"
13 MultiFileArchive::MultiFileArchive(
const char *basename,
16 const unsigned typicalFileSizeInMB,
17 const unsigned dataFileBufferSize,
18 const unsigned catalogFileBufferSize)
19 : BinaryArchiveBase(basename,
mode),
24 catalogFileName_(AbsArchive::
name() +
".gsbmf"),
25 writeFileURI_(
"/ / / / / / /\\ \\ \\ \\"),
26 readFileURI_(writeFileURI_),
29 maxpos_(
std::streamoff(1048576
LL * typicalFileSizeInMB)),
31 catalogMergeLevel_(1),
32 annotationsMerged_(
false),
33 streamFlushed_(
true) {
39 if (dataFileBufferSize)
40 filebuf_ =
new char[dataFileBufferSize];
41 writeStream_.rdbuf()->pubsetbuf(filebuf_, dataFileBufferSize);
44 if (dataFileBufferSize)
45 readbuf_ =
new char[dataFileBufferSize];
46 separateReadStream_.rdbuf()->pubsetbuf(readbuf_, dataFileBufferSize);
51 if (catalogFileBufferSize)
52 catabuf_ =
new char[catalogFileBufferSize];
53 catStream_.rdbuf()->pubsetbuf(catabuf_, catalogFileBufferSize);
54 catStream_.open(catalogFileName_.c_str(), openmode() & ~
std::ios_base::app);
55 if (!catStream_.is_open())
56 throw IOOpeningFailure(
"gs::MultiFileArchive constructor", catalogFileName_);
62 if (isEmptyFile(catStream_))
63 setCatalog(
new ContiguousCatalog());
65 readCatalog<ContiguousCatalog>();
73 if (isEmptyFile(catStream_)) {
74 setCatalog(
new WriteOnlyCatalog(catStream_));
80 if (!catStream_.is_open())
81 throw IOOpeningFailure(
"gs::MultiFileArchive constructor", catalogFileName_);
82 readCatalog<WriteOnlyCatalog>();
90 const std::streampos pos1 = writeStream_.tellp();
97 errorStream() <<
e.what();
101 void MultiFileArchive::releaseBuffers() {
102 if (writeStream_.is_open())
103 writeStream_.close();
104 if (separateReadStream_.is_open())
105 separateReadStream_.close();
106 if (catStream_.is_open())
108 catStream_.rdbuf()->pubsetbuf(
nullptr, 0);
109 writeStream_.rdbuf()->pubsetbuf(
nullptr, 0);
110 separateReadStream_.rdbuf()->pubsetbuf(
nullptr, 0);
119 MultiFileArchive::~MultiFileArchive() {
124 void MultiFileArchive::writeCatalog() {
126 if (!annotationsMerged_) {
127 if (!annotation_.empty())
128 catalogAnnotations_.push_back(annotation_);
129 annotationsMerged_ =
true;
131 const unsigned compress = static_cast<unsigned>(compressionMode());
133 std::ostringstream os;
134 os <<
"In MultiFileArchive::writeCatalog: "
135 <<
"failed to write catalog data to file " << catalogFileName_;
136 throw IOWriteFailure(os.str());
141 void MultiFileArchive::openWriteStream() {
143 assert(!writeStream_.is_open());
145 std::ostringstream os;
147 writeFileName_ = os.str();
150 openDataFile(writeStream_, writeFileName_.c_str());
153 std::ostream &MultiFileArchive::plainOutputStream() {
156 if (writeStream_.is_open()) {
158 lastpos_ = writeStream_.tellp();
159 if (lastpos_ > maxpos_) {
160 writeStream_.close();
164 }
else if (injectMetadata()) {
166 std::streampos catpos(0);
167 write_pod(writeStream_, catpos);
168 lastpos_ = writeStream_.tellp();
171 if (!writeStream_.is_open()) {
174 if (injectMetadata()) {
175 jumppos_ = writeStream_.tellp();
176 std::streampos catpos(0);
177 write_pod(writeStream_, catpos);
179 lastpos_ = writeStream_.tellp();
181 streamFlushed_ =
false;
186 void MultiFileArchive::flush() {
188 if (!streamFlushed_) {
189 writeStream_.flush();
190 streamFlushed_ =
true;
194 if (dynamic_cast<WriteOnlyCatalog *>(
catalog()) ==
nullptr)
201 void MultiFileArchive::setupWriteStream() {
204 for (
unsigned i = 0; removed; ++
i) {
205 std::ostringstream os;
210 writeFileNumber_ = 0;
212 unsigned long firstNonExistent = 0;
213 for (;; ++firstNonExistent) {
214 std::ostringstream os;
217 std::ifstream
f(
fname.c_str());
221 writeFileNumber_ = firstNonExistent ? firstNonExistent - 1UL : 0UL;
226 std::istream &MultiFileArchive::plainInputStream(
const unsigned long long id,
227 unsigned *compressionCode,
228 unsigned long long *length) {
229 std::fstream *readStream = &writeStream_;
233 throw gs::IOInvalidArgument(
"In gs::MultiFileArchive::plainInputStream: invalid item id");
238 std::streampos
pos(0);
240 if (!
catalog()->retrieveStreampos(
id, compressionCode, length, &
pos)) {
241 std::ostringstream os;
242 os <<
"In gs::MultiFileArchive::plainInputStream: "
243 <<
"failed to locate item with id " <<
id <<
"in the catalog stored in file " << catalogFileName_;
244 throw gs::IOInvalidArgument(os.str());
248 std::shared_ptr<const CatalogEntry> sptr =
catalog()->retrieveEntry(
id);
249 const CatalogEntry *pe = sptr.get();
251 std::ostringstream os;
252 os <<
"In gs::MultiFileArchive::plainInputStream: "
253 <<
"failed to locate item with id " <<
id <<
"in the catalog stored in file " << catalogFileName_;
254 throw gs::IOInvalidArgument(os.str());
256 pos = pe->location().streamPosition();
257 if (pe->location().URI() != writeFileURI_) {
258 updateReadStream(pe->location().URI());
259 readStream = &separateReadStream_;
261 *compressionCode = pe->compressionCode();
262 *length = pe->itemLength();
266 if (readStream == &writeStream_) {
267 assert(writeStream_.is_open());
268 if (!streamFlushed_) {
269 writeStream_.flush();
270 streamFlushed_ =
true;
274 readStream->seekg(
pos);
279 void MultiFileArchive::updateReadStream(
const std::string &uri) {
280 if (uri == readFileURI_)
284 if (separateReadStream_.is_open()) {
285 separateReadStream_.close();
286 separateReadStream_.clear();
293 separateReadStream_.open(readFileName_.c_str(), std::ios_base::binary |
std::ios_base::in);
294 if (!separateReadStream_.is_open())
295 throw IOOpeningFailure(
"gs::MultiFileArchive::updateReadStream", readFileName_);
299 unsigned long long MultiFileArchive::addToCatalog(
const AbsRecord &
record,
300 const unsigned compressionCode,
301 const unsigned long long itemLength) {
302 unsigned long long id = 0;
304 id =
catalog()->makeEntry(
record, compressionCode, itemLength, ItemLocation(lastpos_, writeFileURI_.c_str()));
305 if (
id && injectMetadata()) {
309 std::streampos
now = writeStream_.tellp();
310 if (
entry->write(writeStream_)) {
311 writeStream_.seekp(jumppos_);
312 write_pod(writeStream_,
now);