CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
GlobalEvFOutputModule.cc
Go to the documentation of this file.
2 
6 
10 
15 
17 
20 
23 
27 
30 
32 
33 #include <sys/stat.h>
34 #include <filesystem>
35 #include <boost/algorithm/string.hpp>
36 
38 
39 namespace evf {
40 
42 
44  public:
46  : filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
47 
49 
50  void close() { stream_writer_events_->close(); }
51 
53  EventMsgView eview(msg.startAddress());
54  stream_writer_events_->write(eview);
55  incAccepted();
56  }
57 
58  void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
59  auto group = iHolder.group();
60  writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
61  try {
62  std::unique_ptr<EventMsgBuilder> own(msg);
63  doOutputEvent(*msg); //msg is written and discarded at this point
64  } catch (...) {
65  auto tmp = holder;
66  tmp.doneWaiting(std::current_exception());
67  }
68  });
69  }
70 
71  uint32 get_adler32() const { return stream_writer_events_->adler32(); }
72 
73  std::string const& getFilePath() const { return filePath_; }
74 
75  unsigned long getAccepted() const { return accepted_; }
76  void incAccepted() { accepted_++; }
77 
79 
80  private:
82  std::atomic<unsigned long> accepted_;
85  };
86 
88  public:
90 
93  };
94 
96  public:
99  std::string const& outJsonDefName);
100 
112  std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
113  };
114 
120 
122  public:
123  explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
124  ~GlobalEvFOutputModule() override;
125  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
126 
127  private:
128  std::unique_ptr<edm::StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;
129 
130  std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
131 
133  void write(edm::EventForOutput const& e) final;
134 
135  //pure in parent class but unused here
137  void writeRun(edm::RunForOutput const&) final {}
138  void globalEndRun(edm::RunForOutput const&) const final {}
139 
140  std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
141  edm::LuminosityBlockForOutput const& iLB) const final;
142  void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
143 
145 
150 
152 
153  }; //end-of-class-def
154 
156  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
157  LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
158 
159  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
160 
170  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
173 
174  std::stringstream tmpss, ss;
175  tmpss << baseRunDir << "/open/"
176  << "output_" << getpid() << ".jsd";
177  ss << baseRunDir << "/"
178  << "output_" << getpid() << ".jsd";
179  std::string outTmpJsonDefName = tmpss.str();
180  outJsonDefName_ = ss.str();
181 
182  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
183  struct stat fstat;
184  if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
185  LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
188  jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
189  std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
190  }
191  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
192  }
194  jsoncollector::DataPointDefinition const& outJsonDef,
195  std::string const& outJsonDefName)
196  : processed_(0),
197  accepted_(0),
198  errorEvents_(0),
199  retCodeMask_(0),
200  filelist_(),
201  filesize_(0),
202  inputFiles_(),
203  fileAdler32_(1),
204  hltErrorEvents_(0) {
205  transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
206  mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
207 
208  processed_.setName("Processed");
209  accepted_.setName("Accepted");
210  errorEvents_.setName("ErrorEvents");
211  retCodeMask_.setName("ReturnCodeMask");
212  filelist_.setName("Filelist");
213  filesize_.setName("Filesize");
214  inputFiles_.setName("InputFiles");
215  fileAdler32_.setName("FileAdler32");
216  transferDestination_.setName("TransferDestination");
217  mergeType_.setName("MergeType");
218  hltErrorEvents_.setName("HLTErrorEvents");
219 
220  jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
221  jsonMonitor_->setDefPath(outJsonDefName);
222  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
223  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
224  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
225  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
226  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
227  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
228  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
229  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
230  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
231  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
232  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
233  jsonMonitor_->commit(nullptr);
234  }
235 
237  : edm::global::OutputModuleBase(ps),
239  ps_(ps),
240  streamLabel_(ps.getParameter<std::string>("@module_label")),
241  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
242  psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
243  ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
244  //replace hltOutoputA with stream if the HLT menu uses this convention
245  std::string testPrefix = "hltOutput";
246  if (streamLabel_.find(testPrefix) == 0)
247  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
248 
249  if (streamLabel_.find('_') != std::string::npos) {
250  throw cms::Exception("GlobalEvFOutputModule")
251  << "Underscore character is reserved can not be used for stream names in "
252  "FFF, but was detected in stream name -: "
253  << streamLabel_;
254  }
255 
256  std::string streamLabelLow = streamLabel_;
257  boost::algorithm::to_lower(streamLabelLow);
258  auto streampos = streamLabelLow.rfind("stream");
259  if (streampos != 0 && streampos != std::string::npos)
260  throw cms::Exception("GlobalEvFOutputModule")
261  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
262  "names in FFF based HLT, but was detected in stream name";
263 
265  }
266 
268 
273  desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
274  ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
275  descriptions.add("globalEvfOutputModule", desc);
276  }
277 
278  std::unique_ptr<edm::StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
279  return std::make_unique<edm::StreamerOutputModuleCommon>(
281  }
282 
283  std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
284  //create run Cache holding JSON file writer and variables
285  auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>();
286 
287  edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
288 
289  //output INI file (non-const). This doesn't require globalBeginRun to be finished
290  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
291  edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
292 
293  StreamerOutputFile stream_writer_preamble(openIniFileName);
294  uint32 preamble_adler32 = 1;
295  edm::BranchIDLists const* bidlPtr = branchIDLists();
296 
297  auto psetMapHandle = run.getHandle(psetToken_);
298 
299  std::unique_ptr<InitMsgBuilder> init_message =
300  streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
301  *bidlPtr,
306  psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
307 
308  //Let us turn it into a View
309  InitMsgView view(init_message->startAddress());
310 
311  //output header
312  stream_writer_preamble.write(view);
313  preamble_adler32 = stream_writer_preamble.adler32();
314  stream_writer_preamble.close();
315 
316  struct stat istat;
317  stat(openIniFileName.c_str(), &istat);
318  //read back file to check integrity of what was written
319  off_t readInput = 0;
320  uint32_t adlera = 1, adlerb = 0;
321  FILE* src = fopen(openIniFileName.c_str(), "r");
322 
323  //allocate buffer to write INI file
324  std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
325  while (readInput < istat.st_size) {
326  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
327  fread(outBuf.get(), toRead, 1, src);
328  cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
329  readInput += toRead;
330  }
331  fclose(src);
332 
333  //clear serialization buffers
334  streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
335 
336  //free output buffer needed only for the file write
337  outBuf.reset();
338 
339  uint32_t adler32c = (adlerb << 16) | adlera;
340  if (adler32c != preamble_adler32) {
341  throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
342  << " expected:" << preamble_adler32 << " obtained:" << adler32c;
343  } else {
344  LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
345  std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
346  }
347 
348  return jsonDef;
349  }
350 
352  edm::EventForOutput const& e) const {
353  Trig result;
355  return result;
356  }
357 
358  std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
359  edm::LuminosityBlockForOutput const& iLB) const {
360  auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
361 
362  return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
363  }
364 
366  edm::EventForOutput const& e,
367  edm::WaitingTaskWithArenaHolder iHolder) const {
369 
370  auto streamerCommon = streamCache(id);
371  std::unique_ptr<EventMsgBuilder> msg =
372  streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
373 
374  auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
375  const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
376  ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
377  }
379 
381  auto lumiWriter = luminosityBlockCache(iLB.index());
382  //close dat file
383  const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
384 
385  //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
386  auto jsonDef = runCache(iLB.getRun().index());
387  GlobalEvFOutputJSONWriter jsonWriter(streamLabel_, jsonDef->outJsonDef_, jsonDef->outJsonDefName_);
388 
389  jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
390  jsonWriter.accepted_.value() = lumiWriter->getAccepted();
391 
392  bool abortFlag = false;
393  jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
394  if (abortFlag) {
395  edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
396  return;
397  }
398 
399  if (jsonWriter.processed_.value() != 0) {
400  struct stat istat;
401  std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
402  stat(openDatFilePath.string().c_str(), &istat);
403  jsonWriter.filesize_ = istat.st_size;
404  std::filesystem::rename(openDatFilePath.string().c_str(),
406  jsonWriter.filelist_ = openDatFilePath.filename().string();
407  } else {
408  //remove empty file when no event processing has occurred
409  remove(lumiWriter->getFilePath().c_str());
410  jsonWriter.filesize_ = 0;
411  jsonWriter.filelist_ = "";
412  jsonWriter.fileAdler32_.value() = -1; //no files in signed long
413  }
414 
415  //produce JSON file
416  jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
417  const std::string outputJsonNameStream =
418  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
419  jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
420  }
421 
422 } // namespace evf
423 
424 using namespace evf;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
ParameterSetID const & mainParameterSetID() const
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
edm::global::OutputModule< edm::RunCache< GlobalEvFOutputJSONDef >, edm::LuminosityBlockCache< evf::GlobalEvFOutputEventWriter >, edm::StreamCache< edm::StreamerOutputModuleCommon >, edm::ExternalWork > GlobalEvFOutputModuleType
GlobalEvFOutputEventWriter(std::string const &filePath)
jsoncollector::DataPointDefinition outJsonDef_
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::string const & getFilePath() const
RunIndex index() const
Definition: RunForOutput.cc:32
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
GlobalEvFOutputJSONWriter(std::string const &streamLabel, jsoncollector::DataPointDefinition const &, std::string const &outJsonDefName)
std::shared_ptr< GlobalEvFOutputJSONDef > globalBeginRun(edm::RunForOutput const &run) const final
static bool serialize(JsonSerializable *pObj, std::string &output)
void write(const InitMsgBuilder &)
edm::EDGetTokenT< edm::TriggerResults > trToken_
uint8 * startAddress() const
std::string to_lower(const std::string &s)
std::unique_ptr< edm::StreamerOutputModuleCommon > beginStream(edm::StreamID) const final
void doOutputEvent(EventMsgBuilder const &msg)
std::string const & moduleLabel() const
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
Handle< PROD > getHandle(EDGetTokenT< PROD > token) const
tuple result
Definition: mps_fire.py:311
void globalEndRun(edm::RunForOutput const &) const final
LuminosityBlockIndex index() const
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
void writeRun(edm::RunForOutput const &) final
LuminosityBlockForOutput const & getLuminosityBlock() const
void push(tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
LuminosityBlockNumber_t luminosityBlock() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
ModuleDescription const & description() const
def move
Definition: eostools.py:511
std::atomic< unsigned long > accepted_
std::shared_ptr< GlobalEvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static std::string const triggerResults
Definition: EdmProvDump.cc:44
RunForOutput const & getRun() const
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
evf::FastMonitoringService * fms_
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
unsigned int uint32
Definition: MsgTools.h:13
BranchIDLists const * branchIDLists() const
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
tuple group
Definition: watchdog.py:82
uint32 adler32() const
ParameterSetID selectorConfig() const
jsoncollector::StringJ transferDestination_
void write(edm::EventForOutput const &e) final
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:285
BasicHandle getByToken(EDGetToken token, TypeID const &typeID) const
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
edm::ParameterSet const & ps_
tbb::task_group * group() const noexcept
void acquire(edm::StreamID, edm::EventForOutput const &, edm::WaitingTaskWithArenaHolder) const final
void doOutputEventAsync(std::unique_ptr< EventMsgBuilder > msg, edm::WaitingTaskHolder iHolder)
bool isValid() const
Definition: Hash.h:141
SelectedProductsForBranchType const & keptProducts() const
void writeLuminosityBlock(edm::LuminosityBlockForOutput const &) final
const ModuleDescription & moduleDescription() const
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
GlobalEvFOutputModule(edm::ParameterSet const &ps)
edm::propagate_const< std::unique_ptr< StreamerOutputFile > > stream_writer_events_
void setDefaultGroup(std::string const &group)
tmp
align.sh
Definition: createJobs.py:716
edm::detail::TriggerResultsBasedEventSelector::handle_t Trig
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
#define LogDebug(id)