51 eventPrincipalHolder_(),
52 adjustEventToNewProductRegistry_(
false),
54 protocolVersion_(0
U) {
60 std::unique_ptr<FileBlock>
62 return std::make_unique<FileBlock>();
74 FDEBUG(6) <<
"mergeIntoRegistry: Product List: " << std::endl;
80 if (!mergeInfo.empty()) {
81 throw cms::Exception(
"MismatchedInput",
"RootInputFileSequence::previousEvent()") << mergeInfo;
99 std::vector<std::string> missingDictionaries;
100 std::vector<std::string> branchNamesForMissing;
101 std::vector<std::string> producedTypes;
102 for (
auto const& item : descs) {
105 FDEBUG(6) <<
"declare: " << real_name << std::endl;
106 if (!
loadCap(real_name, missingDictionaries)) {
107 branchNamesForMissing.emplace_back(item.branchName());
108 producedTypes.emplace_back(item.className() +
std::string(
" (read from input)"));
111 if (!missingDictionaries.empty()) {
112 std::string context(
"Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
120 for(
auto const& item : descs) {
123 FDEBUG(6) <<
"BuildReadData: " << real_name << std::endl;
132 std::unique_ptr<SendJobHeader>
135 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
136 <<
"received wrong message type: expected INIT, got " 137 << initView.
code() <<
"\n";
145 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry processName = "<<
processName_<< std::endl;
146 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry protocolVersion_= "<<
protocolVersion_<< std::endl;
157 <<
" chksum from registry data = " << adler32_chksum <<
" from header = " 163 TBufferFile xbuf(TBuffer::kRead, initView.
descLength(),
164 const_cast<char*
>((
char const*)initView.
descData()),kFALSE);
166 std::unique_ptr<SendJobHeader>
sd((
SendJobHeader*)xbuf.ReadObjectAny(desc));
168 if(
sd.get() ==
nullptr) {
169 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
170 <<
"Could not read the initial product registry list\n";
173 sd->initializeTransients();
190 for (
auto const& item : psetMap) {
192 pset.setID(item.first);
203 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
204 <<
"received wrong message type: expected EVENT, got " 205 << eventView.
code() <<
"\n";
206 FDEBUG(9) <<
"Decode event: " 207 << eventView.
event() <<
" " 208 << eventView.
run() <<
" " 209 << eventView.
size() <<
" " 218 unsigned long dest_size;
227 <<
" chksum from event = " << adler32_chksum <<
" from header = " 230 if(origsize != 78 && origsize != 0) {
237 dest_.resize(dest_size);
238 unsigned char*
pos = (
unsigned char*) &
dest_[0];
239 unsigned char const* from = (
unsigned char const*) eventView.
eventData();
260 if(sendEvent_.get() ==
nullptr) {
261 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
262 <<
"got a null event from input stream\n";
266 FDEBUG(5) <<
"Got event: " << sendEvent_->aux().id() <<
" " << sendEvent_->products().size() << std::endl;
306 for(
auto& spitem : sps) {
307 FDEBUG(10) <<
"check prodpair" << std::endl;
308 if(spitem.desc() ==
nullptr)
311 <<
" " << spitem.desc()->className()
312 <<
" " << spitem.desc()->productInstanceName()
313 <<
" " << spitem.desc()->branchID()
318 if (spitem.parents()) {
320 if(spitem.prod() !=
nullptr) {
321 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
322 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), &productProvenance);
323 FDEBUG(10) <<
"addproduct done" << std::endl;
325 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
326 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), &productProvenance);
327 FDEBUG(10) <<
"addproduct empty done" << std::endl;
331 if(spitem.prod() !=
nullptr) {
332 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
333 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
334 FDEBUG(10) <<
"addproduct done" << std::endl;
336 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
337 eventPrincipal.
putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
338 FDEBUG(10) <<
"addproduct empty done" << std::endl;
344 FDEBUG(10) <<
"Size = " << eventPrincipal.
size() << std::endl;
357 unsigned int inputSize,
358 std::vector<unsigned char>& outputBuffer,
359 unsigned int expectedFullSize) {
360 unsigned long origSize = expectedFullSize;
361 unsigned long uncompressedSize = expectedFullSize*1.1;
362 FDEBUG(1) <<
"Uncompress: original size = " << origSize
363 <<
", compressed size = " << inputSize
365 outputBuffer.resize(uncompressedSize);
366 int ret = uncompress(&outputBuffer[0], &uncompressedSize,
367 inputBuffer, inputSize);
371 FDEBUG(10) <<
" original size = " << origSize <<
" final size = " 372 << uncompressedSize << std::endl;
373 if(origSize != uncompressedSize) {
375 throw cms::Exception(
"StreamDeserialization",
"Uncompression error")
376 <<
"mismatch event lengths should be" << origSize <<
" got " 377 << uncompressedSize <<
"\n";
381 throw cms::Exception(
"StreamDeserialization",
"Uncompression error")
382 <<
"Error code = " << ret <<
"\n ";
384 return (
unsigned int) uncompressedSize;
400 <<
"StreamerInputSource::setRun()\n" 401 <<
"Run number cannot be modified for this type of Input Source\n" 402 <<
"Contact a Storage Manager Developer\n";
421 std::vector<WrapperBase const*>& wrappers,
422 std::vector<unsigned int>&
keys)
const {
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
virtual WrapperBase const * getThinnedProduct(ProductID const &pid, unsigned int &key) const override
static Timestamp invalidTimestamp()
const uint8 * eventData() const
const uint8 * descData() const
std::string hostName() const
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
bool registerProcessHistory(ProcessHistory const &processHistory)
void setRefCoreStreamer(bool resetAll=false)
void updateFromPrimaryInput(ThinnedAssociationsHelper const &)
std::vector< EventSelectionID > EventSelectionIDVector
virtual void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &foundContainers, std::vector< unsigned int > &keys) const override
uint32 adler32_chksum() const
uint32 eventLength() const
uint32 adler32_chksum() const
std::vector< BranchListIndex > BranchListIndexes
StreamID streamID() const
std::string hostName() const
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
bool insertMapped(value_type const &v, bool forceUpdate=false)
void setProcessHistoryID(ProcessHistoryID const &phid)
std::vector< StreamedProduct > SendProds
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
uint32 protocolVersion() const
unsigned int value() const
std::string wrappedClassName(std::string const &iFullName)
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const *productProvenance) const
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
TClass * getTClass(const std::type_info &ti)
uint32 origDataSize() const
unsigned int transitionIndex() const
bool updateFromInput(BranchIDLists const &bidlists)
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
bool adjustToNewProductRegistry(ProductRegistry const &reg)
std::string processName() const
void updateFromInput(ProductList const &other)
void setProcessHistoryID(ProcessHistoryID const &phid)
void adjustIndexesAfterProductRegistryAddition()
uint32 descLength() const
virtual WrapperBase const * getIt(ProductID const &pid) const override
static Registry * instance()