Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
#include "EventFilter/Utilities/interface/i2oEvfMsgs.h"

#include "EventFilter/Modules/src/FUShmOutputModule.h"
#include "DataFormats/Provenance/interface/EventID.h"

#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/DebugMacros.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/src/Guid.h"

#include "xdaq/Application.h"
#include "xdaq/ApplicationContext.h"
#include "xdaq/ApplicationGroup.h"
#include "zlib.h"

#include <string>
#include <fstream>
#include <iostream>
#include <vector>
00029
00030 using namespace edm;
00031 using namespace std;
00032
00033 static SM_SharedMemoryHandle sm_sharedmemory;
00034
00035 namespace edm
00036 {
00037
00041 bool FUShmOutputModule::fuIdsInitialized_ = false;
00042 uint32 FUShmOutputModule::fuGuidValue_ = 0;
00043
00044 FUShmOutputModule::FUShmOutputModule(edm::ParameterSet const& ps):
00045 shmBuffer_(0)
00046 , name_(ps.getParameter<std::string>( "@module_label" ))
00047 , count_(0)
00048 , nExpectedEPs_(0)
00049 , numDatasets_(0)
00050 , streamId_()
00051 {
00052 FDEBUG(9) << "FUShmOutputModule: constructor" << endl;
00053 if(edm::Service<evf::ShmOutputModuleRegistry>())
00054 edm::Service<evf::ShmOutputModuleRegistry>()->registerModule(name_, this);
00055 if (! fuIdsInitialized_) {
00056 fuIdsInitialized_ = true;
00057
00058 edm::Guid guidObj(true);
00059 std::string guidString = guidObj.toString();
00060
00061 uLong crc = crc32(0L, Z_NULL, 0);
00062 Bytef* buf = (Bytef*)guidString.data();
00063 crc = crc32(crc, buf, guidString.length());
00064 fuGuidValue_ = crc;
00065 }
00066 }
00067
00068 FUShmOutputModule::~FUShmOutputModule()
00069 {
00070 FDEBUG(9) << "FUShmOutputModule: FUShmOutputModule destructor" << endl;
00071 sm_sharedmemory.detachShmBuffer();
00072
00073 }
00074
00075 void FUShmOutputModule::insertStreamAndDatasetInfo(edm::ParameterSet & streams, edm::ParameterSet datasets)
00076 {
00077 if (numDatasets_) return;
00078 try {
00079
00080 if (name_.size() > std::string("hltOutput").size() && name_.find("hltOutput")!=std::string::npos)
00081 streamId_=name_.substr(name_.find("hltOutput")+std::string("hltOutput").size());
00082 else return;
00083
00084
00085 if (streamId_.size()) {
00086 Strings streamDatasetList = streams.getParameter<Strings>(streamId_);
00087 for (size_t i=0;i<streamDatasetList.size();i++) {
00088 selectedDatasetNames_.push_back(streamDatasetList[i]);
00089 Strings thisDatasetPaths = datasets.getParameter<Strings>(streamDatasetList[i]);
00090 datasetPaths_.push_back(thisDatasetPaths);
00091 numDatasets_++;
00092 }
00093 }
00094 }
00095 catch (...) {
00096
00097 selectedDatasetNames_.clear();
00098 datasetPaths_.clear();
00099 numDatasets_=0;
00100 streamId_=std::string();
00101 }
00102 }
00103
00104 void FUShmOutputModule::fillDescription(ParameterSetDescription& description)
00105 {
00106 }
00107
00108
00109 void FUShmOutputModule::doOutputHeader(InitMsgBuilder const& initMessage)
00110 {
00111 unsigned char* buffer = (unsigned char*) initMessage.startAddress();
00112 unsigned int size = initMessage.size();
00113 InitMsgView dummymsg(buffer);
00114 parseDatasets(dummymsg);
00115 count_ = 0;
00116 if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00117 if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00118 << " Error getting shared memory buffer for INIT. "
00119 << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00120 << " No INIT is sent - this is probably fatal!";
00121 if(shmBuffer_)
00122 {
00123 FDEBUG(10) << "writing out INIT message with size = " << size << std::endl;
00124
00125 uint32 dmoduleId = dummymsg.outputModuleId();
00126
00127
00128 bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, buffer, size,nExpectedEPs_);
00129 if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00130 }
00131 }
00132
00133 void FUShmOutputModule::doOutputEvent(EventMsgBuilder const& eventMessage)
00134 {
00135 if(!shmBuffer_) edm::LogError("FUShmOutputModule")
00136 << " Invalid shared memory buffer at first event"
00137 << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00138 << " No event is sent - this is fatal! Should throw here";
00139 else
00140 {
00141 count_++;
00142 unsigned char* buffer = (unsigned char*) eventMessage.startAddress();
00143 unsigned int size = eventMessage.size();
00144 EventMsgView eventView(eventMessage.startAddress());
00145 countEventForDatasets(eventView);
00146 unsigned int runid = eventView.run();
00147 unsigned int eventid = eventView.event();
00148 unsigned int outModId = eventView.outModId();
00149 FDEBUG(10) << "FUShmOutputModule: event size = " << size << std::endl;
00150
00151 bool ret = sm_sharedmemory.getShmBuffer()->writeRecoEventData(runid, eventid, outModId, getpid(), fuGuidValue_, buffer, size);
00152 if(!ret) edm::LogError("FUShmOutputModule") << " Error with writing data to ShmBuffer";
00153 }
00154 }
00155
00156 void FUShmOutputModule::start()
00157 {
00158
00159 shmBuffer_ = sm_sharedmemory.getShmBuffer();
00160 if(0==shmBuffer_)
00161 edm::LogError("FUShmOutputModule")<<"Failed to attach to shared memory";
00162 }
00163
00164 void FUShmOutputModule::stop()
00165 {
00166 FDEBUG(9) << "FUShmOutputModule: sending terminate run" << std::endl;
00167 if(0!=shmBuffer_){
00168 sm_sharedmemory.detachShmBuffer();
00169
00170 shmBuffer_ = 0;
00171 }
00172 }
00173
00174 void FUShmOutputModule::setNExpectedEPs(unsigned int EPs) {
00175 nExpectedEPs_ = EPs;
00176 }
00177
00178 void FUShmOutputModule::unregisterFromShm() {
00179 shmBuffer_=sm_sharedmemory.getBufferRef();
00180 if (0!=shmBuffer_) {
00181 shmBuffer_->removeClientPrcId(getpid());
00182 }
00183 }
00184
00185 void FUShmOutputModule::parseDatasets(InitMsgView const& initMessage)
00186 {
00187
00188 for (size_t i=0;i<datasetCounts_.size();i++) datasetCounts_[i]=0;
00189 if (!numDatasets_) return;
00190 if (dpEventSelectors_.size()) return;
00191 Strings allPaths;
00192 initMessage.hltTriggerNames(allPaths);
00193 totalPaths_ = allPaths.size();
00194 for (size_t i=0;i<numDatasets_;i++)
00195 {
00196 dpEventSelectors_.push_back(std::pair<std::string,
00197 edm::EventSelector*>(selectedDatasetNames_[i],new edm::EventSelector(datasetPaths_[i],allPaths)));
00198 datasetCounts_.push_back(0);
00199 }
00200 }
00201
00202 void FUShmOutputModule::countEventForDatasets(EventMsgView const& eventMessage)
00203 {
00204 if (!numDatasets_) return;
00205 uint8 hlt_out[totalPaths_];
00206 eventMessage.hltTriggerBits( hlt_out );
00207 for (size_t i=0;i<numDatasets_;i++) {
00208 if ( dpEventSelectors_[i].second->acceptEvent( hlt_out, totalPaths_)) {
00209 datasetCounts_[i]++;
00210 }
00211 }
00212 }
00213
00214 }