CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/IOPool/Streamer/src/StreamerOutputModuleBase.cc

Go to the documentation of this file.
00001 
00002 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
00003 
00004 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00005 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
00006 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/EventSelector.h"
00009 #include "FWCore/ParameterSet/interface/Registry.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00012 #include "FWCore/Utilities/interface/DebugMacros.h"
00013 //#include "FWCore/Utilities/interface/Digest.h"
00014 #include "FWCore/Version/interface/GetReleaseVersion.h"
00015 #include "DataFormats/Common/interface/TriggerResults.h"
00016 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00017 
00018 #include <string>
00019 #include <unistd.h>
00020 #include "zlib.h"
00021 
00022 static SerializeDataBuffer serialize_databuffer;
00023 
00024 namespace {
00025   //A utility function that packs bits from source into bytes, with
00026   // packInOneByte as the numeber of bytes that are packed from source to dest.
00027   void printBits(unsigned char c) {
00028     for (int i = 7; i >= 0; --i) {
00029       int bit = ((c >> i) & 1);
00030       std::cout << " " << bit;
00031     }
00032   }
00033 
00034   void packIntoString(std::vector<unsigned char> const& source,
00035                       std::vector<unsigned char>& package) {
00036      if (source.size() < 1) {return;}
00037      unsigned int packInOneByte = 4;
00038      unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte); //Two bits per HLT
00039 
00040      package.resize(sizeOfPackage);
00041      memset(&package[0], 0x00, sizeOfPackage);
00042 
00043      for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
00044        unsigned int whichByte = i/packInOneByte;
00045        unsigned int indxWithinByte = i % packInOneByte;
00046        package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
00047      }
00048     //for (unsigned int i=0; i !=package.size() ; ++i)
00049     //   printBits(package[i]);
00050     // std::cout << std::endl;
00051 
00052   }
00053 }
00054 
00055 namespace edm {
00056   StreamerOutputModuleBase::StreamerOutputModuleBase(ParameterSet const& ps) :
00057     OutputModule(ps),
00058     selections_(&keptProducts()[InEvent]),
00059     maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
00060     useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
00061     compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
00062     lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
00063     serializer_(selections_),
00064     hltsize_(0),
00065     lumi_(0),
00066     l1bit_(0),
00067     hltbits_(0),
00068     origSize_(0) { // no compression as default value - we need this!
00069 
00070     // test luminosity sections
00071     struct timeval now;
00072     struct timezone dummyTZ;
00073     gettimeofday(&now, &dummyTZ);
00074     timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00075 
00076     if(useCompression_ == true) {
00077       if(compressionLevel_ <= 0) {
00078         FDEBUG(9) << "Compression Level = " << compressionLevel_
00079                   << " no compression" << std::endl;
00080         compressionLevel_ = 0;
00081         useCompression_ = false;
00082       } else if(compressionLevel_ > 9) {
00083         FDEBUG(9) << "Compression Level = " << compressionLevel_
00084                   << " using max compression level 9" << std::endl;
00085         compressionLevel_ = 9;
00086       }
00087     }
00088     serialize_databuffer.bufs_.resize(maxEventSize_);
00089     int got_host = gethostname(host_name_, 255);
00090     if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00091     //loadExtraClasses();
00092     // do the line below instead of loadExtraClasses() to avoid Root errors
00093     RootAutoLibraryLoader::enable();
00094 
00095     // 25-Jan-2008, KAB - pull out the trigger selection request
00096     // which we need for the INIT message
00097     hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
00098   }
00099 
00100   StreamerOutputModuleBase::~StreamerOutputModuleBase() {}
00101 
00102   void
00103   StreamerOutputModuleBase::beginRun(RunPrincipal const&) {
00104     start();
00105     std::auto_ptr<InitMsgBuilder>  init_message = serializeRegistry();
00106     doOutputHeader(*init_message);
00107   }
00108 
00109   void
00110   StreamerOutputModuleBase::endRun(RunPrincipal const&) {
00111     stop();
00112   }
00113 
00114   void
00115   StreamerOutputModuleBase::beginJob() {}
00116 
00117   void
00118   StreamerOutputModuleBase::endJob() {
00119     stop();  // for closing of files, notify storage manager, etc.
00120   }
00121 
00122   void
00123   StreamerOutputModuleBase::writeRun(RunPrincipal const&) {}
00124 
00125   void
00126   StreamerOutputModuleBase::writeLuminosityBlock(LuminosityBlockPrincipal const&) {}
00127 
00128   void
00129   StreamerOutputModuleBase::write(EventPrincipal const& e) {
00130     std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
00131     doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
00132   }
00133 
00134   std::auto_ptr<InitMsgBuilder>
00135   StreamerOutputModuleBase::serializeRegistry() {
00136 
00137     serializer_.serializeRegistry(serialize_databuffer);
00138 
00139     // resize bufs_ to reflect space used in serializer_ + header
00140     // I just added an overhead for header of 50000 for now
00141     unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00142     unsigned int new_size = src_size + 50000;
00143     if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
00144 
00145     //Build the INIT Message
00146     //Following values are strictly DUMMY and will be replaced
00147     // once available with Utility function etc.
00148     uint32 run = 1;
00149 
00150     //Get the Process PSet ID
00151     pset::Registry* reg = pset::Registry::instance();
00152     ParameterSetID toplevel = pset::getProcessParameterSetID(reg);
00153 
00154     //In case we need to print it
00155     //  cms::Digest dig(toplevel.compactForm());
00156     //  cms::MD5Result r1 = dig.digest();
00157     //  std::string hexy = r1.toString();
00158     //  std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
00159 
00160     //Setting protocol version V
00161     Version v(8,(uint8*)toplevel.compactForm().c_str());
00162 
00163     Strings hltTriggerNames = getAllTriggerNames();
00164     hltsize_ = hltTriggerNames.size();
00165 
00166     //L1 stays dummy as of today
00167     Strings l1_names;  //3
00168     l1_names.push_back("t1");
00169     l1_names.push_back("t10");
00170     l1_names.push_back("t2");
00171 
00172     //Setting the process name to HLT
00173     std::string processName = OutputModule::processName();
00174 
00175     std::string moduleLabel = description().moduleLabel();
00176     uLong crc = crc32(0L, Z_NULL, 0);
00177     Bytef* buf = (Bytef*) moduleLabel.data();
00178     crc = crc32(crc, buf, moduleLabel.length());
00179     outputModuleId_ = static_cast<uint32>(crc);
00180 
00181     std::auto_ptr<InitMsgBuilder> init_message(
00182         new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
00183                            run, v, getReleaseVersion().c_str() , processName.c_str(),
00184                            moduleLabel.c_str(), outputModuleId_,
00185                            hltTriggerNames, hltTriggerSelections_, l1_names,
00186                            (uint32)serialize_databuffer.adler32_chksum(), host_name_));
00187 
00188     // copy data into the destination message
00189     unsigned char* src = serialize_databuffer.bufferPointer();
00190     std::copy(src, src + src_size, init_message->dataAddress());
00191     init_message->setDataLength(src_size);
00192     return init_message;
00193   }
00194 
00195   void
00196   StreamerOutputModuleBase::setHltMask(EventPrincipal const& e) {
00197 
00198     hltbits_.clear();  // If there was something left over from last event
00199 
00200     Handle<TriggerResults> const& prod = getTriggerResults(e);
00201     //Trig const& prod = getTrigMask(e);
00202     std::vector<unsigned char> vHltState;
00203 
00204     if (prod.isValid()) {
00205       for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00206         vHltState.push_back(((prod->at(i)).state()));
00207       }
00208     } else {
00209      // We fill all Trigger bits to valid state.
00210      for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00211            vHltState.push_back(hlt::Pass);
00212       }
00213     }
00214     //Pack into member hltbits_
00215     packIntoString(vHltState, hltbits_);
00216 
00217     //This is Just a printing code.
00218     //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
00219     //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
00220     //  printBits(hltbits_[i]);
00221     //}
00222     //std::cout << "\n";
00223   }
00224 
00225 // test luminosity sections
00226   void
00227   StreamerOutputModuleBase::setLumiSection() {
00228     struct timeval now;
00229     struct timezone dummyTZ;
00230     gettimeofday(&now, &dummyTZ);
00231     double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
00232     // what about overflows?
00233     if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
00234   }
00235 
00236   std::auto_ptr<EventMsgBuilder>
00237   StreamerOutputModuleBase::serializeEvent(EventPrincipal const& e) {
00238     //Lets Build the Event Message first
00239 
00240     //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
00241     // once figured out, there is no logic involved here.
00242     l1bit_.push_back(true);
00243     l1bit_.push_back(true);
00244     l1bit_.push_back(false);
00245     //End of dummy data
00246 
00247     setHltMask(e);
00248 
00249     if (lumiSectionInterval_ == 0) {
00250       lumi_ = e.luminosityBlock();
00251     } else {
00252       setLumiSection();
00253     }
00254 
00255     serializer_.serializeEvent(e, selectorConfig(), useCompression_, compressionLevel_, serialize_databuffer);
00256 
00257     // resize bufs_ to reflect space used in serializer_ + header
00258     // I just added an overhead for header of 50000 for now
00259     unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00260     unsigned int new_size = src_size + 50000;
00261     if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
00262 
00263     std::auto_ptr<EventMsgBuilder>
00264       msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
00265                               e.id().event(), lumi_, outputModuleId_, 0,
00266                               l1bit_, (uint8*)&hltbits_[0], hltsize_,
00267                               (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
00268     msg->setOrigDataSize(origSize_); // we need this set to zero
00269 
00270     // copy data into the destination message
00271     // an alternative is to have serializer only to the serialization
00272     // in serializeEvent, and then call a new member "getEventData" that
00273     // takes the compression arguments and a place to put the data.
00274     // This will require one less copy.  The only catch is that the
00275     // space provided in bufs_ should be at least the uncompressed
00276     // size + overhead for header because we will not know the actual
00277     // compressed size.
00278 
00279     unsigned char* src = serialize_databuffer.bufferPointer();
00280     std::copy(src,src + src_size, msg->eventAddr());
00281     msg->setEventLength(src_size);
00282     if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
00283 
00284     l1bit_.clear();  //Clear up for the next event to come.
00285     return msg;
00286   }
00287 
00288   void
00289   StreamerOutputModuleBase::fillDescription(ParameterSetDescription& desc) {
00290     desc.addUntracked<int>("max_event_size", 7000000)
00291         ->setComment("Starting size in bytes of the serialized event buffer.");
00292     desc.addUntracked<bool>("use_compression", true)
00293         ->setComment("If True, compression will be used to write streamer file.");
00294     desc.addUntracked<int>("compression_level", 1)
00295         ->setComment("ROOT compression level to use.");
00296     desc.addUntracked<int>("lumiSection_interval", 0)
00297         ->setComment("If 0, use lumi section number from event.\n"
00298                      "If not 0, the interval in seconds between fake lumi sections.");
00299     OutputModule::fillDescription(desc);
00300   }
00301 } // end of namespace-edm