CMS 3D CMS Logo

RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <filesystem>
10 #include <iomanip>
11 #include <sstream>
12 
13 #include <zlib.h>
14 #include <boost/algorithm/string.hpp>
15 
23 
24 namespace evf {
25  template <typename Consumer>
35  public:
37  ~RecoEventOutputModuleForFU() override;
38  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
39 
40  private:
41  void initRun();
42  void start() override;
43  void stop() override;
44  void doOutputHeader(InitMsgBuilder const& init_message) override;
45  void doOutputEvent(EventMsgBuilder const& msg) override;
46  //virtual void beginRun(edm::RunForOutput const&);
47  void beginJob() override;
50 
51  private:
52  std::unique_ptr<Consumer> c_;
67  std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
70  unsigned char* outBuf_ = nullptr;
71  bool readAdler32Check_ = false;
72  }; //end-of-class-def
73 
74  template <typename Consumer>
77  edm::StreamerOutputModuleBase(ps),
78  c_(new Consumer(ps)),
79  streamLabel_(ps.getParameter<std::string>("@module_label")),
80  processed_(0),
81  accepted_(0),
82  errorEvents_(0),
83  retCodeMask_(0),
84  filelist_(),
85  filesize_(0),
86  inputFiles_(),
87  fileAdler32_(1),
88  transferDestination_(),
89  mergeType_(),
90  hltErrorEvents_(0),
91  outBuf_(new unsigned char[1024 * 1024]) {
92  //replace hltOutoputA with stream if the HLT menu uses this convention
93  std::string testPrefix = "hltOutput";
94  if (streamLabel_.find(testPrefix) == 0)
95  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
96 
97  if (streamLabel_.find("_") != std::string::npos) {
98  throw cms::Exception("RecoEventOutputModuleForFU") << "Underscore character is reserved can not be used for "
99  "stream names in FFF, but was detected in stream name -: "
100  << streamLabel_;
101  }
102 
103  std::string streamLabelLow = streamLabel_;
104  boost::algorithm::to_lower(streamLabelLow);
105  auto streampos = streamLabelLow.rfind("stream");
106  if (streampos != 0 && streampos != std::string::npos)
107  throw cms::Exception("RecoEventOutputModuleForFU")
108  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
109  "names in FFF based HLT, but was detected in stream name";
110 
112  }
113 
114  template <typename Consumer>
116  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
117  readAdler32Check_ = edm::Service<evf::EvFDaqDirector>()->outputAdler32Recheck();
118  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
119  // create open dir if not already there
120  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
121 
122  processed_.setName("Processed");
123  accepted_.setName("Accepted");
124  errorEvents_.setName("ErrorEvents");
125  retCodeMask_.setName("ReturnCodeMask");
126  filelist_.setName("Filelist");
127  filesize_.setName("Filesize");
128  inputFiles_.setName("InputFiles");
129  fileAdler32_.setName("FileAdler32");
130  transferDestination_.setName("TransferDestination");
131  mergeType_.setName("MergeType");
132  hltErrorEvents_.setName("HLTErrorEvents");
133 
134  outJsonDef_.setDefaultGroup("data");
135  outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
136  outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
137  outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
138  outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
139  outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
140  outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
141  outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
142  outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
143  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
144  outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
145  outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
146  std::stringstream tmpss, ss;
147  tmpss << baseRunDir << "/open/"
148  << "output_" << getpid() << ".jsd";
149  ss << baseRunDir << "/"
150  << "output_" << getpid() << ".jsd";
151  std::string outTmpJsonDefName = tmpss.str();
152  std::string outJsonDefName = ss.str();
153 
154  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
155  struct stat fstat;
156  if (stat(outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
157  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
161  std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
162  }
163  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
164 
165  jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef_, true));
166  jsonMonitor_->setDefPath(outJsonDefName);
167  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
168  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
169  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
170  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
171  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
172  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
173  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
174  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
175  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
176  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
177  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
178  jsonMonitor_->commit(nullptr);
179  }
180 
181  template <typename Consumer>
183 
184  template <typename Consumer>
186  initRun();
187  const std::string openInitFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
188  edm::LogInfo("RecoEventOutputModuleForFU")
189  << "start() method, initializing streams. init stream -: " << openInitFileName;
190  c_->setInitMessageFile(openInitFileName);
191  c_->start();
192  }
193 
194  template <typename Consumer>
196  c_->stop();
197  }
198 
199  template <typename Consumer>
201  c_->doOutputHeader(init_message);
202 
203  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
204  struct stat istat;
205  stat(openIniFileName.c_str(), &istat);
206  //read back file to check integrity of what was written
207  off_t readInput = 0;
208  uint32_t adlera = 1, adlerb = 0;
209  FILE* src = fopen(openIniFileName.c_str(), "r");
210  while (readInput < istat.st_size) {
211  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
212  fread(outBuf_, toRead, 1, src);
213  cms::Adler32((const char*)outBuf_, toRead, adlera, adlerb);
214  readInput += toRead;
215  }
216  fclose(src);
217  //free output buffer needed only for the INI file
218  delete[] outBuf_;
219  outBuf_ = nullptr;
220 
221  uint32_t adler32c = (adlerb << 16) | adlera;
222  if (adler32c != c_->get_adler32_ini()) {
223  throw cms::Exception("RecoEventOutputModuleForFU")
224  << "Checksum mismatch of ini file -: " << openIniFileName << " expected:" << c_->get_adler32_ini()
225  << " obtained:" << adler32c;
226  } else {
227  LogDebug("RecoEventOutputModuleForFU") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
228  std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
229  }
230  }
231 
232  template <typename Consumer>
234  accepted_.value()++;
235  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
236  }
237 
238  template <typename Consumer>
242  Consumer::fillDescription(desc);
243  // Use addDefault here instead of add for 4 reasons:
244  // 1. Because EvFOutputModule_cfi.py is explicitly defined it does not need to be autogenerated
245  // The explicitly defined version overrides the autogenerated version of the cfi file.
246  // 2. That cfi file is not used anywhere in the release anyway
247  // 3. There are two plugin names used for the same template instantiation of this
248  // type, "ShmStreamConsumer" and "EvFOutputModule" and this causes name conflict
249  // problems for the cfi generation code which are avoided with addDefault.
250  // 4. At the present time, there is only one type of Consumer used to instantiate
251  // instances of this template, but if there were more than one type then this function
252  // would need to be specialized for each type unless the descriptions were the same
253  // and addDefault was used.
254  descriptions.addDefault(desc);
255  }
256 
257  template <typename Consumer>
259  //get stream transfer destination
260  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel_);
261  mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel_, evf::MergeTypeDAT);
262  }
263 
264  template <typename Consumer>
266  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
267  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(), streamLabel_);
268  openDatChecksumFilePath_ =
269  edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(), streamLabel_);
270  c_->setOutputFile(openDatFilePath_.string());
271  filelist_ = openDatFilePath_.filename().string();
272  }
273 
274  template <typename Consumer>
276  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
277  long filesize = 0;
278  fileAdler32_.value() = c_->get_adler32();
279  c_->closeOutputFile();
280  bool abortFlag = false;
281  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock(), &abortFlag);
282 
283  if (abortFlag) {
284  edm::LogInfo("RecoEventOutputModuleForFU") << "output suppressed";
285  return;
286  }
287 
288  if (processed_.value() != 0) {
289  //lock
290  struct stat istat;
291  stat(openDatFilePath_.string().c_str(), &istat);
292  filesize = istat.st_size;
293  std::filesystem::rename(openDatFilePath_.string().c_str(),
294  edm::Service<evf::EvFDaqDirector>()->getDatFilePath(ls.luminosityBlock(), streamLabel_));
295  } else {
296  filelist_ = "";
297  fileAdler32_.value() = -1;
298  }
299 
300  //remove file
301  remove(openDatFilePath_.string().c_str());
302  filesize_ = filesize;
303 
304  jsonMonitor_->snap(ls.luminosityBlock());
305  const std::string outputJsonNameStream =
306  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(), streamLabel_);
307  jsonMonitor_->outputFullJSON(outputJsonNameStream, ls.luminosityBlock());
308 
309  // reset monitoring params
310  accepted_.value() = 0;
311  filelist_ = "";
312  }
313 
314 } // namespace evf
315 
316 #endif
ConfigurationDescriptions.h
InitMsgBuilder
Definition: InitMsgBuilder.h:9
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::RecoEventOutputModuleForFU::doOutputHeader
void doOutputHeader(InitMsgBuilder const &init_message) override
Definition: RecoEventOutputModuleForFU.h:200
evf::RecoEventOutputModuleForFU::fms_
evf::FastMonitoringService * fms_
Definition: RecoEventOutputModuleForFU.h:68
SiPixelPI::one
Definition: SiPixelPayloadInspectorHelper.h:39
evf::RecoEventOutputModuleForFU::readAdler32Check_
bool readAdler32Check_
Definition: RecoEventOutputModuleForFU.h:71
edm::StreamerOutputModuleBase::fillDescription
static void fillDescription(ParameterSetDescription &desc)
Definition: StreamerOutputModuleBase.cc:68
evf::RecoEventOutputModuleForFU::openDatFilePath_
std::filesystem::path openDatFilePath_
Definition: RecoEventOutputModuleForFU.h:54
evf::RecoEventOutputModuleForFU::inputFiles_
jsoncollector::StringJ inputFiles_
Definition: RecoEventOutputModuleForFU.h:62
evf::RecoEventOutputModuleForFU::filelist_
jsoncollector::StringJ filelist_
Definition: RecoEventOutputModuleForFU.h:60
evf::RecoEventOutputModuleForFU::streamLabel_
std::string streamLabel_
Definition: RecoEventOutputModuleForFU.h:53
edm
HLT enums.
Definition: AlignableModifier.h:19
evf::RecoEventOutputModuleForFU
Definition: RecoEventOutputModuleForFU.h:26
evf::RecoEventOutputModuleForFU::errorEvents_
jsoncollector::IntJ errorEvents_
Definition: RecoEventOutputModuleForFU.h:58
evf::RecoEventOutputModuleForFU::retCodeMask_
jsoncollector::IntJ retCodeMask_
Definition: RecoEventOutputModuleForFU.h:59
edm::StreamerOutputModuleBase
Definition: StreamerOutputModuleBase.h:20
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::RecoEventOutputModuleForFU::beginJob
void beginJob() override
Definition: RecoEventOutputModuleForFU.h:258
evf::RecoEventOutputModuleForFU::hltErrorEvents_
jsoncollector::IntJ hltErrorEvents_
Definition: RecoEventOutputModuleForFU.h:66
JsonMonitorable.h
evf::RecoEventOutputModuleForFU::filesize_
jsoncollector::IntJ filesize_
Definition: RecoEventOutputModuleForFU.h:61
mps_check.msg
tuple msg
Definition: mps_check.py:285
evf::RecoEventOutputModuleForFU::~RecoEventOutputModuleForFU
~RecoEventOutputModuleForFU() override
Definition: RecoEventOutputModuleForFU.h:182
evf::RecoEventOutputModuleForFU::beginLuminosityBlock
void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override
Definition: RecoEventOutputModuleForFU.h:265
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
evf::RecoEventOutputModuleForFU::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: RecoEventOutputModuleForFU.h:239
evf::RecoEventOutputModuleForFU::processed_
jsoncollector::IntJ processed_
Definition: RecoEventOutputModuleForFU.h:56
jsoncollector::DataPointDefinition::MERGE
static const std::string MERGE
Definition: DataPointDefinition.h:69
FileIO.h
to_lower
std::string to_lower(const std::string &s)
Definition: CredentialStore.cc:157
EventMsgBuilder
Definition: EventMsgBuilder.h:8
jsoncollector::DataPointDefinition::SUM
static const std::string SUM
Definition: DataPointDefinition.h:64
edm::LuminosityBlockForOutput
Definition: LuminosityBlockForOutput.h:40
evf::RecoEventOutputModuleForFU::c_
std::unique_ptr< Consumer > c_
Definition: RecoEventOutputModuleForFU.h:52
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::FastMonitoringService
Definition: FastMonitoringService.h:71
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1119
cms::Adler32
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
Definition: Adler32Calculator.cc:10
Service.h
jsoncollector::DataPointDefinition::CAT
static const std::string CAT
Definition: DataPointDefinition.h:68
evf::MergeTypeDAT
Definition: EvFDaqDirector.h:58
evf::RecoEventOutputModuleForFU::openDatChecksumFilePath_
std::filesystem::path openDatChecksumFilePath_
Definition: RecoEventOutputModuleForFU.h:55
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
OutputModuleBase
jsoncollector::DataPointDefinition::BINARYOR
static const std::string BINARYOR
Definition: DataPointDefinition.h:70
Skims_PA_cff.content
content
Definition: Skims_PA_cff.py:19
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:223
edm::ParameterSet
Definition: ParameterSet.h:47
TrackRefitter_38T_cff.src
src
Definition: TrackRefitter_38T_cff.py:24
evf::RecoEventOutputModuleForFU::outJsonDef_
jsoncollector::DataPointDefinition outJsonDef_
Definition: RecoEventOutputModuleForFU.h:69
StreamerOutputModuleBase.h
evf::RecoEventOutputModuleForFU::stop
void stop() override
Definition: RecoEventOutputModuleForFU.h:195
edm::Service
Definition: Service.h:30
jsoncollector::DataPointDefinition::SAME
static const std::string SAME
Definition: DataPointDefinition.h:66
EvFDaqDirector.h
FastMonitoringService.h
jsoncollector::FileIO::writeStringToFile
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
JSONSerializer.h
evf::RecoEventOutputModuleForFU::transferDestination_
jsoncollector::StringJ transferDestination_
Definition: RecoEventOutputModuleForFU.h:64
evf::RecoEventOutputModuleForFU::RecoEventOutputModuleForFU
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
Definition: RecoEventOutputModuleForFU.h:75
jsoncollector::FastMonitor
Definition: FastMonitor.h:19
evf::RecoEventOutputModuleForFU::fileAdler32_
jsoncollector::IntJ fileAdler32_
Definition: RecoEventOutputModuleForFU.h:63
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
evf::RecoEventOutputModuleForFU::doOutputEvent
void doOutputEvent(EventMsgBuilder const &msg) override
Definition: RecoEventOutputModuleForFU.h:233
evf::RecoEventOutputModuleForFU::jsonMonitor_
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
Definition: RecoEventOutputModuleForFU.h:67
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
std
Definition: JetResolutionObject.h:76
jsoncollector::DataPointDefinition::ADLER32
static const std::string ADLER32
Definition: DataPointDefinition.h:71
Exception
Definition: hltDiff.cc:246
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf
Definition: fillJson.h:27
Adler32Calculator.h
evf::RecoEventOutputModuleForFU::mergeType_
jsoncollector::StringJ mergeType_
Definition: RecoEventOutputModuleForFU.h:65
jsoncollector::IntJ
Definition: JsonMonitorable.h:66
jsoncollector::JSONSerializer::serialize
static bool serialize(JsonSerializable *pObj, std::string &output)
Definition: JSONSerializer.cc:14
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
LuminosityBlockForOutput.h
evf::RecoEventOutputModuleForFU::endLuminosityBlock
void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
Definition: RecoEventOutputModuleForFU.h:275
evf::RecoEventOutputModuleForFU::initRun
void initRun()
Definition: RecoEventOutputModuleForFU.h:115
edm::ConfigurationDescriptions::addDefault
void addDefault(ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:99
evf::RecoEventOutputModuleForFU::start
void start() override
Definition: RecoEventOutputModuleForFU.h:185
jsoncollector::StringJ
Definition: JsonMonitorable.h:140
FastMonitor.h
evf::RecoEventOutputModuleForFU::outBuf_
unsigned char * outBuf_
Definition: RecoEventOutputModuleForFU.h:70
evf::RecoEventOutputModuleForFU::accepted_
jsoncollector::IntJ accepted_
Definition: RecoEventOutputModuleForFU.h:57