42 int const init_size = 1024 * 1024;
49 xbuf_(TBuffer::kRead, init_size),
51 eventPrincipalHolder_(),
52 adjustEventToNewProductRegistry_(
false),
54 protocolVersion_(0
U) {}
62 FDEBUG(6) <<
"mergeIntoRegistry: Product List: " << std::endl;
68 if (!mergeInfo.empty()) {
69 throw cms::Exception(
"MismatchedInput",
"RootInputFileSequence::previousEvent()") << mergeInfo;
82 std::vector<std::string> missingDictionaries;
83 std::vector<std::string> branchNamesForMissing;
84 std::vector<std::string> producedTypes;
85 for (
auto const&
item : descs) {
88 FDEBUG(6) <<
"declare: " << real_name << std::endl;
89 if (!
loadCap(real_name, missingDictionaries)) {
90 branchNamesForMissing.emplace_back(
item.branchName());
91 producedTypes.emplace_back(
item.className() +
std::string(
" (read from input)"));
94 if (!missingDictionaries.empty()) {
95 std::string context(
"Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
101 for (
auto const&
item : descs) {
104 FDEBUG(6) <<
"BuildReadData: " << real_name << std::endl;
115 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
116 <<
"received wrong message type: expected INIT, got " << initView.
code() <<
"\n";
123 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry processName = " <<
processName_ << std::endl;
124 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry protocolVersion_= " <<
protocolVersion_ << std::endl;
135 <<
" chksum from registry data = " << adler32_chksum <<
" from header = " << initView.
adler32_chksum()
136 <<
" host name = " << initView.
hostName() << std::endl;
142 TBuffer::kRead, initView.
descLength(),
const_cast<char*
>((
char const*)initView.
descData()), kFALSE);
146 if (sd.get() ==
nullptr) {
147 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
148 <<
"Could not read the initial product registry list\n";
151 sd->initializeTransients();
167 for (
auto const&
item : psetMap) {
195 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
196 <<
"received wrong message type: expected EVENT, got " << eventView.
code() <<
"\n";
197 FDEBUG(9) <<
"Decode event: " << eventView.
event() <<
" " << eventView.
run() <<
" " << eventView.
size() <<
" " 204 unsigned long dest_size;
210 if (static_cast<uint32>(adler32_chksum) != eventView.
adler32_chksum()) {
213 <<
" chksum from event = " << adler32_chksum <<
" from header = " << eventView.
adler32_chksum()
214 <<
" host name = " << eventView.
hostName() << std::endl;
216 if (origsize != 78 && origsize != 0) {
236 dest_.resize(dest_size);
237 unsigned char*
pos = (
unsigned char*)&
dest_[0];
238 unsigned char const* from = (
unsigned char const*)eventView.
eventData();
246 xbuf_.SetBuffer(&
dest_[0], dest_size, kFALSE);
257 std::shared_ptr<void> refCoreStreamerGuard(
nullptr, [](
void*) {
265 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
266 <<
"got a null event from input stream\n";
323 for (
auto& spitem : sps) {
324 FDEBUG(10) <<
"check prodpair" << std::endl;
325 if (spitem.desc() ==
nullptr)
328 <<
" " << spitem.desc()->className() <<
" " << spitem.desc()->productInstanceName() <<
" " 329 << spitem.desc()->branchID() << std::endl;
333 if (spitem.parents()) {
334 std::optional<ProductProvenance> productProvenance{std::in_place, spitem.
branchID(), *spitem.parents()};
335 if (spitem.prod() !=
nullptr) {
336 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
338 std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
340 FDEBUG(10) <<
"addproduct done" << std::endl;
342 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
343 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(),
std::move(productProvenance));
344 FDEBUG(10) <<
"addproduct empty done" << std::endl;
347 std::optional<ProductProvenance> productProvenance;
348 if (spitem.prod() !=
nullptr) {
349 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
351 branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
352 FDEBUG(10) <<
"addproduct done" << std::endl;
354 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
355 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
356 FDEBUG(10) <<
"addproduct empty done" << std::endl;
362 FDEBUG(10) <<
"Size = " << eventPrincipal.
size() << std::endl;
374 unsigned int inputSize,
375 std::vector<unsigned char>& outputBuffer,
376 unsigned int expectedFullSize) {
377 unsigned long origSize = expectedFullSize;
378 unsigned long uncompressedSize = expectedFullSize * 1.1;
379 FDEBUG(1) <<
"Uncompress: original size = " << origSize <<
", compressed size = " << inputSize << std::endl;
380 outputBuffer.resize(uncompressedSize);
381 int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);
385 FDEBUG(10) <<
" original size = " << origSize <<
" final size = " << uncompressedSize << std::endl;
386 if (origSize != uncompressedSize) {
388 throw cms::Exception(
"StreamDeserialization",
"Uncompression error")
389 <<
"mismatch event lengths should be" << origSize <<
" got " << uncompressedSize <<
"\n";
393 throw cms::Exception(
"StreamDeserialization",
"Uncompression error") <<
"Error code = " <<
ret <<
"\n ";
395 return (
unsigned int)uncompressedSize;
399 if (inputSize >= 4 && !strcmp((
const char*)inputBuffer,
"XZ"))
406 unsigned int inputSize,
407 std::vector<unsigned char>& outputBuffer,
408 unsigned int expectedFullSize,
410 unsigned long origSize = expectedFullSize;
411 unsigned long uncompressedSize = expectedFullSize * 1.1;
412 FDEBUG(1) <<
"Uncompress: original size = " << origSize <<
", compressed size = " << inputSize << std::endl;
413 outputBuffer.resize(uncompressedSize);
415 lzma_stream
stream = LZMA_STREAM_INIT;
416 lzma_ret returnStatus;
418 returnStatus = lzma_stream_decoder(&
stream, UINT64_MAX, 0
U);
419 if (returnStatus != LZMA_OK) {
420 throw cms::Exception(
"StreamDeserializationLZM",
"LZMA stream decoder error")
421 <<
"Error code = " << returnStatus <<
"\n ";
424 size_t hdrSize = hasHeader ? 4 : 0;
425 stream.next_in = (
const uint8_t*)(inputBuffer + hdrSize);
426 stream.avail_in = (size_t)(inputSize - hdrSize);
427 stream.next_out = (uint8_t*)&outputBuffer[0];
428 stream.avail_out = (size_t)uncompressedSize;
430 returnStatus = lzma_code(&
stream, LZMA_FINISH);
431 if (returnStatus != LZMA_STREAM_END) {
433 throw cms::Exception(
"StreamDeserializationLZM",
"LZMA uncompression error")
434 <<
"Error code = " << returnStatus <<
"\n ";
438 uncompressedSize = (
unsigned int)
stream.total_out;
440 FDEBUG(10) <<
" original size = " << origSize <<
" final size = " << uncompressedSize << std::endl;
441 if (origSize != uncompressedSize) {
443 throw cms::Exception(
"StreamDeserialization",
"LZMA uncompression error")
444 <<
"mismatch event lengths should be" << origSize <<
" got " << uncompressedSize <<
"\n";
447 return uncompressedSize;
451 if (inputSize >= 4 && !strcmp((
const char*)inputBuffer,
"ZS"))
458 unsigned int inputSize,
459 std::vector<unsigned char>& outputBuffer,
460 unsigned int expectedFullSize,
462 unsigned long uncompressedSize = expectedFullSize * 1.1;
463 FDEBUG(1) <<
"Uncompress: original size = " << expectedFullSize <<
", compressed size = " << inputSize << std::endl;
464 outputBuffer.resize(uncompressedSize);
466 size_t hdrSize = hasHeader ? 4 : 0;
467 size_t ret = ZSTD_decompress(
468 (
void*)&(outputBuffer[0]), uncompressedSize, (
const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
470 if (ZSTD_isError(
ret)) {
471 throw cms::Exception(
"StreamDeserializationZSTD",
"ZSTD uncompression error")
472 <<
"Error core " <<
ret <<
", message:" << ZSTD_getErrorName(
ret);
474 return (
unsigned int)
ret;
490 <<
"Run number cannot be modified for this type of Input Source\n" 491 <<
"Contact a Storage Manager Developer\n";
499 return eventPrincipal_ ? eventPrincipal_->getIt(
id) :
nullptr;
502 std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
505 return eventPrincipal_->getThinnedProduct(
id,
index);
510 std::vector<WrapperBase const*>& wrappers,
511 std::vector<unsigned int>&
keys)
const {
513 eventPrincipal_->getThinnedProducts(pid, wrappers,
keys);
518 if (eventPrincipal_) {
519 return eventPrincipal_->getThinnedKeyFrom(
parent,
index, thinned);
521 return std::monostate{};
526 assert(eventPrincipal_ !=
nullptr);
527 return eventPrincipal_->transitionIndex();
uint32 origDataSize() const
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
static Timestamp invalidTimestamp()
const uint8 * eventData() const
std::variant< unsigned int, detail::GetThinnedKeyFromExceptionFactory, std::monostate > OptionalThinnedKey
void doBuildRealData(const std::string &name)
BranchID const & branchID() const
ret
prodAgent to be discontinued
std::vector< BranchDescription > SendDescs
bool registerProcessHistory(ProcessHistory const &processHistory)
void setRefCoreStreamer(bool resetAll=false)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
StreamID streamID() const
std::string hostName() const
std::vector< EventSelectionID > EventSelectionIDVector
const uint8 * descData() const
std::vector< BranchListIndex > BranchListIndexes
uint32 protocolVersion() const
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
bool insertMapped(value_type const &v, bool forceUpdate=false)
uint32 descLength() const
std::vector< StreamedProduct > SendProds
std::string hostName() const
uint32 adler32_chksum() const
std::string processName() const
TClass * getTClass(const std::type_info &ti)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, std::optional< ProductProvenance > productProvenance) const
std::string wrappedClassName(std::string const &iFullName)
uint32 adler32_chksum() const
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
void updateFromInput(ProductList const &other)
void adjustIndexesAfterProductRegistryAddition()
unsigned int value() const
uint32 eventLength() const
static Registry * instance()