CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <zlib.h>
13 
20 
21 
22 namespace evf {
// Declaration of the class template RecoEventOutputModuleForFU<Consumer>.
// The constructor's initializer list passes `ps` to edm::StreamerOutputModuleBase
// and builds a Consumer from it; the Consumer receives the actual output calls.
// NOTE(review): the `class ...` header line and a number of member declarations
// are absent from this listing (gaps in the embedded line numbers), so the base
// class list and the remaining data members cannot be confirmed from here.
23  template<typename Consumer>
25 
34  public:
35  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
37  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
38 
39  private:
    // These four hooks are const; each one forwards to the Consumer held in c_.
40  virtual void start() const;
41  virtual void stop() const;
42  virtual void doOutputHeader(InitMsgBuilder const& init_message) const;
43  virtual void doOutputEvent(EventMsgBuilder const& msg) const;
44  //virtual void beginRun(edm::RunPrincipal const&, edm::ModuleCallingContext const*);
45  virtual void beginJob();
48 
49  private:
    // Owning pointer to the Consumer created in the constructor.
    // NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17.
50  std::auto_ptr<Consumer> c_;
    // mutable because it is incremented from the const method doOutputEvent().
55  mutable IntJ accepted_;
    // Created in the constructor; snapshotted and written out per lumisection.
63  boost::shared_ptr<FastMonitor> jsonMonitor_;
    // Scratch buffer for file read-back/copy. The constructor's initializer list
    // allocates 1024*1024 bytes for it, which overrides the `=0` default here.
    // NOTE(review): no matching delete[] is visible in this listing - confirm the
    // destructor releases it.
66  unsigned char* outBuf_=0;
    // Set in the constructor from EvFDaqDirector::outputAdler32Recheck().
67  bool readAdler32Check_=false;
68 
69 
70  }; //end-of-class-def
71 
// Constructor: initializes the base class, the Consumer and the monitored
// counters, writes the JSON definition (.jsd) file if it does not exist yet,
// and registers all monitored quantities with a FastMonitor.
// NOTE(review): the signature line is absent from this listing; the class
// declaration gives it as RecoEventOutputModuleForFU(edm::ParameterSet const& ps).
72  template<typename Consumer>
74  edm::StreamerOutputModuleBase(ps),
75  c_(new Consumer(ps)),
76  stream_label_(ps.getParameter<std::string>("@module_label")),
77  processed_(0),
78  accepted_(0),
79  errorEvents_(0),
80  retCodeMask_(0),
81  filelist_(),
82  filesize_(0),
83  inputFiles_(),
    // 1 is the same starting value the Adler32 accumulators use elsewhere in this file.
84  fileAdler32_(1),
85  transferDestination_(),
    // 1024*1024-byte scratch buffer, used for chunked file reads in other methods.
86  outBuf_(new unsigned char[1024*1024])
87  {
88  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
89  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
90  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
91  // create open dir if not already there
92  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
93 
94  //replace the "hltOutput" prefix with "stream" (e.g. hltOutputA -> streamA) if the HLT menu uses this convention
95  std::string testPrefix="hltOutput";
96  if (stream_label_.find(testPrefix)==0)
97  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
98 
100 
    // Names under which each monitored quantity is reported.
101  processed_.setName("Processed");
102  accepted_.setName("Accepted");
103  errorEvents_.setName("ErrorEvents");
104  retCodeMask_.setName("ReturnCodeMask");
105  filelist_.setName("Filelist");
106  filesize_.setName("Filesize");
107  inputFiles_.setName("InputFiles");
108  fileAdler32_.setName("FileAdler32");
109  transferDestination_.setName("TransferDestination");
110 
    // One legend entry per monitored quantity: name, type, and the operation
    // constant attached to it in the definition.
112  outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
113  outJsonDef_.addLegendItem("Accepted","integer",DataPointDefinition::SUM);
114  outJsonDef_.addLegendItem("ErrorEvents","integer",DataPointDefinition::SUM);
115  outJsonDef_.addLegendItem("ReturnCodeMask","integer",DataPointDefinition::BINARYOR);
116  outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
117  outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
118  outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
119  outJsonDef_.addLegendItem("FileAdler32","integer",DataPointDefinition::ADLER32);
120  outJsonDef_.addLegendItem("TransferDestination","string",DataPointDefinition::SAME);
    // The definition file is first written under <baseRunDir>/open/ and then
    // renamed to <baseRunDir>/; both names embed the process id.
121  std::stringstream tmpss,ss;
122  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
123  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
124  std::string outTmpJsonDefName = tmpss.str();
125  std::string outJsonDefName = ss.str();
126 
    // The existence check, write and rename all happen between lockInitLock()
    // and unlockInitLock().
    // NOTE(review): if the write or the rename throws, unlockInitLock() is not
    // reached - confirm whether the lock is released elsewhere.
127  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
128  struct stat fstat;
129  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
130  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
    // NOTE(review): `content` is defined on lines missing from this listing.
133  FileIO::writeStringToFile(outTmpJsonDefName, content);
134  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
135  }
136  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
137 
    // Create the monitor over the definition and register every quantity with it.
