CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Private Attributes
edm::StreamSerializer Class Reference

#include <StreamSerializer.h>

Public Member Functions

int serializeEvent (EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
 
int serializeRegistry (SerializeDataBuffer &data_buffer)
 
 StreamSerializer (Selections const *selections)
 

Static Public Member Functions

static unsigned int compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
 

Private Attributes

Selections const * selections_
 
TClass * tc_
 

Detailed Description

Definition at line 62 of file StreamSerializer.h.

Constructor & Destructor Documentation

edm::StreamSerializer::StreamSerializer ( Selections const *  selections)

Creates a translator instance for the specified product registry.

Definition at line 35 of file StreamSerializer.cc.

35  :
36  selections_(selections),
37  tc_(getTClass(typeid(SendEvent))) {
38  }
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
Selections const * selections_

Member Function Documentation

unsigned int edm::StreamSerializer::compressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
int  compressionLevel 
)
static

Compresses the data in the specified input buffer into the specified output buffer. Returns the size of the compressed data or zero if compression failed.

Definition at line 238 of file StreamSerializer.cc.

References dtNoiseDBValidation_cfg::cerr, FDEBUG, and run_regression::ret.

Referenced by edm::StreamDQMSerializer::serializeDQMEvent(), and serializeEvent().

241  {
242  unsigned int resultSize = 0;
243 
244  // what are these magic numbers? (jbk)
245  unsigned long dest_size = (unsigned long)(double(inputSize)*
246  1.002 + 1.0) + 12;
247  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
248 
249  // compression 1-9, 6 is zlib default, 0 none
250  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
251  inputSize, compressionLevel);
252 
253  // check status
254  if(ret == Z_OK) {
255  // return the correct length
256  resultSize = dest_size;
257 
258  FDEBUG(1) << " original size = " << inputSize
259  << " final size = " << dest_size
260  << " ratio = " << double(dest_size)/double(inputSize)
261  << std::endl;
262  } else {
263  // compression failed, return a size of zero
264  FDEBUG(9) << "Compression Return value: " << ret
265  << " Okay = " << Z_OK << std::endl;
266  // do we throw an exception here?
267  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
268 
269  }
270 
271  return resultSize;
272  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
int edm::StreamSerializer::serializeEvent ( EventPrincipal const &  eventPrincipal,
ParameterSetID const &  selectorConfig,
bool  use_compression,
int  compression_level,
SerializeDataBuffer &  data_buffer 
)

Serializes the specified event into the specified data buffer.

Make a char* a data member; tell ROOT not to adopt it, but still use it. Initialize it to 1M, let ROOT resize it if it wants, then delete it in the destructor.

Change the call so that it does not take an eventMessage; add a member function that returns the address of the place where ROOT wrote the serialized data.

Return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead).

The caller will need to copy the data from this object to its final destination in the EventMsgBuilder.

Definition at line 136 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, edm::EventPrincipal::aux(), Selections::begin(), edm::BranchDescription::branchID(), edm::EventPrincipal::branchListIndexes(), dtNoiseDBValidation_cfg::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Selections::end(), edm::EventPrincipal::eventSelectionIDs(), edm::hlt::Exception, newFWLiteAna::found, edm::Principal::getForOutput(), edm::detail::ThreadSafeRegistry< KEY, T, E >::getMapped(), i, edm::EventPrincipal::id(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::ProductProvenance::parentageID(), edm::Parentage::parents(), edm::Principal::processHistory(), edm::OutputHandle::productProvenance(), SerializeDataBuffer::ptr_, SerializeDataBuffer::rootbuf_, selections_, tc_, and edm::OutputHandle::wrapper().

Referenced by edm::StreamerOutputModuleBase::serializeEvent().

139  {
140  Parentage parentage;
141 
142  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
143  selectionIDs.push_back(selectorConfig);
144  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
145 
146  Selections::const_iterator i(selections_->begin()),ie(selections_->end());
147  // Loop over EDProducts, fill the provenance, and write.
148 
149  for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
150  BranchDescription const& desc = **i;
151  BranchID const& id = desc.branchID();
152 
153  OutputHandle const oh = eventPrincipal.getForOutput(id, true);
154  if(!oh.productProvenance()) {
155  // No product with this ID was put in the event.
156  // Create and write the provenance.
157  se.products().push_back(StreamedProduct(desc));
158  } else {
159  bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
160  assert(found);
161  se.products().push_back(StreamedProduct(oh.wrapper(),
162  desc,
163  oh.wrapper() != 0,
164  &parentage.parents()));
165  }
166  }
167 
168  data_buffer.rootbuf_.Reset();
169  RootDebug tracer(10,10);
170 
171  //TClass* tc = getTClass(typeid(SendEvent));
172  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
173  switch(bres) {
174  case 0: // failure
175  {
176  throw cms::Exception("StreamTranslation","Event serialization failed")
177  << "StreamSerializer failed to serialize event: "
178  << eventPrincipal.id();
179  break;
180  }
181  case 1: // succcess
182  break;
183  case 2: // truncated result
184  {
185  throw cms::Exception("StreamTranslation","Event serialization truncated")
186  << "StreamSerializer module attempted to serialize an event\n"
187  << "that is to big for the allocated buffers: "
188  << eventPrincipal.id();
189  break;
190  }
191  default: // unknown
192  {
193  throw cms::Exception("StreamTranslation","Event serialization failed")
194  << "StreamSerializer module got an unknown error code\n"
195  << " while attempting to serialize event: "
196  << eventPrincipal.id();
197  break;
198  }
199  }
200 
201  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
202  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
203  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
204 #if 0
205  if(data_buffer.ptr_ != data_.ptr_) {
206  std::cerr << "ROOT reset the buffer!!!!\n";
207  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
208  }
209 #endif
210  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
211  // eventMessage.eventAddr());
212  // eventMessage.setEventLength(rootbuf.Length());
213 
214  // compress before return if we need to
215  // should test if compressed already - should never be?
216  // as double compression can have problems
217  if(use_compression) {
218  unsigned int dest_size =
219  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
220  if(dest_size != 0) {
221  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
222  data_buffer.curr_space_used_ = dest_size;
223  }
224  }
225  // calculate the adler32 checksum and fill it into the struct
226  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
227  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
228 
229  return data_buffer.curr_space_used_;
230  }
int i
Definition: DBlmapReader.cc:9
static ThreadSafeRegistry * instance()
bool getMapped(key_type const &k, value_type &result) const
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< unsigned char > comp_buf_
unsigned int curr_event_size_
iterator end()
Definition: Selections.h:367
iterator begin()
Definition: Selections.h:366
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
Selections const * selections_
unsigned char * ptr_
int edm::StreamSerializer::serializeRegistry ( SerializeDataBuffer &  data_buffer)

