CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <zlib.h>
13 
20 
21 
22 namespace evf {
23  template<typename Consumer>
25 
34  public:
35  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
37  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
38 
39  private:
40  virtual void start() const;
41  virtual void stop() const;
42  virtual void doOutputHeader(InitMsgBuilder const& init_message) const;
43  virtual void doOutputEvent(EventMsgBuilder const& msg) const;
44  // virtual void beginRun(edm::RunPrincipal const&);
47 
48  private:
49  std::auto_ptr<Consumer> c_;
54  mutable IntJ accepted_;
61  boost::shared_ptr<FastMonitor> jsonMonitor_;
64  unsigned char* outBuf_=0;
65  bool readAdler32Check_=false;
66 
67 
68  }; //end-of-class-def
69 
70  template<typename Consumer>
72  edm::StreamerOutputModuleBase(ps),
73  c_(new Consumer(ps)),
74  stream_label_(ps.getParameter<std::string>("@module_label")),
75  processed_(0),
76  accepted_(0),
77  errorEvents_(0),
78  retCodeMask_(0),
79  filelist_(),
80  filesize_(0),
81  inputFiles_(),
82  fileAdler32_(1),
83  outBuf_(new unsigned char[1024*1024])
84  {
85  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
86  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
87  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
88  // create open dir if not already there
89  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
90 
91  //replace hltOutoputA with stream if the HLT menu uses this convention
92  std::string testPrefix="hltOutput";
93  if (stream_label_.find(testPrefix)==0)
94  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
95 
97 
98  processed_.setName("Processed");
99  accepted_.setName("Accepted");
100  errorEvents_.setName("ErrorEvents");
101  retCodeMask_.setName("ReturnCodeMask");
102  filelist_.setName("Filelist");
103  filesize_.setName("Filesize");
104  inputFiles_.setName("InputFiles");
105  fileAdler32_.setName("FileAdler32");
106 
108  outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
109  outJsonDef_.addLegendItem("Accepted","integer",DataPointDefinition::SUM);
110  outJsonDef_.addLegendItem("ErrorEvents","integer",DataPointDefinition::SUM);
111  outJsonDef_.addLegendItem("ReturnCodeMask","integer",DataPointDefinition::BINARYOR);
112  outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
113  outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
114  outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
115  outJsonDef_.addLegendItem("FileAdler32","integer",DataPointDefinition::ADLER32);
116  std::stringstream tmpss,ss;
117  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
118  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
119  std::string outTmpJsonDefName = tmpss.str();
120  std::string outJsonDefName = ss.str();
121 
122  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
123  struct stat fstat;
124  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
125  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
128  FileIO::writeStringToFile(outTmpJsonDefName, content);
129  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
130  }
131  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
132 
133  jsonMonitor_.reset(new FastMonitor(&outJsonDef_,true));
134  jsonMonitor_->setDefPath(outJsonDefName);
135  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
136  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
137  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
138  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
139  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
140  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
141  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
142  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
143  jsonMonitor_->commit(nullptr);
144  }
145 
146  template<typename Consumer>
148 
149  template<typename Consumer>
150  void
152  {
153  const std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_);
154  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
155  << initFileName;
156  c_->setInitMessageFile(initFileName);
157  c_->start();
158  }
159 
160  template<typename Consumer>
161  void
163  {
164  c_->stop();
165  }
166 
167  template<typename Consumer>
168  void
170  {
171  c_->doOutputHeader(init_message);
172  }
173 
174  template<typename Consumer>
175  void
177  accepted_.value()++;
178  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
179  }
180 
181  template<typename Consumer>
182  void
186  Consumer::fillDescription(desc);
187  descriptions.add("streamerOutput", desc);
188  }
189 
190  template<typename Consumer>
192  {
193  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
194  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
195  openDatChecksumFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
196  c_->setOutputFile(openDatFilePath_.string());
197  filelist_ = openDatFilePath_.filename().string();
198  }
199 
200  template<typename Consumer>
202  {
203  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
204  long filesize=0;
205  fileAdler32_.value() = c_->get_adler32();
206  c_->closeOutputFile();
207  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock());
208 
209 
210  if(processed_.value()!=0){
211 
212  //lock
213  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
214 
215  std::string deschecksum = edm::Service<evf::EvFDaqDirector>()->getMergedDatChecksumFilePath(ls.luminosityBlock(), stream_label_);
216 
217  struct stat istat;
218  FILE * cf = NULL;
219  uint32_t mergedAdler32=1;
220  //get adler32 accumulated checksum for the merged file
221  if (!stat(deschecksum.c_str(), &istat)) {
222  if (istat.st_size) {
223  cf = fopen(deschecksum.c_str(),"r");
224  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open checksum file -: " << deschecksum.c_str();
225  fscanf(cf,"%u",&mergedAdler32);
226  fclose(cf);
227  }
228  else edm::LogWarning("RecoEventOutputModuleForFU") << "Checksum file size is empty -: "<< deschecksum.c_str();
229  }
230 
231  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
232 
233  stat(openDatFilePath_.string().c_str(), &istat);
234  off_t readInput=0;
235  uint32_t adlera=1;
236  uint32_t adlerb=0;
237  while (readInput<istat.st_size) {
238  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
239  fread(outBuf_,toRead,1,src);
240  fwrite(outBuf_,toRead,1,des);
241  if (readAdler32Check_)
242  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
243  readInput+=toRead;
244  filesize+=toRead;
245  }
246 
247  //if(des != 0 && src !=0){
248  // while((b=fgetc(src))!= EOF){
249  // fputc((unsigned char)b,des);
250  // filesize++;
251  // }
252  //}
253 
254  //write new string representation of the checksum value
255  cf = fopen(deschecksum.c_str(),"w");
256  if (!cf) throw cms::Exception("RecoEventOutputModuleForFU") << "Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
257 
258  //write adler32 combine to checksum file
259  mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
260 
261  fprintf(cf,"%u",mergedAdler32);
262  fclose(cf);
263 
264  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
265  fclose(src);
266 
267  if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
268 
269  throw cms::Exception("RecoEventOutputModuleForFU") << "Adler32 checksum mismatch after reading file -: "
270  << openDatFilePath_.string() <<" in LS " << ls.luminosityBlock() << std::endl;
271  }
272 
273  }
274  //remove file
275  remove(openDatFilePath_.string().c_str());
276  filesize_=filesize;
277 
278  // output jsn file
279  if(processed_.value()!=0){
280  jsonMonitor_->snap(ls.luminosityBlock());
281  const std::string outputJsonNameStream =
282  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
283  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
284  }
285 
286  // reset monitoring params
287  accepted_.value() = 0;
288  filelist_ = "";
289  }
290 
291 } // end of namespace-edm
292 
293 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
#define NULL
Definition: scimark2.h:8
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
#define SUM(A, B)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:92
void setDefaultGroup(std::string const &group)
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)