CMS 3D CMS Logo

StreamSerializer.cc
Go to the documentation of this file.
1 
23 
24 #include "zlib.h"
25 #include <algorithm>
26 #include <cstdlib>
27 #include <iostream>
28 #include <vector>
29 
30 namespace edm {
31 
36  : selections_(selections), tc_(getTClass(typeid(SendEvent))) {}
37 
44  const BranchIDLists& branchIDLists,
45  ThinnedAssociationsHelper const& thinnedAssociationsHelper) {
46  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
48 
49  FDEBUG(9) << "Product List: " << std::endl;
50 
51  for (auto const& selection : *selections_) {
52  sd.push_back(*selection.first);
53  FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl;
54  }
56  sd.setBranchIDLists(branchIDLists);
57  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
59 
61  sd.setParameterSetMap(psetMap);
62 
63  data_buffer.rootbuf_.Reset();
64 
65  RootDebug tracer(10, 10);
66 
67  TClass* tc = getTClass(typeid(SendJobHeader));
68  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
69 
70  switch (bres) {
71  case 0: // failure
72  {
73  throw cms::Exception("StreamTranslation", "Registry serialization failed")
74  << "StreamSerializer failed to serialize registry\n";
75  break;
76  }
77  case 1: // succcess
78  break;
79  case 2: // truncated result
80  {
81  throw cms::Exception("StreamTranslation", "Registry serialization truncated")
82  << "StreamSerializer module attempted to serialize\n"
83  << "a registry that is to big for the allocated buffers\n";
84  break;
85  }
86  default: // unknown
87  {
88  throw cms::Exception("StreamTranslation", "Registry serialization failed")
89  << "StreamSerializer module got an unknown error code\n"
90  << " while attempting to serialize registry\n";
91  break;
92  }
93  }
94 
95  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
96  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
97  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
98  // calculate the adler32 checksum and fill it into the struct
99  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
100  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
101  return data_buffer.curr_space_used_;
102  }
103 
125  ParameterSetID const& selectorConfig,
126  bool use_compression,
127  int compression_level,
128  SerializeDataBuffer& data_buffer) const {
129  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
130  selectionIDs.push_back(selectorConfig);
131  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
132 
133  // Loop over EDProducts, fill the provenance, and write.
134 
135  // Historical note. I fixed two bugs in the code below in
136  // March 2017. One would have caused any Parentage written
137  // using the Streamer output module to be total nonsense
138  // prior to the fix. The other would have caused seg faults
139  // when the Parentage was dropped in an earlier process.
140 
141  // FIX ME. The code below stores the direct parentage of
142  // kept products but, unlike the PoolOutputModule, it does
143  // not save the parentage of dropped objects that are
144  // ancestors of kept products. That information is currently
145  // lost when the streamer output module is used.
146 
147  for (auto const& selection : *selections_) {
148  BranchDescription const& desc = *selection.first;
149  BasicHandle result = event.getByToken(selection.second, desc.unwrappedTypeID());
150  if (!result.isValid()) {
151  // No product with this ID was put in the event.
152  // Create and write the provenance.
153  se.products().push_back(StreamedProduct(desc));
154  } else {
155  if (result.provenance()->productProvenance()) {
156  Parentage const* parentage =
158  assert(parentage);
159  se.products().push_back(
160  StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, &parentage->parents()));
161  } else {
162  se.products().push_back(StreamedProduct(result.wrapper(), desc, result.wrapper() != nullptr, nullptr));
163  }
164  }
165  }
166 
167  data_buffer.rootbuf_.Reset();
168  RootDebug tracer(10, 10);
169 
170  //TClass* tc = getTClass(typeid(SendEvent));
171  int bres = data_buffer.rootbuf_.WriteObjectAny(&se, tc_);
172  switch (bres) {
173  case 0: // failure
174  {
175  throw cms::Exception("StreamTranslation", "Event serialization failed")
176  << "StreamSerializer failed to serialize event: " << event.id();
177  break;
178  }
179  case 1: // succcess
180  break;
181  case 2: // truncated result
182  {
183  throw cms::Exception("StreamTranslation", "Event serialization truncated")
184  << "StreamSerializer module attempted to serialize an event\n"
185  << "that is to big for the allocated buffers: " << event.id();
186  break;
187  }
188  default: // unknown
189  {
190  throw cms::Exception("StreamTranslation", "Event serialization failed")
191  << "StreamSerializer module got an unknown error code\n"
192  << " while attempting to serialize event: " << event.id();
193  break;
194  }
195  }
196 
197  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
198  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
199  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
200 #if 0
201  if(data_buffer.ptr_ != data_.ptr_) {
202  std::cerr << "ROOT reset the buffer!!!!\n";
203  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
204  }
205 #endif
206  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
207  // eventMessage.eventAddr());
208  // eventMessage.setEventLength(rootbuf.Length());
209 
210  // compress before return if we need to
211  // should test if compressed already - should never be?
212  // as double compression can have problems
213  if (use_compression) {
214  unsigned int dest_size =
215  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
216  if (dest_size != 0) {
217  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
218  data_buffer.curr_space_used_ = dest_size;
219  }
220  }
221  // calculate the adler32 checksum and fill it into the struct
222  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
223  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
224 
225  return data_buffer.curr_space_used_;
226  }
227 
233  unsigned int StreamSerializer::compressBuffer(unsigned char* inputBuffer,
234  unsigned int inputSize,
235  std::vector<unsigned char>& outputBuffer,
236  int compressionLevel) {
237  unsigned int resultSize = 0;
238 
239  // what are these magic numbers? (jbk)
240  unsigned long dest_size = (unsigned long)(double(inputSize) * 1.002 + 1.0) + 12;
241  if (outputBuffer.size() < dest_size)
242  outputBuffer.resize(dest_size);
243 
244  // compression 1-9, 6 is zlib default, 0 none
245  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer, inputSize, compressionLevel);
246 
247  // check status
248  if (ret == Z_OK) {
249  // return the correct length
250  resultSize = dest_size;
251 
252  FDEBUG(1) << " original size = " << inputSize << " final size = " << dest_size
253  << " ratio = " << double(dest_size) / double(inputSize) << std::endl;
254  } else {
255  // compression failed, return a size of zero
256  FDEBUG(9) << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
257  // do we throw an exception here?
258  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
259  }
260 
261  return resultSize;
262  }
263 } // namespace edm
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const &v)
WrapperBase const * wrapper() const (true)
Definition: BasicHandle.h:82
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
EventAuxiliary const & eventAuxiliary() const
bool isValid() const (true)
Definition: BasicHandle.h:74
edm::propagate_const< TClass * > tc_
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:100
Provenance const * provenance() const (true)
Definition: BasicHandle.h:86
ProductProvenance const * productProvenance() const
Definition: Provenance.cc:35
edm::propagate_const< unsigned char * > ptr_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
std::vector< BranchID > const & parents() const
Definition: Parentage.h:44
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
TypeID unwrappedTypeID() const
unsigned char const * bufferPointer() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
TClass * getTClass(const std::type_info &ti)
ParentageID const & parentageID() const
int serializeEvent(EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer) const
HLT enums.
void setBranchIDLists(BranchIDLists const &bidlists)
void push_back(BranchDescription const &bd)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
static ParentageRegistry * instance()
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:42
static Registry * instance()
Definition: Registry.cc:12
Definition: event.py:1
void setParameterSetMap(ParameterSetMap const &psetMap)
StreamSerializer(SelectedProducts const *selections)