CMS 3D CMS Logo

Classes | Public Member Functions | Static Public Member Functions | Private Attributes

edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

List of all members.

Classes

struct  Arr

Public Member Functions

int serializeEvent (EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
int serializeRegistry (SerializeDataBuffer &data_buffer)
 StreamSerializer (Selections const *selections)

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)

Private Attributes

Selections const * selections_
TClass * tc_

Detailed Description

Definition at line 62 of file StreamSerializer.h.


Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( Selections const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 38 of file StreamSerializer.cc.

                                                                :
    // Store the pointer handed in by the caller; the selection list itself is not copied.
    selections_(selections),
    // Look up the TClass for SendEvent once here; serializeEvent() passes tc_ to
    // WriteObjectAny() instead of repeating the lookup for every event.
    tc_(getTClass(typeid(SendEvent))) {
  }

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel 
) [static]

Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.

Definition at line 241 of file StreamSerializer.cc.

References benchmark_cfg::cerr, FDEBUG, and runTheMatrix::ret.

Referenced by edm::StreamDQMSerializer::serializeDQMEvent(), and serializeEvent().

                                                         {
    // Compresses inputSize bytes starting at inputBuffer into outputBuffer using
    // zlib's compress2() at the requested compressionLevel.
    // Returns the compressed length on success, or 0 when compress2() reports
    // anything other than Z_OK.

    // Capacity offered to zlib: the input size scaled by 1.002, plus 1,
    // truncated to an integer, plus 12 more bytes.
    // NOTE(review): presumably the historical zlib worst-case rule for compress2()
    // ("0.1% larger than the source plus 12 bytes") -- confirm against the zlib manual.
    unsigned long compressedLength =
        (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;

    // The output buffer is only ever grown here, never shrunk.
    if(compressedLength > outputBuffer.size()) {
        outputBuffer.resize(compressedLength);
    }

    // compression 1-9, 6 is zlib default, 0 none.
    // compress2() overwrites compressedLength with the number of bytes it produced.
    int const status = compress2(&outputBuffer[0], &compressedLength,
                                 inputBuffer, inputSize, compressionLevel);

    if(status != Z_OK) {
        // Compression failed: report it and hand back a size of zero.
        FDEBUG(9) << "Compression Return value: " << status
                  << " Okay = " << Z_OK << std::endl;
        // do we throw an exception here?
        std::cerr << "Compression Return value: " << status << " Okay = " << Z_OK << std::endl;
        return 0;
    }

    FDEBUG(1) << " original size = " << inputSize
              << " final size = " << compressedLength
              << " ratio = " << double(compressedLength)/double(inputSize)
              << std::endl;

    // Implicitly narrowed to the unsigned int return type.
    return compressedLength;
  }
int edm::StreamSerializer::serializeEvent ( EventPrincipal const &  eventPrincipal,
ParameterSetID const &  selectorConfig,
bool  use_compression,
int  compression_level,
SerializeDataBuffer &  data_buffer 
)

Serializes the specified event into the specified event message.

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead).

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 139 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, edm::EventPrincipal::aux(), edm::BranchDescription::branchID(), edm::EventPrincipal::branchListIndexes(), benchmark_cfg::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventPrincipal::eventSelectionIDs(), Exception, newFWLiteAna::found, edm::Principal::getForOutput(), i, edm::EventPrincipal::id(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Principal::processHistory(), edm::OutputHandle::productProvenance(), SerializeDataBuffer::ptr_, SerializeDataBuffer::rootbuf_, selections_, tc_, and edm::OutputHandle::wrapper().

Referenced by edm::StreamerOutputModuleBase::serializeEvent().

                                                                         {
    // Serializes one event into data_buffer and returns the number of bytes in use.
    //
    // eventPrincipal    - event whose selected products and provenance are written.
    // selectorConfig    - ID appended to the event's own list of event-selection IDs.
    // use_compression   - when true, the serialized bytes are passed to compressBuffer().
    // compression_level - level forwarded to compressBuffer().
    // data_buffer       - receives the result: rootbuf_ holds the serialized object,
    //                     comp_buf_ the compressed copy (if any), ptr_ points at whichever
    //                     of the two is current, curr_event_size_ is the serialized length,
    //                     curr_space_used_ the length of the data at ptr_, and
    //                     adler32_chksum_ the checksum of that data.
    //
    // Returns data_buffer.curr_space_used_.
    // Throws cms::Exception ("StreamTranslation") when WriteObjectAny() returns
    // anything other than 1.

    // Scratch object refilled by the registry lookup for every product that has provenance.
    Parentage parentage;

    EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
    selectionIDs.push_back(selectorConfig);
    SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());

    // Loop over EDProducts, fill the provenance, and write.
    for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
      BranchDescription const& desc = **i;
      BranchID const& id = desc.branchID();

      OutputHandle const oh = eventPrincipal.getForOutput(id, true);
      if(!oh.productProvenance()) {
        // No product with this ID was put in the event.
        // Create and write the provenance.
        se.products().push_back(StreamedProduct(desc));
      } else {
        bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
        // NOTE(review): with assertions compiled out, a failed lookup is not detected
        // and parentage keeps whatever an earlier iteration left in it.
        assert(found);
        se.products().push_back(StreamedProduct(oh.wrapper(),
                                                desc,
                                                oh.wrapper() != 0,
                                                &parentage.parents()));
      }
    }

    data_buffer.rootbuf_.Reset();
    // NOTE(review): presumably a scoped adjustment of ROOT debug settings -- confirm.
    RootDebug tracer(10,10);

    // tc_ was looked up once in the constructor (TClass for SendEvent).
    int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
    switch(bres) {
      case 0: // failure
        {
          throw cms::Exception("StreamTranslation","Event serialization failed")
            << "StreamSerializer failed to serialize event: "
            << eventPrincipal.id();
          break;
        }
      case 1: // success
        break;
      case 2: // truncated result
        {
          throw cms::Exception("StreamTranslation","Event serialization truncated")
            << "StreamSerializer module attempted to serialize an event\n"
            << "that is to big for the allocated buffers: "
            << eventPrincipal.id();
          break;
        }
      default: // unknown
        {
          throw cms::Exception("StreamTranslation","Event serialization failed")
            << "StreamSerializer module got an unknown error code\n"
            << " while attempting to serialize event: "
            << eventPrincipal.id();
          break;
        }
    }

    // Until compression says otherwise, the payload is the serialized object in rootbuf_.
    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
