CMS 3D CMS Logo

MultiFileArchive.cc
Go to the documentation of this file.
1 #include <cstdio>
2 #include <sstream>
3 
4 #include "Alignment/Geners/interface/MultiFileArchive.hh"
5 
6 #include "Alignment/Geners/interface/ContiguousCatalog.hh"
7 #include "Alignment/Geners/interface/IOException.hh"
8 #include "Alignment/Geners/interface/WriteOnlyCatalog.hh"
9 #include "Alignment/Geners/interface/streamposIO.hh"
10 #include "Alignment/Geners/interface/uriUtils.hh"
11 
12 namespace gs {
13  MultiFileArchive::MultiFileArchive(const char *basename,
14  const char *mode,
15  const char *ann,
16  const unsigned typicalFileSizeInMB,
17  const unsigned dataFileBufferSize,
18  const unsigned catalogFileBufferSize)
19  : BinaryArchiveBase(basename, mode),
20  filebuf_(nullptr),
21  readbuf_(nullptr),
22  catabuf_(nullptr),
23  annotation_(ann ? std::string(ann) : std::string("")),
24  catalogFileName_(AbsArchive::name() + ".gsbmf"), // binary metafile
25  writeFileURI_("/ / / / / / /\\ \\ \\ \\"),
26  readFileURI_(writeFileURI_),
27  lastpos_(0),
28  jumppos_(0),
29  maxpos_(std::streamoff(1048576LL * typicalFileSizeInMB)),
30  writeFileNumber_(0),
31  catalogMergeLevel_(1),
32  annotationsMerged_(false),
33  streamFlushed_(true) {
34  if (!modeValid())
35  return;
36 
37  try {
38  // Get a new buffer for the output stream
39  if (dataFileBufferSize)
40  filebuf_ = new char[dataFileBufferSize];
41  writeStream_.rdbuf()->pubsetbuf(filebuf_, dataFileBufferSize);
42 
43  // Get a new buffer for the input stream
44  if (dataFileBufferSize)
45  readbuf_ = new char[dataFileBufferSize];
46  separateReadStream_.rdbuf()->pubsetbuf(readbuf_, dataFileBufferSize);
47 
48  // Get a new buffer for the catalog and open the catalog stream.
49  // We may have to rewrite the complete catalog, so remove the flag
50  // std::ios_base::app from the opening mode.
51  if (catalogFileBufferSize)
52  catabuf_ = new char[catalogFileBufferSize];
53  catStream_.rdbuf()->pubsetbuf(catabuf_, catalogFileBufferSize);
54  catStream_.open(catalogFileName_.c_str(), openmode() & ~std::ios_base::app);
55  if (!catStream_.is_open())
56  throw IOOpeningFailure("gs::MultiFileArchive constructor", catalogFileName_);
57 
58  // Can we use a write-only catalog?
59  if (openmode() & std::ios_base::in) {
60  // Reading is allowed. Have to use in-memory catalog.
61  // If the file data already exists, get the catalog in.
62  if (isEmptyFile(catStream_))
63  setCatalog(new ContiguousCatalog());
64  else
65  readCatalog<ContiguousCatalog>();
66  } else {
67  // Yes, we can use a write-only catalog.
68  // Is the catalog file empty? If so, write out
69  // the stuff needed at the beginning of the file.
70  // If not, assume that the necessary stuff is
71  // already there. Note that in this case we will
72  // not be able to add the annotation.
73  if (isEmptyFile(catStream_)) {
74  setCatalog(new WriteOnlyCatalog(catStream_));
75  writeCatalog();
76  } else {
77  catStream_.close();
78  catStream_.clear();
79  catStream_.open(catalogFileName_.c_str(), openmode() | std::ios_base::in);
80  if (!catStream_.is_open())
81  throw IOOpeningFailure("gs::MultiFileArchive constructor", catalogFileName_);
82  readCatalog<WriteOnlyCatalog>();
83  catStream_.seekp(0, std::ios_base::end);
84  }
85  }
86 
87  // Open the write stream
88  if (openmode() & std::ios_base::out) {
89  setupWriteStream();
90  const std::streampos pos1 = writeStream_.tellp();
91  if (maxpos_ < pos1)
92  maxpos_ = pos1;
93  }
94  } catch (std::exception &e) {
95  setCatalog(nullptr);
96  releaseBuffers();
97  errorStream() << e.what();
98  }
99  }
100 
101  void MultiFileArchive::releaseBuffers() {
102  if (writeStream_.is_open())
103  writeStream_.close();
104  if (separateReadStream_.is_open())
105  separateReadStream_.close();
106  if (catStream_.is_open())
107  catStream_.close();
108  catStream_.rdbuf()->pubsetbuf(nullptr, 0);
109  writeStream_.rdbuf()->pubsetbuf(nullptr, 0);
110  separateReadStream_.rdbuf()->pubsetbuf(nullptr, 0);
111  delete[] catabuf_;
112  catabuf_ = nullptr;
113  delete[] readbuf_;
114  readbuf_ = nullptr;
115  delete[] filebuf_;
116  filebuf_ = nullptr;
117  }
118 
119  MultiFileArchive::~MultiFileArchive() {
120  flush();
121  releaseBuffers();
122  }
123 
124  void MultiFileArchive::writeCatalog() {
125  if (isOpen()) {
126  if (!annotationsMerged_) {
127  if (!annotation_.empty())
128  catalogAnnotations_.push_back(annotation_);
129  annotationsMerged_ = true;
130  }
131  const unsigned compress = static_cast<unsigned>(compressionMode());
132  if (!writeBinaryCatalog(catStream_, compress, catalogMergeLevel_, catalogAnnotations_, *catalog())) {
133  std::ostringstream os;
134  os << "In MultiFileArchive::writeCatalog: "
135  << "failed to write catalog data to file " << catalogFileName_;
136  throw IOWriteFailure(os.str());
137  }
138  }
139  }
140 
141  void MultiFileArchive::openWriteStream() {
142  assert(openmode() & std::ios_base::out);
143  assert(!writeStream_.is_open());
144  {
145  std::ostringstream os;
146  os << AbsArchive::name() << '.' << writeFileNumber_ << ".gsbd";
147  writeFileName_ = os.str();
148  }
149  writeFileURI_ = localFileURI(writeFileName_.c_str());
150  openDataFile(writeStream_, writeFileName_.c_str());
151  }
152 
153  std::ostream &MultiFileArchive::plainOutputStream() {
154  if (isOpen()) {
155  assert(openmode() & std::ios_base::out);
156  if (writeStream_.is_open()) {
157  writeStream_.seekp(0, std::ios_base::end);
158  lastpos_ = writeStream_.tellp();
159  if (lastpos_ > maxpos_) {
160  writeStream_.close();
161  // Don't have to clear. "openDataFile" will do it.
162  // writeStream_.clear();
163  ++writeFileNumber_;
164  } else if (injectMetadata()) {
165  jumppos_ = lastpos_;
166  std::streampos catpos(0);
167  write_pod(writeStream_, catpos);
168  lastpos_ = writeStream_.tellp();
169  }
170  }
171  if (!writeStream_.is_open()) {
172  openWriteStream();
173  writeStream_.seekp(0, std::ios_base::end);
174  if (injectMetadata()) {
175  jumppos_ = writeStream_.tellp();
176  std::streampos catpos(0);
177  write_pod(writeStream_, catpos);
178  }
179  lastpos_ = writeStream_.tellp();
180  }
181  streamFlushed_ = false;
182  }
183  return writeStream_;
184  }
185 
186  void MultiFileArchive::flush() {
187  if (isOpen()) {
188  if (!streamFlushed_) {
189  writeStream_.flush();
190  streamFlushed_ = true;
191  }
192 
193  if (openmode() & std::ios_base::out) {
194  if (dynamic_cast<WriteOnlyCatalog *>(catalog()) == nullptr)
195  writeCatalog();
196  catStream_.flush();
197  }
198  }
199  }
200 
201  void MultiFileArchive::setupWriteStream() {
202  if (openmode() & std::ios_base::trunc) {
203  bool removed = true;
204  for (unsigned i = 0; removed; ++i) {
205  std::ostringstream os;
206  os << AbsArchive::name() << '.' << i << ".gsbd";
207  std::string fname = os.str();
208  removed = std::remove(fname.c_str()) == 0;
209  }
210  writeFileNumber_ = 0;
211  } else {
212  unsigned long firstNonExistent = 0;
213  for (;; ++firstNonExistent) {
214  std::ostringstream os;
215  os << AbsArchive::name() << '.' << firstNonExistent << ".gsbd";
216  std::string fname = os.str();
217  std::ifstream f(fname.c_str());
218  if (!f)
219  break;
220  }
221  writeFileNumber_ = firstNonExistent ? firstNonExistent - 1UL : 0UL;
222  }
223  openWriteStream();
224  }
225 
226  std::istream &MultiFileArchive::plainInputStream(const unsigned long long id,
227  unsigned *compressionCode,
228  unsigned long long *length) {
229  std::fstream *readStream = &writeStream_;
230  if (isOpen()) {
231  assert(openmode() & std::ios_base::in);
232  if (!id)
233  throw gs::IOInvalidArgument("In gs::MultiFileArchive::plainInputStream: invalid item id");
234 
235  // If we have a write stream, and if the archive
236  // has one file only, we should be able to retrieve
237  // stream position quickly
238  std::streampos pos(0);
239  if ((openmode() & std::ios_base::out) && writeFileNumber_ == 0UL) {
240  if (!catalog()->retrieveStreampos(id, compressionCode, length, &pos)) {
241  std::ostringstream os;
242  os << "In gs::MultiFileArchive::plainInputStream: "
243  << "failed to locate item with id " << id << "in the catalog stored in file " << catalogFileName_;
244  throw gs::IOInvalidArgument(os.str());
245  }
246  } else {
247  // Here, we have to do a full catalog search
248  std::shared_ptr<const CatalogEntry> sptr = catalog()->retrieveEntry(id);
249  const CatalogEntry *pe = sptr.get();
250  if (!pe) {
251  std::ostringstream os;
252  os << "In gs::MultiFileArchive::plainInputStream: "
253  << "failed to locate item with id " << id << "in the catalog stored in file " << catalogFileName_;
254  throw gs::IOInvalidArgument(os.str());
255  }
256  pos = pe->location().streamPosition();
257  if (pe->location().URI() != writeFileURI_) {
258  updateReadStream(pe->location().URI());
259  readStream = &separateReadStream_;
260  }
261  *compressionCode = pe->compressionCode();
262  *length = pe->itemLength();
263  }
264 
265  // Flush the write stream if it will be used for reading
266  if (readStream == &writeStream_) {
267  assert(writeStream_.is_open());
268  if (!streamFlushed_) {
269  writeStream_.flush();
270  streamFlushed_ = true;
271  }
272  }
273 
274  readStream->seekg(pos);
275  }
276  return *readStream;
277  }
278 
279  void MultiFileArchive::updateReadStream(const std::string &uri) {
280  if (uri == readFileURI_)
281  return;
282 
283  assert(openmode() & std::ios_base::in);
284  if (separateReadStream_.is_open()) {
285  separateReadStream_.close();
286  separateReadStream_.clear();
287  }
288 
289  // We need to get the name of the local file from the URI.
290  // We will assume that it belongs to the archive we are
291  // working with right now.
292  readFileName_ = joinDir1WithName2(AbsArchive::name().c_str(), uri.c_str());
293  separateReadStream_.open(readFileName_.c_str(), std::ios_base::binary | std::ios_base::in);
294  if (!separateReadStream_.is_open())
295  throw IOOpeningFailure("gs::MultiFileArchive::updateReadStream", readFileName_);
296  readFileURI_ = uri;
297  }
298 
299  unsigned long long MultiFileArchive::addToCatalog(const AbsRecord &record,
300  const unsigned compressionCode,
301  const unsigned long long itemLength) {
302  unsigned long long id = 0;
303  if (isOpen()) {
304  id = catalog()->makeEntry(record, compressionCode, itemLength, ItemLocation(lastpos_, writeFileURI_.c_str()));
305  if (id && injectMetadata()) {
306  const CatalogEntry *entry = catalog()->lastEntryMade();
307  assert(entry);
308  writeStream_.seekp(0, std::ios_base::end);
309  std::streampos now = writeStream_.tellp();
310  if (entry->write(writeStream_)) {
311  writeStream_.seekp(jumppos_);
312  write_pod(writeStream_, now);
313  writeStream_.seekp(0, std::ios_base::end);
314  } else
315  id = 0;
316  }
317  }
318  return id;
319  }
320 } // namespace gs
assert(be >=bs)
std::string joinDir1WithName2(const char *fname1, const char *fname2)
Definition: uriUtils.cc:38
bool writeBinaryCatalog(std::ostream &os, const unsigned compressionCode, const unsigned mergeLevel, const std::vector< std::string > &annotations, const AbsCatalog &catalog, const unsigned formatId)
Definition: CatalogIO.cc:4
std::string localFileURI(const char *filename)
Definition: uriUtils.cc:8
double f[11][100]
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:233
string fname
main script
Definition: AbsArchive.cc:46