CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
21 
22 
23 namespace evf {
// Class template declaration for the output module. The `class ...` header line and
// several member declarations are absent from this listing, so only the members shown
// here are described. `Consumer` is the type owned through c_; the member functions
// defined further down forward start/stop/header/event calls to it.
24  template<typename Consumer>
26 
35  public:
    // Builds the module from a parameter set (definition follows the class).
36  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
    // Registers the module's parameter description under the label "streamerOutput".
38  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
39 
40  private:
    // start()/stop()/doOutputHeader()/doOutputEvent() are const and each delegates to c_.
41  virtual void start() const;
42  virtual void stop() const;
43  virtual void doOutputHeader(InitMsgBuilder const& init_message) const;
44  virtual void doOutputEvent(EventMsgBuilder const& msg) const;
45  //virtual void beginRun(edm::RunPrincipal const&, edm::ModuleCallingContext const*);
46  virtual void beginJob();
49 
50  private:
    // Owning pointer to the Consumer created in the constructor.
    // NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17.
51  std::auto_ptr<Consumer> c_;
    // Mutable because it is incremented from the const doOutputEvent().
56  mutable IntJ accepted_;
    // Created in the constructor; used to snapshot and write per-lumisection JSON.
64  boost::shared_ptr<FastMonitor> jsonMonitor_;
    // Scratch buffer used when reading files back in 1024*1024-byte chunks.
    // The constructor allocates it with new unsigned char[1024*1024].
    // NOTE(review): no matching delete[] is visible in this listing - confirm the
    // destructor (whose body is not shown here) releases it.
67  unsigned char* outBuf_=0;
    // Set in the constructor from EvFDaqDirector::outputAdler32Recheck(); gates the
    // checksum recomputation while merging the .dat file at end of lumisection.
68  bool readAdler32Check_=false;
69 
70 
71  }; //end-of-class-def
72 
// Constructor (the signature line is absent from this listing; the class declares it as
// taking `edm::ParameterSet const& ps`).
// Steps visible below:
//   1. construct the Consumer from ps and read the stream label from "@module_label";
//   2. normalise and validate the stream label (throws cms::Exception on reserved names);
//   3. name the monitorable members and build the JSON legend in outJsonDef_;
//   4. write the output definition (.jsd) file if it does not exist yet, under the init lock;
//   5. create the FastMonitor, register the monitorables and commit.
73  template<typename Consumer>
75  edm::StreamerOutputModuleBase(ps),
76  c_(new Consumer(ps)),
77  stream_label_(ps.getParameter<std::string>("@module_label")),
78  processed_(0),
79  accepted_(0),
80  errorEvents_(0),
81  retCodeMask_(0),
82  filelist_(),
83  filesize_(0),
84  inputFiles_(),
85  fileAdler32_(1),
86  transferDestination_(),
    // 1024*1024-byte buffer, matching the chunk size used by the read-back loops below.
87  outBuf_(new unsigned char[1024*1024])
88  {
89  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
90  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
91  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
92  // create open dir if not already there
93  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
94 
95  //replace hltOutputA with streamA if the HLT menu uses this convention
96  std::string testPrefix="hltOutput";
97  if (stream_label_.find(testPrefix)==0)
98  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
99 
    // An underscore anywhere in the (possibly rewritten) label is rejected.
100  if (stream_label_.find("_")!=std::string::npos) {
101  throw cms::Exception("RecoEventOutputModuleForFU")
102  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << stream_label_;
103  }
104 
105 
    // Case-insensitive check: the last occurrence of "stream" must be at position 0
    // (or absent); any occurrence later in the label throws.
106  std::string stream_label_lo = stream_label_;
107  boost::algorithm::to_lower(stream_label_lo);
108  auto streampos = stream_label_lo.rfind("stream");
109  if (streampos !=0 && streampos!=std::string::npos)
110  throw cms::Exception("RecoEventOutputModuleForFU")
111  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
112 
114 
    // Names given here match the legend items added to outJsonDef_ just below.
115  processed_.setName("Processed");
116  accepted_.setName("Accepted");
117  errorEvents_.setName("ErrorEvents");
118  retCodeMask_.setName("ReturnCodeMask");
119  filelist_.setName("Filelist");
120  filesize_.setName("Filesize");
121  inputFiles_.setName("InputFiles");
122  fileAdler32_.setName("FileAdler32");
123  transferDestination_.setName("TransferDestination");
124 
    // Legend: (name, type, operation) per monitored quantity.
126  outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
127  outJsonDef_.addLegendItem("Accepted","integer",DataPointDefinition::SUM);
128  outJsonDef_.addLegendItem("ErrorEvents","integer",DataPointDefinition::SUM);
129  outJsonDef_.addLegendItem("ReturnCodeMask","integer",DataPointDefinition::BINARYOR);
130  outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
131  outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
132  outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
133  outJsonDef_.addLegendItem("FileAdler32","integer",DataPointDefinition::ADLER32);
134  outJsonDef_.addLegendItem("TransferDestination","string",DataPointDefinition::SAME);
    // The definition file is first written under <baseRunDir>/open/ and then renamed
    // into <baseRunDir>/; both names embed the process id.
135  std::stringstream tmpss,ss;
136  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
137  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
138  std::string outTmpJsonDefName = tmpss.str();
139  std::string outJsonDefName = ss.str();
140 
    // NOTE(review): if anything between lockInitLock() and unlockInitLock() throws
    // (boost::filesystem::rename can), the unlock call below is skipped - confirm whether
    // a scoped guard is needed.
141  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
142  struct stat fstat;
143  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
144  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
    // NOTE(review): `content` is declared on lines that are absent from this listing.
147  FileIO::writeStringToFile(outTmpJsonDefName, content);
148  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
149  }
150  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
151 
    // Register every monitorable with the FastMonitor, then commit.