Serializes the product registry (that was specified to the constructor) into the specified data buffer.

Definition at line 45 of file StreamSerializer.cc.

References cms::Adler32(), SerializeDataBuffer::adler32_chksum_, Selections::begin(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, data, edm::detail::ThreadSafeRegistry< KEY, T, E >::data(), alignCSCRings::e, Selections::end(), edm::hlt::Exception, FDEBUG, edm::pset::fillMap(), edm::getTClass(), i, edm::detail::ThreadSafeIndexedRegistry< T, E >::instance(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, sd, selections_, edm::SendJobHeader::setBranchIDLists(), edm::SendJobHeader::setParameterSetMap(), edm::SendJobHeader::setProcessConfigurations(), and edm::sort_all().

Referenced by edm::StreamerOutputModuleBase::serializeRegistry().

45  {
46  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
47  SendJobHeader sd;
48 
49  Selections::const_iterator i(selections_->begin()), e(selections_->end());
50 
51  FDEBUG(9) << "Product List: " << std::endl;
52 
53 
54  for(; i != e; ++i) {
55  sd.push_back(**i);
56  FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
57  << std::endl;
58  }
60  sd.setBranchIDLists(BranchIDListRegistry::instance()->data());
62 
64  sd.setParameterSetMap(psetMap);
65 
67  PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
68  ProcessConfigurationVector procConfigVector;
69  for(PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
70  procConfigVector.push_back(i->second);
71  }
72  sort_all(procConfigVector);
73  sd.setProcessConfigurations(procConfigVector);
74 
75  data_buffer.rootbuf_.Reset();
76 
77  RootDebug tracer(10,10);
78 
79  TClass* tc = getTClass(typeid(SendJobHeader));
80  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
81 
82  switch(bres) {
83  case 0: // failure
84  {
85  throw cms::Exception("StreamTranslation","Registry serialization failed")
86  << "StreamSerializer failed to serialize registry\n";
87  break;
88  }
89  case 1: // succcess
90  break;
91  case 2: // truncated result
92  {
93  throw cms::Exception("StreamTranslation","Registry serialization truncated")
94  << "StreamSerializer module attempted to serialize\n"
95  << "a registry that is to big for the allocated buffers\n";
96  break;
97  }
98  default: // unknown
99  {
100  throw cms::Exception("StreamTranslation","Registry serialization failed")
101  << "StreamSerializer module got an unknown error code\n"
102  << " while attempting to serialize registry\n";
103  break;
104  }
105  }
106 
107  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
108  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
109  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
110  // calculate the adler32 checksum and fill it into the struct
111  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
112  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
113  return data_buffer.curr_space_used_;
114  }
int i
Definition: DBlmapReader.cc:9
static ThreadSafeRegistry * instance()
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
#define FDEBUG(lev)
Definition: DebugMacros.h:18
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
std::map< key_type, value_type > collection_type
unsigned int curr_event_size_
iterator end()
Definition: Selections.h:367
static ThreadSafeIndexedRegistry * instance()
iterator begin()
Definition: Selections.h:366
unsigned int curr_space_used_
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
double sd
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
Selections const * selections_
collection_type & data()
Provide access to the contained collection.
void fillMap(Registry *reg, regmap_type &fillme)
Definition: Registry.cc:24
unsigned char * ptr_

Member Data Documentation

Selections const* edm::StreamSerializer::selections_
private

Definition at line 87 of file StreamSerializer.h.

Referenced by serializeEvent(), and serializeRegistry().

TClass* edm::StreamSerializer::tc_
private

Definition at line 88 of file StreamSerializer.h.

Referenced by serializeEvent().