CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/IOPool/Streamer/src/StreamSerializer.cc

Go to the documentation of this file.
00001 
00007 #include "IOPool/Streamer/interface/StreamSerializer.h"
00008 #include "DataFormats/Provenance/interface/BranchDescription.h"
00009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00010 #include "DataFormats/Provenance/interface/Parentage.h"
00011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00012 #include "DataFormats/Provenance/interface/Selections.h"
00013 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00015 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00016 #include "IOPool/Streamer/interface/ClassFiller.h"
00017 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00018 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00019 #include "FWCore/Framework/interface/EventPrincipal.h"
00020 #include "FWCore/ParameterSet/interface/Registry.h"
00021 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00022 #include "DataFormats/Streamer/interface/StreamedProducts.h"
00023 #include "DataFormats/Common/interface/OutputHandle.h"
00024 #include "FWCore/ServiceRegistry/interface/Service.h"
00025 
00026 #include "zlib.h"
00027 #include <cstdlib>
00028 #include <list>
00029 
00030 namespace edm {
00031 
00035   StreamSerializer::StreamSerializer(Selections const* selections):
00036     selections_(selections),
00037     tc_(getTClass(typeid(SendEvent))) {
00038   }
00039 
00045   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists) {
00046     FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
00047     SendJobHeader sd;
00048 
00049     Selections::const_iterator i(selections_->begin()), e(selections_->end());
00050 
00051     FDEBUG(9) << "Product List: " << std::endl;
00052 
00053 
00054     for(; i != e; ++i)  {
00055         sd.push_back(**i);
00056         FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
00057                   << std::endl;
00058     }
00059     Service<ConstProductRegistry> reg;
00060     sd.setBranchIDLists(branchIDLists);
00061     SendJobHeader::ParameterSetMap psetMap;
00062 
00063     pset::fillMap(pset::Registry::instance(), psetMap);
00064     sd.setParameterSetMap(psetMap);
00065 
00066     typedef ProcessConfigurationRegistry::collection_type PCMap;
00067     PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
00068     ProcessConfigurationVector procConfigVector;
00069     for(PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
00070       procConfigVector.push_back(i->second);
00071     }
00072     sort_all(procConfigVector);
00073     sd.setProcessConfigurations(procConfigVector);
00074 
00075     data_buffer.rootbuf_.Reset();
00076 
00077     RootDebug tracer(10,10);
00078 
00079     TClass* tc = getTClass(typeid(SendJobHeader));
00080     int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
00081 
00082     switch(bres) {
00083       case 0: // failure
00084       {
00085           throw cms::Exception("StreamTranslation","Registry serialization failed")
00086             << "StreamSerializer failed to serialize registry\n";
00087           break;
00088       }
00089       case 1: // succcess
00090         break;
00091       case 2: // truncated result
00092       {
00093           throw cms::Exception("StreamTranslation","Registry serialization truncated")
00094             << "StreamSerializer module attempted to serialize\n"
00095             << "a registry that is to big for the allocated buffers\n";
00096           break;
00097       }
00098       default: // unknown
00099       {
00100           throw cms::Exception("StreamTranslation","Registry serialization failed")
00101             << "StreamSerializer module got an unknown error code\n"
00102             << " while attempting to serialize registry\n";
00103           break;
00104       }
00105     }
00106 
00107    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00108    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00109    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00110    // calculate the adler32 checksum and fill it into the struct
00111    data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00112    //std::cout << "Adler32 checksum of init message = " << data_buffer.adler32_chksum_ << std::endl;
00113    return data_buffer.curr_space_used_;
00114   }
00115 
00136   int StreamSerializer::serializeEvent(EventPrincipal const& eventPrincipal,
00137                                        ParameterSetID const& selectorConfig,
00138                                        bool use_compression, int compression_level,
00139                                        SerializeDataBuffer &data_buffer) {
00140     Parentage parentage;
00141 
00142     EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
00143     selectionIDs.push_back(selectorConfig);
00144     SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
00145 
00146     Selections::const_iterator i(selections_->begin()),ie(selections_->end());
00147     // Loop over EDProducts, fill the provenance, and write.
00148 
00149     for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
00150       BranchDescription const& desc = **i;
00151       BranchID const& id = desc.branchID();
00152 
00153       OutputHandle const oh = eventPrincipal.getForOutput(id, true);
00154       if(!oh.productProvenance()) {
00155         // No product with this ID was put in the event.
00156         // Create and write the provenance.
00157         se.products().push_back(StreamedProduct(desc));
00158       } else {
00159         bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
00160         assert(found);
00161         se.products().push_back(StreamedProduct(oh.wrapper(),
00162                                                 desc,
00163                                                 oh.wrapper() != 0,
00164                                                 &parentage.parents()));
00165       }
00166     }
00167 
00168     data_buffer.rootbuf_.Reset();
00169     RootDebug tracer(10,10);
00170 
00171     //TClass* tc = getTClass(typeid(SendEvent));
00172     int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
00173     switch(bres) {
00174       case 0: // failure
00175         {
00176           throw cms::Exception("StreamTranslation","Event serialization failed")
00177             << "StreamSerializer failed to serialize event: "
00178             << eventPrincipal.id();
00179           break;
00180         }
00181       case 1: // succcess
00182         break;
00183       case 2: // truncated result
00184         {
00185           throw cms::Exception("StreamTranslation","Event serialization truncated")
00186             << "StreamSerializer module attempted to serialize an event\n"
00187             << "that is to big for the allocated buffers: "
00188             << eventPrincipal.id();
00189           break;
00190         }
00191     default: // unknown
00192         {
00193           throw cms::Exception("StreamTranslation","Event serialization failed")
00194             << "StreamSerializer module got an unknown error code\n"
00195             << " while attempting to serialize event: "
00196             << eventPrincipal.id();
00197           break;
00198         }
00199       }
00200 
00201    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00202    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00203    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00204 #if 0
00205    if(data_buffer.ptr_ != data_.ptr_) {
00206         std::cerr << "ROOT reset the buffer!!!!\n";
00207         data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!!
00208         }
00209 #endif
00210    // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(),
00211    // eventMessage.eventAddr());
00212    // eventMessage.setEventLength(rootbuf.Length());
00213 
00214     // compress before return if we need to
00215     // should test if compressed already - should never be?
00216     //   as double compression can have problems
00217     if(use_compression) {
00218       unsigned int dest_size =
00219         compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
00220       if(dest_size != 0) {
00221         data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
00222         data_buffer.curr_space_used_ = dest_size;
00223       }
00224     }
00225     // calculate the adler32 checksum and fill it into the struct
00226     data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00227     //std::cout << "Adler32 checksum of event = " << data_buffer.adler32_chksum_ << std::endl;
00228 
00229     return data_buffer.curr_space_used_;
00230   }
00231 
00237   unsigned int
00238   StreamSerializer::compressBuffer(unsigned char *inputBuffer,
00239                                    unsigned int inputSize,
00240                                    std::vector<unsigned char> &outputBuffer,
00241                                    int compressionLevel) {
00242     unsigned int resultSize = 0;
00243 
00244     // what are these magic numbers? (jbk)
00245     unsigned long dest_size = (unsigned long)(double(inputSize)*
00246                                               1.002 + 1.0) + 12;
00247     if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
00248 
00249     // compression 1-9, 6 is zlib default, 0 none
00250     int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
00251                         inputSize, compressionLevel);
00252 
00253     // check status
00254     if(ret == Z_OK) {
00255         // return the correct length
00256         resultSize = dest_size;
00257 
00258         FDEBUG(1) << " original size = " << inputSize
00259                   << " final size = " << dest_size
00260                   << " ratio = " << double(dest_size)/double(inputSize)
00261                   << std::endl;
00262     } else {
00263         // compression failed, return a size of zero
00264         FDEBUG(9) << "Compression Return value: " << ret
00265                   << " Okay = " << Z_OK << std::endl;
00266         // do we throw an exception here?
00267         std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
00268 
00269     }
00270 
00271     return resultSize;
00272   }
00273 }