CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes
edm::streamer::StreamerOutputMsgBuilders Class Reference

#include <StreamerOutputMsgBuilders.h>

Classes

struct  Parameters
 

Public Member Functions

std::unique_ptr< EventMsgBuilderserializeEvent (SerializeDataBuffer &sbuf, EventForOutput const &e, Handle< TriggerResults > const &triggerResults, ParameterSetID const &selectorCfg, uint32_t eventMetaDataChecksum) const
 
std::pair< std::unique_ptr< EventMsgBuilder >, uint32_t > serializeEventMetaData (SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper) const
 
std::unique_ptr< InitMsgBuilderserializeRegistry (SerializeDataBuffer &sbuf, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel, SendJobHeader::ParameterSetMap const *psetMap) const
 
 StreamerOutputMsgBuilders (Parameters const &p, SelectedProducts const *selections, std::string const &moduleLabel)
 
 ~StreamerOutputMsgBuilders ()
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &desc)
 
static Parameters parameters (ParameterSet const &ps)
 

Private Member Functions

std::unique_ptr< EventMsgBuilderserializeEventCommon (uint32 run, uint32 lumi, uint64 event, std::vector< unsigned char > hltbits, unsigned int hltsize, SerializeDataBuffer &sbuf) const
 
void setHltMask (EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
 

Private Attributes

StreamerCompressionAlgo compressionAlgo_
 
std::string compressionAlgoStr_
 
int compressionLevel_
 
uint32_t eventMetaDataChecksum_ = 0
 
unsigned int hltsize_
 
Strings hltTriggerSelections_
 
char host_name_ [255]
 
int lumiSectionInterval_
 
int maxEventSize_
 
uint32 outputModuleId_
 
StreamSerializer serializer_
 
double timeInSecSinceUTC
 
bool useCompression_
 

Detailed Description

Definition at line 23 of file StreamerOutputMsgBuilders.h.

Constructor & Destructor Documentation

◆ StreamerOutputMsgBuilders()

edm::streamer::StreamerOutputMsgBuilders::StreamerOutputMsgBuilders ( Parameters const &  p,
SelectedProducts const *  selections,
std::string const &  moduleLabel 
)
explicit

Definition at line 36 of file StreamerOutputMsgBuilders.cc.

References visDQMUpload::buf, compressionAlgo_, compressionAlgoStr_, compressionLevel_, Exception, FDEBUG, edm::getAllTriggerNames(), hltsize_, hltTriggerSelections_, host_name_, dttmaxenums::L, edm::streamer::LZMA, HerwigMaxPtPartonFilter_cfi::moduleLabel, submitPVValidationJobs::now, outputModuleId_, AlCaHLTBitMon_ParallelJobs::p, timeInSecSinceUTC, edm::streamer::UNCOMPRESSED, useCompression_, edm::streamer::ZLIB, and edm::streamer::ZSTD.

39  :
40 
42  useCompression_(p.useCompression),
43  compressionAlgoStr_(p.compressionAlgoStr),
44  compressionLevel_(p.compressionLevel),
45  lumiSectionInterval_(p.lumiSectionInterval),
46  hltsize_(0),
47  host_name_(),
49  outputModuleId_(0) {
50  //limits initially set for default ZLIB option
51  int minCompressionLevel = 1;
52  int maxCompressionLevel = 9;
53 
54  // test luminosity sections
55  struct timeval now;
56  struct timezone dummyTZ;
57  gettimeofday(&now, &dummyTZ);
58  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
59 
60  if (useCompression_ == true) {
61  if (compressionAlgoStr_ == "ZLIB") {
63  } else if (compressionAlgoStr_ == "LZMA") {
65  minCompressionLevel = 0;
66  } else if (compressionAlgoStr_ == "ZSTD") {
68  maxCompressionLevel = 20;
69  } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
71  useCompression_ = false;
73  } else
74  throw cms::Exception("StreamerOutputMsgBuilders", "Compression type unknown")
75  << "Unknown compression algorithm " << compressionAlgoStr_;
76 
77  if (compressionLevel_ < minCompressionLevel) {
78  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
80  useCompression_ = false;
82  } else if (compressionLevel_ > maxCompressionLevel) {
83  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
84  << maxCompressionLevel << std::endl;
85  compressionLevel_ = maxCompressionLevel;
87  }
88  } else
90 
91  int got_host = gethostname(host_name_, 255);
92  if (got_host != 0)
93  strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
94  //loadExtraClasses();
95 
96  // 25-Jan-2008, KAB - pull out the trigger selection request
97  // which we need for the INIT message
98  hltTriggerSelections_ = p.hltTriggerSelections;
99 
100  Strings const& hltTriggerNames = edm::getAllTriggerNames();
101  hltsize_ = hltTriggerNames.size();
102 
103  //Checksum of the module label
104  uLong crc = crc32(0L, Z_NULL, 0);
105  Bytef const* buf = (Bytef const*)(moduleLabel.data());
106  crc = crc32(crc, buf, moduleLabel.length());
107  outputModuleId_ = static_cast<uint32>(crc);
108  }
std::vector< std::string > const & getAllTriggerNames()
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::vector< std::string > Strings
Definition: MsgTools.h:19
unsigned int uint32
Definition: MsgTools.h:14

