CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
 
 StreamSerializer (SelectedProducts const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
 

Private Attributes

SelectedProducts const * selections_
 
edm::propagate_const< TClass * > tc_
 

Detailed Description

Definition at line 68 of file StreamSerializer.h.

Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( SelectedProducts const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 35 of file StreamSerializer.cc.

35  :
37  tc_(getTClass(typeid(SendEvent))) {
38  }
edm::propagate_const< TClass * > tc_
SelectedProducts const * selections_
TClass * getTClass(const std::type_info &ti)

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel 
)
static

Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.

Definition at line 246 of file StreamSerializer.cc.

References MessageLogger_cfi::cerr, and FDEBUG.

Referenced by serializeEvent().

249  {
250  unsigned int resultSize = 0;
251 
252  // what are these magic numbers? (jbk)
253  unsigned long dest_size = (unsigned long)(double(inputSize)*
254  1.002 + 1.0) + 12;
255  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
256 
257  // compression 1-9, 6 is zlib default, 0 none
258  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
259  inputSize, compressionLevel);
260 
261  // check status
262  if(ret == Z_OK) {
263  // return the correct length
264  resultSize = dest_size;
265 
266  FDEBUG(1) << " original size = " << inputSize
267  << " final size = " << dest_size
268  << " ratio = " << double(dest_size)/double(inputSize)
269  << std::endl;
270  } else {
271  // compression failed, return a size of zero
272  FDEBUG(9) << "Compression Return value: " << ret
273  << " Okay = " << Z_OK << std::endl;
274  // do we throw an exception here?
275  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
276 
277  }
278 
279  return resultSize;
280  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::StreamSerializer::serializeEvent ( EventForOutput const &  event,
ParameterSetID const &  selectorConfig,
bool  use_compression,
int  compression_level,
SerializeDataBuffer data_buffer 
)

Serializes the specified event into the specified event message.

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead.

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 128 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), MessageLogger_cfi::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventForOutput::eventAuxiliary(), Exception, edm::ParentageRegistry::getMapped(), edm::ParentageRegistry::instance(), edm::BasicHandle::isValid(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Provenance::productProvenance(), edm::BasicHandle::provenance(), SerializeDataBuffer::ptr_, mps_fire::result, SerializeDataBuffer::rootbuf_, corrVsCorr::selection, selections_, tc_, edm::BranchDescription::unwrappedTypeID(), and edm::BasicHandle::wrapper().

Referenced by edm::StreamerOutputModuleBase::serializeEvent().

131  {
132 
133  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
134  selectionIDs.push_back(selectorConfig);
135  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
136 
137  // Loop over EDProducts, fill the provenance, and write.
138 
139  // Historical note. I fixed two bugs in the code below in
140  // March 2017. One would have caused any Parentage written
141  // using the Streamer output module to be total nonsense
142  // prior to the fix. The other would have caused seg faults
143  // when the Parentage was dropped in an earlier process.
144 
145  // FIX ME. The code below stores the direct parentage of
146  // kept products, but it does not save the parentage of
147  // dropped objects that are ancestors of kept products like
148  // the PoolOutputModule. That information is currently
149  // lost when the streamer output module is used.
150 
151  for(auto const& selection : *selections_) {
152  BranchDescription const& desc = *selection.first;
153  BasicHandle result;
154  event.getByToken(selection.second, desc.unwrappedTypeID(), result);
155  if(!result.isValid()) {
156  // No product with this ID was put in the event.
157  // Create and write the provenance.
158  se.products().push_back(StreamedProduct(desc));
159  } else {
160  if (result.provenance()->productProvenance()) {
161  Parentage const* parentage = ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
162  assert(parentage);
163  se.products().push_back(StreamedProduct(result.wrapper(),
164  desc,
165  result.wrapper() != nullptr,
166  &parentage->parents()));
167  } else {
168  se.products().push_back(StreamedProduct(result.wrapper(),
169  desc,
170  result.wrapper() != nullptr,
171  nullptr));
172  }
173  }
174  }
175 
176  data_buffer.rootbuf_.Reset();
177  RootDebug tracer(10,10);
178 
179  //TClass* tc = getTClass(typeid(SendEvent));
180  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
181  switch(bres) {
182  case 0: // failure
183  {
184  throw cms::Exception("StreamTranslation","Event serialization failed")
185  << "StreamSerializer failed to serialize event: "
186  << event.id();
187  break;
188  }
189  case 1: // succcess
190  break;
191  case 2: // truncated result
192  {
193  throw cms::Exception("StreamTranslation","Event serialization truncated")
194  << "StreamSerializer module attempted to serialize an event\n"
195  << "that is to big for the allocated buffers: "
196  << event.id();
197  break;
198  }
199  default: // unknown
200  {
201  throw cms::Exception("StreamTranslation","Event serialization failed")
202  << "StreamSerializer module got an unknown error code\n"
203  << " while attempting to serialize event: "
204  << event.id();
205  break;
206  }
207  }
208 
209  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
210  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
211  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
212 #if 0
213  if(data_buffer.ptr_ != data_.ptr_) {
214  std::cerr << "ROOT reset the buffer!!!!\n";
215  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
216  }
217 #endif
218  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
219  // eventMessage.eventAddr());
220  // eventMessage.setEventLength(rootbuf.Length());
221 
222  // compress before return if we need to
223  // should test if compressed already - should never be?
224  // as double compression can have problems
225  if(use_compression) {
226  unsigned int dest_size =
227  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
228  if(dest_size != 0) {
229  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
230  data_buffer.curr_space_used_ = dest_size;
231  }
232  }
233  // calculate the adler32 checksum and fill it into the struct
234  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
235  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
236 
237  return data_buffer.curr_space_used_;
238  }
edm::propagate_const< TClass * > tc_
selection
main part
Definition: corrVsCorr.py:98
edm::propagate_const< unsigned char * > ptr_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static ParentageRegistry * instance()
Definition: event.py:1
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer data_buffer,
const BranchIDLists branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper 
)

