CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Attributes | Private Member Functions | Private Attributes
edm::StreamerOutputModuleCommon Class Reference

#include <StreamerOutputModuleCommon.h>

Inheritance diagram for edm::StreamerOutputModuleCommon:
edm::StreamerOutputModuleBase edm::StreamerOutputModule< Consumer >

Public Member Functions

SerializeDataBuffergetSerializerBuffer ()
 
std::unique_ptr< EventMsgBuilderserializeEvent (SerializeDataBuffer &sbuf, EventForOutput const &e, Handle< TriggerResults > const &triggerResults, ParameterSetID const &selectorCfg)
 
std::unique_ptr< InitMsgBuilderserializeRegistry (SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel, SendJobHeader::ParameterSetMap const *psetMap)
 
 StreamerOutputModuleCommon (ParameterSet const &ps, SelectedProducts const *selections, std::string const &moduleLabel)
 
 ~StreamerOutputModuleCommon ()
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &desc)
 

Protected Attributes

std::unique_ptr< SerializeDataBufferserializerBuffer_
 

Private Member Functions

void setHltMask (EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
 

Private Attributes

StreamerCompressionAlgo compressionAlgo_
 
std::string compressionAlgoStr_
 
int compressionLevel_
 
unsigned int hltsize_
 
Strings hltTriggerSelections_
 
char host_name_ [255]
 
int lumiSectionInterval_
 
int maxEventSize_
 
uint32 outputModuleId_
 
StreamSerializer serializer_
 
double timeInSecSinceUTC
 
bool useCompression_
 

Detailed Description

Definition at line 21 of file StreamerOutputModuleCommon.h.

Constructor & Destructor Documentation

◆ StreamerOutputModuleCommon()

edm::StreamerOutputModuleCommon::StreamerOutputModuleCommon ( ParameterSet const &  ps,
SelectedProducts const *  selections,
std::string const &  moduleLabel 
)
explicit

Definition at line 26 of file StreamerOutputModuleCommon.cc.

References visDQMUpload::buf, compressionAlgo_, compressionAlgoStr_, compressionLevel_, Exception, FDEBUG, edm::getAllTriggerNames(), edm::EventSelector::getEventSelectionVString(), hltsize_, hltTriggerSelections_, host_name_, dttmaxenums::L, edm::LZMA, HerwigMaxPtPartonFilter_cfi::moduleLabel, submitPVValidationJobs::now, outputModuleId_, timeInSecSinceUTC, edm::UNCOMPRESSED, useCompression_, edm::ZLIB, and edm::ZSTD.

29  :
30 
32  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
33  compressionAlgoStr_(ps.getUntrackedParameter<std::string>("compression_algorithm")),
34  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
35  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
36  hltsize_(0),
37  host_name_(),
39  outputModuleId_(0) {
40  //limits initially set for default ZLIB option
41  int minCompressionLevel = 1;
42  int maxCompressionLevel = 9;
43 
44  // test luminosity sections
45  struct timeval now;
46  struct timezone dummyTZ;
47  gettimeofday(&now, &dummyTZ);
48  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
49 
50  if (useCompression_ == true) {
51  if (compressionAlgoStr_ == "ZLIB") {
53  } else if (compressionAlgoStr_ == "LZMA") {
55  minCompressionLevel = 0;
56  } else if (compressionAlgoStr_ == "ZSTD") {
58  maxCompressionLevel = 20;
59  } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
61  useCompression_ = false;
63  } else
64  throw cms::Exception("StreamerOutputModuleCommon", "Compression type unknown")
65  << "Unknown compression algorithm " << compressionAlgoStr_;
66 
67  if (compressionLevel_ < minCompressionLevel) {
68  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
70  useCompression_ = false;
72  } else if (compressionLevel_ > maxCompressionLevel) {
73  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
74  << maxCompressionLevel << std::endl;
75  compressionLevel_ = maxCompressionLevel;
77  }
78  } else
80 
81  int got_host = gethostname(host_name_, 255);
82  if (got_host != 0)
83  strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
84  //loadExtraClasses();
85 
86  // 25-Jan-2008, KAB - pull out the trigger selection request
87  // which we need for the INIT message
89 
90  Strings const& hltTriggerNames = edm::getAllTriggerNames();
91  hltsize_ = hltTriggerNames.size();
92 
93  //Checksum of the module label
94  uLong crc = crc32(0L, Z_NULL, 0);
95  Bytef const* buf = (Bytef const*)(moduleLabel.data());
96  crc = crc32(crc, buf, moduleLabel.length());
97  outputModuleId_ = static_cast<uint32>(crc);
98  }
std::vector< std::string > const & getAllTriggerNames()
std::vector< std::string > Strings
Definition: MsgTools.h:18
#define FDEBUG(lev)
Definition: DebugMacros.h:19
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
unsigned int uint32
Definition: MsgTools.h:13

