CMS 3D CMS Logo

RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
7 
8 #include <sstream>
9 #include <iomanip>
10 #include <boost/filesystem.hpp>
11 #include <boost/algorithm/string.hpp>
12 #include <zlib.h>
13 
20 
21 
22 namespace evf {
// Class template RecoEventOutputModuleForFU<Consumer>: an output module that owns a
// Consumer (c_) and forwards start/stop/header/event calls to it, while keeping
// per-lumisection file and JSON monitoring bookkeeping of its own.
// NOTE(review): the class-head line (class name and base class) is missing from this
// listing -- the embedded numbering jumps from 23 to 34. The `override` specifiers
// below show that a base class exists; presumably edm::StreamerOutputModuleBase -- confirm.
23  template<typename Consumer>
25 
34  public:
  // Constructs the module (and its Consumer) from the module's ParameterSet.
35  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
  // Registers the parameter description; the definition delegates to
  // Consumer::fillDescription and uses descriptions.addDefault.
37  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
38 
39  private:
  // Run-level setup helper; called from start().
40  void initRun();
41  virtual void start() override;
42  virtual void stop() override;
43  virtual void doOutputHeader(InitMsgBuilder const& init_message) override;
44  virtual void doOutputEvent(EventMsgBuilder const& msg) override;
45  //virtual void beginRun(edm::RunForOutput const&);
46  virtual void beginJob() override;
47  virtual void beginLuminosityBlock(edm::LuminosityBlockForOutput const&) override;
48  virtual void endLuminosityBlock(edm::LuminosityBlockForOutput const&) override;
49 
50  private:
  // Consumer instance created with `new Consumer(ps)` in the constructor.
  // NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17;
  // std::unique_ptr is the usual replacement.
51  std::auto_ptr<Consumer> c_;
  // (member declarations on listing lines 52-65 and 67-68 are not present here)
66  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
  // Scratch buffer used to read files back in chunks of at most 1024*1024 bytes.
  // The constructor allocates it with new[]; the header-output code delete[]s it when
  // micro-merging is disabled. NOTE(review): no other release is visible in this listing.
69  unsigned char* outBuf_=nullptr;
  // Assigned from EvFDaqDirector::outputAdler32Recheck() during run setup; when true the
  // end-of-lumisection code recomputes the Adler32 of the data it copies and verifies it.
70  bool readAdler32Check_=false;
71 
72 
73  }; //end-of-class-def
74 
// Constructor definition.
// NOTE(review): the signature lines (listing lines 76-78) are missing here; what follows
// is the tail of the member-initializer list and the constructor body.
// Builds the Consumer from `ps`, takes the stream label from the "@module_label"
// parameter and allocates the 1024*1024-byte buffer outBuf_.
// Throws cms::Exception if the (possibly rewritten) stream label contains '_', or if it
// contains "stream" (case-insensitive) anywhere other than at the very beginning.
75  template<typename Consumer>
79  c_(new Consumer(ps)),
80  stream_label_(ps.getParameter<std::string>("@module_label")),
81  processed_(0),
82  accepted_(0),
83  errorEvents_(0),
84  retCodeMask_(0),
85  filelist_(),
86  filesize_(0),
87  inputFiles_(),
88  fileAdler32_(1), // 1 is also the starting value the read-back loops in this file use for the low Adler32 word
90  mergeType_(),
91  hltErrorEvents_(0),
92  outBuf_(new unsigned char[1024*1024]) // raw new[]; overrides the nullptr default member initializer
93  {
94  //replace a leading "hltOutput" (e.g. hltOutputA) with "stream" if the HLT menu uses this convention
95  std::string testPrefix="hltOutput";
96  if (stream_label_.find(testPrefix)==0)
97  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
98 
  // Reject labels containing an underscore (the message below states it is reserved).
99  if (stream_label_.find("_")!=std::string::npos) {
100  throw cms::Exception("RecoEventOutputModuleForFU")
101  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
102  }
103 
  // Work on a lower-cased copy so the search is case-insensitive; stream_label_ itself
  // keeps its original case.
104  std::string stream_label_lo = stream_label_;
105  boost::algorithm::to_lower(stream_label_lo);
  // rfind returns the LAST occurrence: a result that is neither 0 nor npos means
  // "stream" appears somewhere after the start of the label, which is rejected.
106  auto streampos = stream_label_lo.rfind("stream");
107  if (streampos !=0 && streampos!=std::string::npos)
108  throw cms::Exception("RecoEventOutputModuleForFU")
109  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
110 
112  }
113 
// Run-level setup: names the monitored quantities, makes sure the JSON definition
// (.jsd) file exists in the run directory, and registers the quantities with jsonMonitor_.
// NOTE(review): the signature line (listing line 115) is missing; this is presumably
// initRun(), the helper called from start(). Listing lines 135-146, 157-158 and 164 are
// also missing -- `content` (used on 159) and the creation of the object behind
// jsonMonitor_ (dereferenced from 165 on) are presumably on those lines.
114  template<typename Consumer>
116  {
117  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
  // Cache whether checksums are to be re-verified when data files are read back.
118  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
119  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
120  // create open dir if not already there
121  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
122 
  // Names under which each quantity is labelled for the JSON monitor.
123  processed_.setName("Processed");
124  accepted_.setName("Accepted");
125  errorEvents_.setName("ErrorEvents");
126  retCodeMask_.setName("ReturnCodeMask");
127  filelist_.setName("Filelist");
128  filesize_.setName("Filesize");
129  inputFiles_.setName("InputFiles");
130  fileAdler32_.setName("FileAdler32");
131  transferDestination_.setName("TransferDestination");
132  mergeType_.setName("MergeType");
133  hltErrorEvents_.setName("HLTErrorEvents");
134 
  // Two paths for the definition file, both named output_<pid>.jsd: a temporary one
  // under <baseRunDir>/open/ and the final one directly under <baseRunDir>/.
147  std::stringstream tmpss,ss;
148  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
149  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
150  std::string outTmpJsonDefName = tmpss.str();
151  std::string outJsonDefName = ss.str();
152 
  // The existence check and the write+rename are done between lockInitLock() and
  // unlockInitLock(). The file is written under the temporary name and then renamed
  // to its final name.
  // NOTE(review): any stat() failure (not only "no such file") takes the write branch.
  // NOTE(review): if writeStringToFile or boost::filesystem::rename throws,
  // unlockInitLock() on line 162 is not reached.
153  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
154  struct stat fstat;
155  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
156  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
159  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
160  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
161  }
162  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
163 
  // Point the monitor at the definition file and register every quantity named above,
  // then commit the registration.
