00001
00002 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
00003
00004 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00005 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
00006 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/EventSelector.h"
00009 #include "FWCore/ParameterSet/interface/Registry.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00012 #include "FWCore/Utilities/interface/DebugMacros.h"
00013
00014 #include "FWCore/Version/interface/GetReleaseVersion.h"
00015 #include "DataFormats/Common/interface/TriggerResults.h"
00016 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00017
00018 #include <string>
00019 #include <unistd.h>
00020 #include "zlib.h"
00021
00022 static SerializeDataBuffer serialize_databuffer;
00023
00024 namespace {
00025
00026
00027 void printBits(unsigned char c) {
00028 for (int i = 7; i >= 0; --i) {
00029 int bit = ((c >> i) & 1);
00030 std::cout << " " << bit;
00031 }
00032 }
00033
00034 void packIntoString(std::vector<unsigned char> const& source,
00035 std::vector<unsigned char>& package) {
00036 if (source.size() < 1) {return;}
00037 unsigned int packInOneByte = 4;
00038 unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte);
00039
00040 package.resize(sizeOfPackage);
00041 memset(&package[0], 0x00, sizeOfPackage);
00042
00043 for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
00044 unsigned int whichByte = i/packInOneByte;
00045 unsigned int indxWithinByte = i % packInOneByte;
00046 package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
00047 }
00048
00049
00050
00051
00052 }
00053 }
00054
00055 namespace edm {
00056 StreamerOutputModuleBase::StreamerOutputModuleBase(ParameterSet const& ps) :
00057 OutputModule(ps),
00058 selections_(&keptProducts()[InEvent]),
00059 maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
00060 useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
00061 compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
00062 lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
00063 serializer_(selections_),
00064 hltsize_(0),
00065 lumi_(0),
00066 l1bit_(0),
00067 hltbits_(0),
00068 origSize_(0) {
00069
00070
00071 struct timeval now;
00072 struct timezone dummyTZ;
00073 gettimeofday(&now, &dummyTZ);
00074 timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00075
00076 if(useCompression_ == true) {
00077 if(compressionLevel_ <= 0) {
00078 FDEBUG(9) << "Compression Level = " << compressionLevel_
00079 << " no compression" << std::endl;
00080 compressionLevel_ = 0;
00081 useCompression_ = false;
00082 } else if(compressionLevel_ > 9) {
00083 FDEBUG(9) << "Compression Level = " << compressionLevel_
00084 << " using max compression level 9" << std::endl;
00085 compressionLevel_ = 9;
00086 }
00087 }
00088 serialize_databuffer.bufs_.resize(maxEventSize_);
00089 int got_host = gethostname(host_name_, 255);
00090 if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00091
00092
00093 RootAutoLibraryLoader::enable();
00094
00095
00096
00097 hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
00098 }
00099
00100 StreamerOutputModuleBase::~StreamerOutputModuleBase() {}
00101
00102 void
00103 StreamerOutputModuleBase::beginRun(RunPrincipal const&) {
00104 start();
00105 std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
00106 doOutputHeader(*init_message);
00107 }
00108
00109 void
00110 StreamerOutputModuleBase::endRun(RunPrincipal const&) {
00111 stop();
00112 }
00113
00114 void
00115 StreamerOutputModuleBase::beginJob() {}
00116
00117 void
00118 StreamerOutputModuleBase::endJob() {
00119 stop();
00120 }
00121
00122 void
00123 StreamerOutputModuleBase::writeRun(RunPrincipal const&) {}
00124
00125 void
00126 StreamerOutputModuleBase::writeLuminosityBlock(LuminosityBlockPrincipal const&) {}
00127
00128 void
00129 StreamerOutputModuleBase::write(EventPrincipal const& e) {
00130 std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
00131 doOutputEvent(*msg);
00132 }
00133
00134 std::auto_ptr<InitMsgBuilder>
00135 StreamerOutputModuleBase::serializeRegistry() {
00136
00137 serializer_.serializeRegistry(serialize_databuffer);
00138
00139
00140
00141 unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00142 unsigned int new_size = src_size + 50000;
00143 if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
00144
00145
00146
00147
00148 uint32 run = 1;
00149
00150
00151 pset::Registry* reg = pset::Registry::instance();
00152 ParameterSetID toplevel = pset::getProcessParameterSetID(reg);
00153
00154
00155
00156
00157
00158
00159
00160 Strings hltTriggerNames = getAllTriggerNames();
00161 hltsize_ = hltTriggerNames.size();
00162
00163
00164 Strings l1_names;
00165 l1_names.push_back("t1");
00166 l1_names.push_back("t10");
00167 l1_names.push_back("t2");
00168
00169
00170 std::string processName = OutputModule::processName();
00171
00172 std::string moduleLabel = description().moduleLabel();
00173 uLong crc = crc32(0L, Z_NULL, 0);
00174 Bytef* buf = (Bytef*) moduleLabel.data();
00175 crc = crc32(crc, buf, moduleLabel.length());
00176 outputModuleId_ = static_cast<uint32>(crc);
00177
00178 std::auto_ptr<InitMsgBuilder> init_message(
00179 new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
00180 run, Version((uint8*)toplevel.compactForm().c_str()),
00181 getReleaseVersion().c_str() , processName.c_str(),
00182 moduleLabel.c_str(), outputModuleId_,
00183 hltTriggerNames, hltTriggerSelections_, l1_names,
00184 (uint32)serialize_databuffer.adler32_chksum(), host_name_));
00185
00186
00187 unsigned char* src = serialize_databuffer.bufferPointer();
00188 std::copy(src, src + src_size, init_message->dataAddress());
00189 init_message->setDataLength(src_size);
00190 return init_message;
00191 }
00192
00193 void
00194 StreamerOutputModuleBase::setHltMask(EventPrincipal const& e) {
00195
00196 hltbits_.clear();
00197
00198 Handle<TriggerResults> const& prod = getTriggerResults(e);
00199
00200 std::vector<unsigned char> vHltState;
00201
00202 if (prod.isValid()) {
00203 for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00204 vHltState.push_back(((prod->at(i)).state()));
00205 }
00206 } else {
00207
00208 for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00209 vHltState.push_back(hlt::Pass);
00210 }
00211 }
00212
00213 packIntoString(vHltState, hltbits_);
00214
00215
00216
00217
00218
00219
00220
00221 }
00222
00223
00224 void
00225 StreamerOutputModuleBase::setLumiSection() {
00226 struct timeval now;
00227 struct timezone dummyTZ;
00228 gettimeofday(&now, &dummyTZ);
00229 double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
00230
00231 if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
00232 }
00233
00234 std::auto_ptr<EventMsgBuilder>
00235 StreamerOutputModuleBase::serializeEvent(EventPrincipal const& e) {
00236
00237
00238
00239
00240 l1bit_.push_back(true);
00241 l1bit_.push_back(true);
00242 l1bit_.push_back(false);
00243
00244
00245 setHltMask(e);
00246
00247 if (lumiSectionInterval_ == 0) {
00248 lumi_ = e.luminosityBlock();
00249 } else {
00250 setLumiSection();
00251 }
00252
00253 serializer_.serializeEvent(e, selectorConfig(), useCompression_, compressionLevel_, serialize_databuffer);
00254
00255
00256
00257 unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00258 unsigned int new_size = src_size + 50000;
00259 if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
00260
00261 std::auto_ptr<EventMsgBuilder>
00262 msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
00263 e.id().event(), lumi_, outputModuleId_, 0,
00264 l1bit_, (uint8*)&hltbits_[0], hltsize_,
00265 (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
00266 msg->setOrigDataSize(origSize_);
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277 unsigned char* src = serialize_databuffer.bufferPointer();
00278 std::copy(src,src + src_size, msg->eventAddr());
00279 msg->setEventLength(src_size);
00280 if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
00281
00282 l1bit_.clear();
00283 return msg;
00284 }
00285
00286 void
00287 StreamerOutputModuleBase::fillDescription(ParameterSetDescription& desc) {
00288 desc.addUntracked<int>("max_event_size", 7000000)
00289 ->setComment("Starting size in bytes of the serialized event buffer.");
00290 desc.addUntracked<bool>("use_compression", true)
00291 ->setComment("If True, compression will be used to write streamer file.");
00292 desc.addUntracked<int>("compression_level", 1)
00293 ->setComment("ROOT compression level to use.");
00294 desc.addUntracked<int>("lumiSection_interval", 0)
00295 ->setComment("If 0, use lumi section number from event.\n"
00296 "If not 0, the interval in seconds between fake lumi sections.");
00297 OutputModule::fillDescription(desc);
00298 }
00299 }