CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/IOPool/Streamer/src/StreamerOutputModuleBase.cc

Go to the documentation of this file.
00001 
00002 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
00003 
00004 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00005 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
00006 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/EventSelector.h"
00009 #include "FWCore/ParameterSet/interface/Registry.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00012 #include "FWCore/Utilities/interface/DebugMacros.h"
00013 //#include "FWCore/Utilities/interface/Digest.h"
00014 #include "FWCore/Version/interface/GetReleaseVersion.h"
00015 #include "DataFormats/Common/interface/TriggerResults.h"
00016 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00017 
00018 #include <string>
00019 #include <unistd.h>
00020 #include "zlib.h"
00021 
00022 static SerializeDataBuffer serialize_databuffer;
00023 
00024 namespace {
00025   //A utility function that packs bits from source into bytes, with
00026   // packInOneByte as the numeber of bytes that are packed from source to dest.
00027   void printBits(unsigned char c) {
00028     for (int i = 7; i >= 0; --i) {
00029       int bit = ((c >> i) & 1);
00030       std::cout << " " << bit;
00031     }
00032   }
00033 
00034   void packIntoString(std::vector<unsigned char> const& source,
00035                       std::vector<unsigned char>& package) {
00036      if (source.size() < 1) {return;}
00037      unsigned int packInOneByte = 4;
00038      unsigned int sizeOfPackage = 1+((source.size()-1)/packInOneByte); //Two bits per HLT
00039 
00040      package.resize(sizeOfPackage);
00041      memset(&package[0], 0x00, sizeOfPackage);
00042 
00043      for (std::vector<unsigned char>::size_type i=0; i != source.size() ; ++i) {
00044        unsigned int whichByte = i/packInOneByte;
00045        unsigned int indxWithinByte = i % packInOneByte;
00046        package[whichByte] = package[whichByte] | (source[i] << (indxWithinByte*2));
00047      }
00048     //for (unsigned int i=0; i !=package.size() ; ++i)
00049     //   printBits(package[i]);
00050     // std::cout << std::endl;
00051 
00052   }
00053 }
00054 
00055 namespace edm {
00056   StreamerOutputModuleBase::StreamerOutputModuleBase(ParameterSet const& ps) :
00057     OutputModule(ps),
00058     selections_(&keptProducts()[InEvent]),
00059     maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
00060     useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
00061     compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
00062     lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
00063     serializer_(selections_),
00064     hltsize_(0),
00065     lumi_(0),
00066     l1bit_(0),
00067     hltbits_(0),
00068     origSize_(0) { // no compression as default value - we need this!
00069 
00070     // test luminosity sections
00071     struct timeval now;
00072     struct timezone dummyTZ;
00073     gettimeofday(&now, &dummyTZ);
00074     timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00075 
00076     if(useCompression_ == true) {
00077       if(compressionLevel_ <= 0) {
00078         FDEBUG(9) << "Compression Level = " << compressionLevel_
00079                   << " no compression" << std::endl;
00080         compressionLevel_ = 0;
00081         useCompression_ = false;
00082       } else if(compressionLevel_ > 9) {
00083         FDEBUG(9) << "Compression Level = " << compressionLevel_
00084                   << " using max compression level 9" << std::endl;
00085         compressionLevel_ = 9;
00086       }
00087     }
00088     serialize_databuffer.bufs_.resize(maxEventSize_);
00089     int got_host = gethostname(host_name_, 255);
00090     if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");
00091     //loadExtraClasses();
00092     // do the line below instead of loadExtraClasses() to avoid Root errors
00093     RootAutoLibraryLoader::enable();
00094 
00095     // 25-Jan-2008, KAB - pull out the trigger selection request
00096     // which we need for the INIT message
00097     hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
00098   }
00099 
00100   StreamerOutputModuleBase::~StreamerOutputModuleBase() {}
00101 
00102   void
00103   StreamerOutputModuleBase::beginRun(RunPrincipal const&) {
00104     start();
00105     std::auto_ptr<InitMsgBuilder>  init_message = serializeRegistry();
00106     doOutputHeader(*init_message);
00107   }
00108 
00109   void
00110   StreamerOutputModuleBase::endRun(RunPrincipal const&) {
00111     stop();
00112   }
00113 
00114   void
00115   StreamerOutputModuleBase::beginJob() {}
00116 
00117   void
00118   StreamerOutputModuleBase::endJob() {
00119     stop();  // for closing of files, notify storage manager, etc.
00120   }
00121 
00122   void
00123   StreamerOutputModuleBase::writeRun(RunPrincipal const&) {}
00124 
00125   void
00126   StreamerOutputModuleBase::writeLuminosityBlock(LuminosityBlockPrincipal const&) {}
00127 
00128   void
00129   StreamerOutputModuleBase::write(EventPrincipal const& e) {
00130     std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
00131     doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
00132   }
00133 
00134   std::auto_ptr<InitMsgBuilder>
00135   StreamerOutputModuleBase::serializeRegistry() {
00136 
00137     serializer_.serializeRegistry(serialize_databuffer);
00138 
00139     // resize bufs_ to reflect space used in serializer_ + header
00140     // I just added an overhead for header of 50000 for now
00141     unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00142     unsigned int new_size = src_size + 50000;
00143     if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
00144 
00145     //Build the INIT Message
00146     //Following values are strictly DUMMY and will be replaced
00147     // once available with Utility function etc.
00148     uint32 run = 1;
00149 
00150     //Get the Process PSet ID
00151     pset::Registry* reg = pset::Registry::instance();
00152     ParameterSetID toplevel = pset::getProcessParameterSetID(reg);
00153 
00154     //In case we need to print it
00155     //  cms::Digest dig(toplevel.compactForm());
00156     //  cms::MD5Result r1 = dig.digest();
00157     //  std::string hexy = r1.toString();
00158     //  std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
00159 
00160     Strings hltTriggerNames = getAllTriggerNames();
00161     hltsize_ = hltTriggerNames.size();
00162 
00163     //L1 stays dummy as of today
00164     Strings l1_names;  //3
00165     l1_names.push_back("t1");
00166     l1_names.push_back("t10");
00167     l1_names.push_back("t2");
00168 
00169     //Setting the process name to HLT
00170     std::string processName = OutputModule::processName();
00171 
00172     std::string moduleLabel = description().moduleLabel();
00173     uLong crc = crc32(0L, Z_NULL, 0);
00174     Bytef* buf = (Bytef*) moduleLabel.data();
00175     crc = crc32(crc, buf, moduleLabel.length());
00176     outputModuleId_ = static_cast<uint32>(crc);
00177 
00178     std::auto_ptr<InitMsgBuilder> init_message(
00179         new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
00180                            run, Version((uint8*)toplevel.compactForm().c_str()),
00181                            getReleaseVersion().c_str() , processName.c_str(),
00182                            moduleLabel.c_str(), outputModuleId_,
00183                            hltTriggerNames, hltTriggerSelections_, l1_names,
00184                            (uint32)serialize_databuffer.adler32_chksum(), host_name_));
00185 
00186     // copy data into the destination message
00187     unsigned char* src = serialize_databuffer.bufferPointer();
00188     std::copy(src, src + src_size, init_message->dataAddress());
00189     init_message->setDataLength(src_size);
00190     return init_message;
00191   }
00192 
00193   void
00194   StreamerOutputModuleBase::setHltMask(EventPrincipal const& e) {
00195 
00196     hltbits_.clear();  // If there was something left over from last event
00197 
00198     Handle<TriggerResults> const& prod = getTriggerResults(e);
00199     //Trig const& prod = getTrigMask(e);
00200     std::vector<unsigned char> vHltState;
00201 
00202     if (prod.isValid()) {
00203       for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00204         vHltState.push_back(((prod->at(i)).state()));
00205       }
00206     } else {
00207      // We fill all Trigger bits to valid state.
00208      for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00209            vHltState.push_back(hlt::Pass);
00210       }
00211     }
00212     //Pack into member hltbits_
00213     packIntoString(vHltState, hltbits_);
00214 
00215     //This is Just a printing code.
00216     //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
00217     //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
00218     //  printBits(hltbits_[i]);
00219     //}
00220     //std::cout << "\n";
00221   }
00222 
00223 // test luminosity sections
00224   void
00225   StreamerOutputModuleBase::setLumiSection() {
00226     struct timeval now;
00227     struct timezone dummyTZ;
00228     gettimeofday(&now, &dummyTZ);
00229     double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
00230     // what about overflows?
00231     if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
00232   }
00233 
00234   std::auto_ptr<EventMsgBuilder>
00235   StreamerOutputModuleBase::serializeEvent(EventPrincipal const& e) {
00236     //Lets Build the Event Message first
00237 
00238     //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
00239     // once figured out, there is no logic involved here.
00240     l1bit_.push_back(true);
00241     l1bit_.push_back(true);
00242     l1bit_.push_back(false);
00243     //End of dummy data
00244 
00245     setHltMask(e);
00246 
00247     if (lumiSectionInterval_ == 0) {
00248       lumi_ = e.luminosityBlock();
00249     } else {
00250       setLumiSection();
00251     }
00252 
00253     serializer_.serializeEvent(e, selectorConfig(), useCompression_, compressionLevel_, serialize_databuffer);
00254 
00255     // resize bufs_ to reflect space used in serializer_ + header
00256     // I just added an overhead for header of 50000 for now
00257     unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00258     unsigned int new_size = src_size + 50000;
00259     if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
00260 
00261     std::auto_ptr<EventMsgBuilder>
00262       msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
00263                               e.id().event(), lumi_, outputModuleId_, 0,
00264                               l1bit_, (uint8*)&hltbits_[0], hltsize_,
00265                               (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
00266     msg->setOrigDataSize(origSize_); // we need this set to zero
00267 
00268     // copy data into the destination message
00269     // an alternative is to have serializer only to the serialization
00270     // in serializeEvent, and then call a new member "getEventData" that
00271     // takes the compression arguments and a place to put the data.
00272     // This will require one less copy.  The only catch is that the
00273     // space provided in bufs_ should be at least the uncompressed
00274     // size + overhead for header because we will not know the actual
00275     // compressed size.
00276 
00277     unsigned char* src = serialize_databuffer.bufferPointer();
00278     std::copy(src,src + src_size, msg->eventAddr());
00279     msg->setEventLength(src_size);
00280     if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
00281 
00282     l1bit_.clear();  //Clear up for the next event to come.
00283     return msg;
00284   }
00285 
00286   void
00287   StreamerOutputModuleBase::fillDescription(ParameterSetDescription& desc) {
00288     desc.addUntracked<int>("max_event_size", 7000000)
00289         ->setComment("Starting size in bytes of the serialized event buffer.");
00290     desc.addUntracked<bool>("use_compression", true)
00291         ->setComment("If True, compression will be used to write streamer file.");
00292     desc.addUntracked<int>("compression_level", 1)
00293         ->setComment("ROOT compression level to use.");
00294     desc.addUntracked<int>("lumiSection_interval", 0)
00295         ->setComment("If 0, use lumi section number from event.\n"
00296                      "If not 0, the interval in seconds between fake lumi sections.");
00297     OutputModule::fillDescription(desc);
00298   }
00299 } // end of namespace-edm