CMS 3D CMS Logo

Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Private Member Functions | Private Attributes | Static Private Attributes

edm::StreamerInputSource Class Reference

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper edm::EventStreamHttpReader edm::StreamerFileReader edm::StreamerInputModule< Producer >

List of all members.

Classes

class  ProductGetter

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
void deserializeEvent (EventMsgView const &eventView)
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
virtual ~StreamerInputSource ()

Static Public Member Functions

static std::auto_ptr
< SendJobHeader
deserializeRegistry (InitMsgView const &initView)
static void fillDescription (ParameterSetDescription &description)
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, bool subsequent)
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)

Protected Member Functions

void resetAfterEndRun ()

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
static void declareStreamers (SendDescs const &descs)

Private Member Functions

virtual EventPrincipalread (EventPrincipal &eventPrincipal)
virtual boost::shared_ptr
< FileBlock
readFile_ ()
virtual void setRun (RunNumber_t r)

Private Attributes

bool adjustEventToNewProductRegistry_
std::vector< unsigned char > dest_
ProductGetter productGetter_
std::unique_ptr< SendEventsendEvent_
TClass * tc_
TBufferFile xbuf_

Static Private Attributes

static std::string processName_
static unsigned int protocolVersion_

Detailed Description

Definition at line 30 of file StreamerInputSource.h.


Constructor & Destructor Documentation

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
) [explicit]

Definition at line 48 of file StreamerInputSource.cc.

                                                       :
    RawInputSource(pset, desc),
    tc_(getTClass(typeid(SendEvent))),
    dest_(init_size),
    xbuf_(TBuffer::kRead, init_size),
    sendEvent_(),
    productGetter_(),
    adjustEventToNewProductRegistry_(false) {
  }
edm::StreamerInputSource::~StreamerInputSource ( ) [virtual]

Definition at line 60 of file StreamerInputSource.cc.

{}

Member Function Documentation

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs) [static, protected]

Definition at line 108 of file StreamerInputSource.cc.

References edm::doBuildRealData(), alignCSCRings::e, FDEBUG, i, AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

                                                             {
    SendDescs::const_iterator i(descs.begin()), e(descs.end());

    for(; i != e; ++i) {
        //pi->init();
        std::string const real_name = wrappedClassName(i->className());
        FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
        doBuildRealData(real_name);
    }
  }
void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs) [static, protected]

Definition at line 95 of file StreamerInputSource.cc.

References alignCSCRings::e, FDEBUG, i, edm::loadCap(), AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

                                                              {
    SendDescs::const_iterator i(descs.begin()), e(descs.end());

    for(; i != e; ++i) {
        //pi->init();
        std::string const real_name = wrappedClassName(i->className());
        FDEBUG(6) << "declare: " << real_name << std::endl;
        loadCap(real_name);
    }
  }
void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

Deserializes the specified init message into a SendJobHeader object and merges registries.

Definition at line 172 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::InputSource::branchIDListHelper(), deserializeRegistry(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::insertMapped(), instance, mergeIntoRegistry(), edm::InputSource::productRegistryUpdate(), sd, and edm::ParameterSet::setID().

Referenced by edm::StreamerFileReader::checkNextEvent(), edm::EventStreamHttpReader::readHeader(), edm::StreamerFileReader::reset_(), and edm::StreamerInputModule< Producer >::StreamerInputModule().

                                                                                                   {
     std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
     ProcessConfigurationVector const& pcv = sd->processConfigurations();
     mergeIntoRegistry(*sd, productRegistryUpdate(), *branchIDListHelper(), subsequent);
     if (subsequent) {
       adjustEventToNewProductRegistry_ = true;
     }
     SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
     pset::Registry& psetRegistry = *pset::Registry::instance();
     for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
       ParameterSet pset(i->second.pset());
       pset.setID(i->first);
       psetRegistry.insertMapped(pset);
     }
     ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
     for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
       pcReg.insertMapped(*it);
     }
  }
void edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message.

Definition at line 196 of file StreamerInputSource.cc.