#if 0
   if(data_buffer.ptr_ != data_.ptr_) {
        std::cerr << "ROOT reset the buffer!!!!\n";
        data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
        }
#endif
    // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
    // eventMessage.eventAddr());
    // eventMessage.setEventLength(rootbuf.Length());

    // compress before return if we need to
    // should test if compressed already - should never be?
    //   as double compression can have problems
    if(use_compression) {
      unsigned int dest_size =
        compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
      // A result of zero means compression failed; the uncompressed data stays current.
      if(dest_size != 0) {
        data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
        data_buffer.curr_space_used_ = dest_size;
      }
    }
    // calculate the adler32 checksum and fill it into the struct
    data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
    //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;

    return data_buffer.curr_space_used_;
  }
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer &  data_buffer)

Serializes the product registry (that was specified to the constructor) into the specified InitMessage.

Definition at line 48 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, AlCaHLTBitMon_QueryRunRegistry::data, Exception, FDEBUG, edm::pset::fillMap(), edm::getTClass(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::detail::ThreadSafeIndexedRegistry< T, E >::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), edm::SendJobHeader::setProcessConfigurations(), and edm::sort_all().

Referenced by edm::StreamerOutputModuleBase::serializeRegistry().

                                                                          {
    // Builds a SendJobHeader describing the selected products, branch ID lists,
    // parameter sets and process configurations, serializes it into
    // data_buffer.rootbuf_, and records size, data pointer and Adler-32 checksum
    // in data_buffer.
    // Returns data_buffer.curr_space_used_.
    // Throws cms::Exception ("StreamTranslation") when WriteObjectAny() returns
    // anything other than 1.
    FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
    SendJobHeader header;

    FDEBUG(9) << "Product List: " << std::endl;

    // One entry per selected product.
    for(Selections::const_iterator product = selections_->begin(), productsEnd = selections_->end();
        product != productsEnd; ++product) {
        header.push_back(**product);
        FDEBUG(9) << "StreamOutput got product = " << (*product)->className()
                  << std::endl;
    }

    // NOTE(review): not referenced again in this function; kept because constructing
    // it may have effects that are not visible from here.
    Service<ConstProductRegistry> reg;
    header.setBranchIDLists(BranchIDListRegistry::instance()->data());

    SendJobHeader::ParameterSetMap parameterSets;
    pset::fillMap(pset::Registry::instance(), parameterSets);
    header.setParameterSetMap(parameterSets);

    // Copy the mapped values of the process-configuration registry into a vector and sort it.
    typedef ProcessConfigurationRegistry::collection_type PCMap;
    PCMap const& registeredConfigs = ProcessConfigurationRegistry::instance()->data();
    ProcessConfigurationVector sortedConfigs;
    for(PCMap::const_iterator entry = registeredConfigs.begin(), entriesEnd = registeredConfigs.end();
        entry != entriesEnd; ++entry) {
      sortedConfigs.push_back(entry->second);
    }
    sort_all(sortedConfigs);
    header.setProcessConfigurations(sortedConfigs);

    data_buffer.rootbuf_.Reset();

    // NOTE(review): presumably a scoped adjustment of ROOT debug settings -- confirm.
    RootDebug tracer(10,10);

    TClass* headerClass = getTClass(typeid(SendJobHeader));
    int const writeStatus = data_buffer.rootbuf_.WriteObjectAny((char*)&header, headerClass);

    // 0 = failure, 2 = truncated result, 1 = success; anything else is unknown.
    if(writeStatus == 0) {
        throw cms::Exception("StreamTranslation","Registry serialization failed")
          << "StreamSerializer failed to serialize registry\n";
    }
    if(writeStatus == 2) {
        throw cms::Exception("StreamTranslation","Registry serialization truncated")
          << "StreamSerializer module attempted to serialize\n"
          << "a registry that is to big for the allocated buffers\n";
    }
    if(writeStatus != 1) {
        throw cms::Exception("StreamTranslation","Registry serialization failed")
          << "StreamSerializer module got an unknown error code\n"
          << " while attempting to serialize registry\n";
    }

    // The registry is never compressed, so size used equals the serialized size.
    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
    // calculate the adler32 checksum and fill it into the struct
    data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
    //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
    return data_buffer.curr_space_used_;
  }

Member Data Documentation

Selections const* edm::StreamSerializer::selections_ [private]

Definition at line 96 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

TClass* edm::StreamSerializer::tc_ [private]

Definition at line 98 of file StreamSerializer.h.

Referenced by serializeEvent().