CMS 3D CMS Logo

StreamSerializer.cc

Go to the documentation of this file.
00001 
00007 #include "IOPool/Streamer/interface/StreamSerializer.h"
00008 #include "DataFormats/Provenance/interface/BranchDescription.h"
00009 #include "DataFormats/Provenance/interface/BranchID.h"
00010 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00011 #include "DataFormats/Provenance/interface/EventEntryDescription.h"
00012 #include "DataFormats/Provenance/interface/EventEntryInfo.h"
00013 #include "DataFormats/Provenance/interface/ModuleDescriptionRegistry.h"
00014 #include "TClass.h"
00015 #include "IOPool/Streamer/interface/ClassFiller.h"
00016 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00017 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00018 #include "FWCore/Framework/interface/EventPrincipal.h"
00019 #include "FWCore/ParameterSet/interface/Registry.h"
00020 #include "FWCore/Utilities/interface/WrappedClassName.h"
00021 #include "DataFormats/Streamer/interface/StreamedProducts.h"
00022 #include "DataFormats/Common/interface/OutputHandle.h"
00023 #include "FWCore/ServiceRegistry/interface/Service.h"
00024 
00025 #include "zlib.h"
00026 #include <cstdlib>
00027 #include <list>
00028 
00029 namespace edm
00030 {
00031 
00032   StreamSerializer::Arr::Arr(int sz) : ptr_(static_cast<char*>(std::malloc(sz))) { } // takes sz raw bytes from malloc; a null result is not checked here
00033   StreamSerializer::Arr::~Arr() { std::free(ptr_); } // returns the block obtained in the constructor; free() accepts a null pointer
00034 
00035   // Build a serializer over the given product selections. Only the pointer
00036   // is stored. The TClass for SendEvent is looked up once here and kept in
00037   // tc_, which serializeEvent() later passes to WriteObjectAny().
00038   StreamSerializer::StreamSerializer(Selections const* selections)
00039     : selections_(selections),
00040       tc_(getTClass(typeid(SendEvent)))
00041   {}
00042 
00048   int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer)
00049   {
00050     FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
00051     SendJobHeader sd;
00052 
00053     Selections::const_iterator i(selections_->begin()), e(selections_->end());
00054 
00055     FDEBUG(9) << "Product List: " << std::endl;
00056 
00057 
00058     for(; i != e; ++i)  {
00059         sd.push_back(**i);
00060         FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
00061                   << std::endl;
00062     }
00063     edm::Service<edm::ConstProductRegistry> reg;
00064     sd.setNextID(reg->nextID());
00065     sd.setModuleDescriptionMap(ModuleDescriptionRegistry::instance()->data());
00066     SendJobHeader::ParameterSetMap psetMap;
00067 
00068     pset::Registry const* psetRegistry = pset::Registry::instance();
00069     for (pset::Registry::const_iterator it = psetRegistry->begin(), itEnd = psetRegistry->end(); it != itEnd; ++it) {
00070       psetMap.insert(std::make_pair(it->first, ParameterSetBlob(it->second.toStringOfTracked())));
00071     }
00072     sd.setParameterSetMap(psetMap);
00073 
00074     data_buffer.rootbuf_.Reset();
00075 
00076     RootDebug tracer(10,10);
00077 
00078     TClass* tc = getTClass(typeid(SendJobHeader));
00079     int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
00080 
00081     switch(bres)
00082     {
00083       case 0: // failure
00084       {
00085           throw cms::Exception("StreamTranslation","Registry serialization failed")
00086             << "StreamSerializer failed to serialize registry\n";
00087           break;
00088       }
00089       case 1: // succcess
00090         break;
00091       case 2: // truncated result
00092       {
00093           throw cms::Exception("StreamTranslation","Registry serialization truncated")
00094             << "StreamSerializer module attempted to serialize\n"
00095             << "a registry that is to big for the allocated buffers\n";
00096           break;
00097       }
00098       default: // unknown
00099       {
00100           throw cms::Exception("StreamTranslation","Registry serialization failed")
00101             << "StreamSerializer module got an unknown error code\n"
00102             << " while attempting to serialize registry\n";
00103           break;
00104       }
00105     }
00106 
00107    data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00108    data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00109    data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00110    return data_buffer.curr_space_used_;
00111   }
00112 
00122   // Serialize one event into data_buffer and return the bytes in use.
00123   //
00124   // A SendEvent is built from the principal's aux() and processHistory(),
00125   // plus one StreamedProduct per selected branch (description only when the
00126   // event holds no entry info for that branch). It is written with ROOT into
00127   // data_buffer.rootbuf_. When use_compression is set the bytes are passed
00128   // to compressBuffer() with compression_level; if that succeeds, ptr_ and
00129   // curr_space_used_ describe the compressed copy while curr_event_size_
00130   // keeps the uncompressed length.
00131   // Throws cms::Exception when an entry description cannot be found or when
00132   // ROOT reports anything other than success.
00133   int StreamSerializer::serializeEvent(EventPrincipal const& eventPrincipal,
00134                                        bool use_compression, int compression_level,
00135                                        SerializeDataBuffer &data_buffer)
00136 
00137   {
00138     // entryDesc stays outside the loop on purpose: StreamedProduct is handed
00139     // the address of entryDesc.parents(), so the object has to live until the
00140     // write below. NOTE(review): every product receives the same address -
00141     // confirm that StreamedProduct copies what it needs.
00142     EventEntryDescription entryDesc;
00143 
00144     SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory());
00145 
00146     // Loop over EDProducts, fill the provenance, and write.
00147     for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
00148       BranchDescription const& desc = **i;
00149       BranchID const& id = desc.branchID();
00150 
00151       OutputHandle<EventEntryInfo> const oh = eventPrincipal.getForOutput<EventEntryInfo>(id, true);
00152       if (!oh.entryInfo()) {
00153         // No product with this ID was put in the event.
00154         // Create and write the provenance.
00155         se.products().push_back(StreamedProduct(desc));
00156         continue;
00157       }
00158 
00159       // Checked at run time instead of with assert(): under NDEBUG an assert
00160       // disappears and entryDesc would carry whatever it held before.
00161       if (!EntryDescriptionRegistry::instance()->getMapped(oh.entryInfoSharedPtr()->entryDescriptionID(), entryDesc)) {
00162         throw cms::Exception("StreamTranslation","Missing entry description")
00163           << "StreamSerializer found no entry description for product "
00164           << desc.className() << " in event: "
00165           << eventPrincipal.id();
00166       }
00167       se.products().push_back(StreamedProduct(oh.wrapper(),
00168                                              desc,
00169                                              entryDesc.moduleDescriptionID(),
00170                                              oh.entryInfoSharedPtr()->productID(),
00171                                              oh.entryInfoSharedPtr()->productStatus(),
00172                                              &entryDesc.parents()));
00173     }
00174 
00175     data_buffer.rootbuf_.Reset();
00176     RootDebug tracer(10,10);
00177 
00178     // tc_ is the TClass for SendEvent, looked up once in the constructor.
00179     int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
00180     switch(bres)
00181       {
00182       case 1: // success
00183         break;
00184       case 0: // failure
00185         throw cms::Exception("StreamTranslation","Event serialization failed")
00186           << "StreamSerializer failed to serialize event: "
00187           << eventPrincipal.id();
00188       case 2: // truncated result
00189         throw cms::Exception("StreamTranslation","Event serialization truncated")
00190           << "StreamSerializer module attempted to serialize an event\n"
00191           << "that is to big for the allocated buffers: "
00192           << eventPrincipal.id();
00193       default: // unknown
00194         throw cms::Exception("StreamTranslation","Event serialization failed")
00195           << "StreamSerializer module got an unknown error code\n"
00196           << " while attempting to serialize event: "
00197           << eventPrincipal.id();
00198       }
00199 
00200     // ptr_ is re-read from rootbuf_ after every write.
00201     // NOTE(review): presumably because ROOT may relocate its buffer - confirm.
00202     data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00203     data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00204     data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00205 
00206     // compress before return if we need to
00207     // should test if compressed already - should never be?
00208     //   as double compression can have problems
00209     if(use_compression)
00210     {
00211       unsigned int dest_size =
00212         compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
00213       // A zero size means compression failed; the uncompressed bytes are kept.
00214       if(dest_size != 0)
00215       {
00216         data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area
00217         data_buffer.curr_space_used_ = dest_size;
00218       }
00219     }
00220 
00221     return data_buffer.curr_space_used_;
00222   }
00230 
00231   // Deflate inputSize bytes from inputBuffer into outputBuffer with zlib's
00232   // compress2(). outputBuffer is grown when it is smaller than the scratch
00233   // size computed below and is never shrunk. Returns the compressed length,
00234   // or 0 when compress2() reports anything other than Z_OK (the failure is
00235   // also printed to std::cerr).
00236   unsigned int
00237   StreamSerializer::compressBuffer(unsigned char *inputBuffer,
00238                                    unsigned int inputSize,
00239                                    std::vector<unsigned char> &outputBuffer,
00240                                    int compressionLevel)
00241   {
00242     // Scratch size: the input grown by 0.2%, plus 1, plus 12 bytes.
00243     // NOTE(review): presumably headroom in the spirit of the zlib manual's
00244     // destination-size requirement for compress() - confirm against zlib.
00245     unsigned long compressedLen =
00246       static_cast<unsigned long>(static_cast<double>(inputSize) * 1.002 + 1.0) + 12;
00247     if (outputBuffer.size() < compressedLen) {
00248       outputBuffer.resize(compressedLen);
00249     }
00250 
00251     // compression 1-9, 6 is zlib default, 0 none
00252     // compress2() overwrites compressedLen with the number of bytes produced.
00253     int const status = compress2(&outputBuffer[0], &compressedLen, inputBuffer,
00254                                  inputSize, compressionLevel);
00255 
00256     if (status != Z_OK) {
00257       // compression failed, return a size of zero
00258       FDEBUG(9) <<"Compression Return value: "<<status
00259                 << " Okay = " << Z_OK << std::endl;
00260       // do we throw an exception here?
00261       std::cerr <<"Compression Return value: "<<status<< " Okay = " << Z_OK << std::endl;
00262       return 0;
00263     }
00264 
00265     FDEBUG(1) << " original size = " << inputSize
00266               << " final size = " << compressedLen
00267               << " ratio = " << double(compressedLen)/double(inputSize)
00268               << std::endl;
00269 
00270     // return the correct length
00271     return static_cast<unsigned int>(compressedLen);
00272   }
00275 }

Generated on Tue Jun 9 17:39:20 2009 for CMSSW by  doxygen 1.5.4