Go to the documentation of this file.00001
00007 #include "IOPool/Streamer/interface/StreamSerializer.h"
00008 #include "DataFormats/Provenance/interface/BranchDescription.h"
00009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00010 #include "DataFormats/Provenance/interface/Parentage.h"
00011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00012 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00013 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00015 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00016 #include "IOPool/Streamer/interface/ClassFiller.h"
00017 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00018 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00019 #include "FWCore/Framework/interface/EventPrincipal.h"
00020 #include "FWCore/ParameterSet/interface/Registry.h"
00021 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00022 #include "DataFormats/Streamer/interface/StreamedProducts.h"
00023 #include "DataFormats/Common/interface/OutputHandle.h"
00024 #include "FWCore/ServiceRegistry/interface/Service.h"
00025
00026 #include "zlib.h"
00027 #include <cstdlib>
00028 #include <list>
00029
00030 namespace edm {
00031
00032 StreamSerializer::Arr::Arr(int sz):ptr_((char*)malloc(sz)) { }
00033 StreamSerializer::Arr::~Arr() { free(ptr_); }
00034
00038 StreamSerializer::StreamSerializer(Selections const* selections):
00039 selections_(selections),
00040 tc_(getTClass(typeid(SendEvent))) {
00041 }
00042
00048 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer) {
00049 FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
00050 SendJobHeader sd;
00051
00052 Selections::const_iterator i(selections_->begin()), e(selections_->end());
00053
00054 FDEBUG(9) << "Product List: " << std::endl;
00055
00056
00057 for(; i != e; ++i) {
00058 sd.push_back(**i);
00059 FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
00060 << std::endl;
00061 }
00062 Service<ConstProductRegistry> reg;
00063 sd.setBranchIDLists(BranchIDListRegistry::instance()->data());
00064 SendJobHeader::ParameterSetMap psetMap;
00065
00066 pset::fillMap(pset::Registry::instance(), psetMap);
00067 sd.setParameterSetMap(psetMap);
00068
00069 typedef ProcessConfigurationRegistry::collection_type PCMap;
00070 PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
00071 ProcessConfigurationVector procConfigVector;
00072 for (PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
00073 procConfigVector.push_back(i->second);
00074 }
00075 sort_all(procConfigVector);
00076 sd.setProcessConfigurations(procConfigVector);
00077
00078 data_buffer.rootbuf_.Reset();
00079
00080 RootDebug tracer(10,10);
00081
00082 TClass* tc = getTClass(typeid(SendJobHeader));
00083 int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
00084
00085 switch(bres) {
00086 case 0:
00087 {
00088 throw cms::Exception("StreamTranslation","Registry serialization failed")
00089 << "StreamSerializer failed to serialize registry\n";
00090 break;
00091 }
00092 case 1:
00093 break;
00094 case 2:
00095 {
00096 throw cms::Exception("StreamTranslation","Registry serialization truncated")
00097 << "StreamSerializer module attempted to serialize\n"
00098 << "a registry that is to big for the allocated buffers\n";
00099 break;
00100 }
00101 default:
00102 {
00103 throw cms::Exception("StreamTranslation","Registry serialization failed")
00104 << "StreamSerializer module got an unknown error code\n"
00105 << " while attempting to serialize registry\n";
00106 break;
00107 }
00108 }
00109
00110 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00111 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00112 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00113
00114 data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00115
00116 return data_buffer.curr_space_used_;
00117 }
00118
00139 int StreamSerializer::serializeEvent(EventPrincipal const& eventPrincipal,
00140 ParameterSetID const& selectorConfig,
00141 bool use_compression, int compression_level,
00142 SerializeDataBuffer &data_buffer) {
00143 Parentage parentage;
00144
00145 EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
00146 selectionIDs.push_back(selectorConfig);
00147 SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
00148
00149 Selections::const_iterator i(selections_->begin()),ie(selections_->end());
00150
00151
00152 for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
00153 BranchDescription const& desc = **i;
00154 BranchID const& id = desc.branchID();
00155
00156 OutputHandle const oh = eventPrincipal.getForOutput(id, true);
00157 if (!oh.productProvenance()) {
00158
00159
00160 se.products().push_back(StreamedProduct(desc));
00161 } else {
00162 bool found = ParentageRegistry::instance()->getMapped(oh.productProvenanceSharedPtr()->parentageID(), parentage);
00163 assert (found);
00164 se.products().push_back(StreamedProduct(oh.wrapper(),
00165 desc,
00166 oh.productProvenanceSharedPtr()->productStatus(),
00167 &parentage.parents()));
00168 }
00169 }
00170
00171 data_buffer.rootbuf_.Reset();
00172 RootDebug tracer(10,10);
00173
00174
00175 int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
00176 switch(bres) {
00177 case 0:
00178 {
00179 throw cms::Exception("StreamTranslation","Event serialization failed")
00180 << "StreamSerializer failed to serialize event: "
00181 << eventPrincipal.id();
00182 break;
00183 }
00184 case 1:
00185 break;
00186 case 2:
00187 {
00188 throw cms::Exception("StreamTranslation","Event serialization truncated")
00189 << "StreamSerializer module attempted to serialize an event\n"
00190 << "that is to big for the allocated buffers: "
00191 << eventPrincipal.id();
00192 break;
00193 }
00194 default:
00195 {
00196 throw cms::Exception("StreamTranslation","Event serialization failed")
00197 << "StreamSerializer module got an unknown error code\n"
00198 << " while attempting to serialize event: "
00199 << eventPrincipal.id();
00200 break;
00201 }
00202 }
00203
00204 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00205 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00206 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00207 #if 0
00208 if(data_buffer.ptr_ != data_.ptr_) {
00209 std::cerr << "ROOT reset the buffer!!!!\n";
00210 data_.ptr_ = data_buffer.ptr_;
00211 }
00212 #endif
00213
00214
00215
00216
00217
00218
00219
00220 if(use_compression) {
00221 unsigned int dest_size =
00222 compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
00223 if(dest_size != 0) {
00224 data_buffer.ptr_ = &data_buffer.comp_buf_[0];
00225 data_buffer.curr_space_used_ = dest_size;
00226 }
00227 }
00228
00229 data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00230
00231
00232 return data_buffer.curr_space_used_;
00233 }
00234
00240 unsigned int
00241 StreamSerializer::compressBuffer(unsigned char *inputBuffer,
00242 unsigned int inputSize,
00243 std::vector<unsigned char> &outputBuffer,
00244 int compressionLevel) {
00245 unsigned int resultSize = 0;
00246
00247
00248 unsigned long dest_size = (unsigned long)(double(inputSize)*
00249 1.002 + 1.0) + 12;
00250 if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
00251
00252
00253 int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
00254 inputSize, compressionLevel);
00255
00256
00257 if(ret == Z_OK) {
00258
00259 resultSize = dest_size;
00260
00261 FDEBUG(1) << " original size = " << inputSize
00262 << " final size = " << dest_size
00263 << " ratio = " << double(dest_size)/double(inputSize)
00264 << std::endl;
00265 } else {
00266
00267 FDEBUG(9) << "Compression Return value: " << ret
00268 << " Okay = " << Z_OK << std::endl;
00269
00270 std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
00271
00272 }
00273
00274 return resultSize;
00275 }
00276 }