CMS 3D CMS Logo

StreamerInputSource.cc

Go to the documentation of this file.
00001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00002 #include "DataFormats/Provenance/interface/ProcessConfiguration.h" 
00003 
00004 #include "IOPool/Streamer/interface/EventMessage.h"
00005 #include "IOPool/Streamer/interface/InitMessage.h"
00006 #include "IOPool/Streamer/interface/ClassFiller.h"
00007 
00008 #include "FWCore/Framework/interface/EventPrincipal.h"
00009 #include "FWCore/Framework/interface/FileBlock.h"
00010 #include "DataFormats/Provenance/interface/BranchDescription.h"
00011 #include "DataFormats/Provenance/interface/EventEntryDescription.h"
00012 #include "DataFormats/Provenance/interface/EventEntryInfo.h"
00013 #include "DataFormats/Provenance/interface/EntryDescriptionRegistry.h"
00014 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
00015 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
00016 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
00017 
00018 #include "zlib.h"
00019 
00020 #include "DataFormats/Common/interface/RefCoreStreamer.h"
00021 #include "FWCore/Utilities/interface/WrappedClassName.h"
00022 #include "DataFormats/Common/interface/EDProduct.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
00025 #include "FWCore/Framework/interface/RunPrincipal.h"
00026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00027 #include "FWCore/ParameterSet/interface/Registry.h"
00028 #include "FWCore/Utilities/interface/ThreadSafeRegistry.h"
00029 
00030 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00031 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
00032 #include "FWCore/Utilities/interface/DebugMacros.h"
00033 
00034 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00035 
00036 #include <string>
00037 #include <iostream>
00038 #include <set>
00039 
00040 namespace edm {
00041   namespace {
00042     const int init_size = 1024*1024;
00043   }
00044 
00045   std::string StreamerInputSource::processName_;
00046   unsigned int StreamerInputSource::protocolVersion_;
00047 
00048 
00049   StreamerInputSource::StreamerInputSource(
00050                     ParameterSet const& pset,
00051                     InputSourceDescription const& desc):
00052     InputSource(pset, desc),
00053     // The value for the following parameter gets overwritten in at least one derived class
00054     // where it has a different default value.
00055     inputFileTransitionsEachEvent_(
00056       pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
00057     newRun_(true),
00058     newLumi_(true),
00059     ep_(),
00060     tc_(getTClass(typeid(SendEvent))),
00061     dest_(init_size),
00062     xbuf_(TBuffer::kRead, init_size),
00063     runEndingFlag_(false),
00064     productGetter_()
00065   {
00066   }
00067 
00068   StreamerInputSource::~StreamerInputSource() {}
00069 
00070   // ---------------------------------------
00071   boost::shared_ptr<FileBlock>
00072   StreamerInputSource::readFile_() {
00073     productRegistryUpdate().setProductIDs(productRegistry()->nextID());
00074     return boost::shared_ptr<FileBlock>(new FileBlock);
00075   }
00076 
00077   void
00078   StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header,
00079                          ProductRegistry& reg, bool subsequent) {
00080 
00081     SendDescs const& descs = header.descs();
00082 
00083     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00084 
00085     FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
00086 
00087     if (subsequent) {
00088       ProductRegistry pReg;
00089       for(; i != e; ++i) {
00090         pReg.copyProduct(*i);
00091         FDEBUG(6) << "StreamInput prod = " << i->className() << std::endl;
00092       }
00093       std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
00094       if (!mergeInfo.empty()) {
00095         throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
00096       }
00097     } else {
00098       declareStreamers(descs);
00099       buildClassCache(descs);
00100       loadExtraClasses();
00101       for(; i != e; ++i) {
00102         i->setDefaultTransients();
00103         i->init();
00104         reg.copyProduct(*i);
00105         FDEBUG(6) << "StreamInput prod = " << i->className() << std::endl;
00106       }
00107     }
00108     reg.setNextID(header.nextID());
00109   }
00110 
00111   void
00112   StreamerInputSource::declareStreamers(SendDescs const& descs) {
00113     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00114 
00115     for(; i != e; ++i) {
00116         //pi->init();
00117         std::string const real_name = wrappedClassName(i->className());
00118         FDEBUG(6) << "declare: " << real_name << std::endl;
00119         loadCap(real_name);
00120     }
00121   }
00122 
00123 
00124   void
00125   StreamerInputSource::buildClassCache(SendDescs const& descs) { 
00126     SendDescs::const_iterator i(descs.begin()), e(descs.end());
00127 
00128     for(; i != e; ++i) {
00129         //pi->init();
00130         std::string const real_name = wrappedClassName(i->className());
00131         FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
00132         doBuildRealData(real_name);
00133     }
00134   }
00135 
00136   void
00137   StreamerInputSource::saveTriggerNames(InitMsgView const* header) {
00138 
00139     ParameterSet trigger_pset;
00140     std::vector<std::string> paths;
00141     header->hltTriggerNames(paths);
00142     trigger_pset.addParameter<Strings>("@trigger_paths", paths);
00143     pset::Registry* psetRegistry = pset::Registry::instance();
00144     psetRegistry->insertMapped(trigger_pset);
00145   }
00146 
00147   boost::shared_ptr<RunPrincipal>
00148   StreamerInputSource::readRun_() {
00149     assert(newRun_);
00150     assert(runPrincipal());
00151     newRun_ = false;
00152     return runPrincipal();
00153   }
00154 
00155   boost::shared_ptr<LuminosityBlockPrincipal>
00156   StreamerInputSource::readLuminosityBlock_() {
00157     assert(!newRun_);
00158     assert(newLumi_);
00159     assert(luminosityBlockPrincipal());
00160     newLumi_ = false;
00161     return luminosityBlockPrincipal();
00162   }
00163 
00164   std::auto_ptr<EventPrincipal>
00165   StreamerInputSource::readEvent_() {
00166     assert(!newRun_);
00167     assert(!newLumi_);
00168     assert(ep_.get() != 0);
00169     // This copy resets ep_.
00170     return ep_;
00171   }
00172 
00173   InputSource::ItemType 
00174   StreamerInputSource::getNextItemType() {
00175     if (runEndingFlag_) {
00176       return IsStop;
00177     }
00178     if(newRun_ && runPrincipal()) {
00179       return IsRun;
00180     }
00181     if(newLumi_ && luminosityBlockPrincipal()) {
00182       return IsLumi;
00183     }
00184     if (ep_.get() != 0) {
00185       return IsEvent;
00186     }
00187     if (inputFileTransitionsEachEvent_) {
00188       resetRunPrincipal();
00189       resetLuminosityBlockPrincipal();
00190     }
00191     ep_ = read();
00192     if (ep_.get() == 0) {
00193       return IsStop;
00194     } else {
00195       runEndingFlag_ = false;
00196       if (inputFileTransitionsEachEvent_) {
00197         return IsFile;
00198       }
00199     }
00200     if(newRun_) {
00201       return IsRun;
00202     } else if(newLumi_) {
00203       return IsLumi;
00204     }
00205     return IsEvent;
00206   }
00207 
00212   std::auto_ptr<SendJobHeader>
00213   StreamerInputSource::deserializeRegistry(InitMsgView const& initView)
00214   {
00215     if(initView.code() != Header::INIT)
00216       throw cms::Exception("StreamTranslation","Registry deserialization error")
00217         << "received wrong message type: expected INIT, got "
00218         << initView.code() << "\n";
00219 
00220     //Get the process name and store if for Protocol version 4 and above.
00221     if (initView.protocolVersion() > 3) {
00222 
00223          processName_ = initView.processName();
00224          protocolVersion_ = initView.protocolVersion();
00225 
00226          FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
00227          FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
00228     }
00229     
00230     TClass* desc = getTClass(typeid(SendJobHeader));
00231 
00232     RootBuffer xbuf(TBuffer::kRead, initView.descLength(),
00233                  (char*)initView.descData(),kFALSE);
00234     RootDebug tracer(10,10);
00235     std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
00236 
00237     if(sd.get()==0) {
00238         throw cms::Exception("StreamTranslation","Registry deserialization error")
00239           << "Could not read the initial product registry list\n";
00240     }
00241 
00242     return sd;  
00243   }
00244 
00249   void
00250   StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent)
00251   {
00252      std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
00253      mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
00254      ModuleDescriptionRegistry & moduleDescriptionRegistry = *ModuleDescriptionRegistry::instance();
00255      ModuleDescriptionMap const& mdMap = sd->moduleDescriptionMap();
00256      for (ModuleDescriptionMap::const_iterator k = mdMap.begin(), kEnd = mdMap.end(); k != kEnd; ++k) {
00257        moduleDescriptionRegistry.insertMapped(k->second);
00258      } 
00259      SendJobHeader::ParameterSetMap const & psetMap = sd->processParameterSet();
00260      pset::Registry& psetRegistry = *pset::Registry::instance();
00261      for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
00262        psetRegistry.insertMapped(ParameterSet(i->second.pset_));
00263      }
00264   }
00265 
00269   std::auto_ptr<EventPrincipal>
00270   StreamerInputSource::deserializeEvent(EventMsgView const& eventView)
00271   {
00272     if(eventView.code() != Header::EVENT)
00273       throw cms::Exception("StreamTranslation","Event deserialization error")
00274         << "received wrong message type: expected EVENT, got "
00275         << eventView.code() << "\n";
00276     FDEBUG(9) << "Decode event: "
00277          << eventView.event() << " "
00278          << eventView.run() << " "
00279          << eventView.size() << " "
00280          << eventView.eventLength() << " "
00281          << eventView.eventData()
00282          << std::endl;
00283     EventSourceSentry(*this);
00284     // uncompress if we need to
00285     // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
00286     // need to get rid of this when 090 MTCC streamers are gotten rid of
00287     unsigned long origsize = eventView.origDataSize();
00288     unsigned long dest_size; //(should be >= eventView.origDataSize() )
00289 
00290     if(origsize != 78 && origsize != 0)
00291     {
00292       // compressed
00293       dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
00294                                    eventView.eventLength(), dest_, origsize);
00295     }
00296     else // not compressed
00297     {
00298       // we need to copy anyway the buffer as we are using dest in xbuf
00299       dest_size = eventView.eventLength();
00300       dest_.resize(dest_size);
00301       unsigned char* pos = (unsigned char*) &dest_[0];
00302       unsigned char* from = (unsigned char*) eventView.eventData();
00303       std::copy(from,from+dest_size,pos);
00304     }
00305     //TBuffer xbuf(TBuffer::kRead, dest_size,
00306     //             (char*) &dest[0],kFALSE);
00307     //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
00308     //             (char*) eventView.eventData(),kFALSE);
00309     xbuf_.Reset();
00310     xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
00311     RootDebug tracer(10,10);
00312 
00313     setRefCoreStreamer(&productGetter_);
00314     std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
00315 
00316     if(sd.get()==0) {
00317         throw cms::Exception("StreamTranslation","Event deserialization error")
00318           << "got a null event from input stream\n";
00319     }
00320     sd->processHistory().setDefaultTransients();
00321     ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
00322 
00323     FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
00324     if(!runPrincipal() || runPrincipal()->run() != sd->aux().run()) {
00325         newRun_ = newLumi_ = true;
00326         RunAuxiliary runAux(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
00327         setRunPrincipal(boost::shared_ptr<RunPrincipal>(
00328           new RunPrincipal(runAux,
00329                            productRegistry(),
00330                            processConfiguration())));
00331         resetLuminosityBlockPrincipal();
00332     }
00333     if(!luminosityBlockPrincipal() || luminosityBlockPrincipal()->luminosityBlock() != eventView.lumi()) {
00334       
00335       LuminosityBlockAuxiliary lumiAux(runPrincipal()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
00336       setLuminosityBlockPrincipal(boost::shared_ptr<LuminosityBlockPrincipal>(
00337         new LuminosityBlockPrincipal(lumiAux,
00338                                      productRegistry(),
00339                                      processConfiguration())));
00340       newLumi_ = true;
00341     }
00342 
00343     std::auto_ptr<EventPrincipal> ep(new EventPrincipal(sd->aux(),
00344                                                    productRegistry(),
00345                                                    processConfiguration(),
00346                                                    sd->processHistory().id()));
00347     productGetter_.setEventPrincipal(ep.get());
00348 
00349     // no process name list handling
00350 
00351     ProductID largestID;
00352     SendProds & sps = sd->products();
00353     for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
00354         FDEBUG(10) << "check prodpair" << std::endl;
00355         if(spi->desc() == 0)
00356           throw cms::Exception("StreamTranslation","Empty Provenance");
00357         FDEBUG(5) << "Prov:"
00358              << " " << spi->desc()->className()
00359              << " " << spi->desc()->productInstanceName()
00360              << " " << spi->desc()->branchID()
00361              << std::endl;
00362 
00363         ConstBranchDescription branchDesc(*spi->desc());
00364         // This EventEntryInfo constructor inserts into the entry description registry
00365         boost::shared_ptr<EventEntryInfo> eventEntryDesc(
00366              new EventEntryInfo(spi->branchID(),
00367                                 spi->status(),
00368                                 spi->mod(),
00369                                 spi->productID(),
00370                                 *spi->parents()));
00371 
00372         ep->branchMapperPtr()->insert(*eventEntryDesc);
00373         if(spi->productID() > largestID) {
00374            largestID = spi->productID();
00375         }
00376         if(spi->prod() != 0) {
00377           std::auto_ptr<EDProduct> aprod(const_cast<EDProduct*>(spi->prod()));
00378           FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
00379           ep->addGroup(aprod, branchDesc, eventEntryDesc);
00380           FDEBUG(10) << "addgroup done" << std::endl;
00381         } else {
00382           FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
00383           ep->addGroup(branchDesc, eventEntryDesc);
00384           FDEBUG(10) << "addgroup empty done" << std::endl;
00385         }
00386         spi->clear();
00387     }
00388 
00389     if(largestID.id() >= productRegistry()->nextID()) {
00390        edm::LogError("MetaDataError")<<"The input file has a critical problem, the 'nextID' for the ProductRegistry ("
00391                                      <<productRegistry()->nextID()
00392                                      <<")\n is less than the largest ProductID ("
00393                                      <<largestID.id()
00394                                      <<") used in a previous process.\n"
00395           " Will modify the ProductRegistry to attempt to correct the problem,\n"
00396           " although it is possible that edm::Ref*'s or edm::Ptr's may still fail.\n"
00397           " Please contact StreamerOutputModule developers.";
00398        //NOTE: this works since only EDProducers get their ProductIDs for the event from
00399        // the ProductRegistry and they do not do that until they 'put' their data
00400        // so at this point no one has tried to use the ProductIDs yet
00401        productRegistryUpdate().setNextID(largestID.id()+1);
00402        productRegistryUpdate().setProductIDs(largestID.id()+1);
00403     }
00404     FDEBUG(10) << "Size = " << ep->size() << std::endl;
00405 
00406     return ep;     
00407   }
00408 
00417   unsigned int
00418   StreamerInputSource::uncompressBuffer(unsigned char *inputBuffer,
00419                                        unsigned int inputSize,
00420                                        std::vector<unsigned char> &outputBuffer,
00421                                        unsigned int expectedFullSize)
00422   {
00423     unsigned long origSize = expectedFullSize;
00424     unsigned long uncompressedSize = expectedFullSize;
00425     FDEBUG(1) << "Uncompress: original size = " << origSize
00426               << ", compressed size = " << inputSize
00427               << std::endl;
00428     outputBuffer.resize(origSize);
00429     int ret = uncompress(&outputBuffer[0], &uncompressedSize,
00430                          inputBuffer, inputSize); // do not need compression level
00431     //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
00432     if(ret == Z_OK) {
00433         // check the length against original uncompressed length
00434         FDEBUG(10) << " original size = " << origSize << " final size = " 
00435                    << uncompressedSize << std::endl;
00436         if(origSize != uncompressedSize) {
00437             std::cerr << "deserializeEvent: Problem with uncompress, original size = "
00438                  << origSize << " uncompress size = " << uncompressedSize << std::endl;
00439             // we throw an error and return without event! null pointer
00440             throw cms::Exception("StreamDeserialization","Uncompression error")
00441               << "mismatch event lengths should be" << origSize << " got "
00442               << uncompressedSize << "\n";
00443         }
00444     } else {
00445         // we throw an error and return without event! null pointer
00446         std::cerr << "deserializeEvent: Problem with uncompress, return value = "
00447              << ret << std::endl;
00448         throw cms::Exception("StreamDeserialization","Uncompression error")
00449             << "Error code = " << ret << "\n ";
00450     }
00451 
00452     return (unsigned int) uncompressedSize;
00453   }
00454 
00455   void StreamerInputSource::resetAfterEndRun()
00456   {
00457      // called from an online streamer source to reset after a stop command
00458      // so an enable command will work
00459      assert(ep_.get() == 0);
00460      reset();
00461      runEndingFlag_ = false;
00462   }
00463 
00464   void StreamerInputSource::setRun(RunNumber_t) 
00465   {
00466      // Need to define a dummy setRun here or else the InputSource::setRun is called
00467      // if we have a source inheriting from this and wants to define a setRun method
00468      throw edm::Exception(edm::errors::LogicError)
00469      << "StreamerInputSource::setRun()\n"
00470      << "Run number cannot be modified for this type of Input Source\n"
00471      << "Contact a Storage Manager Developer\n";
00472   }
00473 
00474   StreamerInputSource::ProductGetter::ProductGetter() : eventPrincipal_(0) {}
00475 
00476   StreamerInputSource::ProductGetter::~ProductGetter() {}
00477 
00478   EDProduct const*
00479   StreamerInputSource::ProductGetter::getIt(edm::ProductID const& id) const {
00480     return eventPrincipal_ ? eventPrincipal_->getIt(id) : 0;
00481   }
00482 
00483   void
00484   StreamerInputSource::ProductGetter::setEventPrincipal(EventPrincipal *ep) {
00485     eventPrincipal_ = ep;
00486   }
00487 }

Generated on Tue Jun 9 17:39:18 2009 for CMSSW by  doxygen 1.5.4