References cms::Adler32(), EventMsgView::adler32_chksum(), dtNoiseDBValidation_cfg::cerr, EventMsgView::code(), filterCSVwithJSON::copy, dest_, Header::EVENT, EventMsgView::event(), EventMsgView::eventData(), EventMsgView::eventLength(), Exception, FDEBUG, EventMsgView::hostName(), instance, edm::Timestamp::invalidTimestamp(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), EventMsgView::origDataSize(), pos, productGetter_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), EventMsgView::run(), edm::InputSource::runAuxiliary(), sendEvent_, edm::InputSource::setEventCached(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::RunAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), tc_, uncompressBuffer(), and xbuf_.

Referenced by edm::EventStreamHttpReader::checkNextEvent(), and edm::StreamerFileReader::checkNextEvent().

                                                                     {
    if(eventView.code() != Header::EVENT)
      throw cms::Exception("StreamTranslation","Event deserialization error")
        << "received wrong message type: expected EVENT, got "
        << eventView.code() << "\n";
    FDEBUG(9) << "Decode event: "
         << eventView.event() << " "
         << eventView.run() << " "
         << eventView.size() << " "
         << eventView.adler32_chksum() << " "
         << eventView.eventLength() << " "
         << eventView.eventData()
         << std::endl;
    EventSourceSentry(*this);
    // uncompress if we need to
    // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
    // need to get rid of this when 090 MTCC streamers are gotten rid of
    unsigned long origsize = eventView.origDataSize();
    unsigned long dest_size; //(should be >= eventView.origDataSize())

    uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
    //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
    //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
    //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
    if((uint32)adler32_chksum != eventView.adler32_chksum()) {
      std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
                << " chksum from event = " << adler32_chksum << " from header = "
                << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
      // skip event (based on option?) or throw exception?
    }
    if(origsize != 78 && origsize != 0) {
      // compressed
      dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
                                   eventView.eventLength(), dest_, origsize);
    } else { // not compressed
      // we need to copy anyway the buffer as we are using dest in xbuf
      dest_size = eventView.eventLength();
      dest_.resize(dest_size);
      unsigned char* pos = (unsigned char*) &dest_[0];
      unsigned char* from = (unsigned char*) eventView.eventData();
      std::copy(from,from+dest_size,pos);
    }
    //TBuffer xbuf(TBuffer::kRead, dest_size,
    //             (char*) &dest[0],kFALSE);
    //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
    //             (char*) eventView.eventData(),kFALSE);
    xbuf_.Reset();
    xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
    RootDebug tracer(10,10);

    setRefCoreStreamer(&productGetter_);
    sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
    setRefCoreStreamer();

    if(sendEvent_.get()==0) {
        throw cms::Exception("StreamTranslation","Event deserialization error")
          << "got a null event from input stream\n";
    }
    ProcessHistoryRegistry::instance()->insertMapped(sendEvent_->processHistory());

    FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
    if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sendEvent_->aux().run()) {
      RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
      runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
      setRunAuxiliary(runAuxiliary);
      resetLuminosityBlockAuxiliary();
    }
    if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
      LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
        new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
      luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
      setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
    }
    setEventCached();
  }
std::auto_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView) [static]

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 124 of file StreamerInputSource.cc.

References cms::Adler32(), InitMsgView::adler32_chksum(), dtNoiseDBValidation_cfg::cerr, InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), protocolVersion_, and sd.

Referenced by deserializeAndMergeWithRegistry(), and edm::readHeaderFromStream().

                                                                      {
    if(initView.code() != Header::INIT)
      throw cms::Exception("StreamTranslation","Registry deserialization error")
        << "received wrong message type: expected INIT, got "
        << initView.code() << "\n";

    //Get the process name and store if for Protocol version 4 and above.
    if (initView.protocolVersion() > 3) {

         processName_ = initView.processName();
         protocolVersion_ = initView.protocolVersion();

         FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
         FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
    }

   // calculate the adler32 checksum
   uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
   //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
   //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
   //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
    if((uint32)adler32_chksum != initView.adler32_chksum()) {
      std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
                << " chksum from registry data = " << adler32_chksum << " from header = "
                << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
      // skip event (based on option?) or throw exception?
    }

    TClass* desc = getTClass(typeid(SendJobHeader));

    TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
                 (char*)initView.descData(),kFALSE);
    RootDebug tracer(10,10);
    std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));

    if(sd.get()==0) {
        throw cms::Exception("StreamTranslation","Registry deserialization error")
          << "Could not read the initial product registry list\n";
    }

    return sd;
  }
