CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerOutputModuleBase.cc
Go to the documentation of this file.
1 
3 
13 //#include "FWCore/Utilities/interface/Digest.h"
17 
18 #include <string>
19 #include <unistd.h>
20 #include "zlib.h"
21 
23 
namespace {
  // Debug helper: writes the eight bits of c to std::cout, most significant
  // bit first, each bit preceded by a single space. No newline is emitted.
  // Only referenced from commented-out diagnostic code in this file.
  void printBits(unsigned char c) {
    for (int i = 7; i >= 0; --i) {
      int bit = ((c >> i) & 1);
      std::cout << " " << bit;
    }
  }

  // Packs the entries of source into package, two bits per entry and
  // therefore four entries per byte.
  //
  // Entry i is stored in byte i/4 of package, at bit offset 2*(i%4), i.e. the
  // first entry of every group of four occupies the two least significant
  // bits. Unused slots in the last byte are left as zero.
  //
  //   source  - one value per entry; only the two low bits of each value are
  //             stored, so a value above 3 cannot overwrite the slots of its
  //             neighbours in the same byte.
  //   package - output; its previous content is always discarded. It ends up
  //             with ceil(source.size()/4) bytes, and is empty when source is
  //             empty (so no stale bits from an earlier call survive).
  void packIntoString(std::vector<unsigned char> const& source,
                      std::vector<unsigned char>& package) {
    unsigned int const bitsPerEntry = 2;
    unsigned int const packInOneByte = 4;   // 8 bits / bitsPerEntry
    unsigned char const entryMask = 0x3;    // keeps the low bitsPerEntry bits

    if (source.empty()) {
      package.clear();
      return;
    }

    // assign() both sizes the buffer and zeroes every byte, which is needed
    // because the loop below only ORs bits in.
    package.assign(1 + ((source.size() - 1) / packInOneByte), 0);

    for (std::vector<unsigned char>::size_type i = 0; i != source.size(); ++i) {
      std::vector<unsigned char>::size_type const whichByte = i / packInOneByte;
      unsigned int const shift =
          static_cast<unsigned int>(i % packInOneByte) * bitsPerEntry;
      package[whichByte] = static_cast<unsigned char>(
          package[whichByte] | ((source[i] & entryMask) << shift));
    }
    //for (unsigned int i=0; i !=package.size() ; ++i)
    //   printBits(package[i]);
    // std::cout << std::endl;
  }
}
54 
55 namespace edm {
57  OutputModule(ps),
58  selections_(&keptProducts()[InEvent]),
59  maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
60  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
61  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
62  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
63  serializer_(selections_),
64  hltsize_(0),
65  lumi_(0),
66  l1bit_(0),
67  hltbits_(0),
68  origSize_(0),
69  host_name_(),
70  hltTriggerSelections_(),
71  outputModuleId_(0) {
72  // no compression as default value - we need this!
73 
74  // test luminosity sections
75  struct timeval now;
76  struct timezone dummyTZ;
77  gettimeofday(&now, &dummyTZ);
78  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
79 
80  if(useCompression_ == true) {
81  if(compressionLevel_ <= 0) {
82  FDEBUG(9) << "Compression Level = " << compressionLevel_
83  << " no compression" << std::endl;
85  useCompression_ = false;
86  } else if(compressionLevel_ > 9) {
87  FDEBUG(9) << "Compression Level = " << compressionLevel_
88  << " using max compression level 9" << std::endl;
90  }
91  }
92  serialize_databuffer.bufs_.resize(maxEventSize_);
93  int got_host = gethostname(host_name_, 255);
94  if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
95  //loadExtraClasses();
96  // do the line below instead of loadExtraClasses() to avoid Root errors
98 
99  // 25-Jan-2008, KAB - pull out the trigger selection request
100  // which we need for the INIT message
102  }
103 
105 
106  void
108  start();
109  std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
110  doOutputHeader(*init_message);
111  }
112 
113  void
115  stop();
116  }
117 
118  void
120 
121  void
123  stop(); // for closing of files, notify storage manager, etc.
124  }
125 
126  void
128 
129  void
131 
132  void
134  std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
135  doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
136  }
137 
138  std::auto_ptr<InitMsgBuilder>
140 
141  serializer_.serializeRegistry(serialize_databuffer);
142 
143  // resize bufs_ to reflect space used in serializer_ + header
144  // I just added an overhead for header of 50000 for now
145  unsigned int src_size = serialize_databuffer.currentSpaceUsed();
146  unsigned int new_size = src_size + 50000;
147  if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
148 
149  //Build the INIT Message
150  //Following values are strictly DUMMY and will be replaced
151  // once available with Utility function etc.
152  uint32 run = 1;
153 
154  //Get the Process PSet ID
157 
158  //In case we need to print it
159  // cms::Digest dig(toplevel.compactForm());
160  // cms::MD5Result r1 = dig.digest();
161  // std::string hexy = r1.toString();
162  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
163 
164  Strings hltTriggerNames = getAllTriggerNames();
165  hltsize_ = hltTriggerNames.size();
166 
167  //L1 stays dummy as of today
168  Strings l1_names; //3
169  l1_names.push_back("t1");
170  l1_names.push_back("t10");
171  l1_names.push_back("t2");
172 
173  //Setting the process name to HLT
174  std::string processName = OutputModule::processName();
175 
176  std::string moduleLabel = description().moduleLabel();
177  uLong crc = crc32(0L, Z_NULL, 0);
178  Bytef* buf = (Bytef*) moduleLabel.data();
179  crc = crc32(crc, buf, moduleLabel.length());
180  outputModuleId_ = static_cast<uint32>(crc);
181 
182  std::auto_ptr<InitMsgBuilder> init_message(
183  new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
184  run, Version((uint8*)toplevel.compactForm().c_str()),
185  getReleaseVersion().c_str() , processName.c_str(),
186  moduleLabel.c_str(), outputModuleId_,
187  hltTriggerNames, hltTriggerSelections_, l1_names,
188  (uint32)serialize_databuffer.adler32_chksum(), host_name_));
189 
190  // copy data into the destination message
191  unsigned char* src = serialize_databuffer.bufferPointer();
192  std::copy(src, src + src_size, init_message->dataAddress());
193  init_message->setDataLength(src_size);
194  return init_message;
195  }
196 
197  void
199 
200  hltbits_.clear(); // If there was something left over from last event
201 
203  //Trig const& prod = getTrigMask(e);
204  std::vector<unsigned char> vHltState;
205 
206  if (prod.isValid()) {
208  vHltState.push_back(((prod->at(i)).state()));
209  }
210  } else {
211  // We fill all Trigger bits to valid state.
213  vHltState.push_back(hlt::Pass);
214  }
215  }
216  //Pack into member hltbits_
217  packIntoString(vHltState, hltbits_);
218 
219  //This is Just a printing code.
220  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
221  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
222  // printBits(hltbits_[i]);
223  //}
224  //std::cout << "\n";
225  }
226 
227 // test luminosity sections
228  void
230  struct timeval now;
231  struct timezone dummyTZ;
232  gettimeofday(&now, &dummyTZ);
233  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
234  // what about overflows?
235  if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
236  }
237 
238  std::auto_ptr<EventMsgBuilder>
240  //Lets Build the Event Message first
241 
242  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
243  // once figured out, there is no logic involved here.
244  l1bit_.push_back(true);
245  l1bit_.push_back(true);
246  l1bit_.push_back(false);
247  //End of dummy data
248 
249  setHltMask(e);
250 
251  if (lumiSectionInterval_ == 0) {
252  lumi_ = e.luminosityBlock();
253  } else {
254  setLumiSection();
255  }
256 
258 
259  // resize bufs_ to reflect space used in serializer_ + header
260  // I just added an overhead for header of 50000 for now
261  unsigned int src_size = serialize_databuffer.currentSpaceUsed();
262  unsigned int new_size = src_size + 50000;
263  if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
264 
265  std::auto_ptr<EventMsgBuilder>
266  msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
267  e.id().event(), lumi_, outputModuleId_, 0,
268  l1bit_, (uint8*)&hltbits_[0], hltsize_,
269  (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
270  msg->setOrigDataSize(origSize_); // we need this set to zero
271 
272  // copy data into the destination message
273  // an alternative is to have serializer only to the serialization
274  // in serializeEvent, and then call a new member "getEventData" that
275  // takes the compression arguments and a place to put the data.
276  // This will require one less copy. The only catch is that the
277  // space provided in bufs_ should be at least the uncompressed
278  // size + overhead for header because we will not know the actual
279  // compressed size.
280 
281  unsigned char* src = serialize_databuffer.bufferPointer();
282  std::copy(src,src + src_size, msg->eventAddr());
283  msg->setEventLength(src_size);
284  if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
285 
286  l1bit_.clear(); //Clear up for the next event to come.
287  return msg;
288  }
289 
290  void
292  desc.addUntracked<int>("max_event_size", 7000000)
293  ->setComment("Starting size in bytes of the serialized event buffer.");
294  desc.addUntracked<bool>("use_compression", true)
295  ->setComment("If True, compression will be used to write streamer file.");
296  desc.addUntracked<int>("compression_level", 1)
297  ->setComment("ROOT compression level to use.");
298  desc.addUntracked<int>("lumiSection_interval", 0)
299  ->setComment("If 0, use lumi section number from event.\n"
300  "If not 0, the interval in seconds between fake lumi sections.");
302  }
303 } // end of namespace-edm
RunNumber_t run() const
Definition: EventID.h:42
EventNumber_t event() const
Definition: EventID.h:44
static void fillDescription(ParameterSetDescription &desc)
int i
Definition: DBlmapReader.cc:9
void printBits(unsigned char c)
Definition: DumpTools.cc:134
value_type compactForm() const
Definition: Hash.h:209
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual void doOutputHeader(InitMsgBuilder const &init_message) const =0
void setHltMask(EventPrincipal const &e)
static ThreadSafeRegistry * instance()
std::auto_ptr< EventMsgBuilder > serializeEvent(EventPrincipal const &e)
std::vector< std::string > Strings
Definition: MsgTools.h:18
EventID const & id() const
std::string const & processName() const
Definition: OutputModule.h:55
static SerializeDataBuffer serialize_databuffer
std::auto_ptr< InitMsgBuilder > serializeRegistry()
LuminosityBlockNumber_t luminosityBlock() const
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void doOutputEvent(EventMsgBuilder const &msg) const =0
std::string const & moduleLabel() const
uint16_t size_type
virtual void stop() const =0
virtual void endRun(RunPrincipal const &)
virtual void beginRun(RunPrincipal const &)
unsigned int currentSpaceUsed() const
accept
Definition: HLTenums.h:22
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
virtual void writeRun(RunPrincipal const &)
bool isValid() const
Definition: HandleBase.h:76
virtual void start() const =0
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer)
unsigned int uint32
Definition: MsgTools.h:13
std::string getReleaseVersion()
StreamerOutputModuleBase(ParameterSet const &ps)
unsigned char * bufferPointer() const
static void fillDescription(ParameterSetDescription &desc)
uint32_t adler32_chksum() const
char state
Definition: procUtils.cc:75
unsigned char uint8
Definition: MsgTools.h:11
std::vector< std::string > const & getAllTriggerNames()
Definition: OutputModule.cc:37
Trig getTriggerResults(Event const &ep) const
int serializeRegistry(SerializeDataBuffer &data_buffer)
tuple cout
Definition: gather_cfg.py:121
ParameterSetID getProcessParameterSetID(Registry const *reg)
Associated free functions.
Definition: Registry.cc:11
unsigned int currentEventSize() const
virtual void write(EventPrincipal const &e)
static void enable()
interface for TClass generators
ModuleDescription const & description() const
std::vector< unsigned char > hltbits_
ParameterSetID selectorConfig() const
Definition: OutputModule.h:86
virtual void writeLuminosityBlock(LuminosityBlockPrincipal const &)