◆ ~StreamerOutputMsgBuilders()

edm::streamer::StreamerOutputMsgBuilders::~StreamerOutputMsgBuilders ( )

Definition at line 110 of file StreamerOutputMsgBuilders.cc.

110 {}

Member Function Documentation

◆ fillDescription()

void edm::streamer::StreamerOutputMsgBuilders::fillDescription ( ParameterSetDescription desc)
static

Definition at line 303 of file StreamerOutputMsgBuilders.cc.

References submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by edm::streamer::StreamerOutputModuleCommon::fillDescription(), and evf::GlobalEvFOutputModule::fillDescriptions().

303  {
304  desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
305  desc.addUntracked<bool>("use_compression", true)
306  ->setComment("If True, compression will be used to write streamer file.");
307  desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
308  ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
309  desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
310  desc.addUntracked<int>("lumiSection_interval", 0)
311  ->setComment(
312  "If 0, use lumi section number from event.\n"
313  "If not 0, the interval in seconds between fake lumi sections.");
314  }

◆ parameters()

StreamerOutputMsgBuilders::Parameters edm::streamer::StreamerOutputMsgBuilders::parameters ( ParameterSet const &  ps)
static

Definition at line 26 of file StreamerOutputMsgBuilders.cc.

References edm::EventSelector::getEventSelectionVString(), edm::ParameterSet::getUntrackedParameter(), runTheMatrix::ret, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by edm::streamer::StreamerOutputModuleCommon::parameters().

26  {
28  ret.hltTriggerSelections = EventSelector::getEventSelectionVString(ps);
29  ret.compressionAlgoStr = ps.getUntrackedParameter<std::string>("compression_algorithm");
30  ret.compressionLevel = ps.getUntrackedParameter<int>("compression_level");
31  ret.lumiSectionInterval = ps.getUntrackedParameter<int>("lumiSection_interval");
32  ret.useCompression = ps.getUntrackedParameter<bool>("use_compression");
33  return ret;
34  }
ret
prodAgent to be discontinued
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
std::vector< AlignmentParameters * > Parameters
Definition: Utilities.h:32

◆ serializeEvent()

std::unique_ptr< EventMsgBuilder > edm::streamer::StreamerOutputMsgBuilders::serializeEvent ( SerializeDataBuffer sbuf,
EventForOutput const &  e,
Handle< TriggerResults > const &  triggerResults,
ParameterSetID const &  selectorCfg,
uint32_t  eventMetaDataChecksum 
) const

Definition at line 212 of file StreamerOutputMsgBuilders.cc.

References funct::abs(), compressionAlgo_, compressionLevel_, ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), MillePedeFileConverter_cfg::e, hltsize_, BXlumiParameters_cfi::lumi, lumiSectionInterval_, submitPVValidationJobs::now, edm::streamer::SerializeDataBuffer::reserve_size, edm::streamer::StreamSerializer::serializeEvent(), serializeEventCommon(), serializer_, setHltMask(), timeInSecSinceUTC, and edm::triggerResults().

Referenced by edm::streamer::StreamerOutputModuleCommon::serializeEvent().

