CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
 
 StreamSerializer (SelectedProducts const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
 

Private Attributes

SelectedProducts const * selections_
 
TClass * tc_
 

Detailed Description

Definition at line 66 of file StreamSerializer.h.

Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( SelectedProducts const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 36 of file StreamSerializer.cc.

36  :
37  selections_(selections),
38  tc_(getTClass(typeid(SendEvent))) {
39  }
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:79
SelectedProducts const * selections_

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel 
)
static

Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.

Definition at line 231 of file StreamSerializer.cc.

References ecal_dqm_sourceclient-live_cfg::cerr, FDEBUG, and run_regression::ret.

Referenced by serializeEvent().

234  {
235  unsigned int resultSize = 0;
236 
237  // what are these magic numbers? (jbk)
238  unsigned long dest_size = (unsigned long)(double(inputSize)*
239  1.002 + 1.0) + 12;
240  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
241 
242  // compression 1-9, 6 is zlib default, 0 none
243  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
244  inputSize, compressionLevel);
245 
246  // check status
247  if(ret == Z_OK) {
248  // return the correct length
249  resultSize = dest_size;
250 
251  FDEBUG(1) << " original size = " << inputSize
252  << " final size = " << dest_size
253  << " ratio = " << double(dest_size)/double(inputSize)
254  << std::endl;
255  } else {
256  // compression failed, return a size of zero
257  FDEBUG(9) << "Compression Return value: " << ret
258  << " Okay = " << Z_OK << std::endl;
259  // do we throw an exception here?
260  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
261 
262  }
263 
264  return resultSize;
265  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::StreamSerializer::serializeEvent ( EventPrincipal const &  eventPrincipal,
ParameterSetID const &  selectorConfig,
bool  use_compression,
int  compression_level,
SerializeDataBuffer data_buffer,
ModuleCallingContext const *  mcc 
)

Serializes the specified event into the specified event message.

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead.

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 129 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, assert(), edm::EventPrincipal::aux(), edm::BranchDescription::branchID(), edm::EventPrincipal::branchListIndexes(), ecal_dqm_sourceclient-live_cfg::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventPrincipal::eventSelectionIDs(), Exception, newFWLiteAna::found, edm::Principal::getForOutput(), edm::ParentageRegistry::getMapped(), i, edm::EventPrincipal::id(), edm::ParentageRegistry::instance(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Principal::processHistory(), edm::OutputHandle::productProvenance(), SerializeDataBuffer::ptr_, SerializeDataBuffer::rootbuf_, selections_, tc_, and edm::OutputHandle::wrapper().

Referenced by edm::StreamerOutputModuleBase::serializeEvent().

133  {
134  Parentage parentage;
135 
136  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
137  selectionIDs.push_back(selectorConfig);
138  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
139 
140  // Loop over EDProducts, fill the provenance, and write.
141 
142  for(SelectedProducts::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
143  BranchDescription const& desc = **i;
144  BranchID const& id = desc.branchID();
145 
146  OutputHandle const oh = eventPrincipal.getForOutput(id, true, mcc);
147  if(!oh.productProvenance()) {
148  // No product with this ID was put in the event.
149  // Create and write the provenance.
150  se.products().push_back(StreamedProduct(desc));
151  } else {
152  bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
153  assert(found);
154  se.products().push_back(StreamedProduct(oh.wrapper(),
155  desc,
156  oh.wrapper() != 0,
157  &parentage.parents()));
158  }
159  }
160 
161  data_buffer.rootbuf_.Reset();
162  RootDebug tracer(10,10);
163 
164  //TClass* tc = getTClass(typeid(SendEvent));
165  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
166  switch(bres) {
167  case 0: // failure
168  {
169  throw cms::Exception("StreamTranslation","Event serialization failed")
170  << "StreamSerializer failed to serialize event: "
171  << eventPrincipal.id();
172  break;
173  }
174  case 1: // succcess
175  break;
176  case 2: // truncated result
177  {
178  throw cms::Exception("StreamTranslation","Event serialization truncated")
179  << "StreamSerializer module attempted to serialize an event\n"
180  << "that is to big for the allocated buffers: "
181  << eventPrincipal.id();
182  break;
183  }
184  default: // unknown
185  {
186  throw cms::Exception("StreamTranslation","Event serialization failed")
187  << "StreamSerializer module got an unknown error code\n"
188  << " while attempting to serialize event: "
189  << eventPrincipal.id();
190  break;
191  }
192  }
193 
194  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
195  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
196  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
197 #if 0
198  if(data_buffer.ptr_ != data_.ptr_) {
199  std::cerr << "ROOT reset the buffer!!!!\n";
200  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
201  }
202 #endif
203  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
204  // eventMessage.eventAddr());
205  // eventMessage.setEventLength(rootbuf.Length());
206 
207  // compress before return if we need to
208  // should test if compressed already - should never be?
209  // as double compression can have problems
210  if(use_compression) {
211  unsigned int dest_size =
212  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
213  if(dest_size != 0) {
214  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
215  data_buffer.curr_space_used_ = dest_size;
216  }
217  }
218  // calculate the adler32 checksum and fill it into the struct
219  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
220  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
221 
222  return data_buffer.curr_space_used_;
223  }
int i
Definition: DBlmapReader.cc:9
assert(m_qm.get())
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
tuple compression_level
Definition: fu_pp.py:86
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
tuple use_compression
Definition: fu_pp.py:85
unsigned char * ptr_
static ParentageRegistry * instance()
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer data_buffer,
const BranchIDLists branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper 
)

Serializes the product registry (that was specified to the constructor) into the specified InitMessage.

Definition at line 46 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Exception, FDEBUG, edm::pset::Registry::fillMap(), edm::getTClass(), edm::pset::Registry::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, sd, corrVsCorr::selection, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), and edm::SendJobHeader::setThinnedAssociationsHelper().

