CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_5/src/IOPool/Streamer/src/StreamerInputSource.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002 
00003 #include "IOPool/Streamer/interface/EventMessage.h"
00004 #include "IOPool/Streamer/interface/InitMessage.h"
00005 #include "IOPool/Streamer/interface/ClassFiller.h"
00006 
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/ParameterSet/interface/FillProductRegistryTransients.h"
00010 #include "DataFormats/Provenance/interface/BranchDescription.h"
00011 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00012 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00013 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00014 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00016 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
00017 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00018 
00019 #include "zlib.h"
00020 
00021 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00022 #include "FWCore/Utilities/interface/WrappedClassName.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00025 #include "FWCore/ParameterSet/interface/Registry.h"
00026 #include "FWCore/Utilities/interface/EDMException.h"
00027 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00028 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00029 
00030 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00032 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00033 #include "FWCore/Utilities/interface/DebugMacros.h"
00034 
00035 #include <string>
00036 #include <iostream>
00037 #include <set>
00038 
00039 namespace edm {
00040   namespace {
00041     int const init_size = 1024*1024;
00042   }
00043 
00044   std::string StreamerInputSource::processName_;
00045   unsigned int StreamerInputSource::protocolVersion_;
00046 
00047 
00048   StreamerInputSource::StreamerInputSource(
00049                     ParameterSet const& pset,
00050                     InputSourceDescription const& desc):
00051     RawInputSource(pset, desc),
00052     tc_(getTClass(typeid(SendEvent))),
00053     dest_(init_size),
00054     xbuf_(TBuffer::kRead, init_size),
00055     sendEvent_(),
00056     productGetter_(),
00057     adjustEventToNewProductRegistry_(false) {
00058   }
00059 
00060   StreamerInputSource::~StreamerInputSource() {}
00061 
00062   // ---------------------------------------
00063   std::unique_ptr<FileBlock>
00064   StreamerInputSource::readFile_() {
00065     return std::unique_ptr<FileBlock>(new FileBlock);
00066   }
00067 
00068   void
00069   StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, BranchIDListHelper& branchIDListHelper, bool subsequent) {
00070 
00071     SendDescs const& descs = header.descs();
00072 
00073     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00074 
00075     if (subsequent) {
00076       ProductRegistry pReg;
00077       pReg.updateFromInput(descs);
00078       fillProductRegistryTransients(header.processConfigurations(), pReg);
00079       std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00080       if (!mergeInfo.empty()) {
00081         throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00082       }
00083       branchIDListHelper.updateFromInput(header.branchIDLists());
00084     } else {
00085       declareStreamers(descs);
00086       buildClassCache(descs);
00087       loadExtraClasses();
00088       reg.updateFromInput(descs);
00089       fillProductRegistryTransients(header.processConfigurations(), reg);
00090       branchIDListHelper.updateFromInput(header.branchIDLists());
00091     }
00092   }
00093 
00094   void
00095   StreamerInputSource::declareStreamers(SendDescs const& descs) {
00096     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00097 
00098     for(; i != e; ++i) {
00099         //pi->init();
00100         std::string const real_name = wrappedClassName(i->className());
00101         FDEBUG(6) << "declare: " << real_name << std::endl;
00102         loadCap(real_name);
00103     }
00104   }
00105 
00106 
00107   void
00108   StreamerInputSource::buildClassCache(SendDescs const& descs) {
00109     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00110 
00111     for(; i != e; ++i) {
00112         //pi->init();
00113         std::string const real_name = wrappedClassName(i->className());
00114         FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00115         doBuildRealData(real_name);
00116     }
00117   }
00118 
00123   std::auto_ptr<SendJobHeader>
00124   StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
00125     if(initView.code() != Header::INIT)
00126       throw cms::Exception("StreamTranslation","Registry deserialization error")
00127         << "received wrong message type: expected INIT, got "
00128         << initView.code() << "\n";
00129 
00130     //Get the process name and store if for Protocol version 4 and above.
00131     if (initView.protocolVersion() > 3) {
00132 
00133          processName_ = initView.processName();
00134          protocolVersion_ = initView.protocolVersion();
00135 
00136          FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00137          FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00138     }
00139 
00140    // calculate the adler32 checksum
00141    uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
00142    //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
00143    //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
00144    //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
00145     if((uint32)adler32_chksum != initView.adler32_chksum()) {
00146       std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
00147                 << " chksum from registry data = " << adler32_chksum << " from header = "
00148                 << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
00149       // skip event (based on option?) or throw exception?
00150     }
00151 
00152     TClass* desc = getTClass(typeid(SendJobHeader));
00153 
00154     TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
00155                  (char*)initView.descData(),kFALSE);
00156     RootDebug tracer(10,10);
00157     std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00158 
00159     if(sd.get()==0) {
00160         throw cms::Exception("StreamTranslation","Registry deserialization error")
00161           << "Could not read the initial product registry list\n";
00162     }
00163 
00164     return sd;
00165   }
00166 
00171   void
00172   StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
00173      std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00174      ProcessConfigurationVector const& pcv = sd->processConfigurations();
00175      mergeIntoRegistry(*sd, productRegistryUpdate(), *branchIDListHelper(), subsequent);
00176      if (subsequent) {
00177        adjustEventToNewProductRegistry_ = true;
00178      }
00179      SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
00180      pset::Registry& psetRegistry = *pset::Registry::instance();
00181      for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00182        ParameterSet pset(i->second.pset());
00183        pset.setID(i->first);
00184        psetRegistry.insertMapped(pset);
00185      }
00186      ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
00187      for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
00188        pcReg.insertMapped(*it);
00189      }
00190   }
00191 
00195   void
00196   StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
00197     if(eventView.code() != Header::EVENT)
00198       throw cms::Exception("StreamTranslation","Event deserialization error")
00199         << "received wrong message type: expected EVENT, got "
00200         << eventView.code() << "\n";
00201     FDEBUG(9) << "Decode event: "
00202          << eventView.event() << " "
00203          << eventView.run() << " "
00204          << eventView.size() << " "
00205          << eventView.adler32_chksum() << " "
00206          << eventView.eventLength() << " "
00207          << eventView.eventData()
00208          << std::endl;
00209     EventSourceSentry sentry(*this);
00210     // uncompress if we need to
00211     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
00212     // need to get rid of this when 090 MTCC streamers are gotten rid of
00213     unsigned long origsize = eventView.origDataSize();
00214     unsigned long dest_size; //(should be >= eventView.origDataSize())
00215 
00216     uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
00217     //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
00218     //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
00219     //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
00220     if((uint32)adler32_chksum != eventView.adler32_chksum()) {
00221       std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
00222                 << " chksum from event = " << adler32_chksum << " from header = "
00223                 << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
00224       // skip event (based on option?) or throw exception?
00225     }
00226     if(origsize != 78 && origsize != 0) {
00227       // compressed
00228       dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00229                                    eventView.eventLength(), dest_, origsize);
00230     } else { // not compressed
00231       // we need to copy anyway the buffer as we are using dest in xbuf
00232       dest_size = eventView.eventLength();
00233       dest_.resize(dest_size);
00234       unsigned char* pos = (unsigned char*) &dest_[0];
00235       unsigned char* from = (unsigned char*) eventView.eventData();
00236       std::copy(from,from+dest_size,pos);
00237     }
00238     //TBuffer xbuf(TBuffer::kRead, dest_size,
00239     //             (char*) &dest[0],kFALSE);
00240     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
00241     //             (char*) eventView.eventData(),kFALSE);
00242     xbuf_.Reset();
00243     xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00244     RootDebug tracer(10,10);
00245 
00246     setRefCoreStreamer(&productGetter_);
00247     sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
00248     setRefCoreStreamer();
00249 
00250     if(sendEvent_.get()==0) {
00251         throw cms::Exception("StreamTranslation","Event deserialization error")
00252           << "got a null event from input stream\n";
00253     }
00254     ProcessHistoryRegistry::instance()->insertMapped(sendEvent_->processHistory());
00255 
00256     FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
00257     if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sendEvent_->aux().run()) {
00258       RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
00259       runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
00260       setRunAuxiliary(runAuxiliary);
00261       resetLuminosityBlockAuxiliary();
00262     }
00263     if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
00264       LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
00265         new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
00266       luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
00267       setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
00268     }
00269     setEventCached();
00270   }
00271 
00272   EventPrincipal *
00273   StreamerInputSource::read(EventPrincipal& eventPrincipal) {
00274     if(adjustEventToNewProductRegistry_) {
00275       eventPrincipal.adjustIndexesAfterProductRegistryAddition();
00276       bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
00277       assert(eventOK);
00278       adjustEventToNewProductRegistry_ = false;
00279     }
00280     boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sendEvent_->eventSelectionIDs()));
00281     boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sendEvent_->branchListIndexes()));
00282     branchIDListHelper()->fixBranchListIndexes(*indexes);
00283     eventPrincipal.fillEventPrincipal(sendEvent_->aux(), ids, indexes);
00284     productGetter_.setEventPrincipal(&eventPrincipal);
00285 
00286     // no process name list handling
00287 
00288     SendProds & sps = sendEvent_->products();
00289     for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00290         FDEBUG(10) << "check prodpair" << std::endl;
00291         if(spi->desc() == 0)
00292           throw cms::Exception("StreamTranslation","Empty Provenance");
00293         FDEBUG(5) << "Prov:"
00294              << " " << spi->desc()->className()
00295              << " " << spi->desc()->productInstanceName()
00296              << " " << spi->desc()->branchID()
00297              << std::endl;
00298 
00299         ConstBranchDescription branchDesc(*spi->desc());
00300         // This ProductProvenance constructor inserts into the entry description registry
00301         ProductProvenance productProvenance(spi->branchID(), *spi->parents());
00302 
00303         if(spi->prod() != 0) {
00304           FDEBUG(10) << "addproduct next " << spi->branchID() << std::endl;
00305           eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
00306           FDEBUG(10) << "addproduct done" << std::endl;
00307         } else {
00308           FDEBUG(10) << "addproduct empty next " << spi->branchID() << std::endl;
00309           eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
00310           FDEBUG(10) << "addproduct empty done" << std::endl;
00311         }
00312         spi->clear();
00313     }
00314 
00315     FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
00316 
00317     return &eventPrincipal;
00318   }
00319 
00328   unsigned int
00329   StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
00330                                         unsigned int inputSize,
00331                                         std::vector<unsigned char>& outputBuffer,
00332                                         unsigned int expectedFullSize) {
00333     unsigned long origSize = expectedFullSize;
00334     unsigned long uncompressedSize = expectedFullSize*1.1;
00335     FDEBUG(1) << "Uncompress: original size = " << origSize
00336               << ", compressed size = " << inputSize
00337               << std::endl;
00338     outputBuffer.resize(uncompressedSize);
00339     int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00340                          inputBuffer, inputSize); // do not need compression level
00341     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
00342     if(ret == Z_OK) {
00343         // check the length against original uncompressed length
00344         FDEBUG(10) << " original size = " << origSize << " final size = "
00345                    << uncompressedSize << std::endl;
00346         if(origSize != uncompressedSize) {
00347             std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00348                  << origSize << " uncompress size = " << uncompressedSize << std::endl;
00349             // we throw an error and return without event! null pointer
00350             throw cms::Exception("StreamDeserialization","Uncompression error")
00351               << "mismatch event lengths should be" << origSize << " got "
00352               << uncompressedSize << "\n";
00353         }
00354     } else {
00355         // we throw an error and return without event! null pointer
00356         std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00357              << ret << std::endl;
00358         throw cms::Exception("StreamDeserialization","Uncompression error")
00359             << "Error code = " << ret << "\n ";
00360     }
00361     return (unsigned int) uncompressedSize;
00362   }
00363 
00364   void StreamerInputSource::resetAfterEndRun() {
00365      // called from an online streamer source to reset after a stop command
00366      // so an enable command will work
00367      resetLuminosityBlockAuxiliary();
00368      resetRunAuxiliary();
00369      assert(!eventCached());
00370      reset();
00371   }
00372 
00373   void StreamerInputSource::setRun(RunNumber_t) {
00374      // Need to define a dummy setRun here or else the InputSource::setRun is called
00375      // if we have a source inheriting from this and wants to define a setRun method
00376      throw Exception(errors::LogicError)
00377      << "StreamerInputSource::setRun()\n"
00378      << "Run number cannot be modified for this type of Input Source\n"
00379      << "Contact a Storage Manager Developer\n";
00380   }
00381 
00382   StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00383 
00384   StreamerInputSource::ProductGetter::~ProductGetter() {}
00385 
00386   WrapperHolder
00387   StreamerInputSource::ProductGetter::getIt(ProductID const& id) const {
00388     return eventPrincipal_ ? eventPrincipal_->getIt(id) : WrapperHolder();
00389   }
00390 
00391   void
00392   StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00393     eventPrincipal_ = ep;
00394   }
00395 
00396   void
00397   StreamerInputSource::fillDescription(ParameterSetDescription& desc) {
00398     RawInputSource::fillDescription(desc);
00399   }
00400 }