CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSerializer.cc
Go to the documentation of this file.
1 
25 
26 #include "zlib.h"
27 #include <cstdlib>
28 #include <list>
29 
30 namespace edm {
31 
36  selections_(selections),
37  tc_(getTClass(typeid(SendEvent))) {
38  }
39 
46  FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
48 
49  Selections::const_iterator i(selections_->begin()), e(selections_->end());
50 
51  FDEBUG(9) << "Product List: " << std::endl;
52 
53 
54  for(; i != e; ++i) {
55  sd.push_back(**i);
56  FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
57  << std::endl;
58  }
62 
64  sd.setParameterSetMap(psetMap);
65 
67  PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
68  ProcessConfigurationVector procConfigVector;
69  for(PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
70  procConfigVector.push_back(i->second);
71  }
72  sort_all(procConfigVector);
73  sd.setProcessConfigurations(procConfigVector);
74 
75  data_buffer.rootbuf_.Reset();
76 
77  RootDebug tracer(10,10);
78 
79  TClass* tc = getTClass(typeid(SendJobHeader));
80  int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
81 
82  switch(bres) {
83  case 0: // failure
84  {
85  throw cms::Exception("StreamTranslation","Registry serialization failed")
86  << "StreamSerializer failed to serialize registry\n";
87  break;
88  }
89  case 1: // succcess
90  break;
91  case 2: // truncated result
92  {
93  throw cms::Exception("StreamTranslation","Registry serialization truncated")
94  << "StreamSerializer module attempted to serialize\n"
95  << "a registry that is to big for the allocated buffers\n";
96  break;
97  }
98  default: // unknown
99  {
100  throw cms::Exception("StreamTranslation","Registry serialization failed")
101  << "StreamSerializer module got an unknown error code\n"
102  << " while attempting to serialize registry\n";
103  break;
104  }
105  }
106 
107  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
108  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
109  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
110  // calculate the adler32 checksum and fill it into the struct
111  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
112  //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
113  return data_buffer.curr_space_used_;
114  }
115 
137  ParameterSetID const& selectorConfig,
139  SerializeDataBuffer &data_buffer) {
140  Parentage parentage;
141 
142  EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
143  selectionIDs.push_back(selectorConfig);
144  SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
145 
146  Selections::const_iterator i(selections_->begin()),ie(selections_->end());
147  // Loop over EDProducts, fill the provenance, and write.
148 
149  for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
150  BranchDescription const& desc = **i;
151  BranchID const& id = desc.branchID();
152 
153  OutputHandle const oh = eventPrincipal.getForOutput(id, true);
154  if(!oh.productProvenance()) {
155  // No product with this ID was put in the event.
156  // Create and write the provenance.
157  se.products().push_back(StreamedProduct(desc));
158  } else {
160  assert(found);
161  se.products().push_back(StreamedProduct(oh.wrapper(),
162  desc,
163  oh.wrapper() != 0,
164  &parentage.parents()));
165  }
166  }
167 
168  data_buffer.rootbuf_.Reset();
169  RootDebug tracer(10,10);
170 
171  //TClass* tc = getTClass(typeid(SendEvent));
172  int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
173  switch(bres) {
174  case 0: // failure
175  {
176  throw cms::Exception("StreamTranslation","Event serialization failed")
177  << "StreamSerializer failed to serialize event: "
178  << eventPrincipal.id();
179  break;
180  }
181  case 1: // succcess
182  break;
183  case 2: // truncated result
184  {
185  throw cms::Exception("StreamTranslation","Event serialization truncated")
186  << "StreamSerializer module attempted to serialize an event\n"
187  << "that is to big for the allocated buffers: "
188  << eventPrincipal.id();
189  break;
190  }
191  default: // unknown
192  {
193  throw cms::Exception("StreamTranslation","Event serialization failed")
194  << "StreamSerializer module got an unknown error code\n"
195  << " while attempting to serialize event: "
196  << eventPrincipal.id();
197  break;
198  }
199  }
200 
201  data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
202  data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
203  data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
204 #if 0
205  if(data_buffer.ptr_ != data_.ptr_) {
206  std::cerr << "ROOT reset the buffer!!!!\n";
207  data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
208  }
209 #endif
210  // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
211  // eventMessage.eventAddr());
212  // eventMessage.setEventLength(rootbuf.Length());
213 
214  // compress before return if we need to
215  // should test if compressed already - should never be?
216  // as double compression can have problems
217  if(use_compression) {
218  unsigned int dest_size =
219  compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
220  if(dest_size != 0) {
221  data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
222  data_buffer.curr_space_used_ = dest_size;
223  }
224  }
225  // calculate the adler32 checksum and fill it into the struct
226  data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
227  //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
228 
229  return data_buffer.curr_space_used_;
230  }
231 
237  unsigned int
238  StreamSerializer::compressBuffer(unsigned char *inputBuffer,
239  unsigned int inputSize,
240  std::vector<unsigned char> &outputBuffer,
241  int compressionLevel) {
242  unsigned int resultSize = 0;
243 
244  // what are these magic numbers? (jbk)
245  unsigned long dest_size = (unsigned long)(double(inputSize)*
246  1.002 + 1.0) + 12;
247  if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
248 
249  // compression 1-9, 6 is zlib default, 0 none
250  int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
251  inputSize, compressionLevel);
252 
253  // check status
254  if(ret == Z_OK) {
255  // return the correct length
256  resultSize = dest_size;
257 
258  FDEBUG(1) << " original size = " << inputSize
259  << " final size = " << dest_size
260  << " ratio = " << double(dest_size)/double(inputSize)
261  << std::endl;
262  } else {
263  // compression failed, return a size of zero
264  FDEBUG(9) << "Compression Return value: " << ret
265  << " Okay = " << Z_OK << std::endl;
266  // do we throw an exception here?
267  std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
268 
269  }
270 
271  return resultSize;
272  }
273 }
int i
Definition: DBlmapReader.cc:9
EventSelectionIDVector const & eventSelectionIDs() const
void setProcessConfigurations(std::vector< ProcessConfiguration > const &pcs)
static ThreadSafeRegistry * instance()
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
EventID const & id() const
ProductProvenance const * productProvenance() const
Definition: OutputHandle.h:101
bool getMapped(key_type const &k, value_type &result) const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
BranchListIndexes const & branchListIndexes() const
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
static unsigned int compressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel)
ProcessHistory const & processHistory() const
Definition: Principal.h:122
std::vector< EventSelectionID > EventSelectionIDVector
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
std::vector< BranchID > const & parents() const
Definition: Parentage.h:38
std::vector< unsigned char > comp_buf_
std::map< key_type, value_type > collection_type
unsigned int curr_event_size_
iterator end()
Definition: Selections.h:367
static ThreadSafeIndexedRegistry * instance()
void const * wrapper() const
Definition: OutputHandle.h:89
OutputHandle getForOutput(BranchID const &bid, bool getProd) const
Definition: Principal.cc:659
BranchID const & branchID() const
iterator begin()
Definition: Selections.h:366
StreamSerializer(Selections const *selections)
unsigned int curr_space_used_
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
double sd
ParentageID const & parentageID() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void setBranchIDLists(BranchIDLists const &bidlists)
Selections const * selections_
void push_back(BranchDescription const &bd)
int serializeRegistry(SerializeDataBuffer &data_buffer)
collection_type & data()
Provide access to the contained collection.
void fillMap(Registry *reg, regmap_type &fillme)
Definition: Registry.cc:24
unsigned char * ptr_
EventAuxiliary const & aux() const
void setParameterSetMap(ParameterSetMap const &psetMap)