CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerOutputModuleBase.cc
Go to the documentation of this file.
1 
3 
12 //#include "FWCore/Utilities/interface/Digest.h"
16 
17 #include <iostream>
18 #include <memory>
19 #include <string>
20 #include <sys/time.h>
21 #include <unistd.h>
22 #include <vector>
23 #include "zlib.h"
24 
25 namespace {
26  //A utility function that packs bits from source into bytes, with
27  // packInOneByte as the numeber of bytes that are packed from source to dest.
28 /* inline void printBits(unsigned char c) {
29  for (int i = 7; i >= 0; --i) {
30  int bit = ((c >> i) & 1);
31  std::cout << " " << bit;
32  }
33  } */
34 
35  void packIntoString(std::vector<unsigned char> const& source,
36  std::vector<unsigned char>& package) {
37  if (source.size() < 1) {return;}
38  unsigned int packInOneByte = 4;
39  unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte); //Two bits per HLT
40 
41  package.resize(sizeOfPackage);
42  memset(&package[0], 0x00, sizeOfPackage);
43 
44  for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
45  unsigned int whichByte = i/packInOneByte;
46  unsigned int indxWithinByte = i % packInOneByte;
47  package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
48  }
49  //for (unsigned int i=0; i !=package.size() ; ++i)
50  // printBits(package[i]);
51  // std::cout << std::endl;
52 
53  }
54 }
55 
56 namespace edm {
58  OutputModule(ps),
59  selections_(&keptProducts()[InEvent]),
60  maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
61  useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
62  compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
63  lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
64  serializer_(selections_),
65  serializeDataBuffer_(),
66  hltsize_(0),
67  lumi_(0),
68  l1bit_(0),
69  hltbits_(0),
70  origSize_(0),
71  host_name_(),
72  hltTriggerSelections_(),
73  outputModuleId_(0) {
74  // no compression as default value - we need this!
75 
76  // test luminosity sections
77  struct timeval now;
78  struct timezone dummyTZ;
79  gettimeofday(&now, &dummyTZ);
80  timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
81 
82  if(useCompression_ == true) {
83  if(compressionLevel_ <= 0) {
84  FDEBUG(9) << "Compression Level = " << compressionLevel_
85  << " no compression" << std::endl;
87  useCompression_ = false;
88  } else if(compressionLevel_ > 9) {
89  FDEBUG(9) << "Compression Level = " << compressionLevel_
90  << " using max compression level 9" << std::endl;
92  }
93  }
95  int got_host = gethostname(host_name_, 255);
96  if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
97  //loadExtraClasses();
98 
99  // 25-Jan-2008, KAB - pull out the trigger selection request
100  // which we need for the INIT message
102  }
103 
105 
106  void
108  start();
109  std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
110  doOutputHeader(*init_message);
111  }
112 
113  void
115  stop();
116  }
117 
118  void
120 
121  void
123  stop(); // for closing of files, notify storage manager, etc.
124  }
125 
126  void
128 
129  void
131 
132  void
134  std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e, mcc);
135  doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
136  }
137 
138  std::auto_ptr<InitMsgBuilder>
140 
142 
143  // resize bufs_ to reflect space used in serializer_ + header
144  // I just added an overhead for header of 50000 for now
145  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
146  unsigned int new_size = src_size + 50000;
147  if(serializeDataBuffer_.header_buf_.size() < new_size) serializeDataBuffer_.header_buf_.resize(new_size);
148 
149  //Build the INIT Message
150  //Following values are strictly DUMMY and will be replaced
151  // once available with Utility function etc.
152  uint32 run = 1;
153 
154  //Get the Process PSet ID
156 
157  //In case we need to print it
158  // cms::Digest dig(toplevel.compactForm());
159  // cms::MD5Result r1 = dig.digest();
160  // std::string hexy = r1.toString();
161  // std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
162 
163  Strings hltTriggerNames = getAllTriggerNames();
164  hltsize_ = hltTriggerNames.size();
165 
166  //L1 stays dummy as of today
167  Strings l1_names; //3
168  l1_names.push_back("t1");
169  l1_names.push_back("t10");
170  l1_names.push_back("t2");
171 
172  //Setting the process name to HLT
174 
175  std::string moduleLabel = description().moduleLabel();
176  uLong crc = crc32(0L, Z_NULL, 0);
177  Bytef const* buf = (Bytef const*)(moduleLabel.data());
178  crc = crc32(crc, buf, moduleLabel.length());
179  outputModuleId_ = static_cast<uint32>(crc);
180 
181  std::auto_ptr<InitMsgBuilder> init_message(
183  run, Version((uint8 const*)toplevel.compactForm().c_str()),
184  getReleaseVersion().c_str() , processName.c_str(),
185  moduleLabel.c_str(), outputModuleId_,
186  hltTriggerNames, hltTriggerSelections_, l1_names,
188 
189  // copy data into the destination message
190  unsigned char* src = serializeDataBuffer_.bufferPointer();
191  std::copy(src, src + src_size, init_message->dataAddress());
192  init_message->setDataLength(src_size);
193  return init_message;
194  }
195 
196  void
198 
199  hltbits_.clear(); // If there was something left over from last event
200 
202  //Trig const& prod = getTrigMask(e);
203  std::vector<unsigned char> vHltState;
204 
205  if (prod.isValid()) {
207  vHltState.push_back(((prod->at(i)).state()));
208  }
209  } else {
210  // We fill all Trigger bits to valid state.
212  vHltState.push_back(hlt::Pass);
213  }
214  }
215  //Pack into member hltbits_
216  packIntoString(vHltState, hltbits_);
217 
218  //This is Just a printing code.
219  //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
220  //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
221  // printBits(hltbits_[i]);
222  //}
223  //std::cout << "\n";
224  }
225 
226 // test luminosity sections
227  void
229  struct timeval now;
230  struct timezone dummyTZ;
231  gettimeofday(&now, &dummyTZ);
232  double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
233  // what about overflows?
234  if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
235  }
236 
237  std::auto_ptr<EventMsgBuilder>
239  //Lets Build the Event Message first
240 
241  //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
242  // once figured out, there is no logic involved here.
243  l1bit_.push_back(true);
244  l1bit_.push_back(true);
245  l1bit_.push_back(false);
246  //End of dummy data
247 
248  setHltMask(e, mcc);
249 
250  if (lumiSectionInterval_ == 0) {
251  lumi_ = e.luminosityBlock();
252  } else {
253  setLumiSection();
254  }
255 
257 
258  // resize bufs_ to reflect space used in serializer_ + header
259  // I just added an overhead for header of 50000 for now
260  unsigned int src_size = serializeDataBuffer_.currentSpaceUsed();
261  unsigned int new_size = src_size + 50000;
262  if(serializeDataBuffer_.bufs_.size() < new_size) serializeDataBuffer_.bufs_.resize(new_size);
263 
264  std::auto_ptr<EventMsgBuilder>
266  e.id().event(), lumi_, outputModuleId_, 0,
267  l1bit_, (uint8*)&hltbits_[0], hltsize_,
269  msg->setOrigDataSize(origSize_); // we need this set to zero
270 
271  // copy data into the destination message
272  // an alternative is to have serializer only to the serialization
273  // in serializeEvent, and then call a new member "getEventData" that
274  // takes the compression arguments and a place to put the data.
275  // This will require one less copy. The only catch is that the
276  // space provided in bufs_ should be at least the uncompressed
277  // size + overhead for header because we will not know the actual
278  // compressed size.
279 
280  unsigned char* src = serializeDataBuffer_.bufferPointer();
281  std::copy(src,src + src_size, msg->eventAddr());
282  msg->setEventLength(src_size);
283  if(useCompression_) msg->setOrigDataSize(serializeDataBuffer_.currentEventSize());
284 
285  l1bit_.clear(); //Clear up for the next event to come.
286  return msg;
287  }
288 
289  void
291  desc.addUntracked<int>("max_event_size", 7000000)
292  ->setComment("Starting size in bytes of the serialized event buffer.");
293  desc.addUntracked<bool>("use_compression", true)
294  ->setComment("If True, compression will be used to write streamer file.");
295  desc.addUntracked<int>("compression_level", 1)
296  ->setComment("ROOT compression level to use.");
297  desc.addUntracked<int>("lumiSection_interval", 0)
298  ->setComment("If 0, use lumi section number from event.\n"
299  "If not 0, the interval in seconds between fake lumi sections.");
301  }
302 } // end of namespace-edm
RunNumber_t run() const
Definition: EventID.h:39
ParameterSetID getProcessParameterSetID()
Definition: Registry.cc:66
EventNumber_t event() const
Definition: EventID.h:41
static void fillDescription(ParameterSetDescription &desc)
int i
Definition: DBlmapReader.cc:9
value_type compactForm() const
Definition: Hash.h:213
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
virtual void doOutputHeader(InitMsgBuilder const &init_message) const =0
std::vector< std::string > Strings
Definition: MsgTools.h:18
EventID const & id() const
std::string const & processName() const
Definition: OutputModule.h:74
std::auto_ptr< InitMsgBuilder > serializeRegistry()
LuminosityBlockNumber_t luminosityBlock() const
int serializeEvent(EventPrincipal const &eventPrincipal, ParameterSetID const &selectorConfig, bool use_compression, int compression_level, SerializeDataBuffer &data_buffer, ModuleCallingContext const *mcc)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void doOutputEvent(EventMsgBuilder const &msg) const =0
std::string const & moduleLabel() const
uint16_t size_type
virtual void stop() const =0
unsigned int currentSpaceUsed() const
virtual void writeRun(RunPrincipal const &, ModuleCallingContext const *) override
accept
Definition: HLTenums.h:19
static std::vector< std::string > getEventSelectionVString(edm::ParameterSet const &pset)
virtual void writeLuminosityBlock(LuminosityBlockPrincipal const &, ModuleCallingContext const *) override
int serializeRegistry(SerializeDataBuffer &data_buffer, const BranchIDLists &branchIDLists, ThinnedAssociationsHelper const &thinnedAssociationsHelper)
bool isValid() const
Definition: HandleBase.h:75
virtual void write(EventPrincipal const &e, ModuleCallingContext const *) override
virtual void start() const =0
unsigned int uint32
Definition: MsgTools.h:13
void setHltMask(EventPrincipal const &e, ModuleCallingContext const *)
std::string getReleaseVersion()
StreamerOutputModuleBase(ParameterSet const &ps)
unsigned char * bufferPointer() const
ThinnedAssociationsHelper const * thinnedAssociationsHelper() const
static void fillDescription(ParameterSetDescription &desc)
uint32_t adler32_chksum() const
std::auto_ptr< EventMsgBuilder > serializeEvent(EventPrincipal const &e, ModuleCallingContext const *mcc)
virtual void beginRun(RunPrincipal const &, ModuleCallingContext const *) override
unsigned char uint8
Definition: MsgTools.h:11
std::vector< std::string > const & getAllTriggerNames()
BranchIDLists const * branchIDLists() const
virtual void endRun(RunPrincipal const &, ModuleCallingContext const *) override
unsigned int currentEventSize() const
static std::string const source
Definition: EdmProvDump.cc:42
Trig getTriggerResults(EventPrincipal const &ep, ModuleCallingContext const *) const
ModuleDescription const & description() const
std::vector< unsigned char > hltbits_
ParameterSetID selectorConfig() const
Definition: OutputModule.h:103