CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
BlobStreamingService.cc
Go to the documentation of this file.
4 
6 
8 
9 
10 //
11 #include <cstddef>
12 //
13 #include "CoralBase/Blob.h"
14 //
15 #include <algorithm>
16 #include <typeinfo>
17 #include <string>
18 #include <cstring>
19 #include <zlib.h>
20 
21 // Function for testing BlobStreamingService
22 namespace test {
23  namespace BlobStreaming {
24  int test();
25  }
26 }
27 
28 namespace cond {
29 
31  friend int test::BlobStreaming::test();
32 
33  public:
34 
35 
37 
38  virtual ~BlobStreamingService();
39 
40  boost::shared_ptr<coral::Blob> write( const void* addressOfInputData, edm::TypeWithDict const & classDictionary, bool useCompression=true ) override;
41 
42  void read( const coral::Blob& blobData, void* addressOfContainer, edm::TypeWithDict const & classDictionary ) override;
43 
44 
45  private:
46 
47  typedef std::pair<unsigned long long, unsigned long long> uuid;
48 
49  static const size_t m_idsize=sizeof(uuid);
50  static const size_t m_offset = m_idsize + sizeof(unsigned long long);
51  static const size_t nVariants=3;
52 
54  static uuid const variantIds[nVariants];
55 
56 
57  static Variant findVariant(const void* address);
58  static int isVectorChar(edm::TypeWithDict const & classDictionary);
59 
60 
61  static boost::shared_ptr<coral::Blob> compress(const void* addr, size_t isize);
62  static boost::shared_ptr<coral::Blob> expand(const coral::Blob& blobIn);
63 
64 
65  boost::shared_ptr<ora::IBlobStreamingService> rootService;
66 
67  };
68 
69 
71 
73 
74  boost::shared_ptr<coral::Blob> BlobStreamingService::write( const void* addressOfInputData, edm::TypeWithDict const & classDictionary, bool useCompression ) {
75  boost::shared_ptr<coral::Blob> blobOut;
76  int const k = isVectorChar(classDictionary);
77  switch (k) {
78  case 0 :
79  {
80  // at the moment we write TBuffer compressed, than we see....
81  // we may wish to avoid one buffer copy...
82  boost::shared_ptr<coral::Blob> buffer = rootService->write(addressOfInputData, classDictionary);
83  if( useCompression ){
84  blobOut = compress(buffer->startingAddress(),buffer->size());
85  *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_TBUFFER];
86  } else {
87  blobOut = buffer;
88  }
89  }
90  break;
91  case 1 :
92  {
93  if( useCompression ){
94  std::vector<unsigned char> const & v = *reinterpret_cast< std::vector<unsigned char> const *> (addressOfInputData);
95  blobOut = compress(&v.front(),v.size());
96  *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
97  } else {
98  blobOut = rootService->write(addressOfInputData,classDictionary);
99  }
100  }
101  break;
102  case 2 :
103  {
104  if( useCompression ){
105  std::vector<char> const & v = *reinterpret_cast<std::vector<char> const *> (addressOfInputData);
106  blobOut = compress(&v.front(),v.size());
107  *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
108  } else {
109  blobOut = rootService->write(addressOfInputData,classDictionary);
110  }
111  }
112  break;
113 
114  }
115  return blobOut;
116 
117  }
118 
119 
120  void BlobStreamingService::read( const coral::Blob& blobData, void* addressOfContainer, edm::TypeWithDict const & classDictionary ) {
121  // protect against small blobs...
122  Variant v = (size_t(blobData.size()) < m_offset) ? OLD : findVariant(blobData.startingAddress());
123  switch (v) {
124  case OLD :
125  {
126  rootService->read( blobData, addressOfContainer, classDictionary);
127  }
128  break;
129  case COMPRESSED_TBUFFER :
130  {
131  boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
132  rootService->read( *blobIn, addressOfContainer, classDictionary);
133  }
134  break;
135  case COMPRESSED_CHARS :
136  {
137  boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
138  int const k = isVectorChar(classDictionary);
139  switch (k) {
140  case 0 :
141  {
142  // error!!!
143  }
144  break;
145  case 1:
146  {
147  std::vector<unsigned char> & v = *reinterpret_cast< std::vector<unsigned char> *> (addressOfContainer);
148  // we should avoid the copy!
149  v.resize(blobIn->size());
150  std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
151  }
152  break;
153  case 2:
154  {
155  std::vector<char> & v = *reinterpret_cast< std::vector<char> *> (addressOfContainer);
156  // we should avoid the copy!
157  v.resize(blobIn->size());
158  std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
159  }
160  break;
161  }
162  }
163  }
164  }
165 
166 
167 
168 
171  ,BlobStreamingService::uuid(0xf4e92f169c974e8eLL, 0x97851f372586010dLL)
172  ,BlobStreamingService::uuid(0xc9a95a45e60243cfLL, 0x8dc549534f9a9274LL)
173  };
174 
175 
177  if (classDictionary == typeid(std::vector<unsigned char>)) return 1;
178  if (classDictionary == typeid(std::vector<char>)) return 2;
179  return 0;
180  }
181 
182 
183 
185  uuid const & id = *reinterpret_cast<uuid const*>(address);
187  return (v==variantIds+nVariants) ? OLD : (Variant)(v-variantIds);
188  }
189 
190 
191  boost::shared_ptr<coral::Blob> BlobStreamingService::compress(const void* addr, size_t isize) {
192  uLongf destLen = compressBound(isize);
193  size_t usize = destLen + m_offset;
194  boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
195  void * startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_offset;
196  int zerr = compress2( (unsigned char*)(startingAddress), &destLen,
197  (unsigned char*)(addr), isize,
198  9);
199  if (zerr!=0) edm::LogError("BlobStreamingService")<< "Compression error " << zerr;
200  destLen+= m_idsize + sizeof(unsigned long long);
201  theBlob->resize(destLen);
202 
203  startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_idsize;
204  // write expanded size;
205  *reinterpret_cast<unsigned long long*>(startingAddress)=isize;
206  return theBlob;
207  }
208 
209  boost::shared_ptr<coral::Blob> BlobStreamingService::expand(const coral::Blob& blobIn) {
210  if (size_t(blobIn.size()) < m_offset) return boost::shared_ptr<coral::Blob>(new coral::Blob());
211  long long csize = blobIn.size() - m_offset;
212  unsigned char const * startingAddress = (unsigned char const*)(blobIn.startingAddress())+ m_idsize;
213  unsigned long long usize = *reinterpret_cast<unsigned long long const*>(startingAddress);
214  startingAddress += sizeof(unsigned long long);
215  boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
216  uLongf destLen = usize;
217  int zerr = uncompress((unsigned char *)(theBlob->startingAddress()), &destLen,
218  startingAddress, csize);
219  if (zerr!=0 || usize!=destLen)
220  edm::LogError("BlobStreamingService")<< "uncompressing error " << zerr
221  << " original size was " << usize
222  << " new size is " << destLen;
223 
224  return theBlob;
225  }
226 
227 }
228 
229 // keep the old good name
static Variant findVariant(const void *address)
void zerr(int)
static int isVectorChar(edm::TypeWithDict const &classDictionary)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
std::pair< unsigned long long, unsigned long long > uuid
boost::shared_ptr< coral::Blob > write(const void *addressOfInputData, edm::TypeWithDict const &classDictionary, bool useCompression=true) override
static boost::shared_ptr< coral::Blob > expand(const coral::Blob &blobIn)
boost::shared_ptr< ora::IBlobStreamingService > rootService
static boost::shared_ptr< coral::Blob > compress(const void *addr, size_t isize)
Interface for a Streaming Service.
double const * startingAddress(ArrayAdaptor const &iV)
static uuid const variantIds[nVariants]
string const
Definition: compareJSON.py:14
#define DEFINE_EDM_PLUGIN(factory, type, name)
void read(const coral::Blob &blobData, void *addressOfContainer, edm::TypeWithDict const &classDictionary) override
Reads an object from a Blob and fills-in the container.