◆ ~StreamerOutputModuleCommon()

edm::StreamerOutputModuleCommon::~StreamerOutputModuleCommon ( )

Definition at line 100 of file StreamerOutputModuleCommon.cc.

100 {}

Member Function Documentation

◆ fillDescription()

void edm::StreamerOutputModuleCommon::fillDescription ( ParameterSetDescription desc)
static

Definition at line 275 of file StreamerOutputModuleCommon.cc.

References submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by edm::StreamerOutputModuleBase::fillDescription(), evf::EvFOutputModule::fillDescriptions(), and evf::GlobalEvFOutputModule::fillDescriptions().

275  {
276  desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
277  desc.addUntracked<bool>("use_compression", true)
278  ->setComment("If True, compression will be used to write streamer file.");
279  desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
280  ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
281  desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
282  desc.addUntracked<int>("lumiSection_interval", 0)
283  ->setComment(
284  "If 0, use lumi section number from event.\n"
285  "If not 0, the interval in seconds between fake lumi sections.");
286  }

◆ getSerializerBuffer()

SerializeDataBuffer * edm::StreamerOutputModuleCommon::getSerializerBuffer ( )

Definition at line 288 of file StreamerOutputModuleCommon.cc.

References serializerBuffer_.

Referenced by edm::StreamerOutputModuleBase::beginRun(), and edm::StreamerOutputModuleBase::write().

288  {
289  auto* ptr = serializerBuffer_.get();
290  if (!ptr) {
291  serializerBuffer_ = std::make_unique<SerializeDataBuffer>();
292  ptr = serializerBuffer_.get();
293  }
294  return ptr;
295  }
std::unique_ptr< SerializeDataBuffer > serializerBuffer_

◆ serializeEvent()

std::unique_ptr< EventMsgBuilder > edm::StreamerOutputModuleCommon::serializeEvent ( SerializeDataBuffer sbuf,
EventForOutput const &  e,
Handle< TriggerResults > const &  triggerResults,
ParameterSetID const &  selectorCfg 
)

Definition at line 204 of file StreamerOutputModuleCommon.cc.

References SerializeDataBuffer::adler32_chksum(), SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::comp_buf_, compressionAlgo_, compressionLevel_, filterCSVwithJSON::copy, SerializeDataBuffer::currentEventSize(), SerializeDataBuffer::currentSpaceUsed(), MillePedeFileConverter_cfg::e, Exception, SerializeDataBuffer::header_buf_, hltsize_, host_name_, BXlumiParameters_cfi::lumi, lumiSectionInterval_, mps_check::msg, submitPVValidationJobs::now, outputModuleId_, SerializeDataBuffer::reserve_size, edm::StreamSerializer::serializeEvent(), serializer_, setHltMask(), timeInSecSinceUTC, edm::triggerResults(), and useCompression_.

Referenced by edm::StreamerOutputModuleBase::write().

