CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/CondCore/DBCommon/plugins/BlobStreamingService.cc

Go to the documentation of this file.
00001 #include "CondCore/DBCommon/interface/BlobStreamerPluginFactory.h"
00002 #include "CondCore/DBCommon/interface/Exception.h"
00003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00004 
00005 #include "CondCore/ORA/interface/IBlobStreamingService.h"
00006 
00007 #include "TBufferBlobStreamingService.h"
00008 
00009 
00010 //
00011 #include <cstddef>
00012 //
00013 #include "CoralBase/Blob.h"
00014 //
00015 #include <algorithm>
00016 #include <typeinfo>
00017 #include <string>
00018 #include <cstring>
00019 #include <zlib.h>
00020 
00021 namespace cond {
00022 
00023   class BlobStreamingService : virtual public ora::IBlobStreamingService {
00024   public:
00025     
00026     
00027     BlobStreamingService();
00028     
00029     virtual ~BlobStreamingService();
00030     
00031     boost::shared_ptr<coral::Blob> write( const void* addressOfInputData,  Reflex::Type const & classDictionary, bool useCompression=true );
00032     
00033     void read( const coral::Blob& blobData, void* addressOfContainer,  Reflex::Type const & classDictionary );
00034     
00035     
00036   private:
00037     
00038     typedef std::pair<unsigned long long, unsigned long long> uuid;
00039     
00040     static const size_t m_idsize=sizeof(uuid);
00041     static const size_t m_offset = m_idsize + sizeof(unsigned long long);
00042     static const size_t nVariants=3;
00043     
00044     enum Variant { OLD, COMPRESSED_TBUFFER, COMPRESSED_CHARS }; 
00045     static uuid const variantIds[nVariants];
00046     
00047     
00048     static Variant findVariant(const void* address);
00049     static int isVectorChar(Reflex::Type const & classDictionary);
00050     
00051     
00052     static boost::shared_ptr<coral::Blob>  compress(const void* addr, size_t isize);
00053     static boost::shared_ptr<coral::Blob>  expand(const coral::Blob& blobIn);
00054     
00055     
00056     boost::shared_ptr<ora::IBlobStreamingService> rootService; 
00057     
00058   };
00059   
00060   
00061   BlobStreamingService::BlobStreamingService() : rootService(new cond::TBufferBlobStreamingService()){}
00062   
00063   BlobStreamingService::~BlobStreamingService(){}
00064   
00065   boost::shared_ptr<coral::Blob> BlobStreamingService::write( const void* addressOfInputData,  Reflex::Type const & classDictionary, bool useCompression ) {
00066     boost::shared_ptr<coral::Blob> blobOut;
00067     int const k = isVectorChar(classDictionary);
00068     switch (k) {
00069     case 0 : 
00070       {
00071         // at the moment we write TBuffer compressed, than we see....
00072         // we may wish to avoid one buffer copy...
00073         boost::shared_ptr<coral::Blob> buffer = rootService->write(addressOfInputData, classDictionary);
00074         if( useCompression ){
00075           blobOut = compress(buffer->startingAddress(),buffer->size());
00076           *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_TBUFFER];
00077         } else {
00078           blobOut = buffer;
00079         }
00080       }
00081       break;
00082     case 1 : 
00083       {
00084         if( useCompression ){
00085           std::vector<unsigned char> const & v = *reinterpret_cast< std::vector<unsigned char> const *> (addressOfInputData);
00086           blobOut = compress(&v.front(),v.size());
00087           *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
00088         } else {
00089           blobOut = rootService->write(addressOfInputData,classDictionary);
00090         }
00091       }
00092       break;
00093     case 2 : 
00094       {
00095         if( useCompression ){
00096           std::vector<char> const & v = *reinterpret_cast<std::vector<char> const *> (addressOfInputData);
00097           blobOut = compress(&v.front(),v.size());
00098           *reinterpret_cast<uuid*>(blobOut->startingAddress()) = variantIds[COMPRESSED_CHARS];
00099         } else {
00100           blobOut = rootService->write(addressOfInputData,classDictionary);
00101         }
00102       }
00103       break;
00104       
00105     }
00106     return blobOut;
00107 
00108   }
00109   
00110   
00111   void BlobStreamingService::read( const coral::Blob& blobData, void* addressOfContainer,  Reflex::Type const & classDictionary ) {
00112     // protect against small blobs...
00113     Variant v =  (size_t(blobData.size()) < m_offset) ? OLD : findVariant(blobData.startingAddress());
00114     switch (v) {
00115     case OLD :
00116       {
00117         rootService->read( blobData, addressOfContainer, classDictionary);
00118       }
00119       break;
00120     case COMPRESSED_TBUFFER :
00121       {
00122         boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
00123         rootService->read( *blobIn, addressOfContainer, classDictionary);
00124       }
00125       break;
00126     case COMPRESSED_CHARS :
00127       {
00128         boost::shared_ptr<coral::Blob> blobIn = expand(blobData);
00129         int const k = isVectorChar(classDictionary);
00130         switch (k) {
00131         case 0 : 
00132           {
00133             // error!!!
00134           }
00135           break;
00136         case 1:
00137           {
00138             std::vector<unsigned char> & v = *reinterpret_cast< std::vector<unsigned char> *> (addressOfContainer);
00139             // we should avoid the copy!
00140             v.resize(blobIn->size());
00141             std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
00142           }
00143           break;
00144         case 2:
00145           {
00146             std::vector<char> & v = *reinterpret_cast< std::vector<char> *> (addressOfContainer);
00147             // we should avoid the copy!
00148             v.resize(blobIn->size());
00149             std::memcpy(&v.front(),blobIn->startingAddress(),v.size());
00150           }
00151           break;
00152         }
00153       }
00154     }
00155   }
00156       
00157     
00158     
00159     
00160   const BlobStreamingService::uuid BlobStreamingService::variantIds[nVariants] = {
00161     BlobStreamingService::uuid(0,0)
00162     ,BlobStreamingService::uuid(0xf4e92f169c974e8eLL, 0x97851f372586010dLL)
00163     ,BlobStreamingService::uuid(0xc9a95a45e60243cfLL, 0x8dc549534f9a9274LL)
00164   };
00165   
00166   
00167   int BlobStreamingService::isVectorChar(Reflex::Type const & classDictionary) {
00168     std::type_info const & t = classDictionary.TypeInfo();
00169     if (t==typeid(std::vector<unsigned char>) ) return 1;
00170     if (t==typeid(std::vector<char>) ) return 2;
00171     return 0;
00172   }
00173   
00174   
00175   
00176   BlobStreamingService::Variant BlobStreamingService::findVariant(const void* address) {
00177     uuid const & id = *reinterpret_cast<uuid const*>(address);
00178     uuid const *  v = std::find(variantIds,variantIds+nVariants,id);
00179     return (v==variantIds+nVariants) ? OLD : (Variant)(v-variantIds);
00180   }
00181   
00182   
00183   boost::shared_ptr<coral::Blob> BlobStreamingService::compress(const void* addr, size_t isize) {
00184     uLongf destLen = compressBound(isize);
00185     size_t usize = destLen + m_offset;
00186     boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
00187      void * startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_offset;
00188     int zerr =  compress2( (unsigned char*)(startingAddress), &destLen,
00189                            (unsigned char*)(addr), isize,
00190                           9);
00191     if (zerr!=0) edm::LogError("BlobStreamingService")<< "Compression error " << zerr;
00192     destLen+= m_idsize + sizeof(unsigned long long);
00193     theBlob->resize(destLen);
00194 
00195     startingAddress = (unsigned char*)(theBlob->startingAddress())+ m_idsize;
00196     // write expanded size;
00197     *reinterpret_cast<unsigned long long*>(startingAddress)=isize; 
00198     return theBlob;
00199   }
00200 
00201   boost::shared_ptr<coral::Blob> BlobStreamingService::expand(const coral::Blob& blobIn) {
00202     if (size_t(blobIn.size()) < m_offset) return boost::shared_ptr<coral::Blob>(new coral::Blob());
00203     long long csize =  blobIn.size() - m_offset;
00204     unsigned char const * startingAddress = (unsigned char const*)(blobIn.startingAddress())+ m_idsize;
00205     unsigned long long usize = *reinterpret_cast<unsigned long long const*>(startingAddress);
00206     startingAddress +=  sizeof(unsigned long long);
00207     boost::shared_ptr<coral::Blob> theBlob( new coral::Blob(usize));
00208     uLongf destLen = usize;
00209     int zerr =  uncompress((unsigned char *)(theBlob->startingAddress()),  &destLen,
00210                            startingAddress, csize);
00211     if (zerr!=0 || usize!=destLen) 
00212       edm::LogError("BlobStreamingService")<< "uncompressing error " << zerr
00213                                << " original size was " << usize
00214                                << " new size is " << destLen;
00215     
00216      return theBlob; 
00217   }
00218 
00219 }
00220 
00221 // keep the old good name
00222 DEFINE_EDM_PLUGIN(cond::BlobStreamerPluginFactory,cond::BlobStreamingService,"COND/Services/BlobStreamingService");