00001
00002 #include "IOPool/Streamer/src/TestConsumer.h"
00003 #include "FWCore/Utilities/interface/Exception.h"
00004
00005 #include "boost/shared_ptr.hpp"
00006
00007 #include <string>
00008 #include <cstring>
00009 #include <fstream>
00010 #include <sstream>
00011
00012 using namespace edm;
00013
00014 namespace edmtest
00015 {
00016 typedef boost::shared_ptr<std::ofstream> OutPtr;
00017 typedef std::vector<char> SaveArea;
00018
00019 std::string makeFileName(const std::string& base, int num)
00020 {
00021 std::ostringstream ost;
00022 ost << base << num << ".dat";
00023 return ost.str();
00024 }
00025
00026 OutPtr makeFile(const std::string name,int num)
00027 {
00028 OutPtr p(new std::ofstream(makeFileName(name,num).c_str(),
00029 std::ios_base::binary | std::ios_base::out));
00030
00031 if(!(*p))
00032 {
00033 throw edm::Exception(errors::Configuration,"TestConsumer")
00034 << "cannot open file " << name;
00035 }
00036
00037 return p;
00038 }
00039
00040 struct Worker
00041 {
00042 Worker(const std::string& s, int m);
00043
00044 std::string filename_;
00045 int file_num_;
00046 int cnt_;
00047 int max_;
00048 OutPtr ost_;
00049 SaveArea reg_;
00050
00051 void checkCount();
00052 void saveReg(void* buf, int len);
00053 void writeReg();
00054 };
00055
00056 Worker::Worker(const std::string& s,int m):
00057 filename_(s),
00058 file_num_(),
00059 cnt_(0),
00060 max_(m),
00061 ost_(makeFile(filename_,file_num_))
00062 {
00063 }
00064
00065 void Worker::checkCount()
00066 {
00067 if(cnt_!=0 && (cnt_%max_) == 0)
00068 {
00069 ++file_num_;
00070 ost_ = makeFile(filename_,file_num_);
00071 writeReg();
00072 }
00073 ++cnt_;
00074
00075 }
00076
00077 void Worker::writeReg()
00078 {
00079 if(!reg_.empty())
00080 {
00081 int len = reg_.size();
00082 ost_->write((const char*)(&len),sizeof(int));
00083 ost_->write((const char*)®_[0],len);
00084 }
00085 }
00086
00087 void Worker::saveReg(void* buf, int len)
00088 {
00089 reg_.resize(len);
00090 memcpy(®_[0],buf,len);
00091 }
00092
00093
00094
00095
00096 TestConsumer::TestConsumer(edm::ParameterSet const& ps,
00097 edm::EventBuffer* buf):
00098 worker_(new Worker(ps.getParameter<std::string>("fileName"),
00099 ps.getUntrackedParameter<int>("numPerFile",1<<31))),
00100 bufs_(buf)
00101 {
00102
00103
00104 }
00105
00106 TestConsumer::~TestConsumer()
00107 {
00108 delete worker_;
00109 }
00110
00111 void TestConsumer::bufferReady()
00112 {
00113 worker_->checkCount();
00114
00115 EventBuffer::ConsumerBuffer cb(*bufs_);
00116
00117 int sz = cb.size();
00118 worker_->ost_->write((const char*)(&sz),sizeof(int));
00119 worker_->ost_->write((const char*)cb.buffer(),sz);
00120
00121 }
00122
00123 void TestConsumer::stop()
00124 {
00125 EventBuffer::ProducerBuffer pb(*bufs_);
00126 pb.commit();
00127 }
00128
00129 void TestConsumer::sendRegistry(void* buf, int len)
00130 {
00131 worker_->saveReg(buf,len);
00132 worker_->writeReg();
00133 }
00134 }