CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RecoEventOutputModuleForFU.h
Go to the documentation of this file.
1 #ifndef IOPool_Streamer_RecoEventOutputModuleForFU_h
2 #define IOPool_Streamer_RecoEventOutputModuleForFU_h
3 
8 
9 #include <sstream>
10 #include <iomanip>
11 #include "boost/filesystem.hpp"
12 
18 
19 
20 namespace evf {
21  template<typename Consumer>
23 
32  public:
33  explicit RecoEventOutputModuleForFU(edm::ParameterSet const& ps);
35  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
36 
37  private:
38  virtual void start() const;
39  virtual void stop() const;
40  virtual void doOutputHeader(InitMsgBuilder const& init_message) const;
41  virtual void doOutputEvent(EventMsgBuilder const& msg) const;
42  // virtual void beginRun(edm::RunPrincipal const&);
45 
46  private:
47  std::auto_ptr<Consumer> c_;
51  mutable IntJ accepted_;
57  boost::shared_ptr<FastMonitor> jsonMonitor_;
60  unsigned char* outBuf_=0;
61 
62 
63  }; //end-of-class-def
64 
65  template<typename Consumer>
67  edm::StreamerOutputModuleBase(ps),
68  c_(new Consumer(ps)),
69  stream_label_(ps.getParameter<std::string>("@module_label")),
70  processed_(0),
71  accepted_(0),
72  errorEvents_(0),
73  retCodeMask_(0),
74  filelist_(),
75  filesize_(0),
76  inputFiles_(),
77  outBuf_(new unsigned char[1024*1024])
78  {
79  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
80  LogDebug("RecoEventOutputModuleForFU") << "writing .dat files to -: " << baseRunDir;
81  // create open dir if not already there
82  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
83 
85 
86  processed_.setName("Processed");
87  accepted_.setName("Accepted");
88  errorEvents_.setName("ErrorEvents");
89  retCodeMask_.setName("ReturnCodeMask");
90  filelist_.setName("Filelist");
91  filesize_.setName("Filesize");
92  inputFiles_.setName("InputFiles");
93 
95  outJsonDef_.addLegendItem("Processed","integer",DataPointDefinition::SUM);
97  outJsonDef_.addLegendItem("ErrorEvents","integer",DataPointDefinition::SUM);
98  outJsonDef_.addLegendItem("ReturnCodeMask","integer",DataPointDefinition::BINARYOR);
99  outJsonDef_.addLegendItem("Filelist","string",DataPointDefinition::MERGE);
100  outJsonDef_.addLegendItem("Filesize","integer",DataPointDefinition::SUM);
101  outJsonDef_.addLegendItem("InputFiles","string",DataPointDefinition::CAT);
102  std::stringstream tmpss,ss;
103  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
104  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
105  std::string outTmpJsonDefName = tmpss.str();
106  std::string outJsonDefName = ss.str();
107 
108  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
109  struct stat fstat;
110  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
111  LogDebug("RecoEventOutputModuleForFU") << "writing output definition file -: " << outJsonDefName;
114  FileIO::writeStringToFile(outTmpJsonDefName, content);
115  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
116  }
117  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
118 
119  jsonMonitor_.reset(new FastMonitor(&outJsonDef_,true));
120  jsonMonitor_->setDefPath(outJsonDefName);
121  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
122  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
123  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
124  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
125  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
126  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
127  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
128  jsonMonitor_->commit(nullptr);
129  }
130 
131  template<typename Consumer>
133 
134  template<typename Consumer>
135  void
137  {
138  const std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(stream_label_);
139  edm::LogInfo("RecoEventOutputModuleForFU") << "start() method, initializing streams. init stream -: "
140  << initFileName;
141  c_->setInitMessageFile(initFileName);
142  c_->start();
143  }
144 
145  template<typename Consumer>
146  void
148  {
149  c_->stop();
150  }
151 
152  template<typename Consumer>
153  void
155  {
156  c_->doOutputHeader(init_message);
157  }
158 
159  template<typename Consumer>
160  void
162  accepted_.value()++;
163  c_->doOutputEvent(msg); // You can't use msg in RecoEventOutputModuleForFU after this point
164  }
165 
166  template<typename Consumer>
167  void
171  Consumer::fillDescription(desc);
172  descriptions.add("streamerOutput", desc);
173  }
174 
175  template<typename Consumer>
177  {
178  //edm::LogInfo("RecoEventOutputModuleForFU") << "begin lumi";
179  openDatFilePath_ = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(ls.luminosityBlock(),stream_label_);
180  c_->setOutputFile(openDatFilePath_.string());
181  filelist_ = openDatFilePath_.filename().string();
182  }
183 
184  template<typename Consumer>
186  {
187  //edm::LogInfo("RecoEventOutputModuleForFU") << "end lumi";
188  long filesize=0;
189  c_->closeOutputFile();
190  processed_.value() = fms_->getEventsProcessedForLumi(ls.luminosityBlock());
191  if(processed_.value()!=0){
192  //int b;
193  // move dat file to one level up - this is VERRRRRY inefficient, come up with a smarter idea
194 
195  FILE *des = edm::Service<evf::EvFDaqDirector>()->maybeCreateAndLockFileHeadForStream(ls.luminosityBlock(),stream_label_);
196  FILE *src = fopen(openDatFilePath_.string().c_str(),"r");
197 
198  struct stat istat;
199  stat(openDatFilePath_.string().c_str(), &istat);
200  off_t readInput=0;
201  while (readInput<istat.st_size) {
202  unsigned long toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
203  fread(outBuf_,toRead,1,src);
204  fwrite(outBuf_,toRead,1,des);
205  readInput+=toRead;
206  }
207 
208  //if(des != 0 && src !=0){
209  // while((b=fgetc(src))!= EOF){
210  // fputc((unsigned char)b,des);
211  // filesize++;
212  // }
213  //}
214 
215  edm::Service<evf::EvFDaqDirector>()->unlockAndCloseMergeStream();
216  fclose(src);
217  }
218  //remove file
219  remove(openDatFilePath_.string().c_str());
220  filesize_=filesize;
221 
222  // output jsn file
223  if(processed_.value()!=0){
224  jsonMonitor_->snap(ls.luminosityBlock());
225  const std::string outputJsonNameStream =
226  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(ls.luminosityBlock(),stream_label_);
227  jsonMonitor_->outputFullJSON(outputJsonNameStream,ls.luminosityBlock());
228  }
229 
230  // reset monitoring params
231  accepted_.value() = 0;
232  filelist_ = "";
233  }
234 
235 } // end of namespace-edm
236 
237 #endif
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
static void fillDescription(ParameterSetDescription &desc)
virtual void doOutputEvent(EventMsgBuilder const &msg) const
virtual void doOutputHeader(InitMsgBuilder const &init_message) const
virtual void beginLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)
RecoEventOutputModuleForFU(edm::ParameterSet const &ps)
LuminosityBlockNumber_t luminosityBlock() const
tuple path
else: Piece not in the list, fine.
boost::shared_ptr< FastMonitor > jsonMonitor_
virtual void setName(std::string name)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
#define SUM(A, B)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::pair< Binary, Binary > serialize(const T &payload, bool packingOnly=false)
Definition: Serialization.h:92
void setDefaultGroup(std::string const &group)
virtual void endLuminosityBlock(edm::LuminosityBlockPrincipal const &, edm::ModuleCallingContext const *)