1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
14 #include <boost/algorithm/string.hpp>
25 template <
typename Consumer>
42 void start()
override;
52 std::unique_ptr<Consumer>
c_;
74 template <
typename Consumer>
77 edm::StreamerOutputModuleBase(ps),
79 streamLabel_(ps.getParameter<std::
string>(
"@module_label")),
88 transferDestination_(),
91 outBuf_(new unsigned char[1024 * 1024]) {
98 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Underscore character is reserved can not be used for "
99 "stream names in FFF, but was detected in stream name -: "
105 auto streampos = streamLabelLow.rfind(
"stream");
106 if (streampos != 0 && streampos != std::string::npos)
108 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
109 "names in FFF based HLT, but was detected in stream name";
114 template <
typename Consumer>
118 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
122 processed_.setName(
"Processed");
123 accepted_.setName(
"Accepted");
124 errorEvents_.setName(
"ErrorEvents");
125 retCodeMask_.setName(
"ReturnCodeMask");
126 filelist_.setName(
"Filelist");
127 filesize_.setName(
"Filesize");
128 inputFiles_.setName(
"InputFiles");
129 fileAdler32_.setName(
"FileAdler32");
130 transferDestination_.setName(
"TransferDestination");
131 mergeType_.setName(
"MergeType");
132 hltErrorEvents_.setName(
"HLTErrorEvents");
134 outJsonDef_.setDefaultGroup(
"data");
146 std::stringstream tmpss,
ss;
147 tmpss << baseRunDir <<
"/open/"
148 <<
"output_" << getpid() <<
".jsd";
149 ss << baseRunDir <<
"/"
150 <<
"output_" << getpid() <<
".jsd";
156 if (
stat(outJsonDefName.c_str(), &fstat) != 0) {
157 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
161 std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
166 jsonMonitor_->setDefPath(outJsonDefName);
167 jsonMonitor_->registerGlobalMonitorable(&processed_,
false);
168 jsonMonitor_->registerGlobalMonitorable(&accepted_,
false);
169 jsonMonitor_->registerGlobalMonitorable(&errorEvents_,
false);
170 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,
false);
171 jsonMonitor_->registerGlobalMonitorable(&filelist_,
false);
172 jsonMonitor_->registerGlobalMonitorable(&filesize_,
false);
173 jsonMonitor_->registerGlobalMonitorable(&inputFiles_,
false);
174 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,
false);
175 jsonMonitor_->registerGlobalMonitorable(&transferDestination_,
false);
176 jsonMonitor_->registerGlobalMonitorable(&mergeType_,
false);
177 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,
false);
178 jsonMonitor_->commit(
nullptr);
181 template <
typename Consumer>
184 template <
typename Consumer>
189 <<
"start() method, initializing streams. init stream -: " << openInitFileName;
190 c_->setInitMessageFile(openInitFileName);
194 template <
typename Consumer>
199 template <
typename Consumer>
201 c_->doOutputHeader(init_message);
205 stat(openIniFileName.c_str(), &istat);
208 uint32_t adlera = 1, adlerb = 0;
209 FILE*
src = fopen(openIniFileName.c_str(),
"r");
210 while (readInput < istat.st_size) {
211 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
212 fread(outBuf_, toRead, 1, src);
213 cms::Adler32((
const char*)outBuf_, toRead, adlera, adlerb);
221 uint32_t adler32c = (adlerb << 16) | adlera;
222 if (adler32c != c_->get_adler32_ini()) {
224 <<
"Checksum mismatch of ini file -: " << openIniFileName <<
" expected:" << c_->get_adler32_ini()
225 <<
" obtained:" << adler32c;
227 LogDebug(
"RecoEventOutputModuleForFU") <<
"Ini file checksum -: " << streamLabel_ <<
" " << adler32c;
232 template <
typename Consumer>
235 c_->doOutputEvent(msg);
238 template <
typename Consumer>
242 Consumer::fillDescription(desc);
257 template <
typename Consumer>
264 template <
typename Consumer>
268 openDatChecksumFilePath_ =
270 c_->setOutputFile(openDatFilePath_.string());
271 filelist_ = openDatFilePath_.filename().string();
274 template <
typename Consumer>
278 fileAdler32_.value() = c_->get_adler32();
279 c_->closeOutputFile();
280 bool abortFlag =
false;
281 processed_.value() = fms_->getEventsProcessedForLumi(ls.
luminosityBlock(), &abortFlag);
284 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"output suppressed";
288 if (processed_.value() != 0) {
291 stat(openDatFilePath_.string().c_str(), &istat);
292 filesize = istat.st_size;
293 std::filesystem::rename(openDatFilePath_.string().c_str(),
297 fileAdler32_.value() = -1;
301 remove(openDatFilePath_.string().c_str());
302 filesize_ = filesize;
307 jsonMonitor_->outputFullJSON(outputJsonNameStream, ls.
luminosityBlock());
310 accepted_.value() = 0;
static void fillDescription(ParameterSetDescription &desc)
void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
jsoncollector::IntJ accepted_
static const std::string BINARYOR
jsoncollector::StringJ mergeType_
static const std::string ADLER32
jsoncollector::IntJ filesize_
static const std::string SUM
static bool serialize(JsonSerializable *pObj, std::string &output)
std::string to_lower(const std::string &s)
jsoncollector::IntJ hltErrorEvents_
void doOutputEvent(EventMsgBuilder const &msg) override
~RecoEventOutputModuleForFU() override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
void addDefault(ParameterSetDescription const &psetDescription)
jsoncollector::IntJ fileAdler32_
jsoncollector::StringJ filelist_
static const std::string MERGE
evf::FastMonitoringService * fms_
jsoncollector::StringJ transferDestination_
void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
static const std::string CAT
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
jsoncollector::IntJ errorEvents_
jsoncollector::IntJ retCodeMask_
jsoncollector::DataPointDefinition outJsonDef_
std::filesystem::path openDatChecksumFilePath_
jsoncollector::IntJ processed_
jsoncollector::StringJ inputFiles_
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
std::unique_ptr< Consumer > c_
std::filesystem::path openDatFilePath_
void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override
static const std::string SAME