CMS 3D CMS Logo

StreamerOutputModuleCommon.cc
Go to the documentation of this file.
2 
16 
17 #include <iostream>
18 #include <memory>
19 #include <string>
20 #include <sys/time.h>
21 #include <unistd.h>
22 #include <vector>
23 #include <zlib.h>
24 
25 namespace edm {
27  :
28 
29  serializer_(selections),
30  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
31  compressionAlgoStr_(ps.getUntrackedParameter<std::string>("compression_algorithm")),
32  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
33  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
34  hltsize_(0),
35  host_name_(),
36  hltTriggerSelections_(),
37  outputModuleId_(0) {
38  //limits initially set for default ZLIB option
39  int minCompressionLevel = 1;
40  int maxCompressionLevel = 9;
41 
42  // test luminosity sections
43  struct timeval now;
44  struct timezone dummyTZ;
45  gettimeofday(&now, &dummyTZ);
46  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
47 
48  if (useCompression_ == true) {
49  if (compressionAlgoStr_ == "ZLIB") {
51  } else if (compressionAlgoStr_ == "LZMA") {
53  minCompressionLevel = 0;
54  } else if (compressionAlgoStr_ == "ZSTD") {
56  maxCompressionLevel = 20;
57  } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
59  useCompression_ = false;
61  } else
62  throw cms::Exception("StreamerOutputModuleCommon", "Compression type unknown")
63  << "Unknown compression algorithm " << compressionAlgoStr_;
64 
65  if (compressionLevel_ < minCompressionLevel) {
66  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
68  useCompression_ = false;
70  } else if (compressionLevel_ > maxCompressionLevel) {
71  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
72  << maxCompressionLevel << std::endl;
73  compressionLevel_ = maxCompressionLevel;
75  }
76  } else
78 
79  int got_host = gethostname(host_name_, 255);
80  if (got_host != 0)
81  strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
82  //loadExtraClasses();
83 
84  // 25-Jan-2008, KAB - pull out the trigger selection request
85  // which we need for the INIT message
87  }
88 
90 
91  std::unique_ptr<InitMsgBuilder> StreamerOutputModuleCommon::serializeRegistry(
92  SerializeDataBuffer& sbuf,
93  const BranchIDLists& branchLists,
95  std::string const& processName,
96  std::string const& moduleLabel,
97  ParameterSetID const& toplevel,
98  SendJobHeader::ParameterSetMap const* psetMap) {
99  if (psetMap) {
100  serializer_.serializeRegistry(sbuf, branchLists, helper, *psetMap);
101  } else {
102  serializer_.serializeRegistry(sbuf, branchLists, helper);
103  }
104  // resize header_buf_ to reflect space used in serializer_ + header
105  // I just added an overhead for header of 50000 for now
106  unsigned int src_size = sbuf.currentSpaceUsed();
107  unsigned int new_size = src_size + 50000;
108  if (sbuf.header_buf_.size() < new_size)
109  sbuf.header_buf_.resize(new_size);
110 
111  //Build the INIT Message
112  //Following values are strictly DUMMY and will be replaced
113  // once available with Utility function etc.
114  uint32 run = 1;
115 
116  //Get the Process PSet ID
117 
118  //In case we need to print it
119  // cms::Digest dig(toplevel.compactForm());
120  // cms::MD5Result r1 = dig.digest();
121  // std::string hexy = r1.toString();
122  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
123  Strings const& hltTriggerNames = edm::getAllTriggerNames();
124  hltsize_ = hltTriggerNames.size();
125 
126  //L1 stays dummy as of today
127  Strings l1_names; //3
128  l1_names.push_back("t1");
129  l1_names.push_back("t10");
130  l1_names.push_back("t2");
131 
132  //Setting the process name to HLT
133  uLong crc = crc32(0L, Z_NULL, 0);
134  Bytef const* buf = (Bytef const*)(moduleLabel.data());
135  crc = crc32(crc, buf, moduleLabel.length());
136  outputModuleId_ = static_cast<uint32>(crc);
137 
138  auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
139  sbuf.header_buf_.size(),
140  run,
141  Version((uint8 const*)toplevel.compactForm().c_str()),
142  getReleaseVersion().c_str(),
143  processName.c_str(),
144  moduleLabel.c_str(),
146  hltTriggerNames,
148  l1_names,
149  (uint32)sbuf.adler32_chksum());
150 
151  // copy data into the destination message
152  unsigned char* src = sbuf.bufferPointer();
153  std::copy(src, src + src_size, init_message->dataAddress());
154  init_message->setDataLength(src_size);
155  return init_message;
156  }
157 
160  std::vector<unsigned char>& hltbits) const {
161  hltbits.clear();
162 
163  std::vector<unsigned char> vHltState;
164 
165  if (triggerResults.isValid()) {
167  vHltState.push_back(((triggerResults->at(i)).state()));
168  }
169  } else {
170  // We fill all Trigger bits to valid state.
172  vHltState.push_back(hlt::Pass);
173  }
174  }
175 
176  //Pack into member hltbits
177  if (!vHltState.empty()) {
178  unsigned int packInOneByte = 4;
179  unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte); //Two bits per HLT
180 
181  hltbits.resize(sizeOfPackage);
182  std::fill(hltbits.begin(), hltbits.end(), 0);
183 
184  for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
185  unsigned int whichByte = i / packInOneByte;
186  unsigned int indxWithinByte = i % packInOneByte;
187  hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
188  }
189  }
190 
191  //This is Just a printing code.
192  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
193  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
194  // printBits(hltbits_[i]);
195  //}
196  //std::cout << "\n";
197  }
198 
199  std::unique_ptr<EventMsgBuilder> StreamerOutputModuleCommon::serializeEvent(
200  SerializeDataBuffer& sbuf,
201  EventForOutput const& e,
203  ParameterSetID const& selectorCfg) {
204  constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
205  //Lets Build the Event Message first
206 
207  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
208  // once figured out, there is no logic involved here.
209  std::vector<bool> l1bit = {true, true, false};
210  //End of dummy data
211 
212  std::vector<unsigned char> hltbits;
213  setHltMask(e, triggerResults, hltbits);
214 
215  uint32 lumi;
216  if (lumiSectionInterval_ == 0) {
217  lumi = e.luminosityBlock();
218  } else {
219  struct timeval now;
220  struct timezone dummyTZ;
221  gettimeofday(&now, &dummyTZ);
222  double timeInSec =
223  static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
224  // what about overflows?
225  if (lumiSectionInterval_ > 0)
226  lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
227  }
228 
229  serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size);
230 
231  // resize header_buf_ to reserved size on first written event
232  if (sbuf.header_buf_.size() < reserve_size)
233  sbuf.header_buf_.resize(reserve_size);
234 
235  auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
236  sbuf.comp_buf_.size(),
237  e.id().run(),
238  e.id().event(),
239  lumi,
241  0,
242  l1bit,
243  (uint8*)&hltbits[0],
244  hltsize_,
245  (uint32)sbuf.adler32_chksum(),
246  host_name_);
247 
248  // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
249  uint32 headerSize = msg->headerSize();
250  if (headerSize > reserve_size)
251  throw cms::Exception("StreamerOutputModuleCommon", "Header Overflow")
252  << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
253 
254  //set addresses to other buffer and copy constructed header there
255  msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
256  msg->setEventAddr(sbuf.bufferPointer());
257  std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
258 
259  unsigned int src_size = sbuf.currentSpaceUsed();
260  msg->setEventLength(src_size); //compressed size
261  if (useCompression_)
262  msg->setOrigDataSize(
263  sbuf.currentEventSize()); //uncompressed size (or 0 if no compression -> streamer input source requires this)
264  else
265  msg->setOrigDataSize(0);
266 
267  return msg;
268  }
269 
271  desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
272  desc.addUntracked<bool>("use_compression", true)
273  ->setComment("If True, compression will be used to write streamer file.");
274  desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
275  ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
276  desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
277  desc.addUntracked<int>("lumiSection_interval", 0)
278  ->setComment(
279  "If 0, use lumi section number from event.\n"
280  "If not 0, the interval in seconds between fake lumi sections.");
281  }
282 
284  auto* ptr = serializerBuffer_.get();
285  if (!ptr) {
286  serializerBuffer_ = std::make_unique<SerializeDataBuffer>();
287  ptr = serializerBuffer_.get();
288  }
289  return ptr;
290  }
291 } // namespace edm
edm::StreamerOutputModuleCommon::serializeRegistry
std::unique_ptr< InitMsgBuilder > serializeRegistry(SerializeDataBuffer &sbuf, BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel, SendJobHeader::ParameterSetMap const *psetMap)
Definition: StreamerOutputModuleCommon.cc:91
edm::StreamerOutputModuleCommon::serializer_
StreamSerializer serializer_
Definition: StreamerOutputModuleCommon.h:50
dttmaxenums::L
Definition: DTTMax.h:29
electrons_cff.bool
bool
Definition: electrons_cff.py:366
mps_fire.i
i
Definition: mps_fire.py:428
edm::getAllTriggerNames
std::vector< std::string > const & getAllTriggerNames()
Definition: getAllTriggerNames.cc:22
edm::UNCOMPRESSED
Definition: StreamSerializer.h:66
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
TriggerResults.h
uint8
unsigned char uint8
Definition: MsgTools.h:11
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
edm::ZSTD
Definition: StreamSerializer.h:66
edm
HLT enums.
Definition: AlignableModifier.h:19
SerializeDataBuffer::adler32_chksum
uint32_t adler32_chksum() const
Definition: StreamSerializer.h:45
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
SerializeDataBuffer::header_buf_
SBuffer header_buf_
Definition: StreamSerializer.h:59
edm::StreamerOutputModuleCommon::hltTriggerSelections_
Strings hltTriggerSelections_
Definition: StreamerOutputModuleCommon.h:66
mps_check.msg
tuple msg
Definition: mps_check.py:285
EventForOutput.h
edm::StreamerOutputModuleCommon::outputModuleId_
uint32 outputModuleId_
Definition: StreamerOutputModuleCommon.h:67
edm::Handle
Definition: AssociativeIterator.h:50
getAllTriggerNames.h
SerializeDataBuffer::comp_buf_
std::vector< unsigned char > comp_buf_
Definition: StreamSerializer.h:54
edm::StreamerOutputModuleCommon::serializerBuffer_
std::unique_ptr< SerializeDataBuffer > serializerBuffer_
Definition: StreamerOutputModuleCommon.h:43
uint32
unsigned int uint32
Definition: MsgTools.h:13
edm::SelectedProducts
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
Definition: SelectedProducts.h:11
edm::StreamerOutputModuleCommon::host_name_
char host_name_[255]
Definition: StreamerOutputModuleCommon.h:64
ModuleDescription.h
EventSelector.h
edm::LZMA
Definition: StreamSerializer.h:66
BXlumiParameters_cfi.lumi
lumi
Definition: BXlumiParameters_cfi.py:6
edm::StreamerOutputModuleCommon::compressionLevel_
int compressionLevel_
Definition: StreamerOutputModuleCommon.h:55
edm::StreamerOutputModuleCommon::compressionAlgo_
StreamerCompressionAlgo compressionAlgo_
Definition: StreamerOutputModuleCommon.h:57
trigger::size_type
uint16_t size_type
Definition: TriggerTypeDefs.h:18
edm::StreamerOutputModuleCommon::timeInSecSinceUTC
double timeInSecSinceUTC
Definition: StreamerOutputModuleCommon.h:61
InitMsgBuilder.h
EventMsgBuilder.h
edm::Hash< ParameterSetType >
SerializeDataBuffer::bufferPointer
unsigned const char * bufferPointer() const
Definition: StreamSerializer.h:41
edm::StreamSerializer::serializeEvent
int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const
Definition: StreamSerializer.cc:132
ParameterSetDescription.h
edm::BranchIDLists
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
edm::StreamerOutputModuleCommon::~StreamerOutputModuleCommon
~StreamerOutputModuleCommon()
Definition: StreamerOutputModuleCommon.cc:89
ntuplemaker.fill
fill
Definition: ntuplemaker.py:304
edm::ThinnedAssociationsHelper
Definition: ThinnedAssociationsHelper.h:37
edm::ParameterSet
Definition: ParameterSet.h:47
GetReleaseVersion.h
TrackRefitter_38T_cff.src
src
Definition: TrackRefitter_38T_cff.py:24
edm::StreamerOutputModuleCommon::useCompression_
bool useCompression_
Definition: StreamerOutputModuleCommon.h:53
edm::Strings
EventSelector::Strings Strings
Definition: EventSelector.cc:48
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
helper
Definition: helper.py:1
createfilelist.int
int
Definition: createfilelist.py:10
StreamerOutputModuleCommon.h
edm::Hash::compactForm
value_type compactForm() const
Definition: Hash.h:186
edm::StreamerOutputModuleCommon::getSerializerBuffer
SerializeDataBuffer * getSerializerBuffer()
Definition: StreamerOutputModuleCommon.cc:283
visDQMUpload.buf
buf
Definition: visDQMUpload.py:160
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
SimL1EmulatorRepack_CalouGT_cff.processName
processName
Definition: SimL1EmulatorRepack_CalouGT_cff.py:17
edm::StreamSerializer::serializeRegistry
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
Definition: StreamSerializer.cc:45
edm::SendJobHeader::ParameterSetMap
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
Definition: StreamedProducts.h:104
edm::EventForOutput
Definition: EventForOutput.h:50
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::StreamerOutputModuleCommon::serializeEvent
std::unique_ptr< EventMsgBuilder > serializeEvent(SerializeDataBuffer &sbuf, EventForOutput const &e, Handle< TriggerResults > const &triggerResults, ParameterSetID const &selectorCfg)
Definition: StreamerOutputModuleCommon.cc:199
std
Definition: JetResolutionObject.h:76
writedatasetfile.run
run
Definition: writedatasetfile.py:27
RunInfoPI::state
state
Definition: RunInfoPayloadInspectoHelper.h:16
SerializeDataBuffer
Definition: StreamSerializer.h:23
edm::getReleaseVersion
std::string getReleaseVersion()
Definition: GetReleaseVersion.cc:7
Exception
Definition: hltDiff.cc:245
edm::ZLIB
Definition: StreamSerializer.h:66
edm::triggerResults
static const std::string triggerResults("TriggerResults")
edm::StreamerOutputModuleCommon::lumiSectionInterval_
int lumiSectionInterval_
Definition: StreamerOutputModuleCommon.h:60
edm::EventSelector::getEventSelectionVString
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
Definition: EventSelector.cc:812
edm::StreamerOutputModuleCommon::hltsize_
unsigned int hltsize_
Definition: StreamerOutputModuleCommon.h:63
clusterbigeventsdebugger_cfi.selections
selections
Definition: clusterbigeventsdebugger_cfi.py:10
ParameterSetID.h
SerializeDataBuffer::reserve_size
static constexpr unsigned int reserve_size
Definition: StreamSerializer.h:26
DebugMacros.h
edm::StreamerOutputModuleCommon::setHltMask
void setHltMask(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
Definition: StreamerOutputModuleCommon.cc:158
ParameterSet.h
HerwigMaxPtPartonFilter_cfi.moduleLabel
moduleLabel
Definition: HerwigMaxPtPartonFilter_cfi.py:4
edm::StreamerOutputModuleCommon::fillDescription
static void fillDescription(ParameterSetDescription &desc)
Definition: StreamerOutputModuleCommon.cc:270
edm::StreamerOutputModuleCommon::StreamerOutputModuleCommon
StreamerOutputModuleCommon(ParameterSet const &ps, SelectedProducts const *selections)
Definition: StreamerOutputModuleCommon.cc:26
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::StreamerOutputModuleCommon::compressionAlgoStr_
std::string compressionAlgoStr_
Definition: StreamerOutputModuleCommon.h:54
Version
Definition: InitMessage.h:39
lumi
Definition: LumiSectionData.h:20
SerializeDataBuffer::currentEventSize
unsigned int currentEventSize() const
Definition: StreamSerializer.h:44
SelectedProducts.h
SerializeDataBuffer::currentSpaceUsed
unsigned int currentSpaceUsed() const
Definition: StreamSerializer.h:43
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37