CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
GlobalEvFOutputModule.cc
Go to the documentation of this file.
2 
6 
10 
16 
18 
21 
24 
28 
31 
33 
34 #include <sys/stat.h>
35 #include <filesystem>
36 #include <boost/algorithm/string.hpp>
37 
39 
40 namespace evf {
41 
43 
45  public:
48 
50 
51  void close() { stream_writer_events_->close(); }
52 
// Writes one serialized event: wraps the message buffer in an EventMsgView,
// hands the view to the events stream writer, then bumps the accepted counter.
54  EventMsgView eview(msg.startAddress());
55  stream_writer_events_->write(eview);
56  incAccepted();
57  }
58 
// Queues the write of one serialized event on writeQueue_, using the task group
// taken from iHolder. The queued task calls doOutputEvent() and, if that throws,
// forwards the exception to the waiting task through doneWaiting().
59  void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
61  auto group = iHolder.group();
// Ownership of msg is passed into the lambda as a raw pointer (msg.release()).
// NOTE(review): presumably because the queue needs a copyable functor; if the
// queued task never runs, nothing deletes the message — confirm.
62  writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
63  try {
// Re-wrap the raw pointer so the message is deleted when this scope exits,
// whether or not doOutputEvent() throws.
64  std::unique_ptr<EventMsgBuilder> own(msg);
65  doOutputEvent(*msg); //msg is written and discarded at this point
66  } catch (...) {
// The lambda is not mutable, so the captured holder is const here; a copy is
// made to call doneWaiting() on it.
67  auto tmp = holder;
68  tmp.doneWaiting(std::current_exception());
69  }
70  });
71  }
72 
73  inline void throttledCheck() {
74  unsigned int counter = 0;
75  while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
76  if (edm::shutdown_flag.load(std::memory_order_relaxed))
77  break;
78  if (!(counter % 100))
79  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
80  usleep(100000);
81  counter++;
82  }
83  }
84 
85  uint32 get_adler32() const { return stream_writer_events_->adler32(); }
86 
87  std::string const& getFilePath() const { return filePath_; }
88 
89  unsigned long getAccepted() const { return accepted_; }
90  void incAccepted() { accepted_++; }
91 
93 
94  private:
96  std::atomic<unsigned long> accepted_;
99  };
100 
102  public:
104 
109  };
110 
112  public:
115  std::string const& outJsonDefName,
116  jsoncollector::StringJ const& transferDestination,
117  jsoncollector::StringJ const& mergeType);
118 
130  std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
131  };
132 
138 
140  public:
// Builds the module from its configuration ParameterSet.
141  explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
142  ~GlobalEvFOutputModule() override;
143  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
144 
145  private:
// Stream cache: creates one edm::StreamerOutputModuleCommon per stream.
146  std::unique_ptr<edm::StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;
147 
// Run cache: returns the JSON definition object; the implementation also writes
// the stream INI file and verifies its checksum.
148  std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
149 
151  void write(edm::EventForOutput const& e) final;
152 
153  //pure in parent class but unused here
155  void writeRun(edm::RunForOutput const&) final {}
156  void globalEndRun(edm::RunForOutput const&) const final {}
157 
// Luminosity-block cache: returns the event writer for that block's open .dat file.
158  std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
159  edm::LuminosityBlockForOutput const& iLB) const final;
// Closes the block's .dat file and produces the JSON summary for the block.
160  void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
161 
163 
168 
170 
171  }; //end-of-class-def
172 
// Ask the DAQ director for the run directory; the definition file paths below are built from it.
174  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
175  LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
176 
177  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
178 
188  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
191 
// Two pid-tagged paths: a temporary file under <baseRunDir>/open/ and the final
// file directly in <baseRunDir>.
192  std::stringstream tmpss, ss;
193  tmpss << baseRunDir << "/open/"
194  << "output_" << getpid() << ".jsd";
195  ss << baseRunDir << "/"
196  << "output_" << getpid() << ".jsd";
197  std::string outTmpJsonDefName = tmpss.str();
198  outJsonDefName_ = ss.str();
199 
// While holding the director's init lock, move the temporary file to its final
// name, but only if the final file is not there yet (stat() fails).
// NOTE(review): this overload of std::filesystem::rename throws on failure, in
// which case unlockInitLock() below is not reached — confirm whether the lock
// needs a scope guard.
200  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
201  struct stat fstat;
202  if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
203  LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
207  std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
208  }
209  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
210 
213  }
214 
216  jsoncollector::DataPointDefinition const& outJsonDef,
217  std::string const& outJsonDefName,
218  jsoncollector::StringJ const& transferDestination,
219  jsoncollector::StringJ const& mergeType)
220  : processed_(0),
221  accepted_(0),
222  errorEvents_(0),
223  retCodeMask_(0),
224  filelist_(),
225  filesize_(0),
226  inputFiles_(),
227  fileAdler32_(1),
228  transferDestination_(transferDestination),
229  mergeType_(mergeType),
230  hltErrorEvents_(0) {
// Give every monitored quantity the name it is registered under.
231  processed_.setName("Processed");
232  accepted_.setName("Accepted");
233  errorEvents_.setName("ErrorEvents");
234  retCodeMask_.setName("ReturnCodeMask");
235  filelist_.setName("Filelist");
236  filesize_.setName("Filesize");
237  inputFiles_.setName("InputFiles");
238  fileAdler32_.setName("FileAdler32");
239  transferDestination_.setName("TransferDestination");
240  mergeType_.setName("MergeType");
241  hltErrorEvents_.setName("HLTErrorEvents");
242 
// Create the monitor from the supplied definition, point it at the definition
// file path, register each quantity as a global monitorable, then commit.
243  jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
244  jsonMonitor_->setDefPath(outJsonDefName);
245  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
246  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
247  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
248  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
249  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
250  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
251  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
252  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
253  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
254  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
255  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
256  jsonMonitor_->commit(nullptr);
257  }
258 
260  : edm::global::OutputModuleBase(ps),
262  ps_(ps),
263  streamLabel_(ps.getParameter<std::string>("@module_label")),
264  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
265  psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
266  ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
// NOTE(review): ps_ appears to be a reference member, so it keeps referring to the
// caller's ParameterSet — confirm that object outlives the module.
267  //replace hltOutputA with stream if the HLT menu uses this convention
268  std::string testPrefix = "hltOutput";
269  if (streamLabel_.find(testPrefix) == 0)
270  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
271 
// An underscore anywhere in the (possibly rewritten) label is rejected.
272  if (streamLabel_.find('_') != std::string::npos) {
273  throw cms::Exception("GlobalEvFOutputModule")
274  << "Underscore character is reserved can not be used for stream names in "
275  "FFF, but was detected in stream name -: "
276  << streamLabel_;
277  }
278 
// Case-insensitive check: the last occurrence of "stream" must be at position 0,
// i.e. "stream" may only appear as the label's prefix.
// NOTE(review): unlike the underscore check, this message does not append streamLabel_.
279  std::string streamLabelLow = streamLabel_;
280  boost::algorithm::to_lower(streamLabelLow);
281  auto streampos = streamLabelLow.rfind("stream");
282  if (streampos != 0 && streampos != std::string::npos)
283  throw cms::Exception("GlobalEvFOutputModule")
284  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
285  "names in FFF based HLT, but was detected in stream name";
286 
288  }
289 
291 
// Declares the untracked "psetMap" InputTag (default "hltPSetMap") and registers
// the description under the label "globalEvfOutputModule".
296  desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
297  ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
298  descriptions.add("globalEvfOutputModule", desc);
299  }
300 
// Creates the StreamerOutputModuleCommon object returned for one stream.
301  std::unique_ptr<edm::StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
302  return std::make_unique<edm::StreamerOutputModuleCommon>(
304  }
305 
// Run-begin hook. Builds the JSON definition object for this run, serializes the
// INI (init) message into the stream's "open" INI file, re-reads that file to
// verify its Adler-32 checksum, and on success renames it to the final INI path.
// Throws cms::Exception when the re-computed checksum differs from the writer's.
306  std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
307  //create run Cache holding JSON file writer and variables
308  auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_);
309 
311 
312  //output INI file (non-const). This doesn't require globalBeginRun to be finished
313  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
314  edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
315 
316  StreamerOutputFile stream_writer_preamble(openIniFileName);
317  uint32 preamble_adler32 = 1;
318  edm::BranchIDLists const* bidlPtr = branchIDLists();
319 
320  auto psetMapHandle = run.getHandle(psetToken_);
321 
// The ParameterSet map is passed only when the handle is valid; otherwise nullptr.
322  std::unique_ptr<InitMsgBuilder> init_message =
323  streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
324  *bidlPtr,
329  psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
330 
331  //Let us turn it into a View
332  InitMsgView view(init_message->startAddress());
333 
334  //output header
335  stream_writer_preamble.write(view);
336  preamble_adler32 = stream_writer_preamble.adler32();
337  stream_writer_preamble.close();
338 
// NOTE(review): the return values of stat() and fopen() are not checked; if either
// fails, istat is unset or src is null when used below — confirm this is acceptable.
339  struct stat istat;
340  stat(openIniFileName.c_str(), &istat);
341  //read back file to check integrity of what was written
342  off_t readInput = 0;
343  uint32_t adlera = 1, adlerb = 0;
344  FILE* src = fopen(openIniFileName.c_str(), "r");
345 
346  //allocate buffer used to read the INI file back in chunks of up to 1 MiB
347  std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
348  while (readInput < istat.st_size) {
349  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
// NOTE(review): fread()'s return value is ignored, so a short read is not detected
// here; presumably the checksum comparison below would catch it — confirm.
350  fread(outBuf.get(), toRead, 1, src);
351  cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
352  readInput += toRead;
353  }
354  fclose(src);
355 
356  //clear serialization buffers
357  streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
358 
359  //free the read-back buffer, which is not used past this point
360  outBuf.reset();
361 
// Combine the two running sums into one 32-bit value (b in the high 16 bits, a in
// the low 16 bits) and compare it with the value reported by the writer.
362  uint32_t adler32c = (adlerb << 16) | adlera;
363  if (adler32c != preamble_adler32) {
364  throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
365  << " expected:" << preamble_adler32 << " obtained:" << adler32c;
366  } else {
367  LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
368  std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
369  }
370 
371  return jsonDef;
372  }
373 
375  edm::EventForOutput const& e) const {
// Fetches the TriggerResults handle for this event via the given token and returns it.
376  Trig result;
377  e.getByToken<edm::TriggerResults>(token, result);
378  return result;
379  }
380 
381  std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
382  edm::LuminosityBlockForOutput const& iLB) const {
383  auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
384 
385  return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
386  }
387 
389  edm::EventForOutput const& e,
390  edm::WaitingTaskWithArenaHolder iHolder) const {
392 
// Serialize the event with this stream's cached StreamerOutputModuleCommon.
393  auto streamerCommon = streamCache(id);
394  std::unique_ptr<EventMsgBuilder> msg =
395  streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
396 
// Hand the serialized message to the luminosity block's writer, which writes it
// asynchronously. The const_cast is needed to call the writer's non-const method
// on the cached pointer.
397  auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
398  const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
399  ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
400  }
402 
// Luminosity-block end hook: closes the block's .dat file, fills a JSON writer with
// the block's counters and file information, and writes the JSON file for the block.
404  auto lumiWriter = luminosityBlockCache(iLB.index());
405  //close dat file
406  const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
407 
408  //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
409  auto jsonDef = runCache(iLB.getRun().index());
411  jsonDef->outJsonDef_,
412  jsonDef->outJsonDefName_,
413  jsonDef->transferDestination_,
414  jsonDef->mergeType_);
415 
416  jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
417  jsonWriter.accepted_.value() = lumiWriter->getAccepted();
418 
// NOTE(review): when the abort flag is set the function returns after close() with
// no rename/remove of the open .dat file and no JSON output — confirm this is intended.
419  bool abortFlag = false;
420  jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
421  if (abortFlag) {
422  edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
423  return;
424  }
425 
426  if (jsonWriter.processed_.value() != 0) {
// Events were processed: record the file size and name, and rename the open file.
// NOTE(review): stat()'s return value is not checked, so istat.st_size is
// indeterminate if the call fails.
427  struct stat istat;
428  std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
429  stat(openDatFilePath.string().c_str(), &istat);
430  jsonWriter.filesize_ = istat.st_size;
431  std::filesystem::rename(openDatFilePath.string().c_str(),
433  jsonWriter.filelist_ = openDatFilePath.filename().string();
434  } else {
435  //remove empty file when no event processing has occurred
// NOTE(review): the result of remove() is ignored.
436  remove(lumiWriter->getFilePath().c_str());
437  jsonWriter.filesize_ = 0;
438  jsonWriter.filelist_ = "";
439  jsonWriter.fileAdler32_.value() = -1; //no files in signed long
440  }
441 
442  //produce JSON file
443  jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
444  const std::string outputJsonNameStream =
445  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
446  jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
447  }
448 
449 } // namespace evf
450 
451 using namespace evf;
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
RunIndex index() const
Definition: RunForOutput.cc:32
Definition: fillJson.h:27
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
edm::global::OutputModule< edm::RunCache< GlobalEvFOutputJSONDef >, edm::LuminosityBlockCache< evf::GlobalEvFOutputEventWriter >, edm::StreamCache< edm::StreamerOutputModuleCommon >, edm::ExternalWork > GlobalEvFOutputModuleType
GlobalEvFOutputEventWriter(std::string const &filePath)
jsoncollector::DataPointDefinition outJsonDef_
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
LuminosityBlockNumber_t luminosityBlock() const
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< GlobalEvFOutputJSONDef > globalBeginRun(edm::RunForOutput const &run) const final
static bool serialize(JsonSerializable *pObj, std::string &output)
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
void write(const InitMsgBuilder &)
edm::EDGetTokenT< edm::TriggerResults > trToken_
std::string to_lower(const std::string &s)
std::string const & getFilePath() const
LuminosityBlockIndex index() const
ParameterSetID const & mainParameterSetID() const
std::unique_ptr< edm::StreamerOutputModuleCommon > beginStream(edm::StreamID) const final
void doOutputEvent(EventMsgBuilder const &msg)
ModuleDescription const & description() const
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
bool isValid() const
Definition: Hash.h:141
void globalEndRun(edm::RunForOutput const &) const final
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
oneapi::tbb::task_group * group() const noexcept
GlobalEvFOutputJSONDef(std::string const &streamLabel)
void writeRun(edm::RunForOutput const &) final
uint32 adler32() const
BranchIDLists const * branchIDLists() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
RunForOutput const & getRun() const
jsoncollector::StringJ transferDestination_
std::atomic< unsigned long > accepted_
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
#define DEFINE_FWK_MODULE(type)
Definition: MakerMacros.h:16
std::shared_ptr< GlobalEvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static std::string const triggerResults
Definition: EdmProvDump.cc:47
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
evf::FastMonitoringService * fms_
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
unsigned int uint32
Definition: MsgTools.h:13
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
ParameterSetID selectorConfig() const
SelectedProductsForBranchType const & keptProducts() const
jsoncollector::StringJ transferDestination_
def load(fileName)
Definition: svgfig.py:547
void write(edm::EventForOutput const &e) final
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
edm::ParameterSet const & ps_
HLT enums.
void acquire(edm::StreamID, edm::EventForOutput const &, edm::WaitingTaskWithArenaHolder) const final
edm::detail::TriggerResultsBasedEventSelector::handle_t Trig
void doOutputEventAsync(std::unique_ptr< EventMsgBuilder > msg, edm::WaitingTaskHolder iHolder)
void writeLuminosityBlock(edm::LuminosityBlockForOutput const &) final
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
GlobalEvFOutputModule(edm::ParameterSet const &ps)
edm::propagate_const< std::unique_ptr< StreamerOutputFile > > stream_writer_events_
const ModuleDescription & moduleDescription() const
void setDefaultGroup(std::string const &group)
Log< level::Warning, false > LogWarning
tmp
align.sh
Definition: createJobs.py:716
std::string const & moduleLabel() const
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
GlobalEvFOutputJSONWriter(std::string const &streamLabel, jsoncollector::DataPointDefinition const &, std::string const &outJsonDefName, jsoncollector::StringJ const &transferDestination, jsoncollector::StringJ const &mergeType)
def move(src, dest)
Definition: eostools.py:511
jsoncollector::StringJ mergeType_
#define LogDebug(id)