CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
21 
22 
23 namespace evf {
// Declaration of the class template RecoEventOutputModuleForFU<Consumer>: an output module
// whose writing is delegated to a Consumer object (c_) built from the module's ParameterSet.
// NOTE(review): the inner listing numbers jump (24 -> 26 -> 35, 46 -> 49, 51 -> 65 -> 68), so the
// class-head line and several member declarations are not visible in this listing — confirm
// against the full header before relying on this fragment.
24  template<typename Consumer>
26 
35  public:
// Builds the module from its configuration.
36  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
// Registers the module's parameter description (added under the label "EvFOutputModule").
38  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
39 
40  private:
// Overridden hooks; each one forwards to the Consumer (see the definitions further down).
41  virtual void start() override;
42  virtual void stop() override;
43  virtual void doOutputHeader(InitMsgBuilder const& init_message) override;
44  virtual void doOutputEvent(EventMsgBuilder const& msg) override;
45  //virtual void beginRun(edm::RunPrincipal const&, edm::ModuleCallingContext const*);
46  virtual void beginJob() override;
49 
50  private:
// Consumer instance that performs the actual writing; created with `new Consumer(ps)` in the constructor.
// NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17.
51  std::auto_ptr<Consumer> c_;
// JSON monitor on which the per-stream monitorables are registered in the constructor.
65  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
// Scratch buffer, allocated as 1024*1024 bytes in the constructor and used when reading written files back.
// NOTE(review): no matching delete[] is visible in this listing — confirm the destructor releases it.
68  unsigned char* outBuf_=0;
// Set from EvFDaqDirector::outputAdler32Recheck(); when true, endLuminosityBlock recomputes
// the Adler32 of the data it copies and compares it with the consumer's value.
69  bool readAdler32Check_=false;
70 
71 
72  }; //end-of-class-def
73 
// Constructor: initialises the base classes, the Consumer and the monitorables, validates the
// stream name, writes the output JSON definition (.jsd) file once per run directory and
// registers the monitorables with the JSON monitor.
// NOTE(review): the inner listing numbers jump (74 -> 76, 115 -> 117, 128 -> 140, 149 -> 152,
// 155 -> 158), so the declarator line and several statements (e.g. the declaration of `content`
// and the creation of jsonMonitor_) are not visible in this listing.
74  template<typename Consumer>
76  edm::one::OutputModuleBase::OutputModuleBase(ps),
77  edm::StreamerOutputModuleBase(ps),
78  c_(new Consumer(ps)),
// the stream label starts out as the module label from the configuration
79  stream_label_(ps.getParameter<std::string>("@module_label")),
80  processed_(0),
81  accepted_(0),
82  errorEvents_(0),
83  retCodeMask_(0),
84  filelist_(),
85  filesize_(0),
86  inputFiles_(),
// 1 is the initial value of an Adler32 checksum
87  fileAdler32_(1),
88  transferDestination_(),
89  hltErrorEvents_(0),
// 1 MiB scratch buffer; the read-back loops elsewhere in this file read in 1024*1024-byte chunks
90  outBuf_(new unsigned char[1024*1024])
91  {
92  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
93  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
94  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
95  // create open dir if not already there
96  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
97 
98  //replace hltOutputA with stream if the HLT menu uses this convention
99  std::string testPrefix="hltOutput";
100  if (stream_label_.find(testPrefix)==0)
101  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
102 
// '_' is rejected anywhere in the (possibly rewritten) stream label
103  if (stream_label_.find("_")!=std::string::npos) {
104  throw cms::Exception("RecoEventOutputModuleForFU")
105  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
106  }
107 
108 
// case-insensitive check: the last occurrence of "stream" must be at position 0 (or absent)
109  std::string stream_label_lo = stream_label_;
110  boost::algorithm::to_lower(stream_label_lo);
111  auto streampos = stream_label_lo.rfind("stream");
112  if (streampos !=0 && streampos!=std::string::npos)
113  throw cms::Exception("RecoEventOutputModuleForFU")
114  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
115 
117 
// names under which the monitorables are identified
118  processed_.setName("Processed");
119  accepted_.setName("Accepted");
120  errorEvents_.setName("ErrorEvents");
121  retCodeMask_.setName("ReturnCodeMask");
122  filelist_.setName("Filelist");
123  filesize_.setName("Filesize");
124  inputFiles_.setName("InputFiles");
125  fileAdler32_.setName("FileAdler32");
126  transferDestination_.setName("TransferDestination");
127  hltErrorEvents_.setName("HLTErrorEvents");
128 
// .jsd paths tagged with the process id: a temporary one under <baseRunDir>/open and the final one in <baseRunDir>
140  std::stringstream tmpss,ss;
141  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
142  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
143  std::string outTmpJsonDefName = tmpss.str();
144  std::string outJsonDefName = ss.str();
145 
// under the director's init lock: write the definition only if the final file does not exist yet;
// it is written to the temporary path first and then renamed into place
146  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
147  struct stat fstat;
148  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
149  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
152  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
153  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
154  }
155  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
156 
// attach the definition and register every monitorable with the JSON monitor, then commit
158  jsonMonitor_->setDefPath(outJsonDefName);
159  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
160  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
161  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
162  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
163  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
164  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
165  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
166  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
167  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
168  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
169  jsonMonitor_->commit(nullptr);
170 
171  }
172 
173  template<typename Consumer>
175 
176  template<typename Consumer>
177  void
179  {
180  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
181  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
182  << openInitFileName;
183  c_->setInitMessageFile(openInitFileName);
184  c_->start();
185 
186  }
187 
188  template<typename Consumer>
189  void
191  {
192  c_->stop();
193  }
194 
195  template<typename Consumer>
196  void
198  {
199  c_->doOutputHeader(init_message);
200 
201  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
202  struct stat istat;
203  stat(openIniFileName.c_str(), &istat);
204  //read back file to check integrity of what was written
205  off_t readInput=0;
206  uint32_t adlera=1,adlerb=0;
207  FILE *src = fopen(openIniFileName.c_str(),"r");
208  while (readInput<istat.st_size)
209  {
210  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
211  fread(outBuf_,toRead,1,src);
212  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
213  readInput+=toRead;
214  }
215  fclose(src);
216  uint32_t adler32c = (adlerb << 16) | adlera;
217  if (adler32c != c_->get_adler32_ini()) {
218  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
219  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
220  }
221  else {
222  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
223  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
224  }
225  }
226 
227  template<typename Consumer>
228  void
230  accepted_.value()++;
231  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
232  }
233 
// Static fillDescriptions(): lets the Consumer type fill the parameter description and
// registers the result under the label "EvFOutputModule".
// NOTE(review): the inner listing numbers jump from 235 to 239, so the declarator line and the
// declaration of `desc` are not visible in this listing — confirm against the full header.
234  template<typename Consumer>
235  void
// delegate to the Consumer's static fillDescription
239  Consumer::fillDescription(desc);
240  descriptions.add("EvFOutputModule", desc);
241  }
242 
243  template<typename Consumer>
245  {
246  //get stream transfer destination
247  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(stream_label_);
248  }
249 
250 
251  template<typename Consumer>
253  {
254  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
255  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
256  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
257  c_->setOutputFile(openDatFilePath_.string());
258  filelist_ = openDatFilePath_.filename().string();
259  }
260 
261  template<typename Consumer>
263  {
264  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
265  long filesize=0;
266  fileAdler32_.value() = c_->get_adler32();
267  c_->closeOutputFile();
268  bool abortFlag = false;
269  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock(),&abortFlag);
270 
271  if (abortFlag) {
272  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
273  return;
274  }
275 
276  if(processed_.value()!=0) {
277 
278  //lock
279  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
280 
281  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
282 
283  struct stat istat;
284  FILE * cf = NULL;
285  uint32_t mergedAdler32=1;
286  //get adler32 accumulated checksum for the merged file
287  if (!stat(deschecksum.c_str(), &istat)) {
288  if (istat.st_size) {
289  cf = fopen(deschecksum.c_str(),"r");
290  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
291  fscanf(cf,"%u",&mergedAdler32);
292  fclose(cf);
293  }
294  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
295  }
296 
297  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
298 
299  stat(openDatFilePath_.string().c_str(), &istat);
300  off_t readInput=0;
301  uint32_t adlera=1;
302  uint32_t adlerb=0;
303  while (readInput<istat.st_size) {
304  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
305  fread(outBuf_,toRead,1,src);
306  fwrite(outBuf_,toRead,1,des);
307  if (readAdler32Check_)
308  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
309  readInput+=toRead;
310  filesize+=toRead;
311  }
312 
313  //if(des != 0 && src !=0){
314  // while((b=fgetc(src))!= EOF){
315  // fputc((unsigned char)b,des);
316  // filesize++;
317  // }
318  //}
319 
320  //write new string representation of the checksum value
321  cf = fopen(deschecksum.c_str(),"w");
322  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
323 
324  //write adler32 combine to checksum file
325  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
326 
327  fprintf(cf,"%u",mergedAdler32);
328  fclose(cf);
329 
330  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
331  fclose(src);
332 
333  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
334 
335  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
336  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
337  }
338 
339  } else {
340  //return if not in empty lumisectio mode
341  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode())
342  return;
343  filelist_ = "";
344  fileAdler32_.value()=-1;
345  }
346 
347  //remove file
348  remove(openDatFilePath_.string().c_str());
349  filesize_=filesize;
350 
351  jsonMonitor_->snap(ls.luminosityBlock());
352  const std::string outputJsonNameStream =
353  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
354  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
355 
356  // reset monitoring params
357  accepted_.value() = 0;
358  filelist_ = "";
359  }
360 
361 } // end of namespace evf
362 
363 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
def ls
Definition: eostools.py:348
#define NULL
Definition: scimark2.h:8
virtual void doOutputEvent(EventMsgBuilder const &msg) override
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
virtual void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
jsoncollector::DataPointDefinition outJsonDef_
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setDefaultGroup(std::string const &group)