CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
21 
22 
23 namespace evf {
// Declaration of the class template RecoEventOutputModuleForFU<Consumer>.
// NOTE(review): this listing skips numbers (24 -> 26 -> 35, 47 -> 50, 52 -> 66 -> 69), so the
// class-head line and several declarations are not shown. The base classes are presumably the
// ones named in the constructor's initializer list (edm::one::OutputModuleBase,
// edm::StreamerOutputModuleBase) - confirm against the full header.
24  template<typename Consumer>
26 
35  public:
// Builds the Consumer from `ps` and validates the stream label; throws cms::Exception for
// labels that use a reserved pattern.
36  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
// Lets Consumer fill a description and registers it under the label "EvFOutputModule".
38  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
39 
40  private:
// Sets up the JSON definition file and the FastMonitor; called from start().
41  void initRun();
// Calls initRun(), hands the open init-file path to the consumer and starts it.
42  virtual void start() override;
// Forwards to the consumer's stop().
43  virtual void stop() override;
// Writes the init message through the consumer, re-reads the file to verify its Adler32
// checksum, then renames it to its final path.
44  virtual void doOutputHeader(InitMsgBuilder const& init_message) override;
// Counts the event as accepted and forwards it to the consumer.
45  virtual void doOutputEvent(EventMsgBuilder const& msg) override;
46  //virtual void beginRun(edm::RunPrincipal const&, edm::ModuleCallingContext const*);
47  virtual void beginJob() override;
50 
51  private:
// Consumer that performs the actual writing; created in the constructor from the ParameterSet.
// NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17.
52  std::auto_ptr<Consumer> c_;
// Created in initRun(); used at end of lumisection to snapshot and write the JSON output.
66  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
// Scratch buffer for reading files back; the constructor allocates 1024*1024 bytes and
// doOutputHeader() frees it when micromerge is disabled.
// NOTE(review): no other delete[] is visible in this listing - confirm the destructor releases it.
69  unsigned char* outBuf_=nullptr;
// Set in initRun() from EvFDaqDirector::outputAdler32Recheck(); when true, the end-of-lumi
// copy loop recomputes the Adler32 of the data it reads and compares it to the consumer's value.
70  bool readAdler32Check_=false;
71 
72 
73  }; //end-of-class-def
74 
// Constructor (its signature line, listing number 76, is missing from this extract).
// Initializes the bases and members from `ps`, then normalizes and validates the stream label:
//  - a leading "hltOutput" is replaced by "stream";
//  - a label containing '_' is rejected;
//  - a label whose lower-cased form has its last "stream" occurrence anywhere but position 0
//    is rejected.
// Throws cms::Exception on either rejection.
75  template<typename Consumer>
77  edm::one::OutputModuleBase::OutputModuleBase(ps),
78  edm::StreamerOutputModuleBase(ps),
79  c_(new Consumer(ps)),
// The stream label starts out as the module label from the configuration.
80  stream_label_(ps.getParameter<std::string>("@module_label")),
81  processed_(0),
82  accepted_(0),
83  errorEvents_(0),
84  retCodeMask_(0),
85  filelist_(),
86  filesize_(0),
87  inputFiles_(),
88  fileAdler32_(1),
89  transferDestination_(),
90  hltErrorEvents_(0),
// 1024*1024-byte scratch buffer used by the file read-back loops in this class.
91  outBuf_(new unsigned char[1024*1024])
92  {
93  //replace hltOutputA with streamA if the HLT menu uses this convention
94  std::string testPrefix="hltOutput";
95  if (stream_label_.find(testPrefix)==0)
96  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
97 
98  if (stream_label_.find("_")!=std::string::npos) {
99  throw cms::Exception("RecoEventOutputModuleForFU")
100  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
101  }
102 
// The check below is case-insensitive: it works on a lower-cased copy of the label.
103  std::string stream_label_lo = stream_label_;
104  boost::algorithm::to_lower(stream_label_lo);
// rfind gives the position of the last "stream" occurrence; only position 0 or no
// occurrence at all is accepted.
105  auto streampos = stream_label_lo.rfind("stream");
106  if (streampos !=0 && streampos!=std::string::npos)
107  throw cms::Exception("RecoEventOutputModuleForFU")
// NOTE(review): unlike the underscore message above, this message does not append stream_label_.
108  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
109 
// NOTE(review): listing number 110 is missing from this extract.
111  }
112 
// Per-run initialization (signature line, listing number 114, is missing; start() calls initRun(),
// and the class declares no other setup method, so this is presumably initRun()).
// Steps: read settings from EvFDaqDirector, name the monitored quantities, build the JSON
// definition legend, write the .jsd definition file if it does not exist yet, then create the
// FastMonitor and register the monitored quantities with it.
113  template<typename Consumer>
115  {
116  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
117  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
118  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
119  // create open dir if not already there
120  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
121 
// Names given here match the legend items added to outJsonDef_ below, in the same order.
122  processed_.setName("Processed");
123  accepted_.setName("Accepted");
124  errorEvents_.setName("ErrorEvents");
125  retCodeMask_.setName("ReturnCodeMask");
126  filelist_.setName("Filelist");
127  filesize_.setName("Filesize");
128  inputFiles_.setName("InputFiles");
129  fileAdler32_.setName("FileAdler32");
130  transferDestination_.setName("TransferDestination");
131  hltErrorEvents_.setName("HLTErrorEvents");
132 
// Each legend item carries a name, a type string and a DataPointDefinition operation constant.
133  outJsonDef_.setDefaultGroup("data");
134  outJsonDef_.addLegendItem("Processed","integer",jsoncollector::DataPointDefinition::SUM);
135  outJsonDef_.addLegendItem("Accepted","integer",jsoncollector::DataPointDefinition::SUM);
136  outJsonDef_.addLegendItem("ErrorEvents","integer",jsoncollector::DataPointDefinition::SUM);
137  outJsonDef_.addLegendItem("ReturnCodeMask","integer",jsoncollector::DataPointDefinition::BINARYOR);
138  outJsonDef_.addLegendItem("Filelist","string",jsoncollector::DataPointDefinition::MERGE);
139  outJsonDef_.addLegendItem("Filesize","integer",jsoncollector::DataPointDefinition::SUM);
140  outJsonDef_.addLegendItem("InputFiles","string",jsoncollector::DataPointDefinition::CAT);
141  outJsonDef_.addLegendItem("FileAdler32","integer",jsoncollector::DataPointDefinition::ADLER32);
142  outJsonDef_.addLegendItem("TransferDestination","string",jsoncollector::DataPointDefinition::SAME);
143  outJsonDef_.addLegendItem("HLTErrorEvents","integer",jsoncollector::DataPointDefinition::SUM);
// Two paths with the same pid-based file name: a temporary one under <baseRunDir>/open/ and
// the final one directly under <baseRunDir>/.
144  std::stringstream tmpss,ss;
145  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
146  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
147  std::string outTmpJsonDefName = tmpss.str();
148  std::string outJsonDefName = ss.str();
149 
// The definition file is written between lockInitLock() and unlockInitLock().
// NOTE(review): if serialize, writeStringToFile or rename throws, unlockInitLock() below is
// not reached from this function - confirm whether the lock is released elsewhere.
150  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
// NOTE(review): this local is named like the ::fstat function and hides it inside this scope.
151  struct stat fstat;
152  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
153  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
// NOTE(review): listing number 154 is missing; `content` is presumably declared there.
155  jsoncollector::JSONSerializer::serialize(&outJsonDef_,content);
// Write to the temporary path first, then rename it to the final path.
156  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
157  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
158  }
159  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
160 
// Create the monitor for this definition and register every monitored member with it.
161  jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef_,true));
162  jsonMonitor_->setDefPath(outJsonDefName);
163  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
164  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
165  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
166  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
167  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
168  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
169  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
170  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
171  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
172  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
173  jsonMonitor_->commit(nullptr);
174 
175  }
176 
177  template<typename Consumer>
179 
// Starts the output stream (signature line, listing number 182, is missing; the log text
// below identifies this as the start() method).
// Runs initRun(), obtains the open init-file path for this stream from EvFDaqDirector,
// passes it to the consumer and starts the consumer.
180  template<typename Consumer>
181  void
183  {
184  initRun();
185  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
186  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
187  << openInitFileName;
// The consumer is told where to write the init message before it is started.
188  c_->setInitMessageFile(openInitFileName);
189  c_->start();
190 
191  }
192 
// Stops the output stream by forwarding to the consumer's stop().
// NOTE(review): the signature line (listing number 195) is missing from this extract;
// presumably this is stop() - confirm against the full header.
193  template<typename Consumer>
194  void
196  {
197  c_->stop();
198  }
199 
// Writes the init message and verifies the resulting file (signature line, listing number
// 202, is missing; the body uses `init_message`, matching the declared doOutputHeader()).
// After the consumer has written the header, the open init file is read back in chunks of at
// most 1024*1024 bytes and its Adler32 is recomputed. If the value differs from the consumer's
// get_adler32_ini() a cms::Exception is thrown; otherwise the file is renamed to the final
// init-file path.
200  template<typename Consumer>
201  void
203  {
204  c_->doOutputHeader(init_message);
205 
206  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
207  struct stat istat;
// NOTE(review): the return value of stat() is ignored; if it fails, istat.st_size is read
// uninitialized by the loop below.
208  stat(openIniFileName.c_str(), &istat);
209  //read back file to check integrity of what was written
210  off_t readInput=0;
// Running Adler32 state, passed by reference to cms::Adler32 and updated per chunk.
211  uint32_t adlera=1,adlerb=0;
// NOTE(review): the fopen() result is not checked before it is passed to fread()/fclose().
212  FILE *src = fopen(openIniFileName.c_str(),"r");
213  while (readInput<istat.st_size)
214  {
// Read a full 1024*1024-byte chunk, or whatever remains for the last chunk.
215  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
// NOTE(review): the fread() result is ignored, so a short read would go unnoticed here
// (it would presumably surface as a checksum mismatch below).
216  fread(outBuf_,toRead,1,src);
217  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
218  readInput+=toRead;
219  }
220  fclose(src);
221  //free output buffer if micromerge is not done by the module
// NOTE(review): after this, outBuf_ is nullptr; a second call to this function would pass
// nullptr to fread() - presumably the header is written only once per job, confirm.
222  if (edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
223  delete [] outBuf_;
224  outBuf_=nullptr;
225  }
// Combine the two running sums into the 32-bit checksum: high 16 bits from b, low from a.
226  uint32_t adler32c = (adlerb << 16) | adlera;
227  if (adler32c != c_->get_adler32_ini()) {
228  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
229  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
230  }
231  else {
// NOTE(review): the success path is logged with LogWarning severity.
232  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
// Only a verified file is moved from the open path to the final init-file path.
233  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
234  }
235  }
236 
// Handles one event message: increments the accepted-event counter and forwards `msg` to the
// consumer.
// NOTE(review): listing number 239 (signature and opening brace) is missing from this extract;
// the body uses `msg`, matching the declared doOutputEvent(EventMsgBuilder const& msg).
237  template<typename Consumer>
238  void
240  accepted_.value()++;
241  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
242  }
243 
// Registers the module's parameter description: Consumer::fillDescription() fills `desc`,
// which is then added to `descriptions` under the label "EvFOutputModule".
// NOTE(review): listing numbers 246-248 (signature, opening brace and the declaration of
// `desc`) are missing from this extract; the use of `descriptions` matches the declared
// static fillDescriptions().
244  template<typename Consumer>
245  void
249  Consumer::fillDescription(desc);
250  descriptions.add("EvFOutputModule", desc);
251  }
252 
// Stores the transfer destination that EvFDaqDirector reports for this stream label.
// NOTE(review): the signature line (listing number 254) is missing from this extract;
// presumably this is beginJob(), the only declared method without another visible
// definition - confirm against the full header.
253  template<typename Consumer>
255  {
256  //get stream transfer destination
257  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(stream_label_);
258  }
259 
260 
// Prepares output for a lumisection: obtains the open .dat path for `ls` and this stream,
// points the consumer at it and records the file name in filelist_.
// NOTE(review): the signature line (listing number 262) is missing from this extract;
// presumably this is beginLuminosityBlock() with a principal parameter named `ls` - confirm.
261  template<typename Consumer>
263  {
264  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
265  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
// NOTE(review): the checksum path is assigned from the same getter as the data path above;
// it is not used elsewhere in this listing - verify whether a checksum-path getter was intended.
266  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
267  c_->setOutputFile(openDatFilePath_.string());
// Only the file name, without its directory, is reported in the JSON file list.
268  filelist_ = openDatFilePath_.filename().string();
269  }
270 
// Finalizes output for a lumisection.
// NOTE(review): the signature line (listing number 272) is missing from this extract;
// presumably this is endLuminosityBlock() with a principal parameter named `ls` - confirm.
// Flow:
//  1. take the consumer's Adler32, close its file and fetch the processed-event count;
//  2. return early if the abort flag was set;
//  3. if events were processed: with micromerge, append this process's .dat file to the merged
//     stream file and update the merged checksum file; without it, rename the .dat file to its
//     final path;
//  4. if no events were processed: remove the file and return, unless empty-lumisection mode
//     is on, in which case an entry with an empty file list is still written;
//  5. write the per-lumisection JSON and reset the per-lumi counters.
// Throws cms::Exception if the checksum file cannot be opened or the re-read checksum differs.
271  template<typename Consumer>
273  {
274  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
275  long filesize=0;
// The checksum is taken from the consumer before its output file is closed.
276  fileAdler32_.value() = c_->get_adler32();
277  c_->closeOutputFile();
278  bool abortFlag = false;
279  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock(),&abortFlag);
280 
// NOTE(review): on this early return the open .dat file is neither merged, renamed nor removed.
281  if (abortFlag) {
282  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
283  return;
284  }
285 
286  if(processed_.value()!=0) {
287 
288  //lock
289  struct stat istat;
290  if (!edm::Service<evf::EvFDaqDirector>()->microMergeDisabled()) {
// NOTE(review): `des` is not checked before it is passed to fwrite() below.
291  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
292 
293  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
294 
// NOTE(review): this declaration shadows the `istat` declared in the enclosing block.
295  struct stat istat;
296  FILE * cf = NULL;
// Starting value used when no previous merged checksum can be read from the file.
297  uint32_t mergedAdler32=1;
298  //get adler32 accumulated checksum for the merged file
// stat() returning 0 means the checksum file already exists.
299  if (!stat(deschecksum.c_str(), &istat)) {
300  if (istat.st_size) {
301  cf = fopen(deschecksum.c_str(),"r");
// NOTE(review): this throw happens before unlockAndCloseMergeStream() is called below.
302  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
// NOTE(review): the fscanf() result is ignored; on a parse failure mergedAdler32 keeps its value 1.
303  fscanf(cf,"%u",&mergedAdler32);
304  fclose(cf);
305  }
306  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
307  }
308 
// NOTE(review): the fopen() result is not checked before it is passed to fread()/fclose().
309  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
310 
// NOTE(review): the stat() result is ignored here; istat.st_size drives the copy loop below.
311  stat(openDatFilePath_.string().c_str(), &istat);
312  off_t readInput=0;
313  uint32_t adlera=1;
314  uint32_t adlerb=0;
// Copy the .dat file into the merged file in chunks of at most 1024*1024 bytes.
315  while (readInput<istat.st_size) {
316  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
// NOTE(review): the fread() and fwrite() results are ignored, so short reads/writes go unnoticed.
317  fread(outBuf_,toRead,1,src);
318  fwrite(outBuf_,toRead,1,des);
// The checksum is recomputed only when the recheck option is enabled.
319  if (readAdler32Check_)
320  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
321  readInput+=toRead;
322  filesize+=toRead;
323  }
324 
325  //write new string representation of the checksum value
326  cf = fopen(deschecksum.c_str(),"w");
// NOTE(review): this throw leaves `src` open and happens before unlockAndCloseMergeStream().
327  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
328 
329  //write adler32 combine to checksum file
// Combines the merged-file checksum with this file's checksum; `filesize` is the number of
// bytes appended by the loop above.
330  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
331 
332  fprintf(cf,"%u",mergedAdler32);
333  fclose(cf);
334 
335  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
336  fclose(src);
337 
// Compare the recomputed checksum (b in the high 16 bits, a in the low 16) with the consumer's.
338  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
339 
340  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
341  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
342  }
343  }
344  else { //no micromerge by HLT
// Without micromerge the file is not copied: its size is taken from stat() and it is renamed
// to the final .dat path.
345  stat(openDatFilePath_.string().c_str(), &istat);
346  filesize = istat.st_size;
347  boost::filesystem::rename(openDatFilePath_.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(),stream_label_));
348  }
349  } else {
350  //return if not in empty lumisection mode
351  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode()) {
352  remove(openDatFilePath_.string().c_str());
353  return;
354  }
// In empty-lumisection mode the JSON is still written, with no file and a checksum of -1.
355  filelist_ = "";
356  fileAdler32_.value()=-1;
357  }
358 
359  //remove file
// NOTE(review): in the no-micromerge branch the file was already renamed above, so this
// remove() targets a path that should no longer exist; its return value is ignored.
360  remove(openDatFilePath_.string().c_str());
361  filesize_=filesize;
362 
// Snapshot the monitored values for this lumisection and write them to the stream's JSON file.
363  jsonMonitor_->snap(ls.luminosityBlock());
364  const std::string outputJsonNameStream =
365  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
366  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
367 
368  // reset monitoring params
369  accepted_.value() = 0;
370  filelist_ = "";
371  }
372 
373 } // end of namespace evf
374 
375 #endif
#define LogDebug(id)
static void fillDescription(ParameterSetDescription &desc)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
def ls
Definition: eostools.py:348
#define NULL
Definition: scimark2.h:8
virtual void doOutputEvent(EventMsgBuilder const &msg) override
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
boost::filesystem::path openDatChecksumFilePath_
virtual void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
jsoncollector::DataPointDefinition outJsonDef_
void add(std::string const &label, ParameterSetDescription const &psetDescription)