CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
MTRawEventFileWriterForBU Class Reference

#include <MTRawEventFileWriterForBU.h>

Public Member Functions

uint32 adler32 (uint32 adlera, uint32 adlerb) const
 
void doFlushFile ()
 
void doOutputEvent (FRDEventMsgView const &msg)
 
void doOutputEvent (boost::shared_array< unsigned char > &msg)
 
void doOutputEventFragment (unsigned char *dataPtr, unsigned long dataSize)
 
void endOfLS (int ls)
 
void initialize (std::string const &destinationDir, std::string const &name, int ls)
 
 MTRawEventFileWriterForBU (edm::ParameterSet const &ps)
 
bool sharedMode () const
 
void start ()
 
void stop ()
 
 ~MTRawEventFileWriterForBU ()
 

Private Member Functions

void dispatchThreads (std::string fileBase, unsigned int instances, std::string suffix)
 
void finishThreads ()
 
void queueEvent (const char *buffer, unsigned long size)
 
void queueEvent (boost::shared_array< unsigned char > &msg)
 
void threadRunner (std::string fileName, unsigned int instance)
 

Private Attributes

bool debug_
 
std::string destinationDir_
 
unsigned int eventBufferSize_
 
std::vector
< fwriter::EventContainer * > 
EventPool
 
unsigned char * fileHeader_
 
std::string fileName_
 
int finishAfterLS_
 
std::deque< unsigned int > freeIds
 
DataPointMonitorlumiMon_
 
std::string lumiSectionSubDir_
 
bool lumiSubdirectoriesMode_
 
unsigned int numWriters_
 
std::vector< IntJ * > perFileCounters_
 
std::vector< DataPointMonitor * > perFileMonitors_
 
IntJ perLumiEventCount_
 
std::deque< unsigned int > queuedIds
 
bool sharedMode_
 
std::vector< uint32v_adlera_
 
std::vector< uint32v_adlerb_
 

Detailed Description

Definition at line 36 of file MTRawEventFileWriterForBU.h.

Constructor & Destructor Documentation

MTRawEventFileWriterForBU::MTRawEventFileWriterForBU ( edm::ParameterSet const &  ps)
explicit

Definition at line 65 of file MTRawEventFileWriterForBU.cc.

References eventBufferSize_, EventPool, fileHeader_, freeIds, i, lumiMon_, perLumiEventCount_, jsoncollector::JsonMonitorable::setName(), and sharedMode_.

65  :
66  lumiMon_(0),
67  numWriters_(ps.getUntrackedParameter<unsigned int>("numWriters",1))
68 ,eventBufferSize_(ps.getUntrackedParameter<unsigned int>("eventBufferSize",30))
69 ,sharedMode_(ps.getUntrackedParameter<bool>("sharedMode",true))
70 ,lumiSubdirectoriesMode_(ps.getUntrackedParameter<bool>("lumiSubdirectoriesMode",true))
71 ,debug_(ps.getUntrackedParameter<bool>("debug",false))
72 //,finishAfterLS_(ps.getUntrackedParameter<int>,-1)
73 {
74  for (unsigned int i=0;i<eventBufferSize_;i++) {
75  if (!sharedMode_)
76  EventPool.push_back(new fwriter::EventContainer(1048576));
77  else
78  EventPool.push_back(new fwriter::EventContainer());
79  freeIds.push_back(i);
80  }
81  fileHeader_= new unsigned char[1024*1024];
82 
84  // set names of the variables to be matched with JSON Definition
85  perLumiEventCount_.setName("NEvents");
86 
87  // create a vector of all monitorable parameters to be passed to the monitor
88  vector<JsonMonitorable*> lumiMonParams;
89  lumiMonParams.push_back(&perLumiEventCount_);
90 
91  // create a DataPointMonitor using vector of monitorable parameters and a path to a JSON Definition file
92  lumiMon_ = new DataPointMonitor(lumiMonParams, "/home/aspataru/cmssw/CMSSW_6_1_0_pre4/src/EventFilter/Utilities/plugins/budef.jsd");
93 }
int i
Definition: DBlmapReader.cc:9
std::vector< fwriter::EventContainer * > EventPool
std::deque< unsigned int > freeIds
MTRawEventFileWriterForBU::~MTRawEventFileWriterForBU ( )

