CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/IOPool/Streamer/src/StreamSerializer.cc

Go to the documentation of this file.
00001 
00007 #include "IOPool/Streamer/interface/StreamSerializer.h"
00008 #include "DataFormats/Provenance/interface/BranchDescription.h"
00009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00010 #include "DataFormats/Provenance/interface/Parentage.h"
00011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00012 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00013 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00015 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00016 #include "IOPool/Streamer/interface/ClassFiller.h"
00017 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00018 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00019 #include "FWCore/Framework/interface/EventPrincipal.h"
00020 #include "FWCore/ParameterSet/interface/Registry.h"
00021 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00022 #include "DataFormats/Streamer/interface/StreamedProducts.h"
00023 #include "DataFormats/Common/interface/OutputHandle.h"
00024 #include "FWCore/ServiceRegistry/interface/Service.h"
00025 
00026 #include "zlib.h"
00027 #include <cstdlib>
00028 #include <list>
00029 
00030 namespace edm {
00031 
00032   StreamSerializer::Arr::Arr(int sz):ptr_((char*)malloc(sz)) { }
00033   StreamSerializer::Arr::~Arr() { free(ptr_); }
00034 
00038   StreamSerializer::StreamSerializer(Selections const* selections):
00039     selections_(selections),
00040     tc_(getTClass(typeid(SendEvent))) {
00041   }
00042 
00048   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer) {
00049     FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
00050     SendJobHeader sd;
00051 
00052     Selections::const_iterator i(selections_->begin()), e(selections_->end());
00053 
00054     FDEBUG(9) << "Product List: " << std::endl;
00055 
00056 
00057     for(; i != e; ++i)  {
00058         sd.push_back(**i);
00059         FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
00060                   << std::endl;
00061     }
00062     Service<ConstProductRegistry> reg;
00063     sd.setBranchIDLists(BranchIDListRegistry::instance()->data());
00064     SendJobHeader::ParameterSetMap psetMap;
00065 
00066     pset::fillMap(pset::Registry::instance(), psetMap);
00067     sd.setParameterSetMap(psetMap);
00068 
00069     typedef ProcessConfigurationRegistry::collection_type PCMap;
00070     PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
00071     ProcessConfigurationVector procConfigVector;
00072     for(PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
00073       procConfigVector.push_back(i->second);
00074     }
00075     sort_all(procConfigVector);
00076     sd.setProcessConfigurations(procConfigVector);
00077 
00078     data_buffer.rootbuf_.Reset();
00079 
00080     RootDebug tracer(10,10);
00081 
00082     TClass* tc = getTClass(typeid(SendJobHeader));
00083     int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
00084 
00085     switch(bres) {
00086       case 0: // failure
00087       {
00088           throw cms::Exception("StreamTranslation","Registry serialization failed")
00089             << "StreamSerializer failed to serialize registry\n";
00090           break;
00091       }
00092       case 1: // succcess
00093         break;
00094       case 2: // truncated result
00095       {
00096           throw cms::Exception("StreamTranslation","Registry serialization truncated")
00097             << "StreamSerializer module attempted to serialize\n"
00098             << "a registry that is to big for the allocated buffers\n";
00099           break;
00100       }
00101       default: // unknown
00102       {
00103           throw cms::Exception("StreamTranslation","Registry serialization failed")
00104             << "StreamSerializer module got an unknown error code\n"
00105             << " while attempting to serialize registry\n";
00106           break;
00107       }
00108     }
00109 
00110    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00111    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00112    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00113    // calculate the adler32 checksum and fill it into the struct
00114    data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00115    //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
00116    return data_buffer.curr_space_used_;
00117   }
00118 
00139   int StreamSerializer::serializeEvent(EventPrincipal const& eventPrincipal,
00140                                        ParameterSetID const& selectorConfig,
00141                                        bool use_compression, int compression_level,
00142                                        SerializeDataBuffer &data_buffer) {
00143     Parentage parentage;
00144 
00145     EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
00146     selectionIDs.push_back(selectorConfig);
00147     SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
00148 
00149     Selections::const_iterator i(selections_->begin()),ie(selections_->end());
00150     // Loop over EDProducts, fill the provenance, and write.
00151 
00152     for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
00153       BranchDescription const& desc = **i;
00154       BranchID const& id = desc.branchID();
00155 
00156       OutputHandle const oh = eventPrincipal.getForOutput(id, true);
00157       if(!oh.productProvenance()) {
00158         // No product with this ID was put in the event.
00159         // Create and write the provenance.
00160         se.products().push_back(StreamedProduct(desc));
00161       } else {
00162         bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
00163         assert(found);
00164         se.products().push_back(StreamedProduct(oh.wrapper(),
00165                                                 desc,
00166                                                 oh.wrapper() != 0,
00167                                                 &parentage.parents()));
00168       }
00169     }
00170 
00171     data_buffer.rootbuf_.Reset();
00172     RootDebug tracer(10,10);
00173 
00174     //TClass* tc = getTClass(typeid(SendEvent));
00175     int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
00176     switch(bres) {
00177       case 0: // failure
00178         {
00179           throw cms::Exception("StreamTranslation","Event serialization failed")
00180             << "StreamSerializer failed to serialize event: "
00181             << eventPrincipal.id();
00182           break;
00183         }
00184       case 1: // succcess
00185         break;
00186       case 2: // truncated result
00187         {
00188           throw cms::Exception("StreamTranslation","Event serialization truncated")
00189             << "StreamSerializer module attempted to serialize an event\n"
00190             << "that is to big for the allocated buffers: "
00191             << eventPrincipal.id();
00192           break;
00193         }
00194     default: // unknown
00195         {
00196           throw cms::Exception("StreamTranslation","Event serialization failed")
00197             << "StreamSerializer module got an unknown error code\n"
00198             << " while attempting to serialize event: "
00199             << eventPrincipal.id();
00200           break;
00201         }
00202       }
00203 
00204    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00205    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00206    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00207 #if 0
00208    if(data_buffer.ptr_ != data_.ptr_) {
00209         std::cerr << "ROOT reset the buffer!!!!\n";
00210         data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
00211         }
00212 #endif
00213    // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
00214    // eventMessage.eventAddr());
00215    // eventMessage.setEventLength(rootbuf.Length());
00216 
00217     // compress before return if we need to
00218     // should test if compressed already - should never be?
00219     //   as double compression can have problems
00220     if(use_compression) {
00221       unsigned int dest_size =
00222         compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
00223       if(dest_size != 0) {
00224         data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
00225         data_buffer.curr_space_used_ = dest_size;
00226       }
00227     }
00228     // calculate the adler32 checksum and fill it into the struct
00229     data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00230     //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
00231 
00232     return data_buffer.curr_space_used_;
00233   }
00234 
00240   unsigned int
00241   StreamSerializer::compressBuffer(unsigned char *inputBuffer,
00242                                    unsigned int inputSize,
00243                                    std::vector<unsigned char> &outputBuffer,
00244                                    int compressionLevel) {
00245     unsigned int resultSize = 0;
00246 
00247     // what are these magic numbers? (jbk)
00248     unsigned long dest_size = (unsigned long)(double(inputSize)*
00249                                               1.002 + 1.0) + 12;
00250     if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
00251 
00252     // compression 1-9, 6 is zlib default, 0 none
00253     int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
00254                         inputSize, compressionLevel);
00255 
00256     // check status
00257     if(ret == Z_OK) {
00258         // return the correct length
00259         resultSize = dest_size;
00260 
00261         FDEBUG(1) << " original size = " << inputSize
00262                   << " final size = " << dest_size
00263                   << " ratio = " << double(dest_size)/double(inputSize)
00264                   << std::endl;
00265     } else {
00266         // compression failed, return a size of zero
00267         FDEBUG(9) << "Compression Return value: " << ret
00268                   << " Okay = " << Z_OK << std::endl;
00269         // do we throw an exception here?
00270         std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
00271 
00272     }
00273 
00274     return resultSize;
00275   }
00276 }