CMS 3D CMS Logo

EvFOutputModule.cc
Go to the documentation of this file.
2 
4 
8 
12 
16 
19 
20 #include <sys/stat.h>
21 #include <boost/filesystem.hpp>
22 #include <boost/algorithm/string.hpp>
23 
24 namespace evf {
25 
27  streamerCommon_(ps, selections),
28  processed_(0),
29  accepted_(0),
30  errorEvents_(0),
31  retCodeMask_(0),
32  filelist_(),
33  filesize_(0),
34  inputFiles_(),
35  fileAdler32_(1),
36  hltErrorEvents_(0)
37  {
38 
39  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
40  mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel,evf::MergeTypeDAT);
41 
42  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
43  LogDebug("EvFOutputModule") << "writing .dat files to -: " << baseRunDir;
44 
45  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
46 
47  processed_.setName("Processed");
48  accepted_.setName("Accepted");
49  errorEvents_.setName("ErrorEvents");
50  retCodeMask_.setName("ReturnCodeMask");
51  filelist_.setName("Filelist");
52  filesize_.setName("Filesize");
53  inputFiles_.setName("InputFiles");
54  fileAdler32_.setName("FileAdler32");
55  transferDestination_.setName("TransferDestination");
56  mergeType_.setName("MergeType");
57  hltErrorEvents_.setName("HLTErrorEvents");
58 
71 
72  std::stringstream tmpss,ss;
73  tmpss << baseRunDir << "/open/" << "output_" << getpid() << ".jsd";
74  ss << baseRunDir << "/" << "output_" << getpid() << ".jsd";
75  std::string outTmpJsonDefName = tmpss.str();
76  std::string outJsonDefName = ss.str();
77 
78  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
79  struct stat fstat;
80  if (stat (outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
81  LogDebug("EvFOutputModule") << "writing output definition file -: " << outJsonDefName;
84  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
85  boost::filesystem::rename(outTmpJsonDefName,outJsonDefName);
86  }
87  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
88 
90  jsonMonitor_->setDefPath(outJsonDefName);
91  jsonMonitor_->registerGlobalMonitorable(&processed_,false);
92  jsonMonitor_->registerGlobalMonitorable(&accepted_,false);
93  jsonMonitor_->registerGlobalMonitorable(&errorEvents_,false);
94  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_,false);
95  jsonMonitor_->registerGlobalMonitorable(&filelist_,false);
96  jsonMonitor_->registerGlobalMonitorable(&filesize_,false);
97  jsonMonitor_->registerGlobalMonitorable(&inputFiles_,false);
98  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_,false);
99  jsonMonitor_->registerGlobalMonitorable(&transferDestination_,false);
100  jsonMonitor_->registerGlobalMonitorable(&mergeType_,false);
101  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_,false);
102  jsonMonitor_->commit(nullptr);
103  }
104 
106  edm::one::OutputModuleBase(ps),
108  ps_(ps),
109  streamLabel_(ps.getParameter<std::string>("@module_label")),
110  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults")))
111  {
112  //replace hltOutoputA with stream if the HLT menu uses this convention
113  std::string testPrefix="hltOutput";
114  if (streamLabel_.find(testPrefix)==0)
115  streamLabel_=std::string("stream")+streamLabel_.substr(testPrefix.size());
116 
117  if (streamLabel_.find("_")!=std::string::npos) {
118  throw cms::Exception("EvFOutputModule")
119  << "Underscore character is reserved can not be used for stream names in FFF, but was detected in stream name -: " << streamLabel_;
120  }
121 
122  std::string streamLabelLow = streamLabel_;
123  boost::algorithm::to_lower(streamLabelLow);
124  auto streampos = streamLabelLow.rfind("stream");
125  if (streampos !=0 && streampos!=std::string::npos)
126  throw cms::Exception("EvFOutputModule")
127  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for names in FFF based HLT, but was detected in stream name";
128 
130  }
131 
132 
134 
135 
140  descriptions.addDefault(desc);
141  }
142 
143 
144  void
146  {
147  //create run Cache holding JSON file writer and variables
148  jsonWriter_ = std::make_unique<EvFOutputJSONWriter>(ps_,&keptProducts()[edm::InEvent],streamLabel_);
149 
150  //output INI file (non-const). This doesn't require globalBeginRun to be finished
151  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
152  edm::LogInfo("EvFOutputModule") << "beginRun init stream -: " << openIniFileName;
153 
154  StreamerOutputFile stream_writer_preamble(openIniFileName);
155  uint32 preamble_adler32 = 1;
156  edm::BranchIDLists const* bidlPtr = branchIDLists();
157 
158  std::unique_ptr<InitMsgBuilder> init_message =
159  jsonWriter_->streamerCommon_.serializeRegistry(*bidlPtr, *thinnedAssociationsHelper(),
160  OutputModule::processName(), description().moduleLabel(), moduleDescription().mainParameterSetID());
161 
162  //Let us turn it into a View
163  InitMsgView view(init_message->startAddress());
164 
165  //output header
166  stream_writer_preamble.write(view);
167  preamble_adler32 = stream_writer_preamble.adler32();
168  stream_writer_preamble.close();
169 
170  struct stat istat;
171  stat(openIniFileName.c_str(), &istat);
172  //read back file to check integrity of what was written
173  off_t readInput=0;
174  uint32_t adlera=1,adlerb=0;
175  FILE *src = fopen(openIniFileName.c_str(),"r");
176 
177  //allocate buffer to write INI file
178  unsigned char * outBuf = new unsigned char[1024*1024];
179  while (readInput<istat.st_size)
180  {
181  size_t toRead= readInput+1024*1024 < istat.st_size ? 1024*1024 : istat.st_size-readInput;
182  fread(outBuf,toRead,1,src);
183  cms::Adler32((const char*)outBuf,toRead,adlera,adlerb);
184  readInput+=toRead;
185  }
186  fclose(src);
187 
188  //clear serialization buffers
189  jsonWriter_->streamerCommon_.clearSerializeDataBuffer();
190 
191  //free output buffer needed only for the file write
192  delete [] outBuf;
193  outBuf=nullptr;
194 
195  uint32_t adler32c = (adlerb << 16) | adlera;
196  if (adler32c != preamble_adler32) {
197  throw cms::Exception("EvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
198  << " expected:" << preamble_adler32 << " obtained:" << adler32c;
199  }
200  else {
201  LogDebug("EvFOutputModule") << "Ini file checksum -: "<< streamLabel_ << " " << adler32c;
202  boost::filesystem::rename(openIniFileName,edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
203  }
204 
205  }
206 
207 
208  Trig
210  Trig result;
212  return result;
213  }
214 
215 
216  std::shared_ptr<EvFOutputEventWriter>
218  {
219  auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(),streamLabel_);
220 
221  return std::make_shared<EvFOutputEventWriter>(openDatFilePath);
222  }
223 
224 
225  void
227 
229 
230  //auto lumiWriter = const_cast<EvFOutputEventWriter*>(luminosityBlockCache(e.getLuminosityBlock().index() ));
231  auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index() );
232 
233  std::unique_ptr<EventMsgBuilder> msg = jsonWriter_->streamerCommon_.serializeEvent(e, triggerResults, selectorConfig());
234  lumiWriter->incAccepted();
235  lumiWriter->doOutputEvent(*msg); //msg is written and discarded at this point
236  }
237 
238 
239  void
241  {
242  auto lumiWriter = luminosityBlockCache(iLB.index());
243  //close dat file
244  lumiWriter->close();
245 
246  jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32();
247  jsonWriter_->accepted_.value() = lumiWriter->getAccepted();
248 
249  bool abortFlag = false;
250  jsonWriter_->processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(),&abortFlag);
251  if (abortFlag) {
252  edm::LogInfo("EvFOutputModule") << "Abort flag has been set. Output is suppressed";
253  return;
254  }
255 
256  if (jsonWriter_->processed_.value()!=0) {
257  struct stat istat;
258  boost::filesystem::path openDatFilePath = lumiWriter->getFilePath();
259  stat(openDatFilePath.string().c_str(), &istat);
260  jsonWriter_->filesize_ = istat.st_size;
261  boost::filesystem::rename(openDatFilePath.string().c_str(), edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(),streamLabel_));
262  jsonWriter_->filelist_ = openDatFilePath.filename().string();
263  } else {
264  //remove empty file when no event processing has occurred
265  remove(lumiWriter->getFilePath().c_str());
266  jsonWriter_->filesize_ = 0;
267  jsonWriter_->filelist_ = "";
268  jsonWriter_->fileAdler32_.value()=-1; //no files in signed long
269  }
270 
271  //produce JSON file
272  jsonWriter_->jsonMonitor_->snap(iLB.luminosityBlock());
273  const std::string outputJsonNameStream =
274  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(),streamLabel_);
275  jsonWriter_->jsonMonitor_->outputFullJSON(outputJsonNameStream,iLB.luminosityBlock());
276 
277  }
278 
279 
280 } // end of namespace-evf
281 
#define LogDebug(id)
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
Definition: fillJson.h:27
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=0)
EvFOutputModule(edm::ParameterSet const &ps)
std::unique_ptr< evf::EvFOutputJSONWriter > jsonWriter_
ModuleDescription const & description() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
edm::ParameterSet const & ps_
jsoncollector::IntJ fileAdler32_
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
BasicHandle getByToken(EDGetToken token, TypeID const &typeID) const
static bool serialize(JsonSerializable *pObj, std::string &output)
void write(const InitMsgBuilder &)
EvFOutputJSONWriter(edm::ParameterSet const &ps, edm::SelectedProducts const *selections, std::string const &streamLabel)
jsoncollector::DataPointDefinition outJsonDef_
jsoncollector::IntJ hltErrorEvents_
ParameterSetID selectorConfig() const
jsoncollector::StringJ inputFiles_
void beginRun(edm::RunForOutput const &run) override
LuminosityBlockIndex index() const
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
LuminosityBlockForOutput const & getLuminosityBlock() const
LuminosityBlockNumber_t luminosityBlock() const
jsoncollector::IntJ retCodeMask_
void addDefault(ParameterSetDescription const &psetDescription)
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) override
SelectedProductsForBranchType const & keptProducts() const
static std::string const triggerResults
Definition: EdmProvDump.cc:45
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
unsigned int uint32
Definition: MsgTools.h:13
jsoncollector::IntJ processed_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
const ModuleDescription & moduleDescription() const
void write(edm::EventForOutput const &e) override
uint32 adler32() const
edm::EDGetTokenT< edm::TriggerResults > trToken_
tuple msg
Definition: mps_check.py:285
jsoncollector::IntJ filesize_
jsoncollector::StringJ filelist_
HLT enums.
jsoncollector::StringJ transferDestination_
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
void setDefaultGroup(std::string const &group)
static void fillDescription(ParameterSetDescription &desc)
jsoncollector::IntJ accepted_
std::shared_ptr< EvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const override
jsoncollector::StringJ mergeType_
jsoncollector::IntJ errorEvents_
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
BranchIDLists const * branchIDLists()
evf::FastMonitoringService * fms_