36 #include <boost/algorithm/string.hpp>
64 std::unique_ptr<EventMsgBuilder> own(msg);
68 tmp.doneWaiting(std::current_exception());
79 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, writing is paused...";
171 LogDebug(
"GlobalEvFOutputModule") <<
"writing .dat files to -: " << baseRunDir;
188 std::stringstream tmpss,
ss;
189 tmpss << baseRunDir <<
"/open/"
190 <<
"output_" << getpid() <<
".jsd";
191 ss << baseRunDir <<
"/"
192 <<
"output_" << getpid() <<
".jsd";
254 streamLabel_(ps.getParameter<std::
string>(
"@module_label")),
255 trToken_(consumes<edm::TriggerResults>(edm::
InputTag(
"TriggerResults"))),
257 ps.getUntrackedParameter<edm::
InputTag>(
"psetMap"))) {
265 <<
"Underscore character is reserved can not be used for stream names in "
266 "FFF, but was detected in stream name -: "
272 auto streampos = streamLabelLow.rfind(
"stream");
273 if (streampos != 0 && streampos != std::string::npos)
275 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
276 "names in FFF based HLT, but was detected in stream name";
288 ->setComment(
"Optionally allow the map of ParameterSets to be calculated externally.");
289 descriptions.
add(
"globalEvfOutputModule", desc);
293 return std::make_unique<edm::StreamerOutputModuleCommon>(
299 auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>();
305 edm::LogInfo(
"GlobalEvFOutputModule") <<
"beginRun init stream -: " << openIniFileName;
308 uint32 preamble_adler32 = 1;
313 std::unique_ptr<InitMsgBuilder> init_message =
314 streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
320 psetMapHandle.
isValid() ? psetMapHandle.product() :
nullptr);
326 stream_writer_preamble.
write(view);
327 preamble_adler32 = stream_writer_preamble.
adler32();
328 stream_writer_preamble.
close();
331 stat(openIniFileName.c_str(), &istat);
334 uint32_t adlera = 1, adlerb = 0;
335 FILE*
src = fopen(openIniFileName.c_str(),
"r");
338 std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
339 while (readInput < istat.st_size) {
340 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
341 fread(outBuf.get(), toRead, 1,
src);
342 cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
348 streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
353 uint32_t adler32c = (adlerb << 16) | adlera;
354 if (adler32c != preamble_adler32) {
355 throw cms::Exception(
"GlobalEvFOutputModule") <<
"Checksum mismatch of ini file -: " << openIniFileName
356 <<
" expected:" << preamble_adler32 <<
" obtained:" << adler32c;
358 LogDebug(
"GlobalEvFOutputModule") <<
"Ini file checksum -: " <<
streamLabel_ <<
" " << adler32c;
376 return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
384 auto streamerCommon = streamCache(
id);
385 std::unique_ptr<EventMsgBuilder>
msg =
395 auto lumiWriter = luminosityBlockCache(iLB.
index());
403 jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
404 jsonWriter.accepted_.value() = lumiWriter->getAccepted();
406 bool abortFlag =
false;
409 edm::LogInfo(
"GlobalEvFOutputModule") <<
"Abort flag has been set. Output is suppressed";
413 if (jsonWriter.processed_.value() != 0) {
416 stat(openDatFilePath.string().c_str(), &istat);
417 jsonWriter.filesize_ = istat.st_size;
418 std::filesystem::rename(openDatFilePath.string().c_str(),
420 jsonWriter.filelist_ = openDatFilePath.filename().string();
423 remove(lumiWriter->getFilePath().c_str());
424 jsonWriter.filesize_ = 0;
425 jsonWriter.filelist_ =
"";
426 jsonWriter.fileAdler32_.value() = -1;
433 jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.
luminosityBlock());
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
ParameterSetID const & mainParameterSetID() const
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
edm::global::OutputModule< edm::RunCache< GlobalEvFOutputJSONDef >, edm::LuminosityBlockCache< evf::GlobalEvFOutputEventWriter >, edm::StreamCache< edm::StreamerOutputModuleCommon >, edm::ExternalWork > GlobalEvFOutputModuleType
GlobalEvFOutputEventWriter(std::string const &filePath)
jsoncollector::DataPointDefinition outJsonDef_
static const std::string BINARYOR
std::vector< BranchIDList > BranchIDLists
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::string const & getFilePath() const
std::string outJsonDefName_
static const std::string ADLER32
#define DEFINE_FWK_MODULE(type)
static const std::string SUM
GlobalEvFOutputJSONWriter(std::string const &streamLabel, jsoncollector::DataPointDefinition const &, std::string const &outJsonDefName)
string filePath
DQM Environment
volatile std::atomic< bool > shutdown_flag
std::shared_ptr< GlobalEvFOutputJSONDef > globalBeginRun(edm::RunForOutput const &run) const final
static bool serialize(JsonSerializable *pObj, std::string &output)
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
void write(const InitMsgBuilder &)
edm::EDGetTokenT< edm::TriggerResults > trToken_
uint8 * startAddress() const
std::string to_lower(const std::string &s)
std::unique_ptr< edm::StreamerOutputModuleCommon > beginStream(edm::StreamID) const final
void doOutputEvent(EventMsgBuilder const &msg)
jsoncollector::IntJ processed_
std::string const & moduleLabel() const
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
jsoncollector::StringJ inputFiles_
jsoncollector::IntJ filesize_
Handle< PROD > getHandle(EDGetTokenT< PROD > token) const
jsoncollector::IntJ accepted_
jsoncollector::IntJ hltErrorEvents_
edm::SerialTaskQueue writeQueue_
void globalEndRun(edm::RunForOutput const &) const final
LuminosityBlockIndex index() const
oneapi::tbb::task_group * group() const noexcept
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
void writeRun(edm::RunForOutput const &) final
LuminosityBlockForOutput const & getLuminosityBlock() const
LuminosityBlockNumber_t luminosityBlock() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
ModuleDescription const & description() const
jsoncollector::IntJ retCodeMask_
static const std::string MERGE
std::atomic< unsigned long > accepted_
std::shared_ptr< GlobalEvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static std::string const triggerResults
RunForOutput const & getRun() const
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
static void writeStringToFile(std::string const &filename, std::string &content)
evf::FastMonitoringService * fms_
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const final
static const std::string CAT
BranchIDLists const * branchIDLists() const
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
~GlobalEvFOutputModule() override
jsoncollector::StringJ filelist_
uint32 get_adler32() const
jsoncollector::IntJ fileAdler32_
WaitingTaskHolder makeWaitingTaskHolderAndRelease()
ParameterSetID selectorConfig() const
jsoncollector::StringJ transferDestination_
void write(edm::EventForOutput const &e) final
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
BasicHandle getByToken(EDGetToken token, TypeID const &typeID) const
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
edm::ParameterSet const & ps_
jsoncollector::StringJ mergeType_
unsigned long getAccepted() const
edm::SerialTaskQueue & queue()
void acquire(edm::StreamID, edm::EventForOutput const &, edm::WaitingTaskWithArenaHolder) const final
void doOutputEventAsync(std::unique_ptr< EventMsgBuilder > msg, edm::WaitingTaskHolder iHolder)
static std::atomic< unsigned int > counter
SelectedProductsForBranchType const & keptProducts() const
void writeLuminosityBlock(edm::LuminosityBlockForOutput const &) final
const ModuleDescription & moduleDescription() const
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
GlobalEvFOutputModule(edm::ParameterSet const &ps)
edm::propagate_const< std::unique_ptr< StreamerOutputFile > > stream_writer_events_
void setDefaultGroup(std::string const &group)
Log< level::Warning, false > LogWarning
edm::detail::TriggerResultsBasedEventSelector::handle_t Trig
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
jsoncollector::IntJ errorEvents_
~GlobalEvFOutputEventWriter()
static const std::string SAME