CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
21 
22 
23 namespace evf {
// Output-module class template, parameterized on the Consumer type that does
// the actual file writing (every I/O call below is forwarded to c_).
// NOTE(review): the class header line and a number of member declarations are
// absent from this listing (the listing numbers jump 24->26->35, 52->67->70);
// only the visible members are documented here.
24  template<typename Consumer>
26 
35  public:
// Builds the module from its parameter set; see the constructor definition.
36  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
// Registers this module's parameter description with the framework.
38  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
39 
40  private:
// Helper invoked from start().
41  void initRun();
42  virtual void start() override;
43  virtual void stop() override;
44  virtual void doOutputHeader(InitMsgBuilder const& init_message) override;
45  virtual void doOutputEvent(EventMsgBuilder const& msg) override;
46  //virtual void beginRun(edm::RunPrincipal const&, edm::ModuleCallingContext const*);
47  virtual void beginJob() override;
50 
51  private:
// Consumer instance created in the constructor with `new Consumer(ps)`.
// NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17;
// std::unique_ptr is the usual replacement - confirm no copy of c_ is relied on.
52  std::auto_ptr<Consumer> c_;
// JSON monitor; (re)created by reset() in the run-initialization routine.
67  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
// Scratch buffer of 1024*1024 bytes allocated with new[] in the constructor and
// used for reading files back in chunks. It is a raw owning pointer; the only
// visible delete[] is in doOutputHeader() when micro-merging is disabled.
// NOTE(review): confirm it is released on the other path (destructor not visible).
70  unsigned char* outBuf_=nullptr;
// When true, the merged data is re-checksummed while being copied in
// endLuminosityBlock(); set from EvFDaqDirector::outputAdler32Recheck().
71  bool readAdler32Check_=false;
72 
73 
74  }; //end-of-class-def
75 
// Constructor definition.
// NOTE(review): the signature line is absent from this listing; only the
// member-initializer list and the body are visible.
// Initializes edm::one::OutputModuleBase and edm::StreamerOutputModuleBase from
// ps, creates the Consumer, reads the stream label from the "@module_label"
// parameter, zero/default-initializes the monitored counters and allocates the
// 1024*1024-byte outBuf_. The body then normalizes and validates the label.
// Throws cms::Exception when the label contains '_' or when the last
// case-insensitive occurrence of "stream" is not at index 0.
76  template<typename Consumer>
78  edm::one::OutputModuleBase::OutputModuleBase(ps),
79  edm::StreamerOutputModuleBase(ps),
80  c_(new Consumer(ps)),
81  stream_label_(ps.getParameter<std::string>("@module_label")),
82  processed_(0),
83  accepted_(0),
84  errorEvents_(0),
85  retCodeMask_(0),
86  filelist_(),
87  filesize_(0),
88  inputFiles_(),
89  fileAdler32_(1),
90  transferDestination_(),
91  mergeType_(),
92  hltErrorEvents_(0),
93  outBuf_(new unsigned char[1024*1024])
94  {
95  //replace hltOutputA with streamA if the HLT menu uses this naming convention
96  std::string testPrefix="hltOutput";
97  if (stream_label_.find(testPrefix)==0)
98  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
99 
 // '_' is rejected outright; the message below says it is reserved in FFF.
100  if (stream_label_.find("_")!=std::string::npos) {
101  throw cms::Exception("RecoEventOutputModuleForFU")
102  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
103  }
104 
 // Work on a lower-cased copy so the "stream" search is case-insensitive.
 // rfind returns the LAST occurrence: a result other than 0 (and other than
 // npos) means "stream" also appears somewhere after the leading prefix.
105  std::string stream_label_lo = stream_label_;
106  boost::algorithm::to_lower(stream_label_lo);
107  auto streampos = stream_label_lo.rfind("stream");
108  if (streampos !=0 && streampos!=std::string::npos)
109  throw cms::Exception("RecoEventOutputModuleForFU")
110  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
111 
113  }
114 
// Run initialization: names the monitored quantities, builds the JSON output
// definition, writes the definition (.jsd) file if it does not exist yet, and
// creates the FastMonitor with all quantities registered.
// NOTE(review): the signature line is absent from this listing; this is
// presumably initRun(), the helper that start() calls - confirm.
115  template<typename Consumer>
117  {
118  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
119  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
120  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
121  // create open dir if not already there
122  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
123 
 // Names given to the monitored quantities; they match the legend items below.
124  processed_.setName("Processed");
125  accepted_.setName("Accepted");
126  errorEvents_.setName("ErrorEvents");
127  retCodeMask_.setName("ReturnCodeMask");
128  filelist_.setName("Filelist");
129  filesize_.setName("Filesize");
130  inputFiles_.setName("InputFiles");
131  fileAdler32_.setName("FileAdler32");
132  transferDestination_.setName("TransferDestination");
133  mergeType_.setName("MergeType");
134  hltErrorEvents_.setName("HLTErrorEvents");
135 
 // Legend of the JSON definition: one (name, type, operation) entry per
 // monitored quantity, in the same order as the registrations further down.
136  outJsonDef_.setDefaultGroup("data");
137  outJsonDef_.addLegendItem("Processed","integer",jsoncollector::DataPointDefinition::SUM);
138  outJsonDef_.addLegendItem("Accepted","integer",jsoncollector::DataPointDefinition::SUM);
139  outJsonDef_.addLegendItem("ErrorEvents","integer",jsoncollector::DataPointDefinition::SUM);
140  outJsonDef_.addLegendItem("ReturnCodeMask","integer",jsoncollector::DataPointDefinition::BINARYOR);
141  outJsonDef_.addLegendItem("Filelist","string",jsoncollector::DataPointDefinition::MERGE);
142  outJsonDef_.addLegendItem("Filesize","integer",jsoncollector::DataPointDefinition::SUM);
143  outJsonDef_.addLegendItem("InputFiles","string",jsoncollector::DataPointDefinition::CAT);
144  outJsonDef_.addLegendItem("FileAdler32","integer",jsoncollector::DataPointDefinition::ADLER32);
145  outJsonDef_.addLegendItem("TransferDestination","string",jsoncollector::DataPointDefinition::SAME);
146  outJsonDef_.addLegendItem("MergeType","string",jsoncollector::DataPointDefinition::SAME);
147  outJsonDef_.addLegendItem("HLTErrorEvents","integer",jsoncollector::DataPointDefinition::SUM);
 // Two paths named after the process id: a temporary one under "open/" and
 // the final one directly in the run directory.
148  std::stringstream tmpss,ss;
149  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
150  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
151  std::string outTmpJsonDefName = tmpss.str();
152  std::string outJsonDefName = ss.str();
153 
 // The definition file is written to the temporary path and then renamed
 // into place, all between lockInitLock() and unlockInitLock().
 // NOTE(review): if serialize/writeStringToFile/rename throws, the unlock
 // call below is skipped - confirm the lock is released on that path.
 // NOTE(review): `content` is used below but its declaration is not visible
 // (listing line 158 is absent).
154  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
155  struct stat fstat;
156  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
157  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
159  jsoncollector::JSONSerializer::serialize(&outJsonDef_,content);
160  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
161  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
162  }
163  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
164 
 // Fresh monitor bound to the definition; every quantity is registered as a
 // global monitorable, then the registration is committed.
