CMS 3D CMS Logo

RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
7 
8 #include <sstream>
9 #include <iomanip>
10 #include <boost/filesystem.hpp>
11 #include <boost/algorithm/string.hpp>
12 #include <zlib.h>
13 
20 
21 
22 namespace evf {
23  template<typename Consumer>
25 
34  public:
35  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
37  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
38 
39  private:
40  void initRun();
41  virtual void start() override;
42  virtual void stop() override;
43  virtual void doOutputHeader(InitMsgBuilder const& init_message) override;
44  virtual void doOutputEvent(EventMsgBuilder const& msg) override;
45  //virtual void beginRun(edm::RunForOutput const&);
46  virtual void beginJob() override;
47  virtual void beginLuminosityBlock(edm::LuminosityBlockForOutput const&) override;
48  virtual void endLuminosityBlock(edm::LuminosityBlockForOutput const&) override;
49 
50  private:
51  std::auto_ptr<Consumer> c_;
66  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
69  unsigned char* outBuf_=nullptr;
70  bool readAdler32Check_=false;
71 
72 
73  }; //end-of-class-def
74 
75  template<typename Consumer>
79  c_(new Consumer(ps)),
80  stream_label_(ps.getParameter<std::string>("@module_label")),
81  processed_(0),
82  accepted_(0),
83  errorEvents_(0),
84  retCodeMask_(0),
85  filelist_(),
86  filesize_(0),
87  inputFiles_(),
88  fileAdler32_(1),
90  mergeType_(),
91  hltErrorEvents_(0),
92  outBuf_(new unsigned char[1024*1024])
93  {
94  //replace hltOutoputA with stream if the HLT menu uses this convention
95  std::string testPrefix="hltOutput";
96  if (stream_label_.find(testPrefix)==0)
97  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
98 
99  if (stream_label_.find("_")!=std::string::npos) {
100  throw cms::Exception("RecoEventOutputModuleForFU")
101  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
102  }
103 
104  std::string stream_label_lo = stream_label_;
105  boost::algorithm::to_lower(stream_label_lo);
106  auto streampos = stream_label_lo.rfind("stream");
107  if (streampos !=0 && streampos!=std::string::npos)
108  throw cms::Exception("RecoEventOutputModuleForFU")
109  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
110 
112  }
113 
114  template<typename Consumer>
116  {
117  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
118  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
119  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
120  // create open dir if not already there
121  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
122 
123  processed_.setName("Processed");
124  accepted_.setName("Accepted");
125  errorEvents_.setName("ErrorEvents");
126  retCodeMask_.setName("ReturnCodeMask");
127  filelist_.setName("Filelist");
128  filesize_.setName("Filesize");
129  inputFiles_.setName("InputFiles");
130  fileAdler32_.setName("FileAdler32");
131  transferDestination_.setName("TransferDestination");
132  mergeType_.setName("MergeType");
133  hltErrorEvents_.setName("HLTErrorEvents");
134 
147  std::stringstream tmpss,ss;
148  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
149  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
150  std::string outTmpJsonDefName = tmpss.str();
151  std::string outJsonDefName = ss.str();
152 
153  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
154  struct stat fstat;
155  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
156  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
159  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
160  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
161  }
162  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
163 
165  jsonMonitor_->setDefPath(outJsonDefName);
166  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
167  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
168  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
169  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
170  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
171  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
172  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
173  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
174  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
175  jsonMonitor_->registerGlobalMonitorable(&mergeType_,false);
176  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
177  jsonMonitor_->commit(nullptr);
178 
179  }
180 
181  template<typename Consumer>
183 
184  template<typename Consumer>
185  void
187  {
188  initRun();
189  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
190  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
191  << openInitFileName;
192  c_->setInitMessageFile(openInitFileName);
193  c_->start();
194 
195  }
196 
197  template<typename Consumer>
198  void
200  {
201  c_->stop();
202  }
203 
204  template<typename Consumer>
205  void
207  {
208  c_->doOutputHeader(init_message);
209 
210  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
211  struct stat istat;
212  stat(openIniFileName.c_str(), &istat);
213  //read back file to check integrity of what was written
214  off_t readInput=0;
215  uint32_t adlera=1,adlerb=0;
216  FILE *src = fopen(openIniFileName.c_str(),"r");
217  while (readInput<istat.st_size)
218  {
219  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
220  fread(outBuf_,toRead,1,src);
221  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
222  readInput+=toRead;
223  }
224  fclose(src);
225  //free output buffer if micromerge is not done by the module
226  if (edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
227  delete [] outBuf_;
228  outBuf_=nullptr;
229  }
230  uint32_t adler32c = (adlerb << 16) | adlera;
231  if (adler32c != c_->get_adler32_ini()) {
232  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
233  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
234  }
235  else {
236  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
237  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
238  }
239  }
240 
241  template<typename Consumer>
242  void
244  accepted_.value()++;
245  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
246  }
247 
248  template<typename Consumer>
249  void
253  Consumer::fillDescription(desc);
254  descriptions.add("EvFOutputModule", desc);
255  }
256 
257  template<typename Consumer>
259  {
260  //get stream transfer destination
263  }
264 
265 
266  template<typename Consumer>
268  {
269  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
272  c_->setOutputFile(openDatFilePath_.string());
273  filelist_ = openDatFilePath_.filename().string();
274  }
275 
276  template<typename Consumer>
278  {
279  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
280  long filesize=0;
281  fileAdler32_.value() = c_->get_adler32();
282  c_->closeOutputFile();
283  bool abortFlag = false;
285 
286  if (abortFlag) {
287  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
288  return;
289  }
290 
291  if(processed_.value()!=0) {
292 
293  //lock
294  struct stat istat;
295  if (!edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
296  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
297 
298  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
299 
300  struct stat istat;
301  FILE * cf = NULL;
302  uint32_t mergedAdler32=1;
303  //get adler32 accumulated checksum for the merged file
304  if (!stat(deschecksum.c_str(), &istat)) {
305  if (istat.st_size) {
306  cf = fopen(deschecksum.c_str(),"r");
307  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
308  fscanf(cf,"%u",&mergedAdler32);
309  fclose(cf);
310  }
311  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
312  }
313 
314  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
315 
316  stat(openDatFilePath_.string().c_str(), &istat);
317  off_t readInput=0;
318  uint32_t adlera=1;
319  uint32_t adlerb=0;
320  while (readInput<istat.st_size) {
321  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
322  fread(outBuf_,toRead,1,src);
323  fwrite(outBuf_,toRead,1,des);
324  if (readAdler32Check_)
325  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
326  readInput+=toRead;
327  filesize+=toRead;
328  }
329 
330  //write new string representation of the checksum value
331  cf = fopen(deschecksum.c_str(),"w");
332  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
333 
334  //write adler32 combine to checksum file
335  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
336 
337  fprintf(cf,"%u",mergedAdler32);
338  fclose(cf);
339 
340  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
341  fclose(src);
342 
343  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
344 
345  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
346  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
347  }
348  }
349  else { //no micromerge by HLT
350  stat(openDatFilePath_.string().c_str(), &istat);
351  filesize = istat.st_size;
352  boost::filesystem::rename(openDatFilePath_.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(),stream_label_));
353  }
354  } else {
355  //return if not in empty lumisection mode
356  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode()) {
357  remove(openDatFilePath_.string().c_str());
358  return;
359  }
360  filelist_ = "";
361  fileAdler32_.value()=-1;
362  }
363 
364  //remove file
365  remove(openDatFilePath_.string().c_str());
366  filesize_=filesize;
367 
368  jsonMonitor_->snap(ls.luminosityBlock());
369  const std::string outputJsonNameStream =
370  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
371  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
372 
373  // reset monitoring params
374  accepted_.value() = 0;
375  filelist_ = "";
376  }
377 
378 } // end of namespace-edm
379 
380 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
#define NULL
Definition: scimark2.h:8
virtual void doOutputEvent(EventMsgBuilder const &msg) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
virtual void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
StreamerOutputModuleBase(ParameterSet const &ps)
def ls(path, rec=False)
Definition: eostools.py:348
jsoncollector::DataPointDefinition outJsonDef_
void add(std::string const &label, ParameterSetDescription const &psetDescription)
HLT enums.
void setDefaultGroup(std::string const &group)
virtual void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override