Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00011
00012 #include "IOPool/Streamer/interface/EventMessage.h"
00013 #include "EventFilter/Modules/src/FUShmOutputModule.h"
00014 #include "DataFormats/Provenance/interface/EventID.h"
00015
00016 #include "FWCore/ServiceRegistry/interface/Service.h"
00017 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00018 #include "FWCore/Utilities/interface/DebugMacros.h"
00019 #include "FWCore/Utilities/interface/Exception.h"
00020 #include "FWCore/Utilities/src/Guid.h"
00021
00022 #include "xdaq/Application.h"
00023 #include "xdaq/ApplicationContext.h"
00024 #include "xdaq/ApplicationGroup.h"
00025 #include "zlib.h"
00026
00027 #include <string>
00028 #include <fstream>
00029 #include <iostream>
00030
00031 using namespace edm;
00032 using namespace std;
00033
00034 static SM_SharedMemoryHandle sm_sharedmemory;
00035
00036 namespace edm
00037 {
00038
00042 bool FUShmOutputModule::fuIdsInitialized_ = false;
00043 uint32 FUShmOutputModule::fuGuidValue_ = 0;
00044
00045 FUShmOutputModule::FUShmOutputModule(edm::ParameterSet const& ps):
00046 shmBuffer_(0)
00047 , name_(ps.getParameter<std::string>( "@module_label" ))
00048 , count_(0)
00049 , postponeInitMsg_(false)
00050 , sentInitMsg_(false)
00051 , initBuf_(nullptr)
00052 , initBufSize_(0)
00053 , postponeStart_(false)
00054 , nExpectedEPs_(0)
00055 {
00056 FDEBUG(9) << "FUShmOutputModule: constructor" << endl;
00057 if(edm::Service<evf::ShmOutputModuleRegistry>())
00058 edm::Service<evf::ShmOutputModuleRegistry>()->registerModule(name_, this);
00059 if (! fuIdsInitialized_) {
00060 fuIdsInitialized_ = true;
00061
00062 edm::Guid guidObj(true);
00063 std::string guidString = guidObj.toString();
00064
00065 uLong crc = crc32(0L, Z_NULL, 0);
00066 Bytef* buf = (Bytef*)guidString.data();
00067 crc = crc32(crc, buf, guidString.length());
00068 fuGuidValue_ = crc;
00069 }
00070 }
00071
00072 FUShmOutputModule::~FUShmOutputModule()
00073 {
00074 FDEBUG(9) << "FUShmOutputModule: FUShmOutputModule destructor" << endl;
00075 sm_sharedmemory.detachShmBuffer();
00076
00077 }
00078
00079 void FUShmOutputModule::doOutputHeader(InitMsgBuilder const& initMessage)
00080 {
00081
00082 if (postponeInitMsg_) {
00083 sentInitMsg_=false;
00084 if (initBuf_) delete initBuf_;
00085
00086 initBufSize_ = initMessage.size();
00087 initBuf_ = new unsigned char[initBufSize_];
00088 memcpy(initBuf_, (unsigned char*) initMessage.startAddress(),sizeof(unsigned char)*initBufSize_);
00089 return;
00090 }
00091
00092 sentInitMsg_=true;
00093 count_ = 0;
00094 if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00095 if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00096 << " Error getting shared memory buffer for INIT. "
00097 << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00098 << " No INIT is sent - this is probably fatal!";
00099 if(shmBuffer_)
00100 {
00101 unsigned char* buffer = (unsigned char*) initMessage.startAddress();
00102 unsigned int size = initMessage.size();
00103 FDEBUG(10) << "writing out INIT message with size = " << size << std::endl;
00104
00105 InitMsgView dummymsg(buffer);
00106 uint32 dmoduleId = dummymsg.outputModuleId();
00107
00108
00109 bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, buffer, size,nExpectedEPs_);
00110 if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00111 }
00112 }
00113
00114 void FUShmOutputModule::setPostponeInitMsg()
00115 {
00116
00117 postponeInitMsg_=true;
00118 postponeStart_=true;
00119
00120 if (initBuf_) delete initBuf_;
00121 initBufSize_=0;
00122 initBuf_=nullptr;
00123 sentInitMsg_=false;
00124 }
00125
00126 void FUShmOutputModule::sendPostponedInitMsg()
00127 {
00128 if (postponeStart_) {
00129 postponeStart_=false;
00130 start();
00131 }
00132 if (!sentInitMsg_ && postponeInitMsg_) {
00133 if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00134 if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00135 << " Error getting shared memory buffer for INIT. "
00136 << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00137 << " No INIT is sent - this is probably fatal!";
00138 if(shmBuffer_)
00139 {
00140 FDEBUG(10) << "writing out (postponed) INIT message with size = " << initBufSize_ << std::endl;
00141 InitMsgView dummymsg(initBuf_);
00142 uint32 dmoduleId = dummymsg.outputModuleId();
00143 bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, initBuf_, initBufSize_,nExpectedEPs_);
00144 if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00145 }
00146 sentInitMsg_=true;
00147 if (initBuf_) delete initBuf_;
00148 initBufSize_=0;
00149 initBuf_=nullptr;
00150 }
00151 }
00152
00153
00154 void FUShmOutputModule::doOutputEvent(EventMsgBuilder const& eventMessage)
00155 {
00156 if (!sentInitMsg_ && postponeInitMsg_) sendPostponedInitMsg();
00157 if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00158 << " Invalid shared memory buffer at first event"
00159 << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00160 << " No event is sent - this is fatal! Should throw here";
00161 else
00162 {
00163 count_++;
00164 unsigned char* buffer = (unsigned char*) eventMessage.startAddress();
00165 unsigned int size = eventMessage.size();
00166 EventMsgView eventView(eventMessage.startAddress());
00167 unsigned int runid = eventView.run();
00168 unsigned int eventid = eventView.event();
00169 unsigned int outModId = eventView.outModId();
00170 FDEBUG(10) << "FUShmOutputModule: event size = " << size << std::endl;
00171
00172 bool ret = sm_sharedmemory.getShmBuffer()->writeRecoEventData(runid, eventid, outModId, getpid(), fuGuidValue_, buffer, size);
00173 if(!ret) edm::LogError("FUShmOutputModule") << " Error with writing data to ShmBuffer";
00174 }
00175 }
00176
00177 void FUShmOutputModule::start()
00178 {
00179 if (postponeStart_) return;
00180
00181 shmBuffer_ = sm_sharedmemory.getShmBuffer();
00182 if(0==shmBuffer_)
00183 edm::LogError("FUShmOutputModule")<<"Failed to attach to shared memory";
00184 }
00185
00186 void FUShmOutputModule::sendPostponedStart() {
00187 postponeStart_=false;
00188 start();
00189 }
00190
00191 void FUShmOutputModule::stop()
00192 {
00193 FDEBUG(9) << "FUShmOutputModule: sending terminate run" << std::endl;
00194 if(0!=shmBuffer_){
00195 sm_sharedmemory.detachShmBuffer();
00196
00197 shmBuffer_ = 0;
00198 }
00199 }
00200
00201 void FUShmOutputModule::setNExpectedEPs(unsigned int EPs) {
00202 nExpectedEPs_ = EPs;
00203 }
00204
00205 void FUShmOutputModule::unregisterFromShm() {
00206 shmBuffer_=sm_sharedmemory.getBufferRef();
00207 if (0!=shmBuffer_) {
00208 shmBuffer_->removeClientPrcId(getpid());
00209 }
00210 }
00211
00212 }