208  {
209  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
210  //Lets Build the Event Message first
211 
212  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
213  // once figured out, there is no logic involved here.
214  std::vector<bool> l1bit = {true, true, false};
215  //End of dummy data
216 
217  std::vector<unsigned char> hltbits;
218  setHltMask(e, triggerResults, hltbits);
219 
220  uint32 lumi;
221  if (lumiSectionInterval_ == 0) {
222  lumi = e.luminosityBlock();
223  } else {
224  struct timeval now;
225  struct timezone dummyTZ;
226  gettimeofday(&now, &dummyTZ);
227  double timeInSec =
228  static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
229  // what about overflows?
230  if (lumiSectionInterval_ > 0)
231  lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
232  }
233 
234  serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size);
235 
236  // resize header_buf_ to reserved size on first written event
237  if (sbuf.header_buf_.size() < reserve_size)
238  sbuf.header_buf_.resize(reserve_size);
239 
240  auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
241  sbuf.comp_buf_.size(),
242  e.id().run(),
243  e.id().event(),
244  lumi,
246  0,
247  l1bit,
248  (uint8*)&hltbits[0],
249  hltsize_,
250  (uint32)sbuf.adler32_chksum(),
251  host_name_);
252 
253  // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
254  uint32 headerSize = msg->headerSize();
255  if (headerSize > reserve_size)
256  throw cms::Exception("StreamerOutputModuleCommon", "Header Overflow")
257  << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
258 
259  //set addresses to other buffer and copy constructed header there
260  msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
261  msg->setEventAddr(sbuf.bufferPointer());
262  std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
263 
264  unsigned int src_size = sbuf.currentSpaceUsed();
265  msg->setEventLength(src_size); //compressed size
266  if (useCompression_)
267  msg->setOrigDataSize(
268  sbuf.currentEventSize()); //uncompressed size (or 0 if no compression -> streamer input source requires this)
269  else
270  msg->setOrigDataSize(0);
271 
272  return msg;
273  }
unsigned int currentEventSize() const
void setHltMask(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
std::vector< unsigned char > comp_buf_
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
unsigned char const * bufferPointer() const
unsigned int uint32
Definition: MsgTools.h:13
unsigned int currentSpaceUsed() const
uint32_t adler32_chksum() const
static std::string const triggerResults("TriggerResults")
tuple msg
Definition: mps_check.py:285
unsigned char uint8
Definition: MsgTools.h:11
static constexpr unsigned int reserve_size

◆ serializeRegistry()

std::unique_ptr< InitMsgBuilder > edm::StreamerOutputModuleCommon::serializeRegistry ( SerializeDataBuffer sbuf,
BranchIDLists const &  branchLists,
ThinnedAssociationsHelper const &  helper,
std::string const &  processName,
std::string const &  moduleLabel,
ParameterSetID const &  toplevel,
SendJobHeader::ParameterSetMap const *  psetMap 
)

Definition at line 102 of file StreamerOutputModuleCommon.cc.

References SerializeDataBuffer::adler32_chksum(), SerializeDataBuffer::bufferPointer(), edm::Hash< I >::compactForm(), filterCSVwithJSON::copy, SerializeDataBuffer::currentSpaceUsed(), edm::getAllTriggerNames(), edm::getReleaseVersion(), SerializeDataBuffer::header_buf_, hltTriggerSelections_, HerwigMaxPtPartonFilter_cfi::moduleLabel, outputModuleId_, SimL1EmulatorRepack_CalouGT_cff::processName, writedatasetfile::run, serializer_, edm::StreamSerializer::serializeRegistry(), and TrackRefitter_38T_cff::src.

Referenced by edm::StreamerOutputModuleBase::beginRun().

109  {
110  if (psetMap) {
111  serializer_.serializeRegistry(sbuf, branchLists, helper, *psetMap);
112  } else {
113  serializer_.serializeRegistry(sbuf, branchLists, helper);
114  }
115  // resize header_buf_ to reflect space used in serializer_ + header
116  // I just added an overhead for header of 50000 for now
117  unsigned int src_size = sbuf.currentSpaceUsed();
118  unsigned int new_size = src_size + 50000;
119  if (sbuf.header_buf_.size() < new_size)
120  sbuf.header_buf_.resize(new_size);
121 
122  //Build the INIT Message
123  //Following values are strictly DUMMY and will be replaced
124  // once available with Utility function etc.
125  uint32 run = 1;
126 
127  //Get the Process PSet ID
128 
129  //In case we need to print it
130  // cms::Digest dig(toplevel.compactForm());
131  // cms::MD5Result r1 = dig.digest();
132  // std::string hexy = r1.toString();
133  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
134 
135  //L1 stays dummy as of today
136  Strings l1_names; //3
137  l1_names.push_back("t1");
138  l1_names.push_back("t10");
139  l1_names.push_back("t2");
140 
141  Strings const& hltTriggerNames = edm::getAllTriggerNames();
142 
143  auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
144  sbuf.header_buf_.size(),
145  run,
146  Version((uint8 const*)toplevel.compactForm().c_str()),
147  getReleaseVersion().c_str(),
148  processName.c_str(),
149  moduleLabel.c_str(),
151  hltTriggerNames,
153  l1_names,
154  (uint32)sbuf.adler32_chksum());
155 
156  // copy data into the destination message
157  unsigned char* src = sbuf.bufferPointer();
158  std::copy(src, src + src_size, init_message->dataAddress());
159  init_message->setDataLength(src_size);
160  return init_message;
161  }
Definition: helper.py:1
std::vector< std::string > const & getAllTriggerNames()
std::vector< std::string > Strings
Definition: MsgTools.h:18
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned char const * bufferPointer() const
unsigned int uint32
Definition: MsgTools.h:13
unsigned int currentSpaceUsed() const
uint32_t adler32_chksum() const
std::string getReleaseVersion()
unsigned char uint8
Definition: MsgTools.h:11

