#include <StreamerInputSource.h>
Classes | |
class | ProductGetter |
Public Member Functions | |
void | deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false) |
void | deserializeEvent (EventMsgView const &eventView) |
StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc) | |
virtual | ~StreamerInputSource () |
Static Public Member Functions | |
static std::auto_ptr < SendJobHeader > | deserializeRegistry (InitMsgView const &initView) |
static void | fillDescription (ParameterSetDescription &description) |
static void | mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, bool subsequent) |
static unsigned int | uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize) |
Protected Member Functions | |
void | resetAfterEndRun () |
Static Protected Member Functions | |
static void | buildClassCache (SendDescs const &descs) |
static void | declareStreamers (SendDescs const &descs) |
Private Member Functions | |
virtual EventPrincipal * | read (EventPrincipal &eventPrincipal) |
virtual boost::shared_ptr < FileBlock > | readFile_ () |
virtual void | setRun (RunNumber_t r) |
Private Attributes | |
bool | adjustEventToNewProductRegistry_ |
std::vector< unsigned char > | dest_ |
ProductGetter | productGetter_ |
std::unique_ptr< SendEvent > | sendEvent_ |
TClass * | tc_ |
TBufferFile | xbuf_ |
Static Private Attributes | |
static std::string | processName_ |
static unsigned int | protocolVersion_ |
Definition at line 30 of file StreamerInputSource.h.
edm::StreamerInputSource::StreamerInputSource | ( | ParameterSet const & | pset, |
InputSourceDescription const & | desc | ||
) | [explicit] |
Definition at line 48 of file StreamerInputSource.cc.
// Constructor initializer list: forwards pset/desc to the RawInputSource base, caches the TClass obtained for SendEvent in tc_, sizes both dest_ and xbuf_ with init_size (defined outside this view), leaves sendEvent_ and productGetter_ default-constructed, and starts with adjustEventToNewProductRegistry_ cleared.
: RawInputSource(pset, desc), tc_(getTClass(typeid(SendEvent))), dest_(init_size), xbuf_(TBuffer::kRead, init_size), sendEvent_(), productGetter_(), adjustEventToNewProductRegistry_(false) { }
edm::StreamerInputSource::~StreamerInputSource | ( | ) | [virtual] |
Definition at line 60 of file StreamerInputSource.cc.
// Empty destructor body: no explicit cleanup is performed here.
{}
void edm::StreamerInputSource::buildClassCache | ( | SendDescs const & | descs | ) | [static, protected] |
Definition at line 108 of file StreamerInputSource.cc.
References edm::doBuildRealData(), alignCSCRings::e, FDEBUG, i, AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().
Referenced by mergeIntoRegistry().
{
  // For every description in descs: resolve the wrapped class name and pass
  // it to doBuildRealData().
  for (auto const& branchDesc : descs) {
    std::string const wrappedName = wrappedClassName(branchDesc.className());
    FDEBUG(6) << "BuildReadData: " << wrappedName << std::endl;
    doBuildRealData(wrappedName);
  }
}
void edm::StreamerInputSource::declareStreamers | ( | SendDescs const & | descs | ) | [static, protected] |
Definition at line 95 of file StreamerInputSource.cc.
References alignCSCRings::e, FDEBUG, i, edm::loadCap(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().
Referenced by mergeIntoRegistry().
{
  // For every description in descs: resolve the wrapped class name and pass
  // it to loadCap().
  for (auto const& branchDesc : descs) {
    std::string const wrappedName = wrappedClassName(branchDesc.className());
    FDEBUG(6) << "declare: " << wrappedName << std::endl;
    loadCap(wrappedName);
  }
}
void edm::StreamerInputSource::deserializeAndMergeWithRegistry | ( | InitMsgView const & | initView, |
bool | subsequent = false |
||
) |
Deserializes the specified init message into a SendJobHeader object and merges registries.
Definition at line 172 of file StreamerInputSource.cc.
References adjustEventToNewProductRegistry_, edm::InputSource::branchIDListHelper(), deserializeRegistry(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::insertMapped(), instance, mergeIntoRegistry(), edm::InputSource::productRegistryUpdate(), sd, and edm::ParameterSet::setID().
Referenced by edm::StreamerFileReader::checkNextEvent(), edm::EventStreamHttpReader::readHeader(), edm::StreamerFileReader::reset_(), and edm::StreamerInputModule< Producer >::StreamerInputModule().
{
  // Decode the init message; the resulting header lives until the end of
  // this function.
  std::auto_ptr<SendJobHeader> jobHeader = deserializeRegistry(initView);
  ProcessConfigurationVector const& processConfigs = jobHeader->processConfigurations();

  mergeIntoRegistry(*jobHeader, productRegistryUpdate(), *branchIDListHelper(), subsequent);
  if (subsequent) {
    // read() consults this flag and adjusts the event principal before
    // filling it.
    adjustEventToNewProductRegistry_ = true;
  }

  // Insert each parameter set from the header into the parameter set
  // registry, using the ID it was stored under in the header's map.
  SendJobHeader::ParameterSetMap const& headerPsets = jobHeader->processParameterSet();
  pset::Registry& psetRegistry = *pset::Registry::instance();
  for (auto const& idAndBlob : headerPsets) {
    ParameterSet pset(idAndBlob.second.pset());
    pset.setID(idAndBlob.first);
    psetRegistry.insertMapped(pset);
  }

  // Insert every process configuration from the header as well.
  ProcessConfigurationRegistry& processConfigRegistry = *ProcessConfigurationRegistry::instance();
  for (auto const& processConfig : processConfigs) {
    processConfigRegistry.insertMapped(processConfig);
  }
}
void edm::StreamerInputSource::deserializeEvent | ( | EventMsgView const & | eventView | ) |
Deserializes the specified event message.
Definition at line 196 of file StreamerInputSource.cc.
References cms::Adler32(), EventMsgView::adler32_chksum(), dtNoiseDBValidation_cfg::cerr, EventMsgView::code(), filterCSVwithJSON::copy, dest_, Header::EVENT, EventMsgView::event(), EventMsgView::eventData(), EventMsgView::eventLength(), Exception, FDEBUG, EventMsgView::hostName(), instance, edm::Timestamp::invalidTimestamp(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), EventMsgView::origDataSize(), pos, productGetter_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), EventMsgView::run(), edm::InputSource::runAuxiliary(), sendEvent_, edm::InputSource::setEventCached(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::RunAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), tc_, uncompressBuffer(), and xbuf_.
Referenced by edm::EventStreamHttpReader::checkNextEvent(), and edm::StreamerFileReader::checkNextEvent().
{
  // Decodes one EVENT message into sendEvent_ and refreshes the cached
  // run / luminosity-block auxiliaries when the event belongs to a new one.
  //
  // Throws cms::Exception("StreamTranslation") when the message is not an
  // EVENT message or when ROOT returns a null object. uncompressBuffer()
  // may throw for compressed payloads. A checksum mismatch is only reported
  // on std::cerr; processing continues.
  if(eventView.code() != Header::EVENT) {
    throw cms::Exception("StreamTranslation","Event deserialization error")
      << "received wrong message type: expected EVENT, got "
      << eventView.code() << "\n";
  }
  FDEBUG(9) << "Decode event: "
       << eventView.event() << " "
       << eventView.run() << " "
       << eventView.size() << " "
       << eventView.adler32_chksum() << " "
       << eventView.eventLength() << " "
       << eventView.eventData()
       << std::endl;

  // The sentry has to be a named local. Written as `EventSourceSentry(*this);`
  // it is an unnamed temporary that is destroyed at the end of that very
  // statement, so it would not span the deserialization below.
  EventSourceSentry sentry(*this);

  // uncompress if we need to
  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
  // need to get rid of this when 090 MTCC streamers are gotten rid of
  unsigned long origsize = eventView.origDataSize();
  unsigned long dest_size; //(should be >= eventView.origDataSize())

  // Compare the checksum of the payload with the one carried in the header.
  uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
    std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
              << " chksum from event = " << adler32_chksum << " from header = "
              << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
    // skip event (based on option?) or throw exception?
  }

  if(origsize != 78 && origsize != 0) {
    // compressed
    dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
                                 eventView.eventLength(), dest_, origsize);
  } else {
    // not compressed: the payload is still copied, because xbuf_ is pointed
    // at dest_ below
    dest_size = eventView.eventLength();
    dest_.resize(dest_size);
    unsigned char* from = (unsigned char*) eventView.eventData();
    // Copy through the iterator so that an empty payload never indexes
    // element 0 of an empty vector.
    std::copy(from, from + dest_size, dest_.begin());
  }

  // Point the reusable ROOT buffer at the payload; kFALSE is passed as the
  // adopt flag. data() is used instead of &dest_[0] so an empty vector is
  // not indexed.
  xbuf_.Reset();
  xbuf_.SetBuffer(dest_.data(), dest_size, kFALSE);
  RootDebug tracer(10,10);

  // productGetter_ is installed as the RefCore streamer only for the
  // duration of the ROOT read and reset right after it.
  setRefCoreStreamer(&productGetter_);
  sendEvent_.reset(static_cast<SendEvent*>(xbuf_.ReadObjectAny(tc_)));
  setRefCoreStreamer();

  if(sendEvent_.get()==0) {
    throw cms::Exception("StreamTranslation","Event deserialization error")
      << "got a null event from input stream\n";
  }
  ProcessHistoryRegistry::instance()->insertMapped(sendEvent_->processHistory());

  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;

  // New run: install a fresh RunAuxiliary and drop the cached lumi auxiliary.
  // The local is deliberately not named like the runAuxiliary() accessor.
  if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sendEvent_->aux().run()) {
    // NOTE(review): setRunAuxiliary() presumably takes ownership of the raw
    // pointer - confirm against InputSource.
    RunAuxiliary* newRunAux = new RunAuxiliary(sendEvent_->aux().run(),
                                               sendEvent_->aux().time(),
                                               Timestamp::invalidTimestamp());
    newRunAux->setProcessHistoryID(sendEvent_->processHistory().id());
    setRunAuxiliary(newRunAux);
    resetLuminosityBlockAuxiliary();
  }

  // New luminosity block (or none cached yet): install a fresh auxiliary.
  if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
    // NOTE(review): setLuminosityBlockAuxiliary() presumably takes ownership
    // of the raw pointer - confirm against InputSource.
    LuminosityBlockAuxiliary* newLumiAux =
      new LuminosityBlockAuxiliary(runAuxiliary()->run(),
                                   eventView.lumi(),
                                   sendEvent_->aux().time(),
                                   Timestamp::invalidTimestamp());
    newLumiAux->setProcessHistoryID(sendEvent_->processHistory().id());
    setLuminosityBlockAuxiliary(newLumiAux);
  }
  setEventCached();
}
std::auto_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry | ( | InitMsgView const & | initView | ) | [static] |
Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).
Definition at line 124 of file StreamerInputSource.cc.
References cms::Adler32(), InitMsgView::adler32_chksum(), dtNoiseDBValidation_cfg::cerr, InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), protocolVersion_, and sd.
Referenced by deserializeAndMergeWithRegistry(), and edm::readHeaderFromStream().
{
  // Anything other than an INIT message is rejected up front.
  if (initView.code() != Header::INIT) {
    throw cms::Exception("StreamTranslation","Registry deserialization error")
      << "received wrong message type: expected INIT, got "
      << initView.code() << "\n";
  }

  // For protocol versions above 3, record the process name and the protocol
  // version in the static members.
  if (initView.protocolVersion() > 3) {
    processName_ = initView.processName();
    protocolVersion_ = initView.protocolVersion();

    FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
    FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
  }

  // Checksum the description blob and compare with the value in the header.
  // A mismatch is reported on std::cerr only; decoding still proceeds.
  uint32_t const computedChecksum = cms::Adler32((char*)initView.descData(), initView.descLength());
  if ((uint32)computedChecksum != initView.adler32_chksum()) {
    std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
              << " chksum from registry data = " << computedChecksum << " from header = "
              << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
    // skip event (based on option?) or throw exception?
  }

  // Read a SendJobHeader from a ROOT read buffer set up over the blob
  // (kFALSE is passed as the adopt flag).
  TClass* headerClass = getTClass(typeid(SendJobHeader));
  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
                   (char*)initView.descData(), kFALSE);
  RootDebug tracer(10,10);
  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(headerClass));

  if (sd.get() == 0) {
    throw cms::Exception("StreamTranslation","Registry deserialization error")
      << "Could not read the initial product registry list\n";
  }

  return sd;
}
void edm::StreamerInputSource::fillDescription | ( | ParameterSetDescription & | description | ) | [static] |
Reimplemented from edm::RawInputSource.
Definition at line 397 of file StreamerInputSource.cc.
Referenced by edm::StreamerFileReader::fillDescriptions().
// Delegates entirely to RawInputSource::fillDescription; nothing is added to the description here.
// NOTE(review): the signature listed above names the parameter `description`, while this body uses `desc` - presumably the name used at the definition; confirm.
{ RawInputSource::fillDescription(desc); }
void edm::StreamerInputSource::mergeIntoRegistry | ( | SendJobHeader const & | header, |
ProductRegistry & | reg, | ||
BranchIDListHelper & | branchIDListHelper, | ||
bool | subsequent | ||
) | [static] |
Definition at line 69 of file StreamerInputSource.cc.
References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), Exception, FDEBUG, edm::fillProductRegistryTransients(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, edm::SendJobHeader::processConfigurations(), AlCaHLTBitMon_QueryRunRegistry::string, edm::BranchIDListHelper::updateFromInput(), and edm::ProductRegistry::updateFromInput().
Referenced by deserializeAndMergeWithRegistry(), and edm::getRegFromFile().
{
  // Brings the product descriptions carried by `header` into `reg` and
  // updates `branchIDListHelper` from the header's branch ID lists.
  //
  //  - subsequent == false: declareStreamers(), buildClassCache() and
  //    loadExtraClasses() are called, then `reg` is updated directly from
  //    the descriptions.
  //  - subsequent == true: the descriptions go into a temporary registry
  //    which is merged into `reg` with BranchDescription::Permissive.
  //
  // Throws cms::Exception("MismatchedInput") when the merge returns a
  // non-empty string.
  SendDescs const& descs = header.descs();

  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;

  if (subsequent) {
    ProductRegistry pReg;
    pReg.updateFromInput(descs);
    fillProductRegistryTransients(header.processConfigurations(), pReg);
    std::string const mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
    if (!mergeInfo.empty()) {
      // The context string now names this function; it previously named an
      // unrelated RootInputFileSequence method.
      throw cms::Exception("MismatchedInput","StreamerInputSource::mergeIntoRegistry()") << mergeInfo;
    }
  } else {
    declareStreamers(descs);
    buildClassCache(descs);
    loadExtraClasses();
    reg.updateFromInput(descs);
    fillProductRegistryTransients(header.processConfigurations(), reg);
  }

  // Common tail of both paths. It is still skipped when the merge above
  // throws, exactly as before.
  branchIDListHelper.updateFromInput(header.branchIDLists());
}
EventPrincipal * edm::StreamerInputSource::read | ( | EventPrincipal & | eventPrincipal | ) | [private, virtual] |
Implements edm::RawInputSource.
Definition at line 273 of file StreamerInputSource.cc.
References adjustEventToNewProductRegistry_, edm::Principal::adjustIndexesAfterProductRegistryAddition(), edm::Principal::adjustToNewProductRegistry(), edm::InputSource::branchIDListHelper(), FDEBUG, edm::EventPrincipal::fillEventPrincipal(), productGetter_, edm::InputSource::productRegistry(), edm::EventPrincipal::putOnRead(), sendEvent_, edm::StreamerInputSource::ProductGetter::setEventPrincipal(), and edm::Principal::size().
{
  // If the flag is set, adjust the principal to the current product registry
  // before filling it, then clear the flag.
  if (adjustEventToNewProductRegistry_) {
    eventPrincipal.adjustIndexesAfterProductRegistryAddition();
    bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
    assert(eventOK);
    adjustEventToNewProductRegistry_ = false;
  }

  // Copy the selection IDs and branch-list indexes out of the decoded event.
  boost::shared_ptr<EventSelectionIDVector> selectionIDs(new EventSelectionIDVector(sendEvent_->eventSelectionIDs()));
  boost::shared_ptr<BranchListIndexes> branchListIndexes(new BranchListIndexes(sendEvent_->branchListIndexes()));
  branchIDListHelper()->fixBranchListIndexes(*branchListIndexes);
  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), selectionIDs, branchListIndexes);
  productGetter_.setEventPrincipal(&eventPrincipal);

  // no process name list handling

  // Hand every streamed product to the principal, then clear the entry.
  SendProds& streamedProducts = sendEvent_->products();
  for (auto& streamed : streamedProducts) {
    FDEBUG(10) << "check prodpair" << std::endl;
    if (streamed.desc() == 0) {
      throw cms::Exception("StreamTranslation","Empty Provenance");
    }
    FDEBUG(5) << "Prov:"
         << " " << streamed.desc()->className()
         << " " << streamed.desc()->productInstanceName()
         << " " << streamed.desc()->branchID()
         << std::endl;

    ConstBranchDescription branchDesc(*streamed.desc());
    // This ProductProvenance constructor inserts into the entry description registry
    ProductProvenance productProvenance(streamed.branchID(), *streamed.parents());

    // A present and an absent product are stored through the same call;
    // only the debug text differs.
    bool const hasProduct = (streamed.prod() != 0);
    {
      FDEBUG(10) << (hasProduct ? "addgroup next " : "addgroup empty next ") << streamed.branchID() << std::endl;
    }
    eventPrincipal.putOnRead(branchDesc, streamed.prod(), productProvenance);
    {
      FDEBUG(10) << (hasProduct ? "addgroup done" : "addgroup empty done") << std::endl;
    }
    streamed.clear();
  }

  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;

  return &eventPrincipal;
}
boost::shared_ptr< FileBlock > edm::StreamerInputSource::readFile_ | ( | ) | [private, virtual] |
Reimplemented from edm::InputSource.
Definition at line 64 of file StreamerInputSource.cc.
{
  // Returns a shared pointer to a newly default-constructed FileBlock.
  boost::shared_ptr<FileBlock> fileBlock(new FileBlock);
  return fileBlock;
}
void edm::StreamerInputSource::resetAfterEndRun | ( | ) | [protected] |
Definition at line 364 of file StreamerInputSource.cc.
References edm::InputSource::eventCached(), edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), and edm::InputSource::resetRunAuxiliary().
{
  // Used by an online streamer source to reset after a stop command, so
  // that a following enable command works.
  resetLuminosityBlockAuxiliary();
  resetRunAuxiliary();

  // No event may still be cached at this point.
  assert(!eventCached());

  reset();
}
void edm::StreamerInputSource::setRun | ( | RunNumber_t | r | ) | [private, virtual] |
Reimplemented from edm::InputSource.
Definition at line 373 of file StreamerInputSource.cc.
References Exception, and edm::errors::LogicError.
{
  // A dummy setRun has to be defined here; otherwise InputSource::setRun is
  // called when a source inheriting from this class wants to define its own
  // setRun method. This override always throws.
  throw Exception(errors::LogicError)
    << "StreamerInputSource::setRun()\n"
    << "Run number cannot be modified for this type of Input Source\n"
    << "Contact a Storage Manager Developer\n";
}
unsigned int edm::StreamerInputSource::uncompressBuffer | ( | unsigned char * | inputBuffer, |
unsigned int | inputSize, | ||
std::vector< unsigned char > & | outputBuffer, | ||
unsigned int | expectedFullSize | ||
) | [static] |
Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.
Definition at line 329 of file StreamerInputSource.cc.
References dtNoiseDBValidation_cfg::cerr, Exception, FDEBUG, and run_regression::ret.
Referenced by edm::StreamDQMDeserializer::deserializeDQMEvent(), and deserializeEvent().
{
  // Inflates inputSize bytes from inputBuffer into outputBuffer using zlib's
  // uncompress() and returns the number of bytes produced.
  //
  // outputBuffer is resized to 1.1 * expectedFullSize before the call and is
  // not shrunk afterwards; callers should rely on the returned size.
  //
  // Throws cms::Exception("StreamDeserialization") when uncompress() does
  // not return Z_OK or when the inflated size differs from expectedFullSize.
  unsigned long origSize = expectedFullSize;
  // Same value as the former implicit double -> unsigned long conversion,
  // with the truncation made explicit.
  unsigned long uncompressedSize = static_cast<unsigned long>(expectedFullSize*1.1);
  FDEBUG(1) << "Uncompress: original size = " << origSize
            << ", compressed size = " << inputSize
            << std::endl;
  outputBuffer.resize(uncompressedSize);

  // data() rather than &outputBuffer[0]: indexing element 0 is undefined
  // when the buffer is empty (expectedFullSize == 0).
  int ret = uncompress(outputBuffer.data(), &uncompressedSize,
                       inputBuffer, inputSize); // do not need compression level
  if(ret == Z_OK) {
    // check the length against original uncompressed length
    FDEBUG(10) << " original size = " << origSize << " final size = "
               << uncompressedSize << std::endl;
    if(origSize != uncompressedSize) {
      std::cerr << "deserializeEvent: Problem with uncompress, original size = "
                << origSize << " uncompress size = " << uncompressedSize << std::endl;
      // we throw an error and return without event! null pointer
      // (a space was added after "should be" so the number no longer runs
      // into the text)
      throw cms::Exception("StreamDeserialization","Uncompression error")
        << "mismatch event lengths should be " << origSize << " got "
        << uncompressedSize << "\n";
    }
  } else {
    // we throw an error and return without event! null pointer
    std::cerr << "deserializeEvent: Problem with uncompress, return value = "
              << ret << std::endl;
    throw cms::Exception("StreamDeserialization","Uncompression error")
      << "Error code = " << ret << "\n ";
  }
  return (unsigned int) uncompressedSize;
}
bool edm::StreamerInputSource::adjustEventToNewProductRegistry_ [private] |
Definition at line 91 of file StreamerInputSource.h.
Referenced by deserializeAndMergeWithRegistry(), and read().
std::vector<unsigned char> edm::StreamerInputSource::dest_ [private] |
Definition at line 87 of file StreamerInputSource.h.
Referenced by deserializeEvent().
std::string edm::StreamerInputSource::processName_ [static, private] |
Definition at line 94 of file StreamerInputSource.h.
Referenced by deserializeRegistry().
ProductGetter edm::StreamerInputSource::productGetter_ [private] |
Definition at line 90 of file StreamerInputSource.h.
Referenced by deserializeEvent(), and read().
unsigned int edm::StreamerInputSource::protocolVersion_ [static, private] |
Definition at line 95 of file StreamerInputSource.h.
Referenced by deserializeRegistry().
std::unique_ptr<SendEvent> edm::StreamerInputSource::sendEvent_ [private] |
Definition at line 89 of file StreamerInputSource.h.
Referenced by deserializeEvent(), and read().
TClass* edm::StreamerInputSource::tc_ [private] |
Definition at line 86 of file StreamerInputSource.h.
Referenced by deserializeEvent().
TBufferFile edm::StreamerInputSource::xbuf_ [private] |
Definition at line 88 of file StreamerInputSource.h.
Referenced by deserializeEvent().