CMS 3D CMS Logo

StreamSerializer.cc
Go to the documentation of this file.
1 
24 
25 #include "zlib.h"
26 #include "lzma.h"
27 #include "zstd.h"
28 #include <algorithm>
29 #include <cstdlib>
30 #include <iostream>
31 #include <vector>
32 
33 namespace edm {
34 
39  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
40 
47  const BranchIDLists &branchIDLists,
48  ThinnedAssociationsHelper const &thinnedAssociationsHelper) {
49  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
51 
52  FDEBUG(9) << "Product List: " << std::endl;
53 
54  for (auto const &selection : *selections_) {
55  sd.push_back(*selection.first);
56  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
57  }
59  sd.setBranchIDLists(branchIDLists);
60  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
62 
64  sd.setParameterSetMap(psetMap);
65 
66  data_buffer.rootbuf_.Reset();
67 
68  RootDebug tracer(10, 10);
69 
70  TClass *tc = getTClass(typeid(SendJobHeader));
71  int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
72 
73  switch (bres) {
74  case 0: // failure
75  {
76  throw cms::Exception("StreamTranslation", "Registry serialization failed")
77  << "StreamSerializer failed to serialize registry\n";
78  break;
79  }
80  case 1: // succcess
81  break;
82  case 2: // truncated result
83  {
84  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
85  << "StreamSerializer module attempted to serialize\n"
86  << "a registry that is to big for the allocated buffers\n";
87  break;
88  }
89  default: // unknown
90  {
91  throw cms::Exception("StreamTranslation", "Registry serialization failed")
92  << "StreamSerializer module got an unknown error code\n"
93  << " while attempting to serialize registry\n";
94  break;
95  }
96  }
97 
98  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
99  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
100  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
101  // calculate the adler32 checksum and fill it into the struct
102  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
103  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
104  return data_buffer.curr_space_used_;
105  }
106 
128  EventForOutput const &event,
129  ParameterSetID const &selectorConfig,
130  StreamerCompressionAlgo compressionAlgo,
131  int compression_level,
132  unsigned int reserveSize) const {
133  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
134  selectionIDs.push_back(selectorConfig);
135  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
136 
137  // Loop over EDProducts, fill the provenance, and write.
138 
139  // Historical note. I fixed two bugs in the code below in
140  // March 2017. One would have caused any Parentage written
141  // using the Streamer output module to be total nonsense
142  // prior to the fix. The other would have caused seg faults
143  // when the Parentage was dropped in an earlier process.
144 
145  // FIXME: The code below stores the direct parentage of
146  // kept products, but unlike the PoolOutputModule it does
147  // not save the parentage of dropped objects that are
148  // ancestors of kept products. That information is currently
149  // lost when the streamer output module is used.
150 
151  for (auto const &selection : *selections_) {
152  BranchDescription const &desc = *selection.first;
153  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
154  if (!result.isValid()) {
155  // No product with this ID was put in the event.
156  // Create and write the provenance.
157  se.products().push_back(StreamedProduct(desc));
158  } else {
159  if (result.provenance()->productProvenance()) {
160  Parentage const *parentage =
162  assert(parentage);
163  se.products().push_back(
164  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
165  } else {
166  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
167  }
168  }
169  }
170 
171  data_buffer.rootbuf_.Reset();
172  RootDebug tracer(10, 10);
173 
174  //TClass* tc = getTClass(typeid(SendEvent));
175  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
176  switch (bres) {
177  case 0: // failure
178  {
179  throw cms::Exception("StreamTranslation", "Event serialization failed")
180  << "StreamSerializer failed to serialize event: " << event.id();
181  break;
182  }
183  case 1: // succcess
184  break;
185  case 2: // truncated result
186  {
187  throw cms::Exception("StreamTranslation", "Event serialization truncated")
188  << "StreamSerializer module attempted to serialize an event\n"
189  << "that is to big for the allocated buffers: " << event.id();
190  break;
191  }
192  default: // unknown
193  {
194  throw cms::Exception("StreamTranslation", "Event serialization failed")
195  << "StreamSerializer module got an unknown error code\n"
196  << " while attempting to serialize event: " << event.id();
197  break;
198  }
199  }
200 
201  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
202  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
203 
204 #if 0
205  if(data_buffer.ptr_ != data_.ptr_) {
206  std::cerr << "ROOT reset the buffer!!!!\n";
207  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
208  }
209 #endif
210  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
211  // eventMessage.eventAddr());
212  // eventMessage.setEventLength(rootbuf.Length());
213 
214  // compress before return if we need to
215  // should test if compressed already - should never be?
216  // as double compression can have problems
217  unsigned int dest_size = 0;
218  switch (compressionAlgo) {
219  case ZLIB:
220  dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
221  data_buffer.curr_event_size_,
222  data_buffer.comp_buf_,
224  reserveSize);
225  break;
226  case LZMA:
227  dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
228  data_buffer.curr_event_size_,
229  data_buffer.comp_buf_,
231  reserveSize);
232  break;
233  case ZSTD:
234  dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
235  data_buffer.curr_event_size_,
236  data_buffer.comp_buf_,
238  reserveSize);
239  break;
240  default:
241  dest_size = data_buffer.rootbuf_.Length();
242  if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
243  data_buffer.comp_buf_.resize(dest_size + reserveSize);
244  std::copy((char *)data_buffer.rootbuf_.Buffer(),
245  (char *)data_buffer.rootbuf_.Buffer() + dest_size,
246  (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
247  break;
248  };
249 
250  data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize]; // reset to point at compressed area
251  data_buffer.curr_space_used_ = dest_size;
252 
253  // calculate the adler32 checksum and fill it into the struct
254  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
255  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
256 
257  return data_buffer.curr_space_used_;
258  }
259 
265  unsigned int StreamSerializer::compressBuffer(unsigned char *inputBuffer,
266  unsigned int inputSize,
267  std::vector<unsigned char> &outputBuffer,
268  int compressionLevel,
269  unsigned int reserveSize) {
270  unsigned int resultSize = 0;
271 
272  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
273  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
274  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
275  if (outputBuffer.size() < dest_size + reserveSize)
276  outputBuffer.resize(dest_size + reserveSize);
277 
278  // compression 1-9, 6 is zlib default, 0 none
279  int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
280 
281  // check status
282  if (ret == Z_OK) {
283  // return the correct length
284  resultSize = dest_size;
285 
286  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
287  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
288  } else {
289  throw cms::Exception("StreamSerializer", "compressBuffer")
290  << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
291  }
292 
293  return resultSize;
294  }
295 
296  //this is based on ROOT R__zipLZMA
297  unsigned int StreamSerializer::compressBufferLZMA(unsigned char *inputBuffer,
298  unsigned int inputSize,
299  std::vector<unsigned char> &outputBuffer,
300  int compressionLevel,
301  unsigned int reserveSize,
302  bool addHeader) {
303  // what are these magic numbers? (jbk)
304  unsigned int hdr_size = addHeader ? 4 : 0;
305  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
306  if (outputBuffer.size() < dest_size + reserveSize)
307  outputBuffer.resize(dest_size + reserveSize);
308 
309  // compression 1-9
310  uint32_t dict_size_est = inputSize / 4;
311  lzma_stream stream = LZMA_STREAM_INIT;
312  lzma_options_lzma opt_lzma2;
313  lzma_filter filters[] = {
314  {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
315  {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
316  };
317  lzma_ret returnStatus;
318 
319  unsigned char *tgt = &outputBuffer[reserveSize];
320 
321  //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
322  // return;
323  //}
324 
325  if (compressionLevel > 9)
326  compressionLevel = 9;
327 
328  lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
329  if (presetStatus) {
330  throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
331  }
332 
333  if (LZMA_DICT_SIZE_MIN > dict_size_est) {
334  dict_size_est = LZMA_DICT_SIZE_MIN;
335  }
336  if (opt_lzma2.dict_size > dict_size_est) {
337  /* reduce the dictionary size if larger than 1/4 the input size, preset
338  dictionaries size can be expensively large
339  */
340  opt_lzma2.dict_size = dict_size_est;
341  }
342 
343  returnStatus =
344  lzma_stream_encoder(&stream,
345  filters,
346  LZMA_CHECK_NONE); //CRC32 and CRC64 are available, but we already calculate adler32
347  if (returnStatus != LZMA_OK) {
348  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
349  << "LZMA compression encoder return value: " << returnStatus;
350  }
351 
352  stream.next_in = (const uint8_t *)inputBuffer;
353  stream.avail_in = (size_t)(inputSize);
354 
355  stream.next_out = (uint8_t *)(&tgt[hdr_size]);
356  stream.avail_out = (size_t)(dest_size - hdr_size);
357 
358  returnStatus = lzma_code(&stream, LZMA_FINISH);
359 
360  if (returnStatus != LZMA_STREAM_END) {
361  lzma_end(&stream);
362  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
363  << "LZMA compression return value: " << returnStatus;
364  }
365  lzma_end(&stream);
366 
367  //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
368  if (addHeader) {
369  tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
370  tgt[1] = 'Z';
371  tgt[2] = 0;
372  tgt[3] = 0; //let's put offset to 4, not 3
373  }
374 
375  FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
376  << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
377 
378  return stream.total_out + hdr_size;
379  }
380 
381  unsigned int StreamSerializer::compressBufferZSTD(unsigned char *inputBuffer,
382  unsigned int inputSize,
383  std::vector<unsigned char> &outputBuffer,
384  int compressionLevel,
385  unsigned int reserveSize,
386  bool addHeader) {
387  unsigned int hdr_size = addHeader ? 4 : 0;
388  unsigned int resultSize = 0;
389 
390  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
391  size_t worst_size = ZSTD_compressBound(inputSize);
392  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
393  if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
394  outputBuffer.resize(worst_size + reserveSize + hdr_size);
395 
396  //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
397  unsigned char *tgt = &outputBuffer[reserveSize];
398  if (addHeader) {
399  tgt[0] = 'Z'; /* Pre */
400  tgt[1] = 'S';
401  tgt[2] = 0;
402  tgt[3] = 0;
403  }
404 
405  // compression 1-20
406  size_t dest_size = ZSTD_compress(
407  (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
408 
409  // check status
410  if (!ZSTD_isError(dest_size)) {
411  // return the correct length
412  resultSize = (unsigned int)dest_size + hdr_size;
413 
414  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
415  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
416  } else {
417  throw cms::Exception("StreamSerializer", "compressBuffer")
418  << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
419  }
420 
421  return resultSize;
422  }
423 
424 } // namespace edm
static unsigned int compressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const &v)
WrapperBase const * wrapper() const (true)
Definition: BasicHandle.h:74
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
EventAuxiliary const & eventAuxiliary() const
bool isValid() const (true)
Definition: BasicHandle.h:70
ret
prodAgent to be discontinued
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:100
Provenance const * provenance() const (true)
Definition: BasicHandle.h:76
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:27
std::vector< TPRegexp > filters
Definition: eve_filter.cc:22
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::vector< BranchID > const & parents() const
Definition: Parentage.h:44
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
TypeID unwrappedTypeID() const
unsigned char const * bufferPointer() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned int curr_space_used_
static unsigned int compressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
TClass * getTClass(const std::type_info &ti)
ParentageID const & parentageID() const
HLT enums.
void setBranchIDLists(BranchIDLists const &bidlists)
static constexpr unsigned int reserve_size
void push_back(BranchDescription const &bd)
StreamerCompressionAlgo
#define FDEBUG(lev)
Definition: DebugMacros.h:19
static ParentageRegistry * instance()
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
void setParameterSetMap(ParameterSetMap const &psetMap)
edm::propagate_const< unsigned char * > ptr_
StreamSerializer(SelectedProducts const *selections)
edm::propagate_const< TClass * > tc_