CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
 
 StreamSerializer (SelectedProducts const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
 
static unsigned int compressBufferLZMA (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
 
static unsigned int compressBufferZSTD (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
 

Private Attributes

SelectedProducts const * selections_
 
edm::propagate_const< TClass * > tc_
 

Detailed Description

Definition at line 71 of file StreamSerializer.h.

Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( SelectedProducts const *  selections)

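Creates a serializer for the specified selected products. The SelectedProducts object is referenced, not copied, so it must stay valid while the serializer is in use; the ROOT TClass for SendEvent is looked up once here and cached in tc_.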

Definition at line 38 of file StreamSerializer.cc.

// Stores the (non-owned) selections pointer and caches the TClass for SendEvent, which
// serializeEvent() passes to WriteObjectAny instead of looking it up on every event.
39  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
SelectedProducts const * selections_
TClass * getTClass(const std::type_info &ti)
edm::propagate_const< TClass * > tc_

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel,
unsigned int  reserveSize 
)
static

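Compresses the data in the specified input buffer with zlib into the specified output buffer, starting at offset reserveSize (the output buffer is enlarged if necessary). Returns the size of the compressed data; throws cms::Exception if compression fails.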

Definition at line 265 of file StreamSerializer.cc.

References Exception, FDEBUG, and runTheMatrix::ret.

Referenced by serializeEvent().

// zlib-compresses inputSize bytes of inputBuffer into outputBuffer starting at offset
// reserveSize; bytes before that offset are not written by this function. outputBuffer is
// grown when it is smaller than the size estimate plus reserveSize, and is never shrunk.
// Returns the compressed size. Throws cms::Exception if compress2 does not return Z_OK.
// NOTE(review): zlib's compressBound() gives the exact worst-case bound and could replace the estimate.
269  {
270  unsigned int resultSize = 0;
271 
272  // Capacity estimate for the compressed output: about inputSize * 1.002 + 13 bytes (per the original author, the LSB 3.0 buffer size recommendation)
273  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
274  // Resizing can leave some memory overhead (capacity > size) because of the way the std::vector allocator grows
275  if (outputBuffer.size() < dest_size + reserveSize)
276  outputBuffer.resize(dest_size + reserveSize);
277 
278  // compression 1-9, 6 is zlib default, 0 none
279  int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize, compressionLevel);
280 
281  // check status
282  if (ret == Z_OK) {
283  // compress2 has replaced dest_size with the actual compressed length
284  resultSize = dest_size;
285 
286  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
287  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
288  } else {
289  throw cms::Exception("StreamSerializer", "compressBuffer")
290  << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
291  }
292 
293  return resultSize;
294  }
ret
prodAgent to be discontinued
#define FDEBUG(lev)
Definition: DebugMacros.h:19
unsigned int edm::StreamSerializer::compressBufferLZMA ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel,
unsigned int  reserveSize,
bool  addHeader = true 
)
static

Definition at line 297 of file StreamSerializer.cc.

References Exception, FDEBUG, and filters.

Referenced by serializeEvent().

302  {
303  // what are these magic numbers? (jbk)
304  unsigned int hdr_size = addHeader ? 4 : 0;
305  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.01 + 1.0) + 12;
306  if (outputBuffer.size() < dest_size + reserveSize)
307  outputBuffer.resize(dest_size + reserveSize);
308 
309  // compression 1-9
310  uint32_t dict_size_est = inputSize / 4;
311  lzma_stream stream = LZMA_STREAM_INIT;
312  lzma_options_lzma opt_lzma2;
313  lzma_filter filters[] = {
314  {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
315  {.id = LZMA_VLI_UNKNOWN, .options = nullptr},
316  };
317  lzma_ret returnStatus;
318 
319  unsigned char *tgt = &outputBuffer[reserveSize];
320 
321  //if (*srcsize > 0xffffff || *srcsize < 0) { //16 MB limit ?
322  // return;
323  //}
324 
325  if (compressionLevel > 9)
326  compressionLevel = 9;
327 
328  lzma_bool presetStatus = lzma_lzma_preset(&opt_lzma2, compressionLevel);
329  if (presetStatus) {
330  throw cms::Exception("StreamSerializer", "compressBufferLZMA") << "LZMA preset return status: " << presetStatus;
331  }
332 
333  if (LZMA_DICT_SIZE_MIN > dict_size_est) {
334  dict_size_est = LZMA_DICT_SIZE_MIN;
335  }
336  if (opt_lzma2.dict_size > dict_size_est) {
337  /* reduce the dictionary size if larger than 1/4 the input size, preset
338  dictionaries size can be expensively large
339  */
340  opt_lzma2.dict_size = dict_size_est;
341  }
342 
343  returnStatus =
344  lzma_stream_encoder(&stream,
345  filters,
346  LZMA_CHECK_NONE); //CRC32 and CRC64 are available, but we already calculate adler32
347  if (returnStatus != LZMA_OK) {
348  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
349  << "LZMA compression encoder return value: " << returnStatus;
350  }
351 
352  stream.next_in = (const uint8_t *)inputBuffer;
353  stream.avail_in = (size_t)(inputSize);
354 
355  stream.next_out = (uint8_t *)(&tgt[hdr_size]);
356  stream.avail_out = (size_t)(dest_size - hdr_size);
357 
358  returnStatus = lzma_code(&stream, LZMA_FINISH);
359 
360  if (returnStatus != LZMA_STREAM_END) {
361  lzma_end(&stream);
362  throw cms::Exception("StreamSerializer", "compressBufferLZMA")
363  << "LZMA compression return value: " << returnStatus;
364  }
365  lzma_end(&stream);
366 
367  //Add compression-specific header at the buffer start. This will be used to detect LZMA(2) format after streamer header
368  if (addHeader) {
369  tgt[0] = 'X'; /* Signature of LZMA from XZ Utils */
370  tgt[1] = 'Z';
371  tgt[2] = 0;
372  tgt[3] = 0; //let's put offset to 4, not 3
373  }
374 
375  FDEBUG(1) << " LZMA original size = " << inputSize << " final size = " << stream.total_out
376  << " ratio = " << double(stream.total_out) / double(inputSize) << std::endl;
377 
378  return stream.total_out + hdr_size;
379  }
std::vector< TPRegexp > filters
Definition: eve_filter.cc:22
#define FDEBUG(lev)
Definition: DebugMacros.h:19
unsigned int edm::StreamSerializer::compressBufferZSTD ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel,
unsigned int  reserveSize,
bool  addHeader = true 
)
static

Definition at line 381 of file StreamSerializer.cc.

References Exception, FDEBUG, and createfilelist::int.

Referenced by serializeEvent().

// Compresses inputSize bytes of inputBuffer with ZSTD into outputBuffer. When addHeader is
// true, a 4-byte marker "ZS\0\0" is written at offset reserveSize and the compressed data
// follows it; otherwise the compressed data starts at reserveSize. outputBuffer is grown to
// hold the worst case and never shrunk. Returns the compressed size plus the marker size.
// Throws cms::Exception if ZSTD_compress reports an error.
386  {
387  unsigned int hdr_size = addHeader ? 4 : 0;
388  unsigned int resultSize = 0;
389 
390  // ZSTD_compressBound gives the worst-case compressed size for inputSize bytes
391  size_t worst_size = ZSTD_compressBound(inputSize);
392  // Resizing can leave some memory overhead (capacity > size) because of the way the std::vector allocator grows
393  if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
394  outputBuffer.resize(worst_size + reserveSize + hdr_size);
395 
396  //Add compression-specific header at the buffer start. This will be used to detect ZSTD format after streamer header
397  unsigned char *tgt = &outputBuffer[reserveSize];
398  if (addHeader) {
399  tgt[0] = 'Z'; /* Pre */
400  tgt[1] = 'S';
401  tgt[2] = 0;
402  tgt[3] = 0;
403  }
404 
405  // ZSTD compression level (original note: 1-20). NOTE(review): the library maximum is reported by ZSTD_maxCLevel(); confirm the intended range
406  size_t dest_size = ZSTD_compress(
407  (void *)&outputBuffer[reserveSize + hdr_size], worst_size, (void *)inputBuffer, inputSize, compressionLevel);
408 
409  // check status
410  if (!ZSTD_isError(dest_size)) {
411  // dest_size is the number of compressed bytes written after the marker
412  resultSize = (unsigned int)dest_size + hdr_size;
413 
414  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
415  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
416  } else {
// NOTE(review): this exception is labelled "compressBuffer", whereas the LZMA path uses
// "compressBufferLZMA"; "compressBufferZSTD" would identify the failing codec more clearly.
417  throw cms::Exception("StreamSerializer", "compressBuffer")
418  << "Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
419  }
420 
421  return resultSize;
422  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
int edm::StreamSerializer::serializeEvent ( SerializeDataBuffer data_buffer,
EventForOutput const &  event,
ParameterSetID const &  selectorConfig,
StreamerCompressionAlgo  compressionAlgo,
int  compression_level,
unsigned int  reserveSize 
) const

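Serializes the specified event into the specified data buffer, compressing the result with the requested algorithm (ZLIB, LZMA or ZSTD; any other value stores it uncompressed).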

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

return the length of the serialized object and the actual length if compression has been done (we may want to cache these lengths in this object instead).

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 127 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), beam_dqm_sourceclient-live_cfg::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), compressBufferLZMA(), compressBufferZSTD(), OnlineOutput_cfi::compression_level, filterCSVwithJSON::copy, SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventForOutput::eventAuxiliary(), Exception, edm::ParentageRegistry::getMapped(), edm::ParentageRegistry::instance(), edm::BasicHandle::isValid(), edm::LZMA, edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Provenance::productProvenance(), edm::BasicHandle::provenance(), SerializeDataBuffer::ptr_, SerializeDataBuffer::reserve_size, mps_fire::result, SerializeDataBuffer::rootbuf_, corrVsCorr::selection, selections_, tc_, edm::BranchDescription::unwrappedTypeID(), edm::BasicHandle::wrapper(), edm::ZLIB, and edm::ZSTD.

Referenced by edm::StreamerOutputModuleCommon::serializeEvent().

132  {
133  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
134  selectionIDs.push_back(selectorConfig);
135  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
136 
137  // Loop over EDProducts, fill the provenance, and write.
138 
139  // Historical note. I fixed two bugs in the code below in
140  // March 2017. One would have caused any Parentage written
141  // using the Streamer output module to be total nonsense
142  // prior to the fix. The other would have caused seg faults
143  // when the Parentage was dropped in an earlier process.
144 
145  // FIX ME. The code below stores the direct parentage of
146  // kept products, but it does not save the parentage of
147  // dropped objects that are ancestors of kept products like
148  // the PoolOutputModule. That information is currently
149  // lost when the streamer output module is used.
150 
151  for (auto const &selection : *selections_) {
152  BranchDescription const &desc = *selection.first;
153  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
154  if (!result.isValid()) {
155  // No product with this ID was put in the event.
156  // Create and write the provenance.
157  se.products().push_back(StreamedProduct(desc));
158  } else {
159  if (result.provenance()->productProvenance()) {
160  Parentage const *parentage =
161  ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
162  assert(parentage);
163  se.products().push_back(
164  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
165  } else {
166  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
167  }
168  }
169  }
170 
171  data_buffer.rootbuf_.Reset();
172  RootDebug tracer(10, 10);
173 
174  //TClass* tc = getTClass(typeid(SendEvent));
175  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
176  switch (bres) {
177  case 0: // failure
178  {
179  throw cms::Exception("StreamTranslation", "Event serialization failed")
180  << "StreamSerializer failed to serialize event: " << event.id();
181  break;
182  }
183  case 1: // succcess
184  break;
185  case 2: // truncated result
186  {
187  throw cms::Exception("StreamTranslation", "Event serialization truncated")
188  << "StreamSerializer module attempted to serialize an event\n"
189  << "that is to big for the allocated buffers: " << event.id();
190  break;
191  }
192  default: // unknown
193  {
194  throw cms::Exception("StreamTranslation", "Event serialization failed")
195  << "StreamSerializer module got an unknown error code\n"
196  << " while attempting to serialize event: " << event.id();
197  break;
198  }
199  }
200 
201  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
202  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
203 
204 #if 0
205  if(data_buffer.ptr_ != data_.ptr_) {
206  std::cerr << "ROOT reset the buffer!!!!\n";
207  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
208  }
209 #endif
210  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
211  // eventMessage.eventAddr());
212  // eventMessage.setEventLength(rootbuf.Length());
213 
214  // compress before return if we need to
215  // should test if compressed already - should never be?
216  // as double compression can have problems
217  unsigned int dest_size = 0;
218  switch (compressionAlgo) {
219  case ZLIB:
220  dest_size = compressBuffer((unsigned char *)data_buffer.rootbuf_.Buffer(),
221  data_buffer.curr_event_size_,
222  data_buffer.comp_buf_,
224  reserveSize);
225  break;
226  case LZMA:
227  dest_size = compressBufferLZMA((unsigned char *)data_buffer.rootbuf_.Buffer(),
228  data_buffer.curr_event_size_,
229  data_buffer.comp_buf_,
231  reserveSize);
232  break;
233  case ZSTD:
234  dest_size = compressBufferZSTD((unsigned char *)data_buffer.rootbuf_.Buffer(),
235  data_buffer.curr_event_size_,
236  data_buffer.comp_buf_,
238  reserveSize);
239  break;
240  default:
241  dest_size = data_buffer.rootbuf_.Length();
242  if (data_buffer.comp_buf_.size() < dest_size + reserveSize)
243  data_buffer.comp_buf_.resize(dest_size + reserveSize);
244  std::copy((char *)data_buffer.rootbuf_.Buffer(),
245  (char *)data_buffer.rootbuf_.Buffer() + dest_size,
246  (char *)(&data_buffer.comp_buf_[SerializeDataBuffer::reserve_size]));
247  break;
248  };
249 
250  data_buffer.ptr_ = &data_buffer.comp_buf_[reserveSize]; // reset to point at compressed area
251  data_buffer.curr_space_used_ = dest_size;
252 
253  // calculate the adler32 checksum and fill it into the struct
254  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
255  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
256 
257  return data_buffer.curr_space_used_;
258  }
static unsigned int compressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
selection
main part
Definition: corrVsCorr.py:100
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
static unsigned int compressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
static constexpr unsigned int reserve_size
static ParentageRegistry * instance()
Definition: event.py:1
edm::propagate_const< unsigned char * > ptr_
edm::propagate_const< TClass * > tc_
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer data_buffer,
const BranchIDLists branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper 
)

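Serializes the selected product descriptions (from the selections given to the constructor), the branch ID lists, the thinned-associations helper and the parameter-set map into the specified data buffer, without compression.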

Definition at line 46 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Exception, FDEBUG, edm::pset::Registry::fillMap(), edm::getTClass(), edm::pset::Registry::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, sd, corrVsCorr::selection, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), and edm::SendJobHeader::setThinnedAssociationsHelper().

Referenced by edm::StreamerOutputModuleCommon::serializeRegistry().

// Fills a SendJobHeader with the selected product descriptions, the branch ID lists, the
// thinned-associations helper and a parameter-set map, then serializes it with ROOT into
// data_buffer.rootbuf_ (no compression). Afterwards ptr_ points at the ROOT buffer,
// curr_event_size_ and curr_space_used_ hold its length, and adler32_chksum_ is the Adler-32
// checksum of bufferPointer() over curr_space_used_ bytes. Returns curr_space_used_.
// Throws cms::Exception unless WriteObjectAny returns 1.
// Unlike serializeEvent(), the TClass is looked up on every call rather than cached.
48  {
49  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
50  SendJobHeader sd;
51 
52  FDEBUG(9) << "Product List: " << std::endl;
53 
54  for (auto const &selection : *selections_) {
55  sd.push_back(*selection.first);
56  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
57  }
// NOTE(review): source lines 58, 61 and 63 are missing from this rendered listing. psetMap
// (used at line 64) must be declared there; the References list (pset::Registry::instance,
// fillMap) suggests it is filled from the parameter-set registry -- confirm against
// StreamSerializer.cc.
59  sd.setBranchIDLists(branchIDLists);
60  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
62 
64  sd.setParameterSetMap(psetMap);
65 
66  data_buffer.rootbuf_.Reset();
67 
68  RootDebug tracer(10, 10);
69 
70  TClass *tc = getTClass(typeid(SendJobHeader));
71  int bres = data_buffer.rootbuf_.WriteObjectAny((char *)&sd, tc);
72 
73  switch (bres) {
74  case 0: // failure
75  {
76  throw cms::Exception("StreamTranslation", "Registry serialization failed")
77  << "StreamSerializer failed to serialize registry\n";
78  break;
79  }
80  case 1: // success
81  break;
82  case 2: // truncated result
83  {
// NOTE(review): the message below says "to big"; "too big" is intended.
84  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
85  << "StreamSerializer module attempted to serialize\n"
86  << "a registry that is to big for the allocated buffers\n";
87  break;
88  }
89  default: // unknown
90  {
91  throw cms::Exception("StreamTranslation", "Registry serialization failed")
92  << "StreamSerializer module got an unknown error code\n"
93  << " while attempting to serialize registry\n";
94  break;
95  }
96  }
97 
98  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
99  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
100  data_buffer.ptr_ = (unsigned char *)data_buffer.rootbuf_.Buffer();
101  // calculate the adler32 checksum and fill it into the struct
102  data_buffer.adler32_chksum_ = cms::Adler32((char *)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
103  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
104  return data_buffer.curr_space_used_;
105  }
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:100
unsigned int curr_event_size_
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
TClass * getTClass(const std::type_info &ti)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
static Registry * instance()
Definition: Registry.cc:12
edm::propagate_const< unsigned char * > ptr_

Member Data Documentation

SelectedProducts const* edm::StreamSerializer::selections_
private

Definition at line 112 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

edm::propagate_const<TClass *> edm::StreamSerializer::tc_
private

Definition at line 113 of file StreamSerializer.h.

Referenced by serializeEvent().