53 FDEBUG(6) <<
"StreamSerializer::serializeRegistry" << std::endl;
56 FDEBUG(9) <<
"Product List: " << std::endl;
60 FDEBUG(9) <<
"StreamOutput got product = " <<
selection.first->className() << std::endl;
70 int bres = data_buffer.
rootbuf_.WriteObjectAny((
char *)&sd, tc);
75 throw cms::Exception(
"StreamTranslation",
"Registry serialization failed")
76 <<
"StreamSerializer failed to serialize registry\n";
83 throw cms::Exception(
"StreamTranslation",
"Registry serialization truncated")
84 <<
"StreamSerializer module attempted to serialize\n" 85 <<
"a registry that is to big for the allocated buffers\n";
90 throw cms::Exception(
"StreamTranslation",
"Registry serialization failed")
91 <<
"StreamSerializer module got an unknown error code\n" 92 <<
" while attempting to serialize registry\n";
99 data_buffer.
ptr_ = (
unsigned char *)data_buffer.
rootbuf_.Buffer();
129 uint32_t metaDataChecksum,
132 unsigned int reserveSize)
const {
134 selectionIDs.push_back(selectorConfig);
136 event.processHistory(),
138 event.branchListIndexes(),
165 if (
result.provenance()->productProvenance()) {
169 se.products().push_back(
184 unsigned int reserveSize)
const {
185 SendEvent se({}, {}, {}, {}, branchIDLists, thinnedAssociationsHelper, 0);
194 unsigned int reserveSize)
const {
199 int bres = data_buffer.
rootbuf_.WriteObjectAny(&se,
tc_);
203 throw cms::Exception(
"StreamTranslation",
"Event serialization failed")
204 <<
"StreamSerializer failed to serialize event: " << se.
aux().
id();
211 throw cms::Exception(
"StreamTranslation",
"Event serialization truncated")
212 <<
"StreamSerializer module attempted to serialize an event\n" 213 <<
"that is to big for the allocated buffers: " << se.
aux().
id();
218 throw cms::Exception(
"StreamTranslation",
"Event serialization failed")
219 <<
"StreamSerializer module got an unknown error code\n" 220 <<
" while attempting to serialize event: " << se.
aux().
id();
226 data_buffer.
ptr_ = (
unsigned char *)data_buffer.
rootbuf_.Buffer();
229 if(data_buffer.
ptr_ != data_.ptr_) {
230 std::cerr <<
"ROOT reset the buffer!!!!\n";
231 data_.ptr_ = data_buffer.
ptr_;
241 unsigned int dest_size = 0;
242 switch (compressionAlgo) {
265 dest_size = data_buffer.
rootbuf_.Length();
266 if (data_buffer.
comp_buf_.size() < dest_size + reserveSize)
267 data_buffer.
comp_buf_.resize(dest_size + reserveSize);
269 (
char *)data_buffer.
rootbuf_.Buffer() + dest_size,
290 unsigned int inputSize,
291 std::vector<unsigned char> &outputBuffer,
293 unsigned int reserveSize) {
294 unsigned int resultSize = 0;
297 unsigned long dest_size = (
unsigned long)(
double(inputSize) * 1.002 + 1.0) + 12;
299 if (outputBuffer.size() < dest_size + reserveSize)
300 outputBuffer.resize(dest_size + reserveSize);
303 int ret = compress2(&outputBuffer[reserveSize], &dest_size, inputBuffer, inputSize,
compressionLevel);
308 resultSize = dest_size;
310 FDEBUG(1) <<
" original size = " << inputSize <<
" final size = " << dest_size
311 <<
" ratio = " << double(dest_size) / double(inputSize) << std::endl;
314 <<
"Compression Return value: " <<
ret <<
" Okay = " << Z_OK << std::endl;
322 unsigned int inputSize,
323 std::vector<unsigned char> &outputBuffer,
325 unsigned int reserveSize,
328 unsigned int hdr_size = addHeader ? 4 : 0;
329 unsigned long dest_size = (
unsigned long)(
double(inputSize) * 1.01 + 1.0) + 12;
330 if (outputBuffer.size() < dest_size + reserveSize)
331 outputBuffer.resize(dest_size + reserveSize);
334 uint32_t dict_size_est = inputSize / 4;
335 lzma_stream
stream = LZMA_STREAM_INIT;
336 lzma_options_lzma opt_lzma2;
338 {.id = LZMA_FILTER_LZMA2, .options = &opt_lzma2},
339 {.id = LZMA_VLI_UNKNOWN, .options =
nullptr},
341 lzma_ret returnStatus;
343 unsigned char *tgt = &outputBuffer[reserveSize];
354 throw cms::Exception(
"StreamSerializer",
"compressBufferLZMA") <<
"LZMA preset return status: " << presetStatus;
357 if (LZMA_DICT_SIZE_MIN > dict_size_est) {
358 dict_size_est = LZMA_DICT_SIZE_MIN;
360 if (opt_lzma2.dict_size > dict_size_est) {
364 opt_lzma2.dict_size = dict_size_est;
368 lzma_stream_encoder(&
stream,
371 if (returnStatus != LZMA_OK) {
373 <<
"LZMA compression encoder return value: " << returnStatus;
376 stream.next_in = (
const uint8_t *)inputBuffer;
377 stream.avail_in = (size_t)(inputSize);
379 stream.next_out = (uint8_t *)(&tgt[hdr_size]);
380 stream.avail_out = (size_t)(dest_size - hdr_size);
382 returnStatus = lzma_code(&
stream, LZMA_FINISH);
384 if (returnStatus != LZMA_STREAM_END) {
387 <<
"LZMA compression return value: " << returnStatus;
399 FDEBUG(1) <<
" LZMA original size = " << inputSize <<
" final size = " <<
stream.total_out
400 <<
" ratio = " << double(
stream.total_out) / double(inputSize) << std::endl;
402 return stream.total_out + hdr_size;
406 unsigned int inputSize,
407 std::vector<unsigned char> &outputBuffer,
409 unsigned int reserveSize,
411 unsigned int hdr_size = addHeader ? 4 : 0;
412 unsigned int resultSize = 0;
415 size_t worst_size = ZSTD_compressBound(inputSize);
417 if (outputBuffer.size() < worst_size + reserveSize + hdr_size)
418 outputBuffer.resize(worst_size + reserveSize + hdr_size);
421 unsigned char *tgt = &outputBuffer[reserveSize];
430 size_t dest_size = ZSTD_compress(
431 (
void *)&outputBuffer[reserveSize + hdr_size], worst_size, (
void *)inputBuffer, inputSize,
compressionLevel);
434 if (!ZSTD_isError(dest_size)) {
436 resultSize = (
unsigned int)dest_size + hdr_size;
438 FDEBUG(1) <<
" original size = " << inputSize <<
" final size = " << dest_size
439 <<
" ratio = " << double(dest_size) / double(inputSize) << std::endl;
442 <<
"Compression (ZSTD) Error: " << ZSTD_getErrorName(dest_size);
std::vector< unsigned char > comp_buf_
std::vector< BranchIDList > BranchIDLists
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, uint32_t metaDataChecksum, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
StreamSerializer(SelectedProducts const *selections)
ret
prodAgent to be discontinued
static unsigned int compressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
edm::propagate_const< unsigned char * > ptr_
int serializeEventMetaData(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
data_buffer.adler32_chksum_ is the meta data checksum to pass to subsequent events ...
static constexpr unsigned int reserve_size
std::vector< BranchID > const & parents() const
int serializeRegistry(SerializeDataBuffer &data_buffer) const
EventAuxiliary const & aux() const
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
unsigned int curr_space_used_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize)
std::vector< TPRegexp > filters
static unsigned int compressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel, unsigned int reserveSize, bool addHeader=true)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
EventID const & id() const
bool getMapped(key_type const &k, value_type &result) const
void fillMap(regmap_type &fillme) const
unsigned char const * bufferPointer() const
edm::propagate_const< TClass * > tc_
int serializeEventCommon(SerializeDataBuffer &data_buffer, edm::SendEvent const &iEvent, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
TClass * getTClass(const std::type_info &ti)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static ParentageRegistry * instance()
static Registry * instance()
unsigned int curr_event_size_