#include <StreamSerializer.h>
Classes | |
struct | Arr |
Public Member Functions | |
int | serializeEvent (EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer) |
int | serializeRegistry (SerializeDataBuffer &data_buffer) |
StreamSerializer (Selections const *selections) | |
Static Public Member Functions | |
static unsigned int | compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel) |
Private Attributes | |
Selections const * | selections_ |
TClass * | tc_ |
Definition at line 62 of file StreamSerializer.h.
edm::StreamSerializer::StreamSerializer | ( | Selections const * | selections | ) |
Creates a translator instance for the specified product registry.
Definition at line 38 of file StreamSerializer.cc.
// Stores the caller-supplied selections pointer and looks up the TClass for SendEvent once, keeping it in tc_ for serializeEvent() to pass to WriteObjectAny().
: selections_(selections), tc_(getTClass(typeid(SendEvent))) { }
unsigned int edm::StreamSerializer::compressBuffer | ( | unsigned char * | inputBuffer, |
unsigned int | inputSize, | ||
std::vector< unsigned char > & | outputBuffer, | ||
int | compressionLevel | ||
) | [static] |
Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.
Definition at line 241 of file StreamSerializer.cc.
References benchmark_cfg::cerr, FDEBUG, and runTheMatrix::ret.
Referenced by edm::StreamDQMSerializer::serializeDQMEvent(), and serializeEvent().
{
  // Compresses inputSize bytes starting at inputBuffer into outputBuffer using
  // zlib's compress2().
  //
  // Returns the number of compressed bytes written to the front of
  // outputBuffer, or 0 if compress2() reported anything other than Z_OK.
  //
  // outputBuffer is grown when it is too small but is never shrunk, so its
  // size() is NOT the compressed length; callers must use the return value.
  unsigned int resultSize = 0;

  // Ask zlib for the worst-case compressed size rather than hand-computing it
  // (this replaces the unexplained "inputSize * 1.002 + 1.0 + 12" expression).
  unsigned long dest_size = compressBound(inputSize);
  if(outputBuffer.size() < dest_size) {
    outputBuffer.resize(dest_size);
  }

  // compression 1-9, 6 is zlib default, 0 none
  // On entry dest_size is the space available to compress2(); on success it is
  // overwritten with the actual compressed length.
  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer, inputSize, compressionLevel);

  if(ret == Z_OK) {
    // return the correct length
    resultSize = static_cast<unsigned int>(dest_size);
    FDEBUG(1) << " original size = " << inputSize
              << " final size = " << dest_size
              << " ratio = " << double(dest_size)/double(inputSize)
              << std::endl;
  } else {
    // compression failed, return a size of zero
    FDEBUG(9) << "Compression Return value: " << ret
              << " Okay = " << Z_OK << std::endl;
    // do we throw an exception here?
    std::cerr << "Compression Return value: " << ret
              << " Okay = " << Z_OK << std::endl;
  }
  return resultSize;
}
int edm::StreamSerializer::serializeEvent | ( | EventPrincipal const & | eventPrincipal, |
ParameterSetID const & | selectorConfig, | ||
bool | use_compression, | ||
int | compression_level, | ||
SerializeDataBuffer & | data_buffer | ||
) |
Serializes the specified event into the specified event message.
Make a char* a data member; tell ROOT not to adopt it, but still to use it. Initialize it to 1M, let ROOT resize it if it wants, then delete it in the destructor.
Change the call to not take an eventMessage; add a member function that returns the address of the place where ROOT wrote the serialized data.
Return the length of the serialized object, and the actual length if compression has been done (may want to cache these lengths in this object instead).
The caller will need to copy the data from this object to its final destination in the EventMsgBuilder.
Definition at line 139 of file StreamSerializer.cc.
References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, edm::EventPrincipal::aux(), edm::BranchDescription::branchID(), edm::EventPrincipal::branchListIndexes(), benchmark_cfg::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventPrincipal::eventSelectionIDs(), Exception, newFWLiteAna::found, edm::Principal::getForOutput(), i, edm::EventPrincipal::id(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Principal::processHistory(), edm::OutputHandle::productProvenance(), SerializeDataBuffer::ptr_, SerializeDataBuffer::rootbuf_, selections_, tc_, and edm::OutputHandle::wrapper().
Referenced by edm::StreamerOutputModuleBase::serializeEvent().
{
  // Serializes one event into data_buffer:
  //   1. builds a SendEvent from the event's auxiliary data, process history,
  //      selection IDs (with selectorConfig appended) and branch list indexes;
  //   2. appends one StreamedProduct per entry in selections_;
  //   3. writes the SendEvent into data_buffer.rootbuf_ through ROOT;
  //   4. optionally compresses the result into data_buffer.comp_buf_;
  //   5. stores the Adler32 checksum of the bytes that data_buffer.ptr_ ends
  //      up pointing at.
  //
  // Returns data_buffer.curr_space_used_: the compressed size if compression
  // was requested and succeeded, otherwise the uncompressed size.
  // Throws cms::Exception("StreamTranslation", ...) when WriteObjectAny()
  // returns anything other than 1.

  // One Parentage per product that has provenance. Each StreamedProduct is
  // handed the address of its Parentage's parents(); reusing a single
  // Parentage object for every product would make all of those addresses
  // identical, so every product would end up with the parents of whichever
  // product was looked up last if StreamedProduct keeps the pointer until the
  // write below. The storage is reserved up front so push_back never
  // reallocates and the addresses stay valid. It is declared before `se` so
  // that it is destroyed after `se`.
  std::vector<Parentage> parentages;
  parentages.reserve(selections_->size());

  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
  selectionIDs.push_back(selectorConfig);
  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());

  // Loop over EDProducts, fill the provenance, and write.
  for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
    BranchDescription const& desc = **i;
    BranchID const& id = desc.branchID();

    OutputHandle const oh = eventPrincipal.getForOutput(id, true);
    if(!oh.productProvenance()) {
      // No product with this ID was put in the event.
      // Create and write the provenance.
      se.products().push_back(StreamedProduct(desc));
    } else {
      parentages.push_back(Parentage());
      Parentage& parentage = parentages.back();
      bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
      assert(found);
      se.products().push_back(StreamedProduct(oh.wrapper(), desc, oh.wrapper() != 0, &parentage.parents()));
    }
  }

  data_buffer.rootbuf_.Reset();

  RootDebug tracer(10,10);

  // tc_ is the TClass for SendEvent, looked up once in the constructor.
  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
  switch(bres) {
    case 0: // failure
      throw cms::Exception("StreamTranslation","Event serialization failed")
        << "StreamSerializer failed to serialize event: "
        << eventPrincipal.id();
    case 1: // success
      break;
    case 2: // truncated result
      throw cms::Exception("StreamTranslation","Event serialization truncated")
        << "StreamSerializer module attempted to serialize an event\n"
        << "that is to big for the allocated buffers: "
        << eventPrincipal.id();
    default: // unknown
      throw cms::Exception("StreamTranslation","Event serialization failed")
        << "StreamSerializer module got an unknown error code\n"
        << " while attempting to serialize event: "
        << eventPrincipal.id();
  }

  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
  // Re-read the buffer address after writing: ROOT may have reset the data
  // pointer while serializing.
  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();

  // compress before return if we need to
  // should test if compressed already - should never be?
  //   as double compression can have problems
  if(use_compression) {
    unsigned int dest_size = compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_,
                                            data_buffer.comp_buf_, compression_level);
    // A result of 0 means compression failed; in that case ptr_ and
    // curr_space_used_ keep describing the uncompressed data.
    if(dest_size != 0) {
      data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
      data_buffer.curr_space_used_ = dest_size;
    }
  }

  // calculate the adler32 checksum and fill it into the struct
  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);

  return data_buffer.curr_space_used_;
}
int edm::StreamSerializer::serializeRegistry | ( | SerializeDataBuffer & | data_buffer | ) |
Serializes the product registry (that was specified to the constructor) into the specified InitMessage.
Definition at line 48 of file StreamSerializer.cc.
References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, AlCaHLTBitMon_QueryRunRegistry::data, Exception, FDEBUG, edm::pset::fillMap(), edm::getTClass(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::detail::ThreadSafeIndexedRegistry< T, E >::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), edm::SendJobHeader::setProcessConfigurations(), and edm::sort_all().
Referenced by edm::StreamerOutputModuleBase::serializeRegistry().
{
  // Builds a SendJobHeader from the selected products and the branch-ID-list,
  // parameter-set and process-configuration registries, writes it into
  // data_buffer.rootbuf_ through ROOT, and records its size, address and
  // Adler32 checksum in data_buffer.
  //
  // Returns data_buffer.curr_space_used_ (the serialized length).
  // Throws cms::Exception("StreamTranslation", ...) when WriteObjectAny()
  // returns anything other than 1.
  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;

  SendJobHeader header;

  // Copy every selected product description into the header.
  Selections::const_iterator selIt(selections_->begin());
  Selections::const_iterator selEnd(selections_->end());
  FDEBUG(9) << "Product List: " << std::endl;
  while(selIt != selEnd) {
    header.push_back(**selIt);
    FDEBUG(9) << "StreamOutput got product = " << (*selIt)->className()
              << std::endl;
    ++selIt;
  }

  Service<ConstProductRegistry> reg;

  header.setBranchIDLists(BranchIDListRegistry::instance()->data());

  SendJobHeader::ParameterSetMap parameterSets;
  pset::fillMap(pset::Registry::instance(), parameterSets);
  header.setParameterSetMap(parameterSets);

  // Flatten the process-configuration registry into a sorted vector.
  typedef ProcessConfigurationRegistry::collection_type PCMap;
  PCMap const& registeredConfigs = ProcessConfigurationRegistry::instance()->data();
  ProcessConfigurationVector orderedConfigs;
  PCMap::const_iterator cfgIt = registeredConfigs.begin();
  PCMap::const_iterator const cfgEnd = registeredConfigs.end();
  while(cfgIt != cfgEnd) {
    orderedConfigs.push_back(cfgIt->second);
    ++cfgIt;
  }
  sort_all(orderedConfigs);
  header.setProcessConfigurations(orderedConfigs);

  data_buffer.rootbuf_.Reset();

  RootDebug tracer(10,10);

  TClass* headerClass = getTClass(typeid(SendJobHeader));
  int const status = data_buffer.rootbuf_.WriteObjectAny((char*)&header, headerClass);

  // 1 is the only success code; everything else is reported as an exception.
  if(status == 0) {
    // failure
    throw cms::Exception("StreamTranslation","Registry serialization failed")
      << "StreamSerializer failed to serialize registry\n";
  } else if(status == 2) {
    // truncated result
    throw cms::Exception("StreamTranslation","Registry serialization truncated")
      << "StreamSerializer module attempted to serialize\n"
      << "a registry that is to big for the allocated buffers\n";
  } else if(status != 1) {
    // unknown
    throw cms::Exception("StreamTranslation","Registry serialization failed")
      << "StreamSerializer module got an unknown error code\n"
      << " while attempting to serialize registry\n";
  }

  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();

  // Checksum the serialized bytes and store the result in the buffer struct.
  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);

  return data_buffer.curr_space_used_;
}
Selections const* edm::StreamSerializer::selections_ [private] |
Definition at line 96 of file StreamSerializer.h.
Referenced by serializeEvent(), and serializeRegistry().
TClass* edm::StreamSerializer::tc_ [private] |
Definition at line 98 of file StreamSerializer.h.
Referenced by serializeEvent().