50 eventPrincipalHolder_(),
51 adjustEventToNewProductRegistry_(
false),
53 protocolVersion_(0U) {
59 std::unique_ptr<FileBlock>
61 return std::unique_ptr<FileBlock>(
new FileBlock);
69 FDEBUG(6) <<
"mergeIntoRegistry: Product List: " << std::endl;
75 if (!mergeInfo.empty()) {
76 throw cms::Exception(
"MismatchedInput",
"RootInputFileSequence::previousEvent()") << mergeInfo;
92 for(
auto const& item : descs) {
95 FDEBUG(6) <<
"declare: " << real_name << std::endl;
103 for(
auto const& item : descs) {
106 FDEBUG(6) <<
"BuildReadData: " << real_name << std::endl;
115 std::auto_ptr<SendJobHeader>
118 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
119 <<
"received wrong message type: expected INIT, got "
120 << initView.
code() <<
"\n";
128 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry processName = "<<
processName_<< std::endl;
129 FDEBUG(10) <<
"StreamerInputSource::deserializeRegistry protocolVersion_= "<<
protocolVersion_<< std::endl;
138 std::cerr <<
"Error from StreamerInputSource: checksum of Init registry blob failed "
139 <<
" chksum from registry data = " << adler32_chksum <<
" from header = "
146 TBufferFile xbuf(TBuffer::kRead, initView.
descLength(),
147 const_cast<char*
>((
char const*)initView.
descData()),kFALSE);
149 std::auto_ptr<SendJobHeader>
sd((
SendJobHeader*)xbuf.ReadObjectAny(desc));
151 if(
sd.get() ==
nullptr) {
152 throw cms::Exception(
"StreamTranslation",
"Registry deserialization error")
153 <<
"Could not read the initial product registry list\n";
156 sd->initializeTransients();
173 for (
auto const& item : psetMap) {
175 pset.
setID(item.first);
186 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
187 <<
"received wrong message type: expected EVENT, got "
188 << eventView.
code() <<
"\n";
189 FDEBUG(9) <<
"Decode event: "
190 << eventView.
event() <<
" "
191 << eventView.
run() <<
" "
192 << eventView.
size() <<
" "
201 unsigned long dest_size;
208 std::cerr <<
"Error from StreamerInputSource: checksum of event data blob failed "
209 <<
" chksum from event = " << adler32_chksum <<
" from header = "
213 if(origsize != 78 && origsize != 0) {
220 dest_.resize(dest_size);
221 unsigned char* pos = (
unsigned char*) &
dest_[0];
222 unsigned char const* from = (
unsigned char const*) eventView.
eventData();
238 throw cms::Exception(
"StreamTranslation",
"Event deserialization error")
239 <<
"got a null event from input stream\n";
276 for(
auto& spitem : sps) {
277 FDEBUG(10) <<
"check prodpair" << std::endl;
278 if(spitem.desc() ==
nullptr)
281 <<
" " << spitem.desc()->className()
282 <<
" " << spitem.desc()->productInstanceName()
283 <<
" " << spitem.desc()->branchID()
290 if(spitem.prod() !=
nullptr) {
291 FDEBUG(10) <<
"addproduct next " << spitem.branchID() << std::endl;
292 eventPrincipal.
putOnRead(branchDesc, spitem.prod(), productProvenance);
293 FDEBUG(10) <<
"addproduct done" << std::endl;
295 FDEBUG(10) <<
"addproduct empty next " << spitem.branchID() << std::endl;
296 eventPrincipal.
putOnRead(branchDesc, spitem.prod(), productProvenance);
297 FDEBUG(10) <<
"addproduct empty done" << std::endl;
302 FDEBUG(10) <<
"Size = " << eventPrincipal.
size() << std::endl;
315 unsigned int inputSize,
316 std::vector<unsigned char>& outputBuffer,
317 unsigned int expectedFullSize) {
318 unsigned long origSize = expectedFullSize;
319 unsigned long uncompressedSize = expectedFullSize*1.1;
320 FDEBUG(1) <<
"Uncompress: original size = " << origSize
321 <<
", compressed size = " << inputSize
323 outputBuffer.resize(uncompressedSize);
324 int ret = uncompress(&outputBuffer[0], &uncompressedSize,
325 inputBuffer, inputSize);
329 FDEBUG(10) <<
" original size = " << origSize <<
" final size = "
330 << uncompressedSize << std::endl;
331 if(origSize != uncompressedSize) {
332 std::cerr <<
"deserializeEvent: Problem with uncompress, original size = "
333 << origSize <<
" uncompress size = " << uncompressedSize << std::endl;
335 throw cms::Exception(
"StreamDeserialization",
"Uncompression error")
336 <<
"mismatch event lengths should be" << origSize <<
" got "
337 << uncompressedSize <<
"\n";
341 std::cerr <<
"deserializeEvent: Problem with uncompress, return value = "
343 throw cms::Exception(
"StreamDeserialization",
"Uncompression error")
344 <<
"Error code = " << ret <<
"\n ";
346 return (
unsigned int) uncompressedSize;
362 <<
"StreamerInputSource::setRun()\n"
363 <<
"Run number cannot be modified for this type of Input Source\n"
364 <<
"Contact a Storage Manager Developer\n";
373 return eventPrincipal_ ? eventPrincipal_->getIt(
id) :
WrapperHolder();
378 assert(eventPrincipal_ !=
nullptr);
379 return eventPrincipal_->transitionIndex();
384 eventPrincipal_ = ep;
void setID(ParameterSetID const &id)
static Timestamp invalidTimestamp()
const uint8 * eventData() const
const uint8 * descData() const
std::string hostName() const
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
bool registerProcessHistory(ProcessHistory const &processHistory)
void setRefCoreStreamer(bool resetAll=false)
TClass * getTClass(const std::type_info &ti)
std::vector< EventSelectionID > EventSelectionIDVector
uint32 adler32_chksum() const
uint32 eventLength() const
uint32 adler32_chksum() const
std::vector< BranchListIndex > BranchListIndexes
std::string hostName() const
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
void setProcessHistoryID(ProcessHistoryID const &phid)
std::vector< StreamedProduct > SendProds
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
uint32 protocolVersion() const
std::string wrappedClassName(std::string const &iFullName)
uint32 origDataSize() const
bool updateFromInput(BranchIDLists const &bidlists)
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
bool adjustToNewProductRegistry(ProductRegistry const ®)
std::string processName() const
void updateFromInput(ProductList const &other)
void putOnRead(BranchDescription const &bd, void const *product, ProductProvenance const &productProvenance)
void setProcessHistoryID(ProcessHistoryID const &phid)
void adjustIndexesAfterProductRegistryAddition()
uint32 descLength() const
volatile std::atomic< bool > shutdown_flag false
bool insertMapped(value_type const &v)
static Registry * instance()
void loadCap(const std::string &name)