CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_7_hltpatch1/src/EventFilter/Modules/src/FUShmOutputModule.cc

Go to the documentation of this file.
00001 /*
00002    Description:
00003      EDM output module that will write data to shared memory for 
00004      the resource broker to send to the Storage Manager.
00005      See the CMS EvF Storage Manager wiki page for further notes.
00006 
00007    $Id: FUShmOutputModule.cc,v 1.15 2012/05/02 15:02:19 smorovic Exp $
00008 */
00009 
00010 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00011 
00012 #include "IOPool/Streamer/interface/EventMessage.h"
00013 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00014 #include "DataFormats/Provenance/interface/EventID.h"
00015 
00016 #include "FWCore/ServiceRegistry/interface/Service.h"
00017 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00018 #include "FWCore/Utilities/interface/DebugMacros.h"
00019 #include "FWCore/Utilities/interface/Exception.h"
00020 #include "FWCore/Utilities/src/Guid.h"
00021 
00022 #include "xdaq/Application.h"
00023 #include "xdaq/ApplicationContext.h"
00024 #include "xdaq/ApplicationGroup.h"
00025 #include "zlib.h"
00026 
00027 #include <string>
00028 #include <fstream>
00029 #include <iostream>
00030 
00031 using namespace edm;
00032 using namespace std;
00033 
00034 static SM_SharedMemoryHandle sm_sharedmemory;
00035 
00036 namespace edm
00037 {
00038 
00042   bool FUShmOutputModule::fuIdsInitialized_ = false;
00043   uint32 FUShmOutputModule::fuGuidValue_ = 0;
00044 
00045   FUShmOutputModule::FUShmOutputModule(edm::ParameterSet const& ps):
00046     shmBuffer_(0)
00047     , name_(ps.getParameter<std::string>( "@module_label" ))
00048     , count_(0)
00049     , postponeInitMsg_(false)
00050     , sentInitMsg_(false)
00051     , initBuf_(nullptr)
00052     , initBufSize_(0)
00053     , postponeStart_(false)
00054     , nExpectedEPs_(0)
00055   {
00056     FDEBUG(9) << "FUShmOutputModule: constructor" << endl;
00057     if(edm::Service<evf::ShmOutputModuleRegistry>())
00058       edm::Service<evf::ShmOutputModuleRegistry>()->registerModule(name_, this);  
00059     if (! fuIdsInitialized_) {
00060       fuIdsInitialized_ = true;
00061 
00062       edm::Guid guidObj(true);
00063       std::string guidString = guidObj.toString();
00064 
00065       uLong crc = crc32(0L, Z_NULL, 0);
00066       Bytef* buf = (Bytef*)guidString.data();
00067       crc = crc32(crc, buf, guidString.length());
00068       fuGuidValue_ = crc;
00069     }
00070   }
00071   
00072   FUShmOutputModule::~FUShmOutputModule()
00073   {
00074     FDEBUG(9) << "FUShmOutputModule: FUShmOutputModule destructor" << endl;
00075     sm_sharedmemory.detachShmBuffer();
00076     //shmdt(shmBuffer_);
00077   }
00078 
00079   void FUShmOutputModule::doOutputHeader(InitMsgBuilder const& initMessage)
00080   {
00081     //saving message for later if postpone is on
00082     if (postponeInitMsg_) {
00083       sentInitMsg_=false;
00084       if (initBuf_) delete initBuf_;//clean up if there are leftovers from last run
00085       //copy message for later sending
00086       initBufSize_ = initMessage.size();
00087       initBuf_ = new unsigned char[initBufSize_];
00088       memcpy(initBuf_, (unsigned char*) initMessage.startAddress(),sizeof(unsigned char)*initBufSize_);
00089       return;
00090     }
00091 
00092     sentInitMsg_=true;
00093     count_ = 0;
00094     if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00095     if(!shmBuffer_) edm::LogError("FUShmOutputModule") 
00096       << " Error getting shared memory buffer for INIT. " 
00097       << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00098       << " No INIT is sent - this is probably fatal!";
00099     if(shmBuffer_)
00100     {
00101       unsigned char* buffer = (unsigned char*) initMessage.startAddress();
00102       unsigned int size = initMessage.size();
00103       FDEBUG(10) << "writing out INIT message with size = " << size << std::endl;
00104       // no method in InitMsgBuilder to get the output module id, recast
00105       InitMsgView dummymsg(buffer);
00106       uint32 dmoduleId = dummymsg.outputModuleId();
00107 
00108       //bool ret = shmBuffer_->writeRecoInitMsg(dmoduleId, buffer, size);
00109       bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, buffer, size,nExpectedEPs_);
00110       if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00111     }
00112   }
00113 
00114   void FUShmOutputModule::setPostponeInitMsg()
00115   {
00116     //postpone start and Init message for after beginRun
00117     postponeInitMsg_=true;
00118     postponeStart_=true;
00119     //reset this on each run
00120     if (initBuf_) delete initBuf_;
00121     initBufSize_=0;
00122     initBuf_=nullptr;
00123     sentInitMsg_=false;
00124   }
00125 
00126   void FUShmOutputModule::sendPostponedInitMsg() 
00127   {
00128     if (postponeStart_) {
00129       postponeStart_=false;
00130       start();
00131     }
00132     if (!sentInitMsg_ && postponeInitMsg_) {
00133       if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00134       if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00135         << " Error getting shared memory buffer for INIT. "
00136         << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00137         << " No INIT is sent - this is probably fatal!";
00138       if(shmBuffer_)
00139       {
00140         FDEBUG(10) << "writing out (postponed) INIT message with size = " << initBufSize_ << std::endl;
00141         InitMsgView dummymsg(initBuf_);
00142         uint32 dmoduleId = dummymsg.outputModuleId();
00143         bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, initBuf_, initBufSize_,nExpectedEPs_);
00144         if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00145       }
00146       sentInitMsg_=true;
00147       if (initBuf_) delete initBuf_;
00148       initBufSize_=0;
00149       initBuf_=nullptr;
00150     }
00151   }
00152 
00153 
00154   void FUShmOutputModule::doOutputEvent(EventMsgBuilder const& eventMessage)
00155   {
00156     if (!sentInitMsg_ && postponeInitMsg_) sendPostponedInitMsg();
00157     if(!shmBuffer_) edm::LogError("FUShmOutputModule") 
00158       << " Invalid shared memory buffer at first event"
00159       << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00160       << " No event is sent - this is fatal! Should throw here";
00161     else
00162     {
00163       count_++;
00164       unsigned char* buffer = (unsigned char*) eventMessage.startAddress();
00165       unsigned int size = eventMessage.size();
00166       EventMsgView eventView(eventMessage.startAddress());
00167       unsigned int runid = eventView.run();
00168       unsigned int eventid = eventView.event();
00169       unsigned int outModId = eventView.outModId();
00170       FDEBUG(10) << "FUShmOutputModule: event size = " << size << std::endl;
00171       //bool ret = shmBuffer_->writeRecoEventData(runid, eventid, outModId, buffer, size);
00172       bool ret = sm_sharedmemory.getShmBuffer()->writeRecoEventData(runid, eventid, outModId, getpid(), fuGuidValue_, buffer, size);
00173       if(!ret) edm::LogError("FUShmOutputModule") << " Error with writing data to ShmBuffer";
00174     }
00175   }
00176 
00177   void FUShmOutputModule::start()
00178   {
00179     if (postponeStart_) return;
00180     //shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00181     shmBuffer_ = sm_sharedmemory.getShmBuffer();
00182     if(0==shmBuffer_) 
00183       edm::LogError("FUShmOutputModule")<<"Failed to attach to shared memory";
00184   }
00185 
00186   void FUShmOutputModule::sendPostponedStart() {
00187       postponeStart_=false;
00188       start();
00189   }
00190 
00191   void FUShmOutputModule::stop()
00192   {
00193     FDEBUG(9) << "FUShmOutputModule: sending terminate run" << std::endl;
00194     if(0!=shmBuffer_){
00195       sm_sharedmemory.detachShmBuffer();
00196       //shmdt(shmBuffer_);
00197       shmBuffer_ = 0;
00198     }
00199   }
00200 
00201   void FUShmOutputModule::setNExpectedEPs(unsigned int EPs) {
00202     nExpectedEPs_ = EPs;
00203   }
00204 
00205   void FUShmOutputModule::unregisterFromShm() {
00206     shmBuffer_=sm_sharedmemory.getBufferRef();
00207     if (0!=shmBuffer_) {
00208       shmBuffer_->removeClientPrcId(getpid());
00209     }
00210   }
00211 
00212 }