CMS 3D CMS Logo

StreamSerializer.cc
Go to the documentation of this file.
1 
24 
25 #include "zlib.h"
26 #include "lzma.h"
27 #include "zstd.h"
28 #include <algorithm>
29 #include <cstdlib>
30 #include <iostream>
31 #include <vector>
32 
33 namespace edm::streamer {
34 
39  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
40 
48  return serializeRegistry(data_buffer, psetMap);
49  }
50 
52  SendJobHeader::ParameterSetMap const &psetMap) const {
    // Purpose: build a SendJobHeader from the selected products plus psetMap,
    // stream it into data_buffer.rootbuf_ with ROOT, then publish the resulting
    // pointer, size and Adler32 checksum in data_buffer.
    // Returns data_buffer.curr_space_used_, i.e. the length of the ROOT buffer.
    // Throws cms::Exception when WriteObjectAny returns anything other than 1.
    // NOTE(review): the first line of this signature is absent from this listing.
53  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
54  SendJobHeader sd;
55 
56  FDEBUG(9) << "Product List: " << std::endl;
57 
    // Copy the description (first element of each pair) of every selected product.
58  for (auto const &selection : *selections_) {
59  sd.push_back(*selection.first);
60  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
61  }
63  sd.setParameterSetMap(psetMap);
64 
    // Start from an empty ROOT buffer before writing the header object.
65  data_buffer.rootbuf_.Reset();
66 
67  RootDebug tracer(10, 10);
68 
69  TClass *tc = getTClass(typeid(SendJobHeader));
70  int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
71 
    // Every outcome except 1 is turned into an exception; the break statements
    // after each throw are never reached.
72  switch (bres) {
73  case 0: // failure
74  {
75  throw cms::Exception("StreamTranslation", "Registry serialization failed")
76  << "StreamSerializer failed to serialize registry\n";
77  break;
78  }
79  case 1: // success
80  break;
81  case 2: // truncated result
82  {
83  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
84  << "StreamSerializer module attempted to serialize\n"
85  << "a registry that is to big for the allocated buffers\n";
86  break;
87  }
88  default: // unknown
89  {
90  throw cms::Exception("StreamTranslation", "Registry serialization failed")
91  << "StreamSerializer module got an unknown error code\n"
92  << " while attempting to serialize registry\n";
93  break;
94  }
95  }
96 
    // The registry is not compressed here: used space equals the serialized
    // size and ptr_ refers directly to the ROOT buffer.
97  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
98  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
99  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
100  // calculate the adler32 checksum and fill it into the struct
101  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
102  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
103  return data_buffer.curr_space_used_;
104  }
105 
127  EventForOutput const &event,
128  ParameterSetID const &selectorConfig,
129  uint32_t metaDataChecksum,
130  StreamerCompressionAlgo compressionAlgo,
131  int compression_level,
132  unsigned int reserveSize) const {
    // Purpose: collect every selected product of `event` (with its direct
    // parentage when available) into a SendEvent and hand it to
    // serializeEventCommon, whose return value is returned unchanged.
    // NOTE(review): the first line of this signature is absent from this listing.

    // Work on a copy of the event's selection IDs so that selectorConfig can
    // be appended without touching the event.
133  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
134  selectionIDs.push_back(selectorConfig);
135  SendEvent se(event.eventAuxiliary(),
136  event.processHistory(),
137  selectionIDs,
138  event.branchListIndexes(),
139  {},
140  {},
141  metaDataChecksum);
142 
143  // Loop over EDProducts, fill the provenance, and write.
144 
145  // Historical note. I fixed two bugs in the code below in
146  // March 2017. One would have caused any Parentage written
147  // using the Streamer output module to be total nonsense
148  // prior to the fix. The other would have caused seg faults
149  // when the Parentage was dropped in an earlier process.
150 
151  // FIX ME. The code below stores the direct parentage of
152  // kept products, but it does not save the parentage of
153  // dropped objects that are ancestors of kept products like
154  // the PoolOutputModule. That information is currently
155  // lost when the streamer output module is used.
156 
157  for (auto const &selection : *selections_) {
158  BranchDescription const &desc = *selection.first;
159  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
160  if (!result.isValid()) {
161  // No product with this ID was put in the event.
162  // Create and write the provenance.
163  se.products().push_back(StreamedProduct(desc));
164  } else {
    // A valid handle may still lack product provenance; in that case the
    // product is stored with a null parent list (else branch below).
165  if (result.provenance()->productProvenance()) {
166  Parentage const *parentage =
167  ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
    // The registry lookup is required to succeed; a miss aborts via assert.
168  assert(parentage);
    // NOTE(review): the third argument is presumably a "product present"
    // flag -- confirm against the StreamedProduct constructor.
169  se.products().push_back(
170  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
171  } else {
172  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
173  }
174  }
175  }
176  return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize);
177  }
178 
180  const BranchIDLists &branchIDLists,
181  ThinnedAssociationsHelper const &thinnedAssociationsHelper,
182  StreamerCompressionAlgo compressionAlgo,
183  int compression_level,
184  unsigned int reserveSize) const {
    // Purpose: serialize a SendEvent that carries only branchIDLists and
    // thinnedAssociationsHelper; the remaining constructor arguments are
    // empty-initialized and the checksum argument is 0. The actual writing and
    // compression are delegated to serializeEventCommon.
    // NOTE(review): the first line of this signature is absent from this listing.
185  SendEvent se({}, {}, {}, {}, branchIDLists, thinnedAssociationsHelper, 0);
186 
187  return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize);
188  }
189 
191  edm::SendEvent const &se,
192  StreamerCompressionAlgo compressionAlgo,
193  int compression_level,
194  unsigned int reserveSize) const {
    // Purpose: stream `se` into data_buffer.rootbuf_ with ROOT, then place the
    // (optionally compressed) bytes into data_buffer.comp_buf_ starting at
    // offset reserveSize, and record pointer, size and Adler32 checksum in
    // data_buffer. Returns data_buffer.curr_space_used_.
    // Throws cms::Exception when WriteObjectAny returns anything other than 1.
    // NOTE(review): the first line of this signature is absent from this listing.
195  data_buffer.rootbuf_.Reset();
196  RootDebug tracer(10, 10);
197 
198  //TClass* tc = getTClass(typeid(SendEvent));
    // tc_ is the TClass for SendEvent cached by the constructor.
199  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
200  switch (bres) {
201  case 0: // failure
202  {
203  throw cms::Exception("StreamTranslation", "Event serialization failed")
204  << "StreamSerializer failed to serialize event: " << se.aux().id();
205  break;
206  }
207  case 1: // success
208  break;
209  case 2: // truncated result
210  {
211  throw cms::Exception("StreamTranslation", "Event serialization truncated")
212  << "StreamSerializer module attempted to serialize an event\n"
213  << "that is to big for the allocated buffers: " << se.aux().id();
214  break;
215  }
216  default: // unknown
217  {
218  throw cms::Exception("StreamTranslation", "Event serialization failed")
219  << "StreamSerializer module got an unknown error code\n"
220  << " while attempting to serialize event: " << se.aux().id();
221  break;
222  }
223  }
224 
    // Uncompressed size; ptr_ temporarily refers to the ROOT buffer and is
    // redirected to comp_buf_ after the switch below.
225  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
226  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
227 
228 #if 0
229  if(data_buffer.ptr_ != data_.ptr_) {
230  std::cerr << "ROOT reset the buffer!!!!\n";
231  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
232  }
233 #endif
234  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
235  // eventMessage.eventAddr());
236  // eventMessage.setEventLength(rootbuf.Length());
237 
238  // compress before return if we need to
239  // should test if compressed already - should never be?
240  // as double compression can have problems
    // NOTE(review): each compress* call below shows four arguments although the
    // helpers take at least five; this listing skips one source line per call
    // (presumably the compression_level argument) -- confirm against the file.
241  unsigned int dest_size = 0;
242  switch (compressionAlgo) {
243  case ZLIB:
244  dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
245  data_buffer.curr_event_size_,
246  data_buffer.comp_buf_,
248  reserveSize);
249  break;
250  case LZMA:
251  dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
252  data_buffer.curr_event_size_,
253  data_buffer.comp_buf_,
255  reserveSize);
256  break;
257  case ZSTD:
258  dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
259  data_buffer.curr_event_size_,
260  data_buffer.comp_buf_,
262  reserveSize);
263  break;
264  default:
    // No compression: copy the serialized bytes verbatim into comp_buf_.
