CMS 3D CMS Logo

EvFOutputModule.cc
Go to the documentation of this file.
2 
4 
8 
12 
16 
19 
20 #include <sys/stat.h>
21 #include <filesystem>
22 #include <boost/algorithm/string.hpp>
23 
24 namespace evf {
25 
28  std::string const& streamLabel)
29  : streamerCommon_(ps, selections),
30  processed_(0),
31  accepted_(0),
32  errorEvents_(0),
33  retCodeMask_(0),
34  filelist_(),
35  filesize_(0),
36  inputFiles_(),
37  fileAdler32_(1),
38  hltErrorEvents_(0) {
41 
42  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
43  LogDebug("EvFOutputModule") << "writing .dat files to -: " << baseRunDir;
44 
45  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
46 
47  processed_.setName("Processed");
48  accepted_.setName("Accepted");
49  errorEvents_.setName("ErrorEvents");
50  retCodeMask_.setName("ReturnCodeMask");
51  filelist_.setName("Filelist");
52  filesize_.setName("Filesize");
53  inputFiles_.setName("InputFiles");
54  fileAdler32_.setName("FileAdler32");
55  transferDestination_.setName("TransferDestination");
56  mergeType_.setName("MergeType");
57  hltErrorEvents_.setName("HLTErrorEvents");
58 
68  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
71 
72  std::stringstream tmpss, ss;
73  tmpss << baseRunDir << "/open/"
74  << "output_" << getpid() << ".jsd";
75  ss << baseRunDir << "/"
76  << "output_" << getpid() << ".jsd";
77  std::string outTmpJsonDefName = tmpss.str();
78  std::string outJsonDefName = ss.str();
79 
80  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
81  struct stat fstat;
82  if (stat(outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
83  LogDebug("EvFOutputModule") << "writing output definition file -: " << outJsonDefName;
87  std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
88  }
89  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
90 
92  jsonMonitor_->setDefPath(outJsonDefName);
93  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
94  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
95  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
96  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
97  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
98  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
99  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
100  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
101  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
102  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
103  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
104  jsonMonitor_->commit(nullptr);
105  }
106 
108  : edm::one::OutputModuleBase(ps),
110  ps_(ps),
111  streamLabel_(ps.getParameter<std::string>("@module_label")),
112  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
113  psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
114  ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
115  //replace the "hltOutput" prefix (e.g. hltOutputA) with "stream" if the HLT menu uses this convention
116  std::string testPrefix = "hltOutput";
117  if (streamLabel_.find(testPrefix) == 0)
118  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
119 
120  if (streamLabel_.find('_') != std::string::npos) {
121  throw cms::Exception("EvFOutputModule") << "Underscore character is reserved can not be used for stream names in "
122  "FFF, but was detected in stream name -: "
123  << streamLabel_;
124  }
125 
126  std::string streamLabelLow = streamLabel_;
127  boost::algorithm::to_lower(streamLabelLow);
128  auto streampos = streamLabelLow.rfind("stream");
129  if (streampos != 0 && streampos != std::string::npos)
130  throw cms::Exception("EvFOutputModule")
131  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
132  "names in FFF based HLT, but was detected in stream name";
133 
135  }
136 
138 
143  desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
144  ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
145  descriptions.add("evfOutputModule", desc);
146  }
147 
149  //create run Cache holding JSON file writer and variables
150  jsonWriter_ = std::make_unique<EvFOutputJSONWriter>(ps_, &keptProducts()[edm::InEvent], streamLabel_);
151 
152  //output INI file (non-const). This doesn't require globalBeginRun to be finished
153  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
154  edm::LogInfo("EvFOutputModule") << "beginRun init stream -: " << openIniFileName;
155 
156  StreamerOutputFile stream_writer_preamble(openIniFileName);
157  uint32 preamble_adler32 = 1;
158  edm::BranchIDLists const* bidlPtr = branchIDLists();
159 
160  auto psetMapHandle = run.getHandle(psetToken_);
161 
162  std::unique_ptr<InitMsgBuilder> init_message =
163  jsonWriter_->streamerCommon_.serializeRegistry(*jsonWriter_->streamerCommon_.getSerializerBuffer(),
164  *bidlPtr,
169  psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
170 
171  //Let us turn it into a View
172  InitMsgView view(init_message->startAddress());
173 
174  //output header
175  stream_writer_preamble.write(view);
176  preamble_adler32 = stream_writer_preamble.adler32();
177  stream_writer_preamble.close();
178 
179  struct stat istat;
180  stat(openIniFileName.c_str(), &istat);
181  //read back file to check integrity of what was written
182  off_t readInput = 0;
183  uint32_t adlera = 1, adlerb = 0;
184  FILE* src = fopen(openIniFileName.c_str(), "r");
185 
186  //allocate buffer used to read the INI file back for the checksum comparison
187  unsigned char* outBuf = new unsigned char[1024 * 1024];
188  while (readInput < istat.st_size) {
189  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
190  fread(outBuf, toRead, 1, src);
191  cms::Adler32((const char*)outBuf, toRead, adlera, adlerb);
192  readInput += toRead;
193  }
194  fclose(src);
195 
196  //clear serialization buffers
197  jsonWriter_->streamerCommon_.getSerializerBuffer()->clearHeaderBuffer();
198 
199  //free the read-back buffer, it is not needed once the checksum has been accumulated
200  delete[] outBuf;
201  outBuf = nullptr;
202 
203  uint32_t adler32c = (adlerb << 16) | adlera;
204  if (adler32c != preamble_adler32) {
205  throw cms::Exception("EvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
206  << " expected:" << preamble_adler32 << " obtained:" << adler32c;
207  } else {
208  LogDebug("EvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
209  std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
210  }
211  }
212 
214  edm::EventForOutput const& e) const {
215  Trig result;
216  e.getByToken<edm::TriggerResults>(token, result);
217  return result;
218  }
219 
// Builds the event writer for one luminosity block.
//
// Asks the EvFDaqDirector service for the "open" .dat file path that belongs to
// the luminosity block number of iLB and to this module's streamLabel_, then
// returns a shared EvFOutputEventWriter constructed on that path.
//
// NOTE(review): presumably this is the object later obtained through
// luminosityBlockCache() in write() and globalEndLuminosityBlock() - confirm
// against the OutputModule base class.
220  std::shared_ptr<EvFOutputEventWriter> EvFOutputModule::globalBeginLuminosityBlock(
221  edm::LuminosityBlockForOutput const& iLB) const {
222  auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
223 
224  return std::make_shared<EvFOutputEventWriter>(openDatFilePath);
225  }
226 
229 
230  //auto lumiWriter = const_cast<EvFOutputEventWriter*>(luminosityBlockCache(e.getLuminosityBlock().index() ));
231  auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
232  std::unique_ptr<EventMsgBuilder> msg = jsonWriter_->streamerCommon_.serializeEvent(
233  *jsonWriter_->streamerCommon_.getSerializerBuffer(), e, triggerResults, selectorConfig());
234  lumiWriter->incAccepted();
235  lumiWriter->doOutputEvent(*msg); //msg is written and discarded at this point
236  }
237 
239  auto lumiWriter = luminosityBlockCache(iLB.index());
240  //close dat file
241  lumiWriter->close();
242 
243  jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32();
244  jsonWriter_->accepted_.value() = lumiWriter->getAccepted();
245 
246  bool abortFlag = false;
247  jsonWriter_->processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
248  if (abortFlag) {
249  edm::LogInfo("EvFOutputModule") << "Abort flag has been set. Output is suppressed";
250  return;
251  }
252 
253  if (jsonWriter_->processed_.value() != 0) {
254  struct stat istat;
255  std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
256  stat(openDatFilePath.string().c_str(), &istat);
257  jsonWriter_->filesize_ = istat.st_size;
258  std::filesystem::rename(openDatFilePath.string().c_str(),
260  jsonWriter_->filelist_ = openDatFilePath.filename().string();
261  } else {
262  //remove empty file when no event processing has occurred
263  remove(lumiWriter->getFilePath().c_str());
264  jsonWriter_->filesize_ = 0;
265  jsonWriter_->filelist_ = "";
266  jsonWriter_->fileAdler32_.value() = -1; //no files in signed long
267  }
268 
269  //produce JSON file
270  jsonWriter_->jsonMonitor_->snap(iLB.luminosityBlock());
271  const std::string outputJsonNameStream =
272  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
273  jsonWriter_->jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
274  }
275 
276 } // namespace evf
ConfigurationDescriptions.h
edm::one::OutputModule
Definition: OutputModule.h:30
edm::one::OutputModuleBase::moduleDescription
const ModuleDescription & moduleDescription() const
Definition: OutputModuleBase.h:129
edm::ModuleDescription::moduleLabel
std::string const & moduleLabel() const
Definition: ModuleDescription.h:43
jsoncollector::DataPointDefinition::addLegendItem
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
Definition: DataPointDefinition.cc:93
evf::EvFOutputModule::~EvFOutputModule
~EvFOutputModule() override
Definition: EvFOutputModule.cc:137
evf::EvFOutputModule::globalEndLuminosityBlock
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) override
Definition: EvFOutputModule.cc:238
SiPixelPI::one
Definition: SiPixelPayloadInspectorHelper.h:39
evf::EvFOutputModule::trToken_
edm::EDGetTokenT< edm::TriggerResults > trToken_
Definition: EvFOutputModule.h:96
jsoncollector::DataPointDefinition::setDefaultGroup
void setDefaultGroup(std::string const &group)
Definition: DataPointDefinition.h:54
evf::EvFOutputJSONWriter::filesize_
jsoncollector::IntJ filesize_
Definition: EvFOutputModule.h:60
edm::LuminosityBlockForOutput::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockForOutput.h:54
evf::FastMonitoringService::getEventsProcessedForLumi
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
Definition: FastMonitoringService.cc:726
StreamerOutputFile::close
void close()
Definition: StreamerOutputFile.h:57
edm::LuminosityBlockForOutput::index
LuminosityBlockIndex index() const
Definition: LuminosityBlockForOutput.cc:33
edm::EDGetTokenT< edm::TriggerResults >
LuminosityBlock.h
edm
HLT enums.
Definition: AlignableModifier.h:19
HLT_FULL_cff.InputTag
InputTag
Definition: HLT_FULL_cff.py:89301
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
triggerResults
static const std::string triggerResults
Definition: EdmProvDump.cc:45
mps_check.msg
tuple msg
Definition: mps_check.py:285
EventForOutput.h
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
edm::one::OutputModuleBase::thinnedAssociationsHelper
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
Definition: OutputModuleBase.cc:347
edm::Handle< edm::TriggerResults >
edm::InRun
Definition: BranchType.h:11
jsoncollector::DataPointDefinition::MERGE
static const std::string MERGE
Definition: DataPointDefinition.h:70
FileIO.h
evf::EvFOutputJSONWriter::errorEvents_
jsoncollector::IntJ errorEvents_
Definition: EvFOutputModule.h:57
uint32
unsigned int uint32
Definition: MsgTools.h:13
edm::SelectedProducts
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
Definition: SelectedProducts.h:11
edm::LuminosityBlockForOutput
Definition: LuminosityBlockForOutput.h:40
jsoncollector::DataPointDefinition::SUM
static const std::string SUM
Definition: DataPointDefinition.h:65
edm::ParameterSetMap
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
Definition: ParameterSetConverter.h:39
evf::EvFOutputJSONWriter::mergeType_
jsoncollector::StringJ mergeType_
Definition: EvFOutputModule.h:64
evf::EvFOutputJSONWriter::transferDestination_
jsoncollector::StringJ transferDestination_
Definition: EvFOutputModule.h:63
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::FastMonitoringService
Definition: FastMonitoringService.h:155
edm::RunForOutput
Definition: RunForOutput.h:40
jsoncollector::JsonMonitorable::setName
virtual void setName(std::string name)
Definition: JsonMonitorable.h:38
edm::Hash::isValid
bool isValid() const
Definition: Hash.h:141
edm::one::OutputModuleBase::selectorConfig
ParameterSetID selectorConfig() const
Definition: OutputModuleBase.h:134
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
InitMsgBuilder.h
evf::EvFOutputJSONWriter::jsonMonitor_
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
Definition: EvFOutputModule.h:66
EventMsgBuilder.h
cms::Adler32
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
Definition: Adler32Calculator.cc:10
Service.h
StreamerOutputFile::write
void write(const InitMsgBuilder &)
Definition: StreamerOutputFile.cc:52
StreamerOutputFile
Definition: StreamerOutputFile.h:23
jsoncollector::DataPointDefinition::CAT
static const std::string CAT
Definition: DataPointDefinition.h:69
edm::InEvent
Definition: BranchType.h:11
evf::EvFOutputModule::psetToken_
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
Definition: EvFOutputModule.h:97
edm::one::OutputModuleBase::fillDescription
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
Definition: OutputModuleBase.cc:361
EvFOutputModule.h
evf::MergeTypeDAT
Definition: EvFDaqDirector.h:58
edm::BranchIDLists
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
evf::EvFOutputModule::globalBeginLuminosityBlock
std::shared_ptr< EvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const override
Definition: EvFOutputModule.cc:220
evf::EvFOutputModule::jsonWriter_
std::unique_ptr< evf::EvFOutputJSONWriter > jsonWriter_
Definition: EvFOutputModule.h:101
OutputModuleBase
jsoncollector::DataPointDefinition::BINARYOR
static const std::string BINARYOR
Definition: DataPointDefinition.h:71
evf::EvFOutputJSONWriter::fileAdler32_
jsoncollector::IntJ fileAdler32_
Definition: EvFOutputModule.h:62
Skims_PA_cff.content
content
Definition: Skims_PA_cff.py:19
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
edm::ParameterSet
Definition: ParameterSet.h:47
TrackRefitter_38T_cff.src
src
Definition: TrackRefitter_38T_cff.py:24
evf::EvFOutputJSONWriter::inputFiles_
jsoncollector::StringJ inputFiles_
Definition: EvFOutputModule.h:61
evf::EvFOutputJSONWriter::outJsonDef_
jsoncollector::DataPointDefinition outJsonDef_
Definition: EvFOutputModule.h:67
to_lower
std::string to_lower(const std::string &s)
Definition: CredentialStore.h:23
edm::one::OutputModuleBase::keptProducts
SelectedProductsForBranchType const & keptProducts() const
Definition: OutputModuleBase.h:97
edm::one::OutputModuleBase::branchIDLists
BranchIDLists const * branchIDLists()
Definition: OutputModuleBase.cc:327
evf::EvFOutputJSONWriter::accepted_
jsoncollector::IntJ accepted_
Definition: EvFOutputModule.h:56
evf::EvFOutputModule::EvFOutputModule
EvFOutputModule(edm::ParameterSet const &ps)
Definition: EvFOutputModule.cc:107
edm::Service
Definition: Service.h:30
jsoncollector::DataPointDefinition::SAME
static const std::string SAME
Definition: DataPointDefinition.h:67
EvFDaqDirector.h
FastMonitoringService.h
jsoncollector::FileIO::writeStringToFile
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
evf::EvFOutputJSONWriter::hltErrorEvents_
jsoncollector::IntJ hltErrorEvents_
Definition: EvFOutputModule.h:65
evf::EvFOutputModule::fms_
evf::FastMonitoringService * fms_
Definition: EvFOutputModule.h:99
evf::EvFOutputModule::ps_
edm::ParameterSet const & ps_
Definition: EvFOutputModule.h:94
JSONSerializer.h
evf::EvFOutputJSONWriter::EvFOutputJSONWriter
EvFOutputJSONWriter(edm::ParameterSet const &ps, edm::SelectedProducts const *selections, std::string const &streamLabel)
Definition: EvFOutputModule.cc:26
jsoncollector::FastMonitor
Definition: FastMonitor.h:19
evf::EvFOutputJSONWriter::processed_
jsoncollector::IntJ processed_
Definition: EvFOutputModule.h:55
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
SimL1EmulatorRepack_CalouGT_cff.processName
processName
Definition: SimL1EmulatorRepack_CalouGT_cff.py:17
evf::EvFOutputJSONWriter::retCodeMask_
jsoncollector::IntJ retCodeMask_
Definition: EvFOutputModule.h:58
evf::EvFOutputJSONWriter::filelist_
jsoncollector::StringJ filelist_
Definition: EvFOutputModule.h:59
edm::EventForOutput
Definition: EventForOutput.h:50
evf::EvFOutputModule::getTriggerResults
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
Definition: EvFOutputModule.cc:213
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::one::OutputModuleBase::description
ModuleDescription const & description() const
Definition: OutputModuleBase.cc:351
std
Definition: JetResolutionObject.h:76
writedatasetfile.run
run
Definition: writedatasetfile.py:27
evf::EvFOutputModule::beginRun
void beginRun(edm::RunForOutput const &run) override
Definition: EvFOutputModule.cc:148
edm::ModuleDescription::mainParameterSetID
ParameterSetID const & mainParameterSetID() const
Definition: ModuleDescription.cc:53
evf::EvFOutputModule::streamLabel_
std::string streamLabel_
Definition: EvFOutputModule.h:95
jsoncollector::DataPointDefinition::ADLER32
static const std::string ADLER32
Definition: DataPointDefinition.h:72
Exception
Definition: hltDiff.cc:245
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf
Definition: fillJson.h:27
Adler32Calculator.h
clusterbigeventsdebugger_cfi.selections
selections
Definition: clusterbigeventsdebugger_cfi.py:10
jsoncollector::JSONSerializer::serialize
static bool serialize(JsonSerializable *pObj, std::string &output)
Definition: JSONSerializer.cc:14
StreamerOutputFile::adler32
uint32 adler32() const
Definition: StreamerOutputFile.h:55
mps_fire.result
result
Definition: mps_fire.py:311
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::StreamerOutputModuleCommon::fillDescription
static void fillDescription(ParameterSetDescription &desc)
Definition: StreamerOutputModuleCommon.cc:270
LuminosityBlockForOutput.h
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
HLTObjectsMonitor_cfi.TriggerResults
TriggerResults
Definition: HLTObjectsMonitor_cfi.py:9
edm::InputTag
Definition: InputTag.h:15
edm::TriggerResults
Definition: TriggerResults.h:35
beamhlt_dqm_sourceclient-live_cfg.streamLabel
streamLabel
Definition: beamhlt_dqm_sourceclient-live_cfg.py:58
evf::EvFOutputModule::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: EvFOutputModule.cc:139
InitMsgView
Definition: InitMessage.h:61
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFOutputModule::write
void write(edm::EventForOutput const &e) override
Definition: EvFOutputModule.cc:227
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316