CMS 3D CMS Logo

StreamSerializer.cc
Go to the documentation of this file.
1 
24 
25 #include "zlib.h"
26 #include "lzma.h"
27 #include "zstd.h"
28 #include <algorithm>
29 #include <cstdlib>
30 #include <iostream>
31 #include <vector>
32 
33 namespace edm {
34 
39  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
40 
46  const BranchIDLists &branchIDLists,
47  ThinnedAssociationsHelper const &thinnedAssociationsHelper) {
50  return serializeRegistry(data_buffer, branchIDLists, thinnedAssociationsHelper, psetMap);
51  }
52 
54  const BranchIDLists &branchIDLists,
55  ThinnedAssociationsHelper const &thinnedAssociationsHelper,
56  SendJobHeader::ParameterSetMap const &psetMap) {
57  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
58  SendJobHeader sd;
59 
60  FDEBUG(9) << "Product List: " << std::endl;
61 
62  for (auto const &selection : *selections_) {
63  sd.push_back(*selection.first);
64  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
65  }
67  sd.setBranchIDLists(branchIDLists);
68  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
69  sd.setParameterSetMap(psetMap);
70 
71  data_buffer.rootbuf_.Reset();
72 
73  RootDebug tracer(10, 10);
74 
75  TClass *tc = getTClass(typeid(SendJobHeader));
76  int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
77 
78  switch (bres) {
79  case 0: // failure
80  {
81  throw cms::Exception("StreamTranslation", "Registry serialization failed")
82  << "StreamSerializer failed to serialize registry\n";
83  break;
84  }
85  case 1: // succcess
86  break;
87  case 2: // truncated result
88  {
89  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
90  << "StreamSerializer module attempted to serialize\n"
91  << "a registry that is to big for the allocated buffers\n";
92  break;
93  }
94  default: // unknown
95  {
96  throw cms::Exception("StreamTranslation", "Registry serialization failed")
97  << "StreamSerializer module got an unknown error code\n"
98  << " while attempting to serialize registry\n";
99  break;
100  }
101  }
102 
103  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
104  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
105  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
106  // calculate the adler32 checksum and fill it into the struct
107  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
108  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
109  return data_buffer.curr_space_used_;
110  }
111 
133  EventForOutput const &event,
134  ParameterSetID const &selectorConfig,
135  StreamerCompressionAlgo compressionAlgo,
136  int compression_level,
137  unsigned int reserveSize) const {
138  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
139  selectionIDs.push_back(selectorConfig);
140  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
141 
142  // Loop over EDProducts, fill the provenance, and write.
143 
144  // Historical note. I fixed two bugs in the code below in
145  // March 2017. One would have caused any Parentage written
146  // using the Streamer output module to be total nonsense
147  // prior to the fix. The other would have caused seg faults
148  // when the Parentage was dropped in an earlier process.
149 
150  // FIX ME. The code below stores the direct parentage of
151  // kept products, but it does not save the parentage of
152  // dropped objects that are ancestors of kept products like
153  // the PoolOutputModule. That information is currently
154  // lost when the streamer output module is used.
155 
156  for (auto const &selection : *selections_) {
157  BranchDescription const &desc = *selection.first;
158  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
159  if (!result.isValid()) {
160  // No product with this ID was put in the event.
161  // Create and write the provenance.
162  se.products().push_back(StreamedProduct(desc));
163  } else {
164  if (result.provenance()->productProvenance()) {
165  Parentage const *parentage =
166  ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
167  assert(parentage);
168  se.products().push_back(
169  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
170  } else {
171  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
172  }
173  }
174  }
175 
176  data_buffer.rootbuf_.Reset();
177  RootDebug tracer(10, 10);
178 
179  //TClass* tc = getTClass(typeid(SendEvent));
180  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
181  switch (bres) {
182  case 0: // failure
183  {
184  throw cms::Exception("StreamTranslation", "Event serialization failed")
185  << "StreamSerializer failed to serialize event: " << event.id();
186  break;
187  }
188  case 1: // succcess
189  break;
190  case 2: // truncated result
191  {
192  throw cms::Exception("StreamTranslation", "Event serialization truncated")
193  << "StreamSerializer module attempted to serialize an event\n"
194  << "that is to big for the allocated buffers: " << event.id();
195  break;
196  }
197  default: // unknown
198  {
199  throw cms::Exception("StreamTranslation", "Event serialization failed")
200  << "StreamSerializer module got an unknown error code\n"
201  << " while attempting to serialize event: " << event.id();
202  break;
203  }
204  }
205 
206  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
207  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
208 
209 #if 0
210  if(data_buffer.ptr_ != data_.ptr_) {
211  std::cerr << "ROOT reset the buffer!!!!\n";
212  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
213  }
214 #endif
215  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
216  // eventMessage.eventAddr());
217  // eventMessage.setEventLength(rootbuf.Length());
218 
219  // compress before return if we need to
220  // should test if compressed already - should never be?
221  // as double compression can have problems
222  unsigned int dest_size = 0;
223  switch (compressionAlgo) {
224  case ZLIB:
225  dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
226  data_buffer.curr_event_size_,
227  data_buffer.comp_buf_,
229  reserveSize);
230  break;
231  case LZMA:
232  dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
233  data_buffer.curr_event_size_,
234  data_buffer.comp_buf_,
236  reserveSize);
237  break;
238  case ZSTD:
239  dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
240  data_buffer.curr_event_size_,
241  data_buffer.comp_buf_,
243  reserveSize);
244  break;
245  default:
246  dest_size = data_buffer.rootbuf_.Length();
247  if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
248  data_buffer.comp_buf_.resize(dest_size + reserveSize);
249  std::copy((char *)data_buffer.rootbuf_.Buffer(),
250  (char *)data_buffer.rootbuf_.Buffer() + dest_size,
251  (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
252  break;
253  };
254 
255  data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize]; // reset to point at compressed area
256  data_buffer.curr_space_used_ = dest_size;
257 
258  // calculate the adler32 checksum and fill it into the struct
259  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
260  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
261 
262  return data_buffer.curr_space_used_;
263  }
264 
270  unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
271  unsigned int inputSize,
272  std::vector<unsigned char> &outputBuffer,
273  int compressionLevel,
274  unsigned int reserveSize) {
275  unsigned int resultSize = 0;
276 
277  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
278  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
279  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
280  if (outputBuffer.size() < dest_size + reserveSize)
281  outputBuffer.resize(dest_size + reserveSize);
282 
283  // compression 1-9, 6 is zlib default, 0 none
284  int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
285 
286  // check status
287  if (ret == Z_OK) {
288  // return the correct length
289  resultSize = dest_size;
290 
291  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
292  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
293  } else {
294  throw cms::Exception("StreamSerializer", "compressBuffer")
295  << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
296  }
297 
298  return resultSize;
299  }
300 
301  //this is based on ROOT R__zipLZMA
302  unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
303  unsigned int inputSize,
304  std::vector<unsigned char> &outputBuffer,
305  int compressionLevel,
306  unsigned int reserveSize,
307  bool addHeader) {
308  // what are these magic numbers? (jbk)
309  unsigned int hdr_size = addHeader ? 4 : 0;
310  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
311  if (outputBuffer.size() < dest_size + reserveSize)
312  outputBuffer.resize(dest_size + reserveSize);
313 
314  // compression 1-9
315  uint32_t dict_size_est = inputSize / 4;
316  lzma_stream stream = LZMA_STREAM_INIT;
317  lzma_options_lzma opt_lzma2;
318  lzma_filter filters[] = {
319  {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
320  {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
321  };
322  lzma_ret returnStatus;
323 
324  unsigned char *tgt = &outputBuffer[reserveSize];
325 
326  //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
327  // return;
328  //}
329 
330  if (compressionLevel > 9)
331  compressionLevel = 9;
332 
333  lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
334  if (presetStatus) {
335  throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
336  }
337 
338  if (LZMA_DICT_SIZE_MIN > dict_size_est) {
339  dict_size_est = LZMA_DICT_SIZE_MIN;
340  }
341  if (opt_lzma2.dict_size > dict_size_est) {
342  /* reduce the dictionary size if larger than 1/4 the input size, preset
343  dictionaries size can be expensively large
344  */
345  opt_lzma2.dict_size = dict_size_est;
346  }
347 
348  returnStatus =
349  lzma_stream_encoder(&stream,
350  filters,
351  LZMA_CHECK_NONE); //CRC32 and CRC64 are available, but we already calculate adler32
352  if (returnStatus != LZMA_OK) {
353  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
354  << "LZMA compression encoder return value: " << returnStatus;
355  }
356 
357  stream.next_in = (const uint8_t *)inputBuffer;
358  stream.avail_in = (size_t)(inputSize);
359 
360  stream.next_out = (uint8_t *)(&tgt[hdr_size]);
361  stream.avail_out = (size_t)(dest_size - hdr_size);
362 
363  returnStatus = lzma_code(&stream, LZMA_FINISH);
364 
365  if (returnStatus != LZMA_STREAM_END) {
366  lzma_end(&stream);
367  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
368  << "LZMA compression return value: " << returnStatus;
369  }
370  lzma_end(&stream);
371 
372  //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
373  if (addHeader) {
374  tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
375  tgt[1] = 'Z';
376  tgt[2] = 0;
377  tgt[3] = 0; //let's put offset to 4, not 3
378  }
379 
380  FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
381  << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
382 
383  return stream.total_out + hdr_size;
384  }
385 
386  unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
387  unsigned int inputSize,
388  std::vector<unsigned char> &outputBuffer,
389  int compressionLevel,
390  unsigned int reserveSize,
391  bool addHeader) {
392  unsigned int hdr_size = addHeader ? 4 : 0;
393  unsigned int resultSize = 0;
394 
395  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
396  size_t worst_size = ZSTD_compressBound(inputSize);
397  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
398  if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
399  outputBuffer.resize(worst_size + reserveSize + hdr_size);
400 
401  //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
402  unsigned char *tgt = &outputBuffer[reserveSize];
403  if (addHeader) {
404  tgt[0] = 'Z'; /* Pre */
405  tgt[1] = 'S';
406  tgt[2] = 0;
407  tgt[3] = 0;
408  }
409 
410  // compression 1-20
411  size_t dest_size = ZSTD_compress(
412  (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
413 
414  // check status
415  if (!ZSTD_isError(dest_size)) {
416  // return the correct length
417  resultSize = (unsigned int)dest_size + hdr_size;
418 
419  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
420  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
421  } else {
422  throw cms::Exception("StreamSerializer", "compressBuffer")
423  << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
424  }
425 
426  return resultSize;
427  }
428 
429 } // namespace edm
static unsigned int compressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const &v)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
ret
prodAgent to be discontinued
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:100
std::vector< BranchID > const & parents() const
Definition: Parentage.h:44
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
std::vector< TPRegexp > filters
Definition: eve_filter.cc:22
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
bool getMapped(key_type const &k, value_type &result) const
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
static unsigned int compressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
HLT enums.
void setBranchIDLists(BranchIDLists const &bidlists)
static constexpr unsigned int reserve_size
void push_back(BranchDescription const &bd)
StreamerCompressionAlgo
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
void setParameterSetMap(ParameterSetMap const &psetMap)
edm::propagate_const< unsigned char * > ptr_
StreamSerializer(SelectedProducts const *selections)
edm::propagate_const< TClass * > tc_