CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Attributes | Private Member Functions | Private Attributes
edm::StreamerOutputModuleCommon Class Reference

#include <StreamerOutputModuleCommon.h>

Inheritance diagram for edm::StreamerOutputModuleCommon:
edm::StreamerOutputModuleBase edm::StreamerOutputModule< Consumer > evf::RecoEventOutputModuleForFU< Consumer >

Public Member Functions

SerializeDataBuffer * getSerializerBuffer ()
 
std::unique_ptr< EventMsgBuilder > serializeEvent (SerializeDataBuffer &sbuf, EventForOutput const &e, Handle< TriggerResults > const &triggerResults, ParameterSetID const &selectorCfg)
 
std::unique_ptr< InitMsgBuilder > serializeRegistry (SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel)
 
 StreamerOutputModuleCommon (ParameterSet const &ps, SelectedProducts const *selections)
 
 ~StreamerOutputModuleCommon ()
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &desc)
 

Protected Attributes

std::unique_ptr< SerializeDataBuffer > serializerBuffer_
 

Private Member Functions

void setHltMask (EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
 

Private Attributes

StreamerCompressionAlgo compressionAlgo_
 
std::string compressionAlgoStr_
 
int compressionLevel_
 
unsigned int hltsize_
 
Strings hltTriggerSelections_
 
char host_name_ [255]
 
int lumiSectionInterval_
 
int maxEventSize_
 
uint32 outputModuleId_
 
StreamSerializer serializer_
 
double timeInSecSinceUTC
 
bool useCompression_
 

Detailed Description

Definition at line 20 of file StreamerOutputModuleCommon.h.

Constructor & Destructor Documentation

edm::StreamerOutputModuleCommon::StreamerOutputModuleCommon ( ParameterSet const &  ps,
SelectedProducts const *  selections 
)
explicit

Definition at line 26 of file StreamerOutputModuleCommon.cc.

References compressionAlgo_, compressionAlgoStr_, compressionLevel_, Exception, FDEBUG, edm::EventSelector::getEventSelectionVString(), hltTriggerSelections_, host_name_, edm::LZMA, timeInSecSinceUTC, edm::UNCOMPRESSED, useCompression_, edm::ZLIB, and edm::ZSTD.

27  :
28 
30  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
31  compressionAlgoStr_(ps.getUntrackedParameter<std::string>("compression_algorithm")),
32  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
33  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
34  hltsize_(0),
35  host_name_(),
37  outputModuleId_(0) {
38  //limits initially set for default ZLIB option
39  int minCompressionLevel = 1;
40  int maxCompressionLevel = 9;
41 
42  // test luminosity sections
43  struct timeval now;
44  struct timezone dummyTZ;
45  gettimeofday(&now, &dummyTZ);
46  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
47 
48  if (useCompression_ == true) {
49  if (compressionAlgoStr_ == "ZLIB") {
51  } else if (compressionAlgoStr_ == "LZMA") {
53  minCompressionLevel = 0;
54  } else if (compressionAlgoStr_ == "ZSTD") {
56  maxCompressionLevel = 20;
57  } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
59  useCompression_ = false;
61  } else
62  throw cms::Exception("StreamerOutputModuleCommon", "Compression type unknown")
63  << "Unknown compression algorithm " << compressionAlgoStr_;
64 
65  if (compressionLevel_ < minCompressionLevel) {
66  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
68  useCompression_ = false;
70  } else if (compressionLevel_ > maxCompressionLevel) {
71  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
72  << maxCompressionLevel << std::endl;
73  compressionLevel_ = maxCompressionLevel;
75  }
76  } else
78 
79  int got_host = gethostname(host_name_, 255);
80  if (got_host != 0)
81  strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
82  //loadExtraClasses();
83 
84  // 25-Jan-2008, KAB - pull out the trigger selection request
85  // which we need for the INIT message
87  }
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::StreamerOutputModuleCommon::~StreamerOutputModuleCommon ( )