165  jsonMonitor_->setDefPath(outJsonDefName);
166  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
167  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
168  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
169  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
170  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
171  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
172  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
173  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
174  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
175  jsonMonitor_->registerGlobalMonitorable(&mergeType_,false);
176  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
177  jsonMonitor_->commit(nullptr);
178 
179  }
180 
181  template<typename Consumer>
183 
// Starts the output stream: performs the run setup, then hands the path of the "open"
// init file for this stream to the Consumer and starts it.
// NOTE(review): the signature line (listing line 186) is missing; this is presumably
// start(), going by the log message below.
184  template<typename Consumer>
185  void
187  {
  // Run-level setup must happen before the consumer is started.
188  initRun();
189  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
190  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
191  << openInitFileName;
  // The consumer is told where to write the init message before it is started.
192  c_->setInitMessageFile(openInitFileName);
193  c_->start();
194 
195  }
196 
// Stops the output stream by forwarding to the Consumer; no other work is done here.
// NOTE(review): the signature line (listing line 199) is missing; presumably stop().
197  template<typename Consumer>
198  void
200  {
201  c_->stop();
202  }
203 
// Writes the init message through the Consumer, then reads the resulting "open" ini
// file back, recomputes its Adler32 and compares it with the value the Consumer
// reports (get_adler32_ini()). On a match the file is renamed to its final init-file
// path; on a mismatch a cms::Exception is thrown.
// NOTE(review): the signature line (listing line 206) is missing; presumably
// doOutputHeader(InitMsgBuilder const& init_message).
204  template<typename Consumer>
205  void
207  {
208  c_->doOutputHeader(init_message);
209 
210  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
  // NOTE(review): the stat() result is not checked; if it fails, istat.st_size is
  // indeterminate when used as the loop bound below.
211  struct stat istat;
212  stat(openIniFileName.c_str(), &istat);
213  //read back file to check integrity of what was written
214  off_t readInput=0;
  // adlera/adlerb are the two running words of the checksum, passed by reference to
  // cms::Adler32 for every chunk and combined on line 230.
215  uint32_t adlera=1,adlerb=0;
  // NOTE(review): the fopen() result is not checked before fread()/fclose(), and the
  // fread() return value is ignored, so a short read goes unnoticed here (it can only
  // show up as a checksum mismatch).
216  FILE *src = fopen(openIniFileName.c_str(),"r");
217  while (readInput<istat.st_size)
218  {
  // Read at most 1024*1024 bytes per iteration -- the size outBuf_ is allocated with.
219  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
220  fread(outBuf_,toRead,1,src);
221  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
222  readInput+=toRead;
223  }
224  fclose(src);
225  //free output buffer if micromerge is not done by the module
  // NOTE(review): after this, outBuf_ is nullptr; a second pass through this function
  // with micro-merging disabled would hand nullptr to fread() -- confirm it runs once.
226  if (edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
227  delete [] outBuf_;
228  outBuf_=nullptr;
229  }
  // Combine the two words into the 32-bit checksum: high half adlerb, low half adlera.
230  uint32_t adler32c = (adlerb << 16) | adlera;
231  if (adler32c != c_->get_adler32_ini()) {
232  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
233  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
234  }
235  else {
  // Success path: the checksum is reported at LogWarning level, then the file is
  // moved from its "open" path to the final init-file path.
236  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
237  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
238  }
239  }
240 
// Counts the event as accepted and forwards the event message to the Consumer.
// NOTE(review): the signature line with the opening brace (listing line 243) is
// missing; presumably doOutputEvent(EventMsgBuilder const& msg).
241  template<typename Consumer>
242  void
  // accepted_ is reset to 0 at the end of the end-of-lumisection handling.
