CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper, SendJobHeader::ParameterSetMap const &psetMap)
 
 StreamSerializer (SelectedProducts const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
 
static unsigned int compressBufferLZMA (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
 
static unsigned int compressBufferZSTD (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
 

Private Attributes

SelectedProducts const * selections_
 
edm::propagate_const< TClass * > tc_
 

Detailed Description

Definition at line 72 of file StreamSerializer.h.

Constructor & Destructor Documentation

◆ StreamSerializer()

edm::StreamSerializer::StreamSerializer ( SelectedProducts const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 38 of file StreamSerializer.cc.

39  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
SelectedProducts const * selections_
edm::propagate_const< TClass * > tc_

Member Function Documentation

◆ compressBuffer()

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel,
unsigned int  reserveSize 
)
static

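Compresses the data in the specified input buffer with zlib and writes the result into the specified output buffer, starting at offset reserveSize (the output buffer is enlarged if it is too small). Returns the size of the compressed data in bytes. If compression fails, a cms::Exception is thrown; the function does not return zero to signal failure.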

Definition at line 270 of file StreamSerializer.cc.

References NanoAODEDMEventContent_cff::compressionLevel, Exception, FDEBUG, and runTheMatrix::ret.

Referenced by serializeEvent().

274  {
275  unsigned int resultSize = 0;
276 
277  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
278  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
279  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
280  if (outputBuffer.size() < dest_size + reserveSize)
281  outputBuffer.resize(dest_size + reserveSize);
282 
283  // compression 1-9, 6 is zlib default, 0 none
284  int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
285 
286  // check status
287  if (ret == Z_OK) {
288  // return the correct length
289  resultSize = dest_size;
290 
291  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
292  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
293  } else {
294  throw cms::Exception("StreamSerializer", "compressBuffer")
295  << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
296  }
297 
298  return resultSize;
299  }
ret
prodAgent to be discontinued
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ compressBufferLZMA()

unsigned int edm::StreamSerializer::compressBufferLZMA ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel,
unsigned int  reserveSize,
bool  addHeader = true 
)
static

Definition at line 302 of file StreamSerializer.cc.

References NanoAODEDMEventContent_cff::compressionLevel, Exception, FDEBUG, filters, and cms::cuda::stream.

Referenced by serializeEvent().

307  {
308  // what are these magic numbers? (jbk)
309  unsigned int hdr_size = addHeader ? 4 : 0;
310  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
311  if (outputBuffer.size() < dest_size + reserveSize)
312  outputBuffer.resize(dest_size + reserveSize);
313 
314  // compression 1-9
315  uint32_t dict_size_est = inputSize / 4;
316  lzma_stream stream = LZMA_STREAM_INIT;
317  lzma_options_lzma opt_lzma2;
318  lzma_filter filters[] = {
319  {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
320  {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
321  };
322  lzma_ret returnStatus;
323 
324  unsigned char *tgt = &outputBuffer[reserveSize];
325 
326  //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
327  // return;
328  //}
329 
330  if (compressionLevel > 9)
331  compressionLevel = 9;
332 
333  lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
334  if (presetStatus) {
335  throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
336  }
337 
338  if (LZMA_DICT_SIZE_MIN > dict_size_est) {
339  dict_size_est = LZMA_DICT_SIZE_MIN;
340  }
341  if (opt_lzma2.dict_size > dict_size_est) {
342  /* reduce the dictionary size if larger than 1/4 the input size, preset
343  dictionaries size can be expensively large
344  */
345  opt_lzma2.dict_size = dict_size_est;
346  }
347 
348  returnStatus =
349  lzma_stream_encoder(&stream,
350  filters,
351  LZMA_CHECK_NONE); //CRC32 and CRC64 are available, but we already calculate adler32
352  if (returnStatus != LZMA_OK) {
353  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
354  << "LZMA compression encoder return value: " << returnStatus;
355  }
356 
357  stream.next_in = (const uint8_t *)inputBuffer;
358  stream.avail_in = (size_t)(inputSize);
359 
360  stream.next_out = (uint8_t *)(&tgt[hdr_size]);
361  stream.avail_out = (size_t)(dest_size - hdr_size);
362 
363  returnStatus = lzma_code(&stream, LZMA_FINISH);
364 
365  if (returnStatus != LZMA_STREAM_END) {
366  lzma_end(&stream);
367  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
368  << "LZMA compression return value: " << returnStatus;
369  }
370  lzma_end(&stream);
371 
372  //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
373  if (addHeader) {
374  tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
375  tgt[1] = 'Z';
376  tgt[2] = 0;
377  tgt[3] = 0; //let's put offset to 4, not 3
378  }
379 
380  FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
381  << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
382 
383  return stream.total_out + hdr_size;
384  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< TPRegexp > filters
Definition: eve_filter.cc:22

◆ compressBufferZSTD()

unsigned int edm::StreamSerializer::compressBufferZSTD ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel,
unsigned int  reserveSize,
bool  addHeader = true 
)
static

Definition at line 386 of file StreamSerializer.cc.

References NanoAODEDMEventContent_cff::compressionLevel, Exception, FDEBUG, and createfilelist::int.

Referenced by serializeEvent().

391  {
392  unsigned int hdr_size = addHeader ? 4 : 0;
393  unsigned int resultSize = 0;
394 
395  // what are these magic numbers? (jbk) -> LSB 3.0 buffer size reccommendation
396  size_t worst_size = ZSTD_compressBound(inputSize);
397  //this can has some overhead in memory usage (capacity > size) due to the way std::vector allocator works
398  if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
399  outputBuffer.resize(worst_size + reserveSize + hdr_size);
400 
401  //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
402  unsigned char *tgt = &outputBuffer[reserveSize];
403  if (addHeader) {
404  tgt[0] = 'Z'; /* Pre */
405  tgt[1] = 'S';
406  tgt[2] = 0;
407  tgt[3] = 0;
408  }
409 
410  // compression 1-20
411  size_t dest_size = ZSTD_compress(
412  (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
413 
414  // check status
415  if (!ZSTD_isError(dest_size)) {
416  // return the correct length
417  resultSize = (unsigned int)dest_size + hdr_size;
418 
419  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
420  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
421  } else {
422  throw cms::Exception("StreamSerializer", "compressBuffer")
423  << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
424  }
425 
426  return resultSize;
427  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19

◆ serializeEvent()

int edm::StreamSerializer::serializeEvent ( SerializeDataBuffer data_buffer,
EventForOutput const &  event,
ParameterSetID const &  selectorConfig,
StreamerCompressionAlgo  compressionAlgo,
int  compression_level,
unsigned int  reserveSize 
) const

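Serializes the specified event into the specified data buffer (SerializeDataBuffer), compressing it with the requested algorithm if one is selected, and returns the number of bytes used in the buffer.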

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

Return the length of the serialized object and, if compression has been done, the actual (compressed) length. (We may want to cache these lengths in this object instead.)

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 132 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, cms::cuda::assert(), SerializeDataBuffer::bufferPointer(), EcnaPython_AdcPeg12_S1_10_R170298_1_0_150_Dee0::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), compressBufferLZMA(), compressBufferZSTD(), OnlineOutput_cfi::compression_level, filterCSVwithJSON::copy, SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, submitPVResolutionJobs::desc, Exception, edm::ParentageRegistry::getMapped(), edm::ParentageRegistry::instance(), edm::LZMA, edm::Parentage::parents(), SerializeDataBuffer::ptr_, SerializeDataBuffer::reserve_size, mps_fire::result, SerializeDataBuffer::rootbuf_, corrVsCorr::selection, selections_, tc_, edm::ZLIB, and edm::ZSTD.

Referenced by edm::StreamerOutputModuleCommon::serializeEvent().

137  {
138  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
139  selectionIDs.push_back(selectorConfig);
140  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
141 
142  // Loop over EDProducts, fill the provenance, and write.
143 
144  // Historical note. I fixed two bugs in the code below in
145  // March 2017. One would have caused any Parentage written
146  // using the Streamer output module to be total nonsense
147  // prior to the fix. The other would have caused seg faults
148  // when the Parentage was dropped in an earlier process.
149 
150  // FIX ME. The code below stores the direct parentage of
151  // kept products, but it does not save the parentage of
152  // dropped objects that are ancestors of kept products like
153  // the PoolOutputModule. That information is currently
154  // lost when the streamer output module is used.
155 
156  for (auto const &selection : *selections_) {
157  BranchDescription const &desc = *selection.first;
158  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
159  if (!result.isValid()) {
160  // No product with this ID was put in the event.
161  // Create and write the provenance.
162  se.products().push_back(StreamedProduct(desc));
163  } else {
164  if (result.provenance()->productProvenance()) {
165  Parentage const *parentage =
166  ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
167  assert(parentage);
168  se.products().push_back(
169  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
170  } else {
171  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
172  }
173  }
174  }
175 
176  data_buffer.rootbuf_.Reset();
177  RootDebug tracer(10, 10);
178 
179  //TClass* tc = getTClass(typeid(SendEvent));
180  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
181  switch (bres) {
182  case 0: // failure
183  {
184  throw cms::Exception("StreamTranslation", "Event serialization failed")
185  << "StreamSerializer failed to serialize event: " << event.id();
186  break;
187  }
188  case 1: // succcess
189  break;
190  case 2: // truncated result
191  {
192  throw cms::Exception("StreamTranslation", "Event serialization truncated")
193  << "StreamSerializer module attempted to serialize an event\n"
194  << "that is to big for the allocated buffers: " << event.id();
195  break;
196  }
197  default: // unknown
198  {
199  throw cms::Exception("StreamTranslation", "Event serialization failed")
200  << "StreamSerializer module got an unknown error code\n"
201  << " while attempting to serialize event: " << event.id();
202  break;
203  }
204  }
205 
206  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
207  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
208 
209 #if 0
210  if(data_buffer.ptr_ != data_.ptr_) {
211  std::cerr << "ROOT reset the buffer!!!!\n";
212  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
213  }
214 #endif
215  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
216  // eventMessage.eventAddr());
217  // eventMessage.setEventLength(rootbuf.Length());
218 
219  // compress before return if we need to
220  // should test if compressed already - should never be?
221  // as double compression can have problems
222  unsigned int dest_size = 0;
223  switch (compressionAlgo) {
224  case ZLIB:
225  dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
226  data_buffer.curr_event_size_,
227  data_buffer.comp_buf_,
228  compression_level,
229  reserveSize);
230  break;
231  case LZMA:
232  dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
233  data_buffer.curr_event_size_,
234  data_buffer.comp_buf_,
235  compression_level,
236  reserveSize);
237  break;
238  case ZSTD:
239  dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
240  data_buffer.curr_event_size_,
241  data_buffer.comp_buf_,
242  compression_level,
243  reserveSize);
244  break;
245  default:
246  dest_size = data_buffer.rootbuf_.Length();
247  if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
248  data_buffer.comp_buf_.resize(dest_size + reserveSize);
249  std::copy((char *)data_buffer.rootbuf_.Buffer(),
250  (char *)data_buffer.rootbuf_.Buffer() + dest_size,
251  (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
252  break;
253  };
254 
255  data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize]; // reset to point at compressed area
256  data_buffer.curr_space_used_ = dest_size;
257 
258  // calculate the adler32 checksum and fill it into the struct
259  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
260  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
261 
262  return data_buffer.curr_space_used_;
263  }
static unsigned int compressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
selection
main part
Definition: corrVsCorr.py:100
assert(be >=bs)
std::vector< EventSelectionID > EventSelectionIDVector
bool getMapped(key_type const &k, value_type &result) const
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
static unsigned int compressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
static constexpr unsigned int reserve_size
static ParentageRegistry * instance()
Definition: event.py:1
edm::propagate_const< unsigned char * > ptr_
edm::propagate_const< TClass * > tc_

◆ serializeRegistry() [1/2]

int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer data_buffer,
const BranchIDLists branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper 
)

Serializes the product registry (that was specified to the constructor) into the specified data buffer. This overload delegates to the four-argument overload of serializeRegistry().

Definition at line 45 of file StreamSerializer.cc.

References edm::pset::Registry::fillMap(), and edm::pset::Registry::instance().

Referenced by edm::StreamerOutputModuleCommon::serializeRegistry().

47  {
48  SendJobHeader::ParameterSetMap psetMap;
49  pset::Registry::instance()->fillMap(psetMap);
50  return serializeRegistry(data_buffer, branchIDLists, thinnedAssociationsHelper, psetMap);
51  }
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
static Registry * instance()
Definition: Registry.cc:12

◆ serializeRegistry() [2/2]

int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer data_buffer,
const BranchIDLists branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper,
SendJobHeader::ParameterSetMap const &  psetMap 
)

Definition at line 53 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Exception, FDEBUG, edm::getTClass(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, corrVsCorr::selection, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), and edm::SendJobHeader::setThinnedAssociationsHelper().

56  {
57  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
58  SendJobHeader sd;
59 
60  FDEBUG(9) << "Product List: " << std::endl;
61 
62  for (auto const &selection : *selections_) {
63  sd.push_back(*selection.first);
64  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
65  }
67  sd.setBranchIDLists(branchIDLists);
68  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
69  sd.setParameterSetMap(psetMap);
70 
71  data_buffer.rootbuf_.Reset();
72 
73  RootDebug tracer(10, 10);
74 
75  TClass *tc = getTClass(typeid(SendJobHeader));
76  int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
77 
78  switch (bres) {
79  case 0: // failure
80  {
81  throw cms::Exception("StreamTranslation", "Registry serialization failed")
82  << "StreamSerializer failed to serialize registry\n";
83  break;
84  }
85  case 1: // succcess
86  break;
87  case 2: // truncated result
88  {
89  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
90  << "StreamSerializer module attempted to serialize\n"
91  << "a registry that is to big for the allocated buffers\n";
92  break;
93  }
94  default: // unknown
95  {
96  throw cms::Exception("StreamTranslation", "Registry serialization failed")
97  << "StreamSerializer module got an unknown error code\n"
98  << " while attempting to serialize registry\n";
99  break;
100  }
101  }
102 
103  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
104  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
105  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
106  // calculate the adler32 checksum and fill it into the struct
107  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
108  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
109  return data_buffer.curr_space_used_;
110  }
selection
main part
Definition: corrVsCorr.py:100
#define FDEBUG(lev)
Definition: DebugMacros.h:19
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
unsigned int curr_event_size_
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
edm::propagate_const< unsigned char * > ptr_

Member Data Documentation

◆ selections_

SelectedProducts const* edm::StreamSerializer::selections_
private

Definition at line 118 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

◆ tc_

edm::propagate_const<TClass *> edm::StreamSerializer::tc_
private

Definition at line 119 of file StreamSerializer.h.

Referenced by serializeEvent().