CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerOutputModuleBase.cc
Go to the documentation of this file.
1 
3 
13 //#include "FWCore/Utilities/interface/Digest.h"
17 
18 #include <iostream>
19 #include <memory>
20 #include <string>
21 #include <sys/time.h>
22 #include <unistd.h>
23 #include <vector>
24 #include "zlib.h"
25 
26 namespace {
27  //A utility function that packs bits from source into bytes, with
28  // packInOneByte as the numeber of bytes that are packed from source to dest.
29 /* inline void printBits(unsigned char c) {
30  for (int i = 7; i >= 0; --i) {
31  int bit = ((c >> i) & 1);
32  std::cout << " " << bit;
33  }
34  } */
35 
36  void packIntoString(std::vector<unsigned char> const& source,
37  std::vector<unsigned char>& package) {
38  if (source.size() < 1) {return;}
39  unsigned int packInOneByte = 4;
40  unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte); //Two bits per HLT
41 
42  package.resize(sizeOfPackage);
43  memset(&package[0], 0x00, sizeOfPackage);
44 
45  for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
46  unsigned int whichByte = i/packInOneByte;
47  unsigned int indxWithinByte = i % packInOneByte;
48  package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
49  }
50  //for (unsigned int i=0; i !=package.size() ; ++i)
51  // printBits(package[i]);
52  // std::cout << std::endl;
53 
54  }
55 }
56 
57 namespace edm {
60  one::OutputModule<one::WatchRuns, one::WatchLuminosityBlocks>(ps),
61  selections_(&keptProducts()[InEvent]),
62  maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
63  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
64  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
65  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
66  serializer_(selections_),
67  serializeDataBuffer_(),
68  hltsize_(0),
69  lumi_(0),
70  l1bit_(0),
71  hltbits_(0),
72  origSize_(0),
73  host_name_(),
74  trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
75  hltTriggerSelections_(),
76  outputModuleId_(0) {
77  // no compression as default value - we need this!
78 
79  // test luminosity sections
80  struct timeval now;
81  struct timezone dummyTZ;
82  gettimeofday(&now, &dummyTZ);
83  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
84 
85  if(useCompression_ == true) {
86  if(compressionLevel_ <= 0) {
87  FDEBUG(9) << "Compression Level = " << compressionLevel_
88  << " no compression" << std::endl;
90  useCompression_ = false;
91  } else if(compressionLevel_ > 9) {
92  FDEBUG(9) << "Compression Level = " << compressionLevel_
93  << " using max compression level 9" << std::endl;
95  }
96  }
98  int got_host = gethostname(host_name_, 255);
99  if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
100  //loadExtraClasses();
101 
102  // 25-Jan-2008, KAB - pull out the trigger selection request
103  // which we need for the INIT message
105  }
106 
108 
109  void
111  start();
112  std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
113  doOutputHeader(*init_message);
114  }
115 
116  void
118  stop();
119  }
120 
121  void
123 
124  void
126  stop(); // for closing of files, notify storage manager, etc.
127  }
128 
129  void
131 
132  void
134 
135  void
137  std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e, mcc);
138  doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
139  }
140 
141  std::auto_ptr<InitMsgBuilder>
143 
145 
146  // resize bufs_ to reflect space used in serializer_ + header
147  // I just added an overhead for header of 50000 for now
148  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
149  unsigned int new_size = src_size + 50000;
150  if(serializeDataBuffer_.header_buf_.size() < new_size) serializeDataBuffer_.header_buf_.resize(new_size);
151 
152  //Build the INIT Message
153  //Following values are strictly DUMMY and will be replaced
154  // once available with Utility function etc.
155  uint32 run = 1;
156 
157  //Get the Process PSet ID
159 
160  //In case we need to print it
161  // cms::Digest dig(toplevel.compactForm());
162  // cms::MD5Result r1 = dig.digest();
163  // std::string hexy = r1.toString();
164  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
165 
166  Strings hltTriggerNames = getAllTriggerNames();
167  hltsize_ = hltTriggerNames.size();
168 
169  //L1 stays dummy as of today
170  Strings l1_names; //3
171  l1_names.push_back("t1");
172  l1_names.push_back("t10");
173  l1_names.push_back("t2");
174 
175  //Setting the process name to HLT
177 
178  std::string moduleLabel = description().moduleLabel();
179  uLong crc = crc32(0L, Z_NULL, 0);
180  Bytef const* buf = (Bytef const*)(moduleLabel.data());
181  crc = crc32(crc, buf, moduleLabel.length());
182  outputModuleId_ = static_cast<uint32>(crc);
183 
184  std::auto_ptr<InitMsgBuilder> init_message(
186  run, Version((uint8 const*)toplevel.compactForm().c_str()),
187  getReleaseVersion().c_str() , processName.c_str(),
188  moduleLabel.c_str(), outputModuleId_,
189  hltTriggerNames, hltTriggerSelections_, l1_names,
191 
192  // copy data into the destination message
193  unsigned char* src = serializeDataBuffer_.bufferPointer();
194  std::copy(src, src + src_size, init_message->dataAddress());
195  init_message->setDataLength(src_size);
196  return init_message;
197  }
198 
199  Trig
201  //This cast is safe since we only call const functions of the EventPrincipal after this point
202  PrincipalGetAdapter adapter(const_cast<EventPrincipal&>(ep), moduleDescription());
203  adapter.setConsumer(this);
204  Trig result;
205  auto bh = adapter.getByToken_(TypeID(typeid(TriggerResults)),PRODUCT_TYPE, token, mcc);
206  convert_handle(std::move(bh), result);
207  return result;
208  }
209 
210  void
212 
213  hltbits_.clear(); // If there was something left over from last event
214 
216  //Trig const& prod = getTrigMask(e);
217  std::vector<unsigned char> vHltState;
218 
219  if (prod.isValid()) {
221  vHltState.push_back(((prod->at(i)).state()));
222  }
223  } else {
224  // We fill all Trigger bits to valid state.
226  vHltState.push_back(hlt::Pass);
227  }
228  }
229  //Pack into member hltbits_
230  packIntoString(vHltState, hltbits_);
231 
232  //This is Just a printing code.
233  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
234  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
235  // printBits(hltbits_[i]);
236  //}
237  //std::cout << "\n";
238  }
239 
240 // test luminosity sections
241  void
243  struct timeval now;
244  struct timezone dummyTZ;
245  gettimeofday(&now, &dummyTZ);
246  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
247  // what about overflows?
248  if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
249  }
250 
251  std::auto_ptr<EventMsgBuilder>
253  //Lets Build the Event Message first
254 
255  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
256  // once figured out, there is no logic involved here.
257  l1bit_.push_back(true);
258  l1bit_.push_back(true);
259  l1bit_.push_back(false);
260  //End of dummy data
261 
262  setHltMask(e, mcc);
263 
264  if (lumiSectionInterval_ == 0) {
265  lumi_ = e.luminosityBlock();
266  } else {
267  setLumiSection();
268  }
269 
271 
272  // resize bufs_ to reflect space used in serializer_ + header
273  // I just added an overhead for header of 50000 for now
274  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
275  unsigned int new_size = src_size + 50000;
276  if(serializeDataBuffer_.bufs_.size() < new_size) serializeDataBuffer_.bufs_.resize(new_size);
277 
278  std::auto_ptr<EventMsgBuilder>
280  e.id().event(), lumi_, outputModuleId_, 0,
281  l1bit_, (uint8*)&hltbits_[0], hltsize_,
283  msg->setOrigDataSize(origSize_); // we need this set to zero
284 
285  // copy data into the destination message
286  // an alternative is to have serializer only to the serialization
287  // in serializeEvent, and then call a new member "getEventData" that
288  // takes the compression arguments and a place to put the data.
289  // This will require one less copy. The only catch is that the
290  // space provided in bufs_ should be at least the uncompressed
291  // size + overhead for header because we will not know the actual
292  // compressed size.
293 
294  unsigned char* src = serializeDataBuffer_.bufferPointer();
295  std::copy(src,src + src_size, msg->eventAddr());
296  msg->setEventLength(src_size);
297  if(useCompression_) msg->setOrigDataSize(serializeDataBuffer_.currentEventSize());
298 
299  l1bit_.clear(); //Clear up for the next event to come.
300  return msg;
301  }
302 
303  void
305  desc.addUntracked<int>("max_event_size", 7000000)
306  ->setComment("Starting size in bytes of the serialized event buffer.");
307  desc.addUntracked<bool>("use_compression", true)
308  ->setComment("If True, compression will be used to write streamer file.");
309  desc.addUntracked<int>("compression_level", 1)
310  ->setComment("ROOT compression level to use.");
311  desc.addUntracked<int>("lumiSection_interval", 0)
312  ->setComment("If 0, use lumi section number from event.\n"
313  "If not 0, the interval in seconds between fake lumi sections.");
315  }
316 } // end of namespace-edm
BranchIDLists const * branchIDLists() const
RunNumber_t run() const
Definition: EventID.h:39
ParameterSetID getProcessParameterSetID()
Definition: Registry.cc:66
BasicHandle getByToken_(TypeID const &id, KindOfType kindOfType, EDGetToken token, ModuleCallingContext const *mcc) const
EventNumber_t event() const
Definition: EventID.h:41
void setConsumer(EDConsumerBase const *iConsumer)
Trig getTriggerResults(EDGetTokenT< TriggerResults > const &token, EventPrincipal const &ep, ModuleCallingContext const *) const
static void fillDescription(ParameterSetDescription &desc)
int i
Definition: DBlmapReader.cc:9
ModuleDescription const & description() const
value_type compactForm() const
Definition: Hash.h:213
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual void doOutputHeader(InitMsgBuilder const &init_message) const =0
std::vector< std::string > Strings
Definition: MsgTools.h:18
EventID const & id() const
std::string const & processName() const
Definition: OutputModule.h:76
std::auto_ptr< InitMsgBuilder > serializeRegistry()
LuminosityBlockNumber_t luminosityBlock() const
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void doOutputEvent(EventMsgBuilder const &msg) const =0
ParameterSetID selectorConfig() const
std::string const & moduleLabel() const
uint16_t size_type
virtual void stop() const =0
unsigned int currentSpaceUsed() const
virtual void writeRun(RunPrincipal const &, ModuleCallingContext const *) override
accept
Definition: HLTenums.h:19
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
tuple result
Definition: query.py:137
virtual void writeLuminosityBlock(LuminosityBlockPrincipal const &, ModuleCallingContext const *) override
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
bool isValid() const
Definition: HandleBase.h:75
virtual void write(EventPrincipal const &e, ModuleCallingContext const *) override
virtual void start() const =0
unsigned int uint32
Definition: MsgTools.h:13
void setHltMask(EventPrincipal const &e, ModuleCallingContext const *)
std::string getReleaseVersion()
void convert_handle(BasicHandle &&bh, Handle< T > &result)
Definition: ConvertHandle.h:20
StreamerOutputModuleBase(ParameterSet const &ps)
const ModuleDescription & moduleDescription() const
unsigned char * bufferPointer() const
edm::EDGetTokenT< edm::TriggerResults > trToken_
static void fillDescription(ParameterSetDescription &desc)
uint32_t adler32_chksum() const
std::string const & processName() const
std::auto_ptr< EventMsgBuilder > serializeEvent(EventPrincipal const &e, ModuleCallingContext const *mcc)
virtual void beginRun(RunPrincipal const &, ModuleCallingContext const *) override
unsigned char uint8
Definition: MsgTools.h:11
std::vector< std::string > const & getAllTriggerNames()
virtual void endRun(RunPrincipal const &, ModuleCallingContext const *) override
unsigned int currentEventSize() const
static std::string const source
Definition: EdmProvDump.cc:42
std::vector< unsigned char > hltbits_