Go to the documentation of this file.00001
#include "IOPool/Streamer/src/TestConsumer.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/EDMException.h"

#include <cstring>
#include <fstream>
#include <limits>
#include <sstream>
#include <string>
00010
00011 using namespace edm;
00012
00013 namespace edmtest
00014 {
00015 typedef boost::shared_ptr<std::ofstream> OutPtr;
00016 typedef std::vector<char> SaveArea;
00017
/// Builds the name of output file number `num`.
///
/// @param base  prefix of the file name (may include a directory part)
/// @param num   sequence number appended to the prefix in decimal
/// @return      `base` followed by `num` and the ".dat" extension
std::string makeFileName(const std::string& base, int num)
{
  std::ostringstream name;
  name << base;
  name << num;
  name << ".dat";
  return name.str();
}
00024
00025 OutPtr makeFile(const std::string name,int num)
00026 {
00027 OutPtr p(new std::ofstream(makeFileName(name,num).c_str(),
00028 std::ios_base::binary | std::ios_base::out));
00029
00030 if(!(*p))
00031 {
00032 throw edm::Exception(errors::Configuration,"TestConsumer")
00033 << "cannot open file " << name;
00034 }
00035
00036 return p;
00037 }
00038
00039 struct Worker
00040 {
00041 Worker(const std::string& s, int m);
00042
00043 std::string filename_;
00044 int file_num_;
00045 int cnt_;
00046 int max_;
00047 OutPtr ost_;
00048 SaveArea reg_;
00049
00050 void checkCount();
00051 void saveReg(void* buf, int len);
00052 void writeReg();
00053 };
00054
00055 Worker::Worker(const std::string& s,int m):
00056 filename_(s),
00057 file_num_(),
00058 cnt_(0),
00059 max_(m),
00060 ost_(makeFile(filename_,file_num_))
00061 {
00062 }
00063
00064 void Worker::checkCount()
00065 {
00066 if(cnt_!=0 && (cnt_%max_) == 0)
00067 {
00068 ++file_num_;
00069 ost_ = makeFile(filename_,file_num_);
00070 writeReg();
00071 }
00072 ++cnt_;
00073
00074 }
00075
00076 void Worker::writeReg()
00077 {
00078 if(!reg_.empty())
00079 {
00080 int len = reg_.size();
00081 ost_->write((const char*)(&len),sizeof(int));
00082 ost_->write((const char*)®_[0],len);
00083 }
00084 }
00085
00086 void Worker::saveReg(void* buf, int len)
00087 {
00088 reg_.resize(len);
00089 memcpy(®_[0],buf,len);
00090 }
00091
00092
00093
00094
00095 TestConsumer::TestConsumer(edm::ParameterSet const& ps,
00096 edm::EventBuffer* buf):
00097 worker_(new Worker(ps.getParameter<std::string>("fileName"),
00098 ps.getUntrackedParameter<int>("numPerFile",1<<31))),
00099 bufs_(buf)
00100 {
00101
00102
00103 }
00104
00105 TestConsumer::~TestConsumer()
00106 {
00107 }
00108
00109 void TestConsumer::bufferReady()
00110 {
00111 worker_->checkCount();
00112
00113 EventBuffer::ConsumerBuffer cb(*bufs_);
00114
00115 int sz = cb.size();
00116 worker_->ost_->write((const char*)(&sz),sizeof(int));
00117 worker_->ost_->write((const char*)cb.buffer(),sz);
00118
00119 }
00120
00121 void TestConsumer::stop()
00122 {
00123 EventBuffer::ProducerBuffer pb(*bufs_);
00124 pb.commit();
00125 }
00126
00127 void TestConsumer::sendRegistry(void* buf, int len)
00128 {
00129 worker_->saveReg(buf,len);
00130 worker_->writeReg();
00131 }
00132 }