CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/IOPool/Streamer/src/StreamerOutputModuleBase.cc

Go to the documentation of this file.
00001 
00002 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
00003 
00004 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
00005 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
00006 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00007 #include "FWCore/Framework/interface/EventPrincipal.h"
00008 #include "FWCore/Framework/interface/EventSelector.h"
00009 #include "FWCore/ParameterSet/interface/Registry.h"
00010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00012 #include "FWCore/Utilities/interface/DebugMacros.h"
00013 //#include "FWCore/Utilities/interface/Digest.h"
00014 #include "FWCore/Version/interface/GetReleaseVersion.h"
00015 #include "DataFormats/Common/interface/TriggerResults.h"
00016 #include "DataFormats/Provenance/interface/ParameterSetID.h"
00017 
00018 #include <string>
00019 #include <unistd.h>
00020 #include "zlib.h"
00021 
// File-scope serialization buffer used by the constructor, serializeRegistry()
// and serializeEvent() in this file. Because it is a single static object, it
// is shared by every StreamerOutputModuleBase created in this translation unit.
// NOTE(review): nothing in this file guards concurrent access to it -- confirm
// that only one thread at a time serializes through this buffer.
00022 static SerializeDataBuffer serialize_databuffer;
00023 
00024 namespace {
00025   //A utility function (packIntoString, defined below) that packs 2-bit values from
00026   // source into bytes, with packInOneByte as the number of source values packed into each byte.
// Debug helper: writes the eight bits of c to std::cout, most significant
// bit first, each bit preceded by a single space. No newline is written.
void printBits(unsigned char c) {
  for (unsigned int mask = 0x80u; mask != 0u; mask >>= 1) {
    int const bit = ((c & mask) != 0u) ? 1 : 0;
    std::cout << " " << bit;
  }
}
00033 
// Packs a sequence of 2-bit values into bytes, four values per byte.
//
// Entry i of source is stored in byte i/4 of package, at bit offset
// 2*(i%4) (entry 0 occupies the two least significant bits of byte 0).
//
//   source  - values to pack; only the two low bits of each entry are used,
//             so an out-of-range entry cannot overwrite its neighbours.
//   package - output; fully overwritten. It ends up with
//             1 + (source.size()-1)/4 bytes, or empty when source is empty.
void packIntoString(std::vector<unsigned char> const& source,
                    std::vector<unsigned char>& package) {
  typedef std::vector<unsigned char>::size_type Index;

  unsigned int const packInOneByte = 4;   // source entries per output byte
  unsigned int const bitsPerEntry = 2;    // two bits per HLT
  unsigned int const entryMask = 0x3u;    // keeps an entry inside its 2-bit slot

  if (source.empty()) {
    // Never leave stale bytes from an earlier call in the output.
    package.clear();
    return;
  }

  Index const sizeOfPackage = 1 + ((source.size() - 1) / packInOneByte);

  // Sizes the output and zeroes every byte in a single step.
  package.assign(sizeOfPackage, 0x00);

  for (Index i = 0; i != source.size(); ++i) {
    Index const whichByte = i / packInOneByte;
    unsigned int const shift = static_cast<unsigned int>(i % packInOneByte) * bitsPerEntry;
    unsigned int const slotBits = (source[i] & entryMask) << shift;
    package[whichByte] = static_cast<unsigned char>(package[whichByte] | slotBits);
  }
  //for (unsigned int i=0; i !=package.size() ; ++i)
  //   printBits(package[i]);
  // std::cout << std::endl;
}
00053 }
00054 
00055 namespace edm {
00056   StreamerOutputModuleBase::StreamerOutputModuleBase(ParameterSet const& ps) :
00057     OutputModule(ps),
00058     selections_(&keptProducts()[InEvent]),
00059     maxEventSize_(ps.getUntrackedParameter<int>("max_event_size")),
00060     useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
00061     compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
00062     lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
00063     serializer_(selections_),
00064     hltsize_(0),
00065     lumi_(0),
00066     l1bit_(0),
00067     hltbits_(0),
00068     origSize_(0),
00069     host_name_(),
00070     hltTriggerSelections_(),
00071     outputModuleId_(0) {
00072     // no compression as default value - we need this!
00073 
00074     // test luminosity sections
00075     struct timeval now;
00076     struct timezone dummyTZ;
00077     gettimeofday(&now, &dummyTZ);
00078     timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
00079 
00080     if(useCompression_ == true) {
00081       if(compressionLevel_ <= 0) {
00082         FDEBUG(9) << "Compression Level = " << compressionLevel_
00083                   << " no compression" << std::endl;
00084         compressionLevel_ = 0;
00085         useCompression_ = false;
00086       } else if(compressionLevel_ > 9) {
00087         FDEBUG(9) << "Compression Level = " << compressionLevel_
00088                   << " using max compression level 9" << std::endl;
00089         compressionLevel_ = 9;
00090       }
00091     }
00092     serialize_databuffer.bufs_.resize(maxEventSize_);
00093     int got_host = gethostname(host_name_, 255);
00094     if(got_host != 0) strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
00095     //loadExtraClasses();
00096     // do the line below instead of loadExtraClasses() to avoid Root errors
00097     RootAutoLibraryLoader::enable();
00098 
00099     // 25-Jan-2008, KAB - pull out the trigger selection request
00100     // which we need for the INIT message
00101     hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
00102   }
00103 
00104   StreamerOutputModuleBase::~StreamerOutputModuleBase() {}
00105 
00106   void
00107   StreamerOutputModuleBase::beginRun(RunPrincipal const&) {
00108     start();
00109     std::auto_ptr<InitMsgBuilder>  init_message = serializeRegistry();
00110     doOutputHeader(*init_message);
00111   }
00112 
00113   void
00114   StreamerOutputModuleBase::endRun(RunPrincipal const&) {
00115     stop();
00116   }
00117 
00118   void
00119   StreamerOutputModuleBase::beginJob() {}
00120 
00121   void
00122   StreamerOutputModuleBase::endJob() {
00123     stop();  // for closing of files, notify storage manager, etc.
00124   }
00125 
00126   void
00127   StreamerOutputModuleBase::writeRun(RunPrincipal const&) {}
00128 
00129   void
00130   StreamerOutputModuleBase::writeLuminosityBlock(LuminosityBlockPrincipal const&) {}
00131 
00132   void
00133   StreamerOutputModuleBase::write(EventPrincipal const& e) {
00134     std::auto_ptr<EventMsgBuilder> msg = serializeEvent(e);
00135     doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point
00136   }
00137 
00138   std::auto_ptr<InitMsgBuilder>
00139   StreamerOutputModuleBase::serializeRegistry() {
00140 
00141     serializer_.serializeRegistry(serialize_databuffer);
00142 
00143     // resize bufs_ to reflect space used in serializer_ + header
00144     // I just added an overhead for header of 50000 for now
00145     unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00146     unsigned int new_size = src_size + 50000;
00147     if(serialize_databuffer.header_buf_.size() < new_size) serialize_databuffer.header_buf_.resize(new_size);
00148 
00149     //Build the INIT Message
00150     //Following values are strictly DUMMY and will be replaced
00151     // once available with Utility function etc.
00152     uint32 run = 1;
00153 
00154     //Get the Process PSet ID
00155     pset::Registry* reg = pset::Registry::instance();
00156     ParameterSetID toplevel = pset::getProcessParameterSetID(reg);
00157 
00158     //In case we need to print it
00159     //  cms::Digest dig(toplevel.compactForm());
00160     //  cms::MD5Result r1 = dig.digest();
00161     //  std::string hexy = r1.toString();
00162     //  std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
00163 
00164     Strings hltTriggerNames = getAllTriggerNames();
00165     hltsize_ = hltTriggerNames.size();
00166 
00167     //L1 stays dummy as of today
00168     Strings l1_names;  //3
00169     l1_names.push_back("t1");
00170     l1_names.push_back("t10");
00171     l1_names.push_back("t2");
00172 
00173     //Setting the process name to HLT
00174     std::string processName = OutputModule::processName();
00175 
00176     std::string moduleLabel = description().moduleLabel();
00177     uLong crc = crc32(0L, Z_NULL, 0);
00178     Bytef* buf = (Bytef*) moduleLabel.data();
00179     crc = crc32(crc, buf, moduleLabel.length());
00180     outputModuleId_ = static_cast<uint32>(crc);
00181 
00182     std::auto_ptr<InitMsgBuilder> init_message(
00183         new InitMsgBuilder(&serialize_databuffer.header_buf_[0], serialize_databuffer.header_buf_.size(),
00184                            run, Version((uint8*)toplevel.compactForm().c_str()),
00185                            getReleaseVersion().c_str() , processName.c_str(),
00186                            moduleLabel.c_str(), outputModuleId_,
00187                            hltTriggerNames, hltTriggerSelections_, l1_names,
00188                            (uint32)serialize_databuffer.adler32_chksum(), host_name_));
00189 
00190     // copy data into the destination message
00191     unsigned char* src = serialize_databuffer.bufferPointer();
00192     std::copy(src, src + src_size, init_message->dataAddress());
00193     init_message->setDataLength(src_size);
00194     return init_message;
00195   }
00196 
00197   void
00198   StreamerOutputModuleBase::setHltMask(EventPrincipal const& e) {
00199 
00200     hltbits_.clear();  // If there was something left over from last event
00201 
00202     Handle<TriggerResults> const& prod = getTriggerResults(e);
00203     //Trig const& prod = getTrigMask(e);
00204     std::vector<unsigned char> vHltState;
00205 
00206     if (prod.isValid()) {
00207       for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00208         vHltState.push_back(((prod->at(i)).state()));
00209       }
00210     } else {
00211      // We fill all Trigger bits to valid state.
00212      for(std::vector<unsigned char>::size_type i=0; i != hltsize_ ; ++i) {
00213            vHltState.push_back(hlt::Pass);
00214       }
00215     }
00216     //Pack into member hltbits_
00217     packIntoString(vHltState, hltbits_);
00218 
00219     //This is Just a printing code.
00220     //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
00221     //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
00222     //  printBits(hltbits_[i]);
00223     //}
00224     //std::cout << "\n";
00225   }
00226 
00227 // test luminosity sections
00228   void
00229   StreamerOutputModuleBase::setLumiSection() {
00230     struct timeval now;
00231     struct timezone dummyTZ;
00232     gettimeofday(&now, &dummyTZ);
00233     double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC;
00234     // what about overflows?
00235     if(lumiSectionInterval_ > 0) lumi_ = static_cast<uint32>(timeInSec/lumiSectionInterval_) + 1;
00236   }
00237 
00238   std::auto_ptr<EventMsgBuilder>
00239   StreamerOutputModuleBase::serializeEvent(EventPrincipal const& e) {
00240     //Lets Build the Event Message first
00241 
00242     //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
00243     // once figured out, there is no logic involved here.
00244     l1bit_.push_back(true);
00245     l1bit_.push_back(true);
00246     l1bit_.push_back(false);
00247     //End of dummy data
00248 
00249     setHltMask(e);
00250 
00251     if (lumiSectionInterval_ == 0) {
00252       lumi_ = e.luminosityBlock();
00253     } else {
00254       setLumiSection();
00255     }
00256 
00257     serializer_.serializeEvent(e, selectorConfig(), useCompression_, compressionLevel_, serialize_databuffer);
00258 
00259     // resize bufs_ to reflect space used in serializer_ + header
00260     // I just added an overhead for header of 50000 for now
00261     unsigned int src_size = serialize_databuffer.currentSpaceUsed();
00262     unsigned int new_size = src_size + 50000;
00263     if(serialize_databuffer.bufs_.size() < new_size) serialize_databuffer.bufs_.resize(new_size);
00264 
00265     std::auto_ptr<EventMsgBuilder>
00266       msg(new EventMsgBuilder(&serialize_databuffer.bufs_[0], serialize_databuffer.bufs_.size(), e.id().run(),
00267                               e.id().event(), lumi_, outputModuleId_, 0,
00268                               l1bit_, (uint8*)&hltbits_[0], hltsize_,
00269                               (uint32)serialize_databuffer.adler32_chksum(), host_name_) );
00270     msg->setOrigDataSize(origSize_); // we need this set to zero
00271 
00272     // copy data into the destination message
00273     // an alternative is to have serializer only to the serialization
00274     // in serializeEvent, and then call a new member "getEventData" that
00275     // takes the compression arguments and a place to put the data.
00276     // This will require one less copy.  The only catch is that the
00277     // space provided in bufs_ should be at least the uncompressed
00278     // size + overhead for header because we will not know the actual
00279     // compressed size.
00280 
00281     unsigned char* src = serialize_databuffer.bufferPointer();
00282     std::copy(src,src + src_size, msg->eventAddr());
00283     msg->setEventLength(src_size);
00284     if(useCompression_) msg->setOrigDataSize(serialize_databuffer.currentEventSize());
00285 
00286     l1bit_.clear();  //Clear up for the next event to come.
00287     return msg;
00288   }
00289 
00290   void
00291   StreamerOutputModuleBase::fillDescription(ParameterSetDescription& desc) {
00292     desc.addUntracked<int>("max_event_size", 7000000)
00293         ->setComment("Starting size in bytes of the serialized event buffer.");
00294     desc.addUntracked<bool>("use_compression", true)
00295         ->setComment("If True, compression will be used to write streamer file.");
00296     desc.addUntracked<int>("compression_level", 1)
00297         ->setComment("ROOT compression level to use.");
00298     desc.addUntracked<int>("lumiSection_interval", 0)
00299         ->setComment("If 0, use lumi section number from event.\n"
00300                      "If not 0, the interval in seconds between fake lumi sections.");
00301     OutputModule::fillDescription(desc);
00302   }
00303 } // end of namespace-edm