00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002
00003 #include "IOPool/Streamer/interface/EventMessage.h"
00004 #include "IOPool/Streamer/interface/InitMessage.h"
00005 #include "IOPool/Streamer/interface/ClassFiller.h"
00006
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/Framework/src/PrincipalCache.h"
00010 #include "FWCore/ParameterSet/interface/FillProductRegistryTransients.h"
00011 #include "DataFormats/Provenance/interface/BranchDescription.h"
00012 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00013 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00014 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00016 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00017 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00018
00019 #include "zlib.h"
00020
00021 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00022 #include "FWCore/Utilities/interface/WrappedClassName.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00025 #include "FWCore/ParameterSet/interface/Registry.h"
00026 #include "FWCore/Utilities/interface/EDMException.h"
00027 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00028 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00029
00030 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00032 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00034 #include "FWCore/Utilities/interface/DebugMacros.h"
00035
00036 #include <string>
00037 #include <iostream>
00038 #include <set>
00039
00040 namespace edm {
namespace {
  // Starting size (1 MiB) handed to the constructor's dest_ and xbuf_
  // initializers.
  const int init_size = 1024 * 1024;
}
00044
00045 std::string StreamerInputSource::processName_;
00046 unsigned int StreamerInputSource::protocolVersion_;
00047
00048
00049 StreamerInputSource::StreamerInputSource(
00050 ParameterSet const& pset,
00051 InputSourceDescription const& desc):
00052 InputSource(pset, desc),
00053
00054
00055 inputFileTransitionsEachEvent_(
00056 pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
00057 newRun_(true),
00058 newLumi_(true),
00059 eventCached_(false),
00060 tc_(getTClass(typeid(SendEvent))),
00061 dest_(init_size),
00062 xbuf_(TBuffer::kRead, init_size),
00063 runEndingFlag_(false),
00064 productGetter_() {
00065 }
00066
00067 StreamerInputSource::~StreamerInputSource() {}
00068
00069
00070 boost::shared_ptr<FileBlock>
00071 StreamerInputSource::readFile_() {
00072 return boost::shared_ptr<FileBlock>(new FileBlock);
00073 }
00074
00075 void
00076 StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
00077
00078 SendDescs const& descs = header.descs();
00079
00080 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00081
00082 if (subsequent) {
00083 ProductRegistry pReg;
00084 pReg.updateFromInput(descs);
00085 fillProductRegistryTransients(header.processConfigurations(), pReg);
00086 std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00087 if (!mergeInfo.empty()) {
00088 throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00089 }
00090 BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00091 } else {
00092 declareStreamers(descs);
00093 buildClassCache(descs);
00094 loadExtraClasses();
00095 reg.updateFromInput(descs);
00096 fillProductRegistryTransients(header.processConfigurations(), reg);
00097 BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00098 }
00099 }
00100
00101 void
00102 StreamerInputSource::declareStreamers(SendDescs const& descs) {
00103 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00104
00105 for(; i != e; ++i) {
00106
00107 std::string const real_name = wrappedClassName(i->className());
00108 FDEBUG(6) << "declare: " << real_name << std::endl;
00109 loadCap(real_name);
00110 }
00111 }
00112
00113
00114 void
00115 StreamerInputSource::buildClassCache(SendDescs const& descs) {
00116 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00117
00118 for(; i != e; ++i) {
00119
00120 std::string const real_name = wrappedClassName(i->className());
00121 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00122 doBuildRealData(real_name);
00123 }
00124 }
00125
00126 boost::shared_ptr<RunAuxiliary>
00127 StreamerInputSource::readRunAuxiliary_() {
00128 assert(newRun_);
00129 assert(runAuxiliary());
00130 newRun_ = false;
00131 return runAuxiliary();
00132 }
00133
00134 boost::shared_ptr<LuminosityBlockAuxiliary>
00135 StreamerInputSource::readLuminosityBlockAuxiliary_() {
00136 assert(!newRun_);
00137 assert(newLumi_);
00138 assert(luminosityBlockAuxiliary());
00139 newLumi_ = false;
00140 return luminosityBlockAuxiliary();
00141 }
00142
00143 EventPrincipal*
00144 StreamerInputSource::readEvent_() {
00145 assert(!newRun_);
00146 assert(!newLumi_);
00147 assert(eventCached_);
00148 eventCached_ = false;
00149 return eventPrincipalCache();
00150 }
00151
00152 InputSource::ItemType
00153 StreamerInputSource::getNextItemType() {
00154 if (runEndingFlag_) {
00155 return IsStop;
00156 }
00157 if(newRun_ && runAuxiliary()) {
00158 return IsRun;
00159 }
00160 if(newLumi_ && luminosityBlockAuxiliary()) {
00161 return IsLumi;
00162 }
00163 if (eventCached_) {
00164 return IsEvent;
00165 }
00166 if (inputFileTransitionsEachEvent_) {
00167 resetRunAuxiliary();
00168 resetLuminosityBlockAuxiliary();
00169 }
00170 read();
00171 if (!eventCached_) {
00172 return IsStop;
00173 } else {
00174 runEndingFlag_ = false;
00175 if (inputFileTransitionsEachEvent_) {
00176 return IsFile;
00177 }
00178 }
00179 if(newRun_) {
00180 return IsRun;
00181 } else if(newLumi_) {
00182 return IsLumi;
00183 }
00184 return IsEvent;
00185 }
00186
00191 std::auto_ptr<SendJobHeader>
00192 StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
00193 if(initView.code() != Header::INIT)
00194 throw cms::Exception("StreamTranslation","Registry deserialization error")
00195 << "received wrong message type: expected INIT, got "
00196 << initView.code() << "\n";
00197
00198
00199 if (initView.protocolVersion() > 3) {
00200
00201 processName_ = initView.processName();
00202 protocolVersion_ = initView.protocolVersion();
00203
00204 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00205 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00206 }
00207
00208
00209 uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
00210
00211
00212
00213 if((uint32)adler32_chksum != initView.adler32_chksum()) {
00214 std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
00215 << " chksum from registry data = " << adler32_chksum << " from header = "
00216 << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
00217
00218 }
00219
00220 TClass* desc = getTClass(typeid(SendJobHeader));
00221
00222 TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
00223 (char*)initView.descData(),kFALSE);
00224 RootDebug tracer(10,10);
00225 std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00226
00227 if(sd.get()==0) {
00228 throw cms::Exception("StreamTranslation","Registry deserialization error")
00229 << "Could not read the initial product registry list\n";
00230 }
00231
00232 return sd;
00233 }
00234
00239 void
00240 StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
00241 std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00242 ProcessConfigurationVector const& pcv = sd->processConfigurations();
00243 mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
00244 if (subsequent) {
00245 principalCache().adjustEventToNewProductRegistry(productRegistry());
00246 }
00247 SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
00248 pset::Registry& psetRegistry = *pset::Registry::instance();
00249 for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00250 ParameterSet pset(i->second.pset());
00251 pset.setID(i->first);
00252 psetRegistry.insertMapped(pset);
00253 }
00254 ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
00255 for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
00256 pcReg.insertMapped(*it);
00257 }
00258 }
00259
00263 EventPrincipal*
00264 StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
00265 if(eventView.code() != Header::EVENT)
00266 throw cms::Exception("StreamTranslation","Event deserialization error")
00267 << "received wrong message type: expected EVENT, got "
00268 << eventView.code() << "\n";
00269 FDEBUG(9) << "Decode event: "
00270 << eventView.event() << " "
00271 << eventView.run() << " "
00272 << eventView.size() << " "
00273 << eventView.adler32_chksum() << " "
00274 << eventView.eventLength() << " "
00275 << eventView.eventData()
00276 << std::endl;
00277 EventSourceSentry(*this);
00278
00279
00280
00281 unsigned long origsize = eventView.origDataSize();
00282 unsigned long dest_size;
00283
00284 uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
00285
00286
00287
00288 if((uint32)adler32_chksum != eventView.adler32_chksum()) {
00289 std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
00290 << " chksum from event = " << adler32_chksum << " from header = "
00291 << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
00292
00293 }
00294 if(origsize != 78 && origsize != 0) {
00295
00296 dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00297 eventView.eventLength(), dest_, origsize);
00298 } else {
00299
00300 dest_size = eventView.eventLength();
00301 dest_.resize(dest_size);
00302 unsigned char* pos = (unsigned char*) &dest_[0];
00303 unsigned char* from = (unsigned char*) eventView.eventData();
00304 std::copy(from,from+dest_size,pos);
00305 }
00306
00307
00308
00309
00310 xbuf_.Reset();
00311 xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00312 RootDebug tracer(10,10);
00313
00314 setRefCoreStreamer(&productGetter_);
00315 std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
00316 setRefCoreStreamer();
00317
00318 if(sd.get()==0) {
00319 throw cms::Exception("StreamTranslation","Event deserialization error")
00320 << "got a null event from input stream\n";
00321 }
00322 ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
00323
00324 FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
00325 if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
00326 newRun_ = newLumi_ = true;
00327 RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
00328 runAuxiliary->setProcessHistoryID(sd->processHistory().id());
00329 setRunAuxiliary(runAuxiliary);
00330 readAndCacheRun();
00331 setRunPrematurelyRead();
00332 }
00333 if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
00334 LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
00335 new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
00336 luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
00337 setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
00338 newLumi_ = true;
00339 readAndCacheLumi();
00340 setLumiPrematurelyRead();
00341 }
00342
00343 boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
00344 boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
00345 std::auto_ptr<EventAuxiliary> aux(new EventAuxiliary(sd->aux()));
00346 eventPrincipalCache()->fillEventPrincipal(aux, luminosityBlockPrincipal(), ids, indexes);
00347 productGetter_.setEventPrincipal(eventPrincipalCache());
00348 eventCached_ = true;
00349
00350
00351
00352 SendProds & sps = sd->products();
00353 for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00354 FDEBUG(10) << "check prodpair" << std::endl;
00355 if(spi->desc() == 0)
00356 throw cms::Exception("StreamTranslation","Empty Provenance");
00357 FDEBUG(5) << "Prov:"
00358 << " " << spi->desc()->className()
00359 << " " << spi->desc()->productInstanceName()
00360 << " " << spi->desc()->branchID()
00361 << std::endl;
00362
00363 ConstBranchDescription branchDesc(*spi->desc());
00364
00365 std::auto_ptr<ProductProvenance> productProvenance(
00366 new ProductProvenance(spi->branchID(),
00367 spi->status(),
00368 *spi->parents()));
00369
00370 if(spi->prod() != 0) {
00371 std::auto_ptr<EDProduct> aprod(const_cast<EDProduct*>(spi->prod()));
00372 FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
00373 eventPrincipalCache()->putOnRead(branchDesc, aprod, productProvenance);
00374 FDEBUG(10) << "addgroup done" << std::endl;
00375 } else {
00376 FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
00377 eventPrincipalCache()->putOnRead(branchDesc, std::auto_ptr<EDProduct>(), productProvenance);
00378 FDEBUG(10) << "addgroup empty done" << std::endl;
00379 }
00380 spi->clear();
00381 }
00382
00383 FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
00384
00385 return eventPrincipalCache();
00386 }
00387
00396 unsigned int
00397 StreamerInputSource::uncompressBuffer(unsigned char *inputBuffer,
00398 unsigned int inputSize,
00399 std::vector<unsigned char> &outputBuffer,
00400 unsigned int expectedFullSize) {
00401 unsigned long origSize = expectedFullSize;
00402 unsigned long uncompressedSize = expectedFullSize*1.1;
00403 FDEBUG(1) << "Uncompress: original size = " << origSize
00404 << ", compressed size = " << inputSize
00405 << std::endl;
00406 outputBuffer.resize(uncompressedSize);
00407 int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00408 inputBuffer, inputSize);
00409
00410 if(ret == Z_OK) {
00411
00412 FDEBUG(10) << " original size = " << origSize << " final size = "
00413 << uncompressedSize << std::endl;
00414 if(origSize != uncompressedSize) {
00415 std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00416 << origSize << " uncompress size = " << uncompressedSize << std::endl;
00417
00418 throw cms::Exception("StreamDeserialization","Uncompression error")
00419 << "mismatch event lengths should be" << origSize << " got "
00420 << uncompressedSize << "\n";
00421 }
00422 } else {
00423
00424 std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00425 << ret << std::endl;
00426 throw cms::Exception("StreamDeserialization","Uncompression error")
00427 << "Error code = " << ret << "\n ";
00428 }
00429 return (unsigned int) uncompressedSize;
00430 }
00431
00432 void StreamerInputSource::resetAfterEndRun() {
00433
00434
00435 resetLuminosityBlockAuxiliary();
00436 resetRunAuxiliary();
00437 newRun_ = newLumi_ = true;
00438 assert(!eventCached_);
00439 reset();
00440 runEndingFlag_ = false;
00441 }
00442
00443 void StreamerInputSource::setRun(RunNumber_t) {
00444
00445
00446 throw Exception(errors::LogicError)
00447 << "StreamerInputSource::setRun()\n"
00448 << "Run number cannot be modified for this type of Input Source\n"
00449 << "Contact a Storage Manager Developer\n";
00450 }
00451
00452 StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00453
00454 StreamerInputSource::ProductGetter::~ProductGetter() {}
00455
00456 EDProduct const*
00457 StreamerInputSource::ProductGetter::getIt(ProductID const& id) const {
00458 return eventPrincipal_ ? eventPrincipal_->getIt(id) : 0;
00459 }
00460
00461 void
00462 StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00463 eventPrincipal_ = ep;
00464 }
00465
00466 void
00467 StreamerInputSource::fillDescription(ParameterSetDescription& desc) {
00468
00469
00470 InputSource::fillDescription(desc);
00471 }
00472 }