#include <IOPool/Streamer/interface/StreamSerializer.h>
Public Member Functions | |
int | serializeEvent (EventPrincipal const &eventPrincipal, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer) |
Serializes the specified event into the specified event message. | |
int | serializeRegistry (SerializeDataBuffer &data_buffer) |
Serializes the product registry (that was specified to the constructor) into the specified InitMessage. | |
StreamSerializer (Selections const *selections) | |
Creates a translator instance for the specified product registry. | |
Static Public Member Functions | |
static unsigned int | compressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, int compressionLevel) |
Compresses the data in the specified input buffer into the specified output buffer. | |
Private Attributes | |
Selections const * | selections_ |
TClass * | tc_ |
Classes | |
struct | Arr |
Definition at line 62 of file StreamSerializer.h.
edm::StreamSerializer::StreamSerializer | ( | Selections const * | selections | ) |
Creates a translator instance for the specified product registry.
Definition at line 38 of file StreamSerializer.cc.
00038 : 00039 selections_(selections), 00040 tc_(getTClass(typeid(SendEvent))) 00041 { }
unsigned int edm::StreamSerializer::compressBuffer | ( | unsigned char * | inputBuffer, | |
unsigned int | inputSize, | |||
std::vector< unsigned char > & | outputBuffer, | |||
int | compressionLevel | |||
) | [static] |
Compresses the data in the specified input buffer into the specified output buffer.
Returns the size of the compressed data or zero if compression failed.
Definition at line 237 of file StreamSerializer.cc.
References TestMuL1L2Filter_cff::cerr, lat::endl(), and FDEBUG.
Referenced by edm::StreamDQMSerializer::serializeDQMEvent(), and serializeEvent().
00241 { 00242 unsigned int resultSize = 0; 00243 00244 // what are these magic numbers? (jbk) 00245 unsigned long dest_size = (unsigned long)(double(inputSize)* 00246 1.002 + 1.0) + 12; 00247 if(outputBuffer.size() < dest_size) outputBuffer.resize(dest_size); 00248 00249 // compression 1-9, 6 is zlib default, 0 none 00250 int ret = compress2(&outputBuffer[0], &dest_size, inputBuffer, 00251 inputSize, compressionLevel); 00252 00253 // check status 00254 if(ret == Z_OK) 00255 { 00256 // return the correct length 00257 resultSize = dest_size; 00258 00259 FDEBUG(1) << " original size = " << inputSize 00260 << " final size = " << dest_size 00261 << " ratio = " << double(dest_size)/double(inputSize) 00262 << std::endl; 00263 } 00264 else 00265 { 00266 // compression failed, return a size of zero 00267 FDEBUG(9) <<"Compression Return value: "<<ret 00268 << " Okay = " << Z_OK << std::endl; 00269 // do we throw an exception here? 00270 std::cerr <<"Compression Return value: "<<ret<< " Okay = " << Z_OK << std::endl; 00271 } 00272 00273 return resultSize; 00274 }
int edm::StreamSerializer::serializeEvent | ( | EventPrincipal const & | eventPrincipal, | |
bool | use_compression, | |||
int | compression_level, | |||
SerializeDataBuffer & | data_buffer | |||
) |
Serializes the specified event into the specified event message.
make a char* as a data member, tell ROOT to not adapt it, but still use it. initialize it to 1M, let ROOT resize if it wants, then delete it in the dtor.
change the call to not take an eventMessage, add a member function to return the address of the place that ROOT wrote the serialized data.
return the length of the serialized object and the actual length if compression has been done (may want to cache these lengths in this object instead.
the caller will need to copy the data from this object to its final destination in the EventMsgBuilder.
Definition at line 133 of file StreamSerializer.cc.
References edm::EventPrincipal::aux(), Selections::begin(), edm::BranchDescription::branchID(), TestMuL1L2Filter_cff::cerr, SerializeDataBuffer::comp_buf_, compressBuffer(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, Selections::end(), edm::OutputHandle< T >::entryInfo(), edm::OutputHandle< T >::entryInfoSharedPtr(), Exception, edm::Principal::getForOutput(), i, edm::EventPrincipal::id(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::EventEntryDescription::moduleDescriptionID(), edm::EventEntryDescription::parents(), edm::Principal::processHistory(), SerializeDataBuffer::ptr_, SerializeDataBuffer::rootbuf_, selections_, tc_, and edm::OutputHandle< T >::wrapper().
Referenced by edm::StreamerOutputModuleBase::serializeEvent().
00137 { 00138 EventEntryDescription entryDesc; 00139 00140 SendEvent se(eventPrincipal.aux(), eventPrincipal.processHistory()); 00141 00142 Selections::const_iterator i(selections_->begin()),ie(selections_->end()); 00143 // Loop over EDProducts, fill the provenance, and write. 00144 00145 for(Selections::const_iterator i = selections_->begin(), iEnd = selections_->end(); i != iEnd; ++i) { 00146 BranchDescription const& desc = **i; 00147 BranchID const& id = desc.branchID(); 00148 00149 OutputHandle<EventEntryInfo> const oh = eventPrincipal.getForOutput<EventEntryInfo>(id, true); 00150 if (!oh.entryInfo()) { 00151 // No product with this ID was put in the event. 00152 // Create and write the provenance. 00153 se.products().push_back(StreamedProduct(desc)); 00154 } else { 00155 bool found = EntryDescriptionRegistry::instance()->getMapped(oh.entryInfoSharedPtr()->entryDescriptionID(), entryDesc); 00156 assert (found); 00157 se.products().push_back(StreamedProduct(oh.wrapper(), 00158 desc, 00159 entryDesc.moduleDescriptionID(), 00160 oh.entryInfoSharedPtr()->productID(), 00161 oh.entryInfoSharedPtr()->productStatus(), 00162 &entryDesc.parents())); 00163 } 00164 } 00165 00166 data_buffer.rootbuf_.Reset(); 00167 RootDebug tracer(10,10); 00168 00169 //TClass* tc = getTClass(typeid(SendEvent)); 00170 int bres = data_buffer.rootbuf_.WriteObjectAny(&se,tc_); 00171 switch(bres) 00172 { 00173 case 0: // failure 00174 { 00175 throw cms::Exception("StreamTranslation","Event serialization failed") 00176 << "StreamSerializer failed to serialize event: " 00177 << eventPrincipal.id(); 00178 break; 00179 } 00180 case 1: // succcess 00181 break; 00182 case 2: // truncated result 00183 { 00184 throw cms::Exception("StreamTranslation","Event serialization truncated") 00185 << "StreamSerializer module attempted to serialize an event\n" 00186 << "that is to big for the allocated buffers: " 00187 << eventPrincipal.id(); 00188 break; 00189 } 00190 default: // unknown 00191 { 00192 throw cms::Exception("StreamTranslation","Event serialization failed") 00193 << "StreamSerializer module got an unknown error code\n" 00194 << " while attempting to serialize event: " 00195 << eventPrincipal.id(); 00196 break; 00197 } 00198 } 00199 00200 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length(); 00201 data_buffer.curr_space_used_ = data_buffer.curr_event_size_; 00202 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer(); 00203 #if 0 00204 if(data_buffer.ptr_ != data_.ptr_) 00205 { 00206 std::cerr << "ROOT reset the buffer!!!!\n"; 00207 data_.ptr_ = data_buffer.ptr_; // ROOT may have reset our data pointer!!!! 00208 } 00209 #endif 00210 // std::copy(rootbuf_.Buffer(),rootbuf_.Buffer()+rootbuf_.Length(), 00211 // eventMessage.eventAddr()); 00212 // eventMessage.setEventLength(rootbuf.Length()); 00213 00214 // compress before return if we need to 00215 // should test if compressed already - should never be? 00216 // as double compression can have problems 00217 if(use_compression) 00218 { 00219 unsigned int dest_size = 00220 compressBuffer(data_buffer.ptr_, data_buffer.curr_event_size_, data_buffer.comp_buf_, compression_level); 00221 if(dest_size != 0) 00222 { 00223 data_buffer.ptr_ = &data_buffer.comp_buf_[0]; // reset to point at compressed area 00224 data_buffer.curr_space_used_ = dest_size; 00225 } 00226 } 00227 00228 return data_buffer.curr_space_used_; 00229 }
int edm::StreamSerializer::serializeRegistry | ( | SerializeDataBuffer & | data_buffer | ) |
Serializes the product registry (that was specified to the constructor) into the specified InitMessage.
Definition at line 48 of file StreamSerializer.cc.
References Selections::begin(), edm::detail::ThreadSafeRegistry< KEY, T, E >::begin(), SerializeDataBuffer::curr_event_size_, SerializeDataBuffer::curr_space_used_, data, e, edm::detail::ThreadSafeRegistry< KEY, T, E >::end(), Selections::end(), lat::endl(), Exception, FDEBUG, edm::getTClass(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), it, SerializeDataBuffer::ptr_, edm::SendJobHeader::push_back(), SerializeDataBuffer::rootbuf_, selections_, edm::SendJobHeader::setModuleDescriptionMap(), edm::SendJobHeader::setNextID(), edm::SendJobHeader::setParameterSetMap(), and tc.
Referenced by edm::StreamerOutputModuleBase::serializeRegistry().
00049 { 00050 FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl; 00051 SendJobHeader sd; 00052 00053 Selections::const_iterator i(selections_->begin()), e(selections_->end()); 00054 00055 FDEBUG(9) << "Product List: " << std::endl; 00056 00057 00058 for(; i != e; ++i) { 00059 sd.push_back(**i); 00060 FDEBUG(9) << "StreamOutput got product = " << (*i)->className() 00061 << std::endl; 00062 } 00063 edm::Service<edm::ConstProductRegistry> reg; 00064 sd.setNextID(reg->nextID()); 00065 sd.setModuleDescriptionMap(ModuleDescriptionRegistry::instance()->data()); 00066 SendJobHeader::ParameterSetMap psetMap; 00067 00068 pset::Registry const* psetRegistry = pset::Registry::instance(); 00069 for (pset::Registry::const_iterator it = psetRegistry->begin(), itEnd = psetRegistry->end(); it != itEnd; ++it) { 00070 psetMap.insert(std::make_pair(it->first, ParameterSetBlob(it->second.toStringOfTracked()))); 00071 } 00072 sd.setParameterSetMap(psetMap); 00073 00074 data_buffer.rootbuf_.Reset(); 00075 00076 RootDebug tracer(10,10); 00077 00078 TClass* tc = getTClass(typeid(SendJobHeader)); 00079 int bres = data_buffer.rootbuf_.WriteObjectAny((char*)&sd, tc); 00080 00081 switch(bres) 00082 { 00083 case 0: // failure 00084 { 00085 throw cms::Exception("StreamTranslation","Registry serialization failed") 00086 << "StreamSerializer failed to serialize registry\n"; 00087 break; 00088 } 00089 case 1: // succcess 00090 break; 00091 case 2: // truncated result 00092 { 00093 throw cms::Exception("StreamTranslation","Registry serialization truncated") 00094 << "StreamSerializer module attempted to serialize\n" 00095 << "a registry that is to big for the allocated buffers\n"; 00096 break; 00097 } 00098 default: // unknown 00099 { 00100 throw cms::Exception("StreamTranslation","Registry serialization failed") 00101 << "StreamSerializer module got an unknown error code\n" 00102 << " while attempting to serialize registry\n"; 00103 break; 00104 } 00105 } 00106 00107 data_buffer.curr_event_size_ = data_buffer.rootbuf_.Length(); 00108 data_buffer.curr_space_used_ = data_buffer.curr_event_size_; 00109 data_buffer.ptr_ = (unsigned char*)data_buffer.rootbuf_.Buffer(); 00110 return data_buffer.curr_space_used_; 00111 }
Selections const* edm::StreamSerializer::selections_ [private] |
Definition at line 95 of file StreamSerializer.h.
Referenced by serializeEvent(), and serializeRegistry().
TClass* edm::StreamSerializer::tc_ [private] |