217  {
218  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
219  //Lets Build the Event Message first
220 
221  std::vector<unsigned char> hltbits;
222  setHltMask(e, triggerResults, hltbits);
223 
224  uint32 lumi = e.luminosityBlock();
225  if (lumiSectionInterval_ != 0) {
226  struct timeval now;
227  struct timezone dummyTZ;
228  gettimeofday(&now, &dummyTZ);
229  double timeInSec =
230  static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
231  // what about overflows?
232  lumi = static_cast<uint32>(timeInSec / std::abs(lumiSectionInterval_)) + 1;
233  }
235  sbuf, e, selectorCfg, eventMetaDataChecksum, compressionAlgo_, compressionLevel_, reserve_size);
236 
237  return serializeEventCommon(e.id().run(), lumi, e.id().event(), hltbits, hltsize_, sbuf);
238  }
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, uint32_t metaDataChecksum, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
std::unique_ptr< EventMsgBuilder > serializeEventCommon(uint32 run, uint32 lumi, uint64 event, std::vector< unsigned char > hltbits, unsigned int hltsize, SerializeDataBuffer &sbuf) const
static constexpr unsigned int reserve_size
void setHltMask(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
static std::string const triggerResults("TriggerResults")
unsigned int uint32
Definition: MsgTools.h:14

◆ serializeEventCommon()

std::unique_ptr< EventMsgBuilder > edm::streamer::StreamerOutputMsgBuilders::serializeEventCommon ( uint32  run,
uint32  lumi,
uint64  event,
std::vector< unsigned char >  hltbits,
unsigned int  hltsize,
SerializeDataBuffer sbuf 
) const
private

Definition at line 252 of file StreamerOutputMsgBuilders.cc.

References edm::streamer::SerializeDataBuffer::adler32_chksum(), edm::streamer::SerializeDataBuffer::bufferPointer(), edm::streamer::SerializeDataBuffer::comp_buf_, ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), filterCSVwithJSON::copy, edm::streamer::SerializeDataBuffer::currentEventSize(), edm::streamer::SerializeDataBuffer::currentSpaceUsed(), edmPickEvents::event, Exception, edm::streamer::SerializeDataBuffer::header_buf_, host_name_, BXlumiParameters_cfi::lumi, mps_check::msg, outputModuleId_, edm::streamer::SerializeDataBuffer::reserve_size, writedatasetfile::run, and useCompression_.

Referenced by serializeEvent(), and serializeEventMetaData().

257  {
258  // resize header_buf_ to reserved size on first written event
259  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
260  if (sbuf.header_buf_.size() < reserve_size)
261  sbuf.header_buf_.resize(reserve_size);
262 
263  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
264  // once figured out, there is no logic involved here.
265  std::vector<bool> l1bit = {true, true, false};
266  //End of dummy data
267 
268  auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
269  sbuf.comp_buf_.size(),
270  run,
271  event,
272  lumi,
274  0,
275  l1bit,
276  (uint8*)&hltbits[0],
277  hltsize,
278  (uint32)sbuf.adler32_chksum(),
279  host_name_);
280 
281  // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
282  uint32 headerSize = msg->headerSize();
283  if (headerSize > reserve_size)
284  throw cms::Exception("StreamerOutputMsgBuilders", "Header Overflow")
285  << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
286 
287  //set addresses to other buffer and copy constructed header there
288  msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
289  msg->setEventAddr(sbuf.bufferPointer());
290  std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
291 
292  unsigned int src_size = sbuf.currentSpaceUsed();
293  msg->setEventLength(src_size); //compressed size
294  if (useCompression_)
295  msg->setOrigDataSize(
296  sbuf.currentEventSize()); //uncompressed size (or 0 if no compression -> streamer input source requires this)
297  else
298  msg->setOrigDataSize(0);
299 
300  return msg;
301  }
static constexpr unsigned int reserve_size
unsigned char uint8
Definition: MsgTools.h:12
tuple msg
Definition: mps_check.py:286
unsigned int uint32
Definition: MsgTools.h:14

◆ serializeEventMetaData()

std::pair< std::unique_ptr< EventMsgBuilder >, uint32_t > edm::streamer::StreamerOutputMsgBuilders::serializeEventMetaData ( SerializeDataBuffer sbuf,
BranchIDLists const &  branchLists,
ThinnedAssociationsHelper const &  helper 
) const

Definition at line 240 of file StreamerOutputMsgBuilders.cc.

