00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
00003
00004 #include "IOPool/Streamer/interface/EventMessage.h"
00005 #include "IOPool/Streamer/interface/InitMessage.h"
00006 #include "IOPool/Streamer/interface/ClassFiller.h"
00007
00008 #include "FWCore/Framework/interface/EventPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "DataFormats/Provenance/interface/BranchDescription.h"
00011 #include "DataFormats/Provenance/interface/EventEntryDescription.h"
00012 #include "DataFormats/Provenance/interface/EventEntryInfo.h"
00013 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00014 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00016 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00017
00018 #include "zlib.h"
00019
00020 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00021 #include "FWCore/Utilities/interface/WrappedClassName.h"
00022 #include "DataFormats/Common/interface/EDProduct.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00025 #include "FWCore/Framework/interface/RunPrincipal.h"
00026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00027 #include "FWCore/ParameterSet/interface/Registry.h"
00028 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00029
00030 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00032 #include "FWCore/Utilities/interface/DebugMacros.h"
00033
00034 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00035
00036 #include <string>
00037 #include <iostream>
00038 #include <set>
00039
00040 namespace edm {
namespace {
  // Initial size (1024*1024) passed to both dest_ and xbuf_ when a
  // StreamerInputSource is constructed.
  const int init_size = 1 << 20;
}
00044
00045 std::string StreamerInputSource::processName_;
00046 unsigned int StreamerInputSource::protocolVersion_;
00047
00048
00049 StreamerInputSource::StreamerInputSource(
00050 ParameterSet const& pset,
00051 InputSourceDescription const& desc):
00052 InputSource(pset, desc),
00053
00054
00055 inputFileTransitionsEachEvent_(
00056 pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
00057 newRun_(true),
00058 newLumi_(true),
00059 ep_(),
00060 tc_(getTClass(typeid(SendEvent))),
00061 dest_(init_size),
00062 xbuf_(TBuffer::kRead, init_size),
00063 runEndingFlag_(false),
00064 productGetter_()
00065 {
00066 }
00067
00068 StreamerInputSource::~StreamerInputSource() {}
00069
00070
00071 boost::shared_ptr<FileBlock>
00072 StreamerInputSource::readFile_() {
00073 productRegistryUpdate().setProductIDs(productRegistry()->nextID());
00074 return boost::shared_ptr<FileBlock>(new FileBlock);
00075 }
00076
00077 void
00078 StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header,
00079 ProductRegistry& reg, bool subsequent) {
00080
00081 SendDescs const& descs = header.descs();
00082
00083 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00084
00085 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00086
00087 if (subsequent) {
00088 ProductRegistry pReg;
00089 for(; i != e; ++i) {
00090 pReg.copyProduct(*i);
00091 FDEBUG(6) << "StreamInput prod = " << i->className() << std::endl;
00092 }
00093 std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00094 if (!mergeInfo.empty()) {
00095 throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00096 }
00097 } else {
00098 declareStreamers(descs);
00099 buildClassCache(descs);
00100 loadExtraClasses();
00101 for(; i != e; ++i) {
00102 i->setDefaultTransients();
00103 i->init();
00104 reg.copyProduct(*i);
00105 FDEBUG(6) << "StreamInput prod = " << i->className() << std::endl;
00106 }
00107 }
00108 reg.setNextID(header.nextID());
00109 }
00110
00111 void
00112 StreamerInputSource::declareStreamers(SendDescs const& descs) {
00113 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00114
00115 for(; i != e; ++i) {
00116
00117 std::string const real_name = wrappedClassName(i->className());
00118 FDEBUG(6) << "declare: " << real_name << std::endl;
00119 loadCap(real_name);
00120 }
00121 }
00122
00123
00124 void
00125 StreamerInputSource::buildClassCache(SendDescs const& descs) {
00126 SendDescs::const_iterator i(descs.begin()), e(descs.end());
00127
00128 for(; i != e; ++i) {
00129
00130 std::string const real_name = wrappedClassName(i->className());
00131 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00132 doBuildRealData(real_name);
00133 }
00134 }
00135
00136 void
00137 StreamerInputSource::saveTriggerNames(InitMsgView const* header) {
00138
00139 ParameterSet trigger_pset;
00140 std::vector<std::string> paths;
00141 header->hltTriggerNames(paths);
00142 trigger_pset.addParameter<Strings>("@trigger_paths", paths);
00143 pset::Registry* psetRegistry = pset::Registry::instance();
00144 psetRegistry->insertMapped(trigger_pset);
00145 }
00146
00147 boost::shared_ptr<RunPrincipal>
00148 StreamerInputSource::readRun_() {
00149 assert(newRun_);
00150 assert(runPrincipal());
00151 newRun_ = false;
00152 return runPrincipal();
00153 }
00154
00155 boost::shared_ptr<LuminosityBlockPrincipal>
00156 StreamerInputSource::readLuminosityBlock_() {
00157 assert(!newRun_);
00158 assert(newLumi_);
00159 assert(luminosityBlockPrincipal());
00160 newLumi_ = false;
00161 return luminosityBlockPrincipal();
00162 }
00163
00164 std::auto_ptr<EventPrincipal>
00165 StreamerInputSource::readEvent_() {
00166 assert(!newRun_);
00167 assert(!newLumi_);
00168 assert(ep_.get() != 0);
00169
00170 return ep_;
00171 }
00172
00173 InputSource::ItemType
00174 StreamerInputSource::getNextItemType() {
00175 if (runEndingFlag_) {
00176 return IsStop;
00177 }
00178 if(newRun_ && runPrincipal()) {
00179 return IsRun;
00180 }
00181 if(newLumi_ && luminosityBlockPrincipal()) {
00182 return IsLumi;
00183 }
00184 if (ep_.get() != 0) {
00185 return IsEvent;
00186 }
00187 if (inputFileTransitionsEachEvent_) {
00188 resetRunPrincipal();
00189 resetLuminosityBlockPrincipal();
00190 }
00191 ep_ = read();
00192 if (ep_.get() == 0) {
00193 return IsStop;
00194 } else {
00195 runEndingFlag_ = false;
00196 if (inputFileTransitionsEachEvent_) {
00197 return IsFile;
00198 }
00199 }
00200 if(newRun_) {
00201 return IsRun;
00202 } else if(newLumi_) {
00203 return IsLumi;
00204 }
00205 return IsEvent;
00206 }
00207
00212 std::auto_ptr<SendJobHeader>
00213 StreamerInputSource::deserializeRegistry(InitMsgView const& initView)
00214 {
00215 if(initView.code() != Header::INIT)
00216 throw cms::Exception("StreamTranslation","Registry deserialization error")
00217 << "received wrong message type: expected INIT, got "
00218 << initView.code() << "\n";
00219
00220
00221 if (initView.protocolVersion() > 3) {
00222
00223 processName_ = initView.processName();
00224 protocolVersion_ = initView.protocolVersion();
00225
00226 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00227 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00228 }
00229
00230 TClass* desc = getTClass(typeid(SendJobHeader));
00231
00232 RootBuffer xbuf(TBuffer::kRead, initView.descLength(),
00233 (char*)initView.descData(),kFALSE);
00234 RootDebug tracer(10,10);
00235 std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00236
00237 if(sd.get()==0) {
00238 throw cms::Exception("StreamTranslation","Registry deserialization error")
00239 << "Could not read the initial product registry list\n";
00240 }
00241
00242 return sd;
00243 }
00244
00249 void
00250 StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent)
00251 {
00252 std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00253 mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
00254 ModuleDescriptionRegistry & moduleDescriptionRegistry = *ModuleDescriptionRegistry::instance();
00255 ModuleDescriptionMap const& mdMap = sd->moduleDescriptionMap();
00256 for (ModuleDescriptionMap::const_iterator k = mdMap.begin(), kEnd = mdMap.end(); k != kEnd; ++k) {
00257 moduleDescriptionRegistry.insertMapped(k->second);
00258 }
00259 SendJobHeader::ParameterSetMap const & psetMap = sd->processParameterSet();
00260 pset::Registry& psetRegistry = *pset::Registry::instance();
00261 for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00262 psetRegistry.insertMapped(ParameterSet(i->second.pset_));
00263 }
00264 }
00265
00269 std::auto_ptr<EventPrincipal>
00270 StreamerInputSource::deserializeEvent(EventMsgView const& eventView)
00271 {
00272 if(eventView.code() != Header::EVENT)
00273 throw cms::Exception("StreamTranslation","Event deserialization error")
00274 << "received wrong message type: expected EVENT, got "
00275 << eventView.code() << "\n";
00276 FDEBUG(9) << "Decode event: "
00277 << eventView.event() << " "
00278 << eventView.run() << " "
00279 << eventView.size() << " "
00280 << eventView.eventLength() << " "
00281 << eventView.eventData()
00282 << std::endl;
00283 EventSourceSentry(*this);
00284
00285
00286
00287 unsigned long origsize = eventView.origDataSize();
00288 unsigned long dest_size;
00289
00290 if(origsize != 78 && origsize != 0)
00291 {
00292
00293 dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00294 eventView.eventLength(), dest_, origsize);
00295 }
00296 else
00297 {
00298
00299 dest_size = eventView.eventLength();
00300 dest_.resize(dest_size);
00301 unsigned char* pos = (unsigned char*) &dest_[0];
00302 unsigned char* from = (unsigned char*) eventView.eventData();
00303 std::copy(from,from+dest_size,pos);
00304 }
00305
00306
00307
00308
00309 xbuf_.Reset();
00310 xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00311 RootDebug tracer(10,10);
00312
00313 setRefCoreStreamer(&productGetter_);
00314 std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
00315
00316 if(sd.get()==0) {
00317 throw cms::Exception("StreamTranslation","Event deserialization error")
00318 << "got a null event from input stream\n";
00319 }
00320 sd->processHistory().setDefaultTransients();
00321 ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
00322
00323 FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
00324 if(!runPrincipal() || runPrincipal()->run() != sd->aux().run()) {
00325 newRun_ = newLumi_ = true;
00326 RunAuxiliary runAux(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
00327 setRunPrincipal(boost::shared_ptr<RunPrincipal>(
00328 new RunPrincipal(runAux,
00329 productRegistry(),
00330 processConfiguration())));
00331 resetLuminosityBlockPrincipal();
00332 }
00333 if(!luminosityBlockPrincipal() || luminosityBlockPrincipal()->luminosityBlock() != eventView.lumi()) {
00334
00335 LuminosityBlockAuxiliary lumiAux(runPrincipal()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
00336 setLuminosityBlockPrincipal(boost::shared_ptr<LuminosityBlockPrincipal>(
00337 new LuminosityBlockPrincipal(lumiAux,
00338 productRegistry(),
00339 processConfiguration())));
00340 newLumi_ = true;
00341 }
00342
00343 std::auto_ptr<EventPrincipal> ep(new EventPrincipal(sd->aux(),
00344 productRegistry(),
00345 processConfiguration(),
00346 sd->processHistory().id()));
00347 productGetter_.setEventPrincipal(ep.get());
00348
00349
00350
00351 ProductID largestID;
00352 SendProds & sps = sd->products();
00353 for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00354 FDEBUG(10) << "check prodpair" << std::endl;
00355 if(spi->desc() == 0)
00356 throw cms::Exception("StreamTranslation","Empty Provenance");
00357 FDEBUG(5) << "Prov:"
00358 << " " << spi->desc()->className()
00359 << " " << spi->desc()->productInstanceName()
00360 << " " << spi->desc()->branchID()
00361 << std::endl;
00362
00363 ConstBranchDescription branchDesc(*spi->desc());
00364
00365 boost::shared_ptr<EventEntryInfo> eventEntryDesc(
00366 new EventEntryInfo(spi->branchID(),
00367 spi->status(),
00368 spi->mod(),
00369 spi->productID(),
00370 *spi->parents()));
00371
00372 ep->branchMapperPtr()->insert(*eventEntryDesc);
00373 if(spi->productID() > largestID) {
00374 largestID = spi->productID();
00375 }
00376 if(spi->prod() != 0) {
00377 std::auto_ptr<EDProduct> aprod(const_cast<EDProduct*>(spi->prod()));
00378 FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
00379 ep->addGroup(aprod, branchDesc, eventEntryDesc);
00380 FDEBUG(10) << "addgroup done" << std::endl;
00381 } else {
00382 FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
00383 ep->addGroup(branchDesc, eventEntryDesc);
00384 FDEBUG(10) << "addgroup empty done" << std::endl;
00385 }
00386 spi->clear();
00387 }
00388
00389 if(largestID.id() >= productRegistry()->nextID()) {
00390 edm::LogError("MetaDataError")<<"The input file has a critical problem, the 'nextID' for the ProductRegistry ("
00391 <<productRegistry()->nextID()
00392 <<")\n is less than the largest ProductID ("
00393 <<largestID.id()
00394 <<") used in a previous process.\n"
00395 " Will modify the ProductRegistry to attempt to correct the problem,\n"
00396 " although it is possible that edm::Ref*'s or edm::Ptr's may still fail.\n"
00397 " Please contact StreamerOutputModule developers.";
00398
00399
00400
00401 productRegistryUpdate().setNextID(largestID.id()+1);
00402 productRegistryUpdate().setProductIDs(largestID.id()+1);
00403 }
00404 FDEBUG(10) << "Size = " << ep->size() << std::endl;
00405
00406 return ep;
00407 }
00408
00417 unsigned int
00418 StreamerInputSource::uncompressBuffer(unsigned char *inputBuffer,
00419 unsigned int inputSize,
00420 std::vector<unsigned char> &outputBuffer,
00421 unsigned int expectedFullSize)
00422 {
00423 unsigned long origSize = expectedFullSize;
00424 unsigned long uncompressedSize = expectedFullSize;
00425 FDEBUG(1) << "Uncompress: original size = " << origSize
00426 << ", compressed size = " << inputSize
00427 << std::endl;
00428 outputBuffer.resize(origSize);
00429 int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00430 inputBuffer, inputSize);
00431
00432 if(ret == Z_OK) {
00433
00434 FDEBUG(10) << " original size = " << origSize << " final size = "
00435 << uncompressedSize << std::endl;
00436 if(origSize != uncompressedSize) {
00437 std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00438 << origSize << " uncompress size = " << uncompressedSize << std::endl;
00439
00440 throw cms::Exception("StreamDeserialization","Uncompression error")
00441 << "mismatch event lengths should be" << origSize << " got "
00442 << uncompressedSize << "\n";
00443 }
00444 } else {
00445
00446 std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00447 << ret << std::endl;
00448 throw cms::Exception("StreamDeserialization","Uncompression error")
00449 << "Error code = " << ret << "\n ";
00450 }
00451
00452 return (unsigned int) uncompressedSize;
00453 }
00454
00455 void StreamerInputSource::resetAfterEndRun()
00456 {
00457
00458
00459 assert(ep_.get() == 0);
00460 reset();
00461 runEndingFlag_ = false;
00462 }
00463
00464 void StreamerInputSource::setRun(RunNumber_t)
00465 {
00466
00467
00468 throw edm::Exception(edm::errors::LogicError)
00469 << "StreamerInputSource::setRun()\n"
00470 << "Run number cannot be modified for this type of Input Source\n"
00471 << "Contact a Storage Manager Developer\n";
00472 }
00473
00474 StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00475
00476 StreamerInputSource::ProductGetter::~ProductGetter() {}
00477
00478 EDProduct const*
00479 StreamerInputSource::ProductGetter::getIt(edm::ProductID const& id) const {
00480 return eventPrincipal_ ? eventPrincipal_->getIt(id) : 0;
00481 }
00482
00483 void
00484 StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00485 eventPrincipal_ = ep;
00486 }
00487 }