23 #include <boost/algorithm/string.hpp> 45 LogDebug(
"EvFOutputModule") <<
"writing .dat files to -: " << baseRunDir;
74 std::stringstream tmpss,
ss;
75 tmpss << baseRunDir <<
"/open/" 76 <<
"output_" << getpid() <<
".jsd";
77 ss << baseRunDir <<
"/" 78 <<
"output_" << getpid() <<
".jsd";
84 if (
stat(outJsonDefName.c_str(), &fstat) != 0) {
85 LogDebug(
"EvFOutputModule") <<
"writing output definition file -: " << outJsonDefName;
89 std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
112 commonParameters_(
edm::StreamerOutputModuleCommon::
parameters(ps)),
113 streamLabel_(ps.getParameter<
std::
string>(
"@module_label")),
116 ps.getUntrackedParameter<
edm::
InputTag>(
"psetMap"))) {
123 throw cms::Exception(
"EvFOutputModule") <<
"Underscore character is reserved can not be used for stream names in " 124 "FFF, but was detected in stream name -: " 130 auto streampos = streamLabelLow.rfind(
"stream");
131 if (streampos != 0 && streampos != std::string::npos)
133 <<
"stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for " 134 "names in FFF based HLT, but was detected in stream name";
146 ->setComment(
"Optionally allow the map of ParameterSets to be calculated externally.");
147 descriptions.
add(
"evfOutputModule",
desc);
152 jsonWriter_ = std::make_unique<EvFOutputJSONWriter>(
157 edm::LogInfo(
"EvFOutputModule") <<
"beginRun init stream -: " << openIniFileName;
160 uint32 preamble_adler32 = 1;
165 std::unique_ptr<InitMsgBuilder> init_message =
172 psetMapHandle.
isValid() ? psetMapHandle.product() :
nullptr);
179 preamble_adler32 = stream_writer_preamble.
adler32();
180 stream_writer_preamble.
close();
183 stat(openIniFileName.c_str(), &istat);
186 uint32_t adlera = 1, adlerb = 0;
187 FILE*
src = fopen(openIniFileName.c_str(),
"r");
190 unsigned char* outBuf =
new unsigned char[1024 * 1024];
191 while (readInput < istat.st_size) {
192 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
193 fread(outBuf, toRead, 1,
src);
194 cms::Adler32((
const char*)outBuf, toRead, adlera, adlerb);
200 jsonWriter_->streamerCommon_.getSerializerBuffer()->clearHeaderBuffer();
206 uint32_t adler32c = (adlerb << 16) | adlera;
207 if (adler32c != preamble_adler32) {
208 throw cms::Exception(
"EvFOutputModule") <<
"Checksum mismatch of ini file -: " << openIniFileName
209 <<
" expected:" << preamble_adler32 <<
" obtained:" << adler32c;
227 return std::make_shared<EvFOutputEventWriter>(openDatFilePath);
236 edm::LogWarning(
"FedRawDataInputSource") <<
"Input throttled detected, writing is paused...";
244 auto lumiWriter = luminosityBlockCache(
e.getLuminosityBlock().index());
245 std::unique_ptr<EventMsgBuilder>
msg =
jsonWriter_->streamerCommon_.serializeEvent(
247 lumiWriter->incAccepted();
248 lumiWriter->doOutputEvent(*
msg);
252 auto lumiWriter = luminosityBlockCache(iLB.
index());
256 jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32();
257 jsonWriter_->accepted_.value() = lumiWriter->getAccepted();
259 bool abortFlag =
false;
262 edm::LogInfo(
"EvFOutputModule") <<
"Abort flag has been set. Output is suppressed";
269 stat(openDatFilePath.string().c_str(), &istat);
271 std::filesystem::rename(openDatFilePath.string().c_str(),
273 jsonWriter_->filelist_ = openDatFilePath.filename().string();
276 remove(lumiWriter->getFilePath().c_str());
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
EvFOutputModule(edm::ParameterSet const &ps)
std::unique_ptr< evf::EvFOutputJSONWriter > jsonWriter_
static const std::string BINARYOR
std::vector< BranchIDList > BranchIDLists
jsoncollector::IntJ fileAdler32_
LuminosityBlockNumber_t luminosityBlock() const
static const std::string ADLER32
std::shared_ptr< EvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const override
static const std::string SUM
volatile std::atomic< bool > shutdown_flag
static bool serialize(JsonSerializable *pObj, std::string &output)
void write(const InitMsgBuilder &)
std::string to_lower(const std::string &s)
LuminosityBlockIndex index() const
jsoncollector::DataPointDefinition outJsonDef_
jsoncollector::IntJ hltErrorEvents_
ParameterSetID const & mainParameterSetID() const
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
jsoncollector::StringJ inputFiles_
void beginRun(edm::RunForOutput const &run) override
ModuleDescription const & description() const
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
jsoncollector::IntJ retCodeMask_
BranchIDLists const * branchIDLists() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
~EvFOutputModule() override
static const std::string MERGE
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) override
static std::string const triggerResults
static void writeStringToFile(std::string const &filename, std::string &content)
static const std::string CAT
jsoncollector::IntJ processed_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void write(edm::EventForOutput const &e) override
ParameterSetID selectorConfig() const
SelectedProductsForBranchType const & keptProducts() const
edm::EDGetTokenT< edm::TriggerResults > trToken_
edm::StreamerOutputModuleCommon::Parameters commonParameters_
EvFOutputJSONWriter(edm::StreamerOutputModuleCommon::Parameters const &commonParameters, edm::SelectedProducts const *selections, std::string const &streamLabel, std::string const &moduleLabel)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
jsoncollector::IntJ filesize_
jsoncollector::StringJ filelist_
jsoncollector::StringJ transferDestination_
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
const ModuleDescription & moduleDescription() const
void setDefaultGroup(std::string const &group)
Log< level::Warning, false > LogWarning
jsoncollector::IntJ accepted_
std::string const & moduleLabel() const
jsoncollector::StringJ mergeType_
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
jsoncollector::IntJ errorEvents_
static const std::string SAME
evf::FastMonitoringService * fms_