CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/IOPool/Streamer/src/TestFileReader.cc

Go to the documentation of this file.
00001  
00002 #include "IOPool/Streamer/interface/TestFileReader.h"
00003 #include "IOPool/Streamer/interface/InitMessage.h"
00004 #include "IOPool/Streamer/interface/EventMessage.h"
00005 #include "IOPool/Streamer/interface/HLTInfo.h"
00006 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00007 #include "FWCore/Utilities/interface/EDMException.h"
00008 #include "IOPool/Streamer/interface/Utilities.h"
00009 
00010 #include "boost/bind.hpp"
00011 #include "boost/shared_array.hpp"
00012 
00013 #include <algorithm>
00014 #include <cstdlib>
00015 #include <iterator>
00016 
00017 using namespace edm;
00018 
00019 namespace edmtestp
00020 {  
00021 
00022   namespace 
00023   {
00024     struct BufHelper
00025     {
00026       explicit BufHelper(int len): buf_(new char[len]) { }
00027       ~BufHelper() {}
00028       void release() {buf_.reset();}
00029       char* get() const { return buf_.get(); }
00030 
00031     private:
00032       BufHelper(const BufHelper& ) { }
00033       BufHelper& operator=(const BufHelper&) { return *this; }
00034 
00035       boost::shared_array<char> buf_;
00036     };
00037   }
00038 
00039   // ----------------------------------
00040 
00041   TestFileReader::TestFileReader(const std::string& filename,
00042                                  edm::EventBuffer& to,
00043                                  edm::ProductRegistry& prods):
00044     filename_(filename),
00045     //ist_(filename_.c_str(),ios_base::binary | ios_base::in),
00046     //reader_(ist_),
00047     streamReader_(new StreamerInputFile(filename)),
00048     to_(to) {
00049 
00050    const InitMsgView* init =  streamReader_->startMessage();
00051    std::auto_ptr<edm::SendJobHeader> p = StreamerInputSource::deserializeRegistry(*init);
00064     if(edm::registryIsSubset(*p, prods) == false) {
00065         throw edm::Exception(errors::Configuration,"TestFileReader")
00066           << "the header record in file " << filename_
00067           << "is not consistent with the one for the program \n";
00068     }
00069 
00070     // 13-Oct-2008, KAB - Added the following code to put the 
00071     // INIT message on the input queue.
00072     EventBuffer::ProducerBuffer b(to_);
00073     int len = init->size();
00074     char* buf_ = new char[len];
00075     memcpy(buf_, init->startAddress(), len);
00076     new (b.buffer()) stor::FragEntry(buf_, buf_, len, 1, 1,
00077                                      init->code(), init->run(),
00078                                      0, init->outputModuleId(),
00079                                      getpid(), 0);
00080     b.commit(sizeof(stor::FragEntry));
00081   }
00082 
00083   TestFileReader::~TestFileReader() {
00084   }
00085 
00086   void TestFileReader::start() {
00087     me_.reset(new boost::thread(boost::bind(TestFileReader::run,this)));
00088   }
00089 
00090   void TestFileReader::join() {
00091     me_->join();
00092   }
00093 
00094   void TestFileReader::run(TestFileReader* t) {
00095     t->readEvents();
00096   }
00097 
00098   void TestFileReader::readEvents() {
00099 
00100    while(streamReader_->next()) { 
00101        EventBuffer::ProducerBuffer b(to_);
00102        const EventMsgView* eview = streamReader_->currentRecord();
00103 
00104        // 13-Oct-2008, KAB - we need to make a copy of the event message
00105        // for two reasons:  1) the processing of the events is often done
00106        // asynchronously in the code that uses this reader, so we can't
00107        // keep re-using the same buffer from the stream_reader, and
00108        // 2) the code that uses this reader often uses deleters to
00109        // free up the memory used by the FragEntry, so we want the
00110        // first argument to the FragEntry to be something that can 
00111        // be deleted successfully.
00112        int len = eview->size();
00113        char* buf_ = new char[len];
00114        memcpy(buf_, eview->startAddress(), len);
00115 
00116        //stor::FragEntry* msg =
00117        //   new (b.buffer()) stor::FragEntry(eview->startAddress(),
00118        //                                    eview->startAddress(),
00119        // the first arg should be startAddress() right?
00120           //new (b.buffer()) stor::FragEntry((void*)eview->eventData(),
00121        new (b.buffer()) stor::FragEntry(buf_, buf_, len, 1, 1,
00122                                         eview->code(), eview->run(),
00123                                         eview->event(), eview->outModId(),
00124                                         getpid(), 0);
00125        b.commit(sizeof(stor::FragEntry));
00126    }
00127 
00128    /***
00129     while(1)
00130       {
00131         int len=0;
00132         ist_.read((char*)&len,sizeof(int));
00133 
00134         if(!ist_ || len==0 || ist_.eof()) break;
00135 
00136         EventBuffer::ProducerBuffer b(to_);
00137         // Pay attention here.
00138         // This is a bit of a mess.
00139         // Here we allocate an array (on the heap), fill it with data
00140         // from the file, then pass the bare pointer off onto the queue.
00141         // The ownership is to be picked up by the code that pulls it
00142         // off the queue.  The current implementation of the queue
00143         // is primitive - it knows nothing about the types of things
00144         // that are on the queue.
00145 
00146         BufHelper data(len);
00147         //std::cout << "allocated frag " << len << std::endl;
00148         ist_.read((char*)data.get(),len);
00149         //std::cout << "read frag to " << (void*)data.get() << std::endl;
00150         if(!ist_ || ist_.eof()) std::cerr << "got end!!!!" << std::endl;
00151         //HEREHERE need a real event number here for id
00152         stor::FragEntry* msg = 
00153           new (b.buffer()) stor::FragEntry(data.get(),data.get(),len,1,1,Header::EVENT,0,1,0);
00154         assert(msg); // Suppresses compiler warning about unused variable
00155         //new (b.buffer()) stor::FragEntry(0,0,len);
00156         //std::cout << "make entry for frag " << (void*)msg << " " << msg->buffer_address_ << std::endl;
00157         data.release();
00158         //std::cout << "release frag" << std::endl;
00159         b.commit(sizeof(stor::FragEntry));
00160         //std::cout << "commit frag " << sizeof(stor::FragEntry) << std::endl;
00161         //sleep(2);
00162       } **/
00163   }
00164 
00165 }