00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002
00003 #include "IOPool/Streamer/interface/EventMessage.h"
00004 #include "IOPool/Streamer/interface/InitMessage.h"
00005 #include "IOPool/Streamer/interface/ClassFiller.h"
00006
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/Framework/src/PrincipalCache.h"
00010 #include "FWCore/ParameterSet/interface/FillProductRegistryTransients.h"
00011 #include "DataFormats/Provenance/interface/BranchDescription.h"
00012 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00013 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00014 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00016 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00017 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00018
00019 #include "zlib.h"
00020
00021 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00022 #include "FWCore/Utilities/interface/WrappedClassName.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00025 #include "FWCore/ParameterSet/interface/Registry.h"
00026 #include "FWCore/Utilities/interface/EDMException.h"
00027 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00028 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00029
00030 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00032 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00034 #include "FWCore/Utilities/interface/DebugMacros.h"
00035
00036 #include <string>
00037 #include <iostream>
00038 #include <set>
00039
00040 namespace edm {
namespace {
  // Initial size, in bytes (1 MiB), used by the constructor for both the
  // decompression buffer and the ROOT read buffer.
  const int init_size = 1 << 20;
}
00044
00045 std::string StreamerInputSource::processName_;
00046 unsigned int StreamerInputSource::protocolVersion_;
00047
00048
00049 StreamerInputSource::StreamerInputSource(
00050 ParameterSet const& pset,
00051 InputSourceDescription const& desc):
00052 InputSource(pset, desc),
00053
00054
00055 inputFileTransitionsEachEvent_(
00056 pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
00057 newRun_(true),
00058 newLumi_(true),
00059 eventCached_(false),
00060 tc_(getTClass(typeid(SendEvent))),
00061 dest_(init_size),
00062 xbuf_(TBuffer::kRead, init_size),
00063 runEndingFlag_(false),
00064 productGetter_() {
00065 }
00066
00067 StreamerInputSource::~StreamerInputSource() {}
00068
00069
00070 boost::shared_ptr<FileBlock>
00071 StreamerInputSource::readFile_() {
00072 return boost::shared_ptr<FileBlock>(new FileBlock);
00073 }
00074
00075 void
00076 StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
00077
00078 SendDescs const& descs = header.descs();
00079
00080 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00081
00082 if (subsequent) {
00083 ProductRegistry pReg;
00084 pReg.updateFromInput(descs);
00085 fillProductRegistryTransients(header.processConfigurations(), pReg);
00086 std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00087 if (!mergeInfo.empty()) {
00088 throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00089 }
00090 BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00091 } else {
00092 declareStreamers(descs);
00093 buildClassCache(descs);
00094 loadExtraClasses();
00095 reg.updateFromInput(descs);
00096 fillProductRegistryTransients(header.processConfigurations(), reg);
00097 BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00098 }
00099 }
00100
00101 void
00102 StreamerInputSource::declareStreamers(SendDescs const& descs) {
00103 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00104
00105 for(; i != e; ++i) {
00106
00107 std::string const real_name = wrappedClassName(i->className());
00108 FDEBUG(6) << "declare: " << real_name << std::endl;
00109 loadCap(real_name);
00110 }
00111 }
00112
00113
00114 void
00115 StreamerInputSource::buildClassCache(SendDescs const& descs) {
00116 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00117
00118 for(; i != e; ++i) {
00119
00120 std::string const real_name = wrappedClassName(i->className());
00121 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00122 doBuildRealData(real_name);
00123 }
00124 }
00125
00126 boost::shared_ptr<RunAuxiliary>
00127 StreamerInputSource::readRunAuxiliary_() {
00128 assert(newRun_);
00129 assert(runAuxiliary());
00130 newRun_ = false;
00131 return runAuxiliary();
00132 }
00133
00134 boost::shared_ptr<LuminosityBlockAuxiliary>
00135 StreamerInputSource::readLuminosityBlockAuxiliary_() {
00136 assert(!newRun_);
00137 assert(newLumi_);
00138 assert(luminosityBlockAuxiliary());
00139 newLumi_ = false;
00140 return luminosityBlockAuxiliary();
00141 }
00142
00143 EventPrincipal*
00144 StreamerInputSource::readEvent_() {
00145 assert(!newRun_);
00146 assert(!newLumi_);
00147 assert(eventCached_);
00148 eventCached_ = false;
00149 eventPrincipalCache()->setLuminosityBlockPrincipal(luminosityBlockPrincipal());
00150 return eventPrincipalCache();
00151 }
00152
00153 InputSource::ItemType
00154 StreamerInputSource::getNextItemType() {
00155 if (runEndingFlag_) {
00156 return IsStop;
00157 }
00158 if(newRun_ && runAuxiliary()) {
00159 return IsRun;
00160 }
00161 if(newLumi_ && luminosityBlockAuxiliary()) {
00162 return IsLumi;
00163 }
00164 if (eventCached_) {
00165 return IsEvent;
00166 }
00167 if (inputFileTransitionsEachEvent_) {
00168 resetRunAuxiliary();
00169 resetLuminosityBlockAuxiliary();
00170 }
00171 read();
00172 if (!eventCached_) {
00173 return IsStop;
00174 } else {
00175 runEndingFlag_ = false;
00176 if (inputFileTransitionsEachEvent_) {
00177 return IsFile;
00178 }
00179 }
00180 if(newRun_) {
00181 return IsRun;
00182 } else if(newLumi_) {
00183 return IsLumi;
00184 }
00185 return IsEvent;
00186 }
00187
00192 std::auto_ptr<SendJobHeader>
00193 StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
00194 if(initView.code() != Header::INIT)
00195 throw cms::Exception("StreamTranslation","Registry deserialization error")
00196 << "received wrong message type: expected INIT, got "
00197 << initView.code() << "\n";
00198
00199
00200 if (initView.protocolVersion() > 3) {
00201
00202 processName_ = initView.processName();
00203 protocolVersion_ = initView.protocolVersion();
00204
00205 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00206 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00207 }
00208
00209
00210 uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
00211
00212
00213
00214 if((uint32)adler32_chksum != initView.adler32_chksum()) {
00215 std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
00216 << " chksum from registry data = " << adler32_chksum << " from header = "
00217 << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
00218
00219 }
00220
00221 TClass* desc = getTClass(typeid(SendJobHeader));
00222
00223 TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
00224 (char*)initView.descData(),kFALSE);
00225 RootDebug tracer(10,10);
00226 std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00227
00228 if(sd.get()==0) {
00229 throw cms::Exception("StreamTranslation","Registry deserialization error")
00230 << "Could not read the initial product registry list\n";
00231 }
00232
00233 return sd;
00234 }
00235
00240 void
00241 StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
00242 std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00243 ProcessConfigurationVector const& pcv = sd->processConfigurations();
00244 mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
00245 if (subsequent) {
00246 principalCache().adjustEventToNewProductRegistry(productRegistry());
00247 }
00248 SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
00249 pset::Registry& psetRegistry = *pset::Registry::instance();
00250 for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00251 ParameterSet pset(i->second.pset());
00252 pset.setID(i->first);
00253 psetRegistry.insertMapped(pset);
00254 }
00255 ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
00256 for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
00257 pcReg.insertMapped(*it);
00258 }
00259 }
00260
00264 EventPrincipal*
00265 StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
00266 if(eventView.code() != Header::EVENT)
00267 throw cms::Exception("StreamTranslation","Event deserialization error")
00268 << "received wrong message type: expected EVENT, got "
00269 << eventView.code() << "\n";
00270 FDEBUG(9) << "Decode event: "
00271 << eventView.event() << " "
00272 << eventView.run() << " "
00273 << eventView.size() << " "
00274 << eventView.adler32_chksum() << " "
00275 << eventView.eventLength() << " "
00276 << eventView.eventData()
00277 << std::endl;
00278 EventSourceSentry(*this);
00279
00280
00281
00282 unsigned long origsize = eventView.origDataSize();
00283 unsigned long dest_size;
00284
00285 uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
00286
00287
00288
00289 if((uint32)adler32_chksum != eventView.adler32_chksum()) {
00290 std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
00291 << " chksum from event = " << adler32_chksum << " from header = "
00292 << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
00293
00294 }
00295 if(origsize != 78 && origsize != 0) {
00296
00297 dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00298 eventView.eventLength(), dest_, origsize);
00299 } else {
00300
00301 dest_size = eventView.eventLength();
00302 dest_.resize(dest_size);
00303 unsigned char* pos = (unsigned char*) &dest_[0];
00304 unsigned char* from = (unsigned char*) eventView.eventData();
00305 std::copy(from,from+dest_size,pos);
00306 }
00307
00308
00309
00310
00311 xbuf_.Reset();
00312 xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00313 RootDebug tracer(10,10);
00314
00315 setRefCoreStreamer(&productGetter_);
00316 std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
00317 setRefCoreStreamer();
00318
00319 if(sd.get()==0) {
00320 throw cms::Exception("StreamTranslation","Event deserialization error")
00321 << "got a null event from input stream\n";
00322 }
00323 ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
00324
00325 FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
00326 if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
00327 newRun_ = newLumi_ = true;
00328 RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
00329 runAuxiliary->setProcessHistoryID(sd->processHistory().id());
00330 setRunAuxiliary(runAuxiliary);
00331 resetLuminosityBlockAuxiliary();
00332 }
00333 if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
00334 LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
00335 new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
00336 luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
00337 setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
00338 newLumi_ = true;
00339 }
00340
00341 boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
00342 boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
00343 BranchIDListHelper::fixBranchListIndexes(*indexes);
00344 eventPrincipalCache()->fillEventPrincipal(sd->aux(), boost::shared_ptr<LuminosityBlockPrincipal>(), ids, indexes);
00345 productGetter_.setEventPrincipal(eventPrincipalCache());
00346 eventCached_ = true;
00347
00348
00349
00350 SendProds & sps = sd->products();
00351 for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00352 FDEBUG(10) << "check prodpair" << std::endl;
00353 if(spi->desc() == 0)
00354 throw cms::Exception("StreamTranslation","Empty Provenance");
00355 FDEBUG(5) << "Prov:"
00356 << " " << spi->desc()->className()
00357 << " " << spi->desc()->productInstanceName()
00358 << " " << spi->desc()->branchID()
00359 << std::endl;
00360
00361 ConstBranchDescription branchDesc(*spi->desc());
00362
00363 ProductProvenance productProvenance(spi->branchID(), *spi->parents());
00364
00365 if(spi->prod() != 0) {
00366 FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
00367 eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
00368 FDEBUG(10) << "addgroup done" << std::endl;
00369 } else {
00370 FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
00371 eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
00372 FDEBUG(10) << "addgroup empty done" << std::endl;
00373 }
00374 spi->clear();
00375 }
00376
00377 FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
00378
00379 return eventPrincipalCache();
00380 }
00381
00390 unsigned int
00391 StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
00392 unsigned int inputSize,
00393 std::vector<unsigned char>& outputBuffer,
00394 unsigned int expectedFullSize) {
00395 unsigned long origSize = expectedFullSize;
00396 unsigned long uncompressedSize = expectedFullSize*1.1;
00397 FDEBUG(1) << "Uncompress: original size = " << origSize
00398 << ", compressed size = " << inputSize
00399 << std::endl;
00400 outputBuffer.resize(uncompressedSize);
00401 int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00402 inputBuffer, inputSize);
00403
00404 if(ret == Z_OK) {
00405
00406 FDEBUG(10) << " original size = " << origSize << " final size = "
00407 << uncompressedSize << std::endl;
00408 if(origSize != uncompressedSize) {
00409 std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00410 << origSize << " uncompress size = " << uncompressedSize << std::endl;
00411
00412 throw cms::Exception("StreamDeserialization","Uncompression error")
00413 << "mismatch event lengths should be" << origSize << " got "
00414 << uncompressedSize << "\n";
00415 }
00416 } else {
00417
00418 std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00419 << ret << std::endl;
00420 throw cms::Exception("StreamDeserialization","Uncompression error")
00421 << "Error code = " << ret << "\n ";
00422 }
00423 return (unsigned int) uncompressedSize;
00424 }
00425
00426 void StreamerInputSource::resetAfterEndRun() {
00427
00428
00429 resetLuminosityBlockAuxiliary();
00430 resetRunAuxiliary();
00431 newRun_ = newLumi_ = true;
00432 assert(!eventCached_);
00433 reset();
00434 runEndingFlag_ = false;
00435 }
00436
00437 void StreamerInputSource::setRun(RunNumber_t) {
00438
00439
00440 throw Exception(errors::LogicError)
00441 << "StreamerInputSource::setRun()\n"
00442 << "Run number cannot be modified for this type of Input Source\n"
00443 << "Contact a Storage Manager Developer\n";
00444 }
00445
00446 StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00447
00448 StreamerInputSource::ProductGetter::~ProductGetter() {}
00449
00450 WrapperHolder
00451 StreamerInputSource::ProductGetter::getIt(ProductID const& id) const {
00452 return eventPrincipal_ ? eventPrincipal_->getIt(id) : WrapperHolder();
00453 }
00454
00455 void
00456 StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00457 eventPrincipal_ = ep;
00458 }
00459
00460 void
00461 StreamerInputSource::fillDescription(ParameterSetDescription& desc) {
00462
00463
00464 InputSource::fillDescription(desc);
00465 }
00466 }