00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002
00003 #include "IOPool/Streamer/interface/EventMessage.h"
00004 #include "IOPool/Streamer/interface/InitMessage.h"
00005 #include "IOPool/Streamer/interface/ClassFiller.h"
00006
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/ParameterSet/interface/FillProductRegistryTransients.h"
00010 #include "DataFormats/Provenance/interface/BranchDescription.h"
00011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00012 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00013 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00014 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00016 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00017 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00018
00019 #include "zlib.h"
00020
00021 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00022 #include "FWCore/Utilities/interface/WrappedClassName.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00025 #include "FWCore/ParameterSet/interface/Registry.h"
00026 #include "FWCore/Utilities/interface/EDMException.h"
00027 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00028 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00029
00030 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00032 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00033 #include "FWCore/Utilities/interface/DebugMacros.h"
00034
00035 #include <string>
00036 #include <iostream>
00037 #include <set>
00038
00039 namespace edm {
namespace {
  // Initial size (1 MiB) handed to both the decompression buffer and the
  // ROOT read buffer when a StreamerInputSource is constructed.
  constexpr int init_size = 1 << 20;
}
00043
00044 std::string StreamerInputSource::processName_;
00045 unsigned int StreamerInputSource::protocolVersion_;
00046
00047
00048 StreamerInputSource::StreamerInputSource(
00049 ParameterSet const& pset,
00050 InputSourceDescription const& desc):
00051 RawInputSource(pset, desc),
00052 tc_(getTClass(typeid(SendEvent))),
00053 dest_(init_size),
00054 xbuf_(TBuffer::kRead, init_size),
00055 sendEvent_(),
00056 productGetter_(),
00057 adjustEventToNewProductRegistry_(false) {
00058 }
00059
00060 StreamerInputSource::~StreamerInputSource() {}
00061
00062
00063 std::unique_ptr<FileBlock>
00064 StreamerInputSource::readFile_() {
00065 return std::unique_ptr<FileBlock>(new FileBlock);
00066 }
00067
00068 void
00069 StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, BranchIDListHelper& branchIDListHelper, bool subsequent) {
00070
00071 SendDescs const& descs = header.descs();
00072
00073 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00074
00075 if (subsequent) {
00076 ProductRegistry pReg;
00077 pReg.updateFromInput(descs);
00078 fillProductRegistryTransients(header.processConfigurations(), pReg);
00079 std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00080 if (!mergeInfo.empty()) {
00081 throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00082 }
00083 branchIDListHelper.updateFromInput(header.branchIDLists());
00084 } else {
00085 declareStreamers(descs);
00086 buildClassCache(descs);
00087 loadExtraClasses();
00088 reg.updateFromInput(descs);
00089 fillProductRegistryTransients(header.processConfigurations(), reg);
00090 branchIDListHelper.updateFromInput(header.branchIDLists());
00091 }
00092 }
00093
00094 void
00095 StreamerInputSource::declareStreamers(SendDescs const& descs) {
00096 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00097
00098 for(; i != e; ++i) {
00099
00100 std::string const real_name = wrappedClassName(i->className());
00101 FDEBUG(6) << "declare: " << real_name << std::endl;
00102 loadCap(real_name);
00103 }
00104 }
00105
00106
00107 void
00108 StreamerInputSource::buildClassCache(SendDescs const& descs) {
00109 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00110
00111 for(; i != e; ++i) {
00112
00113 std::string const real_name = wrappedClassName(i->className());
00114 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00115 doBuildRealData(real_name);
00116 }
00117 }
00118
00123 std::auto_ptr<SendJobHeader>
00124 StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
00125 if(initView.code() != Header::INIT)
00126 throw cms::Exception("StreamTranslation","Registry deserialization error")
00127 << "received wrong message type: expected INIT, got "
00128 << initView.code() << "\n";
00129
00130
00131 if (initView.protocolVersion() > 3) {
00132
00133 processName_ = initView.processName();
00134 protocolVersion_ = initView.protocolVersion();
00135
00136 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00137 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00138 }
00139
00140
00141 uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
00142
00143
00144
00145 if((uint32)adler32_chksum != initView.adler32_chksum()) {
00146 std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
00147 << " chksum from registry data = " << adler32_chksum << " from header = "
00148 << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
00149
00150 }
00151
00152 TClass* desc = getTClass(typeid(SendJobHeader));
00153
00154 TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
00155 (char*)initView.descData(),kFALSE);
00156 RootDebug tracer(10,10);
00157 std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00158
00159 if(sd.get()==0) {
00160 throw cms::Exception("StreamTranslation","Registry deserialization error")
00161 << "Could not read the initial product registry list\n";
00162 }
00163
00164 return sd;
00165 }
00166
00171 void
00172 StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
00173 std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00174 ProcessConfigurationVector const& pcv = sd->processConfigurations();
00175 mergeIntoRegistry(*sd, productRegistryUpdate(), *branchIDListHelper(), subsequent);
00176 if (subsequent) {
00177 adjustEventToNewProductRegistry_ = true;
00178 }
00179 SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
00180 pset::Registry& psetRegistry = *pset::Registry::instance();
00181 for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00182 ParameterSet pset(i->second.pset());
00183 pset.setID(i->first);
00184 psetRegistry.insertMapped(pset);
00185 }
00186 ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
00187 for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
00188 pcReg.insertMapped(*it);
00189 }
00190 }
00191
00195 void
00196 StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
00197 if(eventView.code() != Header::EVENT)
00198 throw cms::Exception("StreamTranslation","Event deserialization error")
00199 << "received wrong message type: expected EVENT, got "
00200 << eventView.code() << "\n";
00201 FDEBUG(9) << "Decode event: "
00202 << eventView.event() << " "
00203 << eventView.run() << " "
00204 << eventView.size() << " "
00205 << eventView.adler32_chksum() << " "
00206 << eventView.eventLength() << " "
00207 << eventView.eventData()
00208 << std::endl;
00209 EventSourceSentry sentry(*this);
00210
00211
00212
00213 unsigned long origsize = eventView.origDataSize();
00214 unsigned long dest_size;
00215
00216 uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
00217
00218
00219
00220 if((uint32)adler32_chksum != eventView.adler32_chksum()) {
00221 std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
00222 << " chksum from event = " << adler32_chksum << " from header = "
00223 << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
00224
00225 }
00226 if(origsize != 78 && origsize != 0) {
00227
00228 dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00229 eventView.eventLength(), dest_, origsize);
00230 } else {
00231
00232 dest_size = eventView.eventLength();
00233 dest_.resize(dest_size);
00234 unsigned char* pos = (unsigned char*) &dest_[0];
00235 unsigned char* from = (unsigned char*) eventView.eventData();
00236 std::copy(from,from+dest_size,pos);
00237 }
00238
00239
00240
00241
00242 xbuf_.Reset();
00243 xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00244 RootDebug tracer(10,10);
00245
00246 setRefCoreStreamer(&productGetter_);
00247 sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
00248 setRefCoreStreamer();
00249
00250 if(sendEvent_.get()==0) {
00251 throw cms::Exception("StreamTranslation","Event deserialization error")
00252 << "got a null event from input stream\n";
00253 }
00254 ProcessHistoryRegistry::instance()->insertMapped(sendEvent_->processHistory());
00255
00256 FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
00257 if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sendEvent_->aux().run()) {
00258 RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
00259 runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
00260 setRunAuxiliary(runAuxiliary);
00261 resetLuminosityBlockAuxiliary();
00262 }
00263 if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
00264 LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
00265 new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
00266 luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
00267 setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
00268 }
00269 setEventCached();
00270 }
00271
00272 EventPrincipal *
00273 StreamerInputSource::read(EventPrincipal& eventPrincipal) {
00274 if(adjustEventToNewProductRegistry_) {
00275 eventPrincipal.adjustIndexesAfterProductRegistryAddition();
00276 bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
00277 assert(eventOK);
00278 adjustEventToNewProductRegistry_ = false;
00279 }
00280 boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sendEvent_->eventSelectionIDs()));
00281 boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sendEvent_->branchListIndexes()));
00282 branchIDListHelper()->fixBranchListIndexes(*indexes);
00283 eventPrincipal.fillEventPrincipal(sendEvent_->aux(), ids, indexes);
00284 productGetter_.setEventPrincipal(&eventPrincipal);
00285
00286
00287
00288 SendProds & sps = sendEvent_->products();
00289 for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00290 FDEBUG(10) << "check prodpair" << std::endl;
00291 if(spi->desc() == 0)
00292 throw cms::Exception("StreamTranslation","Empty Provenance");
00293 FDEBUG(5) << "Prov:"
00294 << " " << spi->desc()->className()
00295 << " " << spi->desc()->productInstanceName()
00296 << " " << spi->desc()->branchID()
00297 << std::endl;
00298
00299 ConstBranchDescription branchDesc(*spi->desc());
00300
00301 ProductProvenance productProvenance(spi->branchID(), *spi->parents());
00302
00303 if(spi->prod() != 0) {
00304 FDEBUG(10) << "addproduct next " << spi->branchID() << std::endl;
00305 eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
00306 FDEBUG(10) << "addproduct done" << std::endl;
00307 } else {
00308 FDEBUG(10) << "addproduct empty next " << spi->branchID() << std::endl;
00309 eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
00310 FDEBUG(10) << "addproduct empty done" << std::endl;
00311 }
00312 spi->clear();
00313 }
00314
00315 FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
00316
00317 return &eventPrincipal;
00318 }
00319
00328 unsigned int
00329 StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
00330 unsigned int inputSize,
00331 std::vector<unsigned char>& outputBuffer,
00332 unsigned int expectedFullSize) {
00333 unsigned long origSize = expectedFullSize;
00334 unsigned long uncompressedSize = expectedFullSize*1.1;
00335 FDEBUG(1) << "Uncompress: original size = " << origSize
00336 << ", compressed size = " << inputSize
00337 << std::endl;
00338 outputBuffer.resize(uncompressedSize);
00339 int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00340 inputBuffer, inputSize);
00341
00342 if(ret == Z_OK) {
00343
00344 FDEBUG(10) << " original size = " << origSize << " final size = "
00345 << uncompressedSize << std::endl;
00346 if(origSize != uncompressedSize) {
00347 std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00348 << origSize << " uncompress size = " << uncompressedSize << std::endl;
00349
00350 throw cms::Exception("StreamDeserialization","Uncompression error")
00351 << "mismatch event lengths should be" << origSize << " got "
00352 << uncompressedSize << "\n";
00353 }
00354 } else {
00355
00356 std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00357 << ret << std::endl;
00358 throw cms::Exception("StreamDeserialization","Uncompression error")
00359 << "Error code = " << ret << "\n ";
00360 }
00361 return (unsigned int) uncompressedSize;
00362 }
00363
00364 void StreamerInputSource::resetAfterEndRun() {
00365
00366
00367 resetLuminosityBlockAuxiliary();
00368 resetRunAuxiliary();
00369 assert(!eventCached());
00370 reset();
00371 }
00372
00373 void StreamerInputSource::setRun(RunNumber_t) {
00374
00375
00376 throw Exception(errors::LogicError)
00377 << "StreamerInputSource::setRun()\n"
00378 << "Run number cannot be modified for this type of Input Source\n"
00379 << "Contact a Storage Manager Developer\n";
00380 }
00381
00382 StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00383
00384 StreamerInputSource::ProductGetter::~ProductGetter() {}
00385
00386 WrapperHolder
00387 StreamerInputSource::ProductGetter::getIt(ProductID const& id) const {
00388 return eventPrincipal_ ? eventPrincipal_->getIt(id) : WrapperHolder();
00389 }
00390
00391 void
00392 StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00393 eventPrincipal_ = ep;
00394 }
00395
00396 void
00397 StreamerInputSource::fillDescription(ParameterSetDescription& desc) {
00398 RawInputSource::fillDescription(desc);
00399 }
00400 }