Definition at line 89 of file StreamerOutputModuleCommon.cc.

89 {}

Member Function Documentation

void edm::StreamerOutputModuleCommon::fillDescription ( ParameterSetDescription & desc)
static

Definition at line 265 of file StreamerOutputModuleCommon.cc.

References edm::ParameterSetDescription::addUntracked(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by edm::StreamerOutputModuleBase::fillDescription(), and evf::EvFOutputModule::fillDescriptions().

265  {
266  desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
267  desc.addUntracked<bool>("use_compression", true)
268  ->setComment("If True, compression will be used to write streamer file.");
269  desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
270  ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
271  desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
272  desc.addUntracked<int>("lumiSection_interval", 0)
273  ->setComment(
274  "If 0, use lumi section number from event.\n"
275  "If not 0, the interval in seconds between fake lumi sections.");
276  }
SerializeDataBuffer * edm::StreamerOutputModuleCommon::getSerializerBuffer ( )

Definition at line 278 of file StreamerOutputModuleCommon.cc.

References serializerBuffer_.

Referenced by edm::StreamerOutputModuleBase::beginRun(), and edm::StreamerOutputModuleBase::write().

278  {
279  auto* ptr = serializerBuffer_.get();
280  if (!ptr) {
282  ptr = serializerBuffer_.get();
283  }
284  return ptr;
285  }
std::unique_ptr< SerializeDataBuffer > serializerBuffer_
std::unique_ptr< EventMsgBuilder > edm::StreamerOutputModuleCommon::serializeEvent ( SerializeDataBuffer & sbuf,
EventForOutput const &  e,
Handle< TriggerResults > const &  triggerResults,
ParameterSetID const &  selectorCfg 
)

Definition at line 194 of file StreamerOutputModuleCommon.cc.

References SerializeDataBuffer::adler32_chksum(), SerializeDataBuffer::bufferPointer(), SerializeDataBuffer::comp_buf_, compressionAlgo_, compressionLevel_, constexpr, filterCSVwithJSON::copy, SerializeDataBuffer::currentEventSize(), SerializeDataBuffer::currentSpaceUsed(), edm::EventID::event(), Exception, SerializeDataBuffer::header_buf_, hltsize_, host_name_, edm::EventForOutput::id(), BXlumiParameters_cfi::lumi, edm::EventForOutput::luminosityBlock(), lumiSectionInterval_, mps_check::msg, outputModuleId_, SerializeDataBuffer::reserve_size, edm::EventID::run(), edm::StreamSerializer::serializeEvent(), serializer_, setHltMask(), timeInSecSinceUTC, and useCompression_.

Referenced by edm::StreamerOutputModuleBase::write().

198  {
199  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
200  //Lets Build the Event Message first
201 
202  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
203  // once figured out, there is no logic involved here.
204  std::vector<bool> l1bit = {true, true, false};
205  //End of dummy data
206 
207  std::vector<unsigned char> hltbits;
208  setHltMask(e, triggerResults, hltbits);
209 
210  uint32 lumi;
211  if (lumiSectionInterval_ == 0) {
212  lumi = e.luminosityBlock();
213  } else {
214  struct timeval now;
215  struct timezone dummyTZ;
216  gettimeofday(&now, &dummyTZ);
217  double timeInSec =
218  static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
219  // what about overflows?
220  if (lumiSectionInterval_ > 0)
221  lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
222  }
223 
224  serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size);
225 
226  // resize header_buf_ to reserved size on first written event
227  if (sbuf.header_buf_.size() < reserve_size)
228  sbuf.header_buf_.resize(reserve_size);
229 
230  auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
231  sbuf.comp_buf_.size(),
232  e.id().run(),
233  e.id().event(),
234  lumi,
236  0,
237  l1bit,
238  (uint8*)&hltbits[0],
239  hltsize_,
240  (uint32)sbuf.adler32_chksum(),
241  host_name_);
242 
243  // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
244  uint32 headerSize = msg->headerSize();
245  if (headerSize > reserve_size)
246  throw cms::Exception("StreamerOutputModuleCommon", "Header Overflow")
247  << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
248 
249  //set addresses to other buffer and copy constructed header there
250  msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
251  msg->setEventAddr(sbuf.bufferPointer());
252  std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
253 
254  unsigned int src_size = sbuf.currentSpaceUsed();
255  msg->setEventLength(src_size); //compressed size
256  if (useCompression_)
257  msg->setOrigDataSize(
258  sbuf.currentEventSize()); //uncompressed size (or 0 if no compression -> streamer input source requires this)
259  else
260  msg->setOrigDataSize(0);
261 
262  return msg;
263  }
unsigned int currentSpaceUsed() const
std::vector< unsigned char > comp_buf_
unsigned char const * bufferPointer() const
unsigned int uint32
Definition: MsgTools.h:13
void setHltMask(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
tuple msg
Definition: mps_check.py:285
uint32_t adler32_chksum() const
unsigned char uint8
Definition: MsgTools.h:11
static constexpr unsigned int reserve_size
unsigned int currentEventSize() const
#define constexpr
std::unique_ptr< InitMsgBuilder > edm::StreamerOutputModuleCommon::serializeRegistry ( SerializeDataBuffer & sbuf,
BranchIDLists const &  branchLists,
ThinnedAssociationsHelper const &  helper,
std::string const &  processName,
std::string const &  moduleLabel,
ParameterSetID const &  toplevel 
)

Definition at line 91 of file StreamerOutputModuleCommon.cc.

References SerializeDataBuffer::adler32_chksum(), SerializeDataBuffer::bufferPointer(), edm::Hash< I >::compactForm(), filterCSVwithJSON::copy, SerializeDataBuffer::currentSpaceUsed(), edm::getAllTriggerNames(), edm::getReleaseVersion(), SerializeDataBuffer::header_buf_, hltsize_, hltTriggerSelections_, dttmaxenums::L, outputModuleId_, writedatasetfile::run, serializer_, edm::StreamSerializer::serializeRegistry(), and TrackRefitter_38T_cff::src.

Referenced by edm::StreamerOutputModuleBase::beginRun().

96  {
97  serializer_.serializeRegistry(sbuf, branchLists, helper);
98 
99  // resize header_buf_ to reflect space used in serializer_ + header
100  // I just added an overhead for header of 50000 for now
101  unsigned int src_size = sbuf.currentSpaceUsed();
102  unsigned int new_size = src_size + 50000;
103  if (sbuf.header_buf_.size() < new_size)
104  sbuf.header_buf_.resize(new_size);
105 
106  //Build the INIT Message
107  //Following values are strictly DUMMY and will be replaced
108  // once available with Utility function etc.
109  uint32 run = 1;
110 
111  //Get the Process PSet ID
112 
113  //In case we need to print it
114  // cms::Digest dig(toplevel.compactForm());
115  // cms::MD5Result r1 = dig.digest();
116  // std::string hexy = r1.toString();
117  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
118  Strings const& hltTriggerNames = edm::getAllTriggerNames();
119  hltsize_ = hltTriggerNames.size();
120 
121  //L1 stays dummy as of today
122  Strings l1_names; //3
123  l1_names.push_back("t1");
124  l1_names.push_back("t10");
125  l1_names.push_back("t2");
126 
127  //Setting the process name to HLT
128  uLong crc = crc32(0L, Z_NULL, 0);
129  Bytef const* buf = (Bytef const*)(moduleLabel.data());
130  crc = crc32(crc, buf, moduleLabel.length());
131  outputModuleId_ = static_cast<uint32>(crc);
132 
133  auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
134  sbuf.header_buf_.size(),
135  run,
136  Version((uint8 const*)toplevel.compactForm().c_str()),
137  getReleaseVersion().c_str(),
138  processName.c_str(),
139  moduleLabel.c_str(),
141  hltTriggerNames,
143  l1_names,
144  (uint32)sbuf.adler32_chksum());
145 
146  // copy data into the destination message
147  unsigned char* src = sbuf.bufferPointer();
148  std::copy(src, src + src_size, init_message->dataAddress());
149  init_message->setDataLength(src_size);
150  return init_message;
151  }
Definition: helper.py:1
std::vector< std::string > Strings
Definition: MsgTools.h:18
unsigned int currentSpaceUsed() const
unsigned char const * bufferPointer() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned int uint32
Definition: MsgTools.h:13
std::string getReleaseVersion()
uint32_t adler32_chksum() const
unsigned char uint8
Definition: MsgTools.h:11
std::vector< std::string > const & getAllTriggerNames()
void edm::StreamerOutputModuleCommon::setHltMask ( EventForOutput const &  e,
Handle< TriggerResults > const &  triggerResults,
std::vector< unsigned char > &  hltbits 
) const
private