Definition at line 96 of file MTRawEventFileWriterForBU.cc.

References finishThreads(), lumiMon_, perFileCounters_, and perFileMonitors_.

97 {
98  finishThreads();
99  if (lumiMon_ != 0)
100  delete lumiMon_;
101  while (!perFileMonitors_.empty()) {
102  delete perFileMonitors_.back();
103  perFileMonitors_.pop_back();
104  }
105  while (!perFileCounters_.empty()) {
106  delete perFileCounters_.back();
107  perFileCounters_.pop_back();
108  }
109 }
std::vector< DataPointMonitor * > perFileMonitors_
std::vector< IntJ * > perFileCounters_

Member Function Documentation

uint32 MTRawEventFileWriterForBU::adler32 ( uint32  adlera,
uint32  adlerb 
) const
inline

Definition at line 48 of file MTRawEventFileWriterForBU.h.

48 { return 0;/*return (adlerb_ << 16) | adlera_;*/ }
void MTRawEventFileWriterForBU::dispatchThreads ( std::string  fileBase,
unsigned int  instances,
std::string  suffix 
)
private

Definition at line 254 of file MTRawEventFileWriterForBU.cc.

References i, perFileCounters_, perFileMonitors_, perLumiEventCount_, jsoncollector::JsonMonitorable::setName(), and threadRunner().

Referenced by initialize().

255 {
256 #ifdef linux
257  close_flag_=false;
258  //v_adlera_(numWriters_,1);
259  //v_adlerb_(numWriters_,0);
260  for (unsigned int i=0;i<instances;i++)
261  {
262  std::ostringstream instanceName;
263  instanceName << fileBase << "_" << i;
264  if (suffix.size())
265  instanceName << "." << suffix;
266 
267  // populate perFileCounters
268  IntJ* currentVal = new IntJ();
269  *currentVal = 0;
270  currentVal->setName("NEvents");
271  perFileCounters_.push_back(currentVal);
272  // create perFileMonitors
273  perLumiEventCount_.setName("NEvents");
274 
275  // create a vector of all monitorable parameters to be passed to the monitor
276  vector<JsonMonitorable*> lumiMonParams;
277  lumiMonParams.push_back(perFileCounters_[i]);
278 
279  // create a DataPointMonitor using vector of monitorable parameters and a path to a JSON Definition file
280  perFileMonitors_.push_back(new DataPointMonitor(lumiMonParams, "/home/aspataru/cmssw/CMSSW_6_1_0_pre4/src/EventFilter/Utilities/plugins/budef.jsd"));
281 
282  writers.push_back(std::auto_ptr<std::thread>(new std::thread(&MTRawEventFileWriterForBU::threadRunner,this,instanceName.str(),i)));
283  }
284 #endif
285 }
int i
Definition: DBlmapReader.cc:9
void threadRunner(std::string fileName, unsigned int instance)
std::vector< DataPointMonitor * > perFileMonitors_
std::vector< IntJ * > perFileCounters_
void MTRawEventFileWriterForBU::doFlushFile ( )

Definition at line 124 of file MTRawEventFileWriterForBU.cc.

125 {
126  //not implemented
127 }
void MTRawEventFileWriterForBU::doOutputEvent ( FRDEventMsgView const &  msg)

Definition at line 111 of file MTRawEventFileWriterForBU.cc.

References queueEvent(), sharedMode_, FRDEventMsgView::size(), and FRDEventMsgView::startAddress().

