CMS 3D CMS Logo

RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
22 
23 
24 namespace evf {
25  template<typename Consumer>
27 
36  public:
37  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
38  ~RecoEventOutputModuleForFU() override;
39  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
40 
41  private:
42  void initRun();
43  void start() override;
44  void stop() override;
45  void doOutputHeader(InitMsgBuilder const& init_message) override;
46  void doOutputEvent(EventMsgBuilder const& msg) override;
47  //virtual void beginRun(edm::RunForOutput const&);
48  void beginJob() override;
51 
52  private:
53  std::unique_ptr<Consumer> c_;
68  boost::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
71  unsigned char* outBuf_=nullptr;
72  bool readAdler32Check_=false;
73  }; //end-of-class-def
74 
75  template<typename Consumer>
79  c_(new Consumer(ps)),
80  streamLabel_(ps.getParameter<std::string>("@module_label")),
81  processed_(0),
82  accepted_(0),
83  errorEvents_(0),
84  retCodeMask_(0),
85  filelist_(),
86  filesize_(0),
87  inputFiles_(),
88  fileAdler32_(1),
90  mergeType_(),
91  hltErrorEvents_(0),
92  outBuf_(new unsigned char[1024*1024])
93  {
94  //replace hltOutoputA with stream if the HLT menu uses this convention
95  std::string testPrefix="hltOutput";
96  if (streamLabel_.find(testPrefix)==0)
97  streamLabel_=std::string("stream")+streamLabel_.substr(testPrefix.size());
98 
99  if (streamLabel_.find("_")!=std::string::npos) {
100  throw cms::Exception("RecoEventOutputModuleForFU")
101  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << streamLabel_;
102  }
103 
104  std::string streamLabelLow = streamLabel_;
105  boost::algorithm::to_lower(streamLabelLow);
106  auto streampos = streamLabelLow.rfind("stream");
107  if (streampos !=0 && streampos!=std::string::npos)
108  throw cms::Exception("RecoEventOutputModuleForFU")
109  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
110 
112  }
113 
114  template<typename Consumer>
116  {
117  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
118  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
119  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
120  // create open dir if not already there
121  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
122 
123  processed_.setName("Processed");
124  accepted_.setName("Accepted");
125  errorEvents_.setName("ErrorEvents");
126  retCodeMask_.setName("ReturnCodeMask");
127  filelist_.setName("Filelist");
128  filesize_.setName("Filesize");
129  inputFiles_.setName("InputFiles");
130  fileAdler32_.setName("FileAdler32");
131  transferDestination_.setName("TransferDestination");
132  mergeType_.setName("MergeType");
133  hltErrorEvents_.setName("HLTErrorEvents");
134 
147  std::stringstream tmpss,ss;
148  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
149  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
150  std::string outTmpJsonDefName = tmpss.str();
151  std::string outJsonDefName = ss.str();
152 
153  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
154  struct stat fstat;
155  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
156  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
159  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
160  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
161  }
162  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
163 
165  jsonMonitor_->setDefPath(outJsonDefName);
166  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
167  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
168  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
169  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
170  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
171  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
172  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
173  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
174  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
175  jsonMonitor_->registerGlobalMonitorable(&mergeType_,false);
176  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
177  jsonMonitor_->commit(nullptr);
178 
179  }
180 
181  template<typename Consumer>
183 
184  template<typename Consumer>
185  void
187  {
188  initRun();
189  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
190  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
191  << openInitFileName;
192  c_->setInitMessageFile(openInitFileName);
193  c_->start();
194 
195  }
196 
197  template<typename Consumer>
198  void
200  {
201  c_->stop();
202  }
203 
204  template<typename Consumer>
205  void
207  {
208  c_->doOutputHeader(init_message);
209 
210  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
211  struct stat istat;
212  stat(openIniFileName.c_str(), &istat);
213  //read back file to check integrity of what was written
214  off_t readInput=0;
215  uint32_t adlera=1,adlerb=0;
216  FILE *src = fopen(openIniFileName.c_str(),"r");
217  while (readInput<istat.st_size)
218  {
219  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
220  fread(outBuf_,toRead,1,src);
221  cms::Adler32((const char*)outBuf_,toRead,adlera,adlerb);
222  readInput+=toRead;
223  }
224  fclose(src);
225  //free output buffer needed only for the INI file
226  delete [] outBuf_;
227  outBuf_=nullptr;
228 
229  uint32_t adler32c = (adlerb << 16) | adlera;
230  if (adler32c != c_->get_adler32_ini()) {
231  throw cms::Exception("RecoEventOutputModuleForFU") << "Checksum mismatch of ini file -: " << openIniFileName
232  << " expected:" << c_->get_adler32_ini() << " obtained:" << adler32c;
233  }
234  else {
235  LogDebug("RecoEventOutputModuleForFU") << "Ini file checksum -: "<< streamLabel_ << " " << adler32c;
236  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
237  }
238  }
239 
240  template<typename Consumer>
241  void
243  accepted_.value()++;
244  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
245  }
246 
247  template<typename Consumer>
248  void
252  Consumer::fillDescription(desc);
253  // Use addDefault here instead of add for 4 reasons:
254  // 1. Because EvFOutputModule_cfi.py is explicitly defined it does not need to be autogenerated
255  // The explicitly defined version overrides the autogenerated version of the cfi file.
256  // 2. That cfi file is not used anywhere in the release anyway
257  // 3. There are two plugin names used for the same template instantiation of this
258  // type, "ShmStreamConsumer" and "EvFOutputModule" and this causes name conflict
259  // problems for the cfi generation code which are avoided with addDefault.
260  // 4. At the present time, there is only one type of Consumer used to instantiate
261  // instances of this template, but if there were more than one type then this function
262  // would need to be specialized for each type unless the descriptions were the same
263  // and addDefault was used.
264  descriptions.addDefault(desc);
265  }
266 
267  template<typename Consumer>
269  {
270  //get stream transfer destination
273  }
274 
275 
276  template<typename Consumer>
278  {
279  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
282  c_->setOutputFile(openDatFilePath_.string());
283  filelist_ = openDatFilePath_.filename().string();
284  }
285 
286  template<typename Consumer>
288  {
289  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
290  long filesize=0;
291  fileAdler32_.value() = c_->get_adler32();
292  c_->closeOutputFile();
293  bool abortFlag = false;
295 
296  if (abortFlag) {
297  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
298  return;
299  }
300 
301  if(processed_.value()!=0) {
302  //lock
303  struct stat istat;
304  stat(openDatFilePath_.string().c_str(), &istat);
305  filesize = istat.st_size;
306  boost::filesystem::rename(openDatFilePath_.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(),streamLabel_));
307  } else {
308  filelist_ = "";
309  fileAdler32_.value()=-1;
310  }
311 
312  //remove file
313  remove(openDatFilePath_.string().c_str());
314  filesize_=filesize;
315 
316  jsonMonitor_->snap(ls.luminosityBlock());
317  const std::string outputJsonNameStream =
318  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),streamLabel_);
319  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
320 
321  // reset monitoring params
322  accepted_.value() = 0;
323  filelist_ = "";
324  }
325 
326 } // end of namespace-edm
327 
328 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
Definition: fillJson.h:27
static void fillDescription(ParameterSetDescription &desc)
void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static bool serialize(JsonSerializable *pObj, std::string &output)
void doOutputEvent(EventMsgBuilder const &msg) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
void addDefault(ParameterSetDescription const &psetDescription)
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
StreamerOutputModuleBase(ParameterSet const &ps)
def ls(path, rec=False)
Definition: eostools.py:349
jsoncollector::DataPointDefinition outJsonDef_
tuple msg
Definition: mps_check.py:279
HLT enums.
void setDefaultGroup(std::string const &group)
void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override