Definition at line 153 of file StreamerOutputModuleCommon.cc.

References ntuplemaker::fill, hltsize_, mps_fire::i, edm::HandleBase::isValid(), and edm::hlt::Pass.

Referenced by serializeEvent().

155  {
156  hltbits.clear();
157 
158  std::vector<unsigned char> vHltState;
159 
160  if (triggerResults.isValid()) {
162  vHltState.push_back(((triggerResults->at(i)).state()));
163  }
164  } else {
165  // We fill all Trigger bits to valid state.
167  vHltState.push_back(hlt::Pass);
168  }
169  }
170 
171  //Pack into member hltbits
172  if (!vHltState.empty()) {
173  unsigned int packInOneByte = 4;
174  unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte); //Two bits per HLT
175 
176  hltbits.resize(sizeOfPackage);
177  std::fill(hltbits.begin(), hltbits.end(), 0);
178 
179  for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
180  unsigned int whichByte = i / packInOneByte;
181  unsigned int indxWithinByte = i % packInOneByte;
182  hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
183  }
184  }
185 
186  //This is Just a printing code.
187  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
188  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
189  // printBits(hltbits_[i]);
190  //}
191  //std::cout << "\n";
192  }
uint16_t size_type
accept
Definition: HLTenums.h:18

Member Data Documentation

