1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
24 template <
typename Consumer>
41 void start()
override;
51 std::unique_ptr<Consumer>
c_;
73 template <
typename Consumer>
76 edm::StreamerOutputModuleBase(ps),
78 streamLabel_(ps.getParameter<
std::
string>(
"@module_label")),
87 transferDestination_(),
90 outBuf_(new unsigned char[1024 * 1024]) {
97 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Underscore character is reserved can not be used for "
98 "stream names in FFF, but was detected in stream name -: "
103 boost::algorithm::to_lower(streamLabelLow);
104 auto streampos = streamLabelLow.rfind(
"stream");
105 if (streampos != 0 && streampos != std::string::npos)
107 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
108 "names in FFF based HLT, but was detected in stream name";
113 template <
typename Consumer>
117 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
121 processed_.setName(
"Processed");
122 accepted_.setName(
"Accepted");
123 errorEvents_.setName(
"ErrorEvents");
124 retCodeMask_.setName(
"ReturnCodeMask");
125 filelist_.setName(
"Filelist");
126 filesize_.setName(
"Filesize");
127 inputFiles_.setName(
"InputFiles");
128 fileAdler32_.setName(
"FileAdler32");
129 transferDestination_.setName(
"TransferDestination");
130 mergeType_.setName(
"MergeType");
131 hltErrorEvents_.setName(
"HLTErrorEvents");
133 outJsonDef_.setDefaultGroup(
"data");
145 std::stringstream tmpss,
ss;
146 tmpss << baseRunDir <<
"/open/"
147 <<
"output_" << getpid() <<
".jsd";
148 ss << baseRunDir <<
"/"
149 <<
"output_" << getpid() <<
".jsd";
155 if (
stat(outJsonDefName.c_str(), &fstat) != 0) {
156 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
160 boost::filesystem::rename(outTmpJsonDefName, outJsonDefName);
165 jsonMonitor_->setDefPath(outJsonDefName);
166 jsonMonitor_->registerGlobalMonitorable(&processed_,
false);
167 jsonMonitor_->registerGlobalMonitorable(&accepted_,
false);
168 jsonMonitor_->registerGlobalMonitorable(&errorEvents_,
false);
169 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,
false);
170 jsonMonitor_->registerGlobalMonitorable(&filelist_,
false);
171 jsonMonitor_->registerGlobalMonitorable(&filesize_,
false);
172 jsonMonitor_->registerGlobalMonitorable(&inputFiles_,
false);
173 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,
false);
174 jsonMonitor_->registerGlobalMonitorable(&transferDestination_,
false);
175 jsonMonitor_->registerGlobalMonitorable(&mergeType_,
false);
176 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,
false);
177 jsonMonitor_->commit(
nullptr);
180 template <
typename Consumer>
183 template <
typename Consumer>
188 <<
"start() method, initializing streams. init stream -: " << openInitFileName;
189 c_->setInitMessageFile(openInitFileName);
193 template <
typename Consumer>
198 template <
typename Consumer>
200 c_->doOutputHeader(init_message);
204 stat(openIniFileName.c_str(), &istat);
207 uint32_t adlera = 1, adlerb = 0;
208 FILE*
src = fopen(openIniFileName.c_str(),
"r");
209 while (readInput < istat.st_size) {
210 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
211 fread(outBuf_, toRead, 1,
src);
212 cms::Adler32((
const char*)outBuf_, toRead, adlera, adlerb);
220 uint32_t adler32c = (adlerb << 16) | adlera;
221 if (adler32c != c_->get_adler32_ini()) {
223 <<
"Checksum mismatch of ini file -: " << openIniFileName <<
" expected:" << c_->get_adler32_ini()
224 <<
" obtained:" << adler32c;
226 LogDebug(
"RecoEventOutputModuleForFU") <<
"Ini file checksum -: " << streamLabel_ <<
" " << adler32c;
231 template <
typename Consumer>
234 c_->doOutputEvent(
msg);
237 template <
typename Consumer>
241 Consumer::fillDescription(desc);
256 template <
typename Consumer>
263 template <
typename Consumer>
267 openDatChecksumFilePath_ =
269 c_->setOutputFile(openDatFilePath_.string());
270 filelist_ = openDatFilePath_.filename().string();
273 template <
typename Consumer>
277 fileAdler32_.value() = c_->get_adler32();
278 c_->closeOutputFile();
279 bool abortFlag =
false;
280 processed_.value() = fms_->getEventsProcessedForLumi(
ls.luminosityBlock(), &abortFlag);
283 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"output suppressed";
287 if (processed_.value() != 0) {
290 stat(openDatFilePath_.string().c_str(), &istat);
291 filesize = istat.st_size;
292 boost::filesystem::rename(
293 openDatFilePath_.string().c_str(),
297 fileAdler32_.value() = -1;
301 remove(openDatFilePath_.string().c_str());
302 filesize_ = filesize;
304 jsonMonitor_->snap(
ls.luminosityBlock());
307 jsonMonitor_->outputFullJSON(outputJsonNameStream,
ls.luminosityBlock());
310 accepted_.value() = 0;