CMS 3D CMS Logo

RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <zlib.h>
14 
22 
23 namespace evf {
24  template <typename Consumer>
34  public:
36  ~RecoEventOutputModuleForFU() override;
37  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
38 
39  private:
40  void initRun();
41  void start() override;
42  void stop() override;
43  void doOutputHeader(InitMsgBuilder const& init_message) override;
44  void doOutputEvent(EventMsgBuilder const& msg) override;
45  //virtual void beginRun(edm::RunForOutput const&);
46  void beginJob() override;
49 
50  private:
51  std::unique_ptr<Consumer> c_;
66  std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
69  unsigned char* outBuf_ = nullptr;
70  bool readAdler32Check_ = false;
71  }; //end-of-class-def
72 
73  template <typename Consumer>
77  c_(new Consumer(ps)),
78  streamLabel_(ps.getParameter<std::string>("@module_label")),
79  processed_(0),
80  accepted_(0),
81  errorEvents_(0),
82  retCodeMask_(0),
83  filelist_(),
84  filesize_(0),
85  inputFiles_(),
86  fileAdler32_(1),
88  mergeType_(),
89  hltErrorEvents_(0),
90  outBuf_(new unsigned char[1024 * 1024]) {
91  //replace hltOutoputA with stream if the HLT menu uses this convention
92  std::string testPrefix = "hltOutput";
93  if (streamLabel_.find(testPrefix) == 0)
94  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
95 
96  if (streamLabel_.find("_") != std::string::npos) {
97  throw cms::Exception("RecoEventOutputModuleForFU") << "Underscore character is reserved can not be used for "
98  "stream names in FFF, but was detected in stream name -: "
99  << streamLabel_;
100  }
101 
102  std::string streamLabelLow = streamLabel_;
103  boost::algorithm::to_lower(streamLabelLow);
104  auto streampos = streamLabelLow.rfind("stream");
105  if (streampos != 0 && streampos != std::string::npos)
106  throw cms::Exception("RecoEventOutputModuleForFU")
107  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
108  "names in FFF based HLT, but was detected in stream name";
109 
111  }
112 
113  template <typename Consumer>
115  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
116  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
117  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
118  // create open dir if not already there
119  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
120 
121  processed_.setName("Processed");
122  accepted_.setName("Accepted");
123  errorEvents_.setName("ErrorEvents");
124  retCodeMask_.setName("ReturnCodeMask");
125  filelist_.setName("Filelist");
126  filesize_.setName("Filesize");
127  inputFiles_.setName("InputFiles");
128  fileAdler32_.setName("FileAdler32");
129  transferDestination_.setName("TransferDestination");
130  mergeType_.setName("MergeType");
131  hltErrorEvents_.setName("HLTErrorEvents");
132 
142  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
145  std::stringstream tmpss, ss;
146  tmpss << baseRunDir << "/open/"
147  << "output_" << getpid() << ".jsd";
148  ss << baseRunDir << "/"
149  << "output_" << getpid() << ".jsd";
150  std::string outTmpJsonDefName = tmpss.str();
151  std::string outJsonDefName = ss.str();
152 
153  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
154  struct stat fstat;
155  if (stat(outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
156  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
159  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
160  boost::filesystem::rename(outTmpJsonDefName, outJsonDefName);
161  }
162  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
163 
165  jsonMonitor_->setDefPath(outJsonDefName);
166  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
167  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
168  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
169  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
170  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
171  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
172  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
173  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
174  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
175  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
176  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
177  jsonMonitor_->commit(nullptr);
178  }
179 
180  template <typename Consumer>
182 
183  template <typename Consumer>
185  initRun();
186  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
187  edm::LogInfo("RecoEventOutputModuleForFU")
188  << "start() method, initializing streams. init stream -: " << openInitFileName;
189  c_->setInitMessageFile(openInitFileName);
190  c_->start();
191  }
192 
193  template <typename Consumer>
195  c_->stop();
196  }
197 
198  template <typename Consumer>
200  c_->doOutputHeader(init_message);
201 
202  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
203  struct stat istat;
204  stat(openIniFileName.c_str(), &istat);
205  //read back file to check integrity of what was written
206  off_t readInput = 0;
207  uint32_t adlera = 1, adlerb = 0;
208  FILE* src = fopen(openIniFileName.c_str(), "r");
209  while (readInput < istat.st_size) {
210  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
211  fread(outBuf_, toRead, 1, src);
212  cms::Adler32((const char*)outBuf_, toRead, adlera, adlerb);
213  readInput += toRead;
214  }
215  fclose(src);
216  //free output buffer needed only for the INI file
217  delete[] outBuf_;
218  outBuf_ = nullptr;
219 
220  uint32_t adler32c = (adlerb << 16) | adlera;
221  if (adler32c != c_->get_adler32_ini()) {
222  throw cms::Exception("RecoEventOutputModuleForFU")
223  << "Checksum mismatch of ini file -: " << openIniFileName << " expected:" << c_->get_adler32_ini()
224  << " obtained:" << adler32c;
225  } else {
226  LogDebug("RecoEventOutputModuleForFU") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
227  boost::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
228  }
229  }
230 
231  template <typename Consumer>
233  accepted_.value()++;
234  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
235  }
236 
237  template <typename Consumer>
241  Consumer::fillDescription(desc);
242  // Use addDefault here instead of add for 4 reasons:
243  // 1. Because EvFOutputModule_cfi.py is explicitly defined it does not need to be autogenerated
244  // The explicitly defined version overrides the autogenerated version of the cfi file.
245  // 2. That cfi file is not used anywhere in the release anyway
246  // 3. There are two plugin names used for the same template instantiation of this
247  // type, "ShmStreamConsumer" and "EvFOutputModule" and this causes name conflict
248  // problems for the cfi generation code which are avoided with addDefault.
249  // 4. At the present time, there is only one type of Consumer used to instantiate
250  // instances of this template, but if there were more than one type then this function
251  // would need to be specialized for each type unless the descriptions were the same
252  // and addDefault was used.
253  descriptions.addDefault(desc);
254  }
255 
256  template <typename Consumer>
258  //get stream transfer destination
261  }
262 
263  template <typename Consumer>
265  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
269  c_->setOutputFile(openDatFilePath_.string());
270  filelist_ = openDatFilePath_.filename().string();
271  }
272 
273  template <typename Consumer>
275  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
276  long filesize = 0;
277  fileAdler32_.value() = c_->get_adler32();
278  c_->closeOutputFile();
279  bool abortFlag = false;
281 
282  if (abortFlag) {
283  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
284  return;
285  }
286 
287  if (processed_.value() != 0) {
288  //lock
289  struct stat istat;
290  stat(openDatFilePath_.string().c_str(), &istat);
291  filesize = istat.st_size;
292  boost::filesystem::rename(
293  openDatFilePath_.string().c_str(),
295  } else {
296  filelist_ = "";
297  fileAdler32_.value() = -1;
298  }
299 
300  //remove file
301  remove(openDatFilePath_.string().c_str());
302  filesize_ = filesize;
303 
304  jsonMonitor_->snap(ls.luminosityBlock());
305  const std::string outputJsonNameStream =
306  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(), streamLabel_);
307  jsonMonitor_->outputFullJSON(outputJsonNameStream, ls.luminosityBlock());
308 
309  // reset monitoring params
310  accepted_.value() = 0;
311  filelist_ = "";
312  }
313 
314 } // namespace evf
315 
316 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
Definition: fillJson.h:27
static void fillDescription(ParameterSetDescription &desc)
void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static bool serialize(JsonSerializable *pObj, std::string &output)
void doOutputEvent(EventMsgBuilder const &msg) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
void addDefault(ParameterSetDescription const &psetDescription)
virtual void setName(std::string name)
boost::filesystem::path openDatChecksumFilePath_
void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
StreamerOutputModuleBase(ParameterSet const &ps)
def ls(path, rec=False)
Definition: eostools.py:349
jsoncollector::DataPointDefinition outJsonDef_
tuple msg
Definition: mps_check.py:285
HLT enums.
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
void setDefaultGroup(std::string const &group)
void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override