CMS 3D CMS Logo

StreamerOutputModuleCommon.cc
Go to the documentation of this file.
2 
16 
17 #include <iostream>
18 #include <memory>
19 #include <string>
20 #include <sys/time.h>
21 #include <unistd.h>
22 #include <vector>
23 #include <zlib.h>
24 
25 namespace edm {
// Body of the helper that fills a parameter bundle (`ret`) from the
// ParameterSet `ps` and returns it.
// NOTE(review): the signature line and the declaration of `ret` are absent
// from this listing (embedded numbers 26-27 are skipped) -- confirm against
// the real source file.
// The trigger selections are obtained from
// EventSelector::getEventSelectionVString(ps); every other field is read as an
// untracked parameter by name only (no default argument is passed here).
28  ret.hltTriggerSelections = EventSelector::getEventSelectionVString(ps);
29  ret.compressionAlgoStr = ps.getUntrackedParameter<std::string>("compression_algorithm");
30  ret.compressionLevel = ps.getUntrackedParameter<int>("compression_level");
31  ret.lumiSectionInterval = ps.getUntrackedParameter<int>("lumiSection_interval");
32  ret.useCompression = ps.getUntrackedParameter<bool>("use_compression");
33  return ret;
34  }
35 
// Tail of the StreamerOutputModuleCommon constructor: last signature line,
// member initialisers, then compression validation, host-name lookup, trigger
// bookkeeping and a CRC of the module label.
// NOTE(review): this listing is incomplete. The first signature line(s) are
// missing (numbers 36-37) and the embedded numbering skips 62, 64, 67, 70, 72,
// 79, 81, 86 and 89, so some branches below look empty and the final `else`
// at 88 has lost its statement. Confirm every note here against the real file.
38  std::string const& moduleLabel)
39  :
40 
41  serializer_(selections),
42  useCompression_(p.useCompression),
43  compressionAlgoStr_(p.compressionAlgoStr),
44  compressionLevel_(p.compressionLevel),
45  lumiSectionInterval_(p.lumiSectionInterval),
46  hltsize_(0),
47  host_name_(),
48  hltTriggerSelections_(),
49  outputModuleId_(0) {
50  //limits initially set for default ZLIB option
51  int minCompressionLevel = 1;
52  int maxCompressionLevel = 9;
53 
54  // test luminosity sections
// Record the wall-clock time at construction as fractional seconds
// (tv_sec plus tv_usec / 1e6) in timeInSecSinceUTC.
55  struct timeval now;
56  struct timezone dummyTZ;
57  gettimeofday(&now, &dummyTZ);
58  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
59 
// Adjust the accepted compression-level range per algorithm name: LZMA lowers
// the minimum to 0, ZSTD raises the maximum to 20, UNCOMPRESSED switches
// compression off, and any other name throws cms::Exception.
60  if (useCompression_ == true) {
61  if (compressionAlgoStr_ == "ZLIB") {
63  } else if (compressionAlgoStr_ == "LZMA") {
65  minCompressionLevel = 0;
66  } else if (compressionAlgoStr_ == "ZSTD") {
68  maxCompressionLevel = 20;
69  } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
71  useCompression_ = false;
73  } else
74  throw cms::Exception("StreamerOutputModuleCommon", "Compression type unknown")
75  << "Unknown compression algorithm " << compressionAlgoStr_;
76 
// A level below the minimum disables compression; a level above the maximum
// is clamped to the maximum.
77  if (compressionLevel_ < minCompressionLevel) {
78  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
80  useCompression_ = false;
82  } else if (compressionLevel_ > maxCompressionLevel) {
83  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
84  << maxCompressionLevel << std::endl;
85  compressionLevel_ = maxCompressionLevel;
87  }
88  } else
// NOTE(review): the statement belonging to the `else` above (number 89) is
// missing from this listing.
90 
// NOTE(review): the length passed to gethostname is the literal 255 while the
// fallback below uses sizeof(host_name_); confirm the two agree. POSIX leaves
// it unspecified whether a truncated host name is NUL-terminated, so
// termination is not guaranteed here.
91  int got_host = gethostname(host_name_, 255);
92  if (got_host != 0)
93  strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
94  //loadExtraClasses();
95 
96  // 25-Jan-2008, KAB - pull out the trigger selection request
97  // which we need for the INIT message
98  hltTriggerSelections_ = p.hltTriggerSelections;
99 
// Cache the number of trigger names returned by edm::getAllTriggerNames().
100  Strings const& hltTriggerNames = edm::getAllTriggerNames();
101  hltsize_ = hltTriggerNames.size();
102 
103  //Checksum of the module label
// crc32(0L, Z_NULL, 0) yields the initial CRC value; the label bytes are then
// folded in and the result is stored, cast to uint32, as outputModuleId_.
104  uLong crc = crc32(0L, Z_NULL, 0);
105  Bytef const* buf = (Bytef const*)(moduleLabel.data());
106  crc = crc32(crc, buf, moduleLabel.length());
107  outputModuleId_ = static_cast<uint32>(crc);
108  }
109 
111 
// Serialises the registry into `sbuf` through serializer_, then builds an
// InitMsgBuilder on top of sbuf.header_buf_, copies the serialised bytes into
// the message's data area and returns the message.
// The psetMap overload of serializer_.serializeRegistry is used only when
// `psetMap` is non-null.
// NOTE(review): this listing is incomplete. The parameter line for `helper`
// (number 115) and two InitMsgBuilder arguments (numbers 160 and 162) are
// missing; the declaration index at the end of this page gives `helper` as
// `ThinnedAssociationsHelper const&`. Confirm against the real file.
112  std::unique_ptr<InitMsgBuilder> StreamerOutputModuleCommon::serializeRegistry(
113  SerializeDataBuffer& sbuf,
114  const BranchIDLists& branchLists,
116  std::string const& processName,
117  std::string const& moduleLabel,
118  ParameterSetID const& toplevel,
119  SendJobHeader::ParameterSetMap const* psetMap) {
120  if (psetMap) {
121  serializer_.serializeRegistry(sbuf, branchLists, helper, *psetMap);
122  } else {
123  serializer_.serializeRegistry(sbuf, branchLists, helper);
124  }
125  // resize header_buf_ to reflect space used in serializer_ + header
126  // I just added an overhead for header of 50000 for now
// header_buf_ is only ever grown here, never shrunk.
127  unsigned int src_size = sbuf.currentSpaceUsed();
128  unsigned int new_size = src_size + 50000;
129  if (sbuf.header_buf_.size() < new_size)
130  sbuf.header_buf_.resize(new_size);
131 
132  //Build the INIT Message
133  //Following values are strictly DUMMY and will be replaced
134  // once available with Utility function etc.
135  uint32 run = 1;
136 
137  //Get the Process PSet ID
138 
139  //In case we need to print it
140  // cms::Digest dig(toplevel.compactForm());
141  // cms::MD5Result r1 = dig.digest();
142  // std::string hexy = r1.toString();
143  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
144 
145  //L1 stays dummy as of today
146  Strings l1_names; //3
147  l1_names.push_back("t1");
148  l1_names.push_back("t10");
149  l1_names.push_back("t2");
150 
151  Strings const& hltTriggerNames = edm::getAllTriggerNames();
152 
// The message is constructed over the whole of header_buf_; the Version
// argument is built from the raw bytes of toplevel.compactForm().
153  auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
154  sbuf.header_buf_.size(),
155  run,
156  Version((uint8 const*)toplevel.compactForm().c_str()),
157  getReleaseVersion().c_str(),
158  processName.c_str(),
159  moduleLabel.c_str(),
161  hltTriggerNames,
163  l1_names,
164  (uint32)sbuf.adler32_chksum());
165 
166  // copy data into the destination message
// src_size was sampled before header_buf_ was resized above and is reused here
// as the payload length.
167  unsigned char* src = sbuf.bufferPointer();
168  std::copy(src, src + src_size, init_message->dataAddress());
169  init_message->setDataLength(src_size);
170  return init_message;
171  }
172 
// Fills `hltbits` with the HLT path states packed two bits per path, i.e.
// four paths per byte, with path i stored in byte i / 4 at bit offset
// (i % 4) * 2. `hltbits` is always cleared first and stays empty when no
// state was collected.
// NOTE(review): this listing is incomplete. The first signature lines
// (numbers 173-174) and the headers of both fill loops (numbers 181 and 186)
// are missing, so the loop bounds cannot be seen here; the declaration index
// at the end of this page gives the signature as
// setHltMask(EventForOutput const&, Handle<TriggerResults> const&,
// std::vector<unsigned char>&) const. Confirm against the real file.
175  std::vector<unsigned char>& hltbits) const {
176  hltbits.clear();
177 
178  std::vector<unsigned char> vHltState;
179 
// With valid trigger results each pushed state comes from
// triggerResults->at(i).state(); otherwise hlt::Pass is pushed instead.
180  if (triggerResults.isValid()) {
182  vHltState.push_back(((triggerResults->at(i)).state()));
183  }
184  } else {
185  // We fill all Trigger bits to valid state.
187  vHltState.push_back(hlt::Pass);
188  }
189  }
190 
191  //Pack into the hltbits output parameter
192  if (!vHltState.empty()) {
193  unsigned int packInOneByte = 4;
// Ceiling division: the number of bytes needed for vHltState.size() entries.
// The non-empty guard above keeps size() - 1 from wrapping around.
194  unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte); //Two bits per HLT
195 
196  hltbits.resize(sizeOfPackage);
197  std::fill(hltbits.begin(), hltbits.end(), 0);
198 
199  for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
200  unsigned int whichByte = i / packInOneByte;
201  unsigned int indxWithinByte = i % packInOneByte;
// NOTE(review): the state is OR-ed in unmasked; this assumes every state
// value fits in two bits -- confirm against the hlt state enum.
202  hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
203  }
204  }
205 
206  //This is Just a printing code.
207  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
208  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
209  // printBits(hltbits_[i]);
210  //}
211  //std::cout << "\n";
212  }
213 
// Serialises event `e` into `sbuf` and returns an EventMsgBuilder describing
// it. The header is first built in sbuf.header_buf_ and then copied into
// sbuf.comp_buf_ so that it ends at offset reserve_size; the returned message
// is re-pointed at that copy.
// Throws cms::Exception ("Header Overflow") when the header is larger than
// reserve_size.
// NOTE(review): this listing is incomplete. The parameter line for
// `triggerResults` (number 217) and one EventMsgBuilder argument (number 255)
// are missing; the declaration index at the end of this page gives the
// parameter as `Handle<TriggerResults> const&`. Confirm against the real file.
214  std::unique_ptr<EventMsgBuilder> StreamerOutputModuleCommon::serializeEvent(
215  SerializeDataBuffer& sbuf,
216  EventForOutput const& e,
218  ParameterSetID const& selectorCfg) {
219  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
220  //Let's build the Event Message first
221 
222  //Following is strictly DUMMY data for the L1 trigger and will be replaced with actual
223  // once figured out, there is no logic involved here.
224  std::vector<bool> l1bit = {true, true, false};
225  //End of dummy data
226 
227  std::vector<unsigned char> hltbits;
228  setHltMask(e, triggerResults, hltbits);
229 
// Lumi number: taken from the event when lumiSectionInterval_ is 0, otherwise
// derived from the seconds elapsed since timeInSecSinceUTC (1-based).
// NOTE(review): when lumiSectionInterval_ is negative neither branch assigns
// `lumi`, so it is read uninitialised when passed to EventMsgBuilder below.
230  uint32 lumi;
231  if (lumiSectionInterval_ == 0) {
232  lumi = e.luminosityBlock();
233  } else {
234  struct timeval now;
235  struct timezone dummyTZ;
236  gettimeofday(&now, &dummyTZ);
237  double timeInSec =
238  static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
239  // what about overflows?
240  if (lumiSectionInterval_ > 0)
241  lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
242  }
243 
244  serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size);
245 
246  // resize header_buf_ to reserved size on first written event
247  if (sbuf.header_buf_.size() < reserve_size)
248  sbuf.header_buf_.resize(reserve_size);
249 
// The builder writes into header_buf_, but the size passed is that of
// comp_buf_.
// NOTE(review): setHltMask leaves `hltbits` empty when it collected no
// states, in which case &hltbits[0] below indexes an empty vector.
250  auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
251  sbuf.comp_buf_.size(),
252  e.id().run(),
253  e.id().event(),
254  lumi,
256  0,
257  l1bit,
258  (uint8*)&hltbits[0],
259  hltsize_,
260  (uint32)sbuf.adler32_chksum(),
261  host_name_);
262 
263  // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
264  uint32 headerSize = msg->headerSize();
265  if (headerSize > reserve_size)
266  throw cms::Exception("StreamerOutputModuleCommon", "Header Overflow")
267  << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
268 
269  //set addresses to other buffer and copy constructed header there
// The header copy occupies comp_buf_[reserve_size - headerSize, reserve_size)
// -- presumably directly in front of the serialised event; confirm against
// the serializer.
// NOTE(review): &sbuf.header_buf_[headerSize] is used as the end iterator and
// indexes one past the last element when headerSize equals header_buf_.size().
270  msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
271  msg->setEventAddr(sbuf.bufferPointer());
272  std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
273 
274  unsigned int src_size = sbuf.currentSpaceUsed();
275  msg->setEventLength(src_size); //compressed size
276  if (useCompression_)
277  msg->setOrigDataSize(
278  sbuf.currentEventSize()); //uncompressed size (or 0 if no compression -> streamer input source requires this)
279  else
280  msg->setOrigDataSize(0);
281 
282  return msg;
283  }
284 
// Registers this module's untracked configuration parameters on `desc`, each
// with its default value and a help comment.
// NOTE(review): the signature line (number 285) is missing from this listing;
// the declaration index at the end of this page gives it as
// `static void fillDescription(ParameterSetDescription &desc)`.
286  desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
287  desc.addUntracked<bool>("use_compression", true)
288  ->setComment("If True, compression will be used to write streamer file.");
289  desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
290  ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
291  desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
// NOTE(review): the help text covers 0 and "not 0" only; a negative interval
// is not described here.
292  desc.addUntracked<int>("lumiSection_interval", 0)
293  ->setComment(
294  "If 0, use lumi section number from event.\n"
295  "If not 0, the interval in seconds between fake lumi sections.");
296  }
297 
// Returns the raw pointer held by serializerBuffer_, first creating a
// SerializeDataBuffer with std::make_unique if none exists yet. Ownership
// stays with serializerBuffer_.
// NOTE(review): the signature line (number 298) is missing from this listing,
// so the function's name and return type cannot be confirmed from here.
299  auto* ptr = serializerBuffer_.get();
300  if (!ptr) {
301  serializerBuffer_ = std::make_unique<SerializeDataBuffer>();
302  ptr = serializerBuffer_.get();
303  }
304  return ptr;
305  }
306 } // namespace edm
std::unique_ptr< EventMsgBuilder > serializeEvent(SerializeDataBuffer &sbuf, EventForOutput const &e, Handle< TriggerResults > const &triggerResults, ParameterSetID const &selectorCfg)
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
Definition: helper.py:1
unsigned int currentEventSize() const
std::vector< std::string > const & getAllTriggerNames()
EventSelector::Strings Strings
ret
prodAgent to be discontinued
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
void setHltMask(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
#define FDEBUG(lev)
Definition: DebugMacros.h:19
uint16_t size_type
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
T getUntrackedParameter(std::string const &, T const &) const
std::vector< unsigned char > comp_buf_
std::unique_ptr< SerializeDataBuffer > serializerBuffer_
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
accept
Definition: HLTenums.h:18
static void fillDescription(ParameterSetDescription &desc)
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
unsigned char const * bufferPointer() const
value_type compactForm() const
Definition: Hash.h:186
unsigned int uint32
Definition: MsgTools.h:13
static Parameters parameters(ParameterSet const &ps)
unsigned int currentSpaceUsed() const
uint32_t adler32_chksum() const
std::string getReleaseVersion()
std::unique_ptr< InitMsgBuilder > serializeRegistry(SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel, SendJobHeader::ParameterSetMap const *psetMap)
static std::string const triggerResults("TriggerResults")
tuple msg
Definition: mps_check.py:286
unsigned char uint8
Definition: MsgTools.h:11
HLT enums.
StreamerOutputModuleCommon(Parameters const &p, SelectedProducts const *selections, std::string const &moduleLabel)
static constexpr unsigned int reserve_size