References edm::streamer::SerializeDataBuffer::adler32_chksum_, compressionAlgo_, compressionLevel_, ALPAKA_ACCELERATOR_NAMESPACE::brokenline::constexpr(), edm::streamer::SerializeDataBuffer::reserve_size, serializeEventCommon(), edm::streamer::StreamSerializer::serializeEventMetaData(), and serializer_.

Referenced by edm::streamer::StreamerOutputModuleCommon::serializeEventMetaData().

241  {
242  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
243  //Lets Build the Event Message first
244 
245  std::vector<unsigned char> hltbits;
246  serializer_.serializeEventMetaData(sbuf, branchLists, helper, compressionAlgo_, compressionLevel_, reserve_size);
247  auto eventMetaDataChecksum = sbuf.adler32_chksum_;
248 
249  return std::make_pair(serializeEventCommon(0, 0, 0, hltbits, 0, sbuf), eventMetaDataChecksum);
250  }
Definition: helper.py:1
int serializeEventMetaData(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
data_buffer.adler32_chksum_ is the meta data checksum to pass to subsequent events ...
std::unique_ptr< EventMsgBuilder > serializeEventCommon(uint32 run, uint32 lumi, uint64 event, std::vector< unsigned char > hltbits, unsigned int hltsize, SerializeDataBuffer &sbuf) const
static constexpr unsigned int reserve_size

◆ serializeRegistry()

std::unique_ptr< InitMsgBuilder > edm::streamer::StreamerOutputMsgBuilders::serializeRegistry ( SerializeDataBuffer sbuf,
std::string const &  processName,
std::string const &  moduleLabel,
ParameterSetID const &  toplevel,
SendJobHeader::ParameterSetMap const *  psetMap 
) const

Definition at line 112 of file StreamerOutputMsgBuilders.cc.

References edm::streamer::SerializeDataBuffer::adler32_chksum(), edm::streamer::SerializeDataBuffer::bufferPointer(), edm::Hash< I >::compactForm(), filterCSVwithJSON::copy, edm::streamer::SerializeDataBuffer::currentSpaceUsed(), edm::getAllTriggerNames(), edm::getReleaseVersion(), edm::streamer::SerializeDataBuffer::header_buf_, hltTriggerSelections_, HerwigMaxPtPartonFilter_cfi::moduleLabel, outputModuleId_, SimL1EmulatorRepack_CalouGT_cff::processName, writedatasetfile::run, serializer_, edm::streamer::StreamSerializer::serializeRegistry(), and TrackRefitter_38T_cff::src.

Referenced by edm::streamer::StreamerOutputModuleCommon::serializeRegistry().

117  {
118  if (psetMap) {
119  serializer_.serializeRegistry(sbuf, *psetMap);
120  } else {
122  }
123  // resize header_buf_ to reflect space used in serializer_ + header
124  // I just added an overhead for header of 50000 for now
125  unsigned int src_size = sbuf.currentSpaceUsed();
126  unsigned int new_size = src_size + 50000;
127  if (sbuf.header_buf_.size() < new_size)
128  sbuf.header_buf_.resize(new_size);
129 
130  //Build the INIT Message
131  //Following values are strictly DUMMY and will be replaced
132  // once available with Utility function etc.
133  uint32 run = 1;
134 
135  //Get the Process PSet ID
136 
137  //In case we need to print it
138  // cms::Digest dig(toplevel.compactForm());
139  // cms::MD5Result r1 = dig.digest();
140  // std::string hexy = r1.toString();
141  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
142 
143  //L1 stays dummy as of today
144  Strings l1_names; //3
145  l1_names.push_back("t1");
146  l1_names.push_back("t10");
147  l1_names.push_back("t2");
148 
149  Strings const& hltTriggerNames = edm::getAllTriggerNames();
150 
151  auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
152  sbuf.header_buf_.size(),
153  run,
154  Version((uint8 const*)toplevel.compactForm().c_str()),
155  getReleaseVersion().c_str(),
156  processName.c_str(),
157  moduleLabel.c_str(),
159  hltTriggerNames,
161  l1_names,
162  (uint32)sbuf.adler32_chksum());
163 
164  // copy data into the destination message
165  unsigned char* src = sbuf.bufferPointer();
166  std::copy(src, src + src_size, init_message->dataAddress());
167  init_message->setDataLength(src_size);
168  return init_message;
169  }
std::vector< std::string > const & getAllTriggerNames()
int serializeRegistry(SerializeDataBuffer &data_buffer) const
unsigned char uint8
Definition: MsgTools.h:12
std::string getReleaseVersion()
std::vector< std::string > Strings
Definition: MsgTools.h:19
unsigned int uint32
Definition: MsgTools.h:14

