CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
BlobStreamingService.cc
Go to the documentation of this file.
4 
6 
8 
9 
10 //
11 #include <cstddef>
12 //
13 #include "CoralBase/Blob.h"
14 //
15 #include <algorithm>
16 #include <typeinfo>
17 #include <string>
18 #include <cstring>
19 #include <zlib.h>
20 
21 namespace cond {
22 
24  public:
25 
26 
28 
29  virtual ~BlobStreamingService();
30 
31  boost::shared_ptr<coral::Blob> write( const void* addressOfInputData, Reflex::Type const & classDictionary, bool useCompression=true );
32 
33  void read( const coral::Blob& blobData, void* addressOfContainer, Reflex::Type const & classDictionary );
34 
35 
36  private:
37 
38  typedef std::pair<unsigned long long, unsigned long long> uuid;
39 
40  static const size_t m_idsize=sizeof(uuid);
41  static const size_t m_offset = m_idsize + sizeof(unsigned long long);
42  static const size_t nVariants=3;
43 
45  static uuid const variantIds[nVariants];
46 
47 
48  static Variant findVariant(const void* address);
49  static int isVectorChar(Reflex::Type const & classDictionary);
50 
51 
52  static boost::shared_ptr<coral::Blob> compress(const void* addr, size_t isize);
53  static boost::shared_ptr<coral::Blob> expand(const coral::Blob& blobIn);
54 
55 
56  boost::shared_ptr<ora::IBlobStreamingService> rootService;
57 
58  };
59 
60 
62 
64 
65  boost::shared_ptr<coral::Blob> BlobStreamingService::write( const void* addressOfInputData, Reflex::Type const & classDictionary, bool useCompression ) {
66  boost::shared_ptr<coral::Blob> blobOut;
67  int const k = isVectorChar(classDictionary);
68  switch (k) {
69  case 0 :
70  {
71  // at the moment we write TBuffer compressed, than we see....
72  // we may wish to avoid one buffer copy...
73  boost::shared_ptr<coral::Blob> buffer = rootService->write(addressOfInputData, classDictionary);
74  if( useCompression ){
75  blobOut = compress(buffer->startingAddress(),buffer->size());
76  *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_TBUFFER];
77  } else {
78  blobOut = buffer;
79  }
80  }
81  break;
82  case 1 :
83  {
84  if( useCompression ){
85  std::vector<unsigned char> const & v = *reinterpret_cast< std::vector<unsigned char> const *> (addressOfInputData);
86  blobOut = compress(&v.front(),v.size());
87  *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
88  } else {
89  blobOut = rootService->write(addressOfInputData,classDictionary);
90  }
91  }
92  break;
93  case 2 :
94  {
95  if( useCompression ){
96  std::vector<char> const & v = *reinterpret_cast<std::vector<char> const *> (addressOfInputData);
97  blobOut = compress(&v.front(),v.size());
98  *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
99  } else {
100  blobOut = rootService->write(addressOfInputData,classDictionary);
101  }
102  }
103  break;
104 
105  }
106  return blobOut;
107 
108  }
109 
110 
111  void BlobStreamingService::read( const coral::Blob& blobData, void* addressOfContainer, Reflex::Type const & classDictionary ) {
112  // protect against small blobs...
113  Variant v = (size_t(blobData.size()) < m_offset) ? OLD : findVariant(blobData.startingAddress());
114  switch (v) {
115  case OLD :
116  {
117  rootService->read( blobData, addressOfContainer, classDictionary);
118  }
119  break;
120  case COMPRESSED_TBUFFER :
121  {
122  boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
123  rootService->read( *blobIn, addressOfContainer, classDictionary);
124  }
125  break;
126  case COMPRESSED_CHARS :
127  {
128  boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
129  int const k = isVectorChar(classDictionary);
130  switch (k) {
131  case 0 :
132  {
133  // error!!!
134  }
135  break;
136  case 1:
137  {
138  std::vector<unsigned char> & v = *reinterpret_cast< std::vector<unsigned char> *> (addressOfContainer);
139  // we should avoid the copy!
140  v.resize(blobIn->size());
141  std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
142  }
143  break;
144  case 2:
145  {
146  std::vector<char> & v = *reinterpret_cast< std::vector<char> *> (addressOfContainer);
147  // we should avoid the copy!
148  v.resize(blobIn->size());
149  std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
150  }
151  break;
152  }
153  }
154  }
155  }
156 
157 
158 
159 
162  ,BlobStreamingService::uuid(0xf4e92f169c974e8eLL, 0x97851f372586010dLL)
163  ,BlobStreamingService::uuid(0xc9a95a45e60243cfLL, 0x8dc549534f9a9274LL)
164  };
165 
166 
167  int BlobStreamingService::isVectorChar(Reflex::Type const & classDictionary) {
168  std::type_info const & t = classDictionary.TypeInfo();
169  if (t==typeid(std::vector<unsigned char>) ) return 1;
170  if (t==typeid(std::vector<char>) ) return 2;
171  return 0;
172  }
173 
174 
175 
177  uuid const & id = *reinterpret_cast<uuid const*>(address);
179  return (v==variantIds+nVariants) ? OLD : (Variant)(v-variantIds);
180  }
181 
182 
183  boost::shared_ptr<coral::Blob> BlobStreamingService::compress(const void* addr, size_t isize) {
184  uLongf destLen = compressBound(isize);
185  size_t usize = destLen + m_offset;
186  boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
187  void * startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_offset;
188  int zerr = compress2( (unsigned char*)(startingAddress), &destLen,
189  (unsigned char*)(addr), isize,
190  9);
191  if (zerr!=0) edm::LogError("BlobStreamingService")<< "Compression error " << zerr;
192  destLen+= m_idsize + sizeof(unsigned long long);
193  theBlob->resize(destLen);
194 
195  startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_idsize;
196  // write expanded size;
197  *reinterpret_cast<unsigned long long*>(startingAddress)=isize;
198  return theBlob;
199  }
200 
201  boost::shared_ptr<coral::Blob> BlobStreamingService::expand(const coral::Blob& blobIn) {
202  if (size_t(blobIn.size()) < m_offset) return boost::shared_ptr<coral::Blob>(new coral::Blob());
203  long long csize = blobIn.size() - m_offset;
204  unsigned char const * startingAddress = (unsigned char const*)(blobIn.startingAddress())+ m_idsize;
205  unsigned long long usize = *reinterpret_cast<unsigned long long const*>(startingAddress);
206  startingAddress += sizeof(unsigned long long);
207  boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
208  uLongf destLen = usize;
209  int zerr = uncompress((unsigned char *)(theBlob->startingAddress()), &destLen,
210  startingAddress, csize);
211  if (zerr!=0 || usize!=destLen)
212  edm::LogError("BlobStreamingService")<< "uncompressing error " << zerr
213  << " original size was " << usize
214  << " new size is " << destLen;
215 
216  return theBlob;
217  }
218 
219 }
220 
221 // keep the old good name
char * address
Definition: mlp_lapack.h:14
static Variant findVariant(const void *address)
void zerr(int)
void read(const coral::Blob &blobData, void *addressOfContainer, Reflex::Type const &classDictionary)
Reads an object from a Blob and fills-in the container.
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
boost::shared_ptr< coral::Blob > write(const void *addressOfInputData, Reflex::Type const &classDictionary, bool useCompression=true)
std::pair< unsigned long long, unsigned long long > uuid
static boost::shared_ptr< coral::Blob > expand(const coral::Blob &blobIn)
boost::shared_ptr< ora::IBlobStreamingService > rootService
static int isVectorChar(Reflex::Type const &classDictionary)
static boost::shared_ptr< coral::Blob > compress(const void *addr, size_t isize)
int k[5][pyjets_maxn]
Interface for a Streaming Service.
static uuid const variantIds[nVariants]
string const
Definition: compareJSON.py:14
#define DEFINE_EDM_PLUGIN(factory, type, name)