CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include "boost/filesystem.hpp"
12 
18 
19 
20 namespace evf {
21  template<typename Consumer>
23 
32  public:
33  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
35  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
36 
37  private:
38  virtual void start() const;
39  virtual void stop() const;
40  virtual void doOutputHeader(InitMsgBuilder const& init_message) const;
41  virtual void doOutputEvent(EventMsgBuilder const& msg) const;
42  // virtual void beginRun(edm::RunPrincipal const&);
45 
46  private:
47  std::auto_ptr<Consumer> c_;
51  mutable IntJ accepted_;
57  boost::shared_ptr<FastMonitor> jsonMonitor_;
60  unsigned char* outBuf_=0;
61 
62 
63  }; //end-of-class-def
64 
65  template<typename Consumer>
67  edm::StreamerOutputModuleBase(ps),
68  c_(new Consumer(ps)),
69  stream_label_(ps.getParameter<std::string>("@module_label")),
70  processed_(0),
71  accepted_(0),
72  errorEvents_(0),
73  retCodeMask_(0),
74  filelist_(),
75  filesize_(0),
76  inputFiles_(),
77  outBuf_(new unsigned char[1024*1024])
78  {
79  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
80  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
81  // create open dir if not already there
82  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
83 
84  //replace hltOutoputA with stream if the HLT menu uses this convention
85  std::string testPrefix="hltOutput";
86  if (stream_label_.find(testPrefix)==0)
87  stream_label_=std::string("stream")+stream_label_.substr(testPrefix.size());
88 
90 
91  processed_.setName("Processed");
92  accepted_.setName("Accepted");
93  errorEvents_.setName("ErrorEvents");
94  retCodeMask_.setName("ReturnCodeMask");
95  filelist_.setName("Filelist");
96  filesize_.setName("Filesize");
97  inputFiles_.setName("InputFiles");
98 
100  outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
101  outJsonDef_.addLegendItem("Accepted","integer",DataPointDefinition::SUM);
102  outJsonDef_.addLegendItem("ErrorEvents","integer",DataPointDefinition::SUM);
103  outJsonDef_.addLegendItem("ReturnCodeMask","integer",DataPointDefinition::BINARYOR);
104  outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
105  outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
106  outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
107  std::stringstream tmpss,ss;
108  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
109  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
110  std::string outTmpJsonDefName = tmpss.str();
111  std::string outJsonDefName = ss.str();
112 
113  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
114  struct stat fstat;
115  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
116  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
119  FileIO::writeStringToFile(outTmpJsonDefName, content);
120  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
121  }
122  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
123 
124  jsonMonitor_.reset(new FastMonitor(&outJsonDef_,true));
125  jsonMonitor_->setDefPath(outJsonDefName);
126  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
127  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
128  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
129  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
130  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
131  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
132  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
133  jsonMonitor_->commit(nullptr);
134  }
135 
136  template<typename Consumer>
138 
139  template<typename Consumer>
140  void
142  {
143  const std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_);
144  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
145  << initFileName;
146  c_->setInitMessageFile(initFileName);
147  c_->start();
148  }
149 
150  template<typename Consumer>
151  void
153  {
154  c_->stop();
155  }
156 
157  template<typename Consumer>
158  void
160  {
161  c_->doOutputHeader(init_message);
162  }
163 
164  template<typename Consumer>
165  void
167  accepted_.value()++;
168  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
169  }
170 
171  template<typename Consumer>
172  void
176  Consumer::fillDescription(desc);
177  descriptions.add("streamerOutput", desc);
178  }
179 
180  template<typename Consumer>
182  {
183  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
184  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
185  c_->setOutputFile(openDatFilePath_.string());
186  filelist_ = openDatFilePath_.filename().string();
187  }
188 
189  template<typename Consumer>
191  {
192  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
193  long filesize=0;
194  c_->closeOutputFile();
195  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock());
196  if(processed_.value()!=0){
197  //int b;
198  // move dat file to one level up - this is VERRRRRY inefficient, come up with a smarter idea
199 
200  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
201  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
202 
203  struct stat istat;
204  stat(openDatFilePath_.string().c_str(), &istat);
205  off_t readInput=0;
206  while (readInput<istat.st_size) {
207  unsigned long toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
208  fread(outBuf_,toRead,1,src);
209  fwrite(outBuf_,toRead,1,des);
210  readInput+=toRead;
211  filesize+=toRead;
212  }
213 
214  //if(des != 0 && src !=0){
215  // while((b=fgetc(src))!= EOF){
216  // fputc((unsigned char)b,des);
217  // filesize++;
218  // }
219  //}
220 
221  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
222  fclose(src);
223  }
224  //remove file
225  remove(openDatFilePath_.string().c_str());
226  filesize_=filesize;
227 
228  // output jsn file
229  if(processed_.value()!=0){
230  jsonMonitor_->snap(ls.luminosityBlock());
231  const std::string outputJsonNameStream =
232  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
233  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
234  }
235 
236  // reset monitoring params
237  accepted_.value() = 0;
238  filelist_ = "";
239  }
240 
241 } // end of namespace-edm
242 
243 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
#define SUM(A, B)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:92
void setDefaultGroup(std::string const &group)
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)