CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
IPCMethod.cc
Go to the documentation of this file.
1 //
3 // IPCMethod.cc
4 // -------
5 //
6 // Contains common functionality for FUResourceTable and FUResourceQueue.
7 //
8 // Created on: Oct 26, 2011
9 // Andrei Spataru : aspataru@cern.ch
11 
14 
15 #include "interface/evb/i2oEVBMsgs.h"
16 
17 #include <iomanip>
18 
19 using std::ofstream;
20 using std::endl;
21 using namespace evf;
22 
24 // construction/destruction
26 
27 //______________________________________________________________________________
28 IPCMethod::IPCMethod(bool segmentationMode, UInt_t nbRawCells,
29  UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
30  UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
31  SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
32  EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
33  bu_(bu), sm_(sm), log_(logger), nbDqmCells_(nbDqmCells),
34  nbRawCells_(nbRawCells), nbRecoCells_(nbRecoCells),
35  acceptSMDataDiscard_(0), acceptSMDqmDiscard_(0), doCrcCheck_(1),
36  shutdownTimeout_(timeout), nbPending_(0), nbClientsToShutDown_(0),
37  isReadyToShutDown_(true), isActive_(false), runNumber_(0xffffffff),
38  frb_(frb), app_(app) {
39 
40  // if the freeResRequiredForAllocate_ threshold is set in configuration use that
41  // otherwise use nbRawCells / 2
42  if (freeResReq < 0)
43  freeResRequiredForAllocate_ = nbRawCells_ / 2;
44  else
45  freeResRequiredForAllocate_ = freeResReq;
46 
47  sem_init(&lock_, 0, 1);
48  //pthread_mutex_init(&crashHandlerLock_, NULL);
49 }
50 
52 
53 }
54 
56 // implementation of member functions
58 
59 //______________________________________________________________________________
61  assert(!freeResourceIds_.empty());
62 
63  lock();
64  UInt_t fuResourceId = freeResourceIds_.front();
65  freeResourceIds_.pop();
66  nbPending_++;
67  nbAllocated_++;
68  unlock();
69 
70  return fuResourceId;
71 }
72 
73 //______________________________________________________________________________
75  std::ostringstream oss;
76  oss << "/tmp/evt" << cell->evtNumber() << ".dump";
77  ofstream fout(oss.str().c_str());
78  fout.fill('0');
79 
80  fout << "#\n# evt " << cell->evtNumber() << "\n#\n" << endl;
81  for (unsigned int i = 0; i < cell->nFed(); i++) {
82  if (cell->fedSize(i) == 0)
83  continue;
84  fout << "# fedid " << i << endl;
85  unsigned char* addr = cell->fedAddr(i);
86  for (unsigned int j = 0; j < cell->fedSize(i); j++) {
87  fout << std::setiosflags(std::ios::right) << std::setw(2)
88  << std::hex << (int) (*addr) << std::dec;
89  if ((j + 1) % 8)
90  fout << " ";
91  else
92  fout << endl;
93  ++addr;
94  }
95  fout << endl;
96  }
97  fout.close();
98 }
99 
100 //______________________________________________________________________________
102  std::string s = "Status not implemented";
103  return s;
104 }
105 
107 // implementation of private member functions
109 
110 //______________________________________________________________________________
112  UInt_t nbFreeSlots = this->nbFreeSlots();
113  /*UInt_t nbFreeSlotsMax = 0*///reverting to larger chunk requests for BU
114  //UInt_t nbFreeSlotsMax = nbResources() / 2;
115  UInt_t nbFreeSlotsMax = freeResRequiredForAllocate_;
116 
117  if (nbFreeSlots > nbFreeSlotsMax) {
118  UIntVec_t fuResourceIds;
119  for (UInt_t i = 0; i < nbFreeSlots; i++)
120  fuResourceIds.push_back(allocateResource());
121 
122  bu_->sendAllocate(fuResourceIds);
123 
124  nbAllocSent_++;
125  }
126 }
127 
128 //______________________________________________________________________________
130  if (freeResourceIds_.size() < nbRawCells_) {
131  LOG4CPLUS_INFO(
132  log_,
133  "There are " << nbRawCells_ - freeResourceIds_.size()
134  << " pending ALLOCATE messages! Forgetting...");
135  while (!freeResourceIds_.empty())
136  freeResourceIds_.pop();
137  for (UInt_t i = 0; i < nbRawCells_; i++)
138  freeResourceIds_.push(i);
139  }
140 }
141 
142 //______________________________________________________________________________
144  for (UInt_t i = 0; i < resources_.size(); i++)
145  resources_[i]->release(true);
146 
147 }
148 
149 //______________________________________________________________________________
150 void IPCMethod::sendDiscard(UInt_t buResourceId) {
151  bu_->sendDiscard(buResourceId);
152  nbDiscarded_++;
153 }
154 
155 //______________________________________________________________________________
156 void IPCMethod::sendInitMessage(UInt_t fuResourceId, UInt_t outModId,
157  UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize,
158  UInt_t nExpectedEPs) {
159  if (0 == sm_) {
160  LOG4CPLUS_ERROR(log_, "No StorageManager, DROP INIT MESSAGE!");
161  } else {
162  acceptSMDataDiscard_[fuResourceId] = true;
163  UInt_t nbBytes = sm_->sendInitMessage(fuResourceId, outModId,
164  fuProcessId, fuGuid, data, dataSize, nExpectedEPs);
165  sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
166  sumOfSizes_ += nbBytes;
167  }
168 }
169 
170 //______________________________________________________________________________
172  UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid,
173  UChar_t *data, UInt_t dataSize) {
174  if (0 == sm_) {
175  LOG4CPLUS_ERROR(log_, "No StorageManager, DROP DATA EVENT!");
176  } else {
177  acceptSMDataDiscard_[fuResourceId] = true;
178  UInt_t nbBytes = sm_->sendDataEvent(fuResourceId, runNumber, evtNumber,
179  outModId, fuProcessId, fuGuid, data, dataSize);
180  sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
181  sumOfSizes_ += nbBytes;
182  }
183 }
184 
185 //______________________________________________________________________________
187  UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data,
188  UInt_t dataSize) {
189  if (0 == sm_) {
190  LOG4CPLUS_ERROR(log_, "No StorageManager, DROP ERROR EVENT!");
191  } else {
192  acceptSMDataDiscard_[fuResourceId] = true;
193  UInt_t nbBytes = sm_->sendErrorEvent(fuResourceId, runNumber,
194  evtNumber, fuProcessId, fuGuid, data, dataSize);
195  sumOfSquares_ += (uint64_t) nbBytes * (uint64_t) nbBytes;
196  sumOfSizes_ += nbBytes;
197  }
198 
199  // if (0!=shmBuffer_) {
200  // UInt_t n=nbDqmCells_;
201 
202  // for (UInt_t i=0;i<n;i++) {
203  // if(shmBuffer_->dqmCell(i)->fuProcessId()==fuProcessId)
204  // {
205  // if(shmBuffer_->dqmState(i)!=dqm::SENT){
206  // shmBuffer_->setDqmState(i,dqm::SENT);
207  // shmBuffer_->discardDqmCell(i);
208  // acceptSMDqmDiscard_[i] = false;
209  // }
210  // }
211  // }
212  // n=nbRecoCells_;
213  // for (UInt_t i=0;i<n;i++) {
214  // if(shmBuffer_->recoCell(i)->fuProcessId()==fuProcessId)
215  // {
216  // shmBuffer_->discardOrphanedRecoCell(i);
217  // }
218  // }
219 
220  // }
221 }
222 
223 //______________________________________________________________________________
225  UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid,
226  UChar_t* data, UInt_t dataSize) {
227  if (0 == sm_) {
228  LOG4CPLUS_WARN(log_, "No StorageManager, DROP DQM EVENT.");
229  } else {
230 
232 
233  acceptSMDqmDiscard_[fuDqmId]++;
234  if (acceptSMDqmDiscard_[fuDqmId] > 1)
235  LOG4CPLUS_WARN(
236  log_,
237  "DQM Cell " << fuDqmId
238  << " being sent more than once for folder "
239  << folderId << " process " << fuProcessId
240  << " guid " << fuGuid);
241  nbSentDqm_++;
242  sm_->sendDqmEvent(fuDqmId, runNumber, evtAtUpdate, folderId,
243  fuProcessId, fuGuid, data, dataSize);
244  }
245 }
246 
247 //______________________________________________________________________________
249  while (0 != bufRef->getNextReference())
250  bufRef = bufRef->getNextReference();
251 
252  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
253  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
254 
255  UInt_t iBlock = block->blockNb;
256  UInt_t nBlock = block->nbBlocksInSuperFragment;
257  UInt_t iSuperFrag = block->superFragmentNb;
258  UInt_t nSuperFrag = block->nbSuperFragmentsInEvent;
259 
260  return ((iSuperFrag == nSuperFrag - 1) && (iBlock == nBlock - 1));
261 }
262 
263 //______________________________________________________________________________
265  for (UInt_t i = 0; i < resources_.size(); i++) {
266  resources_[i]->scheduleCRCError();
267  }
268 }
static const char runNumber_[]
int i
Definition: DBlmapReader.cc:9
void sendDiscard(UInt_t buResourceId)
Definition: IPCMethod.cc:150
bool * acceptSMDataDiscard_
Definition: IPCMethod.h:352
uint64_t sumOfSquares_
Definition: IPCMethod.h:381
void releaseResources()
releases all FUResource's
Definition: IPCMethod.cc:143
void sendDiscard(UInt_t buResourceId)
Definition: BUProxy.cc:108
UInt_t nbDiscarded_
Definition: IPCMethod.h:367
void sendAllocate()
Definition: IPCMethod.cc:111
void sendAllocate(const UIntVec_t &fuResourceIds)
Definition: BUProxy.cc:51
std::ostream & logger()
Definition: fwLog.cc:41
virtual ~IPCMethod()
Definition: IPCMethod.cc:51
toolbox::mem::Reference MemRef_t
Definition: FUTypes.h:10
bool isLastMessageOfEvent(MemRef_t *bufRef)
Definition: IPCMethod.cc:248
log4cplus::Logger log_
Definition: IPCMethod.h:342
SMProxy * sm_
Definition: IPCMethod.h:340
UInt_t nbRawCells_
Definition: IPCMethod.h:345
UInt_t nbSentDqm_
Definition: IPCMethod.h:364
virtual std::string printStatus()
Definition: IPCMethod.cc:101
unsigned int evtNumber() const
Definition: FUShmRawCell.h:28
void resetPendingAllocates()
resets free resources to the maximum number
Definition: IPCMethod.cc:129
void dumpEvent(evf::FUShmRawCell *cell)
Definition: IPCMethod.cc:74
IPCMethod(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int timeout, EvffedFillerRB *frb, xdaq::Application *app)
Definition: IPCMethod.cc:28
UInt_t sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: SMProxy.cc:153
UInt_t nbPending_
Definition: IPCMethod.h:360
UInt_t allocateResource()
Definition: IPCMethod.cc:60
unsigned char UChar_t
Definition: FUTypes.h:14
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:224
int j
Definition: DBlmapReader.cc:9
int * acceptSMDqmDiscard_
Definition: IPCMethod.h:353
FUResourceVec_t resources_
Definition: IPCMethod.h:393
unsigned int freeResRequiredForAllocate_
Definition: IPCMethod.h:350
UInt_t sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: SMProxy.cc:122
void injectCRCError()
Definition: IPCMethod.cc:264
block
Formatting index page's pieces.
Definition: Association.py:232
unsigned char * fedAddr(unsigned int i) const
Definition: FUShmRawCell.cc:94
UInt_t nbFreeSlots() const
Definition: IPCMethod.h:204
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned long long uint64_t
Definition: Time.h:15
UInt_t sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: SMProxy.cc:91
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:186
UInt_t nbAllocSent_
Definition: IPCMethod.h:379
UInt_t sumOfSizes_
Definition: IPCMethod.h:382
unsigned int nFed() const
Definition: FUShmRawCell.h:33
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:171
UInt_t nbAllocated_
Definition: IPCMethod.h:359
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void unlock()
Definition: IPCMethod.h:314
unsigned int fedSize(unsigned int i) const
Definition: FUShmRawCell.cc:82
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
Definition: IPCMethod.cc:156
UInt_t sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
Definition: SMProxy.cc:62
BUProxy * bu_
Definition: IPCMethod.h:339
std::vector< UInt_t > UIntVec_t
Definition: FUTypes.h:15