CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MultiFileArchive.cc
Go to the documentation of this file.
1 #include <sstream>
2 #include <cstdio>
3 
4 #include "Alignment/Geners/interface/MultiFileArchive.hh"
5 
6 #include "Alignment/Geners/interface/uriUtils.hh"
7 #include "Alignment/Geners/interface/ContiguousCatalog.hh"
8 #include "Alignment/Geners/interface/WriteOnlyCatalog.hh"
9 #include "Alignment/Geners/interface/IOException.hh"
10 #include "Alignment/Geners/interface/streamposIO.hh"
11 
12 namespace gs {
13  MultiFileArchive::MultiFileArchive(const char* basename, const char* mode,
14  const char* ann,
15  const unsigned typicalFileSizeInMB,
16  const unsigned dataFileBufferSize,
17  const unsigned catalogFileBufferSize)
18  : BinaryArchiveBase(basename, mode),
19  filebuf_(0),
20  readbuf_(0),
21  catabuf_(0),
22  annotation_(ann ? std::string(ann) : std::string("")),
23  catalogFileName_(AbsArchive::name() + ".gsbmf"), // binary metafile
24  writeFileURI_("/ / / / / / /\\ \\ \\ \\"),
25  readFileURI_(writeFileURI_),
26  lastpos_(0),
27  jumppos_(0),
28  maxpos_(std::streamoff(1048576LL*typicalFileSizeInMB)),
29  writeFileNumber_(0),
30  catalogMergeLevel_(1),
31  annotationsMerged_(false),
32  streamFlushed_(true)
33  {
34  if (!modeValid()) return;
35 
36  try
37  {
38  // Get a new buffer for the output stream
39  if (dataFileBufferSize)
40  filebuf_ = new char[dataFileBufferSize];
41  writeStream_.rdbuf()->pubsetbuf(filebuf_, dataFileBufferSize);
42 
43  // Get a new buffer for the input stream
44  if (dataFileBufferSize)
45  readbuf_ = new char[dataFileBufferSize];
46  separateReadStream_.rdbuf()->pubsetbuf(readbuf_,dataFileBufferSize);
47 
48  // Get a new buffer for the catalog and open the catalog stream.
49  // We may have to rewrite the complete catalog, so remove the flag
50  // std::ios_base::app from the opening mode.
51  if (catalogFileBufferSize)
52  catabuf_ = new char[catalogFileBufferSize];
53  catStream_.rdbuf()->pubsetbuf(catabuf_, catalogFileBufferSize);
54  catStream_.open(catalogFileName_.c_str(),
55  openmode() & ~std::ios_base::app);
56  if (!catStream_.is_open())
57  throw IOOpeningFailure("gs::MultiFileArchive constructor",
58  catalogFileName_);
59 
60  // Can we use a write-only catalog?
61  if (openmode() & std::ios_base::in)
62  {
63  // Reading is allowed. Have to use in-memory catalog.
64  // If the file data already exists, get the catalog in.
65  if (isEmptyFile(catStream_))
66  setCatalog(new ContiguousCatalog());
67  else
68  readCatalog<ContiguousCatalog>();
69  }
70  else
71  {
72  // Yes, we can use a write-only catalog.
73  // Is the catalog file empty? If so, write out
74  // the stuff needed at the beginning of the file.
75  // If not, assume that the necessary stuff is
76  // already there. Note that in this case we will
77  // not be able to add the annotation.
78  if (isEmptyFile(catStream_))
79  {
80  setCatalog(new WriteOnlyCatalog(catStream_));
81  writeCatalog();
82  }
83  else
84  {
85  catStream_.close();
86  catStream_.clear();
87  catStream_.open(catalogFileName_.c_str(),
88  openmode() | std::ios_base::in);
89  if (!catStream_.is_open()) throw IOOpeningFailure(
90  "gs::MultiFileArchive constructor", catalogFileName_);
91  readCatalog<WriteOnlyCatalog>();
92  catStream_.seekp(0, std::ios_base::end);
93  }
94  }
95 
96  // Open the write stream
97  if (openmode() & std::ios_base::out)
98  {
99  setupWriteStream();
100  const std::streampos pos1 = writeStream_.tellp();
101  if (maxpos_ < pos1)
102  maxpos_ = pos1;
103  }
104  }
105  catch (std::exception& e)
106  {
107  setCatalog(0);
108  releaseBuffers();
109  errorStream() << e.what();
110  }
111  }
112 
113  void MultiFileArchive::releaseBuffers()
114  {
115  if (writeStream_.is_open()) writeStream_.close();
116  if (separateReadStream_.is_open()) separateReadStream_.close();
117  if (catStream_.is_open()) catStream_.close();
118  catStream_.rdbuf()->pubsetbuf(0, 0);
119  writeStream_.rdbuf()->pubsetbuf(0, 0);
120  separateReadStream_.rdbuf()->pubsetbuf(0, 0);
121  delete [] catabuf_; catabuf_ = 0;
122  delete [] readbuf_; readbuf_ = 0;
123  delete [] filebuf_; filebuf_ = 0;
124  }
125 
126  MultiFileArchive::~MultiFileArchive()
127  {
128  flush();
129  releaseBuffers();
130  }
131 
132  void MultiFileArchive::writeCatalog()
133  {
134  if (isOpen())
135  {
136  if (!annotationsMerged_)
137  {
138  if (annotation_.size())
139  catalogAnnotations_.push_back(annotation_);
140  annotationsMerged_ = true;
141  }
142  const unsigned compress = static_cast<unsigned>(compressionMode());
143  if (!writeBinaryCatalog(catStream_, compress, catalogMergeLevel_,
144  catalogAnnotations_, *catalog()))
145  {
146  std::ostringstream os;
147  os << "In MultiFileArchive::writeCatalog: "
148  << "failed to write catalog data to file "
149  << catalogFileName_;
150  throw IOWriteFailure(os.str());
151  }
152  }
153  }
154 
155  void MultiFileArchive::openWriteStream()
156  {
157  assert(openmode() & std::ios_base::out);
158  assert(!writeStream_.is_open());
159  {
160  std::ostringstream os;
161  os << AbsArchive::name() << '.' << writeFileNumber_ << ".gsbd";
162  writeFileName_ = os.str();
163  }
164  writeFileURI_ = localFileURI(writeFileName_.c_str());
165  openDataFile(writeStream_, writeFileName_.c_str());
166  }
167 
168  std::ostream& MultiFileArchive::plainOutputStream()
169  {
170  if (isOpen())
171  {
172  assert(openmode() & std::ios_base::out);
173  if (writeStream_.is_open())
174  {
175  writeStream_.seekp(0, std::ios_base::end);
176  lastpos_ = writeStream_.tellp();
177  if (lastpos_ > maxpos_)
178  {
179  writeStream_.close();
180  // Don't have to clear. "openDataFile" will do it.
181  // writeStream_.clear();
182  ++writeFileNumber_;
183  }
184  else if (injectMetadata())
185  {
186  jumppos_ = lastpos_;
187  std::streampos catpos(0);
188  write_pod(writeStream_, catpos);
189  lastpos_ = writeStream_.tellp();
190  }
191  }
192  if (!writeStream_.is_open())
193  {
194  openWriteStream();
195  writeStream_.seekp(0, std::ios_base::end);
196  if (injectMetadata())
197  {
198  jumppos_ = writeStream_.tellp();
199  std::streampos catpos(0);
200  write_pod(writeStream_, catpos);
201  }
202  lastpos_ = writeStream_.tellp();
203  }
204  streamFlushed_ = false;
205  }
206  return writeStream_;
207  }
208 
209  void MultiFileArchive::flush()
210  {
211  if (isOpen())
212  {
213  if (!streamFlushed_)
214  {
215  writeStream_.flush();
216  streamFlushed_ = true;
217  }
218 
219  if (openmode() & std::ios_base::out)
220  {
221  if (dynamic_cast<WriteOnlyCatalog*>(catalog()) == 0)
222  writeCatalog();
223  catStream_.flush();
224  }
225  }
226  }
227 
228  void MultiFileArchive::setupWriteStream()
229  {
230  if (openmode() & std::ios_base::trunc)
231  {
232  bool removed = true;
233  for (unsigned i=0; removed; ++i)
234  {
235  std::ostringstream os;
236  os << AbsArchive::name() << '.' << i << ".gsbd";
237  std::string fname = os.str();
238  removed = std::remove(fname.c_str()) == 0;
239  }
240  writeFileNumber_ = 0;
241  }
242  else
243  {
244  unsigned long firstNonExistent = 0;
245  for (; ; ++firstNonExistent)
246  {
247  std::ostringstream os;
248  os << AbsArchive::name() << '.' << firstNonExistent << ".gsbd";
249  std::string fname = os.str();
250  std::ifstream f(fname.c_str());
251  if (!f)
252  break;
253  }
254  writeFileNumber_ = firstNonExistent ? firstNonExistent - 1UL : 0UL;
255  }
256  openWriteStream();
257  }
258 
259  std::istream& MultiFileArchive::plainInputStream(
260  const unsigned long long id,
261  unsigned* compressionCode,
262  unsigned long long* length)
263  {
264  std::fstream* readStream = &writeStream_;
265  if (isOpen())
266  {
267  assert(openmode() & std::ios_base::in);
268  if (!id) throw gs::IOInvalidArgument(
269  "In gs::MultiFileArchive::plainInputStream: invalid item id");
270 
271  // If we have a write stream, and if the archive
272  // has one file only, we should be able to retrieve
273  // stream position quickly
274  std::streampos pos(0);
275  if ((openmode() & std::ios_base::out) && writeFileNumber_ == 0UL)
276  {
277  if (!catalog()->retrieveStreampos(
278  id, compressionCode, length, &pos))
279  {
280  std::ostringstream os;
281  os << "In gs::MultiFileArchive::plainInputStream: "
282  << "failed to locate item with id " << id
283  << "in the catalog stored in file " << catalogFileName_;
284  throw gs::IOInvalidArgument(os.str());
285  }
286  }
287  else
288  {
289  // Here, we have to do a full catalog search
290  CPP11_shared_ptr<const CatalogEntry> sptr =
291  catalog()->retrieveEntry(id);
292  const CatalogEntry* pe = sptr.get();
293  if (!pe)
294  {
295  std::ostringstream os;
296  os << "In gs::MultiFileArchive::plainInputStream: "
297  << "failed to locate item with id " << id
298  << "in the catalog stored in file " << catalogFileName_;
299  throw gs::IOInvalidArgument(os.str());
300  }
301  pos = pe->location().streamPosition();
302  if (pe->location().URI() != writeFileURI_)
303  {
304  updateReadStream(pe->location().URI());
305  readStream = &separateReadStream_;
306  }
307  *compressionCode = pe->compressionCode();
308  *length = pe->itemLength();
309  }
310 
311  // Flush the write stream if it will be used for reading
312  if (readStream == &writeStream_)
313  {
314  assert(writeStream_.is_open());
315  if (!streamFlushed_)
316  {
317  writeStream_.flush();
318  streamFlushed_ = true;
319  }
320  }
321 
322  readStream->seekg(pos);
323  }
324  return *readStream;
325  }
326 
327  void MultiFileArchive::updateReadStream(const std::string& uri)
328  {
329  if (uri == readFileURI_)
330  return;
331 
332  assert(openmode() & std::ios_base::in);
333  if (separateReadStream_.is_open())
334  {
335  separateReadStream_.close();
336  separateReadStream_.clear();
337  }
338 
339  // We need to get the name of the local file from the URI.
340  // We will assume that it belongs to the archive we are
341  // working with right now.
342  readFileName_ = joinDir1WithName2(AbsArchive::name().c_str(),
343  uri.c_str());
344  separateReadStream_.open(readFileName_.c_str(), std::ios_base::binary |
346  if (!separateReadStream_.is_open())
347  throw IOOpeningFailure("gs::MultiFileArchive::updateReadStream",
348  readFileName_);
349  readFileURI_ = uri;
350  }
351 
352  unsigned long long MultiFileArchive::addToCatalog(
353  const AbsRecord& record, const unsigned compressionCode,
354  const unsigned long long itemLength)
355  {
356  unsigned long long id = 0;
357  if (isOpen())
358  {
359  id = catalog()->makeEntry(
360  record, compressionCode, itemLength,
361  ItemLocation(lastpos_, writeFileURI_.c_str()));
362  if (id && injectMetadata())
363  {
364  const CatalogEntry* entry = catalog()->lastEntryMade();
365  assert(entry);
366  writeStream_.seekp(0, std::ios_base::end);
367  std::streampos now = writeStream_.tellp();
368  if (entry->write(writeStream_))
369  {
370  writeStream_.seekp(jumppos_);
371  write_pod(writeStream_, now);
372  writeStream_.seekp(0, std::ios_base::end);
373  }
374  else
375  id = 0;
376  }
377  }
378  return id;
379  }
380 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
assert(m_qm.get())
std::string joinDir1WithName2(const char *fname1, const char *fname2)
Definition: uriUtils.cc:41
bool writeBinaryCatalog(std::ostream &os, const unsigned compressionCode, const unsigned mergeLevel, const std::vector< std::string > &annotations, const AbsCatalog &catalog, const unsigned formatId)
Definition: CatalogIO.cc:4
std::string localFileURI(const char *filename)
Definition: uriUtils.cc:8
double f[11][100]
#define end
Definition: vmac.h:37
string fname
main script
list entry
Definition: mps_splice.py:62
volatile std::atomic< bool > shutdown_flag false