138  jsonMonitor_.reset(new FastMonitor(&outJsonDef_,true));
139  jsonMonitor_->setDefPath(outJsonDefName);
140  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
141  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
142  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
143  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
144  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
145  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
146  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
147  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
148  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
149  jsonMonitor_->commit(nullptr);
150 
151  }
152 
153  template<typename Consumer>
155 
// Points the Consumer at the "open" init file for this stream and starts it.
// NOTE(review): the signature line is absent from this listing; the log text
// below identifies this as start().
156  template<typename Consumer>
157  void
159  {
160  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
161  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
162  << openInitFileName;
163  c_->setInitMessageFile(openInitFileName);
164  c_->start();
165 
166  }
167 
// Forwards the stop request to the Consumer.
// NOTE(review): the signature line is absent from this listing; presumably
// this is stop(), the counterpart of start() in the class declaration.
168  template<typename Consumer>
169  void
171  {
172  c_->stop();
173  }
174 
// Hands the init message to the Consumer, then re-reads the "open" init file,
// recomputes its Adler32 checksum and compares it with the Consumer's value.
// On a match the file is renamed to its final init-file path; on a mismatch a
// cms::Exception is thrown.
// NOTE(review): the signature line is absent from this listing; the body uses
// `init_message`, matching doOutputHeader(InitMsgBuilder const&) const in the
// class declaration.
175  template<typename Consumer>
176  void
178  {
179  c_->doOutputHeader(init_message);
180 
181  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(stream_label_);
182  struct stat istat;
     // NOTE(review): the return value of stat() is not checked; if it fails,
     // istat.st_size below is read uninitialized.
183  stat(openIniFileName.c_str(), &istat);
184  //read back file to check integrity of what was written
185  off_t readInput=0;
186  uint32_t adlera=1,adlerb=0;
     // NOTE(review): fopen() may return NULL; src is used by fread()/fclose()
     // below without a check.
187  FILE *src = fopen(openIniFileName.c_str(),"r");
188  while (readInput<istat.st_size)
189  {
     // Read at most 1024*1024 bytes per pass, the size allocated for outBuf_.
190  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
     // NOTE(review): the fread() result is ignored, so a short read would go
     // unnoticed and the checksum would cover stale buffer contents.
191  fread(outBuf_,toRead,1,src);
192  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
193  readInput+=toRead;
194  }
195  fclose(src);
     // Combine the two running sums into one 32-bit value (b in the high half).
196  uint32_t adler32c = (adlerb << 16) | adlera;
197  if (adler32c != c_->get_adler32_ini()) {
198  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
199  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
200  }
201  else {
     // The success path is logged through LogWarning, then the file is moved
     // from its "open" path to its final init-file path.
202  edm::LogWarning("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< stream_label_ << " " << adler32c;
203  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_));
204  }
205  }
206 
// Counts the event as accepted and passes the event message to the Consumer.
// NOTE(review): the signature line (including the opening brace) is absent
// from this listing; the body uses `msg`, matching
// doOutputEvent(EventMsgBuilder const&) const in the class declaration.
207  template<typename Consumer>
208  void
     // accepted_ is declared mutable, which allows this update in a const method.