StreamerCompressionAlgo edm::StreamerOutputModuleCommon::compressionAlgo_
private

Definition at line 55 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

std::string edm::StreamerOutputModuleCommon::compressionAlgoStr_
private

Definition at line 52 of file StreamerOutputModuleCommon.h.

Referenced by StreamerOutputModuleCommon().

int edm::StreamerOutputModuleCommon::compressionLevel_
private

Definition at line 53 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

unsigned int edm::StreamerOutputModuleCommon::hltsize_
private

Definition at line 61 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), serializeRegistry(), and setHltMask().

Strings edm::StreamerOutputModuleCommon::hltTriggerSelections_
private

Definition at line 64 of file StreamerOutputModuleCommon.h.

Referenced by serializeRegistry(), and StreamerOutputModuleCommon().

char edm::StreamerOutputModuleCommon::host_name_[255]
private

Definition at line 62 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

int edm::StreamerOutputModuleCommon::lumiSectionInterval_
private

Definition at line 58 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent().

int edm::StreamerOutputModuleCommon::maxEventSize_
private

Definition at line 50 of file StreamerOutputModuleCommon.h.

uint32 edm::StreamerOutputModuleCommon::outputModuleId_
private

Definition at line 65 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and serializeRegistry().

StreamSerializer edm::StreamerOutputModuleCommon::serializer_
private

Definition at line 48 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and serializeRegistry().

std::unique_ptr<SerializeDataBuffer> edm::StreamerOutputModuleCommon::serializerBuffer_
protected
double edm::StreamerOutputModuleCommon::timeInSecSinceUTC
private

Definition at line 59 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().

bool edm::StreamerOutputModuleCommon::useCompression_
private

Definition at line 51 of file StreamerOutputModuleCommon.h.

Referenced by serializeEvent(), and StreamerOutputModuleCommon().