CMS 3D CMS Logo

EvFOutputModule.cc
Go to the documentation of this file.
2 
4 
8 
12 
16 
20 
21 #include <sys/stat.h>
22 #include <filesystem>
23 #include <boost/algorithm/string.hpp>
24 
25 namespace evf {
26 
29  std::string const& streamLabel,
30  std::string const& moduleLabel)
31  : streamerCommon_(commonParameters, selections, moduleLabel),
32  processed_(0),
33  accepted_(0),
34  errorEvents_(0),
35  retCodeMask_(0),
36  filelist_(),
37  filesize_(0),
38  inputFiles_(),
39  fileAdler32_(1),
40  hltErrorEvents_(0) {
43 
44  std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
45  LogDebug("EvFOutputModule") << "writing .dat files to -: " << baseRunDir;
46 
47  edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
48 
49  processed_.setName("Processed");
50  accepted_.setName("Accepted");
51  errorEvents_.setName("ErrorEvents");
52  retCodeMask_.setName("ReturnCodeMask");
53  filelist_.setName("Filelist");
54  filesize_.setName("Filesize");
55  inputFiles_.setName("InputFiles");
56  fileAdler32_.setName("FileAdler32");
57  transferDestination_.setName("TransferDestination");
58  mergeType_.setName("MergeType");
59  hltErrorEvents_.setName("HLTErrorEvents");
60 
70  outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
73 
74  std::stringstream tmpss, ss;
75  tmpss << baseRunDir << "/open/"
76  << "output_" << getpid() << ".jsd";
77  ss << baseRunDir << "/"
78  << "output_" << getpid() << ".jsd";
79  std::string outTmpJsonDefName = tmpss.str();
80  std::string outJsonDefName = ss.str();
81 
82  edm::Service<evf::EvFDaqDirector>()->lockInitLock();
83  struct stat fstat;
84  if (stat(outJsonDefName.c_str(), &fstat) != 0) { //file does not exist
85  LogDebug("EvFOutputModule") << "writing output definition file -: " << outJsonDefName;
89  std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
90  }
91  edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
92 
94  jsonMonitor_->setDefPath(outJsonDefName);
95  jsonMonitor_->registerGlobalMonitorable(&processed_, false);
96  jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
97  jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
98  jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
99  jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
100  jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
101  jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
102  jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
103  jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
104  jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
105  jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
106  jsonMonitor_->commit(nullptr);
107  }
108 
110  : edm::one::OutputModuleBase(ps),
112  commonParameters_(edm::StreamerOutputModuleCommon::parameters(ps)),
113  streamLabel_(ps.getParameter<std::string>("@module_label")),
114  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
115  psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
116  ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
117  //replace hltOutputA with streamA if the HLT menu uses this naming convention
118  std::string testPrefix = "hltOutput";
119  if (streamLabel_.find(testPrefix) == 0)
120  streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
121 
122  if (streamLabel_.find('_') != std::string::npos) {
123  throw cms::Exception("EvFOutputModule") << "Underscore character is reserved can not be used for stream names in "
124  "FFF, but was detected in stream name -: "
125  << streamLabel_;
126  }
127 
128  std::string streamLabelLow = streamLabel_;
129  boost::algorithm::to_lower(streamLabelLow);
130  auto streampos = streamLabelLow.rfind("stream");
131  if (streampos != 0 && streampos != std::string::npos)
132  throw cms::Exception("EvFOutputModule")
133  << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
134  "names in FFF based HLT, but was detected in stream name";
135 
137  }
138 
140 
145  desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
146  ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
147  descriptions.add("evfOutputModule", desc);
148  }
149 
151  //create run Cache holding JSON file writer and variables
152  jsonWriter_ = std::make_unique<EvFOutputJSONWriter>(
154 
155  //output INI file (non-const). This doesn't require globalBeginRun to be finished
156  const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
157  edm::LogInfo("EvFOutputModule") << "beginRun init stream -: " << openIniFileName;
158 
159  StreamerOutputFile stream_writer_preamble(openIniFileName);
160  uint32 preamble_adler32 = 1;
161  edm::BranchIDLists const* bidlPtr = branchIDLists();
162 
163  auto psetMapHandle = run.getHandle(psetToken_);
164 
165  std::unique_ptr<InitMsgBuilder> init_message =
166  jsonWriter_->streamerCommon_.serializeRegistry(*jsonWriter_->streamerCommon_.getSerializerBuffer(),
167  *bidlPtr,
172  psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
173 
174  //Let us turn it into a View
175  InitMsgView view(init_message->startAddress());
176 
177  //output header
178  stream_writer_preamble.write(view);
179  preamble_adler32 = stream_writer_preamble.adler32();
180  stream_writer_preamble.close();
181 
182  struct stat istat;
183  stat(openIniFileName.c_str(), &istat);
184  //read back file to check integrity of what was written
185  off_t readInput = 0;
186  uint32_t adlera = 1, adlerb = 0;
187  FILE* src = fopen(openIniFileName.c_str(), "r");
188 
189  //allocate buffer used to read the INI file back for the checksum check
190  unsigned char* outBuf = new unsigned char[1024 * 1024];
191  while (readInput < istat.st_size) {
192  size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
193  fread(outBuf, toRead, 1, src);
194  cms::Adler32((const char*)outBuf, toRead, adlera, adlerb);
195  readInput += toRead;
196  }
197  fclose(src);
198 
199  //clear serialization buffers
200  jsonWriter_->streamerCommon_.getSerializerBuffer()->clearHeaderBuffer();
201 
202  //free the read-back buffer, needed only for the checksum verification above
203  delete[] outBuf;
204  outBuf = nullptr;
205 
206  uint32_t adler32c = (adlerb << 16) | adlera;
207  if (adler32c != preamble_adler32) {
208  throw cms::Exception("EvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
209  << " expected:" << preamble_adler32 << " obtained:" << adler32c;
210  } else {
211  LogDebug("EvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
212  std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
213  }
214  }
215 
217  edm::EventForOutput const& e) const {
218  Trig result;
219  e.getByToken<edm::TriggerResults>(token, result);
220  return result;
221  }
222 
  // Creates the per-luminosity-block event writer.
  //
  // Asks the EvFDaqDirector service for the path of the "open" .dat file that
  // belongs to this luminosity block number and this module's stream label,
  // then returns a shared EvFOutputEventWriter constructed on that path.
  //
  // iLB: the luminosity block that is beginning; only its luminosityBlock()
  //      number is read here.
  // Returns: a newly allocated writer bound to the open .dat file path.
  // NOTE(review): presumably this is the object later obtained through
  // luminosityBlockCache() when events are written and when the luminosity
  // block ends -- confirm against the framework's cache interface.
223  std::shared_ptr<EvFOutputEventWriter> EvFOutputModule::globalBeginLuminosityBlock(
224  edm::LuminosityBlockForOutput const& iLB) const {
  // Path is resolved per (lumi number, stream label) by the DAQ director service.
225  auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
226 
227  return std::make_shared<EvFOutputEventWriter>(openDatFilePath);
228  }
229 
231  unsigned int counter = 0;
232  while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
233  if (edm::shutdown_flag.load(std::memory_order_relaxed))
234  break;
235  if (!(counter % 100))
236  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
237  usleep(100000);
238  counter++;
239  }
240 
242 
243  //auto lumiWriter = const_cast<EvFOutputEventWriter*>(luminosityBlockCache(e.getLuminosityBlock().index() ));
244  auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
245  std::unique_ptr<EventMsgBuilder> msg = jsonWriter_->streamerCommon_.serializeEvent(
246  *jsonWriter_->streamerCommon_.getSerializerBuffer(), e, triggerResults, selectorConfig());
247  lumiWriter->incAccepted();
248  lumiWriter->doOutputEvent(*msg); //msg is written and discarded at this point
249  }
250 
252  auto lumiWriter = luminosityBlockCache(iLB.index());
253  //close dat file
254  lumiWriter->close();
255 
256  jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32();
257  jsonWriter_->accepted_.value() = lumiWriter->getAccepted();
258 
259  bool abortFlag = false;
260  jsonWriter_->processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
261  if (abortFlag) {
262  edm::LogInfo("EvFOutputModule") << "Abort flag has been set. Output is suppressed";
263  return;
264  }
265 
266  if (jsonWriter_->processed_.value() != 0) {
267  struct stat istat;
268  std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
269  stat(openDatFilePath.string().c_str(), &istat);
270  jsonWriter_->filesize_ = istat.st_size;
271  std::filesystem::rename(openDatFilePath.string().c_str(),
273  jsonWriter_->filelist_ = openDatFilePath.filename().string();
274  } else {
275  //remove empty file when no event processing has occurred
276  remove(lumiWriter->getFilePath().c_str());
277  jsonWriter_->filesize_ = 0;
278  jsonWriter_->filelist_ = "";
279  jsonWriter_->fileAdler32_.value() = -1; //no files in signed long
280  }
281 
282  //produce JSON file
283  jsonWriter_->jsonMonitor_->snap(iLB.luminosityBlock());
284  const std::string outputJsonNameStream =
285  edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
286  jsonWriter_->jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
287  }
288 
289 } // namespace evf
void addLegendItem(std::string const &name, std::string const &type, std::string const &operation)
Definition: fillJson.h:27
EvFOutputModule(edm::ParameterSet const &ps)
std::unique_ptr< evf::EvFOutputJSONWriter > jsonWriter_
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
jsoncollector::IntJ fileAdler32_
LuminosityBlockNumber_t luminosityBlock() const
std::shared_ptr< EvFOutputEventWriter > globalBeginLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) const override
volatile std::atomic< bool > shutdown_flag
static bool serialize(JsonSerializable *pObj, std::string &output)
void write(const InitMsgBuilder &)
std::string to_lower(const std::string &s)
LuminosityBlockIndex index() const
jsoncollector::DataPointDefinition outJsonDef_
jsoncollector::IntJ hltErrorEvents_
ParameterSetID const & mainParameterSetID() const
Trig getTriggerResults(edm::EDGetTokenT< edm::TriggerResults > const &token, edm::EventForOutput const &e) const
jsoncollector::StringJ inputFiles_
void beginRun(edm::RunForOutput const &run) override
ModuleDescription const & description() const
bool isValid() const
Definition: Hash.h:141
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
jsoncollector::IntJ retCodeMask_
std::string streamLabel_
uint32 adler32() const
BranchIDLists const * branchIDLists() const
static void fillDescription(ParameterSetDescription &desc)
virtual void setName(std::string name)
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const &iLB) override
static std::string const triggerResults
Definition: EdmProvDump.cc:47
static void writeStringToFile(std::string const &filename, std::string &content)
Definition: FileIO.cc:21
unsigned int uint32
Definition: MsgTools.h:13
jsoncollector::IntJ processed_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Log< level::Info, false > LogInfo
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void write(edm::EventForOutput const &e) override
ParameterSetID selectorConfig() const
SelectedProductsForBranchType const & keptProducts() const
def load(fileName)
Definition: svgfig.py:547
edm::EDGetTokenT< edm::TriggerResults > trToken_
edm::StreamerOutputModuleCommon::Parameters commonParameters_
EvFOutputJSONWriter(edm::StreamerOutputModuleCommon::Parameters const &commonParameters, edm::SelectedProducts const *selections, std::string const &streamLabel, std::string const &moduleLabel)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
tuple msg
Definition: mps_check.py:286
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
jsoncollector::IntJ filesize_
jsoncollector::StringJ filelist_
HLT enums.
jsoncollector::StringJ transferDestination_
edm::EDGetTokenT< edm::SendJobHeader::ParameterSetMap > psetToken_
std::shared_ptr< jsoncollector::FastMonitor > jsonMonitor_
unsigned int getEventsProcessedForLumi(unsigned int lumi, bool *abortFlag=nullptr)
const ModuleDescription & moduleDescription() const
void setDefaultGroup(std::string const &group)
Log< level::Warning, false > LogWarning
jsoncollector::IntJ accepted_
std::string const & moduleLabel() const
jsoncollector::StringJ mergeType_
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
jsoncollector::IntJ errorEvents_
evf::FastMonitoringService * fms_
#define LogDebug(id)