CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
 
int serializeRegistry (SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
 
 StreamSerializer (SelectedProducts const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
 

Private Attributes

SelectedProducts const * selections_
 
edm::propagate_const< TClass * > tc_
 

Detailed Description

Definition at line 68 of file StreamSerializer.h.

Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( SelectedProducts const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 35 of file StreamSerializer.cc.

35  :
36  selections_(selections),
37  tc_(getTClass(typeid(SendEvent))) {
38  }
edm::propagate_const< TClass * > tc_
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:66
SelectedProducts const * selections_

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel 
)
static

Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.

Definition at line 228 of file StreamSerializer.cc.

References ecal_dqm_sourceclient-live_cfg::cerr, FDEBUG, and runTheMatrix::ret.

Referenced by serializeEvent().

231  {
232  unsigned int resultSize = 0;
233 
234  // what are these magic numbers? (jbk)
235  unsigned long dest_size = (unsigned long)(double(inputSize)*
236  1.002 + 1.0) + 12;
237  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
238 
239  // compression 1-9, 6 is zlib default, 0 none
240  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
241  inputSize, compressionLevel);
242 
243  // check status
244  if(ret == Z_OK) {
245  // return the correct length
246  resultSize = dest_size;
247 
248  FDEBUG(1) << " original size = " << inputSize
249  << " final size = " << dest_size
250  << " ratio = " << double(dest_size)/double(inputSize)
251  << std::endl;
252  } else {
253  // compression failed, return a size of zero
254  FDEBUG(9) << "Compression Return value: " << ret
255  << " Okay = " << Z_OK << std::endl;
256  // do we throw an exception here?
257  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
258 
259  }
260 
261  return resultSize;
262  }
tuple ret
prodAgent to be discontinued
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::StreamSerializer::serializeEvent ( EventForOutput const &  event,
ParameterSetID const &  selectorConfig,
bool  use_compression,
int  compression_level,
SerializeDataBuffer &  data_buffer 
)

Serializes the specified event into the specified event message.

make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.

change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.

return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead).

the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 128 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, assert(), SerializeDataBuffer::bufferPointer(), ecal_dqm_sourceclient-live_cfg::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, edm::EventForOutput::eventAuxiliary(), Exception, newFWLiteAna::found, edm::ParentageRegistry::getMapped(), edm::ParentageRegistry::instance(), edm::BasicHandle::isValid(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Provenance::productProvenance(), edm::BasicHandle::provenance(), SerializeDataBuffer::ptr_, mps_fire::result, SerializeDataBuffer::rootbuf_, corrVsCorr::selection, selections_, tc_, edm::BranchDescription::unwrappedTypeID(), and edm::BasicHandle::wrapper().

Referenced by edm::StreamerOutputModuleBase::serializeEvent().

131  {
132  Parentage parentage;
133 
134  EventSelectionIDVector selectionIDs = event.eventSelectionIDs();
135  selectionIDs.push_back(selectorConfig);
136  SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes());
137 
138  // Loop over EDProducts, fill the provenance, and write.
139 
140  for(auto const& selection : *selections_) {
141  BranchDescription const& desc = *selection.first;
142  BasicHandle result;
143  event.getByToken(selection.second, desc.unwrappedTypeID(), result);
144  if(!result.isValid()) {
145  // No product with this ID was put in the event.
146  // Create and write the provenance.
147  se.products().push_back(StreamedProduct(desc));
148  } else {
149  bool found = ParentageRegistry::instance()->getMapped(result.provenance()->productProvenance()->parentageID(), parentage);
150  assert(found);
151  se.products().push_back(StreamedProduct(result.wrapper(),
152  desc,
153  result.wrapper() != nullptr,
154  &parentage.parents()));
155  }
156  }
157 
158  data_buffer.rootbuf_.Reset();
159  RootDebug tracer(10,10);
160 
161  //TClass* tc = getTClass(typeid(SendEvent));
162  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
163  switch(bres) {
164  case 0: // failure
165  {
166  throw cms::Exception("StreamTranslation","Event serialization failed")
167  << "StreamSerializer failed to serialize event: "
168  << event.id();
169  break;
170  }
171  case 1: // succcess
172  break;
173  case 2: // truncated result
174  {
175  throw cms::Exception("StreamTranslation","Event serialization truncated")
176  << "StreamSerializer module attempted to serialize an event\n"
177  << "that is to big for the allocated buffers: "
178  << event.id();
179  break;
180  }
181  default: // unknown
182  {
183  throw cms::Exception("StreamTranslation","Event serialization failed")
184  << "StreamSerializer module got an unknown error code\n"
185  << " while attempting to serialize event: "
186  << event.id();
187  break;
188  }
189  }
190 
191  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
192  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
193  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
194 #if 0
195  if(data_buffer.ptr_ != data_.ptr_) {
196  std::cerr << "ROOT reset the buffer!!!!\n";
197  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
198  }
199 #endif
200  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
201  // eventMessage.eventAddr());
202  // eventMessage.setEventLength(rootbuf.Length());
203 
204  // compress before return if we need to
205  // should test if compressed already - should never be?
206  // as double compression can have problems
207  if(use_compression) {
208  unsigned int dest_size =
209  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
210  if(dest_size != 0) {
211  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
212  data_buffer.curr_space_used_ = dest_size;
213  }
214  }
215  // calculate the adler32 checksum and fill it into the struct
216  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
217  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
218 
219  return data_buffer.curr_space_used_;
220  }
edm::propagate_const< TClass * > tc_
assert(m_qm.get())
selection
main part
Definition: corrVsCorr.py:98
edm::propagate_const< unsigned char * > ptr_
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
tuple result
Definition: mps_fire.py:84
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
bool getMapped(key_type const &k, value_type &result) const
unsigned char const * bufferPointer() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
static ParentageRegistry * instance()
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer &  data_buffer,
const BranchIDLists &  branchIDLists,
ThinnedAssociationsHelper const &  thinnedAssociationsHelper 
)