152  jsonMonitor_.reset(new FastMonitor(&outJsonDef_,true));
153  jsonMonitor_->setDefPath(outJsonDefName);
154  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
155  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
156  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
157  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
158  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
159  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
160  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
161  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
162  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
163  jsonMonitor_->commit(nullptr);
164 
165  }
166 
167  template<typename Consumer>
169 
// start(): the signature line is absent from this listing; the log message below
// identifies the method. Asks EvFDaqDirector for the "open" init-file path of this
// stream, hands it to the Consumer as its init-message file, then starts the Consumer.
170  template<typename Consumer>
171  void
173  {
174  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
175  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
176  << openInitFileName;
177  c_->setInitMessageFile(openInitFileName);
178  c_->start();
179 
180  }
181 
// Presumably stop() (the signature line is absent from this listing): the body only
// forwards to the Consumer's stop().
182  template<typename Consumer>
183  void
185  {
186  c_->stop();
187  }
188 
// Presumably doOutputHeader(InitMsgBuilder const& init_message) const; the signature
// line is absent from this listing, but the body uses `init_message`.
// Forwards the init message to the Consumer, then reads the written init file back in
// 1024*1024-byte chunks, recomputes its Adler-32 and compares it with the value the
// Consumer reports. On mismatch a cms::Exception is thrown; on match the file is renamed
// from the "open" init path to the final init path returned by EvFDaqDirector.
189  template<typename Consumer>
190  void
192  {
193  c_->doOutputHeader(init_message);
194 
195  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
196  struct stat istat;
    // NOTE(review): the return value of stat() is ignored; if it fails, istat.st_size
    // is read uninitialised by the loop condition below.
197  stat(openIniFileName.c_str(), &istat);
198  //read back file to check integrity of what was written
199  off_t readInput=0;
    // Running Adler-32 halves, combined after the loop as (adlerb << 16) | adlera.
200  uint32_t adlera=1,adlerb=0;
    // NOTE(review): the fopen() result is not checked before fread()/fclose().
201  FILE *src = fopen(openIniFileName.c_str(),"r");
202  while (readInput<istat.st_size)
203  {
    // Read a full chunk, or whatever remains for the last one.
204  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
    // NOTE(review): the fread() return value is ignored, so a short read would be
    // checksummed as if the whole chunk had been read.
205  fread(outBuf_,toRead,1,src);
206  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
207  readInput+=toRead;
208  }
209  fclose(src);
210  uint32_t adler32c = (adlerb << 16) | adlera;
211  if (adler32c != c_->get_adler32_ini()) {
212  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
213  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
214  }
215  else {
    // NOTE(review): the success path is logged through LogWarning - confirm this
    // severity is intended.
216  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
217  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
218  }
219  }
220 
// Presumably doOutputEvent(EventMsgBuilder const& msg) const; the signature line and
// opening brace are absent from this listing, but the body uses `msg`.
// Increments the accepted-events counter, then forwards the event message to the Consumer.
221  template<typename Consumer>
222  void
224  accepted_.value()++;
225  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
226  }
227 
// Presumably fillDescriptions(edm::ConfigurationDescriptions& descriptions); the
// signature and the lines that declare `desc` are absent from this listing.
// Visible part: lets the Consumer add its parameters to `desc`, then registers `desc`
// under the label "streamerOutput".
228  template<typename Consumer>
229  void
233  Consumer::fillDescription(desc);
234  descriptions.add("streamerOutput", desc);
235  }
236 
// Signature line is absent from this listing - presumably beginJob(), the only
// non-const, argument-free member declared in the class; TODO confirm.
// Stores the transfer destination(s) that EvFDaqDirector reports for this stream label.
237  template<typename Consumer>
239  {
240  //get stream transfer destination
241  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(stream_label_);
242  }
243 
244 
// Signature line is absent from this listing - presumably the begin-of-lumisection hook
// (the body uses `ls.luminosityBlock()` and the commented-out log says "begin lumi").
// Resolves the per-lumisection "open" .dat path for this stream, points the Consumer's
// output at it, and records the file name in filelist_.
245  template<typename Consumer>
247  {
248  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
249  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
    // NOTE(review): this assigns the checksum path from the same getOpenDatFilePath()
    // call as the line above, so both members hold the same path. No other use of
    // openDatChecksumFilePath_ is visible in this listing - confirm whether a dedicated
    // checksum-path getter was intended.
250  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
251  c_->setOutputFile(openDatFilePath_.string());
252  filelist_ = openDatFilePath_.filename().string();
253  }
254 
// Signature line is absent from this listing - presumably the end-of-lumisection hook
// (the body uses `ls.luminosityBlock()` and the commented-out log says "end lumi").
// Flow visible below:
//   1. take the Consumer's Adler-32 for the lumisection file and close that file;
//   2. ask fms_ how many events were processed; return early if the abort flag is set;
//   3. if events were processed: append the local .dat file to the merged stream file
//      (obtained locked from EvFDaqDirector), update the merged checksum file with
//      adler32_combine, and optionally re-verify the local file's checksum;
//      otherwise: return unless EvFDaqDirector is in empty-lumisection mode;
//   4. remove the local .dat file, snapshot the monitor and write the per-lumisection
//      JSON, then reset accepted_ and filelist_.
255  template<typename Consumer>
257  {
258  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
259  long filesize=0;
    // The checksum is fetched before the Consumer closes the output file.
260  fileAdler32_.value() = c_->get_adler32();
261  c_->closeOutputFile();
262  bool abortFlag = false;
263  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock(),&abortFlag);
264 
    // On abort nothing is merged, removed or written as JSON for this lumisection.
