Go to the documentation of this file.00001
00007 #include "IOPool/Streamer/interface/StreamSerializer.h"
00008 #include "DataFormats/Provenance/interface/BranchDescription.h"
00009 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
00010 #include "DataFormats/Provenance/interface/Parentage.h"
00011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00012 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00013 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00015 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00016 #include "IOPool/Streamer/interface/ClassFiller.h"
00017 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00018 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00019 #include "FWCore/Framework/interface/EventPrincipal.h"
00020 #include "FWCore/ParameterSet/interface/Registry.h"
00021 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00022 #include "DataFormats/Streamer/interface/StreamedProducts.h"
00023 #include "DataFormats/Common/interface/OutputHandle.h"
00024 #include "FWCore/ServiceRegistry/interface/Service.h"
00025
00026 #include "zlib.h"
00027 #include <cstdlib>
00028 #include <list>
00029
00030 namespace edm {
00031
00035 StreamSerializer::StreamSerializer(Selections const* selections):
00036 selections_(selections),
00037 tc_(getTClass(typeid(SendEvent))) {
00038 }
00039
00045 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer) {
00046 FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
00047 SendJobHeader sd;
00048
00049 Selections::const_iterator i(selections_->begin()), e(selections_->end());
00050
00051 FDEBUG(9) << "Product List: " << std::endl;
00052
00053
00054 for(; i != e; ++i) {
00055 sd.push_back(**i);
00056 FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
00057 << std::endl;
00058 }
00059 Service<ConstProductRegistry> reg;
00060 sd.setBranchIDLists(BranchIDListRegistry::instance()->data());
00061 SendJobHeader::ParameterSetMap psetMap;
00062
00063 pset::fillMap(pset::Registry::instance(), psetMap);
00064 sd.setParameterSetMap(psetMap);
00065
00066 typedef ProcessConfigurationRegistry::collection_type PCMap;
00067 PCMap const& procConfigMap = ProcessConfigurationRegistry::instance()->data();
00068 ProcessConfigurationVector procConfigVector;
00069 for(PCMap::const_iterator i = procConfigMap.begin(), e = procConfigMap.end(); i != e; ++i) {
00070 procConfigVector.push_back(i->second);
00071 }
00072 sort_all(procConfigVector);
00073 sd.setProcessConfigurations(procConfigVector);
00074
00075 data_buffer.rootbuf_.Reset();
00076
00077 RootDebug tracer(10,10);
00078
00079 TClass* tc = getTClass(typeid(SendJobHeader));
00080 int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
00081
00082 switch(bres) {
00083 case 0:
00084 {
00085 throw cms::Exception("StreamTranslation","Registry serialization failed")
00086 << "StreamSerializer failed to serialize registry\n";
00087 break;
00088 }
00089 case 1:
00090 break;
00091 case 2:
00092 {
00093 throw cms::Exception("StreamTranslation","Registry serialization truncated")
00094 << "StreamSerializer module attempted to serialize\n"
00095 << "a registry that is to big for the allocated buffers\n";
00096 break;
00097 }
00098 default:
00099 {
00100 throw cms::Exception("StreamTranslation","Registry serialization failed")
00101 << "StreamSerializer module got an unknown error code\n"
00102 << " while attempting to serialize registry\n";
00103 break;
00104 }
00105 }
00106
00107 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00108 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00109 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00110
00111 data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00112
00113 return data_buffer.curr_space_used_;
00114 }
00115
00136 int StreamSerializer::serializeEvent(EventPrincipal const& eventPrincipal,
00137 ParameterSetID const& selectorConfig,
00138 bool use_compression, int compression_level,
00139 SerializeDataBuffer &data_buffer) {
00140 Parentage parentage;
00141
00142 EventSelectionIDVector selectionIDs = eventPrincipal.eventSelectionIDs();
00143 selectionIDs.push_back(selectorConfig);
00144 SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory(), selectionIDs, eventPrincipal.branchListIndexes());
00145
00146 Selections::const_iterator i(selections_->begin()),ie(selections_->end());
00147
00148
00149 for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
00150 BranchDescription const& desc = **i;
00151 BranchID const& id = desc.branchID();
00152
00153 OutputHandle const oh = eventPrincipal.getForOutput(id, true);
00154 if(!oh.productProvenance()) {
00155
00156
00157 se.products().push_back(StreamedProduct(desc));
00158 } else {
00159 bool found = ParentageRegistry::instance()->getMapped(oh.productProvenance()->parentageID(), parentage);
00160 assert(found);
00161 se.products().push_back(StreamedProduct(oh.wrapper(),
00162 desc,
00163 oh.wrapper() != 0,
00164 &parentage.parents()));
00165 }
00166 }
00167
00168 data_buffer.rootbuf_.Reset();
00169 RootDebug tracer(10,10);
00170
00171
00172 int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
00173 switch(bres) {
00174 case 0:
00175 {
00176 throw cms::Exception("StreamTranslation","Event serialization failed")
00177 << "StreamSerializer failed to serialize event: "
00178 << eventPrincipal.id();
00179 break;
00180 }
00181 case 1:
00182 break;
00183 case 2:
00184 {
00185 throw cms::Exception("StreamTranslation","Event serialization truncated")
00186 << "StreamSerializer module attempted to serialize an event\n"
00187 << "that is to big for the allocated buffers: "
00188 << eventPrincipal.id();
00189 break;
00190 }
00191 default:
00192 {
00193 throw cms::Exception("StreamTranslation","Event serialization failed")
00194 << "StreamSerializer module got an unknown error code\n"
00195 << " while attempting to serialize event: "
00196 << eventPrincipal.id();
00197 break;
00198 }
00199 }
00200
00201 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00202 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00203 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00204 #if 0
00205 if(data_buffer.ptr_ != data_.ptr_) {
00206 std::cerr << "ROOT reset the buffer!!!!\n";
00207 data_.ptr_ = data_buffer.ptr_;
00208 }
00209 #endif
00210
00211
00212
00213
00214
00215
00216
00217 if(use_compression) {
00218 unsigned int dest_size =
00219 compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
00220 if(dest_size != 0) {
00221 data_buffer.ptr_ = &data_buffer.comp_buf_[0];
00222 data_buffer.curr_space_used_ = dest_size;
00223 }
00224 }
00225
00226 data_buffer.adler32_chksum_ = cms::Adler32((char*)data_buffer.ptr_, data_buffer.curr_space_used_);
00227
00228
00229 return data_buffer.curr_space_used_;
00230 }
00231
00237 unsigned int
00238 StreamSerializer::compressBuffer(unsigned char *inputBuffer,
00239 unsigned int inputSize,
00240 std::vector<unsigned char> &outputBuffer,
00241 int compressionLevel) {
00242 unsigned int resultSize = 0;
00243
00244
00245 unsigned long dest_size = (unsigned long)(double(inputSize)*
00246 1.002 + 1.0) + 12;
00247 if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
00248
00249
00250 int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
00251 inputSize, compressionLevel);
00252
00253
00254 if(ret == Z_OK) {
00255
00256 resultSize = dest_size;
00257
00258 FDEBUG(1) << " original size = " << inputSize
00259 << " final size = " << dest_size
00260 << " ratio = " << double(dest_size)/double(inputSize)
00261 << std::endl;
00262 } else {
00263
00264 FDEBUG(9) << "Compression Return value: " << ret
00265 << " Okay = " << Z_OK << std::endl;
00266
00267 std::cerr << "Compression Return value: " << ret << " Okay = " << Z_OK << std::endl;
00268
00269 }
00270
00271 return resultSize;
00272 }
00273 }