36 #include <boost/algorithm/string.hpp> 52 checksum_ =
ret.second;
63 std::shared_ptr<MetaDataCache const> iMetaData)
75 stream_writer_events_->close();
81 stream_writer_events_->write(eview);
97 std::unique_ptr<EventMsgBuilder> own(
msg);
101 doOutputEvent(*
m->builder_,
false);
103 doOutputEvent(*
msg,
true);
106 tmp.doneWaiting(std::current_exception());
117 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, writing is paused...";
121 edm::LogWarning(
"FedRawDataInputSource") <<
"Detected that the lumisection is discarded -: " << ls_;
129 edm::LogWarning(
"FedRawDataInputSource") <<
"Detected that the lumisection is discarded -: " << ls_;
148 std::shared_ptr<MetaDataCache const>
meta_;
150 bool discarded_ =
false;
200 std::unique_ptr<SerializeDataBuffer> beginStream(
edm::StreamID)
const final;
202 std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(
edm::RunForOutput const&
run)
const final;
216 void cacheEventMetaData();
236 unsigned int presentBranchIDListSize_ = 0;
241 LogDebug(
"GlobalEvFOutputModule") <<
"writing .dat files to -: " << baseRunDir;
243 outJsonDef_.setDefaultGroup(
"data");
256 std::stringstream
ss;
257 ss << baseRunDir <<
"/" 258 <<
"output_" << getpid() <<
".jsd";
259 outJsonDefName_ =
ss.str();
262 std::stringstream tmpss;
263 tmpss << baseRunDir <<
"/open/" 264 <<
"output_" << getpid() <<
".jsd";
269 if (
stat(outJsonDefName_.c_str(), &fstat) != 0) {
270 LogDebug(
"GlobalEvFOutputModule") <<
"writing output definition file -: " << outJsonDefName_;
274 std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
298 transferDestination_(transferDestination),
299 mergeType_(mergeType),
333 streamLabel_(ps.getParameter<
std::
string>(
"@module_label")),
336 ps.getUntrackedParameter<
edm::
InputTag>(
"psetMap"))) {
344 <<
"Underscore character is reserved can not be used for stream names in " 345 "FFF, but was detected in stream name -: " 351 auto streampos = streamLabelLow.rfind(
"stream");
352 if (streampos != 0 && streampos != std::string::npos)
354 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for " 355 "names in FFF based HLT, but was detected in stream name";
359 throw cms::Exception(
"GlobalEvFOutputModule") <<
"EvFDaqDirector is not available";
362 std::ofstream
file(iniFileName);
364 throw cms::Exception(
"GlobalEvFOutputModule") <<
"can not create " << iniFileName <<
"error: " << strerror(errno);
367 edm::LogInfo(
"GlobalEvFOutputModule") <<
"Constructor created initemp file -: " << iniFileName;
382 ->setComment(
"Optionally allow the map of ParameterSets to be calculated externally.");
383 descriptions.
add(
"globalEvfOutputModule",
desc);
387 return std::make_unique<SerializeDataBuffer>();
392 auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(
streamLabel_,
false);
397 edm::LogInfo(
"GlobalEvFOutputModule") <<
"beginRun init stream -: " << openIniFileName;
400 uint32 preamble_adler32 = 1;
405 std::unique_ptr<InitMsgBuilder> init_message =
410 psetMapHandle.isValid() ? psetMapHandle.product() :
nullptr);
417 preamble_adler32 = stream_writer_preamble.
adler32();
418 stream_writer_preamble.
close();
421 stat(openIniFileName.c_str(), &istat);
424 uint32_t adlera = 1, adlerb = 0;
425 std::ifstream
src(openIniFileName, std::ifstream::binary);
428 <<
"can not read back " << openIniFileName <<
" error: " << strerror(errno);
431 std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
432 while (readInput < istat.st_size) {
433 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
434 src.read(outBuf.get(), toRead);
436 cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
444 uint32_t adler32c = (adlerb << 16) | adlera;
445 if (adler32c != preamble_adler32) {
446 throw cms::Exception(
"GlobalEvFOutputModule") <<
"Checksum mismatch of ini file -: " << openIniFileName
447 <<
" expected:" << preamble_adler32 <<
" obtained:" << adler32c;
449 LogDebug(
"GlobalEvFOutputModule") <<
"Ini file checksum -: " <<
streamLabel_ <<
" " << adler32c;
473 msgBuilders_ = std::make_unique<StreamerOutputMsgBuilders>(
497 auto buffer = streamCache(
id);
498 std::unique_ptr<EventMsgBuilder>
msg =
501 auto lumiWriter = luminosityBlockCache(
e.getLuminosityBlock().index());
508 auto lumiWriter = luminosityBlockCache(iLB.
index());
515 jsonDef->outJsonDef_,
516 jsonDef->outJsonDefName_,
517 jsonDef->transferDestination_,
518 jsonDef->mergeType_);
520 jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
521 jsonWriter.accepted_.value() = lumiWriter->getAccepted();
523 bool abortFlag =
false;
529 jsonWriter.processed_.value() = 0;
530 jsonWriter.accepted_.value() = 0;
532 <<
"Output suppressed, setting error events for LS -: " << iLB.
luminosityBlock();
536 edm::LogInfo(
"GlobalEvFOutputModule") <<
"Abort flag has been set. Output is suppressed";
540 if (jsonWriter.processed_.value() != 0) {
543 stat(openDatFilePath.string().c_str(), &istat);
544 jsonWriter.filesize_ = istat.st_size;
545 std::filesystem::rename(openDatFilePath.string().c_str(),
547 jsonWriter.filelist_ = openDatFilePath.filename().string();
550 remove(lumiWriter->getFilePath().c_str());
551 jsonWriter.filesize_ = 0;
552 jsonWriter.filelist_ =
"";
553 jsonWriter.fileAdler32_.value() = -1;
560 jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.
luminosityBlock());
void doOutputEvent(EventMsgBuilder const &msg, bool inc)
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
void cacheEventMetaData()
jsoncollector::DataPointDefinition outJsonDef_
static const std::string BINARYOR
std::vector< BranchIDList > BranchIDLists
std::string outJsonDefName_
LuminosityBlockNumber_t luminosityBlock() const
std::atomic< GlobalEvFOutputEventWriter * > lastWriter_
static const std::string ADLER32
ret
prodAgent to be discontinued
static const std::string SUM
void respondToCloseInputFile(edm::FileBlock const &) final
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< GlobalEvFOutputJSONDef > globalBeginRun(edm::RunForOutput const &run) const final
static bool serialize(JsonSerializable *pObj, std::string &output)
edm::EDGetTokenT< edm::TriggerResults > trToken_
StreamerOutputMsgBuilders::Parameters commonParameters_
std::string to_lower(const std::string &s)
std::string const & getFilePath() const
LuminosityBlockIndex index() const
void write(const InitMsgBuilder &)
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
jsoncollector::IntJ processed_
ModuleDescription const & description() const
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
jsoncollector::StringJ inputFiles_
jsoncollector::IntJ filesize_
jsoncollector::IntJ accepted_
jsoncollector::IntJ hltErrorEvents_
edm::SerialTaskQueue writeQueue_
void globalEndRun(edm::RunForOutput const &) const final
std::shared_ptr< MetaDataCache const > metaDataCache_
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
oneapi::tbb::task_group * group() const noexcept
void updateDestination(std::string const &streamLabel)
static void fillDescription(ParameterSetDescription &desc)
void writeRun(edm::RunForOutput const &) final
void respondToOpenInputFile(edm::FileBlock const &) final
BranchIDLists const * branchIDLists() const
virtual void setName(std::string name)
jsoncollector::IntJ retCodeMask_
RunForOutput const & getRun() const
GlobalEvFOutputEventWriter(std::string const &filePath, unsigned int ls, std::shared_ptr< MetaDataCache const > iMetaData)
static const std::string MERGE
jsoncollector::StringJ transferDestination_
std::atomic< unsigned long > accepted_
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
filePath
CUSTOMIZE FOR ML.
#define DEFINE_FWK_MODULE(type)
std::unique_ptr< SerializeDataBuffer > beginStream(edm::StreamID) const final
std::shared_ptr< GlobalEvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static std::string const triggerResults
static void writeStringToFile(std::string const &filename, std::string &content)
evf::FastMonitoringService * fms_
std::shared_ptr< MetaDataCache const > meta_
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static const std::string CAT
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
~GlobalEvFOutputModule() override
jsoncollector::StringJ filelist_
jsoncollector::IntJ fileAdler32_
ParameterSetID selectorConfig() const
SelectedProductsForBranchType const & keptProducts() const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
WaitingTaskHolder makeWaitingTaskHolderAndRelease()
std::vector< bool > getTriggerResults(const std::vector< unsigned int > &triggers, const edm::TriggerResults &triggerResults)
jsoncollector::StringJ transferDestination_
void write(edm::EventForOutput const &e) final
edm::global::OutputModule< edm::RunCache< GlobalEvFOutputJSONDef >, edm::LuminosityBlockCache< evf::GlobalEvFOutputEventWriter >, edm::StreamCache< SerializeDataBuffer >, edm::WatchInputFiles, edm::ExternalWork > GlobalEvFOutputModuleType
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
jsoncollector::StringJ mergeType_
std::vector< AlignmentParameters * > Parameters
unsigned int presentBranchIDListSize_
edm::SerialTaskQueue & queue()
void acquire(edm::StreamID, edm::EventForOutput const &, edm::WaitingTaskWithArenaHolder) const final
edm::detail::TriggerResultsBasedEventSelector::handle_t Trig
void doOutputEventAsync(std::unique_ptr< EventMsgBuilder > msg, edm::WaitingTaskHolder iHolder)
unsigned long getAccepted() const
std::unique_ptr< const StreamerOutputMsgBuilders > msgBuilders_
void writeLuminosityBlock(edm::LuminosityBlockForOutput const &) final
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
GlobalEvFOutputModule(edm::ParameterSet const &ps)
edm::propagate_const< std::unique_ptr< StreamerOutputFile > > stream_writer_events_
const ModuleDescription & moduleDescription() const
Log< level::Warning, false > LogWarning
void setMetaCache(std::shared_ptr< MetaDataCache const > iMetaData)
std::string const & moduleLabel() const
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
uint32 get_adler32() const
GlobalEvFOutputJSONWriter(std::string const &streamLabel, jsoncollector::DataPointDefinition const &, std::string const &outJsonDefName, jsoncollector::StringJ const &transferDestination, jsoncollector::StringJ const &mergeType)
jsoncollector::IntJ errorEvents_
~GlobalEvFOutputEventWriter()
jsoncollector::StringJ mergeType_
static const std::string SAME