Serializes the product registry (that was specified to the constructor) into the specified InitMessage.

Definition at line 45 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Exception, FDEBUG, edm::pset::Registry::fillMap(), edm::getTClass(), edm::pset::Registry::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, sd, corrVsCorr::selection, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), and edm::SendJobHeader::setThinnedAssociationsHelper().

Referenced by edm::StreamerOutputModuleBase::serializeRegistry().

47  {
48  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
49  SendJobHeader sd;
50 
51  FDEBUG(9) << "Product List: " << std::endl;
52 
53 
54  for(auto const& selection : *selections_) {
55  sd.push_back(*selection.first);
56  FDEBUG(9) << "StreamOutput got product = " << selection.first->className()
57  << std::endl;
58  }
60  sd.setBranchIDLists(branchIDLists);
61  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
63 
65  sd.setParameterSetMap(psetMap);
66 
67  data_buffer.rootbuf_.Reset();
68 
69  RootDebug tracer(10,10);
70 
71  TClass* tc = getTClass(typeid(SendJobHeader));
72  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
73 
74  switch(bres) {
75  case 0: // failure
76  {
77  throw cms::Exception("StreamTranslation","Registry serialization failed")
78  << "StreamSerializer failed to serialize registry\n";
79  break;
80  }
81  case 1: // succcess
82  break;
83  case 2: // truncated result
84  {
85  throw cms::Exception("StreamTranslation","Registry serialization truncated")
86  << "StreamSerializer module attempted to serialize\n"
87  << "a registry that is to big for the allocated buffers\n";
88  break;
89  }
90  default: // unknown
91  {
92  throw cms::Exception("StreamTranslation","Registry serialization failed")
93  << "StreamSerializer module got an unknown error code\n"
94  << " while attempting to serialize registry\n";
95  break;
96  }
97  }
98 
99  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
100  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
101  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
102  // calculate the adler32 checksum and fill it into the struct
103  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
104  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
105  return data_buffer.curr_space_used_;
106  }
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:98
edm::propagate_const< unsigned char * > ptr_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
unsigned int curr_event_size_
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
TClass * getTClass(const std::type_info &ti)
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:49
static Registry * instance()
Definition: Registry.cc:12

Member Data Documentation

SelectedProducts const* edm::StreamSerializer::selections_
private

Definition at line 95 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

edm::propagate_const<TClass*> edm::StreamSerializer::tc_
private

Definition at line 96 of file StreamSerializer.h.

Referenced by serializeEvent().