◆ setHltMask()

void edm::StreamerOutputModuleCommon::setHltMask ( EventForOutput const &  e,
Handle< TriggerResults > const &  triggerResults,
std::vector< unsigned char > &  hltbits 
) const
private

Definition at line 163 of file StreamerOutputModuleCommon.cc.

References ntuplemaker::fill, hltsize_, mps_fire::i, edm::hlt::Pass, and edm::triggerResults().

Referenced by serializeEvent().

165  {
166  hltbits.clear();
167 
168  std::vector<unsigned char> vHltState;
169 
170  if (triggerResults.isValid()) {
172  vHltState.push_back(((triggerResults->at(i)).state()));
173  }
174  } else {
175  // We fill all Trigger bits to valid state.
177  vHltState.push_back(hlt::Pass);
178  }
179  }
180 
181  //Pack into member hltbits
182  if (!vHltState.empty()) {
183  unsigned int packInOneByte = 4;
184  unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte); //Two bits per HLT
185 
186  hltbits.resize(sizeOfPackage);
187  std::fill(hltbits.begin(), hltbits.end(), 0);
188 
189  for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
190  unsigned int whichByte = i / packInOneByte;
191  unsigned int indxWithinByte = i % packInOneByte;
192  hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
193  }
194  }
195 
196  //This is Just a printing code.
197  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
198  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
199  // printBits(hltbits_[i]);
200  //}
201  //std::cout << "\n";
202  }
uint16_t size_type
accept
Definition: HLTenums.h:18
static std::string const triggerResults("TriggerResults")

Member Data Documentation

◆ compressionAlgo_

StreamerCompressionAlgo edm::StreamerOutputModuleCommon::compressionAlgo_
private

Definition at line 59 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

◆ compressionAlgoStr_

std::string edm::StreamerOutputModuleCommon::compressionAlgoStr_
private

Definition at line 56 of file StreamerOutputModuleCommon.h.

Referenced by StreamerOutputModuleCommon().

◆ compressionLevel_

int edm::StreamerOutputModuleCommon::compressionLevel_
private

Definition at line 57 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

◆ hltsize_

unsigned int edm::StreamerOutputModuleCommon::hltsize_
private

◆ hltTriggerSelections_

Strings edm::StreamerOutputModuleCommon::hltTriggerSelections_
private

Definition at line 68 of file StreamerOutputModuleCommon.h.

Referenced by serializeRegistry(), and StreamerOutputModuleCommon().

◆ host_name_

char edm::StreamerOutputModuleCommon::host_name_[255]
private

Definition at line 66 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

◆ lumiSectionInterval_

int edm::StreamerOutputModuleCommon::lumiSectionInterval_
private

Definition at line 62 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent().

◆ maxEventSize_

int edm::StreamerOutputModuleCommon::maxEventSize_
private

Definition at line 54 of file StreamerOutputModuleCommon.h.

◆ outputModuleId_

uint32 edm::StreamerOutputModuleCommon::outputModuleId_
private

◆ serializer_

StreamSerializer edm::StreamerOutputModuleCommon::serializer_
private

Definition at line 52 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and serializeRegistry().

◆ serializerBuffer_

std::unique_ptr<SerializeDataBuffer> edm::StreamerOutputModuleCommon::serializerBuffer_
protected

◆ timeInSecSinceUTC

double edm::StreamerOutputModuleCommon::timeInSecSinceUTC
private

Definition at line 63 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

◆ useCompression_

bool edm::StreamerOutputModuleCommon::useCompression_
private

Definition at line 55 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().