CMS 3D CMS Logo

RawEventFileWriterForBU.cc
Go to the documentation of this file.
1 #include <cerrno>
2 #include <cstdio>
3 #include <cstdlib>
4 #include <cstring>
5 #include <iostream>
6 #include <sstream>
7 
8 // CMSSW headers
18 
19 using namespace jsoncollector;
20 using namespace edm::streamer;
21 
22 //TODO:get run directory information from DaqDirector
23 
25  : microSleep_(ps.getParameter<int>("microSleep")),
26  frdFileVersion_(ps.getParameter<unsigned int>("frdFileVersion")) {
27  //per-file JSD and FastMonitor
28  rawJsonDef_.setDefaultGroup("legend");
29  rawJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
30 
31  perFileEventCount_.setName("NEvents");
32  perFileSize_.setName("NBytes");
33 
34  fileMon_ = new FastMonitor(&rawJsonDef_, false);
37  fileMon_->commit(nullptr);
38 
39  //per-lumi JSD and FastMonitor
40  eolJsonDef_.setDefaultGroup("legend");
41  eolJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
43  eolJsonDef_.addLegendItem("TotalEvents", "integer", DataPointDefinition::SUM);
44  eolJsonDef_.addLegendItem("NLostEvents", "integer", DataPointDefinition::SUM);
45 
46  perLumiEventCount_.setName("NEvents");
47  perLumiFileCount_.setName("NFiles");
48  perLumiTotalEventCount_.setName("TotalEvents");
49  perLumiLostEventCount_.setName("NLostEvents");
50  perLumiSize_.setName("NBytes");
51 
52  lumiMon_ = new FastMonitor(&eolJsonDef_, false);
58  lumiMon_->commit(nullptr);
59 
60  //per-run JSD and FastMonitor
61  eorJsonDef_.setDefaultGroup("legend");
62  eorJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
65  eorJsonDef_.addLegendItem("LastLumi", "integer", DataPointDefinition::SUM);
66 
67  perRunEventCount_.setName("NEvents");
68  perRunFileCount_.setName("NFiles");
69  perRunLumiCount_.setName("NLumis");
70  perRunLastLumi_.setName("LastLumi");
71 
72  runMon_ = new FastMonitor(&eorJsonDef_, false);
77  runMon_->commit(nullptr);
78 }
79 
81 
83  delete fileMon_;
84  delete lumiMon_;
85  delete runMon_;
86 }
87 
89  ssize_t retval = write(outfd_, (void*)msg.startAddress(), msg.size());
90 
91  if ((unsigned)retval != msg.size()) {
92  throw cms::Exception("RawEventFileWriterForBU", "doOutputEvent")
93  << "Error writing FED Raw Data event data to " << fileName_ << ". Possibly the output disk "
94  << "is full?" << std::endl;
95  }
96 
97  // throttle event output
98  usleep(microSleep_);
100  perFileSize_.value() += msg.size();
101 
102  // cms::Adler32((const char*) msg.startAddress(), msg.size(), adlera_, adlerb_);
103 }
104 
105 void RawEventFileWriterForBU::initialize(std::string const& destinationDir, std::string const& name, int ls) {
106  destinationDir_ = destinationDir;
107 
108  if (outfd_ != -1) {
110  closefd();
111  }
112 
113  fileName_ = name;
114 
115  if (!writtenJSDs_) {
116  writeJsds();
117  /* std::stringstream ss;
118  ss << destinationDir_ << "/jsd";
119  mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
120 
121  std::string rawJSDName = ss.str()+"/rawData.jsd";
122  std::string eolJSDName = ss.str()+"/EoLS.jsd";
123  std::string eorJSDName = ss.str()+"/EoR.jsd";
124 
125  fileMon_->setDefPath(rawJSDName);
126  lumiMon_->setDefPath(eolJSDName);
127  runMon_->setDefPath(eorJSDName);
128 
129  struct stat fstat;
130  if (stat (rawJSDName.c_str(), &fstat) != 0) {
131  std::string content;
132  JSONSerializer::serialize(&rawJsonDef_,content);
133  FileIO::writeStringToFile(rawJSDName, content);
134  }
135 
136  if (stat (eolJSDName.c_str(), &fstat) != 0) {
137  std::string content;
138  JSONSerializer::serialize(&eolJsonDef_,content);
139  FileIO::writeStringToFile(eolJSDName, content);
140  }
141 
142  if (stat (eorJSDName.c_str(), &fstat) != 0) {
143  std::string content;
144  JSONSerializer::serialize(&eorJsonDef_,content);
145  FileIO::writeStringToFile(eorJSDName, content);
146  }
147 */
148  writtenJSDs_ = true;
149  }
150 
151  outfd_ = open(fileName_.c_str(), O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
152  edm::LogInfo("RawEventFileWriterForBU") << " opened " << fileName_;
153 
154  if (outfd_ < 0) { //attention here... it may happen that outfd_ is *not* set (e.g. missing initialize call...)
155  throw cms::Exception("RawEventFileWriterForBU", "initialize")
156  << "Error opening FED Raw Data event output file: " << name << ": " << strerror(errno) << "\n";
157  }
158 
160  perFileSize_.value() = 0;
161 
162  adlera_ = 1;
163  adlerb_ = 0;
164 
165  if (frdFileVersion_ == 1) {
166  //reserve space for file header
167  ftruncate(outfd_, sizeof(FRDFileHeader_v1));
168  lseek(outfd_, sizeof(FRDFileHeader_v1), SEEK_SET);
169  perFileSize_.value() = sizeof(FRDFileHeader_v1);
170  } else if (frdFileVersion_ == 2) {
171  ftruncate(outfd_, sizeof(FRDFileHeader_v2));
172  lseek(outfd_, sizeof(FRDFileHeader_v2), SEEK_SET);
173  perFileSize_.value() = sizeof(FRDFileHeader_v2);
174  }
175  assert(frdFileVersion_ <= 2);
176 }
177 
179  std::stringstream ss;
180  ss << destinationDir_ << "/jsd";
181  mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
182 
183  std::string rawJSDName = ss.str() + "/rawData.jsd";
184  std::string eolJSDName = ss.str() + "/EoLS.jsd";
185  std::string eorJSDName = ss.str() + "/EoR.jsd";
186 
187  fileMon_->setDefPath(rawJSDName);
188  lumiMon_->setDefPath(eolJSDName);
189  runMon_->setDefPath(eorJSDName);
190 
191  struct stat fstat;
192  if (stat(rawJSDName.c_str(), &fstat) != 0) {
195  FileIO::writeStringToFile(rawJSDName, content);
196  }
197 
198  if (stat(eolJSDName.c_str(), &fstat) != 0) {
201  FileIO::writeStringToFile(eolJSDName, content);
202  }
203 
204  if (stat(eorJSDName.c_str(), &fstat) != 0) {
207  FileIO::writeStringToFile(eorJSDName, content);
208  }
209 }
210 
212  if (frdFileVersion_ == 1) {
213  //rewind
214  lseek(outfd_, 0, SEEK_SET);
215  FRDFileHeader_v1 frdFileHeader(perFileEventCount_.value(), (uint32_t)ls, perFileSize_.value());
216  write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v1));
217  closefd();
218  //move raw file from open to run directory
219  rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
220 
221  edm::LogInfo("RawEventFileWriterForBU")
222  << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
223  << " and size " << perFileSize_.value();
224  } else if (frdFileVersion_ == 2) {
225  lseek(outfd_, 0, SEEK_SET);
226  FRDFileHeader_v2 frdFileHeader(0, perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value());
227  write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v2));
228  closefd();
229  //move raw file from open to run directory
230  rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
231  edm::LogInfo("RawEventFileWriterForBU")
232  << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
233  << " and size " << perFileSize_.value();
234  } else {
235  closefd();
236  //move raw file from open to run directory
237  rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
238  //create equivalent JSON file
239  //TODO:fix this to use DaqDirector convention and better extension replace
241  std::string path = source.replace_extension(".jsn").string();
242 
243  fileMon_->snap(ls);
246 
247  //move the json file from open
248  rename(path.c_str(), (destinationDir_ + path.substr(path.rfind('/'))).c_str());
249 
250  edm::LogInfo("RawEventFileWriterForBU")
251  << "Wrote JSON input file: " << path << " with perFileEventCount = " << perFileEventCount_.value()
252  << " and size " << perFileSize_.value();
253  }
254  //there is a small chance that script gets interrupted while this isn't consistent (non-atomic)
259  //update open lumi value when first file is completed
260  lumiOpen_ = ls;
261 }
262 
264  if (outfd_ != -1) {
266  closefd();
267  }
268  lumiMon_->snap(ls);
269 
270  std::ostringstream ostr;
271 
272  if (run_ == -1)
274 
275  ostr << destinationDir_ << "/" << runPrefix_ << "_ls" << std::setfill('0') << std::setw(4) << ls << "_EoLS"
276  << ".jsn";
277  //outfd_ = open(ostr.str().c_str(), O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
278  //closefd();
279 
280  std::string path = ostr.str();
283 
286  perRunLumiCount_.value() += 1;
288 
289  perLumiEventCount_ = 0;
290  perLumiFileCount_ = 0;
292  perLumiSize_ = 0;
293  lumiClosed_ = ls;
294 }
295 
297  if (lumiOpen_ > lumiClosed_)
299  edm::LogInfo("RawEventFileWriterForBU") << "Writing EOR file!";
300  if (!destinationDir_.empty()) {
301  // create EoR file
302  if (run_ == -1)
304  std::string path = destinationDir_ + "/" + runPrefix_ + "_ls0000_EoR.jsn";
305  runMon_->snap(0);
307  }
308 }
309 
310 //TODO:get from DaqDirector !
312  //dirty hack: extract run number from destination directory
313  std::string::size_type pos = destinationDir.find("run");
314  std::string run = destinationDir.substr(pos + 3);
315  run_ = atoi(run.c_str());
316  std::stringstream ss;
317  ss << "run" << std::setfill('0') << std::setw(6) << run_;
318  runPrefix_ = ss.str();
319 }
320 
322  desc.add<int>("microSleep", 0);
323  desc.add<unsigned int>("frdFileVersion", 0);
324 }
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
jsoncollector::FastMonitor * fileMon_
void initialize(std::string const &destinationDir, std::string const &name, int ls)
void setDefPath(std::string const &dpath)
Definition: FastMonitor.h:32
jsoncollector::IntJ perLumiLostEventCount_
edm::streamer::uint32 adlera_
std::pair< Binary, Binary > serialize(const T &payload)
Definition: Serialization.h:66
Definition: rename.py:1
jsoncollector::IntJ perFileEventCount_
jsoncollector::DataPointDefinition eolJsonDef_
jsoncollector::IntJ perRunLumiCount_
jsoncollector::IntJ perRunLastLumi_
jsoncollector::DataPointDefinition eorJsonDef_
jsoncollector::IntJ perLumiFileCount_
jsoncollector::IntJ perRunEventCount_
void doOutputEvent(edm::streamer::FRDEventMsgView const &msg)
edm::streamer::uint32 adlerb_
assert(be >=bs)
uint16_t size_type
void registerGlobalMonitorable(JsonMonitorable *newMonitorable, bool NAifZeroUpdates, unsigned int *nBins=nullptr)
Definition: FastMonitor.cc:67
jsoncollector::IntJ perLumiEventCount_
virtual void setName(std::string name)
jsoncollector::IntJ perLumiSize_
jsoncollector::FastMonitor * runMon_
jsoncollector::IntJ perRunFileCount_
static void extendDescription(edm::ParameterSetDescription &desc)
RawEventFileWriterForBU(edm::ParameterSet const &ps)
void commit(std::vector< unsigned int > *streamLumisPtr)
Definition: FastMonitor.cc:116
Log< level::Info, false > LogInfo
jsoncollector::DataPointDefinition rawJsonDef_
def ls(path, rec=False)
Definition: eostools.py:349
#define SUM(A, B)
bool outputFullJSON(std::string const &path, unsigned int lumi, bool output=true)
Definition: FastMonitor.cc:244
tuple msg
Definition: mps_check.py:286
def mkdir(path)
Definition: eostools.py:251
jsoncollector::IntJ perLumiTotalEventCount_
void snap(unsigned int ls)
Definition: FastMonitor.cc:190
jsoncollector::IntJ perFileSize_
void setDefaultGroup(std::string const &group)
void makeRunPrefix(std::string const &destinationDir)
static std::string const source
Definition: EdmProvDump.cc:49
jsoncollector::FastMonitor * lumiMon_
void discardCollected(unsigned int forLumi)
Definition: FastMonitor.cc:262