112 {
113  if (sharedMode_) return;
114  queueEvent((const char*)msg.startAddress(), msg.size());
115 }
void queueEvent(const char *buffer, unsigned long size)
void MTRawEventFileWriterForBU::doOutputEvent ( boost::shared_array< unsigned char > &  msg)

Definition at line 118 of file MTRawEventFileWriterForBU.cc.

References queueEvent(), and sharedMode_.

119 {
120  if (!sharedMode_) return;
121  queueEvent(msg);
122 }
void queueEvent(const char *buffer, unsigned long size)
void MTRawEventFileWriterForBU::doOutputEventFragment ( unsigned char *  dataPtr,
unsigned long  dataSize 
)

Definition at line 129 of file MTRawEventFileWriterForBU.cc.

References queueEvent().

131 {
132  queueEvent((const char*) dataPtr, dataSize);
133 
134  /*
135  ost_->flush();
136  if (ost_->fail()) {
137  throw cms::Exception("RawEventFileWriterForBU", "doOutputEventFragment")
138  << "Error writing FED Raw Data event data to "
139  << fileName_ << ". Possibly the output disk "
140  << "is full?" << std::endl;
141  }
142  */
143 
144  //cms::Adler32((const char*) dataPtr, dataSize, adlera_, adlerb_);
145 }
void queueEvent(const char *buffer, unsigned long size)
void MTRawEventFileWriterForBU::endOfLS ( int  ls)

Definition at line 173 of file MTRawEventFileWriterForBU.cc.

References destinationDir_, reco::dp, finishThreads(), lumiMon_, convertSQLitetoXML_cfg::output, getHLTPrescaleColumns::path, perLumiEventCount_, cond::serialize(), and jsoncollector::DataPointMonitor::snap().

174 {
175  finishThreads();
176  //writing empty EoLS file (will be filled with information)
177  // MARK! BU EOL json OLD!!!
178 
179  // create a DataPoint object and take a snapshot of the monitored data into it
180  DataPoint dp;
181  lumiMon_->snap(dp);
182 
183  std::ostringstream ostr;
184  ostr << destinationDir_ << "/EoLS_" << std::setfill('0') << std::setw(6) << ls << ".json";
185  int outfd_ = open(ostr.str().c_str(), O_WRONLY | O_CREAT, S_IRWXU);
186  if(outfd_!=0){close(outfd_); outfd_=0;}
187 
188  // serialize the DataPoint and output it
189  string output;
190  JSONSerializer::serialize(&dp, output);
191 
192  string path = ostr.str();
193  FileIO::writeStringToFile(path, output);
194 
195  perLumiEventCount_ = 0;
196 
197 
198 }
auto dp
Definition: deltaR.h:24
Binary serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:88
void snap(DataPoint &outputDataPoint)
void MTRawEventFileWriterForBU::finishThreads ( )
inlineprivate

Definition at line 67 of file MTRawEventFileWriterForBU.h.

References w().

Referenced by endOfLS(), initialize(), and ~MTRawEventFileWriterForBU().

68  {
69 #ifdef linux
70  close_flag_=true;
71  for (auto w: writers) w->join();
72  writers.clear();
73 #endif
74  }
T w() const
void MTRawEventFileWriterForBU::initialize ( std::string const &  destinationDir,
std::string const &  name,
int  ls 
)

Definition at line 147 of file MTRawEventFileWriterForBU.cc.

References destinationDir_, dispatchThreads(), finishThreads(), lumiSectionSubDir_, lumiSubdirectoriesMode_, mergeVDriftHistosByStation::name, numWriters_, and AlCaHLTBitMon_QueryRunRegistry::string.

