Go to the documentation of this file.00001 #include "CondCore/DBCommon/interface/BlobStreamerPluginFactory.h"
00002 #include "CondCore/DBCommon/interface/Exception.h"
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004
00005 #include "CondCore/ORA/interface/IBlobStreamingService.h"
00006
00007 #include "TBufferBlobStreamingService.h"
00008
00009
00010
00011 #include <cstddef>
00012
00013 #include "CoralBase/Blob.h"
00014
00015 #include <algorithm>
00016 #include <typeinfo>
00017 #include <string>
00018 #include <cstring>
00019 #include <zlib.h>
00020
00021 namespace cond {
00022
00023 class BlobStreamingService : virtual public ora::IBlobStreamingService {
00024 public:
00025
00026
00027 BlobStreamingService();
00028
00029 virtual ~BlobStreamingService();
00030
00031 boost::shared_ptr<coral::Blob> write( const void* addressOfInputData, Reflex::Type const & classDictionary, bool useCompression=true );
00032
00033 void read( const coral::Blob& blobData, void* addressOfContainer, Reflex::Type const & classDictionary );
00034
00035
00036 private:
00037
00038 typedef std::pair<unsigned long long, unsigned long long> uuid;
00039
00040 static const size_t m_idsize=sizeof(uuid);
00041 static const size_t m_offset = m_idsize + sizeof(unsigned long long);
00042 static const size_t nVariants=3;
00043
00044 enum Variant { OLD, COMPRESSED_TBUFFER, COMPRESSED_CHARS };
00045 static uuid const variantIds[nVariants];
00046
00047
00048 static Variant findVariant(const void* address);
00049 static int isVectorChar(Reflex::Type const & classDictionary);
00050
00051
00052 static boost::shared_ptr<coral::Blob> compress(const void* addr, size_t isize);
00053 static boost::shared_ptr<coral::Blob> expand(const coral::Blob& blobIn);
00054
00055
00056 boost::shared_ptr<ora::IBlobStreamingService> rootService;
00057
00058 };
00059
00060
00061 BlobStreamingService::BlobStreamingService() : rootService(new cond::TBufferBlobStreamingService()){}
00062
00063 BlobStreamingService::~BlobStreamingService(){}
00064
00065 boost::shared_ptr<coral::Blob> BlobStreamingService::write( const void* addressOfInputData, Reflex::Type const & classDictionary, bool useCompression ) {
00066 boost::shared_ptr<coral::Blob> blobOut;
00067 int const k = isVectorChar(classDictionary);
00068 switch (k) {
00069 case 0 :
00070 {
00071
00072
00073 boost::shared_ptr<coral::Blob> buffer = rootService->write(addressOfInputData, classDictionary);
00074 if( useCompression ){
00075 blobOut = compress(buffer->startingAddress(),buffer->size());
00076 *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_TBUFFER];
00077 } else {
00078 blobOut = buffer;
00079 }
00080 }
00081 break;
00082 case 1 :
00083 {
00084 if( useCompression ){
00085 std::vector<unsigned char> const & v = *reinterpret_cast< std::vector<unsigned char> const *> (addressOfInputData);
00086 blobOut = compress(&v.front(),v.size());
00087 *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
00088 } else {
00089 blobOut = rootService->write(addressOfInputData,classDictionary);
00090 }
00091 }
00092 break;
00093 case 2 :
00094 {
00095 if( useCompression ){
00096 std::vector<char> const & v = *reinterpret_cast<std::vector<char> const *> (addressOfInputData);
00097 blobOut = compress(&v.front(),v.size());
00098 *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
00099 } else {
00100 blobOut = rootService->write(addressOfInputData,classDictionary);
00101 }
00102 }
00103 break;
00104
00105 }
00106 return blobOut;
00107
00108 }
00109
00110
00111 void BlobStreamingService::read( const coral::Blob& blobData, void* addressOfContainer, Reflex::Type const & classDictionary ) {
00112
00113 Variant v = (size_t(blobData.size()) < m_offset) ? OLD : findVariant(blobData.startingAddress());
00114 switch (v) {
00115 case OLD :
00116 {
00117 rootService->read( blobData, addressOfContainer, classDictionary);
00118 }
00119 break;
00120 case COMPRESSED_TBUFFER :
00121 {
00122 boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
00123 rootService->read( *blobIn, addressOfContainer, classDictionary);
00124 }
00125 break;
00126 case COMPRESSED_CHARS :
00127 {
00128 boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
00129 int const k = isVectorChar(classDictionary);
00130 switch (k) {
00131 case 0 :
00132 {
00133
00134 }
00135 break;
00136 case 1:
00137 {
00138 std::vector<unsigned char> & v = *reinterpret_cast< std::vector<unsigned char> *> (addressOfContainer);
00139
00140 v.resize(blobIn->size());
00141 std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
00142 }
00143 break;
00144 case 2:
00145 {
00146 std::vector<char> & v = *reinterpret_cast< std::vector<char> *> (addressOfContainer);
00147
00148 v.resize(blobIn->size());
00149 std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
00150 }
00151 break;
00152 }
00153 }
00154 }
00155 }
00156
00157
00158
00159
00160 const BlobStreamingService::uuid BlobStreamingService::variantIds[nVariants] = {
00161 BlobStreamingService::uuid(0,0)
00162 ,BlobStreamingService::uuid(0xf4e92f169c974e8eLL, 0x97851f372586010dLL)
00163 ,BlobStreamingService::uuid(0xc9a95a45e60243cfLL, 0x8dc549534f9a9274LL)
00164 };
00165
00166
00167 int BlobStreamingService::isVectorChar(Reflex::Type const & classDictionary) {
00168 std::type_info const & t = classDictionary.TypeInfo();
00169 if (t==typeid(std::vector<unsigned char>) ) return 1;
00170 if (t==typeid(std::vector<char>) ) return 2;
00171 return 0;
00172 }
00173
00174
00175
00176 BlobStreamingService::Variant BlobStreamingService::findVariant(const void* address) {
00177 uuid const & id = *reinterpret_cast<uuid const*>(address);
00178 uuid const * v = std::find(variantIds,variantIds+nVariants,id);
00179 return (v==variantIds+nVariants) ? OLD : (Variant)(v-variantIds);
00180 }
00181
00182
00183 boost::shared_ptr<coral::Blob> BlobStreamingService::compress(const void* addr, size_t isize) {
00184 uLongf destLen = compressBound(isize);
00185 size_t usize = destLen + m_offset;
00186 boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
00187 void * startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_offset;
00188 int zerr = compress2( (unsigned char*)(startingAddress), &destLen,
00189 (unsigned char*)(addr), isize,
00190 9);
00191 if (zerr!=0) edm::LogError("BlobStreamingService")<< "Compression error " << zerr;
00192 destLen+= m_idsize + sizeof(unsigned long long);
00193 theBlob->resize(destLen);
00194
00195 startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_idsize;
00196
00197 *reinterpret_cast<unsigned long long*>(startingAddress)=isize;
00198 return theBlob;
00199 }
00200
00201 boost::shared_ptr<coral::Blob> BlobStreamingService::expand(const coral::Blob& blobIn) {
00202 if (size_t(blobIn.size()) < m_offset) return boost::shared_ptr<coral::Blob>(new coral::Blob());
00203 long long csize = blobIn.size() - m_offset;
00204 unsigned char const * startingAddress = (unsigned char const*)(blobIn.startingAddress())+ m_idsize;
00205 unsigned long long usize = *reinterpret_cast<unsigned long long const*>(startingAddress);
00206 startingAddress += sizeof(unsigned long long);
00207 boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
00208 uLongf destLen = usize;
00209 int zerr = uncompress((unsigned char *)(theBlob->startingAddress()), &destLen,
00210 startingAddress, csize);
00211 if (zerr!=0 || usize!=destLen)
00212 edm::LogError("BlobStreamingService")<< "uncompressing error " << zerr
00213 << " original size was " << usize
00214 << " new size is " << destLen;
00215
00216 return theBlob;
00217 }
00218
00219 }
00220
00221
00222 DEFINE_EDM_PLUGIN(cond::BlobStreamerPluginFactory,cond::BlobStreamingService,"COND/Services/BlobStreamingService");