CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSerializer.cc
Go to the documentation of this file.
1 
24 
25 #include "zlib.h"
26 #include <algorithm>
27 #include <cstdlib>
28 #include <iostream>
29 #include <vector>
30 
31 namespace edm {
32 
37  selections_(selections),
38  tc_(getTClass(typeid(SendEvent))) {
39  }
40 
47  const BranchIDLists &branchIDLists,
48  ThinnedAssociationsHelper const& thinnedAssociationsHelper) {
49  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
51 
52  FDEBUG(9) << "Product List: " << std::endl;
53 
54 
55  for(auto const& selection : *selections_) {
56  sd.push_back(*selection);
57  FDEBUG(9) << "StreamOutput got product = " << selection->className()
58  << std::endl;
59  }
61  sd.setBranchIDLists(branchIDLists);
62  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
64 
66  sd.setParameterSetMap(psetMap);
67 
68  data_buffer.rootbuf_.Reset();
69 
70  RootDebug tracer(10,10);
71 
72  TClass* tc = getTClass(typeid(SendJobHeader));
73  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
74 
75  switch(bres) {
76  case 0: // failure
77  {
78  throw cms::Exception("StreamTranslation","Registry serialization failed")
79  << "StreamSerializer failed to serialize registry\n";
80  break;
81  }
82  case 1: // succcess
83  break;
84  case 2: // truncated result
85  {
86  throw cms::Exception("StreamTranslation","Registry serialization truncated")
87  << "StreamSerializer module attempted to serialize\n"
88  << "a registry that is to big for the allocated buffers\n";
89  break;
90  }
91  default: // unknown
92  {
93  throw cms::Exception("StreamTranslation","Registry serialization failed")
94  << "StreamSerializer module got an unknown error code\n"
95  << " while attempting to serialize registry\n";
96  break;
97  }
98  }
99 
100  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
101  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
102  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
103  // calculate the adler32 checksum and fill it into the struct
104  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
105  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
106  return data_buffer.curr_space_used_;
107  }
108 
130  ParameterSetID const& selectorConfig,
132  SerializeDataBuffer &data_buffer,
133  ModuleCallingContext const* mcc) {
134  Parentage parentage;
135 
136  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
137  selectionIDs.push_back(selectorConfig);
138  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
139 
140  // Loop over EDProducts, fill the provenance, and write.
141 
142  for(SelectedProducts::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
143  BranchDescription const& desc = **i;
144  BranchID const& id = desc.branchID();
145 
146  OutputHandle const oh = eventPrincipal.getForOutput(id, true, mcc);
147  if(!oh.productProvenance()) {
148  // No product with this ID was put in the event.
149  // Create and write the provenance.
150  se.products().push_back(StreamedProduct(desc));
151  } else {
153  assert(found);
154  se.products().push_back(StreamedProduct(oh.wrapper(),
155  desc,
156  oh.wrapper() != 0,
157  &parentage.parents()));
158  }
159  }
160 
161  data_buffer.rootbuf_.Reset();
162  RootDebug tracer(10,10);
163 
164  //TClass* tc = getTClass(typeid(SendEvent));
165  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
166  switch(bres) {
167  case 0: // failure
168  {
169  throw cms::Exception("StreamTranslation","Event serialization failed")
170  << "StreamSerializer failed to serialize event: "
171  << eventPrincipal.id();
172  break;
173  }
174  case 1: // succcess
175  break;
176  case 2: // truncated result
177  {
178  throw cms::Exception("StreamTranslation","Event serialization truncated")
179  << "StreamSerializer module attempted to serialize an event\n"
180  << "that is to big for the allocated buffers: "
181  << eventPrincipal.id();
182  break;
183  }
184  default: // unknown
185  {
186  throw cms::Exception("StreamTranslation","Event serialization failed")
187  << "StreamSerializer module got an unknown error code\n"
188  << " while attempting to serialize event: "
189  << eventPrincipal.id();
190  break;
191  }
192  }
193 
194  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
195  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
196  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
197 #if 0
198  if(data_buffer.ptr_ != data_.ptr_) {
199  std::cerr << "ROOT reset the buffer!!!!\n";
200  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
201  }
202 #endif
203  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
204  // eventMessage.eventAddr());
205  // eventMessage.setEventLength(rootbuf.Length());
206 
207  // compress before return if we need to
208  // should test if compressed already - should never be?
209  // as double compression can have problems
210  if(use_compression) {
211  unsigned int dest_size =
212  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
213  if(dest_size != 0) {
214  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
215  data_buffer.curr_space_used_ = dest_size;
216  }
217  }
218  // calculate the adler32 checksum and fill it into the struct
219  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
220  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
221 
222  return data_buffer.curr_space_used_;
223  }
224 
230  unsigned int
231  StreamSerializer::compressBuffer(unsigned char *inputBuffer,
232  unsigned int inputSize,
233  std::vector<unsigned char> &outputBuffer,
234  int compressionLevel) {
235  unsigned int resultSize = 0;
236 
237  // what are these magic numbers? (jbk)
238  unsigned long dest_size = (unsigned long)(double(inputSize)*
239  1.002 + 1.0) + 12;
240  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
241 
242  // compression 1-9, 6 is zlib default, 0 none
243  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
244  inputSize, compressionLevel);
245 
246  // check status
247  if(ret == Z_OK) {
248  // return the correct length
249  resultSize = dest_size;
250 
251  FDEBUG(1) << " original size = " << inputSize
252  << " final size = " << dest_size
253  << " ratio = " << double(dest_size)/double(inputSize)
254  << std::endl;
255  } else {
256  // compression failed, return a size of zero
257  FDEBUG(9) << "Compression Return value: " << ret
258  << " Okay = " << Z_OK << std::endl;
259  // do we throw an exception here?
260  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
261 
262  }
263 
264  return resultSize;
265  }
266 }
int i
Definition: DBlmapReader.cc:9
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const &v)
WrapperBase const * wrapper() const
Definition: OutputHandle.h:89
EventSelectionIDVector const & eventSelectionIDs() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
assert(m_qm.get())
selection
main part
Definition: corrVsCorr.py:98
EventID const & id() const
ProductProvenance const * productProvenance() const
Definition: OutputHandle.h:97
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
BranchListIndexes const & branchListIndexes() const
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:79
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
ProcessHistory const & processHistory() const
Definition: Principal.h:137
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< BranchID > const & parents() const
Definition: Parentage.h:37
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
tuple compression_level
Definition: fu_pp.py:86
BranchID const & branchID() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
OutputHandle getForOutput(BranchID const &bid, bool getProd, ModuleCallingContext const *mcc) const
Definition: Principal.cc:726
SelectedProducts const * selections_
double sd
tuple use_compression
Definition: fu_pp.py:85
ParentageID const & parentageID() const
std::vector< BranchDescription const * > SelectedProducts
void setBranchIDLists(BranchIDLists const &bidlists)
void push_back(BranchDescription const &bd)
unsigned char * ptr_
static ParentageRegistry * instance()
EventAuxiliary const & aux() const
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:49
static Registry * instance()
Definition: Registry.cc:16
void setParameterSetMap(ParameterSetMap const &psetMap)
StreamSerializer(SelectedProducts const *selections)