1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
11 #include <boost/filesystem.hpp>
23 template<
typename Consumer>
40 virtual void start()
const;
41 virtual void stop()
const;
50 std::auto_ptr<Consumer>
c_;
72 template<
typename Consumer>
74 edm::StreamerOutputModuleBase(ps),
76 stream_label_(ps.getParameter<std::
string>(
"@module_label")),
85 transferDestination_(),
86 outBuf_(new unsigned
char[1024*1024])
90 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing .dat files to -: " << baseRunDir;
121 std::stringstream tmpss,
ss;
122 tmpss << baseRunDir <<
"/open/" <<
"output_" << getpid() <<
".jsd";
123 ss << baseRunDir <<
"/" <<
"output_" << getpid() <<
".jsd";
129 if (stat (outJsonDefName.c_str(), &fstat) != 0) {
130 LogDebug(
"RecoEventOutputModuleForFU") <<
"writing output definition file -: " << outJsonDefName;
133 FileIO::writeStringToFile(outTmpJsonDefName, content);
134 boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
153 template<
typename Consumer>
156 template<
typename Consumer>
161 edm::LogInfo(
"RecoEventOutputModuleForFU") <<
"start() method, initializing streams. init stream -: "
163 c_->setInitMessageFile(openInitFileName);
168 template<
typename Consumer>
175 template<
typename Consumer>
179 c_->doOutputHeader(init_message);
183 stat(openIniFileName.c_str(), &istat);
186 uint32_t adlera=1,adlerb=0;
187 FILE *
src = fopen(openIniFileName.c_str(),
"r");
188 while (readInput<istat.st_size)
190 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
191 fread(outBuf_,toRead,1,src);
192 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
196 uint32_t adler32c = (adlerb << 16) | adlera;
197 if (adler32c != c_->get_adler32_ini()) {
198 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Checksum mismatch of ini file -: " << openIniFileName
199 <<
" expected:" << c_->get_adler32_ini() <<
" obtained:" << adler32c;
202 edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Ini file checksum -: "<< stream_label_ <<
" " << adler32c;
207 template<
typename Consumer>
211 c_->doOutputEvent(msg);
214 template<
typename Consumer>
219 Consumer::fillDescription(desc);
220 descriptions.
add(
"streamerOutput", desc);
223 template<
typename Consumer>
231 template<
typename Consumer>
237 c_->setOutputFile(openDatFilePath_.string());
238 filelist_ = openDatFilePath_.filename().string();
241 template<
typename Consumer>
246 fileAdler32_.value() = c_->get_adler32();
247 c_->closeOutputFile();
248 processed_.value() = fms_->getEventsProcessedForLumi(ls.
luminosityBlock());
251 if(processed_.value()!=0){
260 uint32_t mergedAdler32=1;
262 if (!stat(deschecksum.c_str(), &istat)) {
264 cf = fopen(deschecksum.c_str(),
"r");
265 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open checksum file -: " << deschecksum.c_str();
266 fscanf(cf,
"%u",&mergedAdler32);
269 else edm::LogWarning(
"RecoEventOutputModuleForFU") <<
"Checksum file size is empty -: "<< deschecksum.c_str();
272 FILE *
src = fopen(openDatFilePath_.string().c_str(),
"r");
274 stat(openDatFilePath_.string().c_str(), &istat);
278 while (readInput<istat.st_size) {
279 size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
280 fread(outBuf_,toRead,1,src);
281 fwrite(outBuf_,toRead,1,des);
282 if (readAdler32Check_)
283 cms::Adler32((
const char*)outBuf_,toRead,adlera,adlerb);
296 cf = fopen(deschecksum.c_str(),
"w");
297 if (!cf)
throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Unable to open or rewind checksum file for writing -:" << deschecksum.c_str();
300 mergedAdler32 = adler32_combine(mergedAdler32,fileAdler32_.value(),filesize);
302 fprintf(cf,
"%u",mergedAdler32);
308 if (readAdler32Check_ && ((adlerb << 16) | adlera) != fileAdler32_.value()) {
310 throw cms::Exception(
"RecoEventOutputModuleForFU") <<
"Adler32 checksum mismatch after reading file -: "
311 << openDatFilePath_.string() <<
" in LS " << ls.
luminosityBlock() << std::endl;
316 remove(openDatFilePath_.string().c_str());
320 if(processed_.value()!=0){
324 jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.
luminosityBlock());
328 accepted_.value() = 0;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
virtual void stop() const
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
virtual ~RecoEventOutputModuleForFU()
StringJ transferDestination_
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
evf::FastMonitoringService * fms_
boost::filesystem::path openDatChecksumFilePath_
DataPointDefinition outJsonDef_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
std::string stream_label_
std::auto_ptr< Consumer > c_
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
virtual void start() const
void setDefaultGroup(std::string const &group)
boost::filesystem::path openDatFilePath_
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)