148 {
149 
150  destinationDir_ = destinationDir+"/";
152  std::ostringstream lsdir;
153  lsdir << "ls" << std::setfill('0') << std::setw(6) << ls << "/";
154  lumiSectionSubDir_ = lsdir.str();
155  mkdir((destinationDir_+lumiSectionSubDir_).c_str(),0755);
156  }
157  else lumiSectionSubDir_="";
158 
159  std::string fileBase=name;
160  std::string fileSuffix;
161 
162  boost::char_separator<char> sep(".");
163  boost::tokenizer<boost::char_separator<char>> tokens(name, sep);
164 
165  fileBase=*tokens.begin();
166  for (auto tok_iter = tokens.begin(); tok_iter != tokens.end(); ++tok_iter) {
167  fileSuffix=*tok_iter;
168  }
169  finishThreads();
170  dispatchThreads(fileBase,numWriters_,fileSuffix);
171 }
void dispatchThreads(std::string fileBase, unsigned int instances, std::string suffix)
void MTRawEventFileWriterForBU::queueEvent ( const char *  buffer,
unsigned long  size 
)
inlineprivate

Definition at line 200 of file MTRawEventFileWriterForBU.cc.

References EventPool, freeIds, perLumiEventCount_, queuedIds, and jsoncollector::IntJ::value().

Referenced by doOutputEvent(), and doOutputEventFragment().

201 {
202 
203 #ifdef linux
204  bool queuing = false;
205  unsigned int freeId = 0xffff;
206  while (!queuing) {
207  queue_lock.lock();
208  if (freeIds.size()) {
209  freeId = freeIds.front();
210  freeIds.pop_front();
211  queuing = true;
212  }
213  queue_lock.unlock();
214  if (!queuing) usleep(100000);
215  }
216  assert(freeId!=0xff);
217  EventPool[freeId]->putNewEvent((unsigned char*)buffer,size);
218 
219  queue_lock.lock();
220  queuedIds.push_back(freeId);
222  queue_lock.unlock();
223 #endif
224 }
std::vector< fwriter::EventContainer * > EventPool
std::deque< unsigned int > queuedIds
std::deque< unsigned int > freeIds
tuple size
Write out results.
void MTRawEventFileWriterForBU::queueEvent ( boost::shared_array< unsigned char > &  msg)
inlineprivate

Definition at line 227 of file MTRawEventFileWriterForBU.cc.

References EventPool, freeIds, perLumiEventCount_, queuedIds, and jsoncollector::IntJ::value().

228 {
229 
230 #ifdef linux
231  bool queuing = false;
232  unsigned int freeId = 0xffff;
233  while (!queuing) {
234  queue_lock.lock();
235  if (freeIds.size()) {
236  freeId = freeIds.front();
237  freeIds.pop_front();
238  queuing = true;
239  }
240  queue_lock.unlock();
241  if (!queuing) usleep(100000);
242  }
243  assert(freeId!=0xff);
244  EventPool[freeId]->putNewEvent(msg);
245 
246  queue_lock.lock();
247  queuedIds.push_back(freeId);
249  queue_lock.unlock();
250 #endif
251 }
std::vector< fwriter::EventContainer * > EventPool
std::deque< unsigned int > queuedIds
std::deque< unsigned int > freeIds
bool MTRawEventFileWriterForBU::sharedMode ( ) const
inline

Definition at line 56 of file MTRawEventFileWriterForBU.h.

Referenced by threadRunner().

void MTRawEventFileWriterForBU::start ( void  )
inline
void MTRawEventFileWriterForBU::stop ( )
inline

Definition at line 51 of file MTRawEventFileWriterForBU.h.

51  {
52  finishThreads();
53  }
void MTRawEventFileWriterForBU::threadRunner ( std::string  fileName,
unsigned int  instance 
)
private

Definition at line 288 of file MTRawEventFileWriterForBU.cc.

References gather_cfg::cout, debug_, destinationDir_, reco::dp, EventPool, edm::hlt::Exception, fileName_, freeIds, instance, lumiSectionSubDir_, dbtoconf::out, convertSQLitetoXML_cfg::output, getHLTPrescaleColumns::path, perFileCounters_, perFileMonitors_, queuedIds, cond::serialize(), and sharedMode().