◆ setHltMask()

void edm::streamer::StreamerOutputMsgBuilders::setHltMask ( EventForOutput const &  e,
Handle< TriggerResults > const &  triggerResults,
std::vector< unsigned char > &  hltbits 
) const
private

Definition at line 171 of file StreamerOutputMsgBuilders.cc.

References ntuplemaker::fill, hltsize_, mps_fire::i, edm::hlt::Pass, and edm::triggerResults().

Referenced by serializeEvent().

173  {
174  hltbits.clear();
175 
176  std::vector<unsigned char> vHltState;
177 
178  if (triggerResults.isValid()) {
180  vHltState.push_back(((triggerResults->at(i)).state()));
181  }
182  } else {
183  // We fill all Trigger bits to valid state.
185  vHltState.push_back(hlt::Pass);
186  }
187  }
188 
189  //Pack into member hltbits
190  if (!vHltState.empty()) {
191  unsigned int packInOneByte = 4;
192  unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte); //Two bits per HLT
193 
194  hltbits.resize(sizeOfPackage);
195  std::fill(hltbits.begin(), hltbits.end(), 0);
196 
197  for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
198  unsigned int whichByte = i / packInOneByte;
199  unsigned int indxWithinByte = i % packInOneByte;
200  hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
201  }
202  }
203 
204  //This is Just a printing code.
205  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
206  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
207  // printBits(hltbits_[i]);
208  //}
209  //std::cout << "\n";
210  }
uint16_t size_type
accept
Definition: HLTenums.h:18
static std::string const triggerResults("TriggerResults")

Member Data Documentation

◆ compressionAlgo_

StreamerCompressionAlgo edm::streamer::StreamerOutputMsgBuilders::compressionAlgo_
private

◆ compressionAlgoStr_

std::string edm::streamer::StreamerOutputMsgBuilders::compressionAlgoStr_
private

Definition at line 73 of file StreamerOutputMsgBuilders.h.

Referenced by StreamerOutputMsgBuilders().

◆ compressionLevel_

int edm::streamer::StreamerOutputMsgBuilders::compressionLevel_
private

◆ eventMetaDataChecksum_

uint32_t edm::streamer::StreamerOutputMsgBuilders::eventMetaDataChecksum_ = 0
private

Definition at line 88 of file StreamerOutputMsgBuilders.h.

◆ hltsize_

unsigned int edm::streamer::StreamerOutputMsgBuilders::hltsize_
private

◆ hltTriggerSelections_

Strings edm::streamer::StreamerOutputMsgBuilders::hltTriggerSelections_
private

Definition at line 85 of file StreamerOutputMsgBuilders.h.

Referenced by serializeRegistry(), and StreamerOutputMsgBuilders().

◆ host_name_

char edm::streamer::StreamerOutputMsgBuilders::host_name_[255]
private

Definition at line 83 of file StreamerOutputMsgBuilders.h.

Referenced by serializeEventCommon(), and StreamerOutputMsgBuilders().

◆ lumiSectionInterval_

int edm::streamer::StreamerOutputMsgBuilders::lumiSectionInterval_
private

Definition at line 79 of file StreamerOutputMsgBuilders.h.

Referenced by serializeEvent().

◆ maxEventSize_

int edm::streamer::StreamerOutputMsgBuilders::maxEventSize_
private

Definition at line 71 of file StreamerOutputMsgBuilders.h.

◆ outputModuleId_

uint32 edm::streamer::StreamerOutputMsgBuilders::outputModuleId_
private

◆ serializer_

StreamSerializer edm::streamer::StreamerOutputMsgBuilders::serializer_
private

◆ timeInSecSinceUTC

double edm::streamer::StreamerOutputMsgBuilders::timeInSecSinceUTC
private

Definition at line 80 of file StreamerOutputMsgBuilders.h.

Referenced by serializeEvent(), and StreamerOutputMsgBuilders().

◆ useCompression_

bool edm::streamer::StreamerOutputMsgBuilders::useCompression_
private

Definition at line 72 of file StreamerOutputMsgBuilders.h.

Referenced by serializeEventCommon(), and StreamerOutputMsgBuilders().