00001
00002 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
00003
00004 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00005 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
00006 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/EventSelector.h"
00009 #include "FWCore/ParameterSet/interface/Registry.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00012 #include "FWCore/Utilities/interface/DebugMacros.h"
00013
00014 #include "FWCore/Version/interface/GetReleaseVersion.h"
00015 #include "DataFormats/Common/interface/TriggerResults.h"
00016 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00017
00018 #include <string>
00019 #include <unistd.h>
00020 #include "zlib.h"
00021
00022 static SerializeDataBuffer serialize_databuffer;
00023
00024 namespace {
00025
00026
00027 void printBits(unsigned char c) {
00028 for (int i = 7; i >= 0; --i) {
00029 int bit = ((c >> i) & 1);
00030 std::cout << " " << bit;
00031 }
00032 }
00033
00034 void packIntoString(std::vector<unsigned char> const& source,
00035 std::vector<unsigned char>& package) {
00036 if (source.size() < 1) {return;}
00037 unsigned int packInOneByte = 4;
00038 unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte);
00039
00040 package.resize(sizeOfPackage);
00041 memset(&package[0], 0x00, sizeOfPackage);
00042
00043 for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
00044 unsigned int whichByte = i/packInOneByte;
00045 unsigned int indxWithinByte = i % packInOneByte;
00046 package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
00047 }
00048
00049
00050
00051
00052 }
00053 }
00054
00055 namespace edm {
00056 StreamerOutputModuleBase::StreamerOutputModuleBase(ParameterSet const& ps) :
00057 OutputModule(ps),
00058 selections_(&keptProducts()[InEvent]),
00059 maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
00060 useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
00061 compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
00062 lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
00063 serializer_(selections_),
00064 hltsize_(0),
00065 lumi_(0),
00066 l1bit_(0),
00067 hltbits_(0),
00068 origSize_(0) {
00069
00070
00071 struct timeval now;
00072 struct timezone dummyTZ;
00073 gettimeofday(&now, &dummyTZ);
00074 timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00075
00076 if(useCompression_ == true) {
00077 if(compressionLevel_ <= 0) {
00078 FDEBUG(9) << "Compression Level = " << compressionLevel_
00079 << " no compression" << std::endl;
00080 compressionLevel_ = 0;
00081 useCompression_ = false;
00082 } else if(compressionLevel_ > 9) {
00083 FDEBUG(9) << "Compression Level = " << compressionLevel_
00084 << " using max compression level 9" << std::endl;
00085 compressionLevel_ = 9;
00086 }
00087 }
00088 serialize_databuffer.bufs_.resize(maxEventSize_);
00089 int got_host = gethostname(host_name_, 255);
00090 if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00091
00092
00093 RootAutoLibraryLoader::enable();
00094
00095
00096
00097 hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
00098 }
00099
00100 StreamerOutputModuleBase::~StreamerOutputModuleBase() {}
00101
00102 void
00103 StreamerOutputModuleBase::beginRun(RunPrincipal const&) {
00104 start();
00105 std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
00106 doOutputHeader(*init_message);
00107 }
00108
00109 void
00110 StreamerOutputModuleBase::endRun(RunPrincipal const&) {
00111 stop();
00112 }
00113
00114 void
00115 StreamerOutputModuleBase::beginJob() {}
00116
00117 void
00118 StreamerOutputModuleBase::endJob() {
00119 stop();
00120 }
00121
00122 void
00123 StreamerOutputModuleBase::writeRun(RunPrincipal const&) {}
00124
00125 void
00126 StreamerOutputModuleBase::writeLuminosityBlock(LuminosityBlockPrincipal const&) {}
00127
00128 void
00129 StreamerOutputModuleBase::write(EventPrincipal const& e) {
00130 std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
00131 doOutputEvent(*msg);
00132 }
00133
00134 std::auto_ptr<InitMsgBuilder>
00135 StreamerOutputModuleBase::serializeRegistry() {
00136
00137 serializer_.serializeRegistry(serialize_databuffer);
00138
00139
00140
00141 unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00142 unsigned int new_size = src_size + 50000;
00143 if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
00144
00145
00146
00147
00148 uint32 run = 1;
00149
00150
00151 pset::Registry* reg = pset::Registry::instance();
00152 ParameterSetID toplevel = pset::getProcessParameterSetID(reg);
00153
00154
00155
00156
00157
00158
00159
00160
00161 Version v(8,(uint8*)toplevel.compactForm().c_str());
00162
00163 Strings hltTriggerNames = getAllTriggerNames();
00164 hltsize_ = hltTriggerNames.size();
00165
00166
00167 Strings l1_names;
00168 l1_names.push_back("t1");
00169 l1_names.push_back("t10");
00170 l1_names.push_back("t2");
00171
00172
00173 std::string processName = OutputModule::processName();
00174
00175 std::string moduleLabel = description().moduleLabel();
00176 uLong crc = crc32(0L, Z_NULL, 0);
00177 Bytef* buf = (Bytef*) moduleLabel.data();
00178 crc = crc32(crc, buf, moduleLabel.length());
00179 outputModuleId_ = static_cast<uint32>(crc);
00180
00181 std::auto_ptr<InitMsgBuilder> init_message(
00182 new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
00183 run, v, getReleaseVersion().c_str() , processName.c_str(),
00184 moduleLabel.c_str(), outputModuleId_,
00185 hltTriggerNames, hltTriggerSelections_, l1_names,
00186 (uint32)serialize_databuffer.adler32_chksum(), host_name_));
00187
00188
00189 unsigned char* src = serialize_databuffer.bufferPointer();
00190 std::copy(src, src + src_size, init_message->dataAddress());
00191 init_message->setDataLength(src_size);
00192 return init_message;
00193 }
00194
00195 void
00196 StreamerOutputModuleBase::setHltMask(EventPrincipal const& e) {
00197
00198 hltbits_.clear();
00199
00200 Handle<TriggerResults> const& prod = getTriggerResults(e);
00201
00202 std::vector<unsigned char> vHltState;
00203
00204 if (prod.isValid()) {
00205 for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00206 vHltState.push_back(((prod->at(i)).state()));
00207 }
00208 } else {
00209
00210 for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00211 vHltState.push_back(hlt::Pass);
00212 }
00213 }
00214
00215 packIntoString(vHltState, hltbits_);
00216
00217
00218
00219
00220
00221
00222
00223 }
00224
00225
00226 void
00227 StreamerOutputModuleBase::setLumiSection() {
00228 struct timeval now;
00229 struct timezone dummyTZ;
00230 gettimeofday(&now, &dummyTZ);
00231 double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
00232
00233 if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
00234 }
00235
00236 std::auto_ptr<EventMsgBuilder>
00237 StreamerOutputModuleBase::serializeEvent(EventPrincipal const& e) {
00238
00239
00240
00241
00242 l1bit_.push_back(true);
00243 l1bit_.push_back(true);
00244 l1bit_.push_back(false);
00245
00246
00247 setHltMask(e);
00248
00249 if (lumiSectionInterval_ == 0) {
00250 lumi_ = e.luminosityBlock();
00251 } else {
00252 setLumiSection();
00253 }
00254
00255 serializer_.serializeEvent(e, selectorConfig(), useCompression_, compressionLevel_, serialize_databuffer);
00256
00257
00258
00259 unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00260 unsigned int new_size = src_size + 50000;
00261 if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
00262
00263 std::auto_ptr<EventMsgBuilder>
00264 msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
00265 e.id().event(), lumi_, outputModuleId_, 0,
00266 l1bit_, (uint8*)&hltbits_[0], hltsize_,
00267 (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
00268 msg->setOrigDataSize(origSize_);
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279 unsigned char* src = serialize_databuffer.bufferPointer();
00280 std::copy(src,src + src_size, msg->eventAddr());
00281 msg->setEventLength(src_size);
00282 if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
00283
00284 l1bit_.clear();
00285 return msg;
00286 }
00287
00288 void
00289 StreamerOutputModuleBase::fillDescription(ParameterSetDescription& desc) {
00290 desc.addUntracked<int>("max_event_size", 7000000)
00291 ->setComment("Starting size in bytes of the serialized event buffer.");
00292 desc.addUntracked<bool>("use_compression", true)
00293 ->setComment("If True, compression will be used to write streamer file.");
00294 desc.addUntracked<int>("compression_level", 1)
00295 ->setComment("ROOT compression level to use.");
00296 desc.addUntracked<int>("lumiSection_interval", 0)
00297 ->setComment("If 0, use lumi section number from event.\n"
00298 "If not 0, the interval in seconds between fake lumi sections.");
00299 OutputModule::fillDescription(desc);
00300 }
00301 }