244  accepted_.value()++;
245  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
246  }
247 
// Lets the Consumer fill the parameter description and registers it as the default
// description (addDefault rather than add -- the reasons are listed below).
// NOTE(review): listing lines 250-252 (signature, opening brace and the declaration
// of `desc`) are missing; presumably
// fillDescriptions(edm::ConfigurationDescriptions& descriptions).
248  template<typename Consumer>
249  void
253  Consumer::fillDescription(desc);
254  // Use addDefault here instead of add for 4 reasons:
255  // 1. Because EvFOutputModule_cfi.py is explicitly defined it does not need to be autogenerated
256  // The explicitly defined version overrides the autogenerated version of the cfi file.
257  // 2. That cfi file is not used anywhere in the release anyway
258  // 3. There are two plugin names used for the same template instantiation of this
259  // type, "ShmStreamConsumer" and "EvFOutputModule" and this causes name conflict
260  // problems for the cfi generation code which are avoided with addDefault.
261  // 4. At the present time, there is only one type of Consumer used to instantiate
262  // instances of this template, but if there were more than one type then this function
263  // would need to be specialized for each type unless the descriptions were the same
264  // and addDefault was used.
265  descriptions.addDefault(desc);
266  }
267 
// NOTE(review): only the braces and one comment of this definition survive in the
// listing. The signature (line 269) and the statements (lines 272-273) are missing.
// By declaration order this is presumably beginJob(); per the comment below, the
// missing statements obtain the stream's transfer destination -- confirm against
// the original header.
268  template<typename Consumer>
270  {
271  //get stream transfer destination
274  }
275 
276 
// Points the Consumer at the data file for the new lumisection and records that
// file's name (without directory) in filelist_ for the JSON monitor.
// NOTE(review): the signature (listing line 278) and lines 281-282 are missing;
// presumably beginLuminosityBlock(...), with openDatFilePath_ being assigned on the
// missing lines -- confirm.
277  template<typename Consumer>
279  {
280  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
283  c_->setOutputFile(openDatFilePath_.string());
284  filelist_ = openDatFilePath_.filename().string();
285  }
286 
// End-of-lumisection handling. Closes the Consumer's output file, then either
//  (a) appends the file to the per-stream merged file and updates the accumulated
//      Adler32 stored in the checksum file (micro-merge enabled), or
//  (b) renames the file to its final .dat path (micro-merge disabled),
// and finally snapshots the monitored quantities and writes the per-lumisection JSON.
// When no events were processed, the open file is removed and JSON is written only in
// empty-lumisection mode.
// Throws cms::Exception if the checksum file cannot be opened, or if the re-read
// checksum does not match the one reported by the Consumer.
// NOTE(review): the signature line (listing line 288) is missing; presumably
// endLuminosityBlock(edm::LuminosityBlockForOutput const& ls). Line 295 is missing too.
287  template<typename Consumer>
289  {
290  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
  // Size in bytes reported through filesize_ at the end.
291  long filesize=0;
  // Take the checksum from the Consumer before its output file is closed.
292  fileAdler32_.value() = c_->get_adler32();
293  c_->closeOutputFile();
  // NOTE(review): abortFlag is only initialised here; the statement that can set it
  // (and presumably updates processed_) is on the missing line 295 -- confirm.
294  bool abortFlag = false;
296 
297  if (abortFlag) {
298  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
299  return;
300  }
301 
302  if(processed_.value()!=0) {
303 
304  //lock
305  struct stat istat;
306  if (!edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
  // Destination stream for the merged file; judging by the method name it is
  // returned in a locked state and released by unlockAndCloseMergeStream() below.
307  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
308 
309  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
310 
  // NOTE(review): this declaration shadows the `istat` declared on line 305.
311  struct stat istat;
312  FILE * cf = NULL;
  // Starts at 1; replaced by the stored value if a non-empty checksum file exists.
313  uint32_t mergedAdler32=1;
314  //get adler32 accumulated checksum for the merged file
315  if (!stat(deschecksum.c_str(), &istat)) {
316  if (istat.st_size) {
317  cf = fopen(deschecksum.c_str(),"r");
  // NOTE(review): throwing here skips unlockAndCloseMergeStream() on line 351.
318  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
  // NOTE(review): the fscanf() result is not checked; on a parse failure
  // mergedAdler32 keeps its initial value 1.
319  fscanf(cf,"%u",&mergedAdler32);
320  fclose(cf);
321  }
322  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
323  }
324 
  // NOTE(review): the results of fopen(), stat(), fread() and fwrite() below are
  // not checked.
325  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
326 
327  stat(openDatFilePath_.string().c_str(), &istat);
328  off_t readInput=0;
329  uint32_t adlera=1;
330  uint32_t adlerb=0;
  // Copy the data file into the merged file in chunks of at most 1024*1024 bytes;
  // the checksum is recomputed on the fly only when readAdler32Check_ is set.
331  while (readInput<istat.st_size) {
332  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
333  fread(outBuf_,toRead,1,src);
334  fwrite(outBuf_,toRead,1,des);
335  if (readAdler32Check_)
336  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
337  readInput+=toRead;
338  filesize+=toRead;
339  }
340 
341  //write new string representation of the checksum value
342  cf = fopen(deschecksum.c_str(),"w");
  // NOTE(review): throwing here leaves `src` open and skips
  // unlockAndCloseMergeStream() on line 351.
343  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
344 
345  //write adler32 combine to checksum file
  // zlib's adler32_combine(adler1, adler2, len2) yields the checksum of the
  // concatenation; len2 is the number of bytes just appended.
346  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
347 
348  fprintf(cf,"%u",mergedAdler32);
349  fclose(cf);
350 
351  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
352  fclose(src);
353 
  // Optional verification: the checksum recomputed while copying must equal the one
  // the Consumer reported. This is checked only after the data has been appended.
354  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
355 
356  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
357  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
358  }
359  }
360  else { //no micromerge by HLT
  // Report the on-disk size and move the file to its final .dat path.
361  stat(openDatFilePath_.string().c_str(), &istat);
362  filesize = istat.st_size;
363  boost::filesystem::rename(openDatFilePath_.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(),stream_label_));
364  }
365  } else {
366  //return if not in empty lumisection mode
  // In that case the open file is deleted and no JSON is written for this lumisection.
367  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode()) {
368  remove(openDatFilePath_.string().c_str());
369  return;
370  }
  // Empty-lumisection mode: report no file name, and -1 as the checksum.
371  filelist_ = "";
372  fileAdler32_.value()=-1;
373  }
374 
375  //remove file
  // The remove() result is ignored. After the rename on line 363 the open path should
  // no longer exist, so this call matters for the other two paths.
376  remove(openDatFilePath_.string().c_str());
377  filesize_=filesize;
378 
  // Snapshot the monitored quantities and write the JSON file for this lumisection.
379  jsonMonitor_->snap(ls.luminosityBlock());
380  const std::string outputJsonNameStream =
381  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
382  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
383 
384  // reset monitoring params
385  accepted_.value() = 0;
386  filelist_ = "";
387  }
388 
389 } // end of namespace evf
390 
391 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
#define NULL
Definition: scimark2.h:8
virtual void doOutputEvent(EventMsgBuilder const &msg) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
void addDefault(ParameterSetDescription const &psetDescription)
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
virtual void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
StreamerOutputModuleBase(ParameterSet const &ps)
def ls(path, rec=False)
Definition: eostools.py:348
jsoncollector::DataPointDefinition outJsonDef_
HLT enums.
void setDefaultGroup(std::string const &group)
virtual void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override