CMS 3D CMS Logo

GlobalEvFOutputModule.cc
Go to the documentation of this file.
2 
6 
10 
16 
18 
21 
24 
28 
31 
33 
34 #include <sys/stat.h>
35 #include <filesystem>
36 #include <boost/algorithm/string.hpp>
37 
39 
40 namespace evf {
41 
43 
45  public:
46  explicit GlobalEvFOutputEventWriter(std::string const& filePath, unsigned int ls)
48 
50 
51  bool close() {
52  stream_writer_events_->close();
53  return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
54  }
55 
57  EventMsgView eview(msg.startAddress());
58  stream_writer_events_->write(eview);
59  incAccepted();
60  }
61 
62  void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
65  if (discarded_) {
66  incAccepted();
67  msg.reset();
68  return;
69  }
70  auto group = iHolder.group();
71  writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
72  try {
73  std::unique_ptr<EventMsgBuilder> own(msg);
74  doOutputEvent(*msg); //msg is written and discarded at this point
75  } catch (...) {
76  auto tmp = holder;
77  tmp.doneWaiting(std::current_exception());
78  }
79  });
80  }
81 
82  inline void throttledCheck() {
83  unsigned int counter = 0;
84  while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
85  if (edm::shutdown_flag.load(std::memory_order_relaxed))
86  break;
87  if (!(counter % 100))
88  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
89  usleep(100000);
90  counter++;
91  if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
92  edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
93  discarded_ = true;
94  }
95  }
96  }
97 
98  inline void discardedCheck() {
99  if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
100  edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
101  discarded_ = true;
102  }
103  }
104 
105  uint32 get_adler32() const { return stream_writer_events_->adler32(); }
106 
107  std::string const& getFilePath() const { return filePath_; }
108 
109  unsigned long getAccepted() const { return accepted_; }
110  void incAccepted() { accepted_++; }
111 
113 
114  private:
116  const unsigned ls_;
117  std::atomic<unsigned long> accepted_;
120  bool discarded_ = false;
121  };
122 
124  public:
125  GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
127 
132  };
133 
135  public:
138  std::string const& outJsonDefName,
139  jsoncollector::StringJ const& transferDestination,
140  jsoncollector::StringJ const& mergeType);
141 
153  std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
154  };
155 
161 
163  public:
164  explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
165  ~GlobalEvFOutputModule() override;
166  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
167 
168  private:
169  std::unique_ptr<edm::StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;
170 
171  std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
172 
174  void write(edm::EventForOutput const& e) final;
175 
176  //pure in parent class but unused here
178  void writeRun(edm::RunForOutput const&) final {}
179  void globalEndRun(edm::RunForOutput const&) const final {}
180 
181  std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
182  edm::LuminosityBlockForOutput const& iLB) const final;
183  void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
184 
186 
191 
193 
194  }; //end-of-class-def
195 
197  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
198  LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
199 
209  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
212 
213  std::stringstream ss;
214  ss << baseRunDir << "/"
215  << "output_" << getpid() << ".jsd";
216  outJsonDefName_ = ss.str();
217 
218  if (writeJsd) {
219  std::stringstream tmpss;
220  tmpss << baseRunDir << "/open/"
221  << "output_" << getpid() << ".jsd";
222  std::string outTmpJsonDefName = tmpss.str();
223  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
224  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
225  struct stat fstat;
226  if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
227  LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
231  std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
232  }
233  }
234  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
235  }
236 
240  }
241 
243  jsoncollector::DataPointDefinition const& outJsonDef,
244  std::string const& outJsonDefName,
245  jsoncollector::StringJ const& transferDestination,
246  jsoncollector::StringJ const& mergeType)
247  : processed_(0),
248  accepted_(0),
249  errorEvents_(0),
250  retCodeMask_(0),
251  filelist_(),
252  filesize_(0),
253  inputFiles_(),
254  fileAdler32_(1),
255  transferDestination_(transferDestination),
256  mergeType_(mergeType),
257  hltErrorEvents_(0) {
258  processed_.setName("Processed");
259  accepted_.setName("Accepted");
260  errorEvents_.setName("ErrorEvents");
261  retCodeMask_.setName("ReturnCodeMask");
262  filelist_.setName("Filelist");
263  filesize_.setName("Filesize");
264  inputFiles_.setName("InputFiles");
265  fileAdler32_.setName("FileAdler32");
266  transferDestination_.setName("TransferDestination");
267  mergeType_.setName("MergeType");
268  hltErrorEvents_.setName("HLTErrorEvents");
269 
270  jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
271  jsonMonitor_->setDefPath(outJsonDefName);
272  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
273  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
274  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
275  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
276  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
277  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
278  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
279  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
280  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
281  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
282  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
283  jsonMonitor_->commit(nullptr);
284  }
285 
287  : edm::global::OutputModuleBase(ps),
289  ps_(ps),
290  streamLabel_(ps.getParameter<std::string>("@module_label")),
291  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
292  psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
293  ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
294  //replace hltOutoputA with stream if the HLT menu uses this convention
295  std::string testPrefix = "hltOutput";
296  if (streamLabel_.find(testPrefix) == 0)
297  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
298 
299  if (streamLabel_.find('_') != std::string::npos) {
300  throw cms::Exception("GlobalEvFOutputModule")
301  << "Underscore character is reserved can not be used for stream names in "
302  "FFF, but was detected in stream name -: "
303  << streamLabel_;
304  }
305 
306  std::string streamLabelLow = streamLabel_;
307  boost::algorithm::to_lower(streamLabelLow);
308  auto streampos = streamLabelLow.rfind("stream");
309  if (streampos != 0 && streampos != std::string::npos)
310  throw cms::Exception("GlobalEvFOutputModule")
311  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
312  "names in FFF based HLT, but was detected in stream name";
313 
314  //output initemp file. This lets hltd know number of streams early on
315  if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
316  throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";
317 
318  const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
319  std::ofstream file(iniFileName);
320  if (!file)
321  throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno);
322  file.close();
323 
324  edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;
325 
326  //create JSD
328 
330  }
331 
333 
338  desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
339  ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
340  descriptions.add("globalEvfOutputModule", desc);
341  }
342 
343  std::unique_ptr<edm::StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
344  return std::make_unique<edm::StreamerOutputModuleCommon>(
346  }
347 
348  std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
349  //create run Cache holding JSON file writer and variables
350  auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
351  jsonDef->updateDestination(streamLabel_);
353 
354  //output INI file (non-const). This doesn't require globalBeginRun to be finished
355  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
356  edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
357 
358  StreamerOutputFile stream_writer_preamble(openIniFileName);
359  uint32 preamble_adler32 = 1;
360  edm::BranchIDLists const* bidlPtr = branchIDLists();
361 
362  auto psetMapHandle = run.getHandle(psetToken_);
363 
364  std::unique_ptr<InitMsgBuilder> init_message =
365  streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
366  *bidlPtr,
371  psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
372 
373  //Let us turn it into a View
374  InitMsgView view(init_message->startAddress());
375 
376  //output header
377  stream_writer_preamble.write(view);
378  preamble_adler32 = stream_writer_preamble.adler32();
379  stream_writer_preamble.close();
380 
381  struct stat istat;
382  stat(openIniFileName.c_str(), &istat);
383  //read back file to check integrity of what was written
384  off_t readInput = 0;
385  uint32_t adlera = 1, adlerb = 0;
386  std::ifstream src(openIniFileName, std::ifstream::binary);
387  if (!src)
388  throw cms::Exception("GlobalEvFOutputModule")
389  << "can not read back " << openIniFileName << " error: " << strerror(errno);
390 
391  //allocate buffer to write INI file
392  std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
393  while (readInput < istat.st_size) {
394  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
395  src.read(outBuf.get(), toRead);
396  //cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
397  cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
398  readInput += toRead;
399  }
400  src.close();
401 
402  //clear serialization buffers
403  streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
404 
405  //free output buffer needed only for the file write
406  outBuf.reset();
407 
408  uint32_t adler32c = (adlerb << 16) | adlera;
409  if (adler32c != preamble_adler32) {
410  throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
411  << " expected:" << preamble_adler32 << " obtained:" << adler32c;
412  } else {
413  LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
414  std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
415  }
416 
417  return jsonDef;
418  }
419 
421  edm::EventForOutput const& e) const {
422  Trig result;
423  e.getByToken<edm::TriggerResults>(token, result);
424  return result;
425  }
426 
427  std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
428  edm::LuminosityBlockForOutput const& iLB) const {
429  auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
430 
431  return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock());
432  }
433 
435  edm::EventForOutput const& e,
436  edm::WaitingTaskWithArenaHolder iHolder) const {
438 
439  auto streamerCommon = streamCache(id);
440  std::unique_ptr<EventMsgBuilder> msg =
441  streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
442 
443  auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
444  const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
445  ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
446  }
448 
450  auto lumiWriter = luminosityBlockCache(iLB.index());
451  //close dat file
452  const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
453 
454  //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
455  auto jsonDef = runCache(iLB.getRun().index());
457  jsonDef->outJsonDef_,
458  jsonDef->outJsonDefName_,
459  jsonDef->transferDestination_,
460  jsonDef->mergeType_);
461 
462  jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
463  jsonWriter.accepted_.value() = lumiWriter->getAccepted();
464 
465  bool abortFlag = false;
466 
467  if (!discarded) {
468  jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
469  } else {
470  jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
471  jsonWriter.processed_.value() = 0;
472  jsonWriter.accepted_.value() = 0;
473  edm::LogInfo("GlobalEvFOutputModule")
474  << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
475  }
476 
477  if (abortFlag) {
478  edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
479  return;
480  }
481 
482  if (jsonWriter.processed_.value() != 0) {
483  struct stat istat;
484  std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
485  stat(openDatFilePath.string().c_str(), &istat);
486  jsonWriter.filesize_ = istat.st_size;
487  std::filesystem::rename(openDatFilePath.string().c_str(),
489  jsonWriter.filelist_ = openDatFilePath.filename().string();
490  } else {
491  //remove empty file when no event processing has occurred
492  remove(lumiWriter->getFilePath().c_str());
493  jsonWriter.filesize_ = 0;
494  jsonWriter.filelist_ = "";
495  jsonWriter.fileAdler32_.value() = -1; //no files in signed long
496  }
497 
498  //produce JSON file
499  jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
500  const std::string outputJsonNameStream =
501  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
502  jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
503  }
504 
505 } // namespace evf
506 
507 using namespace evf;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
RunIndex index() const
Definition: RunForOutput.cc:32
Definition: fillJson.h:27
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
edm::global::OutputModule< edm::RunCache< GlobalEvFOutputJSONDef >, edm::LuminosityBlockCache< evf::GlobalEvFOutputEventWriter >, edm::StreamCache< edm::StreamerOutputModuleCommon >, edm::ExternalWork > GlobalEvFOutputModuleType
jsoncollector::DataPointDefinition outJsonDef_
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
LuminosityBlockNumber_t luminosityBlock() const
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< GlobalEvFOutputJSONDef > globalBeginRun(edm::RunForOutput const &run) const final
static bool serialize(JsonSerializable *pObj, std::string &output)
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
void write(const InitMsgBuilder &)
edm::EDGetTokenT< edm::TriggerResults > trToken_
std::string to_lower(const std::string &s)
std::string const & getFilePath() const
LuminosityBlockIndex index() const
ParameterSetID const & mainParameterSetID() const
std::unique_ptr< edm::StreamerOutputModuleCommon > beginStream(edm::StreamID) const final
void doOutputEvent(EventMsgBuilder const &msg)
ModuleDescription const & description() const
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
bool isValid() const
Definition: Hash.h:141
void globalEndRun(edm::RunForOutput const &) const final
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
oneapi::tbb::task_group * group() const noexcept
void updateDestination(std::string const &streamLabel)
void writeRun(edm::RunForOutput const &) final
uint32 adler32() const
BranchIDLists const * branchIDLists() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
RunForOutput const & getRun() const
jsoncollector::StringJ transferDestination_
std::atomic< unsigned long > accepted_
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::shared_ptr< GlobalEvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static std::string const triggerResults
Definition: EdmProvDump.cc:47
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
evf::FastMonitoringService * fms_
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
unsigned int uint32
Definition: MsgTools.h:13
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
def ls(path, rec=False)
Definition: eostools.py:349
ParameterSetID selectorConfig() const
SelectedProductsForBranchType const & keptProducts() const
std::unique_ptr< InitMsgBuilder > serializeRegistry(SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel, SendJobHeader::ParameterSetMap const *psetMap)
jsoncollector::StringJ transferDestination_
def load(fileName)
Definition: svgfig.py:547
GlobalEvFOutputEventWriter(std::string const &filePath, unsigned int ls)
void write(edm::EventForOutput const &e) final
GlobalEvFOutputJSONDef(std::string const &streamLabel, bool writeJsd)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
edm::ParameterSet const & ps_
HLT enums.
void acquire(edm::StreamID, edm::EventForOutput const &, edm::WaitingTaskWithArenaHolder) const final
edm::detail::TriggerResultsBasedEventSelector::handle_t Trig
void doOutputEventAsync(std::unique_ptr< EventMsgBuilder > msg, edm::WaitingTaskHolder iHolder)
void writeLuminosityBlock(edm::LuminosityBlockForOutput const &) final
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
GlobalEvFOutputModule(edm::ParameterSet const &ps)
edm::propagate_const< std::unique_ptr< StreamerOutputFile > > stream_writer_events_
const ModuleDescription & moduleDescription() const
void setDefaultGroup(std::string const &group)
Log< level::Warning, false > LogWarning
tmp
align.sh
Definition: createJobs.py:716
std::string const & moduleLabel() const
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
GlobalEvFOutputJSONWriter(std::string const &streamLabel, jsoncollector::DataPointDefinition const &, std::string const &outJsonDefName, jsoncollector::StringJ const &transferDestination, jsoncollector::StringJ const &mergeType)
def move(src, dest)
Definition: eostools.py:511
jsoncollector::StringJ mergeType_
#define LogDebug(id)