1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
11 #include <boost/filesystem.hpp>
23 template<
typename Consumer>
40 virtual void start()
const;
41 virtual void stop()
const;
49 std::auto_ptr<Consumer>
c_;
70 template<
typename Consumer>
72 edm::StreamerOutputModuleBase(ps),
74 stream_label_(ps.getParameter<std::
string>(
"@module_label")),
83 outBuf_(new unsigned
char[1024*1024])
87 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
116 std::stringstream tmpss,
ss;
117 tmpss << baseRunDir <<
"/open/" <<
"output_" << getpid() <<
".jsd";
118 ss << baseRunDir <<
"/" <<
"output_" << getpid() <<
".jsd";
124 if (stat (outJsonDefName.c_str(), &fstat) != 0) {
125 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
128 FileIO::writeStringToFile(outTmpJsonDefName, content);
129 boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
146 template<
typename Consumer>
149 template<
typename Consumer>
154 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"start() method, initializing streams. init stream -: "
156 c_->setInitMessageFile(initFileName);
160 template<
typename Consumer>
167 template<
typename Consumer>
171 c_->doOutputHeader(init_message);
174 template<
typename Consumer>
178 c_->doOutputEvent(msg);
181 template<
typename Consumer>
186 Consumer::fillDescription(desc);
187 descriptions.
add(
"streamerOutput", desc);
190 template<
typename Consumer>
196 c_->setOutputFile(openDatFilePath_.string());
197 filelist_ = openDatFilePath_.filename().string();
200 template<
typename Consumer>
205 fileAdler32_.value() = c_->get_adler32();
206 c_->closeOutputFile();
207 processed_.value() = fms_->getEventsProcessedForLumi(ls.
luminosityBlock());
210 if(processed_.value()!=0){
219 uint32_t mergedAdler32=1;
221 if (!stat(deschecksum.c_str(), &istat)) {
223 cf = fopen(deschecksum.c_str(),
"r");
224 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open checksum file -: " << deschecksum.c_str();
225 fscanf(cf,
"%u",&mergedAdler32);
228 else edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Checksum file size is empty -: "<< deschecksum.c_str();
231 FILE *
src = fopen(openDatFilePath_.string().c_str(),
"r");
233 stat(openDatFilePath_.string().c_str(), &istat);
237 while (readInput<istat.st_size) {
238 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
239 fread(outBuf_,toRead,1,src);
240 fwrite(outBuf_,toRead,1,des);
241 if (readAdler32Check_)
242 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
255 cf = fopen(deschecksum.c_str(),
"w");
256 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
259 mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
261 fprintf(cf,
"%u",mergedAdler32);
267 if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
269 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Adler32 checksum mismatch after reading file -: "
270 << openDatFilePath_.string() <<
" in LS " << ls.
luminosityBlock() << std::endl;
275 remove(openDatFilePath_.string().c_str());
279 if(processed_.value()!=0){
283 jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.
luminosityBlock());
287 accepted_.value() = 0;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
virtual void stop() const
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
virtual ~RecoEventOutputModuleForFU()
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
evf::FastMonitoringService * fms_
boost::filesystem::path openDatChecksumFilePath_
DataPointDefinition outJsonDef_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
std::string stream_label_
std::auto_ptr< Consumer > c_
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
virtual void start() const
void setDefaultGroup(std::string const &group)
boost::filesystem::path openDatFilePath_
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)