1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
24 template<
typename Consumer>
41 virtual void start()
const;
42 virtual void stop()
const;
51 std::auto_ptr<Consumer>
c_;
73 template<
typename Consumer>
75 edm::StreamerOutputModuleBase(ps),
77 stream_label_(ps.getParameter<std::
string>(
"@module_label")),
86 transferDestination_(),
87 outBuf_(new unsigned
char[1024*1024])
91 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
102 <<
"Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " <<
stream_label_;
107 boost::algorithm::to_lower(stream_label_lo);
108 auto streampos = stream_label_lo.rfind(
"stream");
109 if (streampos !=0 && streampos!=std::string::npos)
111 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
135 std::stringstream tmpss,
ss;
136 tmpss << baseRunDir <<
"/open/" <<
"output_" << getpid() <<
".jsd";
137 ss << baseRunDir <<
"/" <<
"output_" << getpid() <<
".jsd";
143 if (stat (outJsonDefName.c_str(), &fstat) != 0) {
144 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
147 FileIO::writeStringToFile(outTmpJsonDefName, content);
148 boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
167 template<
typename Consumer>
170 template<
typename Consumer>
175 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"start() method, initializing streams. init stream -: "
177 c_->setInitMessageFile(openInitFileName);
182 template<
typename Consumer>
189 template<
typename Consumer>
193 c_->doOutputHeader(init_message);
197 stat(openIniFileName.c_str(), &istat);
200 uint32_t adlera=1,adlerb=0;
201 FILE *
src = fopen(openIniFileName.c_str(),
"r");
202 while (readInput<istat.st_size)
204 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
205 fread(outBuf_,toRead,1,src);
206 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
210 uint32_t adler32c = (adlerb << 16) | adlera;
211 if (adler32c != c_->get_adler32_ini()) {
212 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Checksum mismatch of ini file -: " << openIniFileName
213 <<
" expected:" << c_->get_adler32_ini() <<
" obtained:" << adler32c;
216 edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Ini file checksum -: "<< stream_label_ <<
" " << adler32c;
221 template<
typename Consumer>
225 c_->doOutputEvent(msg);
228 template<
typename Consumer>
233 Consumer::fillDescription(desc);
234 descriptions.
add(
"streamerOutput", desc);
237 template<
typename Consumer>
245 template<
typename Consumer>
251 c_->setOutputFile(openDatFilePath_.string());
252 filelist_ = openDatFilePath_.filename().string();
255 template<
typename Consumer>
260 fileAdler32_.value() = c_->get_adler32();
261 c_->closeOutputFile();
262 bool abortFlag =
false;
263 processed_.value() = fms_->getEventsProcessedForLumi(ls.
luminosityBlock(),&abortFlag);
266 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"output suppressed";
270 if(processed_.value()!=0) {
279 uint32_t mergedAdler32=1;
281 if (!stat(deschecksum.c_str(), &istat)) {
283 cf = fopen(deschecksum.c_str(),
"r");
284 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open checksum file -: " << deschecksum.c_str();
285 fscanf(cf,
"%u",&mergedAdler32);
288 else edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Checksum file size is empty -: "<< deschecksum.c_str();
291 FILE *
src = fopen(openDatFilePath_.string().c_str(),
"r");
293 stat(openDatFilePath_.string().c_str(), &istat);
297 while (readInput<istat.st_size) {
298 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
299 fread(outBuf_,toRead,1,src);
300 fwrite(outBuf_,toRead,1,des);
301 if (readAdler32Check_)
302 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
315 cf = fopen(deschecksum.c_str(),
"w");
316 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
319 mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
321 fprintf(cf,
"%u",mergedAdler32);
327 if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
329 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Adler32 checksum mismatch after reading file -: "
330 << openDatFilePath_.string() <<
" in LS " << ls.
luminosityBlock() << std::endl;
338 fileAdler32_.value()=-1;
342 remove(openDatFilePath_.string().c_str());
348 jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.
luminosityBlock());
351 accepted_.value() = 0;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
virtual void stop() const
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
virtual ~RecoEventOutputModuleForFU()
StringJ transferDestination_
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
evf::FastMonitoringService * fms_
boost::filesystem::path openDatChecksumFilePath_
DataPointDefinition outJsonDef_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
std::string stream_label_
std::auto_ptr< Consumer > c_
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
virtual void start() const
void setDefaultGroup(std::string const &group)
boost::filesystem::path openDatFilePath_
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)