CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSerializer.cc
Go to the documentation of this file.
1 
25 
26 #include "zlib.h"
27 #include <cstdlib>
28 #include <list>
29 
30 namespace edm {
31 
32  StreamSerializer::Arr::Arr(int sz):ptr_((char*)malloc(sz)) { }
33  StreamSerializer::Arr::~Arr() { free(ptr_); }
34 
39  selections_(selections),
40  tc_(getTClass(typeid(SendEvent))) {
41  }
42 
49  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
50  SendJobHeader sd;
51 
52  Selections::const_iterator i(selections_->begin()), e(selections_->end());
53 
54  FDEBUG(9) << "Product List: " << std::endl;
55 
56 
57  for(; i != e; ++i) {
58  sd.push_back(**i);
59  FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
60  << std::endl;
61  }
65 
67  sd.setParameterSetMap(psetMap);
68 
70  PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
71  ProcessConfigurationVector procConfigVector;
72  for (PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
73  procConfigVector.push_back(i->second);
74  }
75  sort_all(procConfigVector);
76  sd.setProcessConfigurations(procConfigVector);
77 
78  data_buffer.rootbuf_.Reset();
79 
80  RootDebug tracer(10,10);
81 
82  TClass* tc = getTClass(typeid(SendJobHeader));
83  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
84 
85  switch(bres) {
86  case 0: // failure
87  {
88  throw cms::Exception("StreamTranslation","Registry serialization failed")
89  << "StreamSerializer failed to serialize registry\n";
90  break;
91  }
92  case 1: // succcess
93  break;
94  case 2: // truncated result
95  {
96  throw cms::Exception("StreamTranslation","Registry serialization truncated")
97  << "StreamSerializer module attempted to serialize\n"
98  << "a registry that is to big for the allocated buffers\n";
99  break;
100  }
101  default: // unknown
102  {
103  throw cms::Exception("StreamTranslation","Registry serialization failed")
104  << "StreamSerializer module got an unknown error code\n"
105  << " while attempting to serialize registry\n";
106  break;
107  }
108  }
109 
110  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
111  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
112  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
113  // calculate the adler32 checksum and fill it into the struct
114  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
115  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
116  return data_buffer.curr_space_used_;
117  }
118 
140  ParameterSetID const& selectorConfig,
142  SerializeDataBuffer &data_buffer) {
143  Parentage parentage;
144 
145  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
146  selectionIDs.push_back(selectorConfig);
147  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
148 
149  Selections::const_iterator i(selections_->begin()),ie(selections_->end());
150  // Loop over EDProducts, fill the provenance, and write.
151 
152  for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
153  BranchDescription const& desc = **i;
154  BranchID const& id = desc.branchID();
155 
156  OutputHandle const oh = eventPrincipal.getForOutput(id, true);
157  if (!oh.productProvenance()) {
158  // No product with this ID was put in the event.
159  // Create and write the provenance.
160  se.products().push_back(StreamedProduct(desc));
161  } else {
162  bool found = ParentageRegistry::instance()->getMapped(oh.productProvenanceSharedPtr()->parentageID(), parentage);
163  assert (found);
164  se.products().push_back(StreamedProduct(oh.wrapper(),
165  desc,
166  oh.productProvenanceSharedPtr()->productStatus(),
167  &parentage.parents()));
168  }
169  }
170 
171  data_buffer.rootbuf_.Reset();
172  RootDebug tracer(10,10);
173 
174  //TClass* tc = getTClass(typeid(SendEvent));
175  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
176  switch(bres) {
177  case 0: // failure
178  {
179  throw cms::Exception("StreamTranslation","Event serialization failed")
180  << "StreamSerializer failed to serialize event: "
181  << eventPrincipal.id();
182  break;
183  }
184  case 1: // succcess
185  break;
186  case 2: // truncated result
187  {
188  throw cms::Exception("StreamTranslation","Event serialization truncated")
189  << "StreamSerializer module attempted to serialize an event\n"
190  << "that is to big for the allocated buffers: "
191  << eventPrincipal.id();
192  break;
193  }
194  default: // unknown
195  {
196  throw cms::Exception("StreamTranslation","Event serialization failed")
197  << "StreamSerializer module got an unknown error code\n"
198  << " while attempting to serialize event: "
199  << eventPrincipal.id();
200  break;
201  }
202  }
203 
204  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
205  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
206  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
207 #if 0
208  if(data_buffer.ptr_ != data_.ptr_) {
209  std::cerr << "ROOT reset the buffer!!!!\n";
210  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
211  }
212 #endif
213  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
214  // eventMessage.eventAddr());
215  // eventMessage.setEventLength(rootbuf.Length());
216 
217  // compress before return if we need to
218  // should test if compressed already - should never be?
219  // as double compression can have problems
220  if(use_compression) {
221  unsigned int dest_size =
222  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
223  if(dest_size != 0) {
224  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
225  data_buffer.curr_space_used_ = dest_size;
226  }
227  }
228  // calculate the adler32 checksum and fill it into the struct
229  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
230  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
231 
232  return data_buffer.curr_space_used_;
233  }
234 
240  unsigned int
241  StreamSerializer::compressBuffer(unsigned char *inputBuffer,
242  unsigned int inputSize,
243  std::vector<unsigned char> &outputBuffer,
244  int compressionLevel) {
245  unsigned int resultSize = 0;
246 
247  // what are these magic numbers? (jbk)
248  unsigned long dest_size = (unsigned long)(double(inputSize)*
249  1.002 + 1.0) + 12;
250  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
251 
252  // compression 1-9, 6 is zlib default, 0 none
253  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
254  inputSize, compressionLevel);
255 
256  // check status
257  if(ret == Z_OK) {
258  // return the correct length
259  resultSize = dest_size;
260 
261  FDEBUG(1) << " original size = " << inputSize
262  << " final size = " << dest_size
263  << " ratio = " << double(dest_size)/double(inputSize)
264  << std::endl;
265  } else {
266  // compression failed, return a size of zero
267  FDEBUG(9) << "Compression Return value: " << ret
268  << " Okay = " << Z_OK << std::endl;
269  // do we throw an exception here?
270  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
271 
272  }
273 
274  return resultSize;
275  }
276 }
int i
Definition: DBlmapReader.cc:9
EventSelectionIDVector const & eventSelectionIDs() const
void setProcessConfigurations(std::vector< ProcessConfiguration > const &pcs)
boost::shared_ptr< ProductProvenance > productProvenanceSharedPtr() const
Definition: OutputHandle.h:101
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
EventID const & id() const
ProductProvenance const * productProvenance() const
Definition: OutputHandle.h:97
bool getMapped(key_type const &k, value_type &result) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
BranchListIndexes const & branchListIndexes() const
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
ProcessHistory const & processHistory() const
Definition: Principal.h:112
std::vector< EventSelectionID > EventSelectionIDVector
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
std::vector< BranchID > const & parents() const
Definition: Parentage.h:39
std::vector< unsigned char > comp_buf_
std::map< key_type, value_type > collection_type
unsigned int curr_event_size_
iterator end()
Definition: Selections.h:367
OutputHandle getForOutput(BranchID const &bid, bool getProd) const
Definition: Principal.cc:523
BranchID const & branchID() const
iterator begin()
Definition: Selections.h:366
StreamSerializer(Selections const *selections)
unsigned int curr_space_used_
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
EDProduct const * wrapper() const
Definition: OutputHandle.h:89
void setBranchIDLists(BranchIDLists const &bidlists)
static ThreadSafeIndexedRegistry * instance()
Selections const * selections_
void push_back(BranchDescription const &bd)
int serializeRegistry(SerializeDataBuffer &data_buffer)
static ThreadSafeRegistry * instance()
collection_type & data()
Provide access to the contained collection.
void fillMap(Registry *reg, regmap_type &fillme)
Definition: Registry.cc:24
unsigned char * ptr_
EventAuxiliary const & aux() const
void setParameterSetMap(ParameterSetMap const &psetMap)