36 #include <boost/algorithm/string.hpp> 73 std::unique_ptr<EventMsgBuilder> own(
msg);
77 tmp.doneWaiting(std::current_exception());
88 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, writing is paused...";
92 edm::LogWarning(
"FedRawDataInputSource") <<
"Detected that the lumisection is discarded -: " <<
ls_;
100 edm::LogWarning(
"FedRawDataInputSource") <<
"Detected that the lumisection is discarded -: " <<
ls_;
198 LogDebug(
"GlobalEvFOutputModule") <<
"writing .dat files to -: " << baseRunDir;
213 std::stringstream
ss;
214 ss << baseRunDir <<
"/" 215 <<
"output_" << getpid() <<
".jsd";
219 std::stringstream tmpss;
220 tmpss << baseRunDir <<
"/open/" 221 <<
"output_" << getpid() <<
".jsd";
255 transferDestination_(transferDestination),
256 mergeType_(mergeType),
290 streamLabel_(ps.getParameter<
std::
string>(
"@module_label")),
293 ps.getUntrackedParameter<
edm::
InputTag>(
"psetMap"))) {
301 <<
"Underscore character is reserved can not be used for stream names in " 302 "FFF, but was detected in stream name -: " 308 auto streampos = streamLabelLow.rfind(
"stream");
309 if (streampos != 0 && streampos != std::string::npos)
311 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for " 312 "names in FFF based HLT, but was detected in stream name";
316 throw cms::Exception(
"GlobalEvFOutputModule") <<
"EvFDaqDirector is not available";
319 std::ofstream
file(iniFileName);
321 throw cms::Exception(
"GlobalEvFOutputModule") <<
"can not create " << iniFileName <<
"error: " << strerror(errno);
324 edm::LogInfo(
"GlobalEvFOutputModule") <<
"Constructor created initemp file -: " << iniFileName;
339 ->setComment(
"Optionally allow the map of ParameterSets to be calculated externally.");
340 descriptions.
add(
"globalEvfOutputModule",
desc);
344 return std::make_unique<edm::StreamerOutputModuleCommon>(
350 auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(
streamLabel_,
false);
356 edm::LogInfo(
"GlobalEvFOutputModule") <<
"beginRun init stream -: " << openIniFileName;
359 uint32 preamble_adler32 = 1;
364 std::unique_ptr<InitMsgBuilder> init_message =
371 psetMapHandle.
isValid() ? psetMapHandle.product() :
nullptr);
378 preamble_adler32 = stream_writer_preamble.
adler32();
379 stream_writer_preamble.
close();
382 stat(openIniFileName.c_str(), &istat);
385 uint32_t adlera = 1, adlerb = 0;
386 std::ifstream
src(openIniFileName, std::ifstream::binary);
389 <<
"can not read back " << openIniFileName <<
" error: " << strerror(errno);
392 std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
393 while (readInput < istat.st_size) {
394 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
395 src.read(outBuf.get(), toRead);
397 cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
408 uint32_t adler32c = (adlerb << 16) | adlera;
409 if (adler32c != preamble_adler32) {
410 throw cms::Exception(
"GlobalEvFOutputModule") <<
"Checksum mismatch of ini file -: " << openIniFileName
411 <<
" expected:" << preamble_adler32 <<
" obtained:" << adler32c;
413 LogDebug(
"GlobalEvFOutputModule") <<
"Ini file checksum -: " <<
streamLabel_ <<
" " << adler32c;
431 return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.
luminosityBlock());
439 auto streamerCommon = streamCache(
id);
440 std::unique_ptr<EventMsgBuilder>
msg =
443 auto lumiWriter = luminosityBlockCache(
e.getLuminosityBlock().index());
450 auto lumiWriter = luminosityBlockCache(iLB.
index());
457 jsonDef->outJsonDef_,
458 jsonDef->outJsonDefName_,
459 jsonDef->transferDestination_,
460 jsonDef->mergeType_);
462 jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
463 jsonWriter.accepted_.value() = lumiWriter->getAccepted();
465 bool abortFlag =
false;
471 jsonWriter.processed_.value() = 0;
472 jsonWriter.accepted_.value() = 0;
474 <<
"Output suppressed, setting error events for LS -: " << iLB.
luminosityBlock();
478 edm::LogInfo(
"GlobalEvFOutputModule") <<
"Abort flag has been set. Output is suppressed";
482 if (jsonWriter.processed_.value() != 0) {
485 stat(openDatFilePath.string().c_str(), &istat);
486 jsonWriter.filesize_ = istat.st_size;
487 std::filesystem::rename(openDatFilePath.string().c_str(),
489 jsonWriter.filelist_ = openDatFilePath.filename().string();
492 remove(lumiWriter->getFilePath().c_str());
493 jsonWriter.filesize_ = 0;
494 jsonWriter.filelist_ =
"";
495 jsonWriter.fileAdler32_.value() = -1;
502 jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.
luminosityBlock());
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
edm::global::OutputModule< edm::RunCache< GlobalEvFOutputJSONDef >, edm::LuminosityBlockCache< evf::GlobalEvFOutputEventWriter >, edm::StreamCache< edm::StreamerOutputModuleCommon >, edm::ExternalWork > GlobalEvFOutputModuleType
jsoncollector::DataPointDefinition outJsonDef_
static const std::string BINARYOR
std::vector< BranchIDList > BranchIDLists
std::string outJsonDefName_
LuminosityBlockNumber_t luminosityBlock() const
static const std::string ADLER32
static const std::string SUM
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< GlobalEvFOutputJSONDef > globalBeginRun(edm::RunForOutput const &run) const final
static bool serialize(JsonSerializable *pObj, std::string &output)
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
void write(const InitMsgBuilder &)
edm::EDGetTokenT< edm::TriggerResults > trToken_
std::string to_lower(const std::string &s)
std::string const & getFilePath() const
LuminosityBlockIndex index() const
ParameterSetID const & mainParameterSetID() const
std::unique_ptr< edm::StreamerOutputModuleCommon > beginStream(edm::StreamID) const final
void doOutputEvent(EventMsgBuilder const &msg)
jsoncollector::IntJ processed_
ModuleDescription const & description() const
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
jsoncollector::StringJ inputFiles_
jsoncollector::IntJ filesize_
jsoncollector::IntJ accepted_
jsoncollector::IntJ hltErrorEvents_
edm::SerialTaskQueue writeQueue_
void globalEndRun(edm::RunForOutput const &) const final
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
oneapi::tbb::task_group * group() const noexcept
void updateDestination(std::string const &streamLabel)
void writeRun(edm::RunForOutput const &) final
BranchIDLists const * branchIDLists() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
jsoncollector::IntJ retCodeMask_
RunForOutput const & getRun() const
static const std::string MERGE
jsoncollector::StringJ transferDestination_
std::atomic< unsigned long > accepted_
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
filePath
CUSTOMIZE FOR ML.
#define DEFINE_FWK_MODULE(type)
std::shared_ptr< GlobalEvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static std::string const triggerResults
static void writeStringToFile(std::string const &filename, std::string &content)
evf::FastMonitoringService * fms_
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static const std::string CAT
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
~GlobalEvFOutputModule() override
jsoncollector::StringJ filelist_
jsoncollector::IntJ fileAdler32_
ParameterSetID selectorConfig() const
SelectedProductsForBranchType const & keptProducts() const
WaitingTaskHolder makeWaitingTaskHolderAndRelease()
std::unique_ptr< InitMsgBuilder > serializeRegistry(SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel, SendJobHeader::ParameterSetMap const *psetMap)
jsoncollector::StringJ transferDestination_
GlobalEvFOutputEventWriter(std::string const &filePath, unsigned int ls)
void write(edm::EventForOutput const &e) final
GlobalEvFOutputJSONDef(std::string const &streamLabel, bool writeJsd)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
edm::ParameterSet const & ps_
jsoncollector::StringJ mergeType_
edm::SerialTaskQueue & queue()
void acquire(edm::StreamID, edm::EventForOutput const &, edm::WaitingTaskWithArenaHolder) const final
edm::detail::TriggerResultsBasedEventSelector::handle_t Trig
void doOutputEventAsync(std::unique_ptr< EventMsgBuilder > msg, edm::WaitingTaskHolder iHolder)
SerializeDataBuffer * getSerializerBuffer()
unsigned long getAccepted() const
void writeLuminosityBlock(edm::LuminosityBlockForOutput const &) final
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
GlobalEvFOutputModule(edm::ParameterSet const &ps)
edm::propagate_const< std::unique_ptr< StreamerOutputFile > > stream_writer_events_
const ModuleDescription & moduleDescription() const
void setDefaultGroup(std::string const &group)
Log< level::Warning, false > LogWarning
std::string const & moduleLabel() const
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
uint32 get_adler32() const
GlobalEvFOutputJSONWriter(std::string const &streamLabel, jsoncollector::DataPointDefinition const &, std::string const &outJsonDefName, jsoncollector::StringJ const &transferDestination, jsoncollector::StringJ const &mergeType)
jsoncollector::IntJ errorEvents_
~GlobalEvFOutputEventWriter()
jsoncollector::StringJ mergeType_
static const std::string SAME