265  dest_size = data_buffer.rootbuf_.Length();
266  if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
267  data_buffer.comp_buf_.resize(dest_size + reserveSize);
    // NOTE(review): the copy targets offset SerializeDataBuffer::reserve_size,
    // while the resize above and ptr_ below use the reserveSize parameter. If
    // the two ever differ, ptr_ will not point at the copied data and the copy
    // may run past the end of comp_buf_ -- confirm and align on reserveSize.
268  std::copy((char *)data_buffer.rootbuf_.Buffer(),
269  (char *)data_buffer.rootbuf_.Buffer() + dest_size,
270  (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
271  break;
272  };
273 
274  data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize]; // reset to point at compressed area
275  data_buffer.curr_space_used_ = dest_size;
276 
277  // calculate the adler32 checksum and fill it into the struct
278  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
279  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
280 
281  return data_buffer.curr_space_used_;
282  }
283 
289  unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
290  unsigned int inputSize,
291  std::vector<unsigned char> &outputBuffer,
292  int compressionLevel,
293  unsigned int reserveSize) {
294  unsigned int resultSize = 0;
295 
296  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
297  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
298  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
299  if (outputBuffer.size() < dest_size + reserveSize)
300  outputBuffer.resize(dest_size + reserveSize);
301 
302  // compression 1-9, 6 is zlib default, 0 none
303  int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
304 
305  // check status
306  if (ret == Z_OK) {
307  // return the correct length
308  resultSize = dest_size;
309 
310  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
311  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
312  } else {
313  throw cms::Exception("StreamSerializer", "compressBuffer")
314  << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
315  }
316 
317  return resultSize;
318  }
319 
320  //this is based on ROOT R__zipLZMA
321  unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
322  unsigned int inputSize,
323  std::vector<unsigned char> &outputBuffer,
324  int compressionLevel,
325  unsigned int reserveSize,
326  bool addHeader) {
327  // what are these magic numbers? (jbk)
328  unsigned int hdr_size = addHeader ? 4 : 0;
329  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
330  if (outputBuffer.size() < dest_size + reserveSize)
331  outputBuffer.resize(dest_size + reserveSize);
332 
333  // compression 1-9
334  uint32_t dict_size_est = inputSize / 4;
335  lzma_stream stream = LZMA_STREAM_INIT;
336  lzma_options_lzma opt_lzma2;
337  lzma_filter filters[] = {
338  {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
339  {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
340  };
341  lzma_ret returnStatus;
342 
343  unsigned char *tgt = &outputBuffer[reserveSize];
344 
345  //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
346  // return;
347  //}
348 
349  if (compressionLevel > 9)
350  compressionLevel = 9;
351 
352  lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
353  if (presetStatus) {
354  throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
355  }
356 
357  if (LZMA_DICT_SIZE_MIN > dict_size_est) {
358  dict_size_est = LZMA_DICT_SIZE_MIN;
359  }
360  if (opt_lzma2.dict_size > dict_size_est) {
361  /* reduce the dictionary size if larger than 1/4 the input size, preset
362  dictionaries size can be expensively large
363  */
364  opt_lzma2.dict_size = dict_size_est;
365  }
366 
367  returnStatus =
368  lzma_stream_encoder(&stream,
369  filters,
370  LZMA_CHECK_NONE); //CRC32 and CRC64 are available, but we already calculate adler32
371  if (returnStatus != LZMA_OK) {
372  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
373  << "LZMA compression encoder return value: " << returnStatus;
374  }
375 
376  stream.next_in = (const uint8_t *)inputBuffer;
377  stream.avail_in = (size_t)(inputSize);
378 
379  stream.next_out = (uint8_t *)(&tgt[hdr_size]);
380  stream.avail_out = (size_t)(dest_size - hdr_size);
381 
382  returnStatus = lzma_code(&stream, LZMA_FINISH);
383 
384  if (returnStatus != LZMA_STREAM_END) {
385  lzma_end(&stream);
386  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
387  << "LZMA compression return value: " << returnStatus;
388  }
389  lzma_end(&stream);
390 
391  //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
392  if (addHeader) {
393  tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
394  tgt[1] = 'Z';
395  tgt[2] = 0;
396  tgt[3] = 0; //let's put offset to 4, not 3
397  }
398 
399  FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
400  << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
401 
402  return stream.total_out + hdr_size;
403  }
404 
405  unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
406  unsigned int inputSize,
407  std::vector<unsigned char> &outputBuffer,
408  int compressionLevel,
409  unsigned int reserveSize,
410  bool addHeader) {
411  unsigned int hdr_size = addHeader ? 4 : 0;
412  unsigned int resultSize = 0;
413 
414  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
415  size_t worst_size = ZSTD_compressBound(inputSize);
416  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
417  if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
418  outputBuffer.resize(worst_size + reserveSize + hdr_size);
419 
420  //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
421  unsigned char *tgt = &outputBuffer[reserveSize];
422  if (addHeader) {
423  tgt[0] = 'Z'; /* Pre */
424  tgt[1] = 'S';
425  tgt[2] = 0;
426  tgt[3] = 0;
427  }
428 
429  // compression 1-20
430  size_t dest_size = ZSTD_compress(
431  (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
432 
433  // check status
434  if (!ZSTD_isError(dest_size)) {
435  // return the correct length
436  resultSize = (unsigned int)dest_size + hdr_size;
437 
438  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
439  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
440  } else {
441  throw cms::Exception("StreamSerializer", "compressBuffer")
442  << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
443  }
444 
445  return resultSize;
446  }
447 
448 } // namespace edm::streamer
std::vector< unsigned char > comp_buf_
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, uint32_t metaDataChecksum, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
StreamSerializer(SelectedProducts const *selections)
ret
prodAgent to be discontinued
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
static unsigned int compressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
edm::propagate_const< unsigned char * > ptr_
int serializeEventMetaData(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
data_buffer.adler32_chksum_ is the meta data checksum to pass to subsequent events ...
selection
main part
Definition: corrVsCorr.py:100
static constexpr unsigned int reserve_size
std::vector< BranchID > const & parents() const
Definition: Parentage.h:44
int serializeRegistry(SerializeDataBuffer &data_buffer) const
EventAuxiliary const & aux() const
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
#define FDEBUG(lev)
Definition: DebugMacros.h:19
assert(be >=bs)
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
std::vector< TPRegexp > filters
Definition: eve_filter.cc:22
static unsigned int compressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
EventID const & id() const
bool getMapped(key_type const &k, value_type &result) const
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
unsigned char const * bufferPointer() const
edm::propagate_const< TClass * > tc_
int serializeEventCommon(SerializeDataBuffer &data_buffer, edm::SendEvent const &iEvent, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
void push_back(BranchDescription const &bd)
static ParentageRegistry * instance()
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
void setParameterSetMap(ParameterSetMap const &psetMap)