1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h 2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h 10 #include <boost/filesystem.hpp> 11 #include <boost/algorithm/string.hpp> 24 template<
typename Consumer>
42 void start()
override;
52 std::auto_ptr<Consumer>
c_;
77 template<
typename Consumer>
94 outBuf_(new unsigned char[1024*1024]),
96 dataRwFulk_(
evf::
EvFDaqDirector::make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
106 <<
"Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " <<
streamLabel_;
110 boost::algorithm::to_lower(streamLabelLow);
111 auto streampos = streamLabelLow.rfind(
"stream");
112 if (streampos !=0 && streampos!=std::string::npos)
114 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
119 template<
typename Consumer>
124 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
152 std::stringstream tmpss,ss;
153 tmpss << baseRunDir <<
"/open/" <<
"output_" << getpid() <<
".jsd";
154 ss << baseRunDir <<
"/" <<
"output_" << getpid() <<
".jsd";
160 if (
stat (outJsonDefName.c_str(), &fstat) != 0) {
161 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
165 boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
186 template<
typename Consumer>
189 template<
typename Consumer>
195 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"start() method, initializing streams. init stream -: " 197 c_->setInitMessageFile(openInitFileName);
202 template<
typename Consumer>
209 template<
typename Consumer>
213 c_->doOutputHeader(init_message);
217 stat(openIniFileName.c_str(), &istat);
220 uint32_t adlera=1,adlerb=0;
221 FILE *
src = fopen(openIniFileName.c_str(),
"r");
222 while (readInput<istat.st_size)
224 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
235 uint32_t adler32c = (adlerb << 16) | adlera;
236 if (adler32c !=
c_->get_adler32_ini()) {
237 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Checksum mismatch of ini file -: " << openIniFileName
238 <<
" expected:" <<
c_->get_adler32_ini() <<
" obtained:" << adler32c;
246 template<
typename Consumer>
250 c_->doOutputEvent(msg);
253 template<
typename Consumer>
258 Consumer::fillDescription(desc);
273 template<
typename Consumer>
282 template<
typename Consumer>
292 template<
typename Consumer>
298 c_->closeOutputFile();
299 bool abortFlag =
false;
303 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"output suppressed";
315 int data_readwrite_fd = fileno(des);
318 auto finishFile = [des, data_readwrite_fd,
this](FILE*
f) {
320 fcntl(data_readwrite_fd,F_SETLKW, &dataRwFulk_);
324 std::unique_ptr<FILE,decltype(finishFile)> desGuard{des,finishFile};
326 if (data_readwrite_fd == -1)
327 edm::LogError(
"RecoEventOutputModuleForFU") <<
"problem with creating filedesc for datamerge " << strerror(errno);
329 LogDebug(
"RecoEventOutputModuleForFU") <<
"creating filedesc for datamerge -: " << data_readwrite_fd;
330 fcntl(data_readwrite_fd, F_SETLKW, &
dataRwFlk_);
336 uint32_t mergedAdler32=1;
338 if (!
stat(deschecksum.c_str(), &istat)) {
340 cf = fopen(deschecksum.c_str(),
"r");
341 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open checksum file -: " << deschecksum.c_str();
342 fscanf(cf,
"%u",&mergedAdler32);
345 else edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Checksum file size is empty -: "<< deschecksum.c_str();
354 while (readInput<istat.st_size) {
355 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
365 cf = fopen(deschecksum.c_str(),
"w");
366 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
371 fprintf(cf,
"%u",mergedAdler32);
378 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Adler32 checksum mismatch after reading file -: " 384 filesize = istat.st_size;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
void endLuminosityBlock(edm::LuminosityBlockForOutput const &) override
jsoncollector::IntJ accepted_
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
static const std::string BINARYOR
jsoncollector::StringJ mergeType_
boost::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
static const std::string ADLER32
jsoncollector::IntJ filesize_
static const std::string SUM
static bool serialize(JsonSerializable *pObj, std::string &output)
jsoncollector::IntJ hltErrorEvents_
void doOutputEvent(EventMsgBuilder const &msg) override
~RecoEventOutputModuleForFU() override
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
void addDefault(ParameterSetDescription const &psetDescription)
jsoncollector::IntJ fileAdler32_
virtual void setName(std::string name)
jsoncollector::StringJ filelist_
static const std::string MERGE
evf::FastMonitoringService * fms_
boost::filesystem::path openDatChecksumFilePath_
jsoncollector::StringJ transferDestination_
void doOutputHeader(InitMsgBuilder const &init_message) override
static void writeStringToFile(std::string const &filename, std::string &content)
static const std::string CAT
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
StreamerOutputModuleBase(ParameterSet const &ps)
jsoncollector::IntJ errorEvents_
jsoncollector::IntJ retCodeMask_
jsoncollector::DataPointDefinition outJsonDef_
std::auto_ptr< Consumer > c_
jsoncollector::IntJ processed_
jsoncollector::StringJ inputFiles_
void setDefaultGroup(std::string const &group)
void beginLuminosityBlock(edm::LuminosityBlockForOutput const &) override
boost::filesystem::path openDatFilePath_
static const std::string SAME