1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
24 template<
typename Consumer>
42 virtual void start()
override;
43 virtual void stop()
override;
52 std::auto_ptr<Consumer>
c_;
76 template<
typename Consumer>
79 edm::StreamerOutputModuleBase(ps),
81 stream_label_(ps.getParameter<std::
string>(
"@module_label")),
90 transferDestination_(),
93 outBuf_(new unsigned
char[1024*1024])
102 <<
"Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " <<
stream_label_;
106 boost::algorithm::to_lower(stream_label_lo);
107 auto streampos = stream_label_lo.rfind(
"stream");
108 if (streampos !=0 && streampos!=std::string::npos)
110 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
115 template<
typename Consumer>
120 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
124 processed_.setName(
"Processed");
125 accepted_.setName(
"Accepted");
126 errorEvents_.setName(
"ErrorEvents");
127 retCodeMask_.setName(
"ReturnCodeMask");
128 filelist_.setName(
"Filelist");
129 filesize_.setName(
"Filesize");
130 inputFiles_.setName(
"InputFiles");
131 fileAdler32_.setName(
"FileAdler32");
132 transferDestination_.setName(
"TransferDestination");
133 mergeType_.setName(
"MergeType");
134 hltErrorEvents_.setName(
"HLTErrorEvents");
136 outJsonDef_.setDefaultGroup(
"data");
148 std::stringstream tmpss,
ss;
149 tmpss << baseRunDir <<
"/open/" <<
"output_" << getpid() <<
".jsd";
150 ss << baseRunDir <<
"/" <<
"output_" << getpid() <<
".jsd";
156 if (stat (outJsonDefName.c_str(), &fstat) != 0) {
157 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
161 boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
166 jsonMonitor_->setDefPath(outJsonDefName);
167 jsonMonitor_->registerGlobalMonitorable(&processed_,
false);
168 jsonMonitor_->registerGlobalMonitorable(&accepted_,
false);
169 jsonMonitor_->registerGlobalMonitorable(&errorEvents_,
false);
170 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,
false);
171 jsonMonitor_->registerGlobalMonitorable(&filelist_,
false);
172 jsonMonitor_->registerGlobalMonitorable(&filesize_,
false);
173 jsonMonitor_->registerGlobalMonitorable(&inputFiles_,
false);
174 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,
false);
175 jsonMonitor_->registerGlobalMonitorable(&transferDestination_,
false);
176 jsonMonitor_->registerGlobalMonitorable(&mergeType_,
false);
177 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,
false);
178 jsonMonitor_->commit(
nullptr);
182 template<
typename Consumer>
185 template<
typename Consumer>
191 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"start() method, initializing streams. init stream -: "
193 c_->setInitMessageFile(openInitFileName);
198 template<
typename Consumer>
205 template<
typename Consumer>
209 c_->doOutputHeader(init_message);
213 stat(openIniFileName.c_str(), &istat);
216 uint32_t adlera=1,adlerb=0;
217 FILE *
src = fopen(openIniFileName.c_str(),
"r");
218 while (readInput<istat.st_size)
220 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
221 fread(outBuf_,toRead,1,src);
222 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
231 uint32_t adler32c = (adlerb << 16) | adlera;
232 if (adler32c != c_->get_adler32_ini()) {
233 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Checksum mismatch of ini file -: " << openIniFileName
234 <<
" expected:" << c_->get_adler32_ini() <<
" obtained:" << adler32c;
237 edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Ini file checksum -: "<< stream_label_ <<
" " << adler32c;
242 template<
typename Consumer>
246 c_->doOutputEvent(msg);
249 template<
typename Consumer>
254 Consumer::fillDescription(desc);
255 descriptions.
add(
"EvFOutputModule", desc);
258 template<
typename Consumer>
267 template<
typename Consumer>
273 c_->setOutputFile(openDatFilePath_.string());
274 filelist_ = openDatFilePath_.filename().string();
277 template<
typename Consumer>
282 fileAdler32_.value() = c_->get_adler32();
283 c_->closeOutputFile();
284 bool abortFlag =
false;
285 processed_.value() = fms_->getEventsProcessedForLumi(ls.
luminosityBlock(),&abortFlag);
288 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"output suppressed";
292 if(processed_.value()!=0) {
303 uint32_t mergedAdler32=1;
305 if (!stat(deschecksum.c_str(), &istat)) {
307 cf = fopen(deschecksum.c_str(),
"r");
308 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open checksum file -: " << deschecksum.c_str();
309 fscanf(cf,
"%u",&mergedAdler32);
312 else edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Checksum file size is empty -: "<< deschecksum.c_str();
315 FILE *
src = fopen(openDatFilePath_.string().c_str(),
"r");
317 stat(openDatFilePath_.string().c_str(), &istat);
321 while (readInput<istat.st_size) {
322 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
323 fread(outBuf_,toRead,1,src);
324 fwrite(outBuf_,toRead,1,des);
325 if (readAdler32Check_)
326 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
332 cf = fopen(deschecksum.c_str(),
"w");
333 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
336 mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
338 fprintf(cf,
"%u",mergedAdler32);
344 if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
346 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Adler32 checksum mismatch after reading file -: "
347 << openDatFilePath_.string() <<
" in LS " << ls.
luminosityBlock() << std::endl;
351 stat(openDatFilePath_.string().c_str(), &istat);
352 filesize = istat.st_size;
358 remove(openDatFilePath_.string().c_str());
362 fileAdler32_.value()=-1;
366 remove(openDatFilePath_.string().c_str());
372 jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.
luminosityBlock());
375 accepted_.value() = 0;
virtual void start() override
static void fillDescription(ParameterSetDescription &desc)
jsoncollector::IntJ accepted_
static const std::string BINARYOR
jsoncollector::StringJ mergeType_
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string ADLER32
jsoncollector::IntJ filesize_
static const std::string SUM
static bool serialize(JsonSerializable *pObj, std::string &output)
jsoncollector::IntJ hltErrorEvents_
virtual ~RecoEventOutputModuleForFU()
virtual void doOutputEvent(EventMsgBuilder const &msg) override
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
jsoncollector::IntJ fileAdler32_
jsoncollector::StringJ filelist_
static const std::string MERGE
evf::FastMonitoringService * fms_
boost::filesystem::path openDatChecksumFilePath_
jsoncollector::StringJ transferDestination_
virtual void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
static const std::string CAT
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
std::string stream_label_
jsoncollector::IntJ errorEvents_
jsoncollector::IntJ retCodeMask_
jsoncollector::DataPointDefinition outJsonDef_
std::auto_ptr< Consumer > c_
virtual void stop() override
virtual void beginJob() override
void add(std::string const &label, ParameterSetDescription const &psetDescription)
jsoncollector::IntJ processed_
jsoncollector::StringJ inputFiles_
boost::filesystem::path openDatFilePath_
static const std::string SAME