CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSerializer.cc
Go to the documentation of this file.
1 
24 
25 #include "zlib.h"
26 #include <algorithm>
27 #include <cstdlib>
28 #include <list>
29 
30 namespace edm {
31 
36  selections_(selections),
37  tc_(getTClass(typeid(SendEvent))) {
38  }
39 
46  const BranchIDLists &branchIDLists,
47  ThinnedAssociationsHelper const& thinnedAssociationsHelper) {
48  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
50 
51  FDEBUG(9) << "Product List: " << std::endl;
52 
53 
54  for(auto const& selection : *selections_) {
55  sd.push_back(*selection);
56  FDEBUG(9) << "StreamOutput got product = " << selection->className()
57  << std::endl;
58  }
60  sd.setBranchIDLists(branchIDLists);
61  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
63 
65  sd.setParameterSetMap(psetMap);
66 
67  data_buffer.rootbuf_.Reset();
68 
69  RootDebug tracer(10,10);
70 
71  TClass* tc = getTClass(typeid(SendJobHeader));
72  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
73 
74  switch(bres) {
75  case 0: // failure
76  {
77  throw cms::Exception("StreamTranslation","Registry serialization failed")
78  << "StreamSerializer failed to serialize registry\n";
79  break;
80  }
81  case 1: // succcess
82  break;
83  case 2: // truncated result
84  {
85  throw cms::Exception("StreamTranslation","Registry serialization truncated")
86  << "StreamSerializer module attempted to serialize\n"
87  << "a registry that is to big for the allocated buffers\n";
88  break;
89  }
90  default: // unknown
91  {
92  throw cms::Exception("StreamTranslation","Registry serialization failed")
93  << "StreamSerializer module got an unknown error code\n"
94  << " while attempting to serialize registry\n";
95  break;
96  }
97  }
98 
99  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
100  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
101  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
102  // calculate the adler32 checksum and fill it into the struct
103  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
104  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
105  return data_buffer.curr_space_used_;
106  }
107 
129  ParameterSetID const& selectorConfig,
131  SerializeDataBuffer &data_buffer,
132  ModuleCallingContext const* mcc) {
133  Parentage parentage;
134 
135  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
136  selectionIDs.push_back(selectorConfig);
137  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
138 
139  // Loop over EDProducts, fill the provenance, and write.
140 
141  for(SelectedProducts::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
142  BranchDescription const& desc = **i;
143  BranchID const& id = desc.branchID();
144 
145  OutputHandle const oh = eventPrincipal.getForOutput(id, true, mcc);
146  if(!oh.productProvenance()) {
147  // No product with this ID was put in the event.
148  // Create and write the provenance.
149  se.products().push_back(StreamedProduct(desc));
150  } else {
152  assert(found);
153  se.products().push_back(StreamedProduct(oh.wrapper(),
154  desc,
155  oh.wrapper() != 0,
156  &parentage.parents()));
157  }
158  }
159 
160  data_buffer.rootbuf_.Reset();
161  RootDebug tracer(10,10);
162 
163  //TClass* tc = getTClass(typeid(SendEvent));
164  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
165  switch(bres) {
166  case 0: // failure
167  {
168  throw cms::Exception("StreamTranslation","Event serialization failed")
169  << "StreamSerializer failed to serialize event: "
170  << eventPrincipal.id();
171  break;
172  }
173  case 1: // succcess
174  break;
175  case 2: // truncated result
176  {
177  throw cms::Exception("StreamTranslation","Event serialization truncated")
178  << "StreamSerializer module attempted to serialize an event\n"
179  << "that is to big for the allocated buffers: "
180  << eventPrincipal.id();
181  break;
182  }
183  default: // unknown
184  {
185  throw cms::Exception("StreamTranslation","Event serialization failed")
186  << "StreamSerializer module got an unknown error code\n"
187  << " while attempting to serialize event: "
188  << eventPrincipal.id();
189  break;
190  }
191  }
192 
193  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
194  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
195  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
196 #if 0
197  if(data_buffer.ptr_ != data_.ptr_) {
198  std::cerr << "ROOT reset the buffer!!!!\n";
199  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
200  }
201 #endif
202  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
203  // eventMessage.eventAddr());
204  // eventMessage.setEventLength(rootbuf.Length());
205 
206  // compress before return if we need to
207  // should test if compressed already - should never be?
208  // as double compression can have problems
209  if(use_compression) {
210  unsigned int dest_size =
211  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
212  if(dest_size != 0) {
213  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
214  data_buffer.curr_space_used_ = dest_size;
215  }
216  }
217  // calculate the adler32 checksum and fill it into the struct
218  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
219  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
220 
221  return data_buffer.curr_space_used_;
222  }
223 
229  unsigned int
230  StreamSerializer::compressBuffer(unsigned char *inputBuffer,
231  unsigned int inputSize,
232  std::vector<unsigned char> &outputBuffer,
233  int compressionLevel) {
234  unsigned int resultSize = 0;
235 
236  // what are these magic numbers? (jbk)
237  unsigned long dest_size = (unsigned long)(double(inputSize)*
238  1.002 + 1.0) + 12;
239  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
240 
241  // compression 1-9, 6 is zlib default, 0 none
242  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
243  inputSize, compressionLevel);
244 
245  // check status
246  if(ret == Z_OK) {
247  // return the correct length
248  resultSize = dest_size;
249 
250  FDEBUG(1) << " original size = " << inputSize
251  << " final size = " << dest_size
252  << " ratio = " << double(dest_size)/double(inputSize)
253  << std::endl;
254  } else {
255  // compression failed, return a size of zero
256  FDEBUG(9) << "Compression Return value: " << ret
257  << " Okay = " << Z_OK << std::endl;
258  // do we throw an exception here?
259  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
260 
261  }
262 
263  return resultSize;
264  }
265 }
int i
Definition: DBlmapReader.cc:9
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const &v)
WrapperBase const * wrapper() const
Definition: OutputHandle.h:89
EventSelectionIDVector const & eventSelectionIDs() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:98
EventID const & id() const
ProductProvenance const * productProvenance() const
Definition: OutputHandle.h:97
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
BranchListIndexes const & branchListIndexes() const
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:78
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
ProcessHistory const & processHistory() const
Definition: Principal.h:137
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< BranchID > const & parents() const
Definition: Parentage.h:37
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
tuple compression_level
Definition: fu_pp.py:86
BranchID const & branchID() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
OutputHandle getForOutput(BranchID const &bid, bool getProd, ModuleCallingContext const *mcc) const
Definition: Principal.cc:716
SelectedProducts const * selections_
double sd
tuple use_compression
Definition: fu_pp.py:85
ParentageID const & parentageID() const
std::vector< BranchDescription const * > SelectedProducts
void setBranchIDLists(BranchIDLists const &bidlists)
void push_back(BranchDescription const &bd)
unsigned char * ptr_
static ParentageRegistry * instance()
EventAuxiliary const & aux() const
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:49
static Registry * instance()
Definition: Registry.cc:16
void setParameterSetMap(ParameterSetMap const &psetMap)
StreamSerializer(SelectedProducts const *selections)