CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSerializer.cc
Go to the documentation of this file.
1 
24 
25 #include "zlib.h"
26 #include <algorithm>
27 #include <cstdlib>
28 #include <list>
29 
30 namespace edm {
31 
36  selections_(selections), // keep the caller-supplied selection pointer; it is dereferenced later when looping over products
37  tc_(getTClass(typeid(SendEvent))) { // look up the TClass for SendEvent once; serializeEvent passes tc_ to WriteObjectAny
38  }
39 
// Serialize the job-level header (the selected product descriptions, the
// branch ID lists and the parameter set map) into data_buffer.
//
// Every entry of *selections_ is copied into `sd`, which is then streamed
// into data_buffer.rootbuf_ using the TClass looked up for SendJobHeader.
// NOTE(review): the declarations of `sd` and `psetMap` are not visible in
// this listing; `sd` is presumably a SendJobHeader -- confirm against the
// full file.
//
// @param data_buffer    its rootbuf_, curr_event_size_, curr_space_used_,
//                       ptr_ and adler32_chksum_ members are overwritten
// @param branchIDLists  handed to sd.setBranchIDLists()
// @return data_buffer.curr_space_used_, i.e. rootbuf_.Length() after the write
// @throws cms::Exception (category "StreamTranslation") when WriteObjectAny
//         returns anything other than 1
45  int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists) {
46  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
48 
49  FDEBUG(9) << "Product List: " << std::endl;
50 
51 
    // Copy each selected product description into the header being built.
52  for(auto const& selection : *selections_) {
53  sd.push_back(*selection);
54  FDEBUG(9) << "StreamOutput got product = " << selection->className()
55  << std::endl;
56  }
58  sd.setBranchIDLists(branchIDLists);
60 
62  sd.setParameterSetMap(psetMap);
63 
    // Start from an empty ROOT buffer so Length() below reflects only this header.
64  data_buffer.rootbuf_.Reset();
65 
    // NOTE(review): RootDebug is defined elsewhere; presumably it changes ROOT
    // debug settings for the lifetime of `tracer` -- confirm.
66  RootDebug tracer(10,10);
67 
68  TClass* tc = getTClass(typeid(SendJobHeader));
69  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
70 
    // Every return code other than 1 ends in a throw; the `break` statements
    // that follow a throw are unreachable.
71  switch(bres) {
72  case 0: // failure
73  {
74  throw cms::Exception("StreamTranslation","Registry serialization failed")
75  << "StreamSerializer failed to serialize registry\n";
76  break;
77  }
78  case 1: // success
79  break;
80  case 2: // truncated result
81  {
82  throw cms::Exception("StreamTranslation","Registry serialization truncated")
83  << "StreamSerializer module attempted to serialize\n"
84  << "a registry that is to big for the allocated buffers\n";
85  break;
86  }
87  default: // unknown
88  {
89  throw cms::Exception("StreamTranslation","Registry serialization failed")
90  << "StreamSerializer module got an unknown error code\n"
91  << " while attempting to serialize registry\n";
92  break;
93  }
94  }
95 
    // Publish the serialized bytes: the header is not compressed here, so the
    // space used equals the event size and ptr_ points into the ROOT buffer.
96  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
97  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
98  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
99  // calculate the adler32 checksum and fill it into the struct
100  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
101  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
102  return data_buffer.curr_space_used_;
103  }
104 
126  ParameterSetID const& selectorConfig,
128  SerializeDataBuffer &data_buffer,
129  ModuleCallingContext const* mcc) {
    // Serialize one event into data_buffer.
    //
    // NOTE(review): the first lines of this signature are missing from the
    // listing; `eventPrincipal`, `use_compression` and `compression_level`
    // used below are presumably parameters declared there -- confirm.
    //
    // Steps visible here: build a SendEvent from the principal, add one
    // StreamedProduct per entry of *selections_, stream it into
    // data_buffer.rootbuf_, optionally compress, then record the pointer,
    // size and Adler-32 checksum. Returns data_buffer.curr_space_used_.
    // Throws cms::Exception (category "StreamTranslation") when
    // WriteObjectAny returns anything other than 1.
130  Parentage parentage;
131 
    // The selection ID list is copied so selectorConfig can be appended
    // without touching the principal's own list.
132  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
133  selectionIDs.push_back(selectorConfig);
134  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
135 
136  // Loop over EDProducts, fill the provenance, and write.
137 
138  for(SelectedProducts::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
139  BranchDescription const& desc = **i;
140  BranchID const& id = desc.branchID();
141 
142  OutputHandle const oh = eventPrincipal.getForOutput(id, true, mcc);
143  if(!oh.productProvenance()) {
144  // No product with this ID was put in the event.
145  // Create and write the provenance.
146  se.products().push_back(StreamedProduct(desc));
147  } else {
    // NOTE(review): the statement that defines `found` is missing from this
    // listing; presumably it fills `parentage` from a registry lookup --
    // confirm against the full file.
149  assert(found);
    // The third argument is true only when the handle carries a non-null wrapper.
150  se.products().push_back(StreamedProduct(oh.wrapper(),
151  desc,
152  oh.wrapper() != 0,
153  &parentage.parents()));
154  }
155  }
156 
    // Start from an empty ROOT buffer so Length() below reflects only this event.
157  data_buffer.rootbuf_.Reset();
158  RootDebug tracer(10,10);
159 
    // tc_ is the SendEvent TClass looked up once in the constructor, which is
    // why the per-call lookup below is commented out.
160  //TClass* tc = getTClass(typeid(SendEvent));
161  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
    // Every return code other than 1 ends in a throw; the `break` statements
    // that follow a throw are unreachable.
162  switch(bres) {
163  case 0: // failure
164  {
165  throw cms::Exception("StreamTranslation","Event serialization failed")
166  << "StreamSerializer failed to serialize event: "
167  << eventPrincipal.id();
168  break;
169  }
170  case 1: // success
171  break;
172  case 2: // truncated result
173  {
174  throw cms::Exception("StreamTranslation","Event serialization truncated")
175  << "StreamSerializer module attempted to serialize an event\n"
176  << "that is to big for the allocated buffers: "
177  << eventPrincipal.id();
178  break;
179  }
180  default: // unknown
181  {
182  throw cms::Exception("StreamTranslation","Event serialization failed")
183  << "StreamSerializer module got an unknown error code\n"
184  << " while attempting to serialize event: "
185  << eventPrincipal.id();
186  break;
187  }
188  }
189 
    // Until compression succeeds, the uncompressed ROOT buffer is the payload.
190  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
191  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
192  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
193 #if 0
194  if(data_buffer.ptr_ != data_.ptr_) {
195  std::cerr << "ROOT reset the buffer!!!!\n";
196  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
197  }
198 #endif
199  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
200  // eventMessage.eventAddr());
201  // eventMessage.setEventLength(rootbuf.Length());
202 
203  // compress before return if we need to
204  // should test if compressed already - should never be?
205  // as double compression can have problems
206  if(use_compression) {
207  unsigned int dest_size =
208  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
    // compressBuffer returns 0 on failure; in that case ptr_ and
    // curr_space_used_ keep describing the uncompressed data.
    // curr_event_size_ always stays at the uncompressed length.
209  if(dest_size != 0) {
210  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
211  data_buffer.curr_space_used_ = dest_size;
212  }
213  }
    // The checksum covers whatever ptr_ now points at, i.e. the compressed
    // bytes when compression succeeded.
214  // calculate the adler32 checksum and fill it into the struct
215  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
216  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
217 
218  return data_buffer.curr_space_used_;
219  }
220 
226  unsigned int
227  StreamSerializer::compressBuffer(unsigned char *inputBuffer,
228  unsigned int inputSize,
229  std::vector<unsigned char> &outputBuffer,
230  int compressionLevel) {
231  unsigned int resultSize = 0;
232 
233  // what are these magic numbers? (jbk)
234  unsigned long dest_size = (unsigned long)(double(inputSize)*
235  1.002 + 1.0) + 12;
236  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
237 
238  // compression 1-9, 6 is zlib default, 0 none
239  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
240  inputSize, compressionLevel);
241 
242  // check status
243  if(ret == Z_OK) {
244  // return the correct length
245  resultSize = dest_size;
246 
247  FDEBUG(1) << " original size = " << inputSize
248  << " final size = " << dest_size
249  << " ratio = " << double(dest_size)/double(inputSize)
250  << std::endl;
251  } else {
252  // compression failed, return a size of zero
253  FDEBUG(9) << "Compression Return value: " << ret
254  << " Okay = " << Z_OK << std::endl;
255  // do we throw an exception here?
256  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
257 
258  }
259 
260  return resultSize;
261  }
262 }
int i
Definition: DBlmapReader.cc:9
EventSelectionIDVector const & eventSelectionIDs() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:98
EventID const & id() const
ProductProvenance const * productProvenance() const
Definition: OutputHandle.h:101
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
BranchListIndexes const & branchListIndexes() const
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:78
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
ProcessHistory const & processHistory() const
Definition: Principal.h:139
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< BranchID > const & parents() const
Definition: Parentage.h:37
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
tuple compression_level
Definition: fu_pp.py:86
void const * wrapper() const
Definition: OutputHandle.h:89
BranchID const & branchID() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
OutputHandle getForOutput(BranchID const &bid, bool getProd, ModuleCallingContext const *mcc) const
Definition: Principal.cc:716
SelectedProducts const * selections_
double sd
tuple use_compression
Definition: fu_pp.py:85
ParentageID const & parentageID() const
std::vector< BranchDescription const * > SelectedProducts
void setBranchIDLists(BranchIDLists const &bidlists)
void push_back(BranchDescription const &bd)
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists)
unsigned char * ptr_
static ParentageRegistry * instance()
EventAuxiliary const & aux() const
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:47
static Registry * instance()
Definition: Registry.cc:14
void setParameterSetMap(ParameterSetMap const &psetMap)
StreamSerializer(SelectedProducts const *selections)