CMS 3D CMS Logo

StreamSerializer.cc
Go to the documentation of this file.
1 
23 
24 #include "zlib.h"
25 #include <algorithm>
26 #include <cstdlib>
27 #include <iostream>
28 #include <vector>
29 
30 namespace edm {
31 
36  selections_(selections),
37  tc_(getTClass(typeid(SendEvent))) {
38  }
39 
46  const BranchIDLists &branchIDLists,
47  ThinnedAssociationsHelper const& thinnedAssociationsHelper) {
48  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
50 
51  FDEBUG(9) << "Product List: " << std::endl;
52 
53 
54  for(auto const& selection : *selections_) {
55  sd.push_back(*selection.first);
56  FDEBUG(9) << "StreamOutput got product = " << selection.first->className()
57  << std::endl;
58  }
60  sd.setBranchIDLists(branchIDLists);
61  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
63 
65  sd.setParameterSetMap(psetMap);
66 
67  data_buffer.rootbuf_.Reset();
68 
69  RootDebug tracer(10,10);
70 
71  TClass* tc = getTClass(typeid(SendJobHeader));
72  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
73 
74  switch(bres) {
75  case 0: // failure
76  {
77  throw cms::Exception("StreamTranslation","Registry serialization failed")
78  << "StreamSerializer failed to serialize registry\n";
79  break;
80  }
81  case 1: // succcess
82  break;
83  case 2: // truncated result
84  {
85  throw cms::Exception("StreamTranslation","Registry serialization truncated")
86  << "StreamSerializer module attempted to serialize\n"
87  << "a registry that is to big for the allocated buffers\n";
88  break;
89  }
90  default: // unknown
91  {
92  throw cms::Exception("StreamTranslation","Registry serialization failed")
93  << "StreamSerializer module got an unknown error code\n"
94  << " while attempting to serialize registry\n";
95  break;
96  }
97  }
98 
99  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
100  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
101  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
102  // calculate the adler32 checksum and fill it into the struct
103  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
104  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
105  return data_buffer.curr_space_used_;
106  }
107 
129  ParameterSetID const& selectorConfig,
130  bool use_compression, int compression_level,
131  SerializeDataBuffer& data_buffer) {
132 
133  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
134  selectionIDs.push_back(selectorConfig);
135  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
136 
137  // Loop over EDProducts, fill the provenance, and write.
138 
139  // Historical note. I fixed two bugs in the code below in
140  // March 2017. One would have caused any Parentage written
141  // using the Streamer output module to be total nonsense
142  // prior to the fix. The other would have caused seg faults
143  // when the Parentage was dropped in an earlier process.
144 
145  // FIX ME. The code below stores the direct parentage of
146  // kept products, but it does not save the parentage of
147  // dropped objects that are ancestors of kept products like
148  // the PoolOutputModule. That information is currently
149  // lost when the streamer output module is used.
150 
151  for(auto const& selection : *selections_) {
152  BranchDescription const& desc = *selection.first;
154  event.getByToken(selection.second, desc.unwrappedTypeID(), result);
155  if(!result.isValid()) {
156  // No product with this ID was put in the event.
157  // Create and write the provenance.
158  se.products().push_back(StreamedProduct(desc));
159  } else {
160  if (result.provenance()->productProvenance()) {
162  assert(parentage);
163  se.products().push_back(StreamedProduct(result.wrapper(),
164  desc,
165  result.wrapper() != nullptr,
166  &parentage->parents()));
167  } else {
168  se.products().push_back(StreamedProduct(result.wrapper(),
169  desc,
170  result.wrapper() != nullptr,
171  nullptr));
172  }
173  }
174  }
175 
176  data_buffer.rootbuf_.Reset();
177  RootDebug tracer(10,10);
178 
179  //TClass* tc = getTClass(typeid(SendEvent));
180  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
181  switch(bres) {
182  case 0: // failure
183  {
184  throw cms::Exception("StreamTranslation","Event serialization failed")
185  << "StreamSerializer failed to serialize event: "
186  << event.id();
187  break;
188  }
189  case 1: // succcess
190  break;
191  case 2: // truncated result
192  {
193  throw cms::Exception("StreamTranslation","Event serialization truncated")
194  << "StreamSerializer module attempted to serialize an event\n"
195  << "that is to big for the allocated buffers: "
196  << event.id();
197  break;
198  }
199  default: // unknown
200  {
201  throw cms::Exception("StreamTranslation","Event serialization failed")
202  << "StreamSerializer module got an unknown error code\n"
203  << " while attempting to serialize event: "
204  << event.id();
205  break;
206  }
207  }
208 
209  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
210  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
211  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
212 #if 0
213  if(data_buffer.ptr_ != data_.ptr_) {
214  std::cerr << "ROOT reset the buffer!!!!\n";
215  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
216  }
217 #endif
218  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
219  // eventMessage.eventAddr());
220  // eventMessage.setEventLength(rootbuf.Length());
221 
222  // compress before return if we need to
223  // should test if compressed already - should never be?
224  // as double compression can have problems
225  if(use_compression) {
226  unsigned int dest_size =
227  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
228  if(dest_size != 0) {
229  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
230  data_buffer.curr_space_used_ = dest_size;
231  }
232  }
233  // calculate the adler32 checksum and fill it into the struct
234  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
235  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
236 
237  return data_buffer.curr_space_used_;
238  }
239 
245  unsigned int
246  StreamSerializer::compressBuffer(unsigned char *inputBuffer,
247  unsigned int inputSize,
248  std::vector<unsigned char> &outputBuffer,
249  int compressionLevel) {
250  unsigned int resultSize = 0;
251 
252  // what are these magic numbers? (jbk)
253  unsigned long dest_size = (unsigned long)(double(inputSize)*
254  1.002 + 1.0) + 12;
255  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
256 
257  // compression 1-9, 6 is zlib default, 0 none
258  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
259  inputSize, compressionLevel);
260 
261  // check status
262  if(ret == Z_OK) {
263  // return the correct length
264  resultSize = dest_size;
265 
266  FDEBUG(1) << " original size = " << inputSize
267  << " final size = " << dest_size
268  << " ratio = " << double(dest_size)/double(inputSize)
269  << std::endl;
270  } else {
271  // compression failed, return a size of zero
272  FDEBUG(9) << "Compression Return value: " << ret
273  << " Okay = " << Z_OK << std::endl;
274  // do we throw an exception here?
275  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
276 
277  }
278 
279  return resultSize;
280  }
281 }
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const &v)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
EventAuxiliary const & eventAuxiliary() const
edm::propagate_const< TClass * > tc_
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:98
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:31
edm::propagate_const< unsigned char * > ptr_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::vector< BranchID > const & parents() const
Definition: Parentage.h:44
std::vector< unsigned char > comp_buf_
Provenance const * provenance() const
Definition: BasicHandle.h:94
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
TypeID unwrappedTypeID() const
WrapperBase const * wrapper() const
Definition: BasicHandle.h:90
unsigned char const * bufferPointer() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
TClass * getTClass(const std::type_info &ti)
bool isValid() const
Definition: BasicHandle.h:82
ParentageID const & parentageID() const
int serializeEvent(EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
HLT enums.
void setBranchIDLists(BranchIDLists const &bidlists)
void push_back(BranchDescription const &bd)
static ParentageRegistry * instance()
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:49
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
void setParameterSetMap(ParameterSetMap const &psetMap)
StreamSerializer(SelectedProducts const *selections)