CMS 3D CMS Logo

StreamerOutputModuleBase.cc
Go to the documentation of this file.
1 
3 
11 //#include "FWCore/Utilities/interface/Digest.h"
16 
17 #include <iostream>
18 #include <memory>
19 #include <string>
20 #include <sys/time.h>
21 #include <unistd.h>
22 #include <vector>
23 #include "zlib.h"
24 
namespace {
  // Debug helper kept for reference (disabled): prints the eight bits of a
  // byte, most significant bit first.
  /* inline void printBits(unsigned char c) {
       for (int i = 7; i >= 0; --i) {
         int bit = ((c >> i) & 1);
         std::cout << " " << bit;
       }
     } */

  /**
   * Packs a sequence of 2-bit state codes into bytes, four codes per byte.
   *
   * Entry i of `source` is written to byte i/4 of `package`, occupying bits
   * [2*(i%4), 2*(i%4)+1], i.e. the first entry of each group of four lands in
   * the least significant bits.
   *
   * @param source   state codes; only the two low bits of each entry are used,
   *                 so an out-of-range value can no longer corrupt the slots of
   *                 neighbouring entries.
   * @param package  output buffer; its previous contents are discarded. It is
   *                 resized to ceil(source.size() / 4) bytes, and is left empty
   *                 when `source` is empty (previously it was left untouched,
   *                 which could leak stale bytes from an earlier call).
   */
  void packIntoString(std::vector<unsigned char> const& source,
                      std::vector<unsigned char>& package) {
    constexpr unsigned int packInOneByte = 4;   // entries stored per output byte
    constexpr unsigned int bitsPerEntry = 2;    // two bits per HLT state
    constexpr unsigned char entryMask = 0x03;   // keeps an entry inside its slot

    if (source.empty()) {
      package.clear();
      return;
    }

    // Round up: the last byte may be only partially filled.
    auto const sizeOfPackage = 1 + ((source.size() - 1) / packInOneByte);

    // assign() both resizes and zero-fills, so no stale bits survive a reuse
    // of the buffer and no raw memset is needed.
    package.assign(sizeOfPackage, static_cast<unsigned char>(0));

    for (std::vector<unsigned char>::size_type i = 0; i != source.size(); ++i) {
      auto const whichByte = i / packInOneByte;
      unsigned int const shift = static_cast<unsigned int>(i % packInOneByte) * bitsPerEntry;
      package[whichByte] |= static_cast<unsigned char>((source[i] & entryMask) << shift);
    }
  }
}
55 
56 namespace edm {
// Constructor: member-initializer list and body. The signature line is absent
// from this listing; the cross-reference index at the end of the listing gives
// it as StreamerOutputModuleBase(ParameterSet const &ps).
// Reads the untracked parameters "max_event_size", "use_compression",
// "compression_level" and "lumiSection_interval" from ps, initialises the
// remaining members to zero/empty, and registers consumption of the
// "TriggerResults" product.
59  one::OutputModule<one::WatchRuns, one::WatchLuminosityBlocks>(ps),
60  selections_(&keptProducts()[InEvent]),
61  maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
62  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
63  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
64  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
65  serializer_(selections_),
66  serializeDataBuffer_(),
67  hltsize_(0),
68  lumi_(0),
69  l1bit_(0),
70  hltbits_(0),
71  origSize_(0),
72  host_name_(),
73  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
74  hltTriggerSelections_(),
75  outputModuleId_(0) {
76  // no compression as default value - we need this!
// NOTE(review): the parameter description in this file registers
// "use_compression" with a default of true — confirm which statement is current.
77 
78  // test luminosity sections
// Record the construction time in seconds (with a microsecond fraction). This
// value is later subtracted from the current time when fake lumi sections are
// computed from lumiSectionInterval_.
79  struct timeval now;
80  struct timezone dummyTZ;
81  gettimeofday(&now, &dummyTZ);
82  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
83 
// Sanitise the requested compression level: a level <= 0 switches compression
// off; a level above 9 is reported as being capped at 9.
84  if(useCompression_ == true) {
85  if(compressionLevel_ <= 0) {
86  FDEBUG(9) << "Compression Level = " << compressionLevel_
87  << " no compression" << std::endl;
89  useCompression_ = false;
90  } else if(compressionLevel_ > 9) {
91  FDEBUG(9) << "Compression Level = " << compressionLevel_
92  << " using max compression level 9" << std::endl;
// NOTE(review): original line 93 is missing from this listing; presumably it
// clamps compressionLevel_ to 9 — confirm against the real source.
94  }
95  }
// Cache the host name, falling back to a fixed placeholder if the lookup fails.
// NOTE(review): strncpy does not NUL-terminate when the source fills the
// destination; check the declared size of host_name_ against both the 255
// passed to gethostname and the length of the placeholder.
97  int got_host = gethostname(host_name_, 255);
98  if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
99  //loadExtraClasses();
100 
101  // 25-Jan-2008, KAB - pull out the trigger selection request
102  // which we need for the INIT message
// NOTE(review): the statement this comment describes (original line 103) is
// missing from this listing.
104  }
105 
107 
// Run-start hook (signature line missing from this listing; presumably
// beginRun(RunForOutput const&), which the cross-reference index lists as an
// override — confirm).
// Calls start(), builds the INIT message from the product registry and passes
// it to doOutputHeader(), then asks the header buffer to release unused
// capacity.
108  void
110  start();
111  std::unique_ptr<InitMsgBuilder> init_message = serializeRegistry();
112  doOutputHeader(*init_message);
// NOTE(review): original line 113 is missing from this listing.
114  serializeDataBuffer_.header_buf_.shrink_to_fit();
115  }
116 
// NOTE(review): signature line missing from this listing; presumably
// endRun(RunForOutput const&), the counterpart of the run-start hook — confirm.
// The visible body only calls stop().
117  void
119  stop();
120  }
121 
122  void
124 
// NOTE(review): signature line missing from this listing; presumably the
// end-of-job hook — confirm. The visible body only calls stop().
125  void
127  stop(); // for closing of files, notify storage manager, etc.
128  }
129 
130  void
132 
133  void
135 
// Per-event hook (signature line missing from this listing; the body's use of
// `e` matches write(EventForOutput const &e) in the cross-reference index).
// Serialises the event into an EventMsgBuilder and passes it to doOutputEvent().
136  void
138  std::unique_ptr<EventMsgBuilder> msg = serializeEvent(e);
139  doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
140  }
141 
// Builds and returns the INIT message (signature line missing from this
// listing; the return type and the call site match the member
// serializeRegistry() listed in the cross-reference index).
// Side effects visible here: may grow serializeDataBuffer_.header_buf_, and
// sets hltsize_ and outputModuleId_.
// NOTE(review): original lines 143, 145, 159, 177, 186 and 191 are missing from
// this listing. `toplevel` and `processName` are used below but defined on
// missing lines, and the make_unique call is shown without its first and last
// argument lines.
142  std::unique_ptr<InitMsgBuilder>
144 
146 
147  // make sure header_buf_ can hold the space used in serializer_ + header
148  // I just added an overhead for header of 50000 for now
149  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
150  unsigned int new_size = src_size + 50000;
151  if(serializeDataBuffer_.header_buf_.size() < new_size) serializeDataBuffer_.header_buf_.resize(new_size);
152 
153  //Build the INIT Message
154  //Following values are strictly DUMMY and will be replaced
155  // once available with Utility function etc.
156  uint32 run = 1;
157 
158  //Get the Process PSet ID
160 
161  //In case we need to print it
162  // cms::Digest dig(toplevel.compactForm());
163  // cms::MD5Result r1 = dig.digest();
164  // std::string hexy = r1.toString();
165  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
166 
// Remember how many HLT paths exist; the event serialisation in this file
// passes hltsize_ on to the event message.
167  Strings hltTriggerNames = getAllTriggerNames();
168  hltsize_ = hltTriggerNames.size();
169 
170  //L1 stays dummy as of today
171  Strings l1_names; //3
172  l1_names.push_back("t1");
173  l1_names.push_back("t10");
174  l1_names.push_back("t2");
175 
176  //Setting the process name to HLT
178 
// Derive the output module id as the zlib CRC-32 of the module label.
179  std::string moduleLabel = description().moduleLabel();
180  uLong crc = crc32(0L, Z_NULL, 0);
181  Bytef const* buf = (Bytef const*)(moduleLabel.data());
182  crc = crc32(crc, buf, moduleLabel.length());
183  outputModuleId_ = static_cast<uint32>(crc);
184 
185  auto init_message = std::make_unique<InitMsgBuilder>(
187  run, Version((uint8 const*)toplevel.compactForm().c_str()),
188  getReleaseVersion().c_str() , processName.c_str(),
189  moduleLabel.c_str(), outputModuleId_,
190  hltTriggerNames, hltTriggerSelections_, l1_names,
192 
193  // copy data into the destination message
194  unsigned char* src = serializeDataBuffer_.bufferPointer();
195  std::copy(src, src + src_size, init_message->dataAddress());
196  init_message->setDataLength(src_size);
197  return init_message;
198  }
199 
// Fetches the trigger results for an event and returns the handle by value.
// Signature line missing from this listing; the cross-reference index gives it
// as getTriggerResults(EDGetTokenT<TriggerResults> const &token,
// EventForOutput const &e) const.
// The result of getByToken is not checked here; the HLT mask code in this file
// tests isValid() on the handle it works with.
200  Trig
202  Trig result;
203  e.getByToken<TriggerResults>(token, result);
204  return result;
205  }
206 
// Rebuilds hltbits_ for the current event (signature line missing from this
// listing; the cross-reference index gives it as
// setHltMask(EventForOutput const &e)).
// Collects one state byte per trigger path into vHltState, then packs them into
// hltbits_ with packIntoString.
// NOTE(review): original lines 208, 212, 217 and 222 are missing from this
// listing — presumably the signature, the definition of `prod`, and the two
// loop headers over the trigger paths. Confirm against the real source.
207  void
209 
210  hltbits_.clear(); // If there was something left over from last event
211 
213  //Trig const& prod = getTrigMask(e);
214  std::vector<unsigned char> vHltState;
215 
216  if (prod.isValid()) {
// Valid trigger results: take the recorded state of each path.
218  vHltState.push_back(((prod->at(i)).state()));
219  }
220  } else {
221  // We fill all Trigger bits to valid state.
223  vHltState.push_back(hlt::Pass);
224  }
225  }
226  //Pack into member hltbits_
227  packIntoString(vHltState, hltbits_);
228 
229  //This is just printing code.
230  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
231  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
232  // printBits(hltbits_[i]);
233  //}
234  //std::cout << "\n";
235  }
236 
237 // test luminosity sections
// Computes a time-based ("fake") lumi section number and stores it in lumi_.
// Signature line missing from this listing; the name is taken from the call
// setLumiSection() in the event serialisation code in this file.
// lumi_ = floor(seconds elapsed since timeInSecSinceUTC / lumiSectionInterval_) + 1.
// When lumiSectionInterval_ is not positive, lumi_ is left unchanged.
238  void
240  struct timeval now;
241  struct timezone dummyTZ;
242  gettimeofday(&now, &dummyTZ);
243  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
244  // what about overflows?
245  if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
246  }
247 
// Builds and returns the event message for one event (signature line missing
// from this listing; the cross-reference index gives it as
// serializeEvent(EventForOutput const &e)).
// Side effects visible here: fills and then clears l1bit_, refreshes hltbits_
// through setHltMask, updates lumi_, and may grow serializeDataBuffer_.bufs_.
// NOTE(review): original lines 249, 267, 276, 279 and 294 are missing from this
// listing. The make_unique call is shown without its first and last argument
// lines, and the step that fills serializeDataBuffer_ is not visible.
248  std::unique_ptr<EventMsgBuilder>
250  //Let's build the Event Message first
251 
252  //Following is strictly DUMMY data for the L1 trigger and will be replaced with actual
253  // once figured out, there is no logic involved here.
254  l1bit_.push_back(true);
255  l1bit_.push_back(true);
256  l1bit_.push_back(false);
257  //End of dummy data
258 
259  setHltMask(e);
260 
// Interval 0 means "use the event's own lumi block number"; any other value
// goes through the time-based computation.
261  if (lumiSectionInterval_ == 0) {
262  lumi_ = e.luminosityBlock();
263  } else {
264  setLumiSection();
265  }
266 
268 
269  // resize bufs_ to reflect space used in serializer_ + header
270  // I just added an overhead for header of 50000 for now
271  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
272  unsigned int new_size = src_size + 50000;
273  if(serializeDataBuffer_.bufs_.size() < new_size) serializeDataBuffer_.bufs_.resize(new_size);
274 
// NOTE(review): &hltbits_[0] indexes an empty vector if no trigger states were
// packed for this event — confirm that hltbits_ is never empty here.
275  auto msg = std::make_unique<EventMsgBuilder>(
277  e.id().event(), lumi_, outputModuleId_, 0,
278  l1bit_, (uint8*)&hltbits_[0], hltsize_,
280  msg->setOrigDataSize(origSize_); // we need this set to zero
281 
282  // copy data into the destination message
283  // an alternative is to have serializer only to the serialization
284  // in serializeEvent, and then call a new member "getEventData" that
285  // takes the compression arguments and a place to put the data.
286  // This will require one less copy. The only catch is that the
287  // space provided in bufs_ should be at least the uncompressed
288  // size + overhead for header because we will not know the actual
289  // compressed size.
290 
291  unsigned char* src = serializeDataBuffer_.bufferPointer();
292  std::copy(src,src + src_size, msg->eventAddr());
293  msg->setEventLength(src_size);
295 
296  l1bit_.clear(); //Clear up for the next event to come.
297  return msg;
298  }
299 
// Registers this module's untracked configuration parameters and their
// defaults on `desc`. Signature line missing from this listing; the
// cross-reference index gives it as
// static void fillDescription(ParameterSetDescription &desc).
// Defaults registered here: max_event_size = 7000000, use_compression = true,
// compression_level = 1, lumiSection_interval = 0.
// NOTE(review): original line 311 is missing from this listing; presumably it
// forwards to the base class's fillDescription — confirm.
300  void
302  desc.addUntracked<int>("max_event_size", 7000000)
303  ->setComment("Starting size in bytes of the serialized event buffer.");
304  desc.addUntracked<bool>("use_compression", true)
305  ->setComment("If True, compression will be used to write streamer file.");
306  desc.addUntracked<int>("compression_level", 1)
307  ->setComment("ROOT compression level to use.");
308  desc.addUntracked<int>("lumiSection_interval", 0)
309  ->setComment("If 0, use lumi section number from event.\n"
310  "If not 0, the interval in seconds between fake lumi sections.");
312  }
313 } // end of namespace-edm
RunNumber_t run() const
Definition: EventID.h:39
std::unique_ptr< EventMsgBuilder > serializeEvent(EventForOutput const &e)
ParameterSetID const & mainParameterSetID() const
EventNumber_t event() const
Definition: EventID.h:41
static void fillDescription(ParameterSetDescription &desc)
EventID const & id() const
ModuleDescription const & description() const
value_type compactForm() const
Definition: Hash.h:217
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
def copy(args, dbName)
Trig getTriggerResults(EDGetTokenT< TriggerResults > const &token, EventForOutput const &e) const
virtual void doOutputHeader(InitMsgBuilder const &init_message)=0
EventSelector::Strings Strings
void endRun(RunForOutput const &) override
std::string const & processName() const
Definition: OutputModule.h:75
void write(EventForOutput const &e) override
#define FDEBUG(lev)
Definition: DebugMacros.h:20
ParameterSetID selectorConfig() const
std::string const & moduleLabel() const
uint16_t size_type
unsigned int currentSpaceUsed() const
bool getByToken(EDGetToken token, TypeID const &typeID, BasicHandle &result) const
accept
Definition: HLTenums.h:19
void beginRun(RunForOutput const &) override
void writeLuminosityBlock(LuminosityBlockForOutput const &) override
static void fillDescription(ParameterSetDescription &desc, std::vector< std::string > const &iDefaultOutputCommands=ProductSelectorRules::defaultSelectionStrings())
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
unsigned char const * bufferPointer() const
virtual void doOutputEvent(EventMsgBuilder const &msg)=0
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
bool isValid() const
Definition: HandleBase.h:74
unsigned int uint32
Definition: MsgTools.h:13
std::string getReleaseVersion()
StreamerOutputModuleBase(ParameterSet const &ps)
const ModuleDescription & moduleDescription() const
std::unique_ptr< InitMsgBuilder > serializeRegistry()
edm::EDGetTokenT< edm::TriggerResults > trToken_
tuple msg
Definition: mps_check.py:277
void setHltMask(EventForOutput const &e)
void writeRun(RunForOutput const &) override
uint32_t adler32_chksum() const
std::string const & processName() const
int serializeEvent(EventForOutput const &event, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
unsigned char uint8
Definition: MsgTools.h:11
HLT enums.
std::vector< std::string > const & getAllTriggerNames()
LuminosityBlockNumber_t luminosityBlock() const
unsigned int currentEventSize() const
static std::string const source
Definition: EdmProvDump.cc:43
std::vector< unsigned char > hltbits_
BranchIDLists const * branchIDLists()