Referenced by edm::StreamerOutputModuleBase::serializeRegistry().

48  {
49  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
50  SendJobHeader sd;
51 
52  FDEBUG(9) << "Product List: " << std::endl;
53 
54 
55  for(auto const& selection : *selections_) {
56  sd.push_back(*selection);
57  FDEBUG(9) << "StreamOutput got product = " << selection->className()
58  << std::endl;
59  }
61  sd.setBranchIDLists(branchIDLists);
62  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
64 
66  sd.setParameterSetMap(psetMap);
67 
68  data_buffer.rootbuf_.Reset();
69 
70  RootDebug tracer(10,10);
71 
72  TClass* tc = getTClass(typeid(SendJobHeader));
73  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
74 
75  switch(bres) {
76  case 0: // failure
77  {
78  throw cms::Exception("StreamTranslation","Registry serialization failed")
79  << "StreamSerializer failed to serialize registry\n";
80  break;
81  }
82  case 1: // succcess
83  break;
84  case 2: // truncated result
85  {
86  throw cms::Exception("StreamTranslation","Registry serialization truncated")
87  << "StreamSerializer module attempted to serialize\n"
88  << "a registry that is to big for the allocated buffers\n";
89  break;
90  }
91  default: // unknown
92  {
93  throw cms::Exception("StreamTranslation","Registry serialization failed")
94  << "StreamSerializer module got an unknown error code\n"
95  << " while attempting to serialize registry\n";
96  break;
97  }
98  }
99 
100  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
101  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
102  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
103  // calculate the adler32 checksum and fill it into the struct
104  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
105  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
106  return data_buffer.curr_space_used_;
107  }
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:98
#define FDEBUG(lev)
Definition: DebugMacros.h:18
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:79
unsigned int curr_event_size_
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
unsigned char * ptr_
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:49
static Registry * instance()
Definition: Registry.cc:16

Member Data Documentation

SelectedProducts const* edm::StreamSerializer::selections_
private

Definition at line 95 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

TClass* edm::StreamSerializer::tc_
private

Definition at line 96 of file StreamSerializer.h.

Referenced by serializeEvent().