
Go to the documentation of this file.
00001 /*
00002    Description:
00003      EDM output module that will write data to shared memory for 
00004      the resource broker to send to the Storage Manager.
00005      See the CMS EvF Storage Manager wiki page for further notes.
00007    $Id:,v 1.15 2012/05/02 15:02:19 smorovic Exp $
00008 */
00010 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00012 #include "IOPool/Streamer/interface/EventMessage.h"
00013 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00014 #include "DataFormats/Provenance/interface/EventID.h"
00016 #include "FWCore/ServiceRegistry/interface/Service.h"
00017 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00018 #include "FWCore/Utilities/interface/DebugMacros.h"
00019 #include "FWCore/Utilities/interface/Exception.h"
00020 #include "FWCore/Utilities/src/Guid.h"
00022 #include "xdaq/Application.h"
00023 #include "xdaq/ApplicationContext.h"
00024 #include "xdaq/ApplicationGroup.h"
00025 #include "zlib.h"
00027 #include <string>
00028 #include <fstream>
00029 #include <iostream>
00031 using namespace edm;
00032 using namespace std;
00034 static SM_SharedMemoryHandle sm_sharedmemory;
00036 namespace edm
00037 {
00042   bool FUShmOutputModule::fuIdsInitialized_ = false;
00043   uint32 FUShmOutputModule::fuGuidValue_ = 0;
00045   FUShmOutputModule::FUShmOutputModule(edm::ParameterSet const& ps):
00046     shmBuffer_(0)
00047     , name_(ps.getParameter<std::string>( "@module_label" ))
00048     , count_(0)
00049     , postponeInitMsg_(false)
00050     , sentInitMsg_(false)
00051     , initBuf_(nullptr)
00052     , initBufSize_(0)
00053     , postponeStart_(false)
00054     , nExpectedEPs_(0)
00055   {
00056     FDEBUG(9) << "FUShmOutputModule: constructor" << endl;
00057     if(edm::Service<evf::ShmOutputModuleRegistry>())
00058       edm::Service<evf::ShmOutputModuleRegistry>()->registerModule(name_, this);  
00059     if (! fuIdsInitialized_) {
00060       fuIdsInitialized_ = true;
00062       edm::Guid guidObj(true);
00063       std::string guidString = guidObj.toString();
00065       uLong crc = crc32(0L, Z_NULL, 0);
00066       Bytef* buf = (Bytef*);
00067       crc = crc32(crc, buf, guidString.length());
00068       fuGuidValue_ = crc;
00069     }
00070   }
00072   FUShmOutputModule::~FUShmOutputModule()
00073   {
00074     FDEBUG(9) << "FUShmOutputModule: FUShmOutputModule destructor" << endl;
00075     sm_sharedmemory.detachShmBuffer();
00076     //shmdt(shmBuffer_);
00077   }
00079   void FUShmOutputModule::doOutputHeader(InitMsgBuilder const& initMessage)
00080   {
00081     //saving message for later if postpone is on
00082     if (postponeInitMsg_) {
00083       sentInitMsg_=false;
00084       if (initBuf_) delete initBuf_;//clean up if there are leftovers from last run
00085       //copy message for later sending
00086       initBufSize_ = initMessage.size();
00087       initBuf_ = new unsigned char[initBufSize_];
00088       memcpy(initBuf_, (unsigned char*) initMessage.startAddress(),sizeof(unsigned char)*initBufSize_);
00089       return;
00090     }
00092     sentInitMsg_=true;
00093     count_ = 0;
00094     if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00095     if(!shmBuffer_) edm::LogError("FUShmOutputModule") 
00096       << " Error getting shared memory buffer for INIT. " 
00097       << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00098       << " No INIT is sent - this is probably fatal!";
00099     if(shmBuffer_)
00100     {
00101       unsigned char* buffer = (unsigned char*) initMessage.startAddress();
00102       unsigned int size = initMessage.size();
00103       FDEBUG(10) << "writing out INIT message with size = " << size << std::endl;
00104       // no method in InitMsgBuilder to get the output module id, recast
00105       InitMsgView dummymsg(buffer);
00106       uint32 dmoduleId = dummymsg.outputModuleId();
00108       //bool ret = shmBuffer_->writeRecoInitMsg(dmoduleId, buffer, size);
00109       bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, buffer, size,nExpectedEPs_);
00110       if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00111     }
00112   }
00114   void FUShmOutputModule::setPostponeInitMsg()
00115   {
00116     //postpone start and Init message for after beginRun
00117     postponeInitMsg_=true;
00118     postponeStart_=true;
00119     //reset this on each run
00120     if (initBuf_) delete initBuf_;
00121     initBufSize_=0;
00122     initBuf_=nullptr;
00123     sentInitMsg_=false;
00124   }
00126   void FUShmOutputModule::sendPostponedInitMsg() 
00127   {
00128     if (postponeStart_) {
00129       postponeStart_=false;
00130       start();
00131     }
00132     if (!sentInitMsg_ && postponeInitMsg_) {
00133       if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00134       if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00135         << " Error getting shared memory buffer for INIT. "
00136         << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00137         << " No INIT is sent - this is probably fatal!";
00138       if(shmBuffer_)
00139       {
00140         FDEBUG(10) << "writing out (postponed) INIT message with size = " << initBufSize_ << std::endl;
00141         InitMsgView dummymsg(initBuf_);
00142         uint32 dmoduleId = dummymsg.outputModuleId();
00143         bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, initBuf_, initBufSize_,nExpectedEPs_);
00144         if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00145       }
00146       sentInitMsg_=true;
00147       if (initBuf_) delete initBuf_;
00148       initBufSize_=0;
00149       initBuf_=nullptr;
00150     }
00151   }
00154   void FUShmOutputModule::doOutputEvent(EventMsgBuilder const& eventMessage)
00155   {
00156     if (!sentInitMsg_ && postponeInitMsg_) sendPostponedInitMsg();
00157     if(!shmBuffer_) edm::LogError("FUShmOutputModule") 
00158       << " Invalid shared memory buffer at first event"
00159       << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00160       << " No event is sent - this is fatal! Should throw here";
00161     else
00162     {
00163       count_++;
00164       unsigned char* buffer = (unsigned char*) eventMessage.startAddress();
00165       unsigned int size = eventMessage.size();
00166       EventMsgView eventView(eventMessage.startAddress());
00167       unsigned int runid =;
00168       unsigned int eventid = eventView.event();
00169       unsigned int outModId = eventView.outModId();
00170       FDEBUG(10) << "FUShmOutputModule: event size = " << size << std::endl;
00171       //bool ret = shmBuffer_->writeRecoEventData(runid, eventid, outModId, buffer, size);
00172       bool ret = sm_sharedmemory.getShmBuffer()->writeRecoEventData(runid, eventid, outModId, getpid(), fuGuidValue_, buffer, size);
00173       if(!ret) edm::LogError("FUShmOutputModule") << " Error with writing data to ShmBuffer";
00174     }
00175   }
00177   void FUShmOutputModule::start()
00178   {
00179     if (postponeStart_) return;
00180     //shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00181     shmBuffer_ = sm_sharedmemory.getShmBuffer();
00182     if(0==shmBuffer_) 
00183       edm::LogError("FUShmOutputModule")<<"Failed to attach to shared memory";
00184   }
00186   void FUShmOutputModule::sendPostponedStart() {
00187       postponeStart_=false;
00188       start();
00189   }
00191   void FUShmOutputModule::stop()
00192   {
00193     FDEBUG(9) << "FUShmOutputModule: sending terminate run" << std::endl;
00194     if(0!=shmBuffer_){
00195       sm_sharedmemory.detachShmBuffer();
00196       //shmdt(shmBuffer_);
00197       shmBuffer_ = 0;
00198     }
00199   }
00201   void FUShmOutputModule::setNExpectedEPs(unsigned int EPs) {
00202     nExpectedEPs_ = EPs;
00203   }
00205   void FUShmOutputModule::unregisterFromShm() {
00206     shmBuffer_=sm_sharedmemory.getBufferRef();
00207     if (0!=shmBuffer_) {
00208       shmBuffer_->removeClientPrcId(getpid());
00209     }
00210   }
00212 }