Referenced by dispatchThreads().

289 {
290 #ifdef linux
291  //new file..
292  if (debug_)
293  std::cout << "opening file for writing " << fileName.c_str() << std::endl;
294  int outfd_ = open(fileName.c_str(), O_WRONLY | O_CREAT, S_IRWXU);
295  if(outfd_ == -1) {
296  throw cms::Exception("RawEventFileWriterForBU","initialize")
297  << "Error opening FED Raw Data event output file: " << fileName
298  << ": " << strerror(errno) << "\n";
299  }
300  std::auto_ptr<std::ofstream> ost_;
301  ost_.reset(new std::ofstream(fileName.c_str(), std::ios_base::binary | std::ios_base::out));
302 
303  if (!ost_->is_open()) {
304  throw cms::Exception("RawEventFileWriterForBU","initialize")
305  << "Error opening FED Raw Data event output file: " << fileName << "\n";
306  }
307  //prepare header
308  /*
309  memset ((void*)fileHeader_,0,1024*1024);
310  fileHeader_[0]=3;//version
311  ost_->write((const char*)fileHeader_,1024*1024);
312  */
313  //event writing loop
314  while (1) {
315  queue_lock.lock();
316 
317  if (!queuedIds.size()) {
318  if (!close_flag_) {
319  queue_lock.unlock();
320  usleep(100000);//todo:use timed cond wait
321  continue;
322  }
323  }
324  if (close_flag_) {
325  queue_lock.unlock();
326  break;
327  }
328  //take next event
329  unsigned int qid = queuedIds.back();
330  queuedIds.pop_back();
331  queue_lock.unlock();
332  if (!EventPool[qid]->sharedMode()) {
333  ost_->write((const char*) EventPool[qid]->getBuffer(), EventPool[qid]->getSize());
334  }
335  else {
336  boost::shared_array<unsigned char> * sharedBuf = EventPool[qid]->getSharedBuffer();
337  FRDEventMsgView frd((*sharedBuf).get());
338  ost_->write((const char*) frd.startAddress(),frd.size());
339  sharedBuf->reset();//release reference
340  }
341  if (ost_->fail()) {
342  //todo:signal to main thread
343  throw cms::Exception("RawEventFileWriterForBU", "doOutputEventFragment")
344  << "Error writing FED Raw Data event data to "
345  << fileName_ << ". Possibly the output disk "
346  << "is full?" << std::endl;
347  }
348 
349  queue_lock.lock();
350  freeIds.push_back(qid);
351  perFileCounters_[instance]->value()++;
352  queue_lock.unlock();
353  }
354  //flush and close file
355  ost_->flush();
356  if (ost_->fail()) {
357  throw cms::Exception("RawEventFileWriterForBU", "doOutputEventFragment")
358  << "Error writing FED Raw Data event data to "
359  << fileName_ << ". Possibly the output disk "
360  << "is full?" << std::endl;
361  }
362  ost_.reset();
363  if(outfd_!=0){ close(outfd_); outfd_=0;}
364 
365  //move file to destination dir
366  int fretval = rename(fileName.c_str(),(destinationDir_+lumiSectionSubDir_+fileName.substr(fileName.rfind("/"))).c_str());
367  if (debug_)
368  std::cout << " tried move " << fileName << " to " << destinationDir_+lumiSectionSubDir_
369  << " status " << fretval << " errno " << strerror(errno) << std::endl;
370 
371  // MARK! BU per-file json OLD!!!
372  DataPoint dp;
373  perFileMonitors_[instance]->snap(dp);
374  string output;
375  JSONSerializer::serialize(&dp, output);
376  std::stringstream ss;
377  ss << destinationDir_ << lumiSectionSubDir_ << fileName.substr(fileName.rfind("/") + 1, fileName.size() - fileName.rfind("/") - 5) << ".jsn";
378  string path = ss.str();
379  FileIO::writeStringToFile(path, output);
380  if (debug_)
381  std::cout << "Wrote JSON input file: " << path << std::endl;
382 
383  perFileCounters_[instance]->value() = 0;
384 
385 #endif
386 }
static PFTauRenderPlugin instance
std::vector< fwriter::EventContainer * > EventPool
tuple out
Definition: dbtoconf.py:99
std::deque< unsigned int > queuedIds
auto dp
Definition: deltaR.h:24
Binary serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:88
tuple cout
Definition: gather_cfg.py:121
std::deque< unsigned int > freeIds
std::vector< DataPointMonitor * > perFileMonitors_
std::vector< IntJ * > perFileCounters_