165  jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef_,true));
166  jsonMonitor_->setDefPath(outJsonDefName);
167  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
168  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
169  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
170  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
171  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
172  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
173  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
174  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
175  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
176  jsonMonitor_->registerGlobalMonitorable(&mergeType_,false);
177  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
178  jsonMonitor_->commit(nullptr);
179 
180  }
181 
182  template<typename Consumer>
184 
// start(): runs the run initialization, then points the consumer at the "open"
// init file for this stream and starts it.
// NOTE(review): the signature line is absent from this listing; the method is
// identified by its own log message.
185  template<typename Consumer>
186  void
188  {
189  initRun();
 // Path of the init file while it is still being written.
190  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
191  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
192  << openInitFileName;
193  c_->setInitMessageFile(openInitFileName);
194  c_->start();
195 
196  }
197 
// Forwards the stop request to the consumer.
// NOTE(review): the signature line is absent from this listing; presumably
// this is stop() - confirm.
198  template<typename Consumer>
199  void
201  {
202  c_->stop();
203  }
204 
// doOutputHeader(init_message): lets the consumer write the init message, then
// reads the written file back in chunks of at most 1024*1024 bytes, recomputes
// its Adler32 checksum and compares it with the value reported by the consumer.
// On a match the file is renamed from the "open" path to the final init-file
// path; on a mismatch a cms::Exception is thrown.
// NOTE(review): the signature line is absent from this listing.
205  template<typename Consumer>
206  void
208  {
209  c_->doOutputHeader(init_message);
210 
211  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
 // NOTE(review): the return value of stat() is ignored; if it fails,
 // istat.st_size is read uninitialized in the loop condition below.
212  struct stat istat;
213  stat(openIniFileName.c_str(), &istat);
214  //read back file to check integrity of what was written
215  off_t readInput=0;
 // Running halves of the checksum, passed by reference to cms::Adler32.
216  uint32_t adlera=1,adlerb=0;
 // NOTE(review): fopen() is not checked for NULL and the fread() result is
 // ignored; a failed open would pass a null FILE* to fread/fclose.
217  FILE *src = fopen(openIniFileName.c_str(),"r");
218  while (readInput<istat.st_size)
219  {
 // Next chunk: 1024*1024 bytes, or whatever remains of the file.
220  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
221  fread(outBuf_,toRead,1,src);
222  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
223  readInput+=toRead;
224  }
225  fclose(src);
226  //free output buffer if micromerge is not done by the module
227  if (edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
228  delete [] outBuf_;
229  outBuf_=nullptr;
230  }
 // Combine the two halves into the 32-bit checksum value.
231  uint32_t adler32c = (adlerb << 16) | adlera;
232  if (adler32c != c_->get_adler32_ini()) {
233  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
234  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
235  }
236  else {
 // NOTE(review): the success path is logged with LogWarning - confirm the
 // severity is intentional.
237  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
238  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
239  }
240  }
241 
// doOutputEvent(msg): counts the event as accepted and hands the message to
// the consumer.
// NOTE(review): the signature line and the opening brace are absent from this
// listing (the listing numbers jump 243->245).
242  template<typename Consumer>
243  void
245  accepted_.value()++;
246  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
247  }
248 
// fillDescriptions(descriptions): lets the Consumer fill the parameter
// description and registers it under the label "EvFOutputModule".
// NOTE(review): the signature line and the declaration of `desc` are absent
// from this listing (the listing numbers jump 250->254).
249  template<typename Consumer>
250  void
254  Consumer::fillDescription(desc);
255  descriptions.add("EvFOutputModule", desc);
256  }
257 
// Looks up the transfer destination and merge type for this stream from
// EvFDaqDirector and stores them in the corresponding monitored members.
// NOTE(review): the signature line is absent from this listing; presumably
// this is beginJob() - confirm.
258  template<typename Consumer>
260  {
261  //get stream transfer destination
262  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(stream_label_);
 // evf::MergeTypeDAT is passed as the second argument of the merge-type lookup.
263  mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(stream_label_,evf::MergeTypeDAT);
264  }
265 
266 
// beginLuminosityBlock(ls, ...): determines the "open" .dat file path for this
// lumisection and stream, tells the consumer to write there, and records the
// bare file name in filelist_.
// NOTE(review): the signature line is absent from this listing.
267  template<typename Consumer>
269  {
270  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
271  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
 // NOTE(review): the checksum path is assigned from the same getter as the
 // data path above, so both members hold the same value - confirm whether a
 // checksum-specific path was intended.
272  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
273  c_->setOutputFile(openDatFilePath_.string());
274  filelist_ = openDatFilePath_.filename().string();
275  }
276 
// endLuminosityBlock(ls, ...): finalizes the per-lumisection output.
//  1. Captures the consumer's Adler32, closes the output file and fetches the
//     number of processed events for this lumisection from fms_.
//  2. If the abort flag is set, logs and returns without any further output.
//  3. If events were processed: either appends the file to the merged stream
//     file (updating the accumulated checksum file), or, with micro-merging
//     disabled, renames the file to its final .dat path.
//  4. If no events were processed: deletes the file and returns, unless empty
//     lumisection mode is on, in which case an empty JSON record is still made.
//  5. Removes the open file, snapshots the monitor and writes the JSON file.
// Throws cms::Exception when a checksum file cannot be opened or the re-read
// checksum does not match.
// NOTE(review): the signature line is absent from this listing, and `fms_` is
// not declared in the visible part of the class.
277  template<typename Consumer>
279  {
280  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
281  long filesize=0;
 // The checksum is read from the consumer before the file is closed.
282  fileAdler32_.value() = c_->get_adler32();
283  c_->closeOutputFile();
284  bool abortFlag = false;
285  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock(),&abortFlag);
286 
287  if (abortFlag) {
288  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
289  return;
290  }
291 
292  if(processed_.value()!=0) {
293 
294  //lock
295  struct stat istat;
296  if (!edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
 // Returns the merged stream file; paired with unlockAndCloseMergeStream()
 // further down.
297  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
298 
299  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
300 
 // NOTE(review): this declaration shadows the `istat` declared a few lines
 // above in the enclosing scope.
301  struct stat istat;
302  FILE * cf = NULL;
303  uint32_t mergedAdler32=1;
304  //get adler32 accumulated checksum for the merged file
305  if (!stat(deschecksum.c_str(), &istat)) {
306  if (istat.st_size) {
307  cf = fopen(deschecksum.c_str(),"r");
 // NOTE(review): the exceptions thrown in this branch happen while the
 // merge stream is still locked; the unlock call below is then skipped.
308  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
 // NOTE(review): fscanf() result is ignored; on a parse failure
 // mergedAdler32 keeps its initial value of 1.
309  fscanf(cf,"%u",&mergedAdler32);
310  fclose(cf);
311  }
312  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
313  }
314 
 // NOTE(review): neither fopen() nor the following stat() is checked, and
 // the fread()/fwrite() results in the loop are ignored.
315  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
316 
317  stat(openDatFilePath_.string().c_str(), &istat);
318  off_t readInput=0;
319  uint32_t adlera=1;
320  uint32_t adlerb=0;
 // Copy the file into the merged file in chunks of at most 1024*1024
 // bytes, optionally re-computing its checksum on the way.
321  while (readInput<istat.st_size) {
322  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
323  fread(outBuf_,toRead,1,src);
324  fwrite(outBuf_,toRead,1,des);
325  if (readAdler32Check_)
326  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
327  readInput+=toRead;
328  filesize+=toRead;
329  }
330 
331  //write new string representation of the checksum value
332  cf = fopen(deschecksum.c_str(),"w");
 // NOTE(review): throwing here leaves `src` open and the merge stream locked.
333  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
334 
335  //write adler32 combine to checksum file
 // Fold this file's checksum and byte count into the accumulated value.
336  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
337 
338  fprintf(cf,"%u",mergedAdler32);
339  fclose(cf);
340 
341  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
342  fclose(src);
343 
 // Only when the recheck is enabled: the checksum recomputed from the
 // bytes read back must equal the one reported by the consumer.
344  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
345 
346  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
347  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
348  }
349  }
350  else { //no micromerge by HLT
 // The file is moved to its final path as-is; its size comes from stat().
 // NOTE(review): stat() result is not checked here either.
351  stat(openDatFilePath_.string().c_str(), &istat);
352  filesize = istat.st_size;
353  boost::filesystem::rename(openDatFilePath_.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(),stream_label_));
354  }
355  } else {
356  //return if not in empty lumisection mode
357  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode()) {
358  remove(openDatFilePath_.string().c_str());
359  return;
360  }
 // Empty lumisection mode: report no file and a checksum value of -1.
361  filelist_ = "";
362  fileAdler32_.value()=-1;
363  }
364 
365  //remove file
 // NOTE(review): on the no-micromerge path the file was already renamed
 // above, so this remove() targets a path that should no longer exist; its
 // return value is ignored.
366  remove(openDatFilePath_.string().c_str());
367  filesize_=filesize;
368 
 // Snapshot the monitored values for this lumisection and write them out.
369  jsonMonitor_->snap(ls.luminosityBlock());
370  const std::string outputJsonNameStream =
371  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
372  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
373 
374  // reset monitoring params
375  accepted_.value() = 0;
376  filelist_ = "";
377  }
378 
379 } // end of namespace evf
380 
381 #endif
#define LogDebug(id)
static void fillDescription(ParameterSetDescription &desc)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
def ls
Definition: eostools.py:348
#define NULL
Definition: scimark2.h:8
virtual void doOutputEvent(EventMsgBuilder const &msg) override
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
boost::filesystem::path openDatChecksumFilePath_
virtual void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
jsoncollector::DataPointDefinition outJsonDef_
void add(std::string const &label, ParameterSetDescription const &psetDescription)