CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/EventFilter/Modules/src/FUShmOutputModule.cc

Go to the documentation of this file.
00001 /*
00002    Description:
00003      EDM output module that will write data to shared memory for 
00004      the resource broker to send to the Storage Manager.
00005      See the CMS EvF Storage Manager wiki page for further notes.
00006 
00007    $Id: FUShmOutputModule.cc,v 1.18 2012/10/11 17:48:11 smorovic Exp $
00008 */
00009 
#include "EventFilter/Utilities/interface/i2oEvfMsgs.h"

#include "EventFilter/Modules/src/FUShmOutputModule.h"
#include "DataFormats/Provenance/interface/EventID.h"

#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/DebugMacros.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/src/Guid.h"

#include "xdaq/Application.h"
#include "xdaq/ApplicationContext.h"
#include "xdaq/ApplicationGroup.h"
#include "zlib.h"

#include <fstream>
#include <iostream>
#include <string>
#include <vector>
00029 
00030 using namespace edm;
00031 using namespace std;
00032 
// File-local handle used by the methods below to obtain (getShmBuffer /
// getBufferRef) and to release (detachShmBuffer) the shared-memory buffer.
static SM_SharedMemoryHandle sm_sharedmemory;
00034 
00035 namespace edm
00036 {
00037 
  // Flipped to true by the first constructor call, so that fuGuidValue_ is
  // computed only once.
  bool FUShmOutputModule::fuIdsInitialized_ = false;
  // CRC32 of a generated GUID string, filled in by the first constructor call
  // and passed along with every INIT and event message written to shared memory.
  uint32 FUShmOutputModule::fuGuidValue_ = 0;
00043 
00044   FUShmOutputModule::FUShmOutputModule(edm::ParameterSet const& ps):
00045     shmBuffer_(0)
00046     , name_(ps.getParameter<std::string>( "@module_label" ))
00047     , count_(0)
00048     , nExpectedEPs_(0)
00049     , numDatasets_(0)
00050     , streamId_()
00051   {
00052     FDEBUG(9) << "FUShmOutputModule: constructor" << endl;
00053     if(edm::Service<evf::ShmOutputModuleRegistry>())
00054       edm::Service<evf::ShmOutputModuleRegistry>()->registerModule(name_, this);  
00055     if (! fuIdsInitialized_) {
00056       fuIdsInitialized_ = true;
00057 
00058       edm::Guid guidObj(true);
00059       std::string guidString = guidObj.toString();
00060 
00061       uLong crc = crc32(0L, Z_NULL, 0);
00062       Bytef* buf = (Bytef*)guidString.data();
00063       crc = crc32(crc, buf, guidString.length());
00064       fuGuidValue_ = crc;
00065     }
00066   }
00067 
00068   FUShmOutputModule::~FUShmOutputModule()
00069   {
00070     FDEBUG(9) << "FUShmOutputModule: FUShmOutputModule destructor" << endl;
00071     sm_sharedmemory.detachShmBuffer();
00072     //shmdt(shmBuffer_);
00073   }
00074 
00075   void FUShmOutputModule::insertStreamAndDatasetInfo(edm::ParameterSet & streams, edm::ParameterSet datasets)
00076   {
00077     if (numDatasets_) return;
00078     try {
00079       //compose dataset name string
00080       if (name_.size() > std::string("hltOutput").size() && name_.find("hltOutput")!=std::string::npos)
00081         streamId_=name_.substr(name_.find("hltOutput")+std::string("hltOutput").size());
00082       else return;
00083 
00084       //make local copy of dataset definitions
00085       if (streamId_.size()) {
00086         Strings streamDatasetList = streams.getParameter<Strings>(streamId_);
00087         for (size_t i=0;i<streamDatasetList.size();i++) {
00088           selectedDatasetNames_.push_back(streamDatasetList[i]);
00089           Strings thisDatasetPaths = datasets.getParameter<Strings>(streamDatasetList[i]); 
00090           datasetPaths_.push_back(thisDatasetPaths);
00091           numDatasets_++;
00092         }
00093       }
00094     }
00095     catch (...) {
00096       //not present:ignore
00097       selectedDatasetNames_.clear();
00098       datasetPaths_.clear();
00099       numDatasets_=0;
00100       streamId_=std::string();
00101     }
00102   }
00103 
  // Intentionally empty: nothing is added to the parameter set description.
  void FUShmOutputModule::fillDescription(ParameterSetDescription& description)
  {
  }
00107 
00108 
00109   void FUShmOutputModule::doOutputHeader(InitMsgBuilder const& initMessage)
00110   {
00111     unsigned char* buffer = (unsigned char*) initMessage.startAddress();
00112     unsigned int size = initMessage.size();
00113     InitMsgView dummymsg(buffer);
00114     parseDatasets(dummymsg);
00115     count_ = 0;
00116     if(!shmBuffer_) shmBuffer_ = sm_sharedmemory.getShmBuffer();
00117     if(!shmBuffer_) edm::LogError("FUShmOutputModule") 
00118       << " Error getting shared memory buffer for INIT. " 
00119       << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00120       << " No INIT is sent - this is probably fatal!";
00121     if(shmBuffer_)
00122     {
00123       FDEBUG(10) << "writing out INIT message with size = " << size << std::endl;
00124       // no method in InitMsgBuilder to get the output module id, recast
00125       uint32 dmoduleId = dummymsg.outputModuleId();
00126 
00127       //bool ret = shmBuffer_->writeRecoInitMsg(dmoduleId, buffer, size);
00128       bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, buffer, size,nExpectedEPs_);
00129       if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
00130     }
00131   }
00132 
00133   void FUShmOutputModule::doOutputEvent(EventMsgBuilder const& eventMessage)
00134   {
00135     if(!shmBuffer_) edm::LogError("FUShmOutputModule") 
00136       << " Invalid shared memory buffer at first event"
00137       << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
00138       << " No event is sent - this is fatal! Should throw here";
00139     else
00140     {
00141       count_++;
00142       unsigned char* buffer = (unsigned char*) eventMessage.startAddress();
00143       unsigned int size = eventMessage.size();
00144       EventMsgView eventView(eventMessage.startAddress());
00145       countEventForDatasets(eventView);
00146       unsigned int runid = eventView.run();
00147       unsigned int eventid = eventView.event();
00148       unsigned int outModId = eventView.outModId();
00149       FDEBUG(10) << "FUShmOutputModule: event size = " << size << std::endl;
00150       //bool ret = shmBuffer_->writeRecoEventData(runid, eventid, outModId, buffer, size);
00151       bool ret = sm_sharedmemory.getShmBuffer()->writeRecoEventData(runid, eventid, outModId, getpid(), fuGuidValue_, buffer, size);
00152       if(!ret) edm::LogError("FUShmOutputModule") << " Error with writing data to ShmBuffer";
00153     }
00154   }
00155 
00156   void FUShmOutputModule::start()
00157   {
00158     //shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
00159     shmBuffer_ = sm_sharedmemory.getShmBuffer();
00160     if(0==shmBuffer_) 
00161       edm::LogError("FUShmOutputModule")<<"Failed to attach to shared memory";
00162   }
00163 
00164   void FUShmOutputModule::stop()
00165   {
00166     FDEBUG(9) << "FUShmOutputModule: sending terminate run" << std::endl;
00167     if(0!=shmBuffer_){
00168       sm_sharedmemory.detachShmBuffer();
00169       //shmdt(shmBuffer_);
00170       shmBuffer_ = 0;
00171     }
00172   }
00173 
00174   void FUShmOutputModule::setNExpectedEPs(unsigned int EPs) {
00175     nExpectedEPs_ = EPs;
00176   }
00177 
00178   void FUShmOutputModule::unregisterFromShm() {
00179     shmBuffer_=sm_sharedmemory.getBufferRef();
00180     if (0!=shmBuffer_) {
00181       shmBuffer_->removeClientPrcId(getpid());
00182     }
00183   }
00184 
00185   void FUShmOutputModule::parseDatasets(InitMsgView const& initMessage)
00186   {
00187      //reset counter
00188      for (size_t i=0;i<datasetCounts_.size();i++) datasetCounts_[i]=0;
00189      if (!numDatasets_) return;
00190      if (dpEventSelectors_.size()) return;
00191      Strings allPaths;
00192      initMessage.hltTriggerNames(allPaths);
00193      totalPaths_ = allPaths.size();
00194      for (size_t i=0;i<numDatasets_;i++)
00195      {
00196        dpEventSelectors_.push_back(std::pair<std::string,
00197          edm::EventSelector*>(selectedDatasetNames_[i],new edm::EventSelector(datasetPaths_[i],allPaths))); 
00198        datasetCounts_.push_back(0);
00199      }
00200   }
00201 
00202   void FUShmOutputModule::countEventForDatasets(EventMsgView const& eventMessage)
00203   {
00204     if (!numDatasets_) return;
00205     uint8 hlt_out[totalPaths_];
00206     eventMessage.hltTriggerBits( hlt_out );
00207     for (size_t i=0;i<numDatasets_;i++) {
00208       if ( dpEventSelectors_[i].second->acceptEvent( hlt_out, totalPaths_)) {
00209         datasetCounts_[i]++;
00210       }
00211     }
00212   }
00213 
00214 }