CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MTRawEventFileWriterForBU.cc
Go to the documentation of this file.
1 // $Id: MTRawEventFileWriterForBU.cc,v 1.1.2.7 2013/01/16 17:47:37 aspataru Exp $
2 
6 
7 #include "../interface/FileIO.h"
8 
9 #include <iostream>
10 #include <iomanip>
11 #include <stdio.h>
12 #include <errno.h>
13 #include <string.h>
14 #include <assert.h>
15 #include <boost/tokenizer.hpp>
16 
17 
18 namespace fwriter {
20  public:
21  EventContainer(unsigned int evtBufSize)
22  {
23  writtenSize_=0;
24  evtBufSize_=evtBufSize;
25  data_.reset(new unsigned char [evtBufSize]);
26  shared_mode_=false;
27  }
29  {
30  writtenSize_=0;
31  evtBufSize_=0;
32  shared_data_.reset();
33  shared_mode_=true;
34  }
36 
37  bool sharedMode() {return shared_mode_;}
38  unsigned int getSize() const {return writtenSize_;}
39  unsigned int getBufSize() const {return evtBufSize_;}
40  unsigned char* getBuffer() const {return data_.get();}
41  boost::shared_array<unsigned char> * getSharedBuffer() {return & shared_data_;}
42 
43  void putNewEvent(unsigned char* addr, unsigned int size) {
44  if (size>evtBufSize_) {
45  data_.reset(new unsigned char[size]);
47  }
48  memcpy(data_.get(),addr,size);
50  }
51 
52  void putNewEvent(boost::shared_array<unsigned char> & msg) {
53  shared_data_ = msg;
54  }
55 
56  private:
57  unsigned int writtenSize_;
58  unsigned int evtBufSize_;
59  std::auto_ptr<unsigned char> data_;
60  boost::shared_array<unsigned char> shared_data_;
62  };
63 }
64 
// Constructor initialiser list and body (the signature line is not part of this listing).
// Each setting is read from `ps` as an untracked parameter; the second argument of every
// getUntrackedParameter call is the default used for that key.
66  lumiMon_(0),
67  numWriters_(ps.getUntrackedParameter<unsigned int>("numWriters",1))
68 ,eventBufferSize_(ps.getUntrackedParameter<unsigned int>("eventBufferSize",30))
69 ,sharedMode_(ps.getUntrackedParameter<bool>("sharedMode",true))
70 ,lumiSubdirectoriesMode_(ps.getUntrackedParameter<bool>("lumiSubdirectoriesMode",true))
71 ,debug_(ps.getUntrackedParameter<bool>("debug",false))
72 //,finishAfterLS_(ps.getUntrackedParameter<int>,-1)
73 {
// Pre-allocate eventBufferSize_ pool slots and register every slot index as free.
// Without sharedMode_ each slot gets its own 1048576-byte buffer; with sharedMode_
// the slot is default constructed and owns no buffer.
// NOTE(review): the containers are created with plain new; no matching delete is
// visible in this listing - confirm where EventPool entries are released.
74  for (unsigned int i=0;i<eventBufferSize_;i++) {
75  if (!sharedMode_)
76  EventPool.push_back(new fwriter::EventContainer(1048576));
77  else
78  EventPool.push_back(new fwriter::EventContainer());
79  freeIds.push_back(i);
80  }
// NOTE(review): 1 MiB scratch area; its only use in this listing is inside a
// commented-out block of the writer thread, and no delete[] for it is visible.
81  fileHeader_= new unsigned char[1024*1024];
82 
84  // set names of the variables to be matched with JSON Definition
85  perLumiEventCount_.setName("NEvents");
86 
87  // create a vector of all monitorable parameters to be passed to the monitor
88  vector<JsonMonitorable*> lumiMonParams;
89  lumiMonParams.push_back(&perLumiEventCount_);
90 
91  // create a DataPointMonitor using vector of monitorable parameters and a path to a JSON Definition file
// NOTE(review): the definition file is an absolute path inside one user's home
// directory, so it only resolves on a machine where that path exists.
92  lumiMon_ = new DataPointMonitor(lumiMonParams, "/home/aspataru/cmssw/CMSSW_6_1_0_pre4/src/EventFilter/Utilities/plugins/budef.jsd");
93 }
94 
95 
97 {
// Cleanup body (the signature line is not part of this listing; presumably the destructor).
// finishThreads() is defined outside this listing; it is called before anything is
// deleted, presumably so that no writer thread still uses the objects freed below.
98  finishThreads();
// The null test is redundant: deleting a null pointer has no effect.
99  if (lumiMon_ != 0)
100  delete lumiMon_;
// Delete the per-file monitors and counters, back to front, until both vectors are empty.
101  while (!perFileMonitors_.empty()) {
102  delete perFileMonitors_.back();
103  perFileMonitors_.pop_back();
104  }
105  while (!perFileCounters_.empty()) {
106  delete perFileCounters_.back();
107  perFileCounters_.pop_back();
108  }
// NOTE(review): the EventPool entries and fileHeader_ allocated with new in the
// constructor are not released here - confirm whether that happens elsewhere.
109 }
110 
112 {
// Copy-mode entry point (signature line not part of this listing; `msg` is the event view).
// In shared mode this overload does nothing; otherwise the bytes described by
// msg.startAddress() / msg.size() are handed to the copying queueEvent overload.
113  if (sharedMode_) return;
114  queueEvent((const char*)msg.startAddress(), msg.size());
115 }
116 
117 
118 void MTRawEventFileWriterForBU::doOutputEvent(boost::shared_array<unsigned char> & msg)
119 {
120  if (!sharedMode_) return;
121  queueEvent(msg);
122 }
123 
125 {
// Intentionally empty: the function this body belongs to (signature line not part
// of this listing) performs no work.
126  //not implemented
127 }
128 
130  unsigned long dataSize)
131 {
// Queue `dataSize` bytes starting at `dataPtr` through the copying queueEvent overload.
// NOTE(review): unlike the doOutputEvent overloads, sharedMode_ is not checked here.
// In shared mode the pool slots report sharedMode() == true, so the writer thread
// would read the slot's shared buffer rather than the bytes copied by this call -
// confirm this function is never used in shared mode.
132  queueEvent((const char*) dataPtr, dataSize);
133 
// Disabled code kept from an earlier direct-stream implementation (flush + failure check).
134  /*
135  ost_->flush();
136  if (ost_->fail()) {
137  throw cms::Exception("RawEventFileWriterForBU", "doOutputEventFragment")
138  << "Error writing FED Raw Data event data to "
139  << fileName_ << ". Possibly the output disk "
140  << "is full?" << std::endl;
141  }
142  */
143 
144  //cms::Adler32((const char*) dataPtr, dataSize, adlera_, adlerb_);
145 }
146 
// Prepare output for a new file set: remember the destination directory, optionally
// create a per-lumisection subdirectory, split `name` into base and suffix, stop the
// current writer threads and start numWriters_ new ones.
//   destinationDir  directory the finished files are moved to ("/" is appended)
//   name            file name; split on '.' into a base (first token) and suffix (last token)
//   ls              lumisection number, used for the "lsNNNNNN/" subdirectory name
147 void MTRawEventFileWriterForBU::initialize(std::string const& destinationDir, std::string const& name, int ls)
148 {
149 
150  destinationDir_ = destinationDir+"/";
// NOTE(review): the line that opens this conditional block is missing from the listing
// (the embedded numbering skips 151); presumably it tests lumiSubdirectoriesMode_ - confirm.
// The subdirectory is "ls" followed by the lumisection number zero-padded to 6 digits.
// NOTE(review): the mkdir result is ignored, so a failure only shows up later when
// files are moved into the directory.
152  std::ostringstream lsdir;
153  lsdir << "ls" << std::setfill('0') << std::setw(6) << ls << "/";
154  lumiSectionSubDir_ = lsdir.str();
155  mkdir((destinationDir_+lumiSectionSubDir_).c_str(),0755);
156  }
157  else lumiSectionSubDir_="";
158 
159  std::string fileBase=name;
160  std::string fileSuffix;
161 
162  boost::char_separator<char> sep(".");
163  boost::tokenizer<boost::char_separator<char>> tokens(name, sep);
164 
// fileBase becomes the first '.'-separated token; the loop leaves the last token in fileSuffix.
// NOTE(review): if `name` yields no tokens (empty, or only dots) tokens.begin() equals
// tokens.end() and is dereferenced here. If `name` contains no '.', base and suffix
// end up being the same string.
165  fileBase=*tokens.begin();
166  for (auto tok_iter = tokens.begin(); tok_iter != tokens.end(); ++tok_iter) {
167  fileSuffix=*tok_iter;
168  }
// Stop the previous set of writer threads before starting the new one.
169  finishThreads();
170  dispatchThreads(fileBase,numWriters_,fileSuffix);
171 }
172 
174 {
// End-of-lumisection handling (the signature line is not part of this listing; the body
// uses a lumisection number `ls`). Stops the writer threads, snapshots the per-lumi
// monitor, writes it as JSON to an "EoLS_NNNNNN.json" file and resets the per-lumi counter.
175  finishThreads();
176  //writing empty EoLS file (will be filled with information)
177  // MARK! BU EOL json OLD!!!
178 
179  // create a DataPoint object and take a snapshot of the monitored data into it
180  DataPoint dp;
181  lumiMon_->snap(dp);
182 
// File name: lumisection number zero-padded to 6 digits. destinationDir_ already ends
// with "/" (see initialize), so the resulting path contains "//".
183  std::ostringstream ostr;
184  ostr << destinationDir_ << "/EoLS_" << std::setfill('0') << std::setw(6) << ls << ".json";
// Create the file and close the descriptor again straight away.
// NOTE(review): open() signals failure with -1, not 0. As written, a failed open is
// not reported and close(-1) is called; the test should compare against -1.
185  int outfd_ = open(ostr.str().c_str(), O_WRONLY | O_CREAT, S_IRWXU);
186  if(outfd_!=0){close(outfd_); outfd_=0;}
187 
188  // serialize the DataPoint and output it
189  string output;
190  JSONSerializer::serialize(&dp, output);
191 
192  string path = ostr.str();
193  FileIO::writeStringToFile(path, output);
194 
// Start counting from zero for the next lumisection.
195  perLumiEventCount_ = 0;
196 
197 
198 }
199 
200 void MTRawEventFileWriterForBU::queueEvent(const char* buffer,unsigned long size)
201 {
202 
203 #ifdef linux
204  bool queuing = false;
205  unsigned int freeId = 0xffff;
206  while (!queuing) {
207  queue_lock.lock();
208  if (freeIds.size()) {
209  freeId = freeIds.front();
210  freeIds.pop_front();
211  queuing = true;
212  }
213  queue_lock.unlock();
214  if (!queuing) usleep(100000);
215  }
216  assert(freeId!=0xff);
217  EventPool[freeId]->putNewEvent((unsigned char*)buffer,size);
218 
219  queue_lock.lock();
220  queuedIds.push_back(freeId);
222  queue_lock.unlock();
223 #endif
224 }
225 
226 
227 void MTRawEventFileWriterForBU::queueEvent(boost::shared_array<unsigned char>& msg)
228 {
229 
230 #ifdef linux
231  bool queuing = false;
232  unsigned int freeId = 0xffff;
233  while (!queuing) {
234  queue_lock.lock();
235  if (freeIds.size()) {
236  freeId = freeIds.front();
237  freeIds.pop_front();
238  queuing = true;
239  }
240  queue_lock.unlock();
241  if (!queuing) usleep(100000);
242  }
243  assert(freeId!=0xff);
244  EventPool[freeId]->putNewEvent(msg);
245 
246  queue_lock.lock();
247  queuedIds.push_back(freeId);
249  queue_lock.unlock();
250 #endif
251 }
252 
253 
255 {
// Start `instances` writer threads (the signature line is not part of this listing;
// the body uses fileBase, instances and suffix). Thread i writes to a file named
// "<fileBase>_<i>" plus ".<suffix>" when suffix is non-empty. Compiled only when the
// `linux` macro is defined.
256 #ifdef linux
// Clear the stop request before any thread starts polling it.
257  close_flag_=false;
258  //v_adlera_(numWriters_,1);
259  //v_adlerb_(numWriters_,0);
260  for (unsigned int i=0;i<instances;i++)
261  {
262  std::ostringstream instanceName;
263  instanceName << fileBase << "_" << i;
264  if (suffix.size())
265  instanceName << "." << suffix;
266 
267  // populate perFileCounters
268  IntJ* currentVal = new IntJ();
269  *currentVal = 0;
270  currentVal->setName("NEvents");
271  perFileCounters_.push_back(currentVal);
272  // create perFileMonitors
273  perLumiEventCount_.setName("NEvents");
274 
275  // create a vector of all monitorable parameters to be passed to the monitor
// NOTE(review): the counter is looked up by loop index i, not as the element just
// pushed. If perFileCounters_ is not empty when this function is entered (nothing in
// this listing clears it between calls), index i refers to a counter from an earlier
// call and the vectors grow on every call - confirm against finishThreads().
276  vector<JsonMonitorable*> lumiMonParams;
277  lumiMonParams.push_back(perFileCounters_[i]);
278 
279  // create a DataPointMonitor using vector of monitorable parameters and a path to a JSON Definition file
// NOTE(review): same hard-coded home-directory path as in the constructor.
280  perFileMonitors_.push_back(new DataPointMonitor(lumiMonParams, "/home/aspataru/cmssw/CMSSW_6_1_0_pre4/src/EventFilter/Utilities/plugins/budef.jsd"));
281 
// Launch the writer thread: it runs threadRunner on this object with the file name and index i.
282  writers.push_back(std::auto_ptr<std::thread>(new std::thread(&MTRawEventFileWriterForBU::threadRunner,this,instanceName.str(),i)));
283  }
284 #endif
285 }
286 
287 
289 {
// Writer-thread body (the signature line is not part of this listing; the body uses
// `fileName` and `instance`). It opens the output file, writes queued events until the
// close flag is seen, moves the file into the destination directory and writes a
// per-file JSON summary next to it. Compiled only when the `linux` macro is defined.
290 #ifdef linux
291  //new file..
292  if (debug_)
293  std::cout << "opening file for writing " << fileName.c_str() << std::endl;
// The file is first created through open() and then opened a second time as an
// ofstream; all writes below go through the ofstream only.
// NOTE(review): outfd_ stays open for the whole loop and is not closed on any of the
// throw paths below.
294  int outfd_ = open(fileName.c_str(), O_WRONLY | O_CREAT, S_IRWXU);
295  if(outfd_ == -1) {
296  throw cms::Exception("RawEventFileWriterForBU","initialize")
297  << "Error opening FED Raw Data event output file: " << fileName
298  << ": " << strerror(errno) << "\n";
299  }
300  std::auto_ptr<std::ofstream> ost_;
301  ost_.reset(new std::ofstream(fileName.c_str(), std::ios_base::binary | std::ios_base::out));
302 
303  if (!ost_->is_open()) {
304  throw cms::Exception("RawEventFileWriterForBU","initialize")
305  << "Error opening FED Raw Data event output file: " << fileName << "\n";
306  }
307  //prepare header
308  /*
309  memset ((void*)fileHeader_,0,1024*1024);
310  fileHeader_[0]=3;//version
311  ost_->write((const char*)fileHeader_,1024*1024);
312  */
313  //event writing loop
314  while (1) {
315  queue_lock.lock();
316 
// Nothing queued and no stop request: release the lock, wait 100 ms and poll again.
317  if (!queuedIds.size()) {
318  if (!close_flag_) {
319  queue_lock.unlock();
320  usleep(100000);//todo:use timed cond wait
321  continue;
322  }
323  }
// NOTE(review): the loop exits as soon as close_flag_ is set, even if queuedIds still
// holds entries - confirm that the flag is only raised once the queue has been drained.
324  if (close_flag_) {
325  queue_lock.unlock();
326  break;
327  }
328  //take next event
// NOTE(review): entries are taken from the back while queueEvent appends at the back,
// so the most recently queued event is written first.
329  unsigned int qid = queuedIds.back();
330  queuedIds.pop_back();
331  queue_lock.unlock();
// Copy-mode slots are written from their own buffer; shared-mode slots are written
// through an FRDEventMsgView over the shared buffer, which is then released.
332  if (!EventPool[qid]->sharedMode()) {
333  ost_->write((const char*) EventPool[qid]->getBuffer(), EventPool[qid]->getSize());
334  }
335  else {
336  boost::shared_array<unsigned char> * sharedBuf = EventPool[qid]->getSharedBuffer();
337  FRDEventMsgView frd((*sharedBuf).get());
338  ost_->write((const char*) frd.startAddress(),frd.size());
339  sharedBuf->reset();//release reference
340  }
// NOTE(review): this runs on a std::thread (see dispatchThreads); an exception that
// leaves a std::thread function terminates the process. The message also prints the
// member fileName_ rather than the local fileName.
341  if (ost_->fail()) {
342  //todo:signal to main thread
343  throw cms::Exception("RawEventFileWriterForBU", "doOutputEventFragment")
344  << "Error writing FED Raw Data event data to "
345  << fileName_ << ". Possibly the output disk "
346  << "is full?" << std::endl;
347  }
348 
// Return the slot to the free list and count the event for this file, under the lock.
349  queue_lock.lock();
350  freeIds.push_back(qid);
351  perFileCounters_[instance]->value()++;
352  queue_lock.unlock();
353  }
354  //flush and close file
355  ost_->flush();
356  if (ost_->fail()) {
357  throw cms::Exception("RawEventFileWriterForBU", "doOutputEventFragment")
358  << "Error writing FED Raw Data event data to "
359  << fileName_ << ". Possibly the output disk "
360  << "is full?" << std::endl;
361  }
362  ost_.reset();
// outfd_ cannot be -1 here (that case threw above), so this closes every descriptor except 0.
363  if(outfd_!=0){ close(outfd_); outfd_=0;}
364 
365  //move file to destination dir
// The target is destinationDir_ + lumiSectionSubDir_ + the part of fileName from its last '/'.
// NOTE(review): if fileName contains no '/', rfind returns npos and substr throws
// std::out_of_range. The rename result is only reported in debug mode, and errno is
// printed even when rename succeeded.
366  int fretval = rename(fileName.c_str(),(destinationDir_+lumiSectionSubDir_+fileName.substr(fileName.rfind("/"))).c_str());
367  if (debug_)
368  std::cout << " tried move " << fileName << " to " << destinationDir_+lumiSectionSubDir_
369  << " status " << fretval << " errno " << strerror(errno) << std::endl;
370 
371  // MARK! BU per-file json OLD!!!
372  DataPoint dp;
373  perFileMonitors_[instance]->snap(dp);
374  string output;
375  JSONSerializer::serialize(&dp, output);
// JSON file name: the part of fileName after the last '/', minus its final 4 characters
// (presumably a 4-character extension), with ".jsn" appended.
376  std::stringstream ss;
377  ss << destinationDir_ << lumiSectionSubDir_ << fileName.substr(fileName.rfind("/") + 1, fileName.size() - fileName.rfind("/") - 5) << ".jsn";
378  string path = ss.str();
379  FileIO::writeStringToFile(path, output);
380  if (debug_)
381  std::cout << "Wrote JSON input file: " << path << std::endl;
382 
// Reset this writer's event counter.
383  perFileCounters_[instance]->value() = 0;
384 
385 #endif
386 }
void doOutputEvent(FRDEventMsgView const &msg)
int i
Definition: DBlmapReader.cc:9
MTRawEventFileWriterForBU(edm::ParameterSet const &ps)
void initialize(std::string const &destinationDir, std::string const &name, int ls)
void putNewEvent(boost::shared_array< unsigned char > &msg)
unsigned int getBufSize() const
void threadRunner(std::string fileName, unsigned int instance)
static PFTauRenderPlugin instance
uint32 size() const
std::vector< fwriter::EventContainer * > EventPool
void doOutputEventFragment(unsigned char *dataPtr, unsigned long dataSize)
void putNewEvent(unsigned char *addr, unsigned int size)
uint8 * startAddress() const
EventContainer(unsigned int evtBufSize)
void queueEvent(const char *buffer, unsigned long size)
tuple out
Definition: dbtoconf.py:99
std::deque< unsigned int > queuedIds
auto dp
Definition: deltaR.h:24
boost::shared_array< unsigned char > * getSharedBuffer()
std::auto_ptr< unsigned char > data_
boost::shared_array< unsigned char > shared_data_
Binary serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:88
tuple cout
Definition: gather_cfg.py:121
volatile std::atomic< bool > shutdown_flag false
std::deque< unsigned int > freeIds
unsigned char * getBuffer() const
std::vector< DataPointMonitor * > perFileMonitors_
tuple size
Write out results.
void dispatchThreads(std::string fileBase, unsigned int instances, std::string suffix)
std::vector< IntJ * > perFileCounters_
void snap(DataPoint &outputDataPoint)