Serializes the product registry (that was specified to the constructor) into the specified InitMessage.

Definition at line 45 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Exception, FDEBUG, edm::pset::Registry::fillMap(), edm::getTClass(), edm::pset::Registry::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, sd, corrVsCorr::selection, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), and edm::SendJobHeader::setThinnedAssociationsHelper().

Referenced by edm::StreamerOutputModuleBase::serializeRegistry().

47  {
48  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
49  SendJobHeader sd;
50 
51  FDEBUG(9) << "Product List: " << std::endl;
52 
53 
54  for(auto const& selection : *selections_) {
55  sd.push_back(*selection.first);
56  FDEBUG(9) << "StreamOutput got product = " << selection.first->className()
57  << std::endl;
58  }
60  sd.setBranchIDLists(branchIDLists);
61  sd.setThinnedAssociationsHelper(thinnedAssociationsHelper);
63 
65  sd.setParameterSetMap(psetMap);
66 
67  data_buffer.rootbuf_.Reset();
68 
69  RootDebug tracer(10,10);
70 
71  TClass* tc = getTClass(typeid(SendJobHeader));
72  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
73 
74  switch(bres) {
75  case 0: // failure
76  {
77  throw cms::Exception("StreamTranslation","Registry serialization failed")
78  << "StreamSerializer failed to serialize registry\n";
79  break;
80  }
81  case 1: // succcess
82  break;
83  case 2: // truncated result
84  {
85  throw cms::Exception("StreamTranslation","Registry serialization truncated")
86  << "StreamSerializer module attempted to serialize\n"
87  << "a registry that is to big for the allocated buffers\n";
88  break;
89  }
90  default: // unknown
91  {
92  throw cms::Exception("StreamTranslation","Registry serialization failed")
93  << "StreamSerializer module got an unknown error code\n"
94  << " while attempting to serialize registry\n";
95  break;
96  }
97  }
98 
99  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
100  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
101  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
102  // calculate the adler32 checksum and fill it into the struct
103  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.bufferPointer(), data_buffer.curr_space_used_);
104  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
105  return data_buffer.curr_space_used_;
106  }
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
selection
main part
Definition: corrVsCorr.py:98
edm::propagate_const< unsigned char * > ptr_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:66
unsigned int curr_event_size_
unsigned char const * bufferPointer() const
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
SelectedProducts const * selections_
double sd
void fillMap(regmap_type &fillme) const
Definition: Registry.cc:49
static Registry * instance()
Definition: Registry.cc:12

Member Data Documentation

SelectedProducts const* edm::StreamSerializer::selections_
private

Definition at line 95 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

edm::propagate_const<TClass*> edm::StreamSerializer::tc_
private

Definition at line 96 of file StreamSerializer.h.

Referenced by serializeEvent().