void edm::StreamerInputSource::fillDescription ( ParameterSetDescription description) [static]

Reimplemented from edm::RawInputSource.

Definition at line 397 of file StreamerInputSource.cc.

Referenced by edm::StreamerFileReader::fillDescriptions().

void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry reg,
BranchIDListHelper branchIDListHelper,
bool  subsequent 
) [static]

Definition at line 69 of file StreamerInputSource.cc.

References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), Exception, FDEBUG, edm::fillProductRegistryTransients(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, edm::SendJobHeader::processConfigurations(), AlCaHLTBitMon_QueryRunRegistry::string, edm::BranchIDListHelper::updateFromInput(), and edm::ProductRegistry::updateFromInput().

Referenced by deserializeAndMergeWithRegistry(), and edm::getRegFromFile().

                                                                                                                                                   {

    SendDescs const& descs = header.descs();

    FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;

    if (subsequent) {
      ProductRegistry pReg;
      pReg.updateFromInput(descs);
      fillProductRegistryTransients(header.processConfigurations(), pReg);
      std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
      if (!mergeInfo.empty()) {
        throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
      }
      branchIDListHelper.updateFromInput(header.branchIDLists());
    } else {
      declareStreamers(descs);
      buildClassCache(descs);
      loadExtraClasses();
      reg.updateFromInput(descs);
      fillProductRegistryTransients(header.processConfigurations(), reg);
      branchIDListHelper.updateFromInput(header.branchIDLists());
    }
  }
EventPrincipal * edm::StreamerInputSource::read ( EventPrincipal eventPrincipal) [private, virtual]

Implements edm::RawInputSource.

Definition at line 273 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::Principal::adjustIndexesAfterProductRegistryAddition(), edm::Principal::adjustToNewProductRegistry(), edm::InputSource::branchIDListHelper(), FDEBUG, edm::EventPrincipal::fillEventPrincipal(), productGetter_, edm::InputSource::productRegistry(), edm::EventPrincipal::putOnRead(), sendEvent_, edm::StreamerInputSource::ProductGetter::setEventPrincipal(), and edm::Principal::size().

                                                          {
    if(adjustEventToNewProductRegistry_) {
      eventPrincipal.adjustIndexesAfterProductRegistryAddition();
      bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
      assert(eventOK);
      adjustEventToNewProductRegistry_ = false;
    }
    boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sendEvent_->eventSelectionIDs()));
    boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sendEvent_->branchListIndexes()));
    branchIDListHelper()->fixBranchListIndexes(*indexes);
    eventPrincipal.fillEventPrincipal(sendEvent_->aux(), ids, indexes);
    productGetter_.setEventPrincipal(&eventPrincipal);

    // no process name list handling

    SendProds & sps = sendEvent_->products();
    for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
        FDEBUG(10) << "check prodpair" << std::endl;
        if(spi->desc() == 0)
          throw cms::Exception("StreamTranslation","Empty Provenance");
        FDEBUG(5) << "Prov:"
             << " " << spi->desc()->className()
             << " " << spi->desc()->productInstanceName()
             << " " << spi->desc()->branchID()
             << std::endl;

        ConstBranchDescription branchDesc(*spi->desc());
        // This ProductProvenance constructor inserts into the entry description registry
        ProductProvenance productProvenance(spi->branchID(), *spi->parents());

        if(spi->prod() != 0) {
          FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
          eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
          FDEBUG(10) << "addgroup done" << std::endl;
        } else {
          FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
          eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
          FDEBUG(10) << "addgroup empty done" << std::endl;
        }
        spi->clear();
    }

    FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;

    return &eventPrincipal;
  }
