00001
00007 #include "IOPool/Streamer/interface/StreamSerializer.h"
00008 #include "DataFormats/Provenance/interface/BranchDescription.h"
00009 #include "DataFormats/Provenance/interface/BranchID.h"
00010 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00011 #include "DataFormats/Provenance/interface/EventEntryDescription.h"
00012 #include "DataFormats/Provenance/interface/EventEntryInfo.h"
00013 #include "DataFormats/Provenance/interface/ModuleDescriptionRegistry.h"
00014 #include "TClass.h"
00015 #include "IOPool/Streamer/interface/ClassFiller.h"
00016 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00017 #include "FWCore/Framework/interface/ConstProductRegistry.h"
00018 #include "FWCore/Framework/interface/EventPrincipal.h"
00019 #include "FWCore/ParameterSet/interface/Registry.h"
00020 #include "FWCore/Utilities/interface/WrappedClassName.h"
00021 #include "DataFormats/Streamer/interface/StreamedProducts.h"
00022 #include "DataFormats/Common/interface/OutputHandle.h"
00023 #include "FWCore/ServiceRegistry/interface/Service.h"
00024
00025 #include "zlib.h"
00026 #include <cstdlib>
00027 #include <list>
00028
00029 namespace edm
00030 {
00031
00032 StreamSerializer::Arr::Arr(int sz):ptr_((char*)malloc(sz)) { }
00033 StreamSerializer::Arr::~Arr() { free(ptr_); }
00034
00038 StreamSerializer::StreamSerializer(Selections const* selections):
00039 selections_(selections),
00040 tc_(getTClass(typeid(SendEvent)))
00041 { }
00042
00048 int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer)
00049 {
00050 FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl;
00051 SendJobHeader sd;
00052
00053 Selections::const_iterator i(selections_->begin()), e(selections_->end());
00054
00055 FDEBUG(9) << "Product List: " << std::endl;
00056
00057
00058 for(; i != e; ++i) {
00059 sd.push_back(**i);
00060 FDEBUG(9) << "StreamOutput got product = " << (*i)->className()
00061 << std::endl;
00062 }
00063 edm::Service<edm::ConstProductRegistry> reg;
00064 sd.setNextID(reg->nextID());
00065 sd.setModuleDescriptionMap(ModuleDescriptionRegistry::instance()->data());
00066 SendJobHeader::ParameterSetMap psetMap;
00067
00068 pset::Registry const* psetRegistry = pset::Registry::instance();
00069 for (pset::Registry::const_iterator it = psetRegistry->begin(), itEnd = psetRegistry->end(); it != itEnd; ++it) {
00070 psetMap.insert(std::make_pair(it->first, ParameterSetBlob(it->second.toStringOfTracked())));
00071 }
00072 sd.setParameterSetMap(psetMap);
00073
00074 data_buffer.rootbuf_.Reset();
00075
00076 RootDebug tracer(10,10);
00077
00078 TClass* tc = getTClass(typeid(SendJobHeader));
00079 int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc);
00080
00081 switch(bres)
00082 {
00083 case 0:
00084 {
00085 throw cms::Exception("StreamTranslation","Registry serialization failed")
00086 << "StreamSerializer failed to serialize registry\n";
00087 break;
00088 }
00089 case 1:
00090 break;
00091 case 2:
00092 {
00093 throw cms::Exception("StreamTranslation","Registry serialization truncated")
00094 << "StreamSerializer module attempted to serialize\n"
00095 << "a registry that is to big for the allocated buffers\n";
00096 break;
00097 }
00098 default:
00099 {
00100 throw cms::Exception("StreamTranslation","Registry serialization failed")
00101 << "StreamSerializer module got an unknown error code\n"
00102 << " while attempting to serialize registry\n";
00103 break;
00104 }
00105 }
00106
00107 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00108 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00109 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00110 return data_buffer.curr_space_used_;
00111 }
00112
00133 int StreamSerializer::serializeEvent(EventPrincipal const& eventPrincipal,
00134 bool use_compression, int compression_level,
00135 SerializeDataBuffer &data_buffer)
00136
00137 {
00138 EventEntryDescription entryDesc;
00139
00140 SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory());
00141
00142 Selections::const_iterator i(selections_->begin()),ie(selections_->end());
00143
00144
00145 for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) {
00146 BranchDescription const& desc = **i;
00147 BranchID const& id = desc.branchID();
00148
00149 OutputHandle<EventEntryInfo> const oh = eventPrincipal.getForOutput<EventEntryInfo>(id, true);
00150 if (!oh.entryInfo()) {
00151
00152
00153 se.products().push_back(StreamedProduct(desc));
00154 } else {
00155 bool found = EntryDescriptionRegistry::instance()->getMapped(oh.entryInfoSharedPtr()->entryDescriptionID(), entryDesc);
00156 assert (found);
00157 se.products().push_back(StreamedProduct(oh.wrapper(),
00158 desc,
00159 entryDesc.moduleDescriptionID(),
00160 oh.entryInfoSharedPtr()->productID(),
00161 oh.entryInfoSharedPtr()->productStatus(),
00162 &entryDesc.parents()));
00163 }
00164 }
00165
00166 data_buffer.rootbuf_.Reset();
00167 RootDebug tracer(10,10);
00168
00169
00170 int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_);
00171 switch(bres)
00172 {
00173 case 0:
00174 {
00175 throw cms::Exception("StreamTranslation","Event serialization failed")
00176 << "StreamSerializer failed to serialize event: "
00177 << eventPrincipal.id();
00178 break;
00179 }
00180 case 1:
00181 break;
00182 case 2:
00183 {
00184 throw cms::Exception("StreamTranslation","Event serialization truncated")
00185 << "StreamSerializer module attempted to serialize an event\n"
00186 << "that is to big for the allocated buffers: "
00187 << eventPrincipal.id();
00188 break;
00189 }
00190 default:
00191 {
00192 throw cms::Exception("StreamTranslation","Event serialization failed")
00193 << "StreamSerializer module got an unknown error code\n"
00194 << " while attempting to serialize event: "
00195 << eventPrincipal.id();
00196 break;
00197 }
00198 }
00199
00200 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length();
00201 data_buffer.curr_space_used_ = data_buffer.curr_event_size_;
00202 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer();
00203 #if 0
00204 if(data_buffer.ptr_ != data_.ptr_)
00205 {
00206 std::cerr << "ROOT reset the buffer!!!!\n";
00207 data_.ptr_ = data_buffer.ptr_;
00208 }
00209 #endif
00210
00211
00212
00213
00214
00215
00216
00217 if(use_compression)
00218 {
00219 unsigned int dest_size =
00220 compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level);
00221 if(dest_size != 0)
00222 {
00223 data_buffer.ptr_ = &data_buffer.comp_buf_[0];
00224 data_buffer.curr_space_used_ = dest_size;
00225 }
00226 }
00227
00228 return data_buffer.curr_space_used_;
00229 }
00230
00236 unsigned int
00237 StreamSerializer::compressBuffer(unsigned char *inputBuffer,
00238 unsigned int inputSize,
00239 std::vector<unsigned char> &outputBuffer,
00240 int compressionLevel)
00241 {
00242 unsigned int resultSize = 0;
00243
00244
00245 unsigned long dest_size = (unsigned long)(double(inputSize)*
00246 1.002 + 1.0) + 12;
00247 if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size);
00248
00249
00250 int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer,
00251 inputSize, compressionLevel);
00252
00253
00254 if(ret == Z_OK)
00255 {
00256
00257 resultSize = dest_size;
00258
00259 FDEBUG(1) << " original size = " << inputSize
00260 << " final size = " << dest_size
00261 << " ratio = " << double(dest_size)/double(inputSize)
00262 << std::endl;
00263 }
00264 else
00265 {
00266
00267 FDEBUG(9) <<"Compression Return value: "<<ret
00268 << " Okay = " << Z_OK << std::endl;
00269
00270 std::cerr <<"Compression Return value: "<<ret<< " Okay = " << Z_OK << std::endl;
00271 }
00272
00273 return resultSize;
00274 }
00275 }