CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
21 
22 
23 namespace evf {
24  template<typename Consumer>
26 
35  public:
36  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
38  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
39 
40  private:
41  virtual void start() const;
42  virtual void stop() const;
43  virtual void doOutputHeader(InitMsgBuilder const& init_message) const;
44  virtual void doOutputEvent(EventMsgBuilder const& msg) const;
45  //virtual void beginRun(edm::RunPrincipal const&, edm::ModuleCallingContext const*);
46  virtual void beginJob() override;
49 
50  private:
51  std::auto_ptr<Consumer> c_;
56  mutable IntJ accepted_;
64  boost::shared_ptr<FastMonitor> jsonMonitor_;
67  unsigned char* outBuf_=0;
68  bool readAdler32Check_=false;
69 
70 
71  }; //end-of-class-def
72 
73  template<typename Consumer>
75  edm::one::OutputModuleBase::OutputModuleBase(ps),
76  edm::StreamerOutputModuleBase(ps),
77  c_(new Consumer(ps)),
78  stream_label_(ps.getParameter<std::string>("@module_label")),
79  processed_(0),
80  accepted_(0),
81  errorEvents_(0),
82  retCodeMask_(0),
83  filelist_(),
84  filesize_(0),
85  inputFiles_(),
86  fileAdler32_(1),
87  transferDestination_(),
88  outBuf_(new unsigned char[1024*1024])
89  {
90  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
91  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
92  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
93  // create open dir if not already there
94  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
95 
96  //replace hltOutoputA with stream if the HLT menu uses this convention
97  std::string testPrefix="hltOutput";
98  if (stream_label_.find(testPrefix)==0)
99  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
100 
101  if (stream_label_.find("_")!=std::string::npos) {
102  throw cms::Exception("RecoEventOutputModuleForFU")
103  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
104  }
105 
106 
107  std::string stream_label_lo = stream_label_;
108  boost::algorithm::to_lower(stream_label_lo);
109  auto streampos = stream_label_lo.rfind("stream");
110  if (streampos !=0 && streampos!=std::string::npos)
111  throw cms::Exception("RecoEventOutputModuleForFU")
112  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
113 
115 
116  processed_.setName("Processed");
117  accepted_.setName("Accepted");
118  errorEvents_.setName("ErrorEvents");
119  retCodeMask_.setName("ReturnCodeMask");
120  filelist_.setName("Filelist");
121  filesize_.setName("Filesize");
122  inputFiles_.setName("InputFiles");
123  fileAdler32_.setName("FileAdler32");
124  transferDestination_.setName("TransferDestination");
125 
127  outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
128  outJsonDef_.addLegendItem("Accepted","integer",DataPointDefinition::SUM);
129  outJsonDef_.addLegendItem("ErrorEvents","integer",DataPointDefinition::SUM);
130  outJsonDef_.addLegendItem("ReturnCodeMask","integer",DataPointDefinition::BINARYOR);
131  outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
132  outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
133  outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
134  outJsonDef_.addLegendItem("FileAdler32","integer",DataPointDefinition::ADLER32);
135  outJsonDef_.addLegendItem("TransferDestination","string",DataPointDefinition::SAME);
136  std::stringstream tmpss,ss;
137  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
138  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
139  std::string outTmpJsonDefName = tmpss.str();
140  std::string outJsonDefName = ss.str();
141 
142  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
143  struct stat fstat;
144  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
145  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
148  FileIO::writeStringToFile(outTmpJsonDefName, content);
149  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
150  }
151  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
152 
153  jsonMonitor_.reset(new FastMonitor(&outJsonDef_,true));
154  jsonMonitor_->setDefPath(outJsonDefName);
155  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
156  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
157  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
158  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
159  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
160  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
161  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
162  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
163  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
164  jsonMonitor_->commit(nullptr);
165 
166  }
167 
168  template<typename Consumer>
170 
171  template<typename Consumer>
172  void
174  {
175  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
176  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
177  << openInitFileName;
178  c_->setInitMessageFile(openInitFileName);
179  c_->start();
180 
181  }
182 
183  template<typename Consumer>
184  void
186  {
187  c_->stop();
188  }
189 
190  template<typename Consumer>
191  void
193  {
194  c_->doOutputHeader(init_message);
195 
196  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
197  struct stat istat;
198  stat(openIniFileName.c_str(), &istat);
199  //read back file to check integrity of what was written
200  off_t readInput=0;
201  uint32_t adlera=1,adlerb=0;
202  FILE *src = fopen(openIniFileName.c_str(),"r");
203  while (readInput<istat.st_size)
204  {
205  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
206  fread(outBuf_,toRead,1,src);
207  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
208  readInput+=toRead;
209  }
210  fclose(src);
211  uint32_t adler32c = (adlerb << 16) | adlera;
212  if (adler32c != c_->get_adler32_ini()) {
213  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
214  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
215  }
216  else {
217  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
218  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
219  }
220  }
221 
222  template<typename Consumer>
223  void
225  accepted_.value()++;
226  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
227  }
228 
229  template<typename Consumer>
230  void
234  Consumer::fillDescription(desc);
235  descriptions.add("streamerOutput", desc);
236  }
237 
238  template<typename Consumer>
240  {
241  //get stream transfer destination
242  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(stream_label_);
243  }
244 
245 
246  template<typename Consumer>
248  {
249  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
250  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
251  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
252  c_->setOutputFile(openDatFilePath_.string());
253  filelist_ = openDatFilePath_.filename().string();
254  }
255 
256  template<typename Consumer>
258  {
259  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
260  long filesize=0;
261  fileAdler32_.value() = c_->get_adler32();
262  c_->closeOutputFile();
263  bool abortFlag = false;
264  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock(),&abortFlag);
265 
266  if (abortFlag) {
267  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
268  return;
269  }
270 
271  if(processed_.value()!=0) {
272 
273  //lock
274  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
275 
276  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
277 
278  struct stat istat;
279  FILE * cf = NULL;
280  uint32_t mergedAdler32=1;
281  //get adler32 accumulated checksum for the merged file
282  if (!stat(deschecksum.c_str(), &istat)) {
283  if (istat.st_size) {
284  cf = fopen(deschecksum.c_str(),"r");
285  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
286  fscanf(cf,"%u",&mergedAdler32);
287  fclose(cf);
288  }
289  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
290  }
291 
292  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
293 
294  stat(openDatFilePath_.string().c_str(), &istat);
295  off_t readInput=0;
296  uint32_t adlera=1;
297  uint32_t adlerb=0;
298  while (readInput<istat.st_size) {
299  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
300  fread(outBuf_,toRead,1,src);
301  fwrite(outBuf_,toRead,1,des);
302  if (readAdler32Check_)
303  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
304  readInput+=toRead;
305  filesize+=toRead;
306  }
307 
308  //if(des != 0 && src !=0){
309  // while((b=fgetc(src))!= EOF){
310  // fputc((unsigned char)b,des);
311  // filesize++;
312  // }
313  //}
314 
315  //write new string representation of the checksum value
316  cf = fopen(deschecksum.c_str(),"w");
317  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
318 
319  //write adler32 combine to checksum file
320  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
321 
322  fprintf(cf,"%u",mergedAdler32);
323  fclose(cf);
324 
325  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
326  fclose(src);
327 
328  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
329 
330  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
331  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
332  }
333 
334  } else {
335  //return if not in empty lumisectio mode
336  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode())
337  return;
338  filelist_ = "";
339  fileAdler32_.value()=-1;
340  }
341 
342  //remove file
343  remove(openDatFilePath_.string().c_str());
344  filesize_=filesize;
345 
346  jsonMonitor_->snap(ls.luminosityBlock());
347  const std::string outputJsonNameStream =
348  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
349  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
350 
351  // reset monitoring params
352  accepted_.value() = 0;
353  filelist_ = "";
354  }
355 
356 } // end of namespace-edm
357 
358 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
def ls
Definition: eostools.py:346
#define NULL
Definition: scimark2.h:8
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
#define SUM(A, B)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:55
void setDefaultGroup(std::string const &group)