CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUResourceTable.cc
Go to the documentation of this file.
1 //
3 // FUResourceTable
4 // ---------------
5 //
6 // 12/10/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
7 // 20/01/2012 Andrei Spataru <aspataru@cern.ch>
9 
11 #include "EvffedFillerRB.h"
12 
13 #include "interface/evb/i2oEVBMsgs.h"
14 #include "xcept/tools.h"
15 
16 #include <sys/types.h>
17 #include <signal.h>
18 
19 #ifdef linux
20 #include <thread>
21 #endif
22 //#define DEBUG_RES_TAB
23 
24 using namespace evf;
25 using namespace std;
27 // construction/destruction
29 
30 //______________________________________________________________________________
31 FUResourceTable::FUResourceTable(bool segmentationMode, UInt_t nbRawCells,
32  UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
33  UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
34  SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
35  EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
36 
37  // call super constructor
38  IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
39  rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
40  logger, timeout, frb, app), shmBuffer_(0)
41 
42 {
43  initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
44  rawCellSize, recoCellSize, dqmCellSize);
45 }
46 
47 //______________________________________________________________________________
49  clear();
50  //workloop cancels used to be here in the previous version
51  shmdt( shmBuffer_);
53  LOG4CPLUS_INFO(log_, "SHARED MEMORY SUCCESSFULLY RELEASED.");
54  if (0 != acceptSMDataDiscard_)
55  delete[] acceptSMDataDiscard_;
56  if (0 != acceptSMDqmDiscard_)
57  delete[] acceptSMDqmDiscard_;
58 }
59 
61 // implementation of member functions
63 
64 //______________________________________________________________________________
65 void FUResourceTable::initialize(bool segmentationMode, UInt_t nbRawCells,
66  UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
67  UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
68  clear();
69 
70  shmBuffer_ = FUShmBuffer::createShmBuffer(segmentationMode, nbRawCells,
71  nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
72  if (0 == shmBuffer_) {
73  string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
74  LOG4CPLUS_FATAL(log_, msg);
75  XCEPT_RAISE(evf::Exception, msg);
76  }
77 
78  for (UInt_t i = 0; i < nbRawCells_; i++) {
79  FUResource* newResource = new FUResource(i, log_, frb_, app_);
80  newResource->release(true);
81  resources_.push_back(newResource);
82  freeResourceIds_.push(i);
83  }
84 
85  acceptSMDataDiscard_ = new bool[nbRecoCells];
86  acceptSMDqmDiscard_ = new int[nbDqmCells];
87 
88  resetCounters();
89  stopFlag_=false;
90 }
91 
92 //______________________________________________________________________________
94  bool reschedule = true;
95  FUShmRecoCell* cell = 0;
96  try {
97  cell = shmBuffer_->recoCellToRead();
98  } catch (evf::Exception& e) {
99  rethrowShmBufferException(e, "FUResourceTable:sendData:recoCellToRead");
100  }
101 
102  if (0 == cell->eventSize()) {
103  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
104  UInt_t cellIndex = cell->index();
105  try {
106  shmBuffer_->finishReadingRecoCell(cell);
107  shmBuffer_->discardRecoCell(cellIndex);
108  } catch (evf::Exception& e) {
109  rethrowShmBufferException(e,
110  "FUResourceTable:sendData:finishReadingRecoCell/discardRecoCell");
111  }
112  shutdownStatus_|=1<<7;
113  reschedule = false;
114  } else {
115  try {
116  if (cell->type() == 0) {
117  UInt_t cellIndex = cell->index();
118  UInt_t cellOutModId = cell->outModId();
119  UInt_t cellFUProcId = cell->fuProcessId();
120  UInt_t cellFUGuid = cell->fuGuid();
121  UChar_t* cellPayloadAddr = cell->payloadAddr();
122  UInt_t cellEventSize = cell->eventSize();
123  UInt_t cellExpectedEPs = cell->nExpectedEPs();
124  try {
125  shmBuffer_->finishReadingRecoCell(cell);
126  } catch (evf::Exception& e) {
127  rethrowShmBufferException(e,
128  "FUResourceTable:sendData:finishReadingRecoCell");
129  }
130 
131  lock();
132  nbPendingSMDiscards_++;
133  unlock();
134 
135  sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
136  cellFUGuid, cellPayloadAddr, cellEventSize,
137  cellExpectedEPs);
138  } else if (cell->type() == 1) {
139  UInt_t cellIndex = cell->index();
140  UInt_t cellRawIndex = cell->rawCellIndex();
141  UInt_t cellRunNumber = cell->runNumber();
142  UInt_t cellEvtNumber = cell->evtNumber();
143  UInt_t cellOutModId = cell->outModId();
144  UInt_t cellFUProcId = cell->fuProcessId();
145  UInt_t cellFUGuid = cell->fuGuid();
146  UChar_t *cellPayloadAddr = cell->payloadAddr();
147  UInt_t cellEventSize = cell->eventSize();
148  try {
149  shmBuffer_->finishReadingRecoCell(cell);
150  } catch (evf::Exception& e) {
151  rethrowShmBufferException(e,
152  "FUResourceTable:sendData:finishReadingRecoCell");
153  }
154 
155  lock();
156  nbPendingSMDiscards_++;
157  resources_[cellRawIndex]->incNbSent();
158  if (resources_[cellRawIndex]->nbSent() == 1)
159  nbSent_++;
160  unlock();
161 
162  sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
163  cellOutModId, cellFUProcId, cellFUGuid,
164  cellPayloadAddr, cellEventSize);
165  } else if (cell->type() == 2) {
166  UInt_t cellIndex = cell->index();
167  UInt_t cellRawIndex = cell->rawCellIndex();
168  //UInt_t cellRunNumber = cell->runNumber();
169  UInt_t cellEvtNumber = cell->evtNumber();
170  UInt_t cellFUProcId = cell->fuProcessId();
171  UInt_t cellFUGuid = cell->fuGuid();
172  UChar_t *cellPayloadAddr = cell->payloadAddr();
173  UInt_t cellEventSize = cell->eventSize();
174  try {
175  shmBuffer_->finishReadingRecoCell(cell);
176  } catch (evf::Exception& e) {
177  rethrowShmBufferException(e,
178  "FUResourceTable:sendData:recoCellToRead");
179  }
180 
181  lock();
182  nbPendingSMDiscards_++;
183  resources_[cellRawIndex]->incNbSent();
184  if (resources_[cellRawIndex]->nbSent() == 1) {
185  nbSent_++;
186  nbSentError_++;
187  }
188  unlock();
189 
190  sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
191  cellFUProcId, cellFUGuid, cellPayloadAddr,
192  cellEventSize);
193  } else {
194  string errmsg =
195  "Unknown RecoCell type (neither INIT/DATA/ERROR).";
196  XCEPT_RAISE(evf::Exception, errmsg);
197  }
198  } catch (xcept::Exception& e) {
199  LOG4CPLUS_FATAL(
200  log_,
201  "Failed to send EVENT DATA to StorageManager: "
202  << xcept::stdformat_exception_history(e));
203  reschedule = false;
204  }
205  }
206 
207  sDataActive_=reschedule;
208  return reschedule;
209 }
210 
211 //______________________________________________________________________________
213  bool reschedule = true;
214  FUShmRecoCell* cell = 0;
215  try {
216  cell = shmBuffer_->recoCellToRead();
217  } catch (evf::Exception& e) {
218  rethrowShmBufferException(e,
219  "FUResourceTable:sendDataWhileHalting:recoCellToRead");
220  }
221 
222  if (0 == cell->eventSize()) {
223  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
224  UInt_t cellIndex = cell->index();
225  try {
226  shmBuffer_->finishReadingRecoCell(cell);
227  shmBuffer_->discardRecoCell(cellIndex);
228  } catch (evf::Exception& e) {
229  rethrowShmBufferException(e,
230  "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
231  }
232  shutdownStatus_|=1<<8;
233  reschedule = false;
234  } else {
235  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
236  UInt_t cellIndex = cell->index();
237  try {
238  shmBuffer_->finishReadingRecoCell(cell);
239  shmBuffer_->discardRecoCell(cellIndex);
240  } catch (evf::Exception& e) {
241  rethrowShmBufferException(e,
242  "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
243  }
244  }
245 
246  sDataActive_=reschedule;
247  return reschedule;
248 }
249 
250 //______________________________________________________________________________
252  bool reschedule = true;
253  FUShmDqmCell* cell = 0;
254  // initialize to a value to avoid warnings
256  try {
257  cell = shmBuffer_->dqmCellToRead();
258  state = shmBuffer_->dqmState(cell->index());
259  } catch (evf::Exception& e) {
260  rethrowShmBufferException(e,
261  "FUResourceTable:sendDqm:dqmCellToRead/dqmState");
262  }
263 
264  if (state == dqm::EMPTY) {
265  LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
266  std::cout << "shut down dqm workloop " << std::endl;
267  UInt_t cellIndex = cell->index();
268  try {
269  shmBuffer_->finishReadingDqmCell(cell);
270  shmBuffer_->discardDqmCell(cellIndex);
271  } catch (evf::Exception& e) {
272  rethrowShmBufferException(e,
273  "FUResourceTable:sendDqm:finishReadingDqmCell/discardDqmCell");
274  }
275  shutdownStatus_|=1<<9;
276  reschedule = false;
277  } else {
278  try {
279  UInt_t cellIndex = cell->index();
280  UInt_t cellRunNumber = cell->runNumber();
281  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
282  UInt_t cellFolderId = cell->folderId();
283  UInt_t cellFUProcId = cell->fuProcessId();
284  UInt_t cellFUGuid = cell->fuGuid();
285  UChar_t *cellPayloadAddr = cell->payloadAddr();
286  UInt_t cellEventSize = cell->eventSize();
287  sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
288  cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
289  cellEventSize);
290  try {
291  shmBuffer_->finishReadingDqmCell(cell);
292  } catch (evf::Exception& e) {
293  rethrowShmBufferException(e,
294  "FUResourceTable:sendDqm:finishReadingDqmCell");
295  }
296  } catch (xcept::Exception& e) {
297  LOG4CPLUS_FATAL(
298  log_,
299  "Failed to send DQM DATA to StorageManager: "
300  << xcept::stdformat_exception_history(e));
301  reschedule = false;
302  }
303  }
304 
305  sDqmActive_=reschedule;
306  return reschedule;
307 }
308 
309 //______________________________________________________________________________
311  bool reschedule = true;
312  FUShmDqmCell* cell = 0;
313  // initialize to a value to avoid warnings
315  try {
316  cell = shmBuffer_->dqmCellToRead();
317  state = shmBuffer_->dqmState(cell->index());
318  } catch (evf::Exception& e) {
319  rethrowShmBufferException(e,
320  "FUResourceTable:sendDqmWhileHalting:dqmCellToRead/dqmState");
321  }
322 
323  if (state == dqm::EMPTY) {
324  LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
325  std::cout << "shut down dqm workloop " << std::endl;
326  UInt_t cellIndex = cell->index();
327  try {
328  shmBuffer_->finishReadingDqmCell(cell);
329  shmBuffer_->discardDqmCell(cellIndex);
330  } catch (evf::Exception& e) {
331  rethrowShmBufferException(e,
332  "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
333  }
334  shutdownStatus_|=1<<10;
335  reschedule = false;
336  } else {
337  UInt_t cellIndex = cell->index();
338  try {
339  shmBuffer_->finishReadingDqmCell(cell);
340  shmBuffer_->discardDqmCell(cellIndex);
341  } catch (evf::Exception& e) {
342  rethrowShmBufferException(e,
343  "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
344  }
345  }
346 
347  sDqmActive_=reschedule;
348  return reschedule;
349 }
350 
351 // common procedure for discard() and discardWhileHalting()
352 // when the workloop should not be rescheduled
353 //______________________________________________________________________________
355  std::cout << " entered shutdown cycle " << std::endl;
356  shutdownStatus_|=1<<11;
357  try {
358  shmBuffer_->writeRecoEmptyEvent();
359  } catch (evf::Exception& e) {
360  rethrowShmBufferException(e,
361  "FUResourceTable:discardNoReschedule:writeRecoEmptyEvent");
362  }
363 
364  UInt_t count = 0;
365  while (count < 100) {
366  std::cout << " shutdown cycle " << shmBuffer_->nClients() << " "
367  << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
368  if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
369  shmBuffer_->shmid()) == 1) {
370  shutdownStatus_|=1<<12;
371  //isReadyToShutDown_ = true;
372  break;
373  } else {
374  count++;
375  std::cout << " shutdown cycle attempt " << count << std::endl;
376  LOG4CPLUS_DEBUG(
377  log_,
378  "FUResourceTable: Wait for all clients to detach,"
379  << " nClients=" << shmBuffer_->nClients()
380  << " nattch=" << FUShmBuffer::shm_nattch(
381  shmBuffer_->shmid()) << " (" << count << ")");
382  ::usleep( shutdownTimeout_);
383  if (count * shutdownTimeout_ > 10000000)
384  LOG4CPLUS_WARN(
385  log_,
386  "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
387  << " nClients=" << shmBuffer_->nClients()
388  << " nattch=" << FUShmBuffer::shm_nattch(
389  shmBuffer_->shmid()) << " (" << count << ")");
390 
391  }
392  }
393 
394  bool allEmpty = false;
395  std::cout << "Checking if all dqm cells are empty " << std::endl;
396  while (!allEmpty) {
397  UInt_t n = nbDqmCells_;
398  allEmpty = true;
399  shmBuffer_->lock();
400  for (UInt_t i = 0; i < n; i++) {
401  // initialize to a value to avoid warnings
403  try {
404  state = shmBuffer_->dqmState(i);
405  } catch (evf::Exception& e) {
406  rethrowShmBufferException(e,
407  "FUResourceTable:discardNoReschedule:dqmState");
408  }
409  if (state != dqm::EMPTY)
410  allEmpty = false;
411  }
412  shmBuffer_->unlock();
413  }
414  shutdownStatus_|=1<<13;
415 
416  std::cout << "Number of pending discards before declaring ready to shut down: " << nbPendingSMDqmDiscards_ << std::endl;
417  if (nbPendingSMDqmDiscards_ != 0) {
418  LOG4CPLUS_WARN(
419  log_,
420  "FUResourceTable: pending DQM discards not zero: ="
421  << nbPendingSMDqmDiscards_
422  << " while cells are all empty. This may cause problems at next start ");
423 
424  }
425 
426  try {
427  shmBuffer_->writeDqmEmptyEvent();
428  } catch (evf::Exception& e) {
429  rethrowShmBufferException(e,
430  "FUResourceTable:discardNoReschedule:writeDqmEmptyEvent");
431  }
432 
433  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
434  // sendDqm loop has been shut down as well
435 }
436 
437 //______________________________________________________________________________
439  FUShmRawCell* cell = 0;
440  // initialize to a value to avoid warnings
442  try {
443  cell = shmBuffer_->rawCellToDiscard();
444  state = shmBuffer_->evtState(cell->index());
445  } catch (evf::Exception& e) {
446  rethrowShmBufferException(e,
447  "FUResourceTable:discard:rawCellToRead/evtState");
448  }
449 
450  bool reschedule = true;
451  bool shutDown = (state == evt::STOP);
452  bool isLumi = (state == evt::USEDLS);
453  UInt_t fuResourceId = cell->fuResourceId();
454  UInt_t buResourceId = cell->buResourceId();
455 
456  if (state == evt::EMPTY) {
457  LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
458  return true;
459  }
460 
461  if (shutDown) {
462  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
463  if (nbClientsToShutDown_ > 0)
464  --nbClientsToShutDown_;
465  if (nbClientsToShutDown_ == 0) {
466  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
467  isActive_ = false;
468  reschedule = false;
469  }
470  }
471 
472  try {
473  shmBuffer_->discardRawCell(cell);
474  } catch (evf::Exception& e) {
475  rethrowShmBufferException(e, "FUResourceTable:discard:discardRawCell");
476  }
477  // UPDATED
478  if (isLumi)
479  nbEolDiscarded_++;
480 
481  if (!shutDown && !isLumi) {
482  if (fuResourceId >= nbResources()) {
483  LOG4CPLUS_WARN(
484  log_,
485  "cell " << cell->index() << " in state " << state
486  << " scheduled for discard has no associated FU resource ");
487  } else {
488  resources_[fuResourceId]->release(true);
489  lock();
490  freeResourceIds_.push(fuResourceId);
491  assert(freeResourceIds_.size() <= resources_.size());
492  unlock();
493 
494  sendDiscard(buResourceId);
495  sendAllocate();
496  }
497  }
498 
499  if (!reschedule) {
500  discardNoReschedule();
501  }
502 
503  return reschedule;
504 }
505 
506 //______________________________________________________________________________
// Processes one raw cell that is scheduled for discard.
//
// The cell is fetched with rawCellToDiscard(), handed back to the
// shared-memory buffer with discardRawCell(), and - for ordinary events -
// its FU resource is released and its id is pushed back onto
// freeResourceIds_.
//
// sendDiscards: when true, sendDiscard(buResourceId) is called for each
//               freed resource. sendAllocate() is never called here (the
//               call is commented out below).
//
// Returns true when the discard-workloop should be rescheduled. Returns
// false once a STOP cell has brought nbClientsToShutDown_ down to zero; in
// that case discardNoReschedule() has already been run before returning.
//
// NOTE(review): `state` is assigned and compared below, but its declaration
// is not visible in this listing (the line numbered 510 is absent) -
// presumably it is declared there; confirm against the original file.
507 bool FUResourceTable::discardWhileHalting(bool sendDiscards) {
508  FUShmRawCell* cell = 0;
509  // initialize to a value to avoid warnings
// Fetch the next cell to discard and read its event state; shared-memory
// errors are passed to rethrowShmBufferException() with a location tag.
511  try {
512  cell = shmBuffer_->rawCellToDiscard();
513  state = shmBuffer_->evtState(cell->index());
514  } catch (evf::Exception& e) {
515  rethrowShmBufferException(e,
516  "FUResourceTable:discardWhileHalting:rawCellToRead/evtState");
517  }
518 
// Classify the cell once, and copy the resource ids before the cell is
// handed back to the buffer.
519  bool reschedule = true;
520  bool shutDown = (state == evt::STOP);
521  bool isLumi = (state == evt::USEDLS);
522  UInt_t fuResourceId = cell->fuResourceId();
523  UInt_t buResourceId = cell->buResourceId();
524 
// An EMPTY cell is only reported; it is not discarded and the loop goes on.
525  if (state == evt::EMPTY) {
526  LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
527  return true;
528  }
529 
// A STOP cell counts down the clients still to shut down; when the counter
// reaches zero the table is marked inactive and rescheduling stops.
530  if (shutDown) {
531  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
532  if (nbClientsToShutDown_ > 0)
533  --nbClientsToShutDown_;
534  if (nbClientsToShutDown_ == 0) {
535  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
536  isActive_ = false;
537  reschedule = false;
538  }
539  }
540 
// Hand the cell back to the shared-memory buffer.
541  try {
542  shmBuffer_->discardRawCell(cell);
543  } catch (evf::Exception& e) {
544  rethrowShmBufferException(e,
545  "FUResourceTable:discardWhileHalting:discardRawCell");
546  }
547  // UPDATED
// Cells in state USEDLS are only counted.
548  if (isLumi)
549  nbEolDiscarded_++;
550 
// Ordinary event: release the FU resource and make its id available again.
551  if (!shutDown && !isLumi) {
552  if (fuResourceId >= nbResources()) {
// Id is outside the resource table: warn and leave the table untouched.
553  LOG4CPLUS_WARN(
554  log_,
555  "cell " << cell->index() << " in state " << state
556  << " scheduled for discard has no associated FU resource ");
557  } else {
558  resources_[fuResourceId]->release(true);
// The free-id queue is modified between lock() and unlock().
559  lock();
560  freeResourceIds_.push(fuResourceId);
561  assert(freeResourceIds_.size() <= resources_.size());
562  unlock();
563 
564  /*
565  sendDiscard(buResourceId);
566  sendAllocate();
567  */
// Only the discard is sent, and only when the caller asked for it.
568  if (sendDiscards)
569  sendDiscard(buResourceId);
570  }
571  }
572 
// Last STOP cell seen: run the common no-reschedule procedure.
573  if (!reschedule) {
574  discardNoReschedule();
575  }
576 
577  return reschedule;
578 }
579 
580 //______________________________________________________________________________
582  bool eventComplete = false;
583  // UPDATED
584  bool lastMsg = isLastMessageOfEvent(bufRef);
585  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
586  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
587 
588  UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
589  UInt_t buResourceId = (UInt_t) block->buResourceId;
590  // Check input
591  if ((int) block->fuTransactionId < 0 || fuResourceId >= nbRawCells_
592  || (int) block->buResourceId < 0) {
593  stringstream failureStr;
594  failureStr << "Received TAKE message with invalid bu/fu resource id:"
595  << " fuResourceId: " << fuResourceId << " buResourceId: "
596  << buResourceId;
597  LOG4CPLUS_ERROR(log_, failureStr.str());
598  XCEPT_RAISE(evf::Exception, failureStr.str());
599  }
600  FUResource* resource = resources_[fuResourceId];
601 
602  // allocate resource
603  if (!resource->fatalError() && !resource->isAllocated()) {
604  FUShmRawCell* cell = 0;
605  try {
606  cell = shmBuffer_->rawCellToWrite();
607  } catch (evf::Exception& e) {
608  rethrowShmBufferException(e,
609  "FUResourceTable:buildResource:rawCellToWrite");
610  }
611  if (cell == 0) {
612  bufRef->release();
613  return eventComplete;
614  }
615  resource->allocate(cell);
616  timeval now;
617  gettimeofday(&now, 0);
618 
619  frb_->setRBTimeStamp(
620  ((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
621 
622  frb_->setRBEventCount(nbCompleted_);
623 
624  if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
625  resource->doCrcCheck(true);
626  else
627  resource->doCrcCheck(false);
628  }
629 
630 #ifdef DEBUG_RES_TAB
631  std::cout << "Received frame for resource " << buResourceId << std::endl;
632 #endif
633  // keep building this resource if it is healthy
634  if (!resource->fatalError()) {
635 #ifdef DEBUG_RES_TAB
636  std::cout << "No fatal error for " << buResourceId << ", keep building..."<< std::endl;
637 #endif
638  resource->process(bufRef);
639  lock();
640  nbErrors_ += resource->nbErrors();
641  nbCrcErrors_ += resource->nbCrcErrors();
642  unlock();
643 #ifdef DEBUG_RES_TAB
644  std::cout << "Checking if resource is complete " << buResourceId << std::endl;
645 #endif
646  // make resource available for pick-up
647  if (resource->isComplete()) {
648 #ifdef DEBUG_RES_TAB
649  std::cout << "@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
650 #endif
651  lock();
652  nbCompleted_++;
653  nbPending_--;
654  unlock();
655  if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
656  dumpEvent(resource->shmCell());
657  try {
658  shmBuffer_->finishWritingRawCell(resource->shmCell());
659  } catch (evf::Exception& e) {
660  rethrowShmBufferException(e,
661  "FUResourceTable:buildResource:finishWritingRawCell");
662  }
663  eventComplete = true;
664  }
665 
666  }
667  // bad event, release msg, and the whole resource if this was the last one
668  if (resource->fatalError()) {
669  if (lastMsg) {
670  try {
671  shmBuffer_->releaseRawCell(resource->shmCell());
672  } catch (evf::Exception& e) {
673  rethrowShmBufferException(e,
674  "FUResourceTable:buildResource:releaseRawCell");
675  }
676  resource->release(true);
677  lock();
678  freeResourceIds_.push(fuResourceId);
679  nbDiscarded_++;
680  nbLost_++;
681  nbPending_--;
682  unlock();
683  bu_->sendDiscard(buResourceId);
684  sendAllocate();
685  }
686  //bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
687  }
688 
689  return eventComplete;
690 }
691 
692 //______________________________________________________________________________
695  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
696  UInt_t recoIndex = msg->rbBufferID;
697 
698  // Check input
699  if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
700  LOG4CPLUS_ERROR(
701  log_,
702  "Received DISCARD DATA message with invalid recoIndex:"
703  << recoIndex);
704 
705  if (acceptSMDataDiscard_[recoIndex]) {
706  lock();
707  nbPendingSMDiscards_--;
708  unlock();
709  acceptSMDataDiscard_[recoIndex] = false;
710 
711  try {
712  shmBuffer_->discardRecoCell(recoIndex);
713  } catch (evf::Exception& e) {
714  rethrowShmBufferException(e,
715  "FUResourceTable:discardDataEvent:discardRecoCell");
716  }
717 
718  } else {
719  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
720  }
721 
722  bufRef->release();
723  return true;
724 }
725 
726 //______________________________________________________________________________
729  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
730  UInt_t recoIndex = msg->rbBufferID;
731 
732  // Check input
733  if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
734  LOG4CPLUS_ERROR(
735  log_,
736  "Received DISCARD DATA message with invalid recoIndex:"
737  << recoIndex);
738 
739  if (acceptSMDataDiscard_[recoIndex]) {
740  lock();
741  nbPendingSMDiscards_--;
742  unlock();
743  acceptSMDataDiscard_[recoIndex] = false;
744 
745  } else {
746  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
747  }
748 
749  bufRef->release();
750  return false;
751 }
752 
753 //______________________________________________________________________________
756  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
757  UInt_t dqmIndex = msg->rbBufferID;
758 
759  // Check input
760  if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
761  LOG4CPLUS_ERROR(
762  log_,
763  "Received DISCARD DQM message with invalid dqmIndex:"
764  << dqmIndex);
765 
766  unsigned int ntries = 0;
767  try {
768  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
769  if (ntries)//tolerate one attempt
770  LOG4CPLUS_WARN(
771  log_,
772  "DQM discard for cell " << dqmIndex
773  << " which is not yet in SENT state - waiting");
774  ::usleep(10000);
775  if (ntries++ > 10) {
776  LOG4CPLUS_ERROR(
777  log_,
778  "DQM cell " << dqmIndex
779  << " discard timed out while cell still in state "
780  << shmBuffer_->dqmState(dqmIndex));
781  bufRef->release();
782  return true;
783  }
784  }
785  } catch (evf::Exception& e) {
786  rethrowShmBufferException(e, "FUResourceTable:discardDqmEvent:dqmState");
787  }
788  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
789  acceptSMDqmDiscard_[dqmIndex]--;
790  if (--nbPendingSMDqmDiscards_ < 0) {
791  LOG4CPLUS_WARN(
792  log_,
793  "Spurious??? DQM discard by StorageManager, index "
794  << dqmIndex << " cell state "
795  << shmBuffer_->dqmState(dqmIndex)
796  << " accept flag " << acceptSMDqmDiscard_[dqmIndex]);
797  }
798  try {
799  shmBuffer_->discardDqmCell(dqmIndex);
800  } catch (evf::Exception& e) {
801  rethrowShmBufferException(e,
802  "FUResourceTable:discardDqmEvent:discardDqmCell");
803  }
804 
805  } else {
806  LOG4CPLUS_ERROR(
807  log_,
808  "Spurious DQM discard for cell " << dqmIndex
809  << " from StorageManager while cell is not accepting discards");
810  }
811 
812  bufRef->release();
813  return true;
814 }
815 
816 //______________________________________________________________________________
819  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
820  UInt_t dqmIndex = msg->rbBufferID;
821 
822  // Check input
823  if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
824  LOG4CPLUS_ERROR(
825  log_,
826  "Received DISCARD DQM message with invalid dqmIndex:"
827  << dqmIndex);
828 
829  unsigned int ntries = 0;
830  try {
831  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
832  if (ntries)//tolerate one attempt
833  LOG4CPLUS_WARN(
834  log_,
835  "DQM discard for cell " << dqmIndex
836  << " which is not yet in SENT state - waiting");
837  ::usleep(10000);
838  if (ntries++ > 10) {
839  LOG4CPLUS_ERROR(
840  log_,
841  "DQM cell " << dqmIndex
842  << " discard timed out while cell still in state "
843  << shmBuffer_->dqmState(dqmIndex));
844  bufRef->release();
845  return true;
846  }
847  }
848  } catch (evf::Exception& e) {
849  rethrowShmBufferException(e,
850  "FUResourceTable:discardDqmEventWhileHalting:dqmState(2)");
851  }
852  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
853  acceptSMDqmDiscard_[dqmIndex]--;
854  if (--nbPendingSMDqmDiscards_ < 0) {
855  try {
856  LOG4CPLUS_WARN(
857  log_,
858  "Spurious??? DQM discard by StorageManager, index "
859  << dqmIndex << " cell state "
860  << shmBuffer_->dqmState(dqmIndex)
861  << " accept flag "
862  << acceptSMDqmDiscard_[dqmIndex]);
863  } catch (evf::Exception& e) {
864  rethrowShmBufferException(e,
865  "FUResourceTable:discardDqmEventWhileHalting:dqmState");
866  }
867  }
868 
869  } else {
870  LOG4CPLUS_ERROR(
871  log_,
872  "Spurious DQM discard for cell " << dqmIndex
873  << " from StorageManager while cell is not accepting discards");
874  }
875 
876  bufRef->release();
877  return false;
878 }
879 
880 //______________________________________________________________________________
882  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
883  *msg =
884  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
885  //make sure to fill up the shmem so no process will miss it
886  // but processes will have to handle duplicates
887 
888  // Check input
889  int lumiCheck = (int) msg->lumiSection;
890  if (lumiCheck < 0)
891  LOG4CPLUS_ERROR(log_,
892  "Received EOL message with invalid index:" << lumiCheck);
893 
894  for (unsigned int i = 0; i < nbRawCells_; i++) {
895  // UPDATED
896  if (stopFlag_) break;
897  nbEolPosted_++;
898  try {
899  shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
900  } catch (evf::Exception& e) {
901  rethrowShmBufferException(e,
902  "FUResourceTable:postEndOfLumiSection:writeRawLumiSectionEvent");
903  }
904  }
905 }
906 
907 //______________________________________________________________________________
909  FUShmRawCell* cell = 0;
910  try {
911  cell = shmBuffer_->rawCellToRead();
912  } catch (evf::Exception& e) {
913  rethrowShmBufferException(e, "FUResourceTable:dropEvent:rawCellToRead");
914  }
915  UInt_t fuResourceId = cell->fuResourceId();
916  try {
917  shmBuffer_->finishReadingRawCell(cell);
918  shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
919  } catch (evf::Exception& e) {
920  rethrowShmBufferException(e,
921  "FUResourceTable:dropEvent:finishReadingRawCell/scheduleRawCellForDiscard");
922  }
923 }
924 
925 //______________________________________________________________________________
927  bool retval = false;
928  vector < pid_t > pids = cellPrcIds();
929  UInt_t iRawCell = pids.size();
930  for (UInt_t i = 0; i < pids.size(); i++) {
931  if (pid == pids[i]) {
932  iRawCell = i;
933  break;
934  }
935  }
936 
937  if (iRawCell < pids.size()) {
938  try {
939  bool shmret = shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell, true);
940  if (!shmret)
941  LOG4CPLUS_WARN(log_,"Problem writing to the error stream.");
942  } catch (evf::Exception& e) {
943  rethrowShmBufferException(e,
944  "FUResourceTable:handleCrashedEP:writeErrorEventData");
945  }
946  retval = true;
947  } else
948  LOG4CPLUS_WARN(log_,
949  "No raw data to send to error stream for process " << pid);
950  try {
951  bool success = shmBuffer_->removeClientPrcId(pid);
952  if (!success)
953  LOG4CPLUS_WARN(log_,
954  "removeClientPrcId: " << pid << " not in shared memory index, was in raw cell " << iRawCell);
955  } catch (evf::Exception& e) {
956  rethrowShmBufferException(e,
957  "FUResourceTable:handleCrashedEP:removeClientPrcId");
958  }
959  return retval;
960 }
961 
962 //______________________________________________________________________________
963 void FUResourceTable::shutdownWatchdog(unsigned int timeout)
964 {
965  unsigned int timeoutUs=timeout*1000000+1;
966  bool warned=false;
967  while (!watchDogEnd_) {
968 
969  usleep(50000);
970  timeoutUs-=50000;
971  if (timeoutUs<=50000) {
972  LOG4CPLUS_ERROR(log_,"Timeout in shutdownClients, status:"<< std::hex << shutdownStatus_);
973  watchDogSetFailed_=true;
974  break;
975  }
976  if (timeoutUs<=1000000*timeout/2 && !warned) {
977  warned=true;
978  LOG4CPLUS_WARN(log_,"Long shutdown of clients, status:" << std::hex << shutdownStatus_);
979  }
980  }
981 }
982 
983 //______________________________________________________________________________
// Body of the client shut-down routine (the rethrow tags below identify it as
// "FUResourceTable:shutDownClients").
// NOTE(review): the signature line (listing line 984) and listing line 1078
// (presumably the declaration of `state`) are absent from this listing --
// confirm against the original file.
//
// shutdownStatus_ is built up as a progress bitmask:
//   bit 0  routine entered
//   bit 1  no clients attached at entry
//   bit 2  waiting for enough raw cells to write the STOP messages
//   bit 3  wait finished
//   bit 4  no clients left after the wait
//   bit 5  about to write the empty (STOP) events
//   bit 6  empty events written
985  nbClientsToShutDown_ = nbClients();
986  isReadyToShutDown_ = false;
987 
988  shutdownStatus_=1;
989 
990  //start watchdog thread
991  watchDogEnd_=false;
992  watchDogSetFailed_=false;
993  #ifdef linux
// The watchdog runs shutdownWatchdog(20) on its own thread. Every path that
// leaves this function must set watchDogEnd_ and join `watch` first.
994  std::thread watch(&FUResourceTable::shutdownWatchdog,this,20);
995  #endif
996  if (nbClientsToShutDown_ == 0) {
997  shutdownStatus_|=1<<1;
998  LOG4CPLUS_INFO(
999  log_,
1000  "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
1001  UInt_t n = nbResources();
1002  try {
// Schedule every non-empty cell for discard, then the empty STOP cell.
// NOTE(review): unlike the equivalent loop further down, setEvtDiscard(i, 1, true)
// is not called here -- confirm this is intended.
1003  for (UInt_t i = 0; i < n; i++) {
1004  evt::State_t state = shmBuffer_->evtState(i);
1005  if (state != evt::EMPTY) {
1006  LOG4CPLUS_WARN(
1007  log_,
1008  "Schedule discard at STOP for orphaned event in state "
1009  << state);
1010  shmBuffer_->scheduleRawCellForDiscardServerSide(i);
1011  }
1012  }
1013  shmBuffer_->scheduleRawEmptyCellForDiscard();
1014  } catch (evf::Exception& e) {
// NOTE(review): this handler rethrows without setting watchDogEnd_ or joining
// `watch`, unlike every other handler in this routine. On linux a still-joinable
// std::thread destroyed during unwinding calls std::terminate -- confirm and fix.
1015  rethrowShmBufferException(e,
1016  "FUResourceTable:shutDownClients:evtState/scheduleRawEmptyCellForDiscard");
1017  }
1018  } else {
1019  // UPDATED
1020  int checks = 0;
1021  try {
// Wait until there is at least one writable raw cell per attached client.
1022  while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
1023  != 0) {
1024  shutdownStatus_|=1<<2;
1025  checks++;
1026  {
1027  #ifdef linux
1028  auto lk = lockCrashHandlerTimed(10);
1029  #else
1030  bool lk=true;
1031  #endif
1032  if (lk) {
// kill(pid, 0) delivers no signal; a non-zero return is treated here as
// "the process is gone" and handed to the crash handler.
1033  vector < pid_t > prcids = clientPrcIds();
1034  for (UInt_t i = 0; i < prcids.size(); i++) {
1035  pid_t pid = prcids[i];
1036  int status = kill(pid, 0);
1037  if (status != 0) {
1038  LOG4CPLUS_ERROR(log_,
1039  "EP prc " << pid << " completed with error.");
1040  handleCrashedEP(runNumber_, pid);
1041  }
1042  }
1043  }
1044  else {
1045  XCEPT_RAISE(evf::Exception,
1046  "Timed out access to the Crash Handler in stop. SM discards not arriving?");
1047 
1048  }
1049  }
1050 
1051  LOG4CPLUS_WARN(
1052  log_,
1053  "no cell to write stop "
1054  << shmBuffer_->nbRawCellsToWrite()
1055  << " nClients " << nbClients());
// Give up on the 16th pass, i.e. after 15 half-second sleeps.
1056  if (checks > 15) {
1057  string msg = "No Raw Cell to Write STOP messages";
1058  XCEPT_RAISE(evf::Exception, msg);
1059  }
1060  ::usleep(500000);
1061  }
1062  shutdownStatus_|=1<<3;
1063 
1064  } catch (evf::Exception& e) {
// Stop and join the watchdog before propagating the failure.
1065  watchDogEnd_=true;
1066  #ifdef linux
1067  watch.join();
1068  #endif
1069  rethrowShmBufferException(e,
1070  "FUResourceTable:shutDownClients:nbRawCellsToWrite");
1071  }
// Re-read the client count: clients may have gone away during the wait above.
1072  nbClientsToShutDown_ = nbClients();
1073  if (nbClientsToShutDown_ == 0) {
1074  shutdownStatus_|=1<<4;
1075  UInt_t n = nbResources();
1076  for (UInt_t i = 0; i < n; i++) {
1077  // initialize to a value to avoid warnings
1079  try {
1080  state = shmBuffer_->evtState(i);
1081  } catch (evf::Exception& e) {
1082  watchDogEnd_=true;
1083  #ifdef linux
1084  watch.join();
1085  #endif
1086  rethrowShmBufferException(e,
1087  "FUResourceTable:shutDownClients:evtState");
1088  }
1089  if (state != evt::EMPTY) {
1090  LOG4CPLUS_WARN(
1091  log_,
1092  "Schedule discard at STOP for orphaned event in state "
1093  << state);
1094  try {
1095  shmBuffer_->setEvtDiscard(i, 1, true);
1096  shmBuffer_->scheduleRawCellForDiscardServerSide(i);
1097  } catch (evf::Exception& e) {
1098  watchDogEnd_=true;
1099  #ifdef linux
1100  watch.join();
1101  #endif
1102  rethrowShmBufferException(e,
1103  "FUResourceTable:shutDownClients:scheduleRawCellForDiscardServerSide");
1104  }
1105  }
1106  }
1107  try {
1108  shmBuffer_->scheduleRawEmptyCellForDiscard();
1109  } catch (evf::Exception& e) {
1110  watchDogEnd_=true;
1111  #ifdef linux
1112  watch.join();
1113  #endif
1114  rethrowShmBufferException(e,
1115  "FUResourceTable:shutDownClients:scheduleRawEmptyCellForDiscard");
1116  }
1117  }
// Write one empty raw event per client that is still attached.
1118  UInt_t n = nbClientsToShutDown_;
1119  shutdownStatus_|=1<<5;
1120  try {
1121  for (UInt_t i = 0; i < n; ++i)
1122  shmBuffer_->writeRawEmptyEvent();
1123  } catch (evf::Exception& e) {
1124  watchDogEnd_=true;
1125  #ifdef linux
1126  watch.join();
1127  #endif
1128  rethrowShmBufferException(e,
1129  "FUResourceTable:shutDownClients:writeRawEmptyEvent");
1130  }
1131  shutdownStatus_|=1<<6;
1132  }
// Normal exit: stop the watchdog, then raise if it flagged a timeout meanwhile.
1133  watchDogEnd_=true;
1134  #ifdef linux
1135  watch.join();
1136  if (watchDogSetFailed_)
1137  XCEPT_RAISE(evf::Exception, "Failed (timed out) shutdown of clients");
1138  #endif
1139 }
1140 
1141 //______________________________________________________________________________
// Body of the resource-table clear routine: releases and deletes every entry of
// resources_, empties that container, then drains freeResourceIds_.
// NOTE(review): the signature line (listing line 1142) is absent from this
// listing -- confirm against the original file.
1143  for (UInt_t i = 0; i < resources_.size(); i++) {
// release(true): the cross-reference index names this argument "detachResource".
1144  resources_[i]->release(true);
1145  delete resources_[i];
1146  }
1147  resources_.clear();
// Pop until empty to discard all remaining free resource ids.
1148  while (!freeResourceIds_.empty())
1149  freeResourceIds_.pop();
1150 }
1151 
1153 // implementation of private member functions
1155 
1156 //______________________________________________________________________________
// Body of the counter reset routine (tagged "FUResourceTable:resetCounters" in
// the rethrow below): clears the per-cell SM discard flags when a shm buffer
// exists, zeroes the bookkeeping counters and marks both send workloops active.
// NOTE(review): the signature line (listing line 1157) is absent from this
// listing -- confirm against the original file.
1158  if (0 != shmBuffer_) {
1159  try {
// One flag per reco cell and one per dqm cell.
1160  for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
1161  acceptSMDataDiscard_[i] = false;
1162  for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
1163  acceptSMDqmDiscard_[i] = 0;
1164  } catch (evf::Exception& e) {
1165  rethrowShmBufferException(e,
1166  "FUResourceTable:resetCounters:nRecoCells/nDqmCells");
1167  }
1168  }
1169 
1170  // UPDATE: reset pending allocate's
1171  nbAllocated_ = 0;
1172  nbPending_ = 0;
1173  nbCompleted_ = 0;
1174  nbSent_ = 0;
1175  nbSentError_ = 0;
1176  nbSentDqm_ = 0;
1177  nbPendingSMDiscards_ = 0;
1178  nbPendingSMDqmDiscards_ = 0;
1179  nbDiscarded_ = 0;
1180  nbLost_ = 0;
1181  // UPDATED
1182  nbEolPosted_ = 0;
1183  nbEolDiscarded_ = 0;
1184 
1185  nbErrors_ = 0;
1186  nbCrcErrors_ = 0;
1187  nbAllocSent_ = 0;
1188 
1189  sumOfSquares_ = 0;
1190  sumOfSizes_ = 0;
1191 
1192  //"send" workloop states
// These two flags are what the IPC reset routine later polls before resetting
// the shm buffer.
1193  sDqmActive_=true;
1194  sDataActive_=true;
1195 
1196 }
1197 
1198 //______________________________________________________________________________
1200  UInt_t result(0);
1201  try {
1202  if (0 != shmBuffer_)
1203  result = shmBuffer_->nClients();
1204  } catch (evf::Exception& e) {
1205  rethrowShmBufferException(e, "FUResourceTable:nbClients:nClients");
1206  }
1207  return result;
1208 }
1209 
1210 //______________________________________________________________________________
1211 vector<pid_t> FUResourceTable::clientPrcIds() const {
1212  vector < pid_t > result;
1213  try {
1214  if (0 != shmBuffer_) {
1215  UInt_t n = nbClients();
1216  for (UInt_t i = 0; i < n; i++)
1217  result.push_back(shmBuffer_->clientPrcId(i));
1218  }
1219  } catch (evf::Exception& e) {
1220  rethrowShmBufferException(e,
1221  "FUResourceTable:clientPrcIds:clientPrcIds");
1222  }
1223  return result;
1224 }
1225 
1226 //______________________________________________________________________________
1228  stringstream ss;
1229  try {
1230  if (0 != shmBuffer_) {
1231  UInt_t n = nbClients();
1232  for (UInt_t i = 0; i < n; i++) {
1233  if (i > 0)
1234  ss << ",";
1235  ss << shmBuffer_->clientPrcId(i);
1236  }
1237  }
1238  } catch (evf::Exception& e) {
1239  rethrowShmBufferException(e,
1240  "FUResourceTable:clientPrcIdsAsString:clientPrcId");
1241  }
1242  return ss.str();
1243 }
1244 
1245 //______________________________________________________________________________
1246 vector<string> FUResourceTable::cellStates() const {
1247  vector < string > result;
1248  if (0 != shmBuffer_) {
1249  UInt_t n = nbResources();
1250  shmBuffer_->lock();
1251  try {
1252  for (UInt_t i = 0; i < n; i++) {
1253  evt::State_t state = shmBuffer_->evtState(i);
1254  if (state == evt::EMPTY)
1255  result.push_back("EMPTY");
1256  else if (state == evt::STOP)
1257  result.push_back("STOP");
1258  else if (state == evt::LUMISECTION)
1259  result.push_back("LUMISECTION");
1260  // UPDATED
1261  else if (state == evt::USEDLS)
1262  result.push_back("USEDLS");
1263  else if (state == evt::RAWWRITING)
1264  result.push_back("RAWWRITING");
1265  else if (state == evt::RAWWRITTEN)
1266  result.push_back("RAWWRITTEN");
1267  else if (state == evt::RAWREADING)
1268  result.push_back("RAWREADING");
1269  else if (state == evt::RAWREAD)
1270  result.push_back("RAWREAD");
1271  else if (state == evt::PROCESSING)
1272  result.push_back("PROCESSING");
1273  else if (state == evt::PROCESSED)
1274  result.push_back("PROCESSED");
1275  else if (state == evt::RECOWRITING)
1276  result.push_back("RECOWRITING");
1277  else if (state == evt::RECOWRITTEN)
1278  result.push_back("RECOWRITTEN");
1279  else if (state == evt::SENDING)
1280  result.push_back("SENDING");
1281  else if (state == evt::SENT)
1282  result.push_back("SENT");
1283  else if (state == evt::DISCARDING)
1284  result.push_back("DISCARDING");
1285  }
1286  } catch (evf::Exception& e) {
1287  rethrowShmBufferException(e, "FUResourceTable:cellStates:evtState");
1288  }
1289  shmBuffer_->unlock();
1290  }
1291  return result;
1292 }
1293 
1294 vector<string> FUResourceTable::dqmCellStates() const {
1295  vector < string > result;
1296  if (0 != shmBuffer_) {
1297  UInt_t n = nbDqmCells_;
1298  shmBuffer_->lock();
1299  try {
1300  for (UInt_t i = 0; i < n; i++) {
1301  dqm::State_t state = shmBuffer_->dqmState(i);
1302  if (state == dqm::EMPTY)
1303  result.push_back("EMPTY");
1304  else if (state == dqm::WRITING)
1305  result.push_back("WRITING");
1306  else if (state == dqm::WRITTEN)
1307  result.push_back("WRITTEN");
1308  else if (state == dqm::SENDING)
1309  result.push_back("SENDING");
1310  else if (state == dqm::SENT)
1311  result.push_back("SENT");
1312  else if (state == dqm::DISCARDING)
1313  result.push_back("DISCARDING");
1314  }
1315  } catch (evf::Exception& e) {
1316  rethrowShmBufferException(e,
1317  "FUResourceTable:dqmCellStates:dqmState");
1318  }
1319  shmBuffer_->unlock();
1320  }
1321  return result;
1322 }
1323 
1324 //______________________________________________________________________________
1325 vector<UInt_t> FUResourceTable::cellEvtNumbers() const {
1326  vector < UInt_t > result;
1327  if (0 != shmBuffer_) {
1328  UInt_t n = nbResources();
1329  shmBuffer_->lock();
1330  try {
1331  for (UInt_t i = 0; i < n; i++)
1332  result.push_back(shmBuffer_->evtNumber(i));
1333  } catch (evf::Exception& e) {
1334  rethrowShmBufferException(e,
1335  "FUResourceTable:cellEvtNumbers:evtNumber");
1336  }
1337  shmBuffer_->unlock();
1338  }
1339  return result;
1340 }
1341 
1342 //______________________________________________________________________________
1343 vector<pid_t> FUResourceTable::cellPrcIds() const {
1344  vector < pid_t > result;
1345  if (0 != shmBuffer_) {
1346  UInt_t n = nbResources();
1347  shmBuffer_->lock();
1348  try {
1349  for (UInt_t i = 0; i < n; i++)
1350  result.push_back(shmBuffer_->evtPrcId(i));
1351  } catch (evf::Exception& e) {
1352  rethrowShmBufferException(e, "FUResourceTable:cellPrcIds:evtPrcId");
1353  }
1354  shmBuffer_->unlock();
1355  }
1356  return result;
1357 }
1358 
1359 //______________________________________________________________________________
1360 vector<time_t> FUResourceTable::cellTimeStamps() const {
1361  vector < time_t > result;
1362  try {
1363  if (0 != shmBuffer_) {
1364  UInt_t n = nbResources();
1365  shmBuffer_->lock();
1366  for (UInt_t i = 0; i < n; i++)
1367  result.push_back(shmBuffer_->evtTimeStamp(i));
1368  shmBuffer_->unlock();
1369  }
1370  } catch (evf::Exception& e) {
1371  rethrowShmBufferException(e,
1372  "FUResourceTable:cellTimeStamps:evtTimeStamp");
1373  }
1374  return result;
1375 }
1376 
1378 // implementation of private member functions
1380 
// Body of the "last resort" drain routine (tagged "FUResourceTable:lastResort"
// in the rethrow below): takes every raw cell still waiting to be read,
// schedules it for server-side discard, then schedules the empty raw cell.
// Progress is written both to the logger and to std::cout.
// NOTE(review): the signature line (listing line 1381) is absent from this
// listing -- confirm against the original file.
1382  try {
1383  ostringstream ost;
1384  ost << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1385  << " more rawcells to read ";
1386  LOG4CPLUS_WARN(log_,ost.str());
1387  std::cout << ost.str() << std::endl;
1388 
// NOTE(review): shmBuffer_ is dereferenced here without the null check used by
// the accessors in this file -- presumably callers guarantee it is attached.
1389  while (shmBuffer_->nbRawCellsToRead() != 0) {
1390  FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
1391  std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1392  << std::endl;
1393  // UPDATED
1394  LOG4CPLUS_WARN(log_,"lastResort: Scheduling raw cell (server side) "<< newCell->index());
1395  shmBuffer_->scheduleRawCellForDiscardServerSide(newCell->index());
1396 
1397  std::cout << "lastResort: schedule raw cell for discard "
1398  << newCell->index() << std::endl;
1399  }
1400  //trigger the shutdown (again?)
1401  LOG4CPLUS_WARN(log_,"lastResort: scheduling empty raw cell (server side) ");
1402  shmBuffer_->scheduleRawEmptyCellForDiscard();
1403  LOG4CPLUS_WARN(log_,"lastResort: Finished. cells remaining: " << shmBuffer_->nbRawCellsToRead());
1404  } catch (evf::Exception& e) {
1405  rethrowShmBufferException(
1406  e,
1407  "FUResourceTable:lastResort:nbRawCellsToRead/scheduleRawCellForDiscardServerSide");
1408  }
1409  LOG4CPLUS_WARN(log_,"Last resort finished ");
1410 }
1411 
1413  if (shmBuffer_ != 0) {
1414  //waiting for sendData and sendDqm workloops to finish
1415  int countdown_=60;
1416  while (countdown_-- && (sDataActive_ || sDqmActive_)) ::usleep(50000);
1417  if (countdown_<=0) {
1418  std::ostringstream ostr;
1419  ostr << "Resource broker timed out waiting for workloop shutdowns (3 seconds). Continuing to reset Shm. States - "
1420  << " sendDqm:"<<sDqmActive_ << " sendData:" << sDataActive_;
1421  LOG4CPLUS_ERROR(log_,ostr.str());
1422  std::cout << ostr.str() << std::endl;
1423  }
1424  //resetting shm buffer
1425  shmBuffer_->reset(false);
1426  LOG4CPLUS_INFO(log_, "ShmBuffer was reset!");
1427  }
1428 }
1429 
// Returns shmBuffer_->sem_print_s() when a shm buffer is attached, otherwise the
// fixed string "ShmBuffer not initialized".
// NOTE(review): the signature line (listing line 1430) is absent from this
// listing, so the function name is not visible here -- confirm against the
// original file.
1431  if (shmBuffer_) return shmBuffer_->sem_print_s();
1432  else return std::string("ShmBuffer not initialized");
1433 }
1434 
1436  throw (evf::Exception) {
1437  stringstream details;
1438  vector < string > dataStates = cellStates();
1439  vector < string > dqmStates = dqmCellStates();
1440  details << "Exception raised: " << e.what() << " (in module: "
1441  << e.module() << " in function: " << e.function() << " at line: "
1442  << e.line() << ")";
1443  details << " Dumping cell state... ";
1444  details << "data cells --> ";
1445  for (unsigned int i = 0; i < dataStates.size(); i++)
1446  details << dataStates[i] << " ";
1447  details << "dqm cells --> ";
1448  for (unsigned int i = 0; i < dqmStates.size(); i++)
1449  details << dqmStates[i] << " ";
1450  details << " ... originated in: " << where;
1451  XCEPT_RETHROW(evf::Exception, details.str(), e);
1452 }
unsigned int index() const
Definition: FUShmRecoCell.h:22
static const char runNumber_[]
unsigned int fuGuid() const
Definition: FUShmDqmCell.h:27
bool discardDataEvent(MemRef_t *bufRef)
int i
Definition: DBlmapReader.cc:9
static int shm_nattch(int shmid)
void process(MemRef_t *bufRef)
Definition: FUResource.cc:126
std::vector< UInt_t > cellEvtNumbers() const
unsigned int type() const
Definition: FUShmRecoCell.h:29
unsigned int index() const
Definition: FUShmDqmCell.h:22
bool fatalError() const
Definition: FUResource.h:110
unsigned int runNumber() const
Definition: FUShmRecoCell.h:24
std::vector< std::string > cellStates() const
unsigned int fuGuid() const
Definition: FUShmRecoCell.h:28
FUResourceTable(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *)
unsigned int folderId() const
Definition: FUShmDqmCell.h:25
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
UInt_t nbErrors(bool reset=true)
Definition: FUResource.cc:308
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
std::ostream & logger()
Definition: fwLog.cc:41
toolbox::mem::Reference MemRef_t
Definition: FUTypes.h:10
unsigned int evtAtUpdate() const
Definition: FUShmDqmCell.h:24
bool buildResource(MemRef_t *bufRef)
unsigned int fuProcessId() const
Definition: FUShmRecoCell.h:27
static bool releaseSharedMemory()
void release(bool detachResource)
Definition: FUResource.cc:82
UInt_t nbClients() const
void dumpEvent(uint8 *buf)
Definition: DumpTools.cc:193
unsigned char * payloadAddr() const
unsigned int evtNumber() const
Definition: FUShmRecoCell.h:25
unsigned int nExpectedEPs() const
Definition: FUShmRecoCell.h:34
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
unsigned int fuProcessId() const
Definition: FUShmDqmCell.h:26
bool discardWhileHalting(bool sendDiscards)
unsigned int runNumber() const
Definition: FUShmDqmCell.h:23
void shutdownWatchdog(unsigned int timeout)
unsigned char UChar_t
Definition: FUTypes.h:14
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:168
std::vector< time_t > cellTimeStamps() const
bool discardDqmEventWhileHalting(MemRef_t *bufRef)
tuple result
Definition: query.py:137
std::vector< pid_t > clientPrcIds() const
bool isComplete() const
Definition: FUResource.h:221
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
Definition: FUShmBuffer.cc:917
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned char * payloadAddr() const
Definition: FUShmDqmCell.cc:54
evf::FUShmRawCell * shmCell()
Definition: FUResource.h:145
std::string clientPrcIdsAsString() const
unsigned long long uint64_t
Definition: Time.h:15
bool isAllocated() const
Definition: FUResource.h:113
unsigned int eventSize() const
Definition: FUShmDqmCell.h:31
char state
Definition: procUtils.cc:75
void doCrcCheck(bool doCrcCheck)
Definition: FUResource.h:104
unsigned int index() const
Definition: FUShmRawCell.h:25
std::vector< pid_t > cellPrcIds() const
unsigned int outModId() const
Definition: FUShmRecoCell.h:26
perl if(1 lt scalar(@::datatypes))
Definition: edlooper.cc:31
unsigned int fuResourceId() const
Definition: FUShmRawCell.h:26
UInt_t nbCrcErrors(bool reset=true)
Definition: FUResource.cc:316
tuple cout
Definition: gather_cfg.py:121
void postEndOfLumiSection(MemRef_t *bufRef)
tuple status
Definition: ntuplemaker.py:245
std::vector< std::string > dqmCellStates() const
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void allocate(FUShmRawCell *shmCell)
Definition: FUResource.cc:63
bool discardDqmEvent(MemRef_t *bufRef)
unsigned int buResourceId() const
Definition: FUShmRawCell.h:27
bool discardDataEventWhileHalting(MemRef_t *bufRef)
void resetIPC()
reset the ShmBuffer to the initial state
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)