CMS 3D CMS Logo

RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
7 
8 #include <sstream>
9 #include <iomanip>
10 #include <boost/filesystem.hpp>
11 #include <boost/algorithm/string.hpp>
12 #include <zlib.h>
13 
20 
21 
22 namespace evf {
23  template<typename Consumer>
25 
34  public:
35  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
36  ~RecoEventOutputModuleForFU() override;
37  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
38 
39  private:
40  void initRun();
41  void start() override;
42  void stop() override;
43  void doOutputHeader(InitMsgBuilder const& init_message) override;
44  void doOutputEvent(EventMsgBuilder const& msg) override;
45  //virtual void beginRun(edm::RunForOutput const&);
46  void beginJob() override;
49 
50  private:
51  std::auto_ptr<Consumer> c_;
66  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
69  unsigned char* outBuf_=nullptr;
70  bool readAdler32Check_=false;
71  }; //end-of-class-def
72 
73  template<typename Consumer>
77  c_(new Consumer(ps)),
78  streamLabel_(ps.getParameter<std::string>("@module_label")),
79  processed_(0),
80  accepted_(0),
81  errorEvents_(0),
82  retCodeMask_(0),
83  filelist_(),
84  filesize_(0),
85  inputFiles_(),
86  fileAdler32_(1),
88  mergeType_(),
89  hltErrorEvents_(0),
90  outBuf_(new unsigned char[1024*1024])
91  {
92  //replace hltOutoputA with stream if the HLT menu uses this convention
93  std::string testPrefix="hltOutput";
94  if (streamLabel_.find(testPrefix)==0)
95  streamLabel_=std::string("stream")+streamLabel_.substr(testPrefix.size());
96 
97  if (streamLabel_.find("_")!=std::string::npos) {
98  throw cms::Exception("RecoEventOutputModuleForFU")
99  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << streamLabel_;
100  }
101 
102  std::string streamLabelLow = streamLabel_;
103  boost::algorithm::to_lower(streamLabelLow);
104  auto streampos = streamLabelLow.rfind("stream");
105  if (streampos !=0 && streampos!=std::string::npos)
106  throw cms::Exception("RecoEventOutputModuleForFU")
107  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
108 
110  }
111 
112  template<typename Consumer>
114  {
115  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
116  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
117  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
118  // create open dir if not already there
119  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
120 
121  processed_.setName("Processed");
122  accepted_.setName("Accepted");
123  errorEvents_.setName("ErrorEvents");
124  retCodeMask_.setName("ReturnCodeMask");
125  filelist_.setName("Filelist");
126  filesize_.setName("Filesize");
127  inputFiles_.setName("InputFiles");
128  fileAdler32_.setName("FileAdler32");
129  transferDestination_.setName("TransferDestination");
130  mergeType_.setName("MergeType");
131  hltErrorEvents_.setName("HLTErrorEvents");
132 
145  std::stringstream tmpss,ss;
146  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
147  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
148  std::string outTmpJsonDefName = tmpss.str();
149  std::string outJsonDefName = ss.str();
150 
151  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
152  struct stat fstat;
153  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
154  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
157  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
158  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
159  }
160  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
161 
163  jsonMonitor_->setDefPath(outJsonDefName);
164  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
165  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
166  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
167  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
168  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
169  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
170  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
171  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
172  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
173  jsonMonitor_->registerGlobalMonitorable(&mergeType_,false);
174  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
175  jsonMonitor_->commit(nullptr);
176 
177  }
178 
179  template<typename Consumer>
181 
182  template<typename Consumer>
183  void
185  {
186  initRun();
187  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
188  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
189  << openInitFileName;
190  c_->setInitMessageFile(openInitFileName);
191  c_->start();
192 
193  }
194 
195  template<typename Consumer>
196  void
198  {
199  c_->stop();
200  }
201 
202  template<typename Consumer>
203  void
205  {
206  c_->doOutputHeader(init_message);
207 
208  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
209  struct stat istat;
210  stat(openIniFileName.c_str(), &istat);
211  //read back file to check integrity of what was written
212  off_t readInput=0;
213  uint32_t adlera=1,adlerb=0;
214  FILE *src = fopen(openIniFileName.c_str(),"r");
215  while (readInput<istat.st_size)
216  {
217  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
218  fread(outBuf_,toRead,1,src);
219  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
220  readInput+=toRead;
221  }
222  fclose(src);
223  //free output buffer needed only for the INI file
224  delete [] outBuf_;
225  outBuf_=nullptr;
226 
227  uint32_t adler32c = (adlerb << 16) | adlera;
228  if (adler32c != c_->get_adler32_ini()) {
229  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
230  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
231  }
232  else {
233  LogDebug("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< streamLabel_ << " " << adler32c;
234  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
235  }
236  }
237 
238  template<typename Consumer>
239  void
241  accepted_.value()++;
242  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
243  }
244 
245  template<typename Consumer>
246  void
250  Consumer::fillDescription(desc);
251  // Use addDefault here instead of add for 4 reasons:
252  // 1. Because EvFOutputModule_cfi.py is explicitly defined it does not need to be autogenerated
253  // The explicitly defined version overrides the autogenerated version of the cfi file.
254  // 2. That cfi file is not used anywhere in the release anyway
255  // 3. There are two plugin names used for the same template instantiation of this
256  // type, "ShmStreamConsumer" and "EvFOutputModule" and this causes name conflict
257  // problems for the cfi generation code which are avoided with addDefault.
258  // 4. At the present time, there is only one type of Consumer used to instantiate
259  // instances of this template, but if there were more than one type then this function
260  // would need to be specialized for each type unless the descriptions were the same
261  // and addDefault was used.
262  descriptions.addDefault(desc);
263  }
264 
265  template<typename Consumer>
267  {
268  //get stream transfer destination
271  }
272 
273 
274  template<typename Consumer>
276  {
277  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
280  c_->setOutputFile(openDatFilePath_.string());
281  filelist_ = openDatFilePath_.filename().string();
282  }
283 
284  template<typename Consumer>
286  {
287  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
288  long filesize=0;
289  fileAdler32_.value() = c_->get_adler32();
290  c_->closeOutputFile();
291  bool abortFlag = false;
293 
294  if (abortFlag) {
295  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
296  return;
297  }
298 
299  if(processed_.value()!=0) {
300  //lock
301  struct stat istat;
302  stat(openDatFilePath_.string().c_str(), &istat);
303  filesize = istat.st_size;
304  boost::filesystem::rename(openDatFilePath_.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(),streamLabel_));
305  } else {
306  filelist_ = "";
307  fileAdler32_.value()=-1;
308  }
309 
310  //remove file
311  remove(openDatFilePath_.string().c_str());
312  filesize_=filesize;
313 
314  jsonMonitor_->snap(ls.luminosityBlock());
315  const std::string outputJsonNameStream =
316  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),streamLabel_);
317  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
318 
319  // reset monitoring params
320  accepted_.value() = 0;
321  filelist_ = "";
322  }
323 
324 } // end of namespace-edm
325 
326 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
void doOutputEvent(EventMsgBuilder const &msg) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
void addDefault(ParameterSetDescription const &psetDescription)
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
StreamerOutputModuleBase(ParameterSet const &ps)
def ls(path, rec=False)
Definition: eostools.py:348
jsoncollector::DataPointDefinition outJsonDef_
tuple msg
Definition: mps_check.py:277
HLT enums.
void setDefaultGroup(std::string const &group)
void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override