210  accepted_.value()++;
211  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
212  }
213 
// Lets the Consumer fill in its parameter description and registers the
// result under the label "streamerOutput".
// NOTE(review): the signature line and the declaration of `desc` are on lines
// absent from this listing; the body uses `descriptions`, matching
// fillDescriptions(edm::ConfigurationDescriptions&) in the class declaration.
214  template<typename Consumer>
215  void
219  Consumer::fillDescription(desc);
220  descriptions.add("streamerOutput", desc);
221  }
222 
// Looks up the transfer destination for this stream and stores it in the
// monitored variable transferDestination_.
// NOTE(review): the signature line is absent from this listing; presumably
// this is beginJob() from the class declaration - confirm against the header.
223  template<typename Consumer>
225  {
226  //get stream transfer destination
227  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(stream_label_);
228  }
229 
230 
// Sets up per-lumisection output: obtains the "open" .dat path for this
// lumisection and stream, points the Consumer at it and records the file name
// in the monitored variable filelist_.
// NOTE(review): the signature line is absent from this listing; `ls` is
// presumably the lumisection principal passed to beginLuminosityBlock().
231  template<typename Consumer>
233  {
234  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
235  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
     // NOTE(review): this is the same call as on the previous line, so the
     // "checksum" path equals the .dat path - confirm whether a checksum-specific
     // path getter was intended.
236  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
237  c_->setOutputFile(openDatFilePath_.string());
238  filelist_ = openDatFilePath_.filename().string();
239  }
240 
// Closes the per-lumisection output. If any events were processed, the local
// .dat file is appended to the merged stream file, the merged Adler32 checksum
// file is rewritten, and the monitoring JSON for the lumisection is written.
// The local .dat file is removed and the per-lumisection counters are reset in
// every case.
// NOTE(review): the signature line is absent from this listing; `ls` is
// presumably the lumisection principal passed to endLuminosityBlock().
241  template<typename Consumer>
243  {
244  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
245  long filesize=0;
     // The checksum is taken from the Consumer before its output file is closed.
246  fileAdler32_.value() = c_->get_adler32();
247  c_->closeOutputFile();
     // NOTE(review): fms_ is declared on lines absent from this listing.
248  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock());
249 
250 
251  if(processed_.value()!=0){
252 
253  //lock
254  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
255 
256  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
257 
258  struct stat istat;
259  FILE * cf = NULL;
     // Stays at 1 when no checksum file exists yet or the file is empty.
260  uint32_t mergedAdler32=1;
261  //get adler32 accumulated checksum for the merged file
262  if (!stat(deschecksum.c_str(), &istat)) {
263  if (istat.st_size) {
264  cf = fopen(deschecksum.c_str(),"r");
     // NOTE(review): this throw happens after the merge stream was locked above
     // and before unlockAndCloseMergeStream() - confirm the lock is released on
     // this path.
265  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
     // NOTE(review): the fscanf() result is ignored; on a parse failure
     // mergedAdler32 keeps its initial value.
266  fscanf(cf,"%u",&mergedAdler32);
267  fclose(cf);
268  }
269  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
270  }
271 
     // NOTE(review): neither fopen() nor the stat() below is checked before
     // src and istat.st_size are used.
272  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
273 
274  stat(openDatFilePath_.string().c_str(), &istat);
275  off_t readInput=0;
276  uint32_t adlera=1;
277  uint32_t adlerb=0;
     // Copy the local file into the merged file in chunks of at most 1024*1024
     // bytes; the checksum is recomputed only if readAdler32Check_ is set.
278  while (readInput<istat.st_size) {
279  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
     // NOTE(review): the fread()/fwrite() results are ignored, so short reads
     // or failed writes go unnoticed.
280  fread(outBuf_,toRead,1,src);
281  fwrite(outBuf_,toRead,1,des);
282  if (readAdler32Check_)
283  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
284  readInput+=toRead;
285  filesize+=toRead;
286  }
287 
288  //if(des != 0 && src !=0){
289  // while((b=fgetc(src))!= EOF){
290  // fputc((unsigned char)b,des);
291  // filesize++;
292  // }
293  //}
294 
295  //write new string representation of the checksum value
296  cf = fopen(deschecksum.c_str(),"w");
     // NOTE(review): this throw leaves src open and the merge stream locked.
297  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
298 
299  //write adler32 combine to checksum file
     // Merge the previous checksum with this file's checksum; filesize is the
     // number of bytes just appended.
300  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
301 
302  fprintf(cf,"%u",mergedAdler32);
303  fclose(cf);
304 
305  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
306  fclose(src);
307 
     // Optional recheck: compare the checksum of the bytes read back with the
     // value the Consumer reported. This runs after the unlock and fclose above.
308  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
309 
310  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
311  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
312  }
313 
314  }
315  //remove file
     // Runs whether or not events were processed; filesize is 0 if the merge
     // branch above was skipped.
316  remove(openDatFilePath_.string().c_str());
317  filesize_=filesize;
318 
319  // output jsn file
320  if(processed_.value()!=0){
321  jsonMonitor_->snap(ls.luminosityBlock());
322  const std::string outputJsonNameStream =
323  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
324  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
325  }
326 
327  // reset monitoring params
328  accepted_.value() = 0;
329  filelist_ = "";
330  }
331 
332 } // end of namespace evf
333 
334 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
#define NULL
Definition: scimark2.h:8
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
#define SUM(A, B)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:92
void setDefaultGroup(std::string const &group)
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)