265  if (abortFlag) {
266  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
267  return;
268  }
269 
270  if(processed_.value()!=0) {
271 
272  //lock
    // NOTE(review): `des` is not checked for null before the fwrite() calls below.
273  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
274 
275  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
276 
277  struct stat istat;
278  FILE * cf = NULL;
    // Starts at 1 and is overwritten only if a non-empty checksum file already exists.
279  uint32_t mergedAdler32=1;
280  //get adler32 accumulated checksum for the merged file
281  if (!stat(deschecksum.c_str(), &istat)) {
282  if (istat.st_size) {
283  cf = fopen(deschecksum.c_str(),"r");
    // NOTE(review): this throw (and the one further down) happens while the merge
    // stream is still locked; the unlock call is only reached on the non-throwing path.
284  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
    // NOTE(review): the fscanf() return value is ignored; on a parse failure
    // mergedAdler32 keeps its initial value of 1.
285  fscanf(cf,"%u",&mergedAdler32);
286  fclose(cf);
287  }
288  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
289  }
290 
    // NOTE(review): the fopen() result is not checked before fread()/fclose().
291  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
292 
    // NOTE(review): the stat() return value is ignored here; istat is reused from above.
293  stat(openDatFilePath_.string().c_str(), &istat);
294  off_t readInput=0;
    // Running Adler-32 halves, only updated when readAdler32Check_ is set.
295  uint32_t adlera=1;
296  uint32_t adlerb=0;
    // Copy the local file into the merged file in chunks of at most 1024*1024 bytes.
297  while (readInput<istat.st_size) {
298  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
    // NOTE(review): fread()/fwrite() return values are ignored.
299  fread(outBuf_,toRead,1,src);
300  fwrite(outBuf_,toRead,1,des);
301  if (readAdler32Check_)
302  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
303  readInput+=toRead;
304  filesize+=toRead;
305  }
306 
307  //if(des != 0 && src !=0){
308  // while((b=fgetc(src))!= EOF){
309  // fputc((unsigned char)b,des);
310  // filesize++;
311  // }
312  //}
313 
314  //write new string representation of the checksum value
315  cf = fopen(deschecksum.c_str(),"w");
316  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
317 
318  //write adler32 combine to checksum file
    // Combines the previous merged checksum with this file's checksum and byte count.
319  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
320 
321  fprintf(cf,"%u",mergedAdler32);
322  fclose(cf);
323 
324  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
325  fclose(src);
326 
    // The re-check runs after the data has already been appended to the merged file and
    // the merged checksum rewritten, so a mismatch is reported but not rolled back here.
327  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
328 
329  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
330  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
331  }
332 
333  } else {
334  //return if not in empty lumisection mode
335  if (!edm::Service<evf::EvFDaqDirector>()->emptyLumisectionMode())
336  return;
    // Empty-lumisection mode: report no file and a checksum value of -1.
337  filelist_ = "";
338  fileAdler32_.value()=-1;
339  }
340 
341  //remove file
342  remove(openDatFilePath_.string().c_str());
343  filesize_=filesize;
344 
    // Snapshot the monitored values for this lumisection and write them as JSON.
345  jsonMonitor_->snap(ls.luminosityBlock());
346  const std::string outputJsonNameStream =
347  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
348  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
349 
350  // reset monitoring params
351  accepted_.value() = 0;
352  filelist_ = "";
353  }
354 
355 } // end of namespace evf
356 
357 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
def ls
Definition: eostools.py:346
#define NULL
Definition: scimark2.h:8
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
#define SUM(A, B)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:92
void setDefaultGroup(std::string const &group)
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)