1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
11 #include <boost/filesystem.hpp>
12 #include <boost/algorithm/string.hpp>
24 template<
typename Consumer>
41 virtual void start()
const;
42 virtual void stop()
const;
51 std::auto_ptr<Consumer>
c_;
73 template<
typename Consumer>
76 edm::StreamerOutputModuleBase(ps),
78 stream_label_(ps.getParameter<std::
string>(
"@module_label")),
87 transferDestination_(),
88 outBuf_(new unsigned
char[1024*1024])
92 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
103 <<
"Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " <<
stream_label_;
108 boost::algorithm::to_lower(stream_label_lo);
109 auto streampos = stream_label_lo.rfind(
"stream");
110 if (streampos !=0 && streampos!=std::string::npos)
112 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
136 std::stringstream tmpss,
ss;
137 tmpss << baseRunDir <<
"/open/" <<
"output_" << getpid() <<
".jsd";
138 ss << baseRunDir <<
"/" <<
"output_" << getpid() <<
".jsd";
144 if (stat (outJsonDefName.c_str(), &fstat) != 0) {
145 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
148 FileIO::writeStringToFile(outTmpJsonDefName, content);
149 boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
168 template<
typename Consumer>
171 template<
typename Consumer>
176 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"start() method, initializing streams. init stream -: "
178 c_->setInitMessageFile(openInitFileName);
183 template<
typename Consumer>
190 template<
typename Consumer>
194 c_->doOutputHeader(init_message);
198 stat(openIniFileName.c_str(), &istat);
201 uint32_t adlera=1,adlerb=0;
202 FILE *
src = fopen(openIniFileName.c_str(),
"r");
203 while (readInput<istat.st_size)
205 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
206 fread(outBuf_,toRead,1,src);
207 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
211 uint32_t adler32c = (adlerb << 16) | adlera;
212 if (adler32c != c_->get_adler32_ini()) {
213 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Checksum mismatch of ini file -: " << openIniFileName
214 <<
" expected:" << c_->get_adler32_ini() <<
" obtained:" << adler32c;
217 edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Ini file checksum -: "<< stream_label_ <<
" " << adler32c;
222 template<
typename Consumer>
226 c_->doOutputEvent(msg);
229 template<
typename Consumer>
234 Consumer::fillDescription(desc);
235 descriptions.
add(
"streamerOutput", desc);
238 template<
typename Consumer>
246 template<
typename Consumer>
252 c_->setOutputFile(openDatFilePath_.string());
253 filelist_ = openDatFilePath_.filename().string();
256 template<
typename Consumer>
261 fileAdler32_.value() = c_->get_adler32();
262 c_->closeOutputFile();
263 bool abortFlag =
false;
264 processed_.value() = fms_->getEventsProcessedForLumi(ls.
luminosityBlock(),&abortFlag);
267 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"output suppressed";
271 if(processed_.value()!=0) {
280 uint32_t mergedAdler32=1;
282 if (!stat(deschecksum.c_str(), &istat)) {
284 cf = fopen(deschecksum.c_str(),
"r");
285 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open checksum file -: " << deschecksum.c_str();
286 fscanf(cf,
"%u",&mergedAdler32);
289 else edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Checksum file size is empty -: "<< deschecksum.c_str();
292 FILE *
src = fopen(openDatFilePath_.string().c_str(),
"r");
294 stat(openDatFilePath_.string().c_str(), &istat);
298 while (readInput<istat.st_size) {
299 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
300 fread(outBuf_,toRead,1,src);
301 fwrite(outBuf_,toRead,1,des);
302 if (readAdler32Check_)
303 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
316 cf = fopen(deschecksum.c_str(),
"w");
317 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
320 mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
322 fprintf(cf,
"%u",mergedAdler32);
328 if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
330 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Adler32 checksum mismatch after reading file -: "
331 << openDatFilePath_.string() <<
" in LS " << ls.
luminosityBlock() << std::endl;
339 fileAdler32_.value()=-1;
343 remove(openDatFilePath_.string().c_str());
349 jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.
luminosityBlock());
352 accepted_.value() = 0;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
virtual void stop() const
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual ~RecoEventOutputModuleForFU()
StringJ transferDestination_
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *) override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
evf::FastMonitoringService * fms_
boost::filesystem::path openDatChecksumFilePath_
DataPointDefinition outJsonDef_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
std::string stream_label_
std::auto_ptr< Consumer > c_
virtual void beginJob() override
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
virtual void start() const
void setDefaultGroup(std::string const &group)
boost::filesystem::path openDatFilePath_