CMS 3D CMS Logo

StreamerOutputModuleCommon.cc
Go to the documentation of this file.
2 
16 
17 #include <iostream>
18 #include <memory>
19 #include <string>
20 #include <sys/time.h>
21 #include <unistd.h>
22 #include <vector>
23 #include <zlib.h>
24 
25 namespace edm {
27  :
28 
29  serializer_(selections),
30  maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
31  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
32  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
33  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
34  serializeDataBuffer_(),
35  hltsize_(0),
36  origSize_(0),
37  host_name_(),
38  hltTriggerSelections_(),
39  outputModuleId_(0) {
40  // no compression as default value - we need this!
41 
42  // test luminosity sections
43  struct timeval now;
44  struct timezone dummyTZ;
45  gettimeofday(&now, &dummyTZ);
46  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
47 
48  if (useCompression_ == true) {
49  if (compressionLevel_ <= 0) {
50  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
52  useCompression_ = false;
53  } else if (compressionLevel_ > 9) {
54  FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level 9" << std::endl;
56  }
57  }
59  int got_host = gethostname(host_name_, 255);
60  if (got_host != 0)
61  strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
62  //loadExtraClasses();
63 
64  // 25-Jan-2008, KAB - pull out the trigger selection request
65  // which we need for the INIT message
67  }
68 
70 
71  std::unique_ptr<InitMsgBuilder> StreamerOutputModuleCommon::serializeRegistry(const BranchIDLists& branchLists,
73  std::string const& processName,
74  std::string const& moduleLabel,
75  ParameterSetID const& toplevel) {
77 
78  // resize bufs_ to reflect space used in serializer_ + header
79  // I just added an overhead for header of 50000 for now
80  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
81  unsigned int new_size = src_size + 50000;
82  if (serializeDataBuffer_.header_buf_.size() < new_size)
83  serializeDataBuffer_.header_buf_.resize(new_size);
84 
85  //Build the INIT Message
86  //Following values are strictly DUMMY and will be replaced
87  // once available with Utility function etc.
88  uint32 run = 1;
89 
90  //Get the Process PSet ID
91 
92  //In case we need to print it
93  // cms::Digest dig(toplevel.compactForm());
94  // cms::MD5Result r1 = dig.digest();
95  // std::string hexy = r1.toString();
96  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
97  Strings const& hltTriggerNames = edm::getAllTriggerNames();
98  hltsize_ = hltTriggerNames.size();
99 
100  //L1 stays dummy as of today
101  Strings l1_names; //3
102  l1_names.push_back("t1");
103  l1_names.push_back("t10");
104  l1_names.push_back("t2");
105 
106  //Setting the process name to HLT
107  uLong crc = crc32(0L, Z_NULL, 0);
108  Bytef const* buf = (Bytef const*)(moduleLabel.data());
109  crc = crc32(crc, buf, moduleLabel.length());
110  outputModuleId_ = static_cast<uint32>(crc);
111 
112  auto init_message = std::make_unique<InitMsgBuilder>(&serializeDataBuffer_.header_buf_[0],
114  run,
115  Version((uint8 const*)toplevel.compactForm().c_str()),
116  getReleaseVersion().c_str(),
117  processName.c_str(),
118  moduleLabel.c_str(),
120  hltTriggerNames,
122  l1_names,
124 
125  // copy data into the destination message
126  unsigned char* src = serializeDataBuffer_.bufferPointer();
127  std::copy(src, src + src_size, init_message->dataAddress());
128  init_message->setDataLength(src_size);
129  return init_message;
130  }
131 
134  std::vector<unsigned char>& hltbits) const {
135  hltbits.clear();
136 
137  std::vector<unsigned char> vHltState;
138 
139  if (triggerResults.isValid()) {
141  vHltState.push_back(((triggerResults->at(i)).state()));
142  }
143  } else {
144  // We fill all Trigger bits to valid state.
146  vHltState.push_back(hlt::Pass);
147  }
148  }
149 
150  //Pack into member hltbits
151  if (!vHltState.empty()) {
152  unsigned int packInOneByte = 4;
153  unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte); //Two bits per HLT
154 
155  hltbits.resize(sizeOfPackage);
156  std::fill(hltbits.begin(), hltbits.end(), 0);
157 
158  for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
159  unsigned int whichByte = i / packInOneByte;
160  unsigned int indxWithinByte = i % packInOneByte;
161  hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
162  }
163  }
164 
165  //This is Just a printing code.
166  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
167  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
168  // printBits(hltbits_[i]);
169  //}
170  //std::cout << "\n";
171  }
172 
173  std::unique_ptr<EventMsgBuilder> StreamerOutputModuleCommon::serializeEvent(
174  EventForOutput const& e, Handle<TriggerResults> const& triggerResults, ParameterSetID const& selectorCfg) {
175  //Lets Build the Event Message first
176 
177  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
178  // once figured out, there is no logic involved here.
179  std::vector<bool> l1bit = {true, true, false};
180  //End of dummy data
181 
182  std::vector<unsigned char> hltbits;
183  setHltMask(e, triggerResults, hltbits);
184 
185  uint32 lumi;
186  if (lumiSectionInterval_ == 0) {
187  lumi = e.luminosityBlock();
188  } else {
189  struct timeval now;
190  struct timezone dummyTZ;
191  gettimeofday(&now, &dummyTZ);
192  double timeInSec =
193  static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
194  // what about overflows?
195  if (lumiSectionInterval_ > 0)
196  lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
197  }
198 
200 
201  // resize bufs_ to reflect space used in serializer_ + header
202  // I just added an overhead for header of 50000 for now
203  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
204  unsigned int new_size = src_size + 50000;
205  if (serializeDataBuffer_.bufs_.size() < new_size)
206  serializeDataBuffer_.bufs_.resize(new_size);
207 
208  auto msg = std::make_unique<EventMsgBuilder>(&serializeDataBuffer_.bufs_[0],
210  e.id().run(),
211  e.id().event(),
212  lumi,
214  0,
215  l1bit,
216  (uint8*)&hltbits[0],
217  hltsize_,
219  host_name_);
220  msg->setOrigDataSize(origSize_); // we need this set to zero
221 
222  // copy data into the destination message
223  // an alternative is to have serializer only to the serialization
224  // in serializeEvent, and then call a new member "getEventData" that
225  // takes the compression arguments and a place to put the data.
226  // This will require one less copy. The only catch is that the
227  // space provided in bufs_ should be at least the uncompressed
228  // size + overhead for header because we will not know the actual
229  // compressed size.
230 
231  unsigned char* src = serializeDataBuffer_.bufferPointer();
232  std::copy(src, src + src_size, msg->eventAddr());
233  msg->setEventLength(src_size);
234  if (useCompression_)
235  msg->setOrigDataSize(serializeDataBuffer_.currentEventSize());
236 
237  return msg;
238  }
239 
241  desc.addUntracked<int>("max_event_size", 7000000)
242  ->setComment("Starting size in bytes of the serialized event buffer.");
243  desc.addUntracked<bool>("use_compression", true)
244  ->setComment("If True, compression will be used to write streamer file.");
245  desc.addUntracked<int>("compression_level", 1)->setComment("ROOT compression level to use.");
246  desc.addUntracked<int>("lumiSection_interval", 0)
247  ->setComment(
248  "If 0, use lumi section number from event.\n"
249  "If not 0, the interval in seconds between fake lumi sections.");
250  }
251 } // namespace edm
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
EventID const & id() const
std::vector< BranchIDList > BranchIDLists
Definition: BranchIDList.h:19
Definition: helper.py:1
value_type compactForm() const
Definition: Hash.h:217
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
def copy(args, dbName)
EventSelector::Strings Strings
StreamerOutputModuleCommon(ParameterSet const &ps, SelectedProducts const *selections)
uint16_t size_type
std::vector< std::pair< BranchDescription const *, EDGetToken > > SelectedProducts
unsigned int currentSpaceUsed() const
accept
Definition: HLTenums.h:19
static void fillDescription(ParameterSetDescription &desc)
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
unsigned char const * bufferPointer() const
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
bool isValid() const
Definition: HandleBase.h:74
unsigned int uint32
Definition: MsgTools.h:13
std::unique_ptr< InitMsgBuilder > serializeRegistry(BranchIDLists const &branchLists, ThinnedAssociationsHelper const &helper, std::string const &processName, std::string const &moduleLabel, ParameterSetID const &toplevel)
std::string getReleaseVersion()
void setHltMask(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, std::vector< unsigned char > &hltbits) const
static std::string const triggerResults("TriggerResults")
tuple msg
Definition: mps_check.py:279
uint32_t adler32_chksum() const
unsigned char uint8
Definition: MsgTools.h:11
int serializeEvent(EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer) const
HLT enums.
std::vector< std::string > const & getAllTriggerNames()
LuminosityBlockNumber_t luminosityBlock() const
unsigned int currentEventSize() const
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::unique_ptr< EventMsgBuilder > serializeEvent(EventForOutput const &e, Handle< TriggerResults > const &triggerResults, ParameterSetID const &selectorCfg)