Member Data Documentation

bool MTRawEventFileWriterForBU::debug_
private

Definition at line 87 of file MTRawEventFileWriterForBU.h.

Referenced by threadRunner().

std::string MTRawEventFileWriterForBU::destinationDir_
private

Definition at line 60 of file MTRawEventFileWriterForBU.h.

Referenced by endOfLS(), initialize(), and threadRunner().

unsigned int MTRawEventFileWriterForBU::eventBufferSize_
private

Definition at line 84 of file MTRawEventFileWriterForBU.h.

Referenced by MTRawEventFileWriterForBU().

std::vector<fwriter::EventContainer*> MTRawEventFileWriterForBU::EventPool
private

Definition at line 97 of file MTRawEventFileWriterForBU.h.

Referenced by MTRawEventFileWriterForBU(), queueEvent(), and threadRunner().

unsigned char* MTRawEventFileWriterForBU::fileHeader_
private

Definition at line 105 of file MTRawEventFileWriterForBU.h.

Referenced by MTRawEventFileWriterForBU().

std::string MTRawEventFileWriterForBU::fileName_
private

Definition at line 59 of file MTRawEventFileWriterForBU.h.

Referenced by threadRunner().

int MTRawEventFileWriterForBU::finishAfterLS_
private

Definition at line 88 of file MTRawEventFileWriterForBU.h.

std::deque<unsigned int> MTRawEventFileWriterForBU::freeIds
private

Definition at line 95 of file MTRawEventFileWriterForBU.h.

Referenced by MTRawEventFileWriterForBU(), queueEvent(), and threadRunner().

DataPointMonitor* MTRawEventFileWriterForBU::lumiMon_
private
std::string MTRawEventFileWriterForBU::lumiSectionSubDir_
private

Definition at line 61 of file MTRawEventFileWriterForBU.h.

Referenced by initialize(), and threadRunner().

bool MTRawEventFileWriterForBU::lumiSubdirectoriesMode_
private

Definition at line 86 of file MTRawEventFileWriterForBU.h.

Referenced by initialize().

unsigned int MTRawEventFileWriterForBU::numWriters_
private

Definition at line 83 of file MTRawEventFileWriterForBU.h.

Referenced by initialize().

std::vector<IntJ*> MTRawEventFileWriterForBU::perFileCounters_
private
std::vector<DataPointMonitor*> MTRawEventFileWriterForBU::perFileMonitors_
private
IntJ MTRawEventFileWriterForBU::perLumiEventCount_
private
std::deque<unsigned int> MTRawEventFileWriterForBU::queuedIds
private

Definition at line 96 of file MTRawEventFileWriterForBU.h.

Referenced by queueEvent(), and threadRunner().

bool MTRawEventFileWriterForBU::sharedMode_
private

Definition at line 85 of file MTRawEventFileWriterForBU.h.

Referenced by doOutputEvent(), and MTRawEventFileWriterForBU().

std::vector<uint32> MTRawEventFileWriterForBU::v_adlera_
private

Definition at line 102 of file MTRawEventFileWriterForBU.h.

std::vector<uint32> MTRawEventFileWriterForBU::v_adlerb_
private

Definition at line 103 of file MTRawEventFileWriterForBU.h.