CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUShmOutputModule.cc
Go to the documentation of this file.
1 /*
2  Description:
3  EDM output module that will write data to shared memory for
4  the resource broker to send to the Storage Manager.
5  See the CMS EvF Storage Manager wiki page for further notes.
6 
7  $Id: FUShmOutputModule.cc,v 1.18 2012/10/11 17:48:11 smorovic Exp $
8 */
9 
11 
14 
20 
21 #include "xdaq/Application.h"
22 #include "xdaq/ApplicationContext.h"
23 #include "xdaq/ApplicationGroup.h"
24 #include "zlib.h"
25 
26 #include <string>
27 #include <fstream>
28 #include <iostream>
29 
30 using namespace edm;
31 using namespace std;
32 
34 
35 namespace edm
36 {
37 
43 
45  shmBuffer_(0)
46  , name_(ps.getParameter<std::string>( "@module_label" ))
47  , count_(0)
48  , nExpectedEPs_(0)
49  , numDatasets_(0)
50  , streamId_()
51  {
52  FDEBUG(9) << "FUShmOutputModule: constructor" << endl;
54  edm::Service<evf::ShmOutputModuleRegistry>()->registerModule(name_, this);
55  if (! fuIdsInitialized_) {
56  fuIdsInitialized_ = true;
57 
58  edm::Guid guidObj(true);
59  std::string guidString = guidObj.toString();
60 
61  uLong crc = crc32(0L, Z_NULL, 0);
62  Bytef* buf = (Bytef*)guidString.data();
63  crc = crc32(crc, buf, guidString.length());
64  fuGuidValue_ = crc;
65  }
66  }
67 
69  {
70  FDEBUG(9) << "FUShmOutputModule: FUShmOutputModule destructor" << endl;
72  //shmdt(shmBuffer_);
73  }
74 
76  {
77  if (numDatasets_) return;
78  try {
79  //compose dataset name string
80  if (name_.size() > std::string("hltOutput").size() && name_.find("hltOutput")!=std::string::npos)
81  streamId_=name_.substr(name_.find("hltOutput")+std::string("hltOutput").size());
82  else return;
83 
84  //make local copy of dataset definitions
85  if (streamId_.size()) {
86  Strings streamDatasetList = streams.getParameter<Strings>(streamId_);
87  for (size_t i=0;i<streamDatasetList.size();i++) {
88  selectedDatasetNames_.push_back(streamDatasetList[i]);
89  Strings thisDatasetPaths = datasets.getParameter<Strings>(streamDatasetList[i]);
90  datasetPaths_.push_back(thisDatasetPaths);
91  numDatasets_++;
92  }
93  }
94  }
95  catch (...) {
96  //not present:ignore
97  selectedDatasetNames_.clear();
98  datasetPaths_.clear();
99  numDatasets_=0;
101  }
102  }
103 
105  {
106  }
107 
108 
110  {
111  unsigned char* buffer = (unsigned char*) initMessage.startAddress();
112  unsigned int size = initMessage.size();
113  InitMsgView dummymsg(buffer);
114  parseDatasets(dummymsg);
115  count_ = 0;
117  if(!shmBuffer_) edm::LogError("FUShmOutputModule")
118  << " Error getting shared memory buffer for INIT. "
119  << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
120  << " No INIT is sent - this is probably fatal!";
121  if(shmBuffer_)
122  {
123  FDEBUG(10) << "writing out INIT message with size = " << size << std::endl;
124  // no method in InitMsgBuilder to get the output module id, recast
125  uint32 dmoduleId = dummymsg.outputModuleId();
126 
127  //bool ret = shmBuffer_->writeRecoInitMsg(dmoduleId, buffer, size);
128  bool ret = sm_sharedmemory.getShmBuffer()->writeRecoInitMsg(dmoduleId, getpid(), fuGuidValue_, buffer, size,nExpectedEPs_);
129  if(!ret) edm::LogError("FUShmOutputModule") << " Error writing preamble to ShmBuffer";
130  }
131  }
132 
134  {
135  if(!shmBuffer_) edm::LogError("FUShmOutputModule")
136  << " Invalid shared memory buffer at first event"
137  << " Make sure you configure the ResourceBroker before the FUEventProcessor! "
138  << " No event is sent - this is fatal! Should throw here";
139  else
140  {
141  count_++;
142  unsigned char* buffer = (unsigned char*) eventMessage.startAddress();
143  unsigned int size = eventMessage.size();
144  EventMsgView eventView(eventMessage.startAddress());
145  countEventForDatasets(eventView);
146  unsigned int runid = eventView.run();
147  unsigned int eventid = eventView.event();
148  unsigned int outModId = eventView.outModId();
149  FDEBUG(10) << "FUShmOutputModule: event size = " << size << std::endl;
150  //bool ret = shmBuffer_->writeRecoEventData(runid, eventid, outModId, buffer, size);
151  bool ret = sm_sharedmemory.getShmBuffer()->writeRecoEventData(runid, eventid, outModId, getpid(), fuGuidValue_, buffer, size);
152  if(!ret) edm::LogError("FUShmOutputModule") << " Error with writing data to ShmBuffer";
153  }
154  }
155 
157  {
158  //shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
160  if(0==shmBuffer_)
161  edm::LogError("FUShmOutputModule")<<"Failed to attach to shared memory";
162  }
163 
165  {
166  FDEBUG(9) << "FUShmOutputModule: sending terminate run" << std::endl;
167  if(0!=shmBuffer_){
169  //shmdt(shmBuffer_);
170  shmBuffer_ = 0;
171  }
172  }
173 
// Record how many EPs are expected. The stored value is later passed to
// writeRecoInitMsg() as its last argument when the INIT message is written.
// NOTE(review): "EPs" presumably stands for event processors -- confirm.
174  void FUShmOutputModule::setNExpectedEPs(unsigned int EPs) {
175  nExpectedEPs_ = EPs;
176  }
177 
180  if (0!=shmBuffer_) {
181  shmBuffer_->removeClientPrcId(getpid());
182  }
183  }
184 
186  {
187  //reset counter
188  for (size_t i=0;i<datasetCounts_.size();i++) datasetCounts_[i]=0;
189  if (!numDatasets_) return;
190  if (dpEventSelectors_.size()) return;
191  Strings allPaths;
192  initMessage.hltTriggerNames(allPaths);
193  totalPaths_ = allPaths.size();
194  for (size_t i=0;i<numDatasets_;i++)
195  {
196  dpEventSelectors_.push_back(std::pair<std::string,
198  datasetCounts_.push_back(0);
199  }
200  }
201 
203  {
204  if (!numDatasets_) return;
205  uint8 hlt_out[totalPaths_];
206  eventMessage.hltTriggerBits( hlt_out );
207  for (size_t i=0;i<numDatasets_;i++) {
208  if ( dpEventSelectors_[i].second->acceptEvent( hlt_out, totalPaths_)) {
209  datasetCounts_[i]++;
210  }
211  }
212  }
213 
214 }
T getParameter(std::string const &) const
int i
Definition: DBlmapReader.cc:9
FUShmOutputModule(edm::ParameterSet const &ps)
bool writeRecoEventData(unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:714
std::vector< std::string > selectedDatasetNames_
void hltTriggerBits(uint8 *put_here) const
std::vector< std::string > Strings
Definition: MsgTools.h:18
uint8 * startAddress() const
std::vector< unsigned int > datasetCounts_
void insertStreamAndDatasetInfo(edm::ParameterSet &streams, edm::ParameterSet datasets)
bool removeClientPrcId(pid_t prcId)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void setNExpectedEPs(unsigned int EPs)
bool writeRecoInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize, unsigned int nExpectedEPs)
Definition: FUShmBuffer.cc:693
void hltTriggerNames(Strings &save_here) const
Definition: InitMessage.cc:142
void doOutputHeader(InitMsgBuilder const &initMessage)
std::vector< Strings > datasetPaths_
std::string const toString() const
Automatic conversion from string representation.
Definition: Guid.cc:41
U second(std::pair< T, U > const &p)
Definition: Guid.h:23
uint32 size() const
evf::FUShmBuffer * getShmBuffer()
unsigned int uint32
Definition: MsgTools.h:13
tuple description
Definition: idDealer.py:66
evf::FUShmBuffer * getBufferRef()
std::vector< std::pair< std::string, edm::EventSelector * > > dpEventSelectors_
static SM_SharedMemoryHandle sm_sharedmemory
unsigned char uint8
Definition: MsgTools.h:11
uint32 outputModuleId() const
Definition: InitMessage.h:76
void doOutputEvent(EventMsgBuilder const &eventMessage)
void countEventForDatasets(EventMsgView const &eventMessage)
uint8 * startAddress() const
tuple size
Write out results.
void parseDatasets(InitMsgView const &initMessage)
evf::FUShmBuffer * shmBuffer_
uint32 size() const
static void fillDescription(ParameterSetDescription &)