boost::shared_ptr< FileBlock > edm::StreamerInputSource::readFile_ ( ) [private, virtual]

Reimplemented from edm::InputSource.

Definition at line 64 of file StreamerInputSource.cc.

                                 {
    return boost::shared_ptr<FileBlock>(new FileBlock);
  }
void edm::StreamerInputSource::resetAfterEndRun ( ) [protected]

Definition at line 364 of file StreamerInputSource.cc.

References edm::InputSource::eventCached(), edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), and edm::InputSource::resetRunAuxiliary().

                                             {
     // called from an online streamer source to reset after a stop command
     // so an enable command will work
     resetLuminosityBlockAuxiliary();
     resetRunAuxiliary();
     assert(!eventCached());
     reset();
  }
void edm::StreamerInputSource::setRun ( RunNumber_t  r) [private, virtual]

Reimplemented from edm::InputSource.

Definition at line 373 of file StreamerInputSource.cc.

References Exception, and edm::errors::LogicError.

                                              {
     // Need to define a dummy setRun here or else the InputSource::setRun is called
     // if we have a source inheriting from this and wants to define a setRun method
     throw Exception(errors::LogicError)
     << "StreamerInputSource::setRun()\n"
     << "Run number cannot be modified for this type of Input Source\n"
     << "Contact a Storage Manager Developer\n";
  }
unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
) [static]

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 329 of file StreamerInputSource.cc.

References dtNoiseDBValidation_cfg::cerr, Exception, FDEBUG, and run_regression::ret.

Referenced by edm::StreamDQMDeserializer::deserializeDQMEvent(), and deserializeEvent().

                                                                       {
    unsigned long origSize = expectedFullSize;
    unsigned long uncompressedSize = expectedFullSize*1.1;
    FDEBUG(1) << "Uncompress: original size = " << origSize
              << ", compressed size = " << inputSize
              << std::endl;
    outputBuffer.resize(uncompressedSize);
    int ret = uncompress(&outputBuffer[0], &uncompressedSize,
                         inputBuffer, inputSize); // do not need compression level
    //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
    if(ret == Z_OK) {
        // check the length against original uncompressed length
        FDEBUG(10) << " original size = " << origSize << " final size = "
                   << uncompressedSize << std::endl;
        if(origSize != uncompressedSize) {
            std::cerr << "deserializeEvent: Problem with uncompress, original size = "
                 << origSize << " uncompress size = " << uncompressedSize << std::endl;
            // we throw an error and return without event! null pointer
            throw cms::Exception("StreamDeserialization","Uncompression error")
              << "mismatch event lengths should be" << origSize << " got "
              << uncompressedSize << "\n";
        }
    } else {
        // we throw an error and return without event! null pointer
        std::cerr << "deserializeEvent: Problem with uncompress, return value = "
             << ret << std::endl;
        throw cms::Exception("StreamDeserialization","Uncompression error")
            << "Error code = " << ret << "\n ";
    }
    return (unsigned int) uncompressedSize;
  }

Member Data Documentation

Definition at line 91 of file StreamerInputSource.h.

Referenced by deserializeAndMergeWithRegistry(), and read().

std::vector<unsigned char> edm::StreamerInputSource::dest_ [private]

Definition at line 87 of file StreamerInputSource.h.

Referenced by deserializeEvent().

std::string edm::StreamerInputSource::processName_ [static, private]

Definition at line 94 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

Definition at line 90 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

unsigned int edm::StreamerInputSource::protocolVersion_ [static, private]

Definition at line 95 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

std::unique_ptr<SendEvent> edm::StreamerInputSource::sendEvent_ [private]

Definition at line 89 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

TClass* edm::StreamerInputSource::tc_ [private]

Definition at line 86 of file StreamerInputSource.h.

Referenced by deserializeEvent().

TBufferFile edm::StreamerInputSource::xbuf_ [private]

Definition at line 88 of file StreamerInputSource.h.

Referenced by deserializeEvent().