CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerOutputModuleBase.cc
Go to the documentation of this file.
1 
3 
13 //#include "FWCore/Utilities/interface/Digest.h"
17 
18 #include <iostream>
19 #include <memory>
20 #include <string>
21 #include <sys/time.h>
22 #include <unistd.h>
23 #include <vector>
24 #include "zlib.h"
25 
26 namespace {
27  //A utility function that packs bits from source into bytes, with
28  // packInOneByte as the numeber of bytes that are packed from source to dest.
29 /* inline void printBits(unsigned char c) {
30  for (int i = 7; i >= 0; --i) {
31  int bit = ((c >> i) & 1);
32  std::cout << " " << bit;
33  }
34  } */
35 
36  void packIntoString(std::vector<unsigned char> const& source,
37  std::vector<unsigned char>& package) {
38  if (source.size() < 1) {return;}
39  unsigned int packInOneByte = 4;
40  unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte); //Two bits per HLT
41 
42  package.resize(sizeOfPackage);
43  memset(&package[0], 0x00, sizeOfPackage);
44 
45  for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
46  unsigned int whichByte = i/packInOneByte;
47  unsigned int indxWithinByte = i % packInOneByte;
48  package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
49  }
50  //for (unsigned int i=0; i !=package.size() ; ++i)
51  // printBits(package[i]);
52  // std::cout << std::endl;
53 
54  }
55 }
56 
57 namespace edm {
60  one::OutputModule<one::WatchRuns, one::WatchLuminosityBlocks>(ps),
61  selections_(&keptProducts()[InEvent]),
62  maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
63  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
64  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
65  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
66  serializer_(selections_),
67  serializeDataBuffer_(),
68  hltsize_(0),
69  lumi_(0),
70  l1bit_(0),
71  hltbits_(0),
72  origSize_(0),
73  host_name_(),
74  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
75  hltTriggerSelections_(),
76  outputModuleId_(0) {
77  // no compression as default value - we need this!
78 
79  // test luminosity sections
80  struct timeval now;
81  struct timezone dummyTZ;
82  gettimeofday(&now, &dummyTZ);
83  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
84 
85  if(useCompression_ == true) {
86  if(compressionLevel_ <= 0) {
87  FDEBUG(9) << "Compression Level = " << compressionLevel_
88  << " no compression" << std::endl;
90  useCompression_ = false;
91  } else if(compressionLevel_ > 9) {
92  FDEBUG(9) << "Compression Level = " << compressionLevel_
93  << " using max compression level 9" << std::endl;
95  }
96  }
98  int got_host = gethostname(host_name_, 255);
99  if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
100  //loadExtraClasses();
101 
102  // 25-Jan-2008, KAB - pull out the trigger selection request
103  // which we need for the INIT message
105  }
106 
108 
109  void
111  start();
112  std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
113  doOutputHeader(*init_message);
115  serializeDataBuffer_.header_buf_.shrink_to_fit();
116  }
117 
118  void
120  stop();
121  }
122 
123  void
125 
126  void
128  stop(); // for closing of files, notify storage manager, etc.
129  }
130 
131  void
133 
134  void
136 
137  void
139  std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e, mcc);
140  doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
141  }
142 
143  std::auto_ptr<InitMsgBuilder>
145 
147 
148  // resize bufs_ to reflect space used in serializer_ + header
149  // I just added an overhead for header of 50000 for now
150  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
151  unsigned int new_size = src_size + 50000;
152  if(serializeDataBuffer_.header_buf_.size() < new_size) serializeDataBuffer_.header_buf_.resize(new_size);
153 
154  //Build the INIT Message
155  //Following values are strictly DUMMY and will be replaced
156  // once available with Utility function etc.
157  uint32 run = 1;
158 
159  //Get the Process PSet ID
161 
162  //In case we need to print it
163  // cms::Digest dig(toplevel.compactForm());
164  // cms::MD5Result r1 = dig.digest();
165  // std::string hexy = r1.toString();
166  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
167 
168  Strings hltTriggerNames = getAllTriggerNames();
169  hltsize_ = hltTriggerNames.size();
170 
171  //L1 stays dummy as of today
172  Strings l1_names; //3
173  l1_names.push_back("t1");
174  l1_names.push_back("t10");
175  l1_names.push_back("t2");
176 
177  //Setting the process name to HLT
179 
180  std::string moduleLabel = description().moduleLabel();
181  uLong crc = crc32(0L, Z_NULL, 0);
182  Bytef const* buf = (Bytef const*)(moduleLabel.data());
183  crc = crc32(crc, buf, moduleLabel.length());
184  outputModuleId_ = static_cast<uint32>(crc);
185 
186  std::auto_ptr<InitMsgBuilder> init_message(
188  run, Version((uint8 const*)toplevel.compactForm().c_str()),
189  getReleaseVersion().c_str() , processName.c_str(),
190  moduleLabel.c_str(), outputModuleId_,
191  hltTriggerNames, hltTriggerSelections_, l1_names,
193 
194  // copy data into the destination message
195  unsigned char* src = serializeDataBuffer_.bufferPointer();
196  std::copy(src, src + src_size, init_message->dataAddress());
197  init_message->setDataLength(src_size);
198  return init_message;
199  }
200 
201  Trig
203  //This cast is safe since we only call const functions of the EventPrincipal after this point
204  PrincipalGetAdapter adapter(const_cast<EventPrincipal&>(ep), moduleDescription());
205  adapter.setConsumer(this);
206  Trig result;
207  auto bh = adapter.getByToken_(TypeID(typeid(TriggerResults)),PRODUCT_TYPE, token, mcc);
208  convert_handle(std::move(bh), result);
209  return result;
210  }
211 
212  void
214 
215  hltbits_.clear(); // If there was something left over from last event
216 
218  //Trig const& prod = getTrigMask(e);
219  std::vector<unsigned char> vHltState;
220 
221  if (prod.isValid()) {
223  vHltState.push_back(((prod->at(i)).state()));
224  }
225  } else {
226  // We fill all Trigger bits to valid state.
228  vHltState.push_back(hlt::Pass);
229  }
230  }
231  //Pack into member hltbits_
232  packIntoString(vHltState, hltbits_);
233 
234  //This is Just a printing code.
235  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
236  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
237  // printBits(hltbits_[i]);
238  //}
239  //std::cout << "\n";
240  }
241 
242 // test luminosity sections
243  void
245  struct timeval now;
246  struct timezone dummyTZ;
247  gettimeofday(&now, &dummyTZ);
248  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
249  // what about overflows?
250  if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
251  }
252 
253  std::auto_ptr<EventMsgBuilder>
255  //Lets Build the Event Message first
256 
257  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
258  // once figured out, there is no logic involved here.
259  l1bit_.push_back(true);
260  l1bit_.push_back(true);
261  l1bit_.push_back(false);
262  //End of dummy data
263 
264  setHltMask(e, mcc);
265 
266  if (lumiSectionInterval_ == 0) {
267  lumi_ = e.luminosityBlock();
268  } else {
269  setLumiSection();
270  }
271 
273 
274  // resize bufs_ to reflect space used in serializer_ + header
275  // I just added an overhead for header of 50000 for now
276  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
277  unsigned int new_size = src_size + 50000;
278  if(serializeDataBuffer_.bufs_.size() < new_size) serializeDataBuffer_.bufs_.resize(new_size);
279 
280  std::auto_ptr<EventMsgBuilder>
282  e.id().event(), lumi_, outputModuleId_, 0,
283  l1bit_, (uint8*)&hltbits_[0], hltsize_,
285  msg->setOrigDataSize(origSize_); // we need this set to zero
286 
287  // copy data into the destination message
288  // an alternative is to have serializer only to the serialization
289  // in serializeEvent, and then call a new member "getEventData" that
290  // takes the compression arguments and a place to put the data.
291  // This will require one less copy. The only catch is that the
292  // space provided in bufs_ should be at least the uncompressed
293  // size + overhead for header because we will not know the actual
294  // compressed size.
295 
296  unsigned char* src = serializeDataBuffer_.bufferPointer();
297  std::copy(src,src + src_size, msg->eventAddr());
298  msg->setEventLength(src_size);
299  if(useCompression_) msg->setOrigDataSize(serializeDataBuffer_.currentEventSize());
300 
301  l1bit_.clear(); //Clear up for the next event to come.
302  return msg;
303  }
304 
305  void
307  desc.addUntracked<int>("max_event_size", 7000000)
308  ->setComment("Starting size in bytes of the serialized event buffer.");
309  desc.addUntracked<bool>("use_compression", true)
310  ->setComment("If True, compression will be used to write streamer file.");
311  desc.addUntracked<int>("compression_level", 1)
312  ->setComment("ROOT compression level to use.");
313  desc.addUntracked<int>("lumiSection_interval", 0)
314  ->setComment("If 0, use lumi section number from event.\n"
315  "If not 0, the interval in seconds between fake lumi sections.");
317  }
318 } // end of namespace-edm
BranchIDLists const * branchIDLists() const
RunNumber_t run() const
Definition: EventID.h:39
ParameterSetID getProcessParameterSetID()
Definition: Registry.cc:66
BasicHandle getByToken_(TypeID const &id, KindOfType kindOfType, EDGetToken token, ModuleCallingContext const *mcc) const
EventNumber_t event() const
Definition: EventID.h:41
void setConsumer(EDConsumerBase const *iConsumer)
Trig getTriggerResults(EDGetTokenT< TriggerResults > const &token, EventPrincipal const &ep, ModuleCallingContext const *) const
static void fillDescription(ParameterSetDescription &desc)
int i
Definition: DBlmapReader.cc:9
ModuleDescription const & description() const
value_type compactForm() const
Definition: Hash.h:213
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual void doOutputHeader(InitMsgBuilder const &init_message) const =0
std::vector< std::string > Strings
Definition: MsgTools.h:18
EventID const & id() const
std::string const & processName() const
Definition: OutputModule.h:76
std::auto_ptr< InitMsgBuilder > serializeRegistry()
LuminosityBlockNumber_t luminosityBlock() const
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void doOutputEvent(EventMsgBuilder const &msg) const =0
ParameterSetID selectorConfig() const
std::string const & moduleLabel() const
uint16_t size_type
virtual void stop() const =0
unsigned int currentSpaceUsed() const
virtual void writeRun(RunPrincipal const &, ModuleCallingContext const *) override
accept
Definition: HLTenums.h:19
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
tuple result
Definition: query.py:137
virtual void writeLuminosityBlock(LuminosityBlockPrincipal const &, ModuleCallingContext const *) override
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
bool isValid() const
Definition: HandleBase.h:75
virtual void write(EventPrincipal const &e, ModuleCallingContext const *) override
virtual void start() const =0
unsigned int uint32
Definition: MsgTools.h:13
void setHltMask(EventPrincipal const &e, ModuleCallingContext const *)
std::string getReleaseVersion()
void convert_handle(BasicHandle &&bh, Handle< T > &result)
Definition: ConvertHandle.h:20
StreamerOutputModuleBase(ParameterSet const &ps)
const ModuleDescription & moduleDescription() const
unsigned char * bufferPointer() const
edm::EDGetTokenT< edm::TriggerResults > trToken_
static void fillDescription(ParameterSetDescription &desc)
uint32_t adler32_chksum() const
std::string const & processName() const
std::auto_ptr< EventMsgBuilder > serializeEvent(EventPrincipal const &e, ModuleCallingContext const *mcc)
virtual void beginRun(RunPrincipal const &, ModuleCallingContext const *) override
unsigned char uint8
Definition: MsgTools.h:11
std::vector< std::string > const & getAllTriggerNames()
virtual void endRun(RunPrincipal const &, ModuleCallingContext const *) override
unsigned int currentEventSize() const
static std::string const source
Definition: EdmProvDump.cc:42
std::vector< unsigned char > hltbits_