CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer) const
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
 
 StreamSerializer (SelectedProducts const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
 

Private Attributes

SelectedProducts const * selections_
 
edm::propagate_const< TClass * > tc_
 

Detailed Description

Definition at line 65 of file StreamSerializer.h.

Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( SelectedProducts const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 35 of file StreamSerializer.cc.

36  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
edm::propagate_const< TClass * > tc_
SelectedProducts const * selections_
TClass * getTClass(const std::type_info &ti)

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel 
)
static

Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.

Definition at line 233 of file StreamSerializer.cc.

References MessageLogger_cfi::cerr, and FDEBUG.

Referenced by serializeEvent().

236  {
237  unsigned int resultSize = 0;
238 
239  // what are these magic numbers? (jbk)
240  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
241  if (outputBuffer.size() < dest_size)
242  outputBuffer.resize(dest_size);
243 
244  // compression 1-9, 6 is zlib default, 0 none
245  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer, inputSize, compressionLevel);
246 
247  // check status
248  if (ret == Z_OK) {
249  // return the correct length
250  resultSize = dest_size;
251 
252  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
253  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
254  } else {
255  // compression failed, return a size of zero
256  FDEBUG(9) << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
257  // do we throw an exception here?
258  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
259  }
260 
261  return resultSize;
262  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
int edm::StreamSerializer::serializeEvent ( EventForOutput const &  event,
ParameterSetID const &  selectorConfig,
bool  use_compression,
int  compression_level,
SerializeDataBuffer data_buffer 
) const

Serializes the specified event into the specified event message.

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead.

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 124 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), MessageLogger_cfi::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventForOutput::eventAuxiliary(), Exception, edm::ParentageRegistry::getMapped(), edm::ParentageRegistry::instance(), edm::BasicHandle::isValid(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Provenance::productProvenance(), edm::BasicHandle::provenance(), SerializeDataBuffer::ptr_, mps_fire::result, SerializeDataBuffer::rootbuf_, corrVsCorr::selection, selections_, tc_, edm::BranchDescription::unwrappedTypeID(), and edm::BasicHandle::wrapper().

Referenced by edm::StreamerOutputModuleCommon::serializeEvent().

128  {
129  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
130  selectionIDs.push_back(selectorConfig);
131  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
132 
133  // Loop over EDProducts, fill the provenance, and write.
134 
135  // Historical note. I fixed two bugs in the code below in
136  // March 2017. One would have caused any Parentage written
137  // using the Streamer output module to be total nonsense
138  // prior to the fix. The other would have caused seg faults
139  // when the Parentage was dropped in an earlier process.
140 
141  // FIX ME. The code below stores the direct parentage of
142  // kept products, but it does not save the parentage of
143  // dropped objects that are ancestors of kept products like
144  // the PoolOutputModule. That information is currently
145  // lost when the streamer output module is used.
146 
147  for (auto const& selection : *selections_) {
148  BranchDescription const& desc = *selection.first;
149  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
150  if (!result.isValid()) {
151  // No product with this ID was put in the event.
152  // Create and write the provenance.
153  se.products().push_back(StreamedProduct(desc));
154  } else {
155  if (result.provenance()->productProvenance()) {
156  Parentage const* parentage =
157  ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID());
158  assert(parentage);
159  se.products().push_back(
160  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
161  } else {
162  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
163  }
164  }
165  }
166 
167  data_buffer.rootbuf_.Reset();
168  RootDebug tracer(10, 10);
169 
170  //TClass* tc = getTClass(typeid(SendEvent));
171  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
172  switch (bres) {
173  case 0: // failure
174  {
175  throw cms::Exception("StreamTranslation", "Event serialization failed")
176  << "StreamSerializer failed to serialize event: " << event.id();
177  break;
178  }
179  case 1: // succcess
180  break;
181  case 2: // truncated result
182  {
183  throw cms::Exception("StreamTranslation", "Event serialization truncated")
184  << "StreamSerializer module attempted to serialize an event\n"
185  << "that is to big for the allocated buffers: " << event.id();
186  break;
187  }
188  default: // unknown
189  {
190  throw cms::Exception("StreamTranslation", "Event serialization failed")
191  << "StreamSerializer module got an unknown error code\n"
192  << " while attempting to serialize event: " << event.id();
193  break;
194  }
195  }
196 
197  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
198  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
199  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
200 #if 0
201  if(data_buffer.ptr_ != data_.ptr_) {
202  std::cerr << "ROOT reset the buffer!!!!\n";
203  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
204  }
205 #endif
206  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
207  // eventMessage.eventAddr());
208  // eventMessage.setEventLength(rootbuf.Length());
209 
210  // compress before return if we need to
211  // should test if compressed already - should never be?
212  // as double compression can have problems
213  if (use_compression) {
214  unsigned int dest_size =
215  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
216  if (dest_size != 0) {
217  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
218  data_buffer.curr_space_used_ = dest_size;
219  }
220  }
221  // calculate the adler32 checksum and fill it into the struct
222  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
223  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
224 
225  return data_buffer.curr_space_used_;
226  }
edm::propagate_const< TClass * > tc_
selection
main part
Definition: corrVsCorr.py:100
edm::propagate_const< unsigned char * > ptr_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static ParentageRegistry * instance()
Definition: event.py:1
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer data_buffer,
const BranchIDLists branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper 
)

Serializes the product registry (that was specified to the constructor) into the specified InitMessage.

Definition at line 43 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Exception, FDEBUG, edm::pset::Registry::fillMap(), edm::getTClass(), edm::pset::Registry::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, sd, corrVsCorr::selection, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), and edm::SendJobHeader::setThinnedAssociationsHelper().

Referenced by edm::StreamerOutputModuleCommon::serializeRegistry().

45  {
46  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
47  SendJobHeader sd;
48 
49  FDEBUG(9) << "Product List: " << std::endl;
50 
51  for (auto const& selection : *selections_) {
52  sd.push_back(*selection.first);
53  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
54  }
56  sd.setBranchIDLists(branchIDLists);
57  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
59 
61  sd.setParameterSetMap(psetMap);
62 
63  data_buffer.rootbuf_.Reset();
64 
65  RootDebug tracer(10, 10);
66 
67  TClass* tc = getTClass(typeid(SendJobHeader));
68  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
69 
70  switch (bres) {
71  case 0: // failure
72  {
73  throw cms::Exception("StreamTranslation", "Registry serialization failed")
74  << "StreamSerializer failed to serialize registry\n";
75  break;
76  }
77  case 1: // succcess
78  break;
79  case 2: // truncated result
80  {
81  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
82  << "StreamSerializer module attempted to serialize\n"
83  << "a registry that is to big for the allocated buffers\n";
84  break;
85  }
86  default: // unknown
87  {
88  throw cms::Exception("StreamTranslation", "Registry serialization failed")
89  << "StreamSerializer module got an unknown error code\n"
90  << " while attempting to serialize registry\n";
91  break;
92  }
93  }
94 
95  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
96  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
97  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
98  // calculate the adler32 checksum and fill it into the struct
99  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
100  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
101  return data_buffer.curr_space_used_;
102  }
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:100
edm::propagate_const< unsigned char * > ptr_
unsigned int curr_event_size_
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
TClass * getTClass(const std::type_info &ti)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
static Registry * instance()
Definition: Registry.cc:12

Member Data Documentation

SelectedProducts const* edm::StreamSerializer::selections_
private

Definition at line 90 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

edm::propagate_const<TClass*> edm::StreamSerializer::tc_
private

Definition at line 91 of file StreamSerializer.h.

Referenced by serializeEvent().