CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/IOPool/Streamer/src/StreamerInputSource.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002 
00003 #include "IOPool/Streamer/interface/EventMessage.h"
00004 #include "IOPool/Streamer/interface/InitMessage.h"
00005 #include "IOPool/Streamer/interface/ClassFiller.h"
00006 
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/FileBlock.h"
00009 #include "FWCore/Framework/src/PrincipalCache.h"
00010 #include "FWCore/ParameterSet/interface/FillProductRegistryTransients.h"
00011 #include "DataFormats/Provenance/interface/BranchDescription.h"
00012 #include "DataFormats/Provenance/interface/ProductProvenance.h"
00013 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00014 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00016 #include "DataFormats/Provenance/interface/EventSelectionID.h"
00017 #include "DataFormats/Provenance/interface/BranchListIndex.h"
00018 
00019 #include "zlib.h"
00020 
00021 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00022 #include "FWCore/Utilities/interface/WrappedClassName.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00025 #include "FWCore/ParameterSet/interface/Registry.h"
00026 #include "FWCore/Utilities/interface/EDMException.h"
00027 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00028 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00029 
00030 #include "DataFormats/Provenance/interface/BranchIDListRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00032 #include "DataFormats/Provenance/interface/ProcessConfigurationRegistry.h"
00033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00034 #include "FWCore/Utilities/interface/DebugMacros.h"
00035 
00036 #include <string>
00037 #include <iostream>
00038 #include <set>
00039 
00040 namespace edm {
00041   namespace {
00042     int const init_size = 1024*1024;
00043   }
00044 
00045   std::string StreamerInputSource::processName_;
00046   unsigned int StreamerInputSource::protocolVersion_;
00047 
00048 
00049   StreamerInputSource::StreamerInputSource(
00050                     ParameterSet const& pset,
00051                     InputSourceDescription const& desc):
00052     InputSource(pset, desc),
00053     // The default value for the following parameter get defined in at least one derived class
00054     // where it has a different default value.
00055     inputFileTransitionsEachEvent_(
00056       pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
00057     newRun_(true),
00058     newLumi_(true),
00059     eventCached_(false),
00060     tc_(getTClass(typeid(SendEvent))),
00061     dest_(init_size),
00062     xbuf_(TBuffer::kRead, init_size),
00063     runEndingFlag_(false),
00064     productGetter_() {
00065   }
00066 
00067   StreamerInputSource::~StreamerInputSource() {}
00068 
00069   // ---------------------------------------
00070   boost::shared_ptr<FileBlock>
00071   StreamerInputSource::readFile_() {
00072     return boost::shared_ptr<FileBlock>(new FileBlock);
00073   }
00074 
00075   void
00076   StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
00077 
00078     SendDescs const& descs = header.descs();
00079 
00080     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00081 
00082     if (subsequent) {
00083       ProductRegistry pReg;
00084       pReg.updateFromInput(descs);
00085       fillProductRegistryTransients(header.processConfigurations(), pReg);
00086       std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00087       if (!mergeInfo.empty()) {
00088         throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00089       }
00090       BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00091     } else {
00092       declareStreamers(descs);
00093       buildClassCache(descs);
00094       loadExtraClasses();
00095       reg.updateFromInput(descs);
00096       fillProductRegistryTransients(header.processConfigurations(), reg);
00097       BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
00098     }
00099   }
00100 
00101   void
00102   StreamerInputSource::declareStreamers(SendDescs const& descs) {
00103     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00104 
00105     for(; i != e; ++i) {
00106         //pi->init();
00107         std::string const real_name = wrappedClassName(i->className());
00108         FDEBUG(6) << "declare: " << real_name << std::endl;
00109         loadCap(real_name);
00110     }
00111   }
00112 
00113 
00114   void
00115   StreamerInputSource::buildClassCache(SendDescs const& descs) {
00116     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00117 
00118     for(; i != e; ++i) {
00119         //pi->init();
00120         std::string const real_name = wrappedClassName(i->className());
00121         FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00122         doBuildRealData(real_name);
00123     }
00124   }
00125 
00126   boost::shared_ptr<RunAuxiliary>
00127   StreamerInputSource::readRunAuxiliary_() {
00128     assert(newRun_);
00129     assert(runAuxiliary());
00130     newRun_ = false;
00131     return runAuxiliary();
00132   }
00133 
00134   boost::shared_ptr<LuminosityBlockAuxiliary>
00135   StreamerInputSource::readLuminosityBlockAuxiliary_() {
00136     assert(!newRun_);
00137     assert(newLumi_);
00138     assert(luminosityBlockAuxiliary());
00139     newLumi_ = false;
00140     return luminosityBlockAuxiliary();
00141   }
00142 
00143   EventPrincipal*
00144   StreamerInputSource::readEvent_() {
00145     assert(!newRun_);
00146     assert(!newLumi_);
00147     assert(eventCached_);
00148     eventCached_ = false;
00149     eventPrincipalCache()->setLuminosityBlockPrincipal(luminosityBlockPrincipal());
00150     return eventPrincipalCache();
00151   }
00152 
00153   InputSource::ItemType
00154   StreamerInputSource::getNextItemType() {
00155     if (runEndingFlag_) {
00156       return IsStop;
00157     }
00158     if(newRun_ && runAuxiliary()) {
00159       return IsRun;
00160     }
00161     if(newLumi_ && luminosityBlockAuxiliary()) {
00162       return IsLumi;
00163     }
00164     if (eventCached_) {
00165       return IsEvent;
00166     }
00167     if (inputFileTransitionsEachEvent_) {
00168       resetRunAuxiliary();
00169       resetLuminosityBlockAuxiliary();
00170     }
00171     read();
00172     if (!eventCached_) {
00173       return IsStop;
00174     } else {
00175       runEndingFlag_ = false;
00176       if (inputFileTransitionsEachEvent_) {
00177         return IsFile;
00178       }
00179     }
00180     if(newRun_) {
00181       return IsRun;
00182     } else if(newLumi_) {
00183       return IsLumi;
00184     }
00185     return IsEvent;
00186   }
00187 
00192   std::auto_ptr<SendJobHeader>
00193   StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
00194     if(initView.code() != Header::INIT)
00195       throw cms::Exception("StreamTranslation","Registry deserialization error")
00196         << "received wrong message type: expected INIT, got "
00197         << initView.code() << "\n";
00198 
00199     //Get the process name and store if for Protocol version 4 and above.
00200     if (initView.protocolVersion() > 3) {
00201 
00202          processName_ = initView.processName();
00203          protocolVersion_ = initView.protocolVersion();
00204 
00205          FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00206          FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00207     }
00208 
00209    // calculate the adler32 checksum
00210    uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
00211    //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
00212    //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
00213    //          << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
00214     if((uint32)adler32_chksum != initView.adler32_chksum()) {
00215       std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
00216                 << " chksum from registry data = " << adler32_chksum << " from header = "
00217                 << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
00218       // skip event (based on option?) or throw exception?
00219     }
00220 
00221     TClass* desc = getTClass(typeid(SendJobHeader));
00222 
00223     TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
00224                  (char*)initView.descData(),kFALSE);
00225     RootDebug tracer(10,10);
00226     std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00227 
00228     if(sd.get()==0) {
00229         throw cms::Exception("StreamTranslation","Registry deserialization error")
00230           << "Could not read the initial product registry list\n";
00231     }
00232 
00233     return sd;
00234   }
00235 
00240   void
00241   StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
00242      std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00243      ProcessConfigurationVector const& pcv = sd->processConfigurations();
00244      mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
00245      if (subsequent) {
00246        principalCache().adjustEventToNewProductRegistry(productRegistry());
00247      }
00248      SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
00249      pset::Registry& psetRegistry = *pset::Registry::instance();
00250      for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00251        ParameterSet pset(i->second.pset());
00252        pset.setID(i->first);
00253        psetRegistry.insertMapped(pset);
00254      }
00255      ProcessConfigurationRegistry& pcReg = *ProcessConfigurationRegistry::instance();
00256      for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
00257        pcReg.insertMapped(*it);
00258      }
00259   }
00260 
00264   EventPrincipal*
00265   StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
00266     if(eventView.code() != Header::EVENT)
00267       throw cms::Exception("StreamTranslation","Event deserialization error")
00268         << "received wrong message type: expected EVENT, got "
00269         << eventView.code() << "\n";
00270     FDEBUG(9) << "Decode event: "
00271          << eventView.event() << " "
00272          << eventView.run() << " "
00273          << eventView.size() << " "
00274          << eventView.adler32_chksum() << " "
00275          << eventView.eventLength() << " "
00276          << eventView.eventData()
00277          << std::endl;
00278     EventSourceSentry(*this);
00279     // uncompress if we need to
00280     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
00281     // need to get rid of this when 090 MTCC streamers are gotten rid of
00282     unsigned long origsize = eventView.origDataSize();
00283     unsigned long dest_size; //(should be >= eventView.origDataSize())
00284 
00285     uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
00286     //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
00287     //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
00288     //          << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
00289     if((uint32)adler32_chksum != eventView.adler32_chksum()) {
00290       std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
00291                 << " chksum from event = " << adler32_chksum << " from header = "
00292                 << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
00293       // skip event (based on option?) or throw exception?
00294     }
00295     if(origsize != 78 && origsize != 0) {
00296       // compressed
00297       dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00298                                    eventView.eventLength(), dest_, origsize);
00299     } else { // not compressed
00300       // we need to copy anyway the buffer as we are using dest in xbuf
00301       dest_size = eventView.eventLength();
00302       dest_.resize(dest_size);
00303       unsigned char* pos = (unsigned char*) &dest_[0];
00304       unsigned char* from = (unsigned char*) eventView.eventData();
00305       std::copy(from,from+dest_size,pos);
00306     }
00307     //TBuffer xbuf(TBuffer::kRead, dest_size,
00308     //             (char*) &dest[0],kFALSE);
00309     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
00310     //             (char*) eventView.eventData(),kFALSE);
00311     xbuf_.Reset();
00312     xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00313     RootDebug tracer(10,10);
00314 
00315     setRefCoreStreamer(&productGetter_);
00316     std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
00317     setRefCoreStreamer();
00318 
00319     if(sd.get()==0) {
00320         throw cms::Exception("StreamTranslation","Event deserialization error")
00321           << "got a null event from input stream\n";
00322     }
00323     ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
00324 
00325     FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
00326     if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
00327       newRun_ = newLumi_ = true;
00328       RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
00329       runAuxiliary->setProcessHistoryID(sd->processHistory().id());
00330       setRunAuxiliary(runAuxiliary);
00331       resetLuminosityBlockAuxiliary();
00332     }
00333     if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
00334       LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
00335         new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
00336       luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
00337       setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
00338       newLumi_ = true;
00339     }
00340 
00341     boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
00342     boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
00343     BranchIDListHelper::fixBranchListIndexes(*indexes);
00344     eventPrincipalCache()->fillEventPrincipal(sd->aux(), boost::shared_ptr<LuminosityBlockPrincipal>(), ids, indexes);
00345     productGetter_.setEventPrincipal(eventPrincipalCache());
00346     eventCached_ = true;
00347 
00348     // no process name list handling
00349 
00350     SendProds & sps = sd->products();
00351     for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00352         FDEBUG(10) << "check prodpair" << std::endl;
00353         if(spi->desc() == 0)
00354           throw cms::Exception("StreamTranslation","Empty Provenance");
00355         FDEBUG(5) << "Prov:"
00356              << " " << spi->desc()->className()
00357              << " " << spi->desc()->productInstanceName()
00358              << " " << spi->desc()->branchID()
00359              << std::endl;
00360 
00361         ConstBranchDescription branchDesc(*spi->desc());
00362         // This ProductProvenance constructor inserts into the entry description registry
00363         ProductProvenance productProvenance(spi->branchID(), *spi->parents());
00364 
00365         if(spi->prod() != 0) {
00366           FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
00367           eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
00368           FDEBUG(10) << "addgroup done" << std::endl;
00369         } else {
00370           FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
00371           eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
00372           FDEBUG(10) << "addgroup empty done" << std::endl;
00373         }
00374         spi->clear();
00375     }
00376 
00377     FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
00378 
00379     return eventPrincipalCache();
00380   }
00381 
00390   unsigned int
00391   StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
00392                                         unsigned int inputSize,
00393                                         std::vector<unsigned char>& outputBuffer,
00394                                         unsigned int expectedFullSize) {
00395     unsigned long origSize = expectedFullSize;
00396     unsigned long uncompressedSize = expectedFullSize*1.1;
00397     FDEBUG(1) << "Uncompress: original size = " << origSize
00398               << ", compressed size = " << inputSize
00399               << std::endl;
00400     outputBuffer.resize(uncompressedSize);
00401     int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00402                          inputBuffer, inputSize); // do not need compression level
00403     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
00404     if(ret == Z_OK) {
00405         // check the length against original uncompressed length
00406         FDEBUG(10) << " original size = " << origSize << " final size = "
00407                    << uncompressedSize << std::endl;
00408         if(origSize != uncompressedSize) {
00409             std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00410                  << origSize << " uncompress size = " << uncompressedSize << std::endl;
00411             // we throw an error and return without event! null pointer
00412             throw cms::Exception("StreamDeserialization","Uncompression error")
00413               << "mismatch event lengths should be" << origSize << " got "
00414               << uncompressedSize << "\n";
00415         }
00416     } else {
00417         // we throw an error and return without event! null pointer
00418         std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00419              << ret << std::endl;
00420         throw cms::Exception("StreamDeserialization","Uncompression error")
00421             << "Error code = " << ret << "\n ";
00422     }
00423     return (unsigned int) uncompressedSize;
00424   }
00425 
00426   void StreamerInputSource::resetAfterEndRun() {
00427      // called from an online streamer source to reset after a stop command
00428      // so an enable command will work
00429      resetLuminosityBlockAuxiliary();
00430      resetRunAuxiliary();
00431      newRun_ = newLumi_ = true;
00432      assert(!eventCached_);
00433      reset();
00434      runEndingFlag_ = false;
00435   }
00436 
00437   void StreamerInputSource::setRun(RunNumber_t) {
00438      // Need to define a dummy setRun here or else the InputSource::setRun is called
00439      // if we have a source inheriting from this and wants to define a setRun method
00440      throw Exception(errors::LogicError)
00441      << "StreamerInputSource::setRun()\n"
00442      << "Run number cannot be modified for this type of Input Source\n"
00443      << "Contact a Storage Manager Developer\n";
00444   }
00445 
00446   StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00447 
00448   StreamerInputSource::ProductGetter::~ProductGetter() {}
00449 
00450   WrapperHolder
00451   StreamerInputSource::ProductGetter::getIt(ProductID const& id) const {
00452     return eventPrincipal_ ? eventPrincipal_->getIt(id) : WrapperHolder();
00453   }
00454 
00455   void
00456   StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00457     eventPrincipal_ = ep;
00458   }
00459 
00460   void
00461   StreamerInputSource::fillDescription(ParameterSetDescription& desc) {
00462     // The default value for "inputFileTransitionsEachEvent" gets defined in the derived class
00463     // as it depends on the derived class. So, we cannot redefine it here.
00464     InputSource::fillDescription(desc);
00465   }
00466 }