CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/IOPool/Streamer/src/StreamerInputSource.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002 
00003 #include "IOPool/Streamer/interface/EventMessage.h"
00004 #include "IOPool/Streamer/interface/InitMessage.h"
00005 #include "IOPool/Streamer/interface/ClassFiller.h"
00006 
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/Framework/src/PrincipalCache.h"
00010 #include "FWCore/ParameterSet/interface/FillProductRegistryTransients.h"
00011 #include "DataFormats/Provenance/interface/BranchDescription.h"
00012 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00013 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00014 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00016 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00017 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00018 
00019 #include "zlib.h"
00020 
00021 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00022 #include "FWCore/Utilities/interface/WrappedClassName.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00025 #include "FWCore/ParameterSet/interface/Registry.h"
00026 #include "FWCore/Utilities/interface/EDMException.h"
00027 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00028 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00029 
00030 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00032 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00034 #include "FWCore/Utilities/interface/DebugMacros.h"
00035 
00036 #include <string>
00037 #include <iostream>
00038 #include <set>
00039 
00040 namespace edm {
00041   namespace {
00042     int const init_size = 1024*1024;
00043   }
00044 
00045   std::string StreamerInputSource::processName_;
00046   unsigned int StreamerInputSource::protocolVersion_;
00047 
00048 
00049   StreamerInputSource::StreamerInputSource(
00050                     ParameterSet const& pset,
00051                     InputSourceDescription const& desc):
00052     InputSource(pset, desc),
00053     // The default value for the following parameter get defined in at least one derived class
00054     // where it has a different default value.
00055     inputFileTransitionsEachEvent_(
00056       pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
00057     newRun_(true),
00058     newLumi_(true),
00059     eventCached_(false),
00060     tc_(getTClass(typeid(SendEvent))),
00061     dest_(init_size),
00062     xbuf_(TBuffer::kRead, init_size),
00063     runEndingFlag_(false),
00064     productGetter_() {
00065   }
00066 
00067   StreamerInputSource::~StreamerInputSource() {}
00068 
00069   // ---------------------------------------
00070   boost::shared_ptr<FileBlock>
00071   StreamerInputSource::readFile_() {
00072     return boost::shared_ptr<FileBlock>(new FileBlock);
00073   }
00074 
00075   void
00076   StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
00077 
00078     SendDescs const& descs = header.descs();
00079 
00080     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00081 
00082     if (subsequent) {
00083       ProductRegistry pReg;
00084       pReg.updateFromInput(descs);
00085       fillProductRegistryTransients(header.processConfigurations(), pReg);
00086       std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00087       if (!mergeInfo.empty()) {
00088         throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00089       }
00090       BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00091     } else {
00092       declareStreamers(descs);
00093       buildClassCache(descs);
00094       loadExtraClasses();
00095       reg.updateFromInput(descs);
00096       fillProductRegistryTransients(header.processConfigurations(), reg);
00097       BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00098     }
00099   }
00100 
00101   void
00102   StreamerInputSource::declareStreamers(SendDescs const& descs) {
00103     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00104 
00105     for(; i != e; ++i) {
00106         //pi->init();
00107         std::string const real_name = wrappedClassName(i->className());
00108         FDEBUG(6) << "declare: " << real_name << std::endl;
00109         loadCap(real_name);
00110     }
00111   }
00112 
00113 
00114   void
00115   StreamerInputSource::buildClassCache(SendDescs const& descs) {
00116     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00117 
00118     for(; i != e; ++i) {
00119         //pi->init();
00120         std::string const real_name = wrappedClassName(i->className());
00121         FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00122         doBuildRealData(real_name);
00123     }
00124   }
00125 
00126   boost::shared_ptr<RunAuxiliary>
00127   StreamerInputSource::readRunAuxiliary_() {
00128     assert(newRun_);
00129     assert(runAuxiliary());
00130     newRun_ = false;
00131     return runAuxiliary();
00132   }
00133 
00134   boost::shared_ptr<LuminosityBlockAuxiliary>
00135   StreamerInputSource::readLuminosityBlockAuxiliary_() {
00136     assert(!newRun_);
00137     assert(newLumi_);
00138     assert(luminosityBlockAuxiliary());
00139     newLumi_ = false;
00140     return luminosityBlockAuxiliary();
00141   }
00142 
00143   EventPrincipal*
00144   StreamerInputSource::readEvent_() {
00145     assert(!newRun_);
00146     assert(!newLumi_);
00147     assert(eventCached_);
00148     eventCached_ = false;
00149     return eventPrincipalCache();
00150   }
00151 
00152   InputSource::ItemType
00153   StreamerInputSource::getNextItemType() {
00154     if (runEndingFlag_) {
00155       return IsStop;
00156     }
00157     if(newRun_ && runAuxiliary()) {
00158       return IsRun;
00159     }
00160     if(newLumi_ && luminosityBlockAuxiliary()) {
00161       return IsLumi;
00162     }
00163     if (eventCached_) {
00164       return IsEvent;
00165     }
00166     if (inputFileTransitionsEachEvent_) {
00167       resetRunAuxiliary();
00168       resetLuminosityBlockAuxiliary();
00169     }
00170     read();
00171     if (!eventCached_) {
00172       return IsStop;
00173     } else {
00174       runEndingFlag_ = false;
00175       if (inputFileTransitionsEachEvent_) {
00176         return IsFile;
00177       }
00178     }
00179     if(newRun_) {
00180       return IsRun;
00181     } else if(newLumi_) {
00182       return IsLumi;
00183     }
00184     return IsEvent;
00185   }
00186 
00191   std::auto_ptr<SendJobHeader>
00192   StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
00193     if(initView.code() != Header::INIT)
00194       throw cms::Exception("StreamTranslation","Registry deserialization error")
00195         << "received wrong message type: expected INIT, got "
00196         << initView.code() << "\n";
00197 
00198     //Get the process name and store if for Protocol version 4 and above.
00199     if (initView.protocolVersion() > 3) {
00200 
00201          processName_ = initView.processName();
00202          protocolVersion_ = initView.protocolVersion();
00203 
00204          FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00205          FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00206     }
00207 
00208    // calculate the adler32 checksum
00209    uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
00210    //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
00211    //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
00212    //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
00213     if((uint32)adler32_chksum != initView.adler32_chksum()) {
00214       std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
00215                 << " chksum from registry data = " << adler32_chksum << " from header = "
00216                 << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
00217       // skip event (based on option?) or throw exception?
00218     }
00219 
00220     TClass* desc = getTClass(typeid(SendJobHeader));
00221 
00222     TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
00223                  (char*)initView.descData(),kFALSE);
00224     RootDebug tracer(10,10);
00225     std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00226 
00227     if(sd.get()==0) {
00228         throw cms::Exception("StreamTranslation","Registry deserialization error")
00229           << "Could not read the initial product registry list\n";
00230     }
00231 
00232     return sd;
00233   }
00234 
00239   void
00240   StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
00241      std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00242      ProcessConfigurationVector const& pcv = sd->processConfigurations();
00243      mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
00244      if (subsequent) {
00245        principalCache().adjustEventToNewProductRegistry(productRegistry());
00246      }
00247      SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
00248      pset::Registry& psetRegistry = *pset::Registry::instance();
00249      for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00250        ParameterSet pset(i->second.pset());
00251        pset.setID(i->first);
00252        psetRegistry.insertMapped(pset);
00253      }
00254      ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
00255      for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
00256        pcReg.insertMapped(*it);
00257      }
00258   }
00259 
00263   EventPrincipal*
00264   StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
00265     if(eventView.code() != Header::EVENT)
00266       throw cms::Exception("StreamTranslation","Event deserialization error")
00267         << "received wrong message type: expected EVENT, got "
00268         << eventView.code() << "\n";
00269     FDEBUG(9) << "Decode event: "
00270          << eventView.event() << " "
00271          << eventView.run() << " "
00272          << eventView.size() << " "
00273          << eventView.adler32_chksum() << " "
00274          << eventView.eventLength() << " "
00275          << eventView.eventData()
00276          << std::endl;
00277     EventSourceSentry(*this);
00278     // uncompress if we need to
00279     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
00280     // need to get rid of this when 090 MTCC streamers are gotten rid of
00281     unsigned long origsize = eventView.origDataSize();
00282     unsigned long dest_size; //(should be >= eventView.origDataSize())
00283 
00284     uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
00285     //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
00286     //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
00287     //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
00288     if((uint32)adler32_chksum != eventView.adler32_chksum()) {
00289       std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
00290                 << " chksum from event = " << adler32_chksum << " from header = "
00291                 << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
00292       // skip event (based on option?) or throw exception?
00293     }
00294     if(origsize != 78 && origsize != 0) {
00295       // compressed
00296       dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00297                                    eventView.eventLength(), dest_, origsize);
00298     } else { // not compressed
00299       // we need to copy anyway the buffer as we are using dest in xbuf
00300       dest_size = eventView.eventLength();
00301       dest_.resize(dest_size);
00302       unsigned char* pos = (unsigned char*) &dest_[0];
00303       unsigned char* from = (unsigned char*) eventView.eventData();
00304       std::copy(from,from+dest_size,pos);
00305     }
00306     //TBuffer xbuf(TBuffer::kRead, dest_size,
00307     //             (char*) &dest[0],kFALSE);
00308     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
00309     //             (char*) eventView.eventData(),kFALSE);
00310     xbuf_.Reset();
00311     xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00312     RootDebug tracer(10,10);
00313 
00314     setRefCoreStreamer(&productGetter_);
00315     std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
00316     setRefCoreStreamer();
00317 
00318     if(sd.get()==0) {
00319         throw cms::Exception("StreamTranslation","Event deserialization error")
00320           << "got a null event from input stream\n";
00321     }
00322     ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
00323 
00324     FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
00325     if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
00326       newRun_ = newLumi_ = true;
00327       RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
00328       runAuxiliary->setProcessHistoryID(sd->processHistory().id());
00329       setRunAuxiliary(runAuxiliary);
00330       readAndCacheRun();
00331       setRunPrematurelyRead();
00332     }
00333     if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
00334       LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
00335         new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
00336       luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
00337       setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
00338       newLumi_ = true;
00339       readAndCacheLumi();
00340       setLumiPrematurelyRead();
00341     }
00342 
00343     boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
00344     boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
00345     std::auto_ptr<EventAuxiliary> aux(new EventAuxiliary(sd->aux()));
00346     eventPrincipalCache()->fillEventPrincipal(aux, luminosityBlockPrincipal(), ids, indexes);
00347     productGetter_.setEventPrincipal(eventPrincipalCache());
00348     eventCached_ = true;
00349 
00350     // no process name list handling
00351 
00352     SendProds & sps = sd->products();
00353     for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00354         FDEBUG(10) << "check prodpair" << std::endl;
00355         if(spi->desc() == 0)
00356           throw cms::Exception("StreamTranslation","Empty Provenance");
00357         FDEBUG(5) << "Prov:"
00358              << " " << spi->desc()->className()
00359              << " " << spi->desc()->productInstanceName()
00360              << " " << spi->desc()->branchID()
00361              << std::endl;
00362 
00363         ConstBranchDescription branchDesc(*spi->desc());
00364         // This ProductProvenance constructor inserts into the entry description registry
00365         std::auto_ptr<ProductProvenance> productProvenance(
00366              new ProductProvenance(spi->branchID(),
00367                                    spi->status(),
00368                                    *spi->parents()));
00369 
00370         if(spi->prod() != 0) {
00371           std::auto_ptr<EDProduct> aprod(const_cast<EDProduct*>(spi->prod()));
00372           FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
00373           eventPrincipalCache()->putOnRead(branchDesc, aprod, productProvenance);
00374           FDEBUG(10) << "addgroup done" << std::endl;
00375         } else {
00376           FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
00377           eventPrincipalCache()->putOnRead(branchDesc, std::auto_ptr<EDProduct>(), productProvenance);
00378           FDEBUG(10) << "addgroup empty done" << std::endl;
00379         }
00380         spi->clear();
00381     }
00382 
00383     FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
00384 
00385     return eventPrincipalCache();
00386   }
00387 
00396   unsigned int
00397   StreamerInputSource::uncompressBuffer(unsigned char *inputBuffer,
00398                                         unsigned int inputSize,
00399                                         std::vector<unsigned char> &outputBuffer,
00400                                         unsigned int expectedFullSize) {
00401     unsigned long origSize = expectedFullSize;
00402     unsigned long uncompressedSize = expectedFullSize*1.1;
00403     FDEBUG(1) << "Uncompress: original size = " << origSize
00404               << ", compressed size = " << inputSize
00405               << std::endl;
00406     outputBuffer.resize(uncompressedSize);
00407     int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00408                          inputBuffer, inputSize); // do not need compression level
00409     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
00410     if(ret == Z_OK) {
00411         // check the length against original uncompressed length
00412         FDEBUG(10) << " original size = " << origSize << " final size = "
00413                    << uncompressedSize << std::endl;
00414         if(origSize != uncompressedSize) {
00415             std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00416                  << origSize << " uncompress size = " << uncompressedSize << std::endl;
00417             // we throw an error and return without event! null pointer
00418             throw cms::Exception("StreamDeserialization","Uncompression error")
00419               << "mismatch event lengths should be" << origSize << " got "
00420               << uncompressedSize << "\n";
00421         }
00422     } else {
00423         // we throw an error and return without event! null pointer
00424         std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00425              << ret << std::endl;
00426         throw cms::Exception("StreamDeserialization","Uncompression error")
00427             << "Error code = " << ret << "\n ";
00428     }
00429     return (unsigned int) uncompressedSize;
00430   }
00431 
00432   void StreamerInputSource::resetAfterEndRun() {
00433      // called from an online streamer source to reset after a stop command
00434      // so an enable command will work
00435      resetLuminosityBlockAuxiliary();
00436      resetRunAuxiliary();
00437      newRun_ = newLumi_ = true;
00438      assert(!eventCached_);
00439      reset();
00440      runEndingFlag_ = false;
00441   }
00442 
00443   void StreamerInputSource::setRun(RunNumber_t) {
00444      // Need to define a dummy setRun here or else the InputSource::setRun is called
00445      // if we have a source inheriting from this and wants to define a setRun method
00446      throw Exception(errors::LogicError)
00447      << "StreamerInputSource::setRun()\n"
00448      << "Run number cannot be modified for this type of Input Source\n"
00449      << "Contact a Storage Manager Developer\n";
00450   }
00451 
00452   StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00453 
00454   StreamerInputSource::ProductGetter::~ProductGetter() {}
00455 
00456   EDProduct const*
00457   StreamerInputSource::ProductGetter::getIt(ProductID const& id) const {
00458     return eventPrincipal_ ? eventPrincipal_->getIt(id) : 0;
00459   }
00460 
00461   void
00462   StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00463     eventPrincipal_ = ep;
00464   }
00465 
00466   void
00467   StreamerInputSource::fillDescription(ParameterSetDescription& desc) {
00468     // The default value for "inputFileTransitionsEachEvent" gets defined in the derived class
00469     // as it depends on the derived class. So, we cannot redefine it here.
00470     InputSource::fillDescription(desc);
00471   }
00472 }