42 int const init_size = 1024 * 1024;
49 xbuf_(TBuffer::kRead, init_size),
51 eventPrincipalHolder_(),
52 adjustEventToNewProductRegistry_(
false),
54 protocolVersion_(0
U) {}
66 FDEBUG(6) <<
"mergeIntoRegistry: Product List: " << std::endl;
72 if (!mergeInfo.empty()) {
73 throw cms::Exception(
"MismatchedInput",
"RootInputFileSequence::previousEvent()") << mergeInfo;
90 std::vector<std::string> missingDictionaries;
91 std::vector<std::string> branchNamesForMissing;
92 std::vector<std::string> producedTypes;
93 for (
auto const&
item : descs) {
96 FDEBUG(6) <<
"declare: " << real_name << std::endl;
97 if (!
loadCap(real_name, missingDictionaries)) {
98 branchNamesForMissing.emplace_back(
item.branchName());
99 producedTypes.emplace_back(
item.className() +
std::string(
" (read from input)"));
102 if (!missingDictionaries.empty()) {
103 std::string context(
"Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
109 for (
auto const&
item : descs) {
112 FDEBUG(6) <<
"BuildReadData: " << real_name << std::endl;
123 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
124 <<
"received wrong message type: expected INIT, got " << initView.
code() <<
"\n";
131 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry processName = " <<
processName_ << std::endl;
132 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry protocolVersion_= " <<
protocolVersion_ << std::endl;
143 <<
" chksum from registry data = " << adler32_chksum <<
" from header = " << initView.
adler32_chksum()
144 <<
" host name = " << initView.
hostName() << std::endl;
150 TBuffer::kRead, initView.
descLength(), const_cast<char*>((
char const*)initView.
descData()), kFALSE);
152 std::unique_ptr<SendJobHeader>
sd((
SendJobHeader*)xbuf.ReadObjectAny(desc));
154 if (
sd.get() ==
nullptr) {
155 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
156 <<
"Could not read the initial product registry list\n";
159 sd->initializeTransients();
175 for (
auto const&
item : psetMap) {
187 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
188 <<
"received wrong message type: expected EVENT, got " << eventView.
code() <<
"\n";
189 FDEBUG(9) <<
"Decode event: " << eventView.
event() <<
" " << eventView.
run() <<
" " << eventView.
size() <<
" "
196 unsigned long dest_size;
205 <<
" chksum from event = " << adler32_chksum <<
" from header = " << eventView.
adler32_chksum()
206 <<
" host name = " << eventView.
hostName() << std::endl;
208 if (origsize != 78 && origsize != 0) {
228 dest_.resize(dest_size);
229 unsigned char*
pos = (
unsigned char*)&
dest_[0];
230 unsigned char const* from = (
unsigned char const*)eventView.
eventData();
238 xbuf_.SetBuffer(&
dest_[0], dest_size, kFALSE);
249 std::shared_ptr<void> refCoreStreamerGuard(
nullptr, [](
void*) {
257 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
258 <<
"got a null event from input stream\n";
304 for (
auto& spitem : sps) {
305 FDEBUG(10) <<
"check prodpair" << std::endl;
306 if (spitem.desc() ==
nullptr)
309 <<
" " << spitem.desc()->className() <<
" " << spitem.desc()->productInstanceName() <<
" "
310 << spitem.desc()->branchID() << std::endl;
314 if (spitem.parents()) {
315 std::optional<ProductProvenance> productProvenance{std::in_place, spitem.
branchID(), *spitem.parents()};
316 if (spitem.prod() !=
nullptr) {
317 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
319 std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
321 FDEBUG(10) <<
"addproduct done" << std::endl;
323 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
324 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(),
std::move(productProvenance));
325 FDEBUG(10) <<
"addproduct empty done" << std::endl;
328 std::optional<ProductProvenance> productProvenance;
329 if (spitem.prod() !=
nullptr) {
330 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
332 branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
333 FDEBUG(10) <<
"addproduct done" << std::endl;
335 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
336 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
337 FDEBUG(10) <<
"addproduct empty done" << std::endl;
343 FDEBUG(10) <<
"Size = " << eventPrincipal.
size() << std::endl;
355 unsigned int inputSize,
356 std::vector<unsigned char>& outputBuffer,
357 unsigned int expectedFullSize) {
358 unsigned long origSize = expectedFullSize;
359 unsigned long uncompressedSize = expectedFullSize * 1.1;
360 FDEBUG(1) <<
"Uncompress: original size = " << origSize <<
", compressed size = " << inputSize << std::endl;
361 outputBuffer.resize(uncompressedSize);
362 int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);
366 FDEBUG(10) <<
" original size = " << origSize <<
" final size = " << uncompressedSize << std::endl;
367 if (origSize != uncompressedSize) {
369 throw cms::Exception(
"StreamDeserialization",
"Uncompression error")
370 <<
"mismatch event lengths should be" << origSize <<
" got " << uncompressedSize <<
"\n";
374 throw cms::Exception(
"StreamDeserialization",
"Uncompression error") <<
"Error code = " <<
ret <<
"\n ";
376 return (
unsigned int)uncompressedSize;
380 if (inputSize >= 4 && !strcmp((
const char*)inputBuffer,
"XZ"))
387 unsigned int inputSize,
388 std::vector<unsigned char>& outputBuffer,
389 unsigned int expectedFullSize,
391 unsigned long origSize = expectedFullSize;
392 unsigned long uncompressedSize = expectedFullSize * 1.1;
393 FDEBUG(1) <<
"Uncompress: original size = " << origSize <<
", compressed size = " << inputSize << std::endl;
394 outputBuffer.resize(uncompressedSize);
396 lzma_stream
stream = LZMA_STREAM_INIT;
397 lzma_ret returnStatus;
399 returnStatus = lzma_stream_decoder(&
stream, UINT64_MAX, 0
U);
400 if (returnStatus != LZMA_OK) {
401 throw cms::Exception(
"StreamDeserializationLZM",
"LZMA stream decoder error")
402 <<
"Error code = " << returnStatus <<
"\n ";
405 size_t hdrSize = hasHeader ? 4 : 0;
406 stream.next_in = (
const uint8_t*)(inputBuffer + hdrSize);
407 stream.avail_in = (size_t)(inputSize - hdrSize);
408 stream.next_out = (uint8_t*)&outputBuffer[0];
409 stream.avail_out = (size_t)uncompressedSize;
411 returnStatus = lzma_code(&
stream, LZMA_FINISH);
412 if (returnStatus != LZMA_STREAM_END) {
414 throw cms::Exception(
"StreamDeserializationLZM",
"LZMA uncompression error")
415 <<
"Error code = " << returnStatus <<
"\n ";
419 uncompressedSize = (
unsigned int)
stream.total_out;
421 FDEBUG(10) <<
" original size = " << origSize <<
" final size = " << uncompressedSize << std::endl;
422 if (origSize != uncompressedSize) {
424 throw cms::Exception(
"StreamDeserialization",
"LZMA uncompression error")
425 <<
"mismatch event lengths should be" << origSize <<
" got " << uncompressedSize <<
"\n";
428 return uncompressedSize;
432 if (inputSize >= 4 && !strcmp((
const char*)inputBuffer,
"ZS"))
439 unsigned int inputSize,
440 std::vector<unsigned char>& outputBuffer,
441 unsigned int expectedFullSize,
443 unsigned long uncompressedSize = expectedFullSize * 1.1;
444 FDEBUG(1) <<
"Uncompress: original size = " << expectedFullSize <<
", compressed size = " << inputSize << std::endl;
445 outputBuffer.resize(uncompressedSize);
447 size_t hdrSize = hasHeader ? 4 : 0;
448 size_t ret = ZSTD_decompress(
449 (
void*)&(outputBuffer[0]), uncompressedSize, (
const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
451 if (ZSTD_isError(
ret)) {
452 throw cms::Exception(
"StreamDeserializationZSTD",
"ZSTD uncompression error")
453 <<
"Error core " <<
ret <<
", message:" << ZSTD_getErrorName(
ret);
455 return (
unsigned int)
ret;
471 <<
"Run number cannot be modified for this type of Input Source\n"
472 <<
"Contact a Storage Manager Developer\n";
480 return eventPrincipal_ ? eventPrincipal_->getIt(
id) :
nullptr;
484 unsigned int&
index)
const {
485 return eventPrincipal_ ? eventPrincipal_->getThinnedProduct(
id,
index) :
nullptr;
489 std::vector<WrapperBase const*>& wrappers,
490 std::vector<unsigned int>&
keys)
const {
492 eventPrincipal_->getThinnedProducts(pid, wrappers,
keys);
496 assert(eventPrincipal_ !=
nullptr);
497 return eventPrincipal_->transitionIndex();