00001
00002 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
00003
00004 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00005 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
00006 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/EventSelector.h"
00009 #include "FWCore/ParameterSet/interface/Registry.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00012 #include "FWCore/Utilities/interface/DebugMacros.h"
00013
00014 #include "FWCore/Version/interface/GetReleaseVersion.h"
00015 #include "DataFormats/Common/interface/TriggerResults.h"
00016 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00017
00018 #include <string>
00019 #include <unistd.h>
00020 #include "zlib.h"
00021
00022 static SerializeDataBuffer serialize_databuffer;
00023
00024 namespace {
00025
00026
00027 void printBits(unsigned char c) {
00028 for (int i = 7; i >= 0; --i) {
00029 int bit = ((c >> i) & 1);
00030 std::cout << " " << bit;
00031 }
00032 }
00033
00034 void packIntoString(std::vector<unsigned char> const& source,
00035 std::vector<unsigned char>& package) {
00036 if (source.size() < 1) {return;}
00037 unsigned int packInOneByte = 4;
00038 unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte);
00039
00040 package.resize(sizeOfPackage);
00041 memset(&package[0], 0x00, sizeOfPackage);
00042
00043 for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
00044 unsigned int whichByte = i/packInOneByte;
00045 unsigned int indxWithinByte = i % packInOneByte;
00046 package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
00047 }
00048
00049
00050
00051
00052 }
00053 }
00054
00055 namespace edm {
00056 StreamerOutputModuleBase::StreamerOutputModuleBase(ParameterSet const& ps) :
00057 OutputModule(ps),
00058 selections_(&keptProducts()[InEvent]),
00059 maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
00060 useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
00061 compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
00062 lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
00063 serializer_(selections_),
00064 hltsize_(0),
00065 lumi_(0),
00066 l1bit_(0),
00067 hltbits_(0),
00068 origSize_(0),
00069 host_name_(),
00070 hltTriggerSelections_(),
00071 outputModuleId_(0) {
00072
00073
00074
00075 struct timeval now;
00076 struct timezone dummyTZ;
00077 gettimeofday(&now, &dummyTZ);
00078 timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00079
00080 if(useCompression_ == true) {
00081 if(compressionLevel_ <= 0) {
00082 FDEBUG(9) << "Compression Level = " << compressionLevel_
00083 << " no compression" << std::endl;
00084 compressionLevel_ = 0;
00085 useCompression_ = false;
00086 } else if(compressionLevel_ > 9) {
00087 FDEBUG(9) << "Compression Level = " << compressionLevel_
00088 << " using max compression level 9" << std::endl;
00089 compressionLevel_ = 9;
00090 }
00091 }
00092 serialize_databuffer.bufs_.resize(maxEventSize_);
00093 int got_host = gethostname(host_name_, 255);
00094 if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
00095
00096
00097 RootAutoLibraryLoader::enable();
00098
00099
00100
00101 hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
00102 }
00103
00104 StreamerOutputModuleBase::~StreamerOutputModuleBase() {}
00105
00106 void
00107 StreamerOutputModuleBase::beginRun(RunPrincipal const&) {
00108 start();
00109 std::auto_ptr<InitMsgBuilder> init_message = serializeRegistry();
00110 doOutputHeader(*init_message);
00111 }
00112
00113 void
00114 StreamerOutputModuleBase::endRun(RunPrincipal const&) {
00115 stop();
00116 }
00117
00118 void
00119 StreamerOutputModuleBase::beginJob() {}
00120
00121 void
00122 StreamerOutputModuleBase::endJob() {
00123 stop();
00124 }
00125
00126 void
00127 StreamerOutputModuleBase::writeRun(RunPrincipal const&) {}
00128
00129 void
00130 StreamerOutputModuleBase::writeLuminosityBlock(LuminosityBlockPrincipal const&) {}
00131
00132 void
00133 StreamerOutputModuleBase::write(EventPrincipal const& e) {
00134 std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
00135 doOutputEvent(*msg);
00136 }
00137
00138 std::auto_ptr<InitMsgBuilder>
00139 StreamerOutputModuleBase::serializeRegistry() {
00140
00141 serializer_.serializeRegistry(serialize_databuffer, *branchIDLists());
00142
00143
00144
00145 unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00146 unsigned int new_size = src_size + 50000;
00147 if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
00148
00149
00150
00151
00152 uint32 run = 1;
00153
00154
00155 pset::Registry* reg = pset::Registry::instance();
00156 ParameterSetID toplevel = pset::getProcessParameterSetID(reg);
00157
00158
00159
00160
00161
00162
00163
00164 Strings hltTriggerNames = getAllTriggerNames();
00165 hltsize_ = hltTriggerNames.size();
00166
00167
00168 Strings l1_names;
00169 l1_names.push_back("t1");
00170 l1_names.push_back("t10");
00171 l1_names.push_back("t2");
00172
00173
00174 std::string processName = OutputModule::processName();
00175
00176 std::string moduleLabel = description().moduleLabel();
00177 uLong crc = crc32(0L, Z_NULL, 0);
00178 Bytef* buf = (Bytef*) moduleLabel.data();
00179 crc = crc32(crc, buf, moduleLabel.length());
00180 outputModuleId_ = static_cast<uint32>(crc);
00181
00182 std::auto_ptr<InitMsgBuilder> init_message(
00183 new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
00184 run, Version((uint8*)toplevel.compactForm().c_str()),
00185 getReleaseVersion().c_str() , processName.c_str(),
00186 moduleLabel.c_str(), outputModuleId_,
00187 hltTriggerNames, hltTriggerSelections_, l1_names,
00188 (uint32)serialize_databuffer.adler32_chksum(), host_name_));
00189
00190
00191 unsigned char* src = serialize_databuffer.bufferPointer();
00192 std::copy(src, src + src_size, init_message->dataAddress());
00193 init_message->setDataLength(src_size);
00194 return init_message;
00195 }
00196
00197 void
00198 StreamerOutputModuleBase::setHltMask(EventPrincipal const& e) {
00199
00200 hltbits_.clear();
00201
00202 Handle<TriggerResults> const& prod = getTriggerResults(e);
00203
00204 std::vector<unsigned char> vHltState;
00205
00206 if (prod.isValid()) {
00207 for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00208 vHltState.push_back(((prod->at(i)).state()));
00209 }
00210 } else {
00211
00212 for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00213 vHltState.push_back(hlt::Pass);
00214 }
00215 }
00216
00217 packIntoString(vHltState, hltbits_);
00218
00219
00220
00221
00222
00223
00224
00225 }
00226
00227
00228 void
00229 StreamerOutputModuleBase::setLumiSection() {
00230 struct timeval now;
00231 struct timezone dummyTZ;
00232 gettimeofday(&now, &dummyTZ);
00233 double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
00234
00235 if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
00236 }
00237
00238 std::auto_ptr<EventMsgBuilder>
00239 StreamerOutputModuleBase::serializeEvent(EventPrincipal const& e) {
00240
00241
00242
00243
00244 l1bit_.push_back(true);
00245 l1bit_.push_back(true);
00246 l1bit_.push_back(false);
00247
00248
00249 setHltMask(e);
00250
00251 if (lumiSectionInterval_ == 0) {
00252 lumi_ = e.luminosityBlock();
00253 } else {
00254 setLumiSection();
00255 }
00256
00257 serializer_.serializeEvent(e, selectorConfig(), useCompression_, compressionLevel_, serialize_databuffer);
00258
00259
00260
00261 unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00262 unsigned int new_size = src_size + 50000;
00263 if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
00264
00265 std::auto_ptr<EventMsgBuilder>
00266 msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
00267 e.id().event(), lumi_, outputModuleId_, 0,
00268 l1bit_, (uint8*)&hltbits_[0], hltsize_,
00269 (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
00270 msg->setOrigDataSize(origSize_);
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281 unsigned char* src = serialize_databuffer.bufferPointer();
00282 std::copy(src,src + src_size, msg->eventAddr());
00283 msg->setEventLength(src_size);
00284 if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
00285
00286 l1bit_.clear();
00287 return msg;
00288 }
00289
00290 void
00291 StreamerOutputModuleBase::fillDescription(ParameterSetDescription& desc) {
00292 desc.addUntracked<int>("max_event_size", 7000000)
00293 ->setComment("Starting size in bytes of the serialized event buffer.");
00294 desc.addUntracked<bool>("use_compression", true)
00295 ->setComment("If True, compression will be used to write streamer file.");
00296 desc.addUntracked<int>("compression_level", 1)
00297 ->setComment("ROOT compression level to use.");
00298 desc.addUntracked<int>("lumiSection_interval", 0)
00299 ->setComment("If 0, use lumi section number from event.\n"
00300 "If not 0, the interval in seconds between fake lumi sections.");
00301 OutputModule::fillDescription(desc);
00302 }
00303 }