CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
evf::FUResourceTable Class Reference

#include <FUResourceTable.h>

Inheritance diagram for evf::FUResourceTable:
evf::IPCMethod

Public Member Functions

bool buildResource (MemRef_t *bufRef)
 
std::vector< UInt_t > cellEvtNumbers () const
 
std::vector< pid_t > cellPrcIds () const
 
std::vector< std::string > cellStates () const
 
std::vector< time_t > cellTimeStamps () const
 
void clear ()
 
std::vector< pid_t > clientPrcIds () const
 
std::string clientPrcIdsAsString () const
 
bool discard ()
 
bool discardDataEvent (MemRef_t *bufRef)
 
bool discardDataEventWhileHalting (MemRef_t *bufRef)
 
bool discardDqmEvent (MemRef_t *bufRef)
 
bool discardDqmEventWhileHalting (MemRef_t *bufRef)
 
bool discardWhileHalting (bool sendDiscards)
 
std::vector< std::string > dqmCellStates () const
 
void dropEvent ()
 
 FUResourceTable (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *) throw (evf::Exception)
 
bool handleCrashedEP (UInt_t runNumber, pid_t pid)
 
void initialize (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception)
 
void lastResort ()
 
UInt_t nbClients () const
 
UInt_t nbResources () const
 
unsigned int nbResources ()
 
void postEndOfLumiSection (MemRef_t *bufRef)
 
std::string printStatus ()
 
void resetCounters ()
 
void resetIPC ()
 reset the ShmBuffer to the initial state More...
 
bool sendData ()
 
bool sendDataWhileHalting ()
 
bool sendDqm ()
 
bool sendDqmWhileHalting ()
 
void shutDownClients ()
 
virtual ~FUResourceTable ()
 
- Public Member Functions inherited from evf::IPCMethod
UInt_t allocateResource ()
 
void dumpEvent (evf::FUShmRawCell *cell)
 
void injectCRCError ()
 
 IPCMethod (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int timeout, EvffedFillerRB *frb, xdaq::Application *app) throw (evf::Exception)
 
bool isActive () const
 
bool isLastMessageOfEvent (MemRef_t *bufRef)
 
bool isReadyToShutDown () const
 
void lock ()
 
UInt_t nbAllocated () const
 
UInt_t nbAllocSent () const
 
UInt_t nbCompleted () const
 
UInt_t nbCrcErrors () const
 
UInt_t nbDiscarded () const
 
UInt_t nbEolDiscarded () const
 
UInt_t nbEolPosted () const
 
UInt_t nbErrors () const
 
UInt_t nbFreeSlots () const
 
UInt_t nbLost () const
 
UInt_t nbPending () const
 
UInt_t nbPendingSMDiscards () const
 
int nbPendingSMDqmDiscards () const
 
UInt_t nbSent () const
 
UInt_t nbSentDqm () const
 
UInt_t nbSentError () const
 
void releaseResources ()
 releases all FUResource's More...
 
void resetPendingAllocates ()
 resets free resources to the maximum number More...
 
void sendAllocate ()
 
void sendDataEvent (UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
 
void sendDiscard (UInt_t buResourceId)
 
void sendDqmEvent (UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
 
void sendErrorEvent (UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
 
void sendInitMessage (UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
 
void setActive (bool activeValue)
 
void setDoCrcCheck (UInt_t doCrcCheck)
 
void setDoDumpEvents (UInt_t doDumpEvents)
 
void setReadyToShutDown (bool readyValue)
 
void setRunNumber (UInt_t runNumber)
 
void setStopFlag (bool status)
 
UInt_t shutdownStatus ()
 
UInt_t sumOfSizes () const
 
uint64_t sumOfSquares () const
 
void unlock ()
 
virtual ~IPCMethod ()
 

Private Member Functions

void discardNoReschedule ()
 
void rethrowShmBufferException (evf::Exception &e, std::string where) const throw (evf::Exception)
 
void shutdownWatchdog (unsigned int timeout)
 

Private Attributes

bool sDataActive_
 
bool sDqmActive_
 
FUShmBuffer * shmBuffer_
 
std::atomic_bool watchDogEnd_
 
std::atomic_bool watchDogSetFailed_
 

Additional Inherited Members

- Protected Attributes inherited from evf::IPCMethod
bool * acceptSMDataDiscard_
 
int * acceptSMDqmDiscard_
 
xdaq::Application * app_
 
BUProxy * bu_
 
UInt_t doCrcCheck_
 
UInt_t doDumpEvents_
 
EvffedFillerRB * frb_
 
std::queue< UInt_t > freeResourceIds_
 
unsigned int freeResRequiredForAllocate_
 
bool isActive_
 
bool isReadyToShutDown_
 
sem_t lock_
 
log4cplus::Logger log_
 
UInt_t nbAllocated_
 
UInt_t nbAllocSent_
 
UInt_t nbClientsToShutDown_
 
UInt_t nbCompleted_
 
UInt_t nbCrcErrors_
 
UInt_t nbDiscarded_
 
UInt_t nbDqmCells_
 
UInt_t nbEolDiscarded_
 
UInt_t nbEolPosted_
 
UInt_t nbErrors_
 
UInt_t nbLost_
 
UInt_t nbPending_
 
UInt_t nbPendingSMDiscards_
 
std::atomic< int > nbPendingSMDqmDiscards_
 
UInt_t nbRawCells_
 
UInt_t nbRecoCells_
 
UInt_t nbSent_
 
UInt_t nbSentDqm_
 
UInt_t nbSentError_
 
FUResourceVec_t resources_
 
UInt_t runNumber_
 
UInt_t shutdownStatus_
 
unsigned int shutdownTimeout_
 
SMProxy * sm_
 
bool stopFlag_
 
UInt_t sumOfSizes_
 
uint64_t sumOfSquares_
 

Detailed Description

Table of resources linked with the Shared Memory Buffer

Author:
smorovic

Definition at line 40 of file FUResourceTable.h.

Constructor & Destructor Documentation

FUResourceTable::FUResourceTable ( bool  segmentationMode,
UInt_t  nbRawCells,
UInt_t  nbRecoCells,
UInt_t  nbDqmCells,
UInt_t  rawCellSize,
UInt_t  recoCellSize,
UInt_t  dqmCellSize,
int  freeResReq,
BUProxy *  bu,
SMProxy *  sm,
log4cplus::Logger  logger,
unsigned int  timeout,
EvffedFillerRB *  frb,
xdaq::Application *  app 
)
throw (evf::Exception)

Definition at line 31 of file FUResourceTable.cc.

35  :
36 
37  // call super constructor
38  IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
39  rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
40  logger, timeout, frb, app), shmBuffer_(0)
41 
42 {
43  initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
44  rawCellSize, recoCellSize, dqmCellSize);
45 }
std::ostream & logger()
Definition: fwLog.cc:41
FUShmBuffer * shmBuffer_
IPCMethod(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int timeout, EvffedFillerRB *frb, xdaq::Application *app)
Definition: IPCMethod.cc:28
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)
FUResourceTable::~FUResourceTable ( )
virtual

Definition at line 48 of file FUResourceTable.cc.

References hitfit::clear(), and evf::FUShmBuffer::releaseSharedMemory().

48  {
49  clear();
50  //workloop cancels used to be here in the previous version
51  shmdt( shmBuffer_);
53  LOG4CPLUS_INFO(log_, "SHARED MEMORY SUCCESSFULLY RELEASED.");
54  if (0 != acceptSMDataDiscard_)
55  delete[] acceptSMDataDiscard_;
56  if (0 != acceptSMDqmDiscard_)
57  delete[] acceptSMDqmDiscard_;
58 }
bool * acceptSMDataDiscard_
Definition: IPCMethod.h:352
log4cplus::Logger log_
Definition: IPCMethod.h:342
static bool releaseSharedMemory()
FUShmBuffer * shmBuffer_
int * acceptSMDqmDiscard_
Definition: IPCMethod.h:353

Member Function Documentation

bool FUResourceTable::buildResource ( MemRef_t *  bufRef)
virtual

Process buffer received via I2O_FU_TAKE message

Implements evf::IPCMethod.

Definition at line 581 of file FUResourceTable.cc.

References evf::FUResource::allocate(), createPayload::block, gather_cfg::cout, evf::FUResource::doCrcCheck(), dumpEvent(), alignCSCRings::e, evf::FUResource::fatalError(), evf::FUResource::isAllocated(), evf::FUResource::isComplete(), CommonMethods::lock(), evf::FUResource::nbCrcErrors(), evf::FUResource::nbErrors(), cmsPerfSuiteHarvest::now, evf::FUResource::process(), evf::FUResource::release(), and evf::FUResource::shmCell().

581  {
582  bool eventComplete = false;
583  // UPDATED
584  bool lastMsg = isLastMessageOfEvent(bufRef);
585  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
586  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
587 
588  UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
589  UInt_t buResourceId = (UInt_t) block->buResourceId;
590  // Check input
591  if ((int) block->fuTransactionId < 0 || fuResourceId >= nbRawCells_
592  || (int) block->buResourceId < 0) {
593  stringstream failureStr;
594  failureStr << "Received TAKE message with invalid bu/fu resource id:"
595  << " fuResourceId: " << fuResourceId << " buResourceId: "
596  << buResourceId;
597  LOG4CPLUS_ERROR(log_, failureStr.str());
598  XCEPT_RAISE(evf::Exception, failureStr.str());
599  }
600  FUResource* resource = resources_[fuResourceId];
601 
602  // allocate resource
603  if (!resource->fatalError() && !resource->isAllocated()) {
604  FUShmRawCell* cell = 0;
605  try {
606  cell = shmBuffer_->rawCellToWrite();
607  } catch (evf::Exception& e) {
608  rethrowShmBufferException(e,
609  "FUResourceTable:buildResource:rawCellToWrite");
610  }
611  if (cell == 0) {
612  bufRef->release();
613  return eventComplete;
614  }
615  resource->allocate(cell);
616  timeval now;
617  gettimeofday(&now, 0);
618 
619  frb_->setRBTimeStamp(
620  ((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
621 
623 
624  if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
625  resource->doCrcCheck(true);
626  else
627  resource->doCrcCheck(false);
628  }
629 
630 #ifdef DEBUG_RES_TAB
631  std::cout << "Received frame for resource " << buResourceId << std::endl;
632 #endif
633  // keep building this resource if it is healthy
634  if (!resource->fatalError()) {
635 #ifdef DEBUG_RES_TAB
636  std::cout << "No fatal error for " << buResourceId << ", keep building..."<< std::endl;
637 #endif
638  resource->process(bufRef);
639  lock();
640  nbErrors_ += resource->nbErrors();
641  nbCrcErrors_ += resource->nbCrcErrors();
642  unlock();
643 #ifdef DEBUG_RES_TAB
644  std::cout << "Checking if resource is complete " << buResourceId << std::endl;
645 #endif
646  // make resource available for pick-up
647  if (resource->isComplete()) {
648 #ifdef DEBUG_RES_TAB
649  std::cout << "@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
650 #endif
651  lock();
652  nbCompleted_++;
653  nbPending_--;
654  unlock();
655  if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
656  dumpEvent(resource->shmCell());
657  try {
658  shmBuffer_->finishWritingRawCell(resource->shmCell());
659  } catch (evf::Exception& e) {
660  rethrowShmBufferException(e,
661  "FUResourceTable:buildResource:finishWritingRawCell");
662  }
663  eventComplete = true;
664  }
665 
666  }
667  // bad event, release msg, and the whole resource if this was the last one
668  if (resource->fatalError()) {
669  if (lastMsg) {
670  try {
671  shmBuffer_->releaseRawCell(resource->shmCell());
672  } catch (evf::Exception& e) {
673  rethrowShmBufferException(e,
674  "FUResourceTable:buildResource:releaseRawCell");
675  }
676  resource->release(true);
677  lock();
678  freeResourceIds_.push(fuResourceId);
679  nbDiscarded_++;
680  nbLost_++;
681  nbPending_--;
682  unlock();
683  bu_->sendDiscard(buResourceId);
684  sendAllocate();
685  }
686  //bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
687  }
688 
689  return eventComplete;
690 }
void process(MemRef_t *bufRef)
Definition: FUResource.cc:126
bool fatalError() const
Definition: FUResource.h:110
UInt_t doCrcCheck_
Definition: IPCMethod.h:355
void setRBEventCount(uint32_t evtcnt)
void sendDiscard(UInt_t buResourceId)
Definition: BUProxy.cc:108
UInt_t nbErrors_
Definition: IPCMethod.h:377
UInt_t nbDiscarded_
Definition: IPCMethod.h:367
UInt_t nbErrors(bool reset=true)
Definition: FUResource.cc:308
void sendAllocate()
Definition: IPCMethod.cc:111
void releaseRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:546
bool isLastMessageOfEvent(MemRef_t *bufRef)
Definition: IPCMethod.cc:248
log4cplus::Logger log_
Definition: IPCMethod.h:342
void release(bool detachResource)
Definition: FUResource.cc:82
UInt_t nbRawCells_
Definition: IPCMethod.h:345
void dumpEvent(evf::FUShmRawCell *cell)
Definition: IPCMethod.cc:74
FUShmBuffer * shmBuffer_
UInt_t nbCompleted_
Definition: IPCMethod.h:361
UInt_t nbPending_
Definition: IPCMethod.h:360
void finishWritingRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:393
UInt_t nbCrcErrors_
Definition: IPCMethod.h:378
FUResourceVec_t resources_
Definition: IPCMethod.h:393
bool isComplete() const
Definition: FUResource.h:221
UInt_t nbLost_
Definition: IPCMethod.h:368
unsigned int UInt_t
Definition: FUTypes.h:12
evf::FUShmRawCell * shmCell()
Definition: FUResource.h:145
unsigned long long uint64_t
Definition: Time.h:15
FUShmRawCell * rawCellToWrite()
Definition: FUShmBuffer.cc:298
bool isAllocated() const
Definition: FUResource.h:113
EvffedFillerRB * frb_
Definition: IPCMethod.h:390
void doCrcCheck(bool doCrcCheck)
Definition: FUResource.h:104
UInt_t nbAllocated_
Definition: IPCMethod.h:359
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
UInt_t nbCrcErrors(bool reset=true)
Definition: FUResource.cc:316
void unlock()
Definition: IPCMethod.h:314
void setRBTimeStamp(uint64_t ts)
tuple cout
Definition: gather_cfg.py:121
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void allocate(FUShmRawCell *shmCell)
Definition: FUResource.cc:63
BUProxy * bu_
Definition: IPCMethod.h:339
UInt_t doDumpEvents_
Definition: IPCMethod.h:356
vector< UInt_t > FUResourceTable::cellEvtNumbers ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1325 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

1325  {
1326  vector < UInt_t > result;
1327  if (0 != shmBuffer_) {
1328  UInt_t n = nbResources();
1329  shmBuffer_->lock();
1330  try {
1331  for (UInt_t i = 0; i < n; i++)
1332  result.push_back(shmBuffer_->evtNumber(i));
1333  } catch (evf::Exception& e) {
1334  rethrowShmBufferException(e,
1335  "FUResourceTable:cellEvtNumbers:evtNumber");
1336  }
1337  shmBuffer_->unlock();
1338  }
1339  return result;
1340 }
int i
Definition: DBlmapReader.cc:9
unsigned int evtNumber(unsigned int index)
FUShmBuffer * shmBuffer_
tuple result
Definition: query.py:137
UInt_t nbResources() const
unsigned int UInt_t
Definition: FUTypes.h:12
void rethrowShmBufferException(evf::Exception &e, std::string where) const
vector< pid_t > FUResourceTable::cellPrcIds ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1343 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

1343  {
1344  vector < pid_t > result;
1345  if (0 != shmBuffer_) {
1346  UInt_t n = nbResources();
1347  shmBuffer_->lock();
1348  try {
1349  for (UInt_t i = 0; i < n; i++)
1350  result.push_back(shmBuffer_->evtPrcId(i));
1351  } catch (evf::Exception& e) {
1352  rethrowShmBufferException(e, "FUResourceTable:cellPrcIds:evtPrcId");
1353  }
1354  shmBuffer_->unlock();
1355  }
1356  return result;
1357 }
int i
Definition: DBlmapReader.cc:9
FUShmBuffer * shmBuffer_
tuple result
Definition: query.py:137
pid_t evtPrcId(unsigned int index)
UInt_t nbResources() const
unsigned int UInt_t
Definition: FUTypes.h:12
void rethrowShmBufferException(evf::Exception &e, std::string where) const
vector< string > FUResourceTable::cellStates ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1246 of file FUResourceTable.cc.

References evf::evt::DISCARDING, alignCSCRings::e, evf::evt::EMPTY, i, evf::evt::LUMISECTION, n, evf::evt::PROCESSED, evf::evt::PROCESSING, evf::evt::RAWREAD, evf::evt::RAWREADING, evf::evt::RAWWRITING, evf::evt::RAWWRITTEN, evf::evt::RECOWRITING, evf::evt::RECOWRITTEN, query::result, evf::evt::SENDING, evf::evt::SENT, evf::utils::state, evf::evt::STOP, and evf::evt::USEDLS.

1246  {
1247  vector < string > result;
1248  if (0 != shmBuffer_) {
1249  UInt_t n = nbResources();
1250  shmBuffer_->lock();
1251  try {
1252  for (UInt_t i = 0; i < n; i++) {
1253  evt::State_t state = shmBuffer_->evtState(i);
1254  if (state == evt::EMPTY)
1255  result.push_back("EMPTY");
1256  else if (state == evt::STOP)
1257  result.push_back("STOP");
1258  else if (state == evt::LUMISECTION)
1259  result.push_back("LUMISECTION");
1260  // UPDATED
1261  else if (state == evt::USEDLS)
1262  result.push_back("USEDLS");
1263  else if (state == evt::RAWWRITING)
1264  result.push_back("RAWWRITING");
1265  else if (state == evt::RAWWRITTEN)
1266  result.push_back("RAWWRITTEN");
1267  else if (state == evt::RAWREADING)
1268  result.push_back("RAWREADING");
1269  else if (state == evt::RAWREAD)
1270  result.push_back("RAWREAD");
1271  else if (state == evt::PROCESSING)
1272  result.push_back("PROCESSING");
1273  else if (state == evt::PROCESSED)
1274  result.push_back("PROCESSED");
1275  else if (state == evt::RECOWRITING)
1276  result.push_back("RECOWRITING");
1277  else if (state == evt::RECOWRITTEN)
1278  result.push_back("RECOWRITTEN");
1279  else if (state == evt::SENDING)
1280  result.push_back("SENDING");
1281  else if (state == evt::SENT)
1282  result.push_back("SENT");
1283  else if (state == evt::DISCARDING)
1284  result.push_back("DISCARDING");
1285  }
1286  } catch (evf::Exception& e) {
1287  rethrowShmBufferException(e, "FUResourceTable:cellStates:evtState");
1288  }
1289  shmBuffer_->unlock();
1290  }
1291  return result;
1292 }
int i
Definition: DBlmapReader.cc:9
evt::State_t evtState(unsigned int index)
FUShmBuffer * shmBuffer_
tuple result
Definition: query.py:137
UInt_t nbResources() const
unsigned int UInt_t
Definition: FUTypes.h:12
char state
Definition: procUtils.cc:75
void rethrowShmBufferException(evf::Exception &e, std::string where) const
vector< time_t > FUResourceTable::cellTimeStamps ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1360 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

1360  {
1361  vector < time_t > result;
1362  try {
1363  if (0 != shmBuffer_) {
1364  UInt_t n = nbResources();
1365  shmBuffer_->lock();
1366  for (UInt_t i = 0; i < n; i++)
1367  result.push_back(shmBuffer_->evtTimeStamp(i));
1368  shmBuffer_->unlock();
1369  }
1370  } catch (evf::Exception& e) {
1371  rethrowShmBufferException(e,
1372  "FUResourceTable:cellTimeStamps:evtTimeStamp");
1373  }
1374  return result;
1375 }
int i
Definition: DBlmapReader.cc:9
time_t evtTimeStamp(unsigned int index)
FUShmBuffer * shmBuffer_
tuple result
Definition: query.py:137
UInt_t nbResources() const
unsigned int UInt_t
Definition: FUTypes.h:12
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void FUResourceTable::clear ( void  )
virtual
vector< pid_t > FUResourceTable::clientPrcIds ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1211 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

1211  {
1212  vector < pid_t > result;
1213  try {
1214  if (0 != shmBuffer_) {
1215  UInt_t n = nbClients();
1216  for (UInt_t i = 0; i < n; i++)
1217  result.push_back(shmBuffer_->clientPrcId(i));
1218  }
1219  } catch (evf::Exception& e) {
1220  rethrowShmBufferException(e,
1221  "FUResourceTable:clientPrcIds:clientPrcIds");
1222  }
1223  return result;
1224 }
int i
Definition: DBlmapReader.cc:9
pid_t clientPrcId(unsigned int index)
UInt_t nbClients() const
FUShmBuffer * shmBuffer_
tuple result
Definition: query.py:137
unsigned int UInt_t
Definition: FUTypes.h:12
void rethrowShmBufferException(evf::Exception &e, std::string where) const
string FUResourceTable::clientPrcIdsAsString ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1227 of file FUResourceTable.cc.

References alignCSCRings::e, i, and n.

1227  {
1228  stringstream ss;
1229  try {
1230  if (0 != shmBuffer_) {
1231  UInt_t n = nbClients();
1232  for (UInt_t i = 0; i < n; i++) {
1233  if (i > 0)
1234  ss << ",";
1235  ss << shmBuffer_->clientPrcId(i);
1236  }
1237  }
1238  } catch (evf::Exception& e) {
1239  rethrowShmBufferException(e,
1240  "FUResourceTable:clientPrcIdsAsString:clientPrcId");
1241  }
1242  return ss.str();
1243 }
int i
Definition: DBlmapReader.cc:9
pid_t clientPrcId(unsigned int index)
UInt_t nbClients() const
FUShmBuffer * shmBuffer_
unsigned int UInt_t
Definition: FUTypes.h:12
void rethrowShmBufferException(evf::Exception &e, std::string where) const
bool FUResourceTable::discard ( )
virtual

Corresponds to the Discard workloop, to be called in normal operation.

Implements evf::IPCMethod.

Definition at line 438 of file FUResourceTable.cc.

References evf::FUShmRawCell::buResourceId(), alignCSCRings::e, evf::evt::EMPTY, evf::FUShmRawCell::fuResourceId(), evf::FUShmRawCell::index(), CommonMethods::lock(), evf::utils::state, evf::evt::STOP, and evf::evt::USEDLS.

438  {
439  FUShmRawCell* cell = 0;
440  // initialize to a value to avoid warnings
442  try {
443  cell = shmBuffer_->rawCellToDiscard();
444  state = shmBuffer_->evtState(cell->index());
445  } catch (evf::Exception& e) {
446  rethrowShmBufferException(e,
447  "FUResourceTable:discard:rawCellToRead/evtState");
448  }
449 
450  bool reschedule = true;
451  bool shutDown = (state == evt::STOP);
452  bool isLumi = (state == evt::USEDLS);
453  UInt_t fuResourceId = cell->fuResourceId();
454  UInt_t buResourceId = cell->buResourceId();
455 
456  if (state == evt::EMPTY) {
457  LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
458  return true;
459  }
460 
461  if (shutDown) {
462  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
463  if (nbClientsToShutDown_ > 0)
465  if (nbClientsToShutDown_ == 0) {
466  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
467  isActive_ = false;
468  reschedule = false;
469  }
470  }
471 
472  try {
473  shmBuffer_->discardRawCell(cell);
474  } catch (evf::Exception& e) {
475  rethrowShmBufferException(e, "FUResourceTable:discard:discardRawCell");
476  }
477  // UPDATED
478  if (isLumi)
479  nbEolDiscarded_++;
480 
481  if (!shutDown && !isLumi) {
482  if (fuResourceId >= nbResources()) {
483  LOG4CPLUS_WARN(
484  log_,
485  "cell " << cell->index() << " in state " << state
486  << " scheduled for discard has no associated FU resource ");
487  } else {
488  resources_[fuResourceId]->release(true);
489  lock();
490  freeResourceIds_.push(fuResourceId);
491  assert(freeResourceIds_.size() <= resources_.size());
492  unlock();
493 
494  sendDiscard(buResourceId);
495  sendAllocate();
496  }
497  }
498 
499  if (!reschedule) {
501  }
502 
503  return reschedule;
504 }
FUShmRawCell * rawCellToDiscard()
Definition: FUShmBuffer.cc:373
void sendDiscard(UInt_t buResourceId)
Definition: IPCMethod.cc:150
void sendAllocate()
Definition: IPCMethod.cc:111
log4cplus::Logger log_
Definition: IPCMethod.h:342
UInt_t nbEolDiscarded_
Definition: IPCMethod.h:371
evt::State_t evtState(unsigned int index)
FUShmBuffer * shmBuffer_
void discardRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:495
FUResourceVec_t resources_
Definition: IPCMethod.h:393
UInt_t nbResources() const
unsigned int UInt_t
Definition: FUTypes.h:12
UInt_t nbClientsToShutDown_
Definition: IPCMethod.h:373
char state
Definition: procUtils.cc:75
unsigned int index() const
Definition: FUShmRawCell.h:25
unsigned int fuResourceId() const
Definition: FUShmRawCell.h:26
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void unlock()
Definition: IPCMethod.h:314
void rethrowShmBufferException(evf::Exception &e, std::string where) const
unsigned int buResourceId() const
Definition: FUShmRawCell.h:27
bool FUResourceTable::discardDataEvent ( MemRef_t *  bufRef)
virtual

Process buffer received via I2O_SM_DATA_DISCARD message in normal operation.

Implements evf::IPCMethod.

Definition at line 693 of file FUResourceTable.cc.

References alignCSCRings::e, CommonMethods::lock(), lumiQueryAPI::msg, and _I2O_FU_DATA_DISCARD_MESSAGE_FRAME::rbBufferID.

693  {
694  I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
695  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
696  UInt_t recoIndex = msg->rbBufferID;
697 
698  // Check input
699  if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
700  LOG4CPLUS_ERROR(
701  log_,
702  "Received DISCARD DATA message with invalid recoIndex:"
703  << recoIndex);
704 
705  if (acceptSMDataDiscard_[recoIndex]) {
706  lock();
708  unlock();
709  acceptSMDataDiscard_[recoIndex] = false;
710 
711  try {
712  shmBuffer_->discardRecoCell(recoIndex);
713  } catch (evf::Exception& e) {
714  rethrowShmBufferException(e,
715  "FUResourceTable:discardDataEvent:discardRecoCell");
716  }
717 
718  } else {
719  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
720  }
721 
722  bufRef->release();
723  return true;
724 }
void discardRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:501
bool * acceptSMDataDiscard_
Definition: IPCMethod.h:352
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmBuffer * shmBuffer_
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
unsigned int UInt_t
Definition: FUTypes.h:12
UInt_t nbRecoCells_
Definition: IPCMethod.h:346
void unlock()
Definition: IPCMethod.h:314
void rethrowShmBufferException(evf::Exception &e, std::string where) const
bool FUResourceTable::discardDataEventWhileHalting ( MemRef_t *  bufRef)
virtual

Process buffer received via I2O_SM_DATA_DISCARD message while halting.

Implements evf::IPCMethod.

Definition at line 727 of file FUResourceTable.cc.

References CommonMethods::lock(), lumiQueryAPI::msg, and _I2O_FU_DATA_DISCARD_MESSAGE_FRAME::rbBufferID.

727  {
728  I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
729  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
730  UInt_t recoIndex = msg->rbBufferID;
731 
732  // Check input
733  if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_)
734  LOG4CPLUS_ERROR(
735  log_,
736  "Received DISCARD DATA message with invalid recoIndex:"
737  << recoIndex);
738 
739  if (acceptSMDataDiscard_[recoIndex]) {
740  lock();
742  unlock();
743  acceptSMDataDiscard_[recoIndex] = false;
744 
745  } else {
746  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
747  }
748 
749  bufRef->release();
750  return false;
751 }
bool * acceptSMDataDiscard_
Definition: IPCMethod.h:352
log4cplus::Logger log_
Definition: IPCMethod.h:342
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
unsigned int UInt_t
Definition: FUTypes.h:12
UInt_t nbRecoCells_
Definition: IPCMethod.h:346
void unlock()
Definition: IPCMethod.h:314
bool FUResourceTable::discardDqmEvent ( MemRef_t *  bufRef)
virtual

Process buffer received via I2O_SM_DQM_DISCARD message in normal operation.

Implements evf::IPCMethod.

Definition at line 754 of file FUResourceTable.cc.

References alignCSCRings::e, lumiQueryAPI::msg, _I2O_FU_DQM_DISCARD_MESSAGE_FRAME::rbBufferID, and evf::dqm::SENT.

754  {
755  I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
756  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
757  UInt_t dqmIndex = msg->rbBufferID;
758 
759  // Check input
760  if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
761  LOG4CPLUS_ERROR(
762  log_,
763  "Received DISCARD DQM message with invalid dqmIndex:"
764  << dqmIndex);
765 
766  unsigned int ntries = 0;
767  try {
768  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
769  if (ntries)//tolerate one attempt
770  LOG4CPLUS_WARN(
771  log_,
772  "DQM discard for cell " << dqmIndex
773  << " which is not yet in SENT state - waiting");
774  ::usleep(10000);
775  if (ntries++ > 10) {
776  LOG4CPLUS_ERROR(
777  log_,
778  "DQM cell " << dqmIndex
779  << " discard timed out while cell still in state "
780  << shmBuffer_->dqmState(dqmIndex));
781  bufRef->release();
782  return true;
783  }
784  }
785  } catch (evf::Exception& e) {
786  rethrowShmBufferException(e, "FUResourceTable:discardDqmEvent:dqmState");
787  }
788  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
789  acceptSMDqmDiscard_[dqmIndex]--;
790  if (--nbPendingSMDqmDiscards_ < 0) {
791  LOG4CPLUS_WARN(
792  log_,
793  "Spurious??? DQM discard by StorageManager, index "
794  << dqmIndex << " cell state "
795  << shmBuffer_->dqmState(dqmIndex)
796  << " accept flag " << acceptSMDqmDiscard_[dqmIndex]);
797  }
798  try {
799  shmBuffer_->discardDqmCell(dqmIndex);
800  } catch (evf::Exception& e) {
801  rethrowShmBufferException(e,
802  "FUResourceTable:discardDqmEvent:discardDqmCell");
803  }
804 
805  } else {
806  LOG4CPLUS_ERROR(
807  log_,
808  "Spurious DQM discard for cell " << dqmIndex
809  << " from StorageManager while cell is not accepting discards");
810  }
811 
812  bufRef->release();
813  return true;
814 }
log4cplus::Logger log_
Definition: IPCMethod.h:342
void discardDqmCell(unsigned int iCell)
Definition: FUShmBuffer.cc:527
FUShmBuffer * shmBuffer_
int * acceptSMDqmDiscard_
Definition: IPCMethod.h:353
dqm::State_t dqmState(unsigned int index)
unsigned int UInt_t
Definition: FUTypes.h:12
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
void rethrowShmBufferException(evf::Exception &e, std::string where) const
UInt_t nbDqmCells_
Definition: IPCMethod.h:344
bool FUResourceTable::discardDqmEventWhileHalting ( MemRef_t *  bufRef)
virtual

Process buffer received via I2O_SM_DQM_DISCARD message while halting.

Implements evf::IPCMethod.

Definition at line 817 of file FUResourceTable.cc.

References alignCSCRings::e, lumiQueryAPI::msg, _I2O_FU_DQM_DISCARD_MESSAGE_FRAME::rbBufferID, and evf::dqm::SENT.

817  {
818  I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
819  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
820  UInt_t dqmIndex = msg->rbBufferID;
821 
822  // Check input
823  if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_)
824  LOG4CPLUS_ERROR(
825  log_,
826  "Received DISCARD DQM message with invalid dqmIndex:"
827  << dqmIndex);
828 
829  unsigned int ntries = 0;
830  try {
831  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
832  if (ntries)//tolerate one attempt
833  LOG4CPLUS_WARN(
834  log_,
835  "DQM discard for cell " << dqmIndex
836  << " which is not yet in SENT state - waiting");
837  ::usleep(10000);
838  if (ntries++ > 10) {
839  LOG4CPLUS_ERROR(
840  log_,
841  "DQM cell " << dqmIndex
842  << " discard timed out while cell still in state "
843  << shmBuffer_->dqmState(dqmIndex));
844  bufRef->release();
845  return true;
846  }
847  }
848  } catch (evf::Exception& e) {
849  rethrowShmBufferException(e,
850  "FUResourceTable:discardDqmEventWhileHalting:dqmState(2)");
851  }
852  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
853  acceptSMDqmDiscard_[dqmIndex]--;
854  if (--nbPendingSMDqmDiscards_ < 0) {
855  try {
856  LOG4CPLUS_WARN(
857  log_,
858  "Spurious??? DQM discard by StorageManager, index "
859  << dqmIndex << " cell state "
860  << shmBuffer_->dqmState(dqmIndex)
861  << " accept flag "
862  << acceptSMDqmDiscard_[dqmIndex]);
863  } catch (evf::Exception& e) {
864  rethrowShmBufferException(e,
865  "FUResourceTable:discardDqmEventWhileHalting:dqmState");
866  }
867  }
868 
869  } else {
870  LOG4CPLUS_ERROR(
871  log_,
872  "Spurious DQM discard for cell " << dqmIndex
873  << " from StorageManager while cell is not accepting discards");
874  }
875 
876  bufRef->release();
877  return false;
878 }
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmBuffer * shmBuffer_
int * acceptSMDqmDiscard_
Definition: IPCMethod.h:353
dqm::State_t dqmState(unsigned int index)
unsigned int UInt_t
Definition: FUTypes.h:12
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
void rethrowShmBufferException(evf::Exception &e, std::string where) const
UInt_t nbDqmCells_
Definition: IPCMethod.h:344
void FUResourceTable::discardNoReschedule ( )
private

Called when entering the shutdown cycle. The function sets isReadyToShutDown_ to true as its final step, allowing the Resource Table to be safely shut down.

Definition at line 354 of file FUResourceTable.cc.

References prof2calltree::count, gather_cfg::cout, alignCSCRings::e, evf::dqm::EMPTY, i, n, evf::FUShmBuffer::shm_nattch(), and evf::utils::state.

// Shutdown-cycle routine. Visible steps: set shutdownStatus_ bit 11, poll for clients to
// detach (bit 12 on success), wait until every DQM cell reads dqm::EMPTY (bit 13), then set
// isReadyToShutDown_.
// NOTE(review): this listing skips source lines 358, 360, 368, 402, 406, 421, 427 and 429,
// so the statements on those lines are not visible here - confirm against FUResourceTable.cc.
354  {
355  std::cout << " entered shutdown cycle " << std::endl;
356  shutdownStatus_|=1<<11;
// The guarded call (line 358) is not shown; judging by the label below it is presumably
// shmBuffer_->writeRecoEmptyEvent() - TODO confirm.
357  try {
359  } catch (evf::Exception& e) {
361  "FUResourceTable:discardNoReschedule:writeRecoEmptyEvent");
362  }
363 
// Poll at most 100 times; each unsuccessful poll sleeps shutdownTimeout_ (handed to
// ::usleep, i.e. microseconds). The first half of the exit condition (line 368) is not
// shown; the visible half requires shm_nattch(shmid) == 1.
364  UInt_t count = 0;
365  while (count < 100) {
366  std::cout << " shutdown cycle " << shmBuffer_->nClients() << " "
367  << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
369  shmBuffer_->shmid()) == 1) {
370  shutdownStatus_|=1<<12;
371  //isReadyToShutDown_ = true;
372  break;
373  } else {
374  count++;
375  std::cout << " shutdown cycle attempt " << count << std::endl;
376  LOG4CPLUS_DEBUG(
377  log_,
378  "FUResourceTable: Wait for all clients to detach,"
379  << " nClients=" << shmBuffer_->nClients()
380  << " nattch=" << FUShmBuffer::shm_nattch(
381  shmBuffer_->shmid()) << " (" << count << ")");
382  ::usleep( shutdownTimeout_);
// 10000000 us = 10 s of accumulated sleep; once exceeded, the warning repeats on every
// further poll.
383  if (count * shutdownTimeout_ > 10000000)
384  LOG4CPLUS_WARN(
385  log_,
386  "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
387  << " nClients=" << shmBuffer_->nClients()
388  << " nattch=" << FUShmBuffer::shm_nattch(
389  shmBuffer_->shmid()) << " (" << count << ")");
390 
391  }
392  }
393 
// Repeatedly scan all nbDqmCells_ DQM cells under shmBuffer_->lock() until none is
// non-EMPTY.
// NOTE(review): the visible lines contain no sleep and no retry limit, so this spins until
// the cells drain. Also, if the handler on the omitted line 406 rethrows (presumably via
// rethrowShmBufferException), shmBuffer_->unlock() on line 412 is not reached - confirm.
394  bool allEmpty = false;
395  std::cout << "Checking if all dqm cells are empty " << std::endl;
396  while (!allEmpty) {
397  UInt_t n = nbDqmCells_;
398  allEmpty = true;
399  shmBuffer_->lock();
400  for (UInt_t i = 0; i < n; i++) {
401  // initialize to a value to avoid warnings
// The declaration of `state` (line 402) is not shown in this listing.
403  try {
404  state = shmBuffer_->dqmState(i);
405  } catch (evf::Exception& e) {
407  "FUResourceTable:discardNoReschedule:dqmState");
408  }
409  if (state != dqm::EMPTY)
410  allEmpty = false;
411  }
412  shmBuffer_->unlock();
413  }
414  shutdownStatus_|=1<<13;
415 
// A non-zero pending-discard count is only reported (cout + warning); it does not stop
// the shutdown sequence. The value streamed on the omitted line 421 is not shown.
416  std::cout << "Number of pending discards before declaring ready to shut down: " << nbPendingSMDqmDiscards_ << std::endl;
417  if (nbPendingSMDqmDiscards_ != 0) {
418  LOG4CPLUS_WARN(
419  log_,
420  "FUResourceTable: pending DQM discards not zero: ="
422  << " while cells are all empty. This may cause problems at next start ");
423 
424  }
425 
// The guarded call (line 427) is not shown; per the label it is presumably
// shmBuffer_->writeDqmEmptyEvent() - TODO confirm.
426  try {
428  } catch (evf::Exception& e) {
430  "FUResourceTable:discardNoReschedule:writeDqmEmptyEvent");
431  }
432 
433  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
434  // sendDqm loop has been shut down as well
435 }
int i
Definition: DBlmapReader.cc:9
static int shm_nattch(int shmid)
int shmid() const
Definition: FUShmBuffer.h:69
bool isReadyToShutDown_
Definition: IPCMethod.h:374
log4cplus::Logger log_
Definition: IPCMethod.h:342
void writeRecoEmptyEvent()
Definition: FUShmBuffer.cc:614
FUShmBuffer * shmBuffer_
unsigned int nClients() const
Definition: FUShmBuffer.h:71
dqm::State_t dqmState(unsigned int index)
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned int shutdownTimeout_
Definition: IPCMethod.h:357
void writeDqmEmptyEvent()
Definition: FUShmBuffer.cc:626
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
char state
Definition: procUtils.cc:75
tuple cout
Definition: gather_cfg.py:121
void rethrowShmBufferException(evf::Exception &e, std::string where) const
UInt_t nbDqmCells_
Definition: IPCMethod.h:344
bool FUResourceTable::discardWhileHalting ( bool  sendDiscards)
virtual

Function called when FSM is in state stopping / halting, in Discard workloop.

Implements evf::IPCMethod.

Definition at line 507 of file FUResourceTable.cc.

References evf::FUShmRawCell::buResourceId(), alignCSCRings::e, evf::evt::EMPTY, evf::FUShmRawCell::fuResourceId(), evf::FUShmRawCell::index(), CommonMethods::lock(), evf::utils::state, evf::evt::STOP, and evf::evt::USEDLS.

// Discard-workloop step used while stopping/halting. Fetches the next raw cell to discard,
// discards it in shared memory, frees the associated FU resource for ordinary events, and
// returns whether the workloop should be rescheduled.
// Parameter sendDiscards: when true, sendDiscard(buResourceId) is called for ordinary events.
// NOTE(review): this listing skips source lines 510, 515, 533, 544 and 574 - confirm
// against FUResourceTable.cc.
507  {
508  FUShmRawCell* cell = 0;
509  // initialize to a value to avoid warnings
// The declaration of `state` (line 510) is not shown in this listing.
511  try {
512  cell = shmBuffer_->rawCellToDiscard();
513  state = shmBuffer_->evtState(cell->index());
514  } catch (evf::Exception& e) {
516  "FUResourceTable:discardWhileHalting:rawCellToRead/evtState");
517  }
518 
// Classify the cell by its event state: STOP marks a shutdown marker, USEDLS an
// end-of-lumisection cell.
519  bool reschedule = true;
520  bool shutDown = (state == evt::STOP);
521  bool isLumi = (state == evt::USEDLS);
522  UInt_t fuResourceId = cell->fuResourceId();
523  UInt_t buResourceId = cell->buResourceId();
524 
// An EMPTY cell is reported and the function returns true without calling discardRawCell.
525  if (state == evt::EMPTY) {
526  LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
527  return true;
528  }
529 
// On a STOP cell the workloop ends once nbClientsToShutDown_ is 0. The statement guarded by
// the `> 0` test (line 533) is not shown; presumably it decrements the counter - TODO confirm.
530  if (shutDown) {
531  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
532  if (nbClientsToShutDown_ > 0)
534  if (nbClientsToShutDown_ == 0) {
535  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
536  isActive_ = false;
537  reschedule = false;
538  }
539  }
540 
541  try {
542  shmBuffer_->discardRawCell(cell);
543  } catch (evf::Exception& e) {
545  "FUResourceTable:discardWhileHalting:discardRawCell");
546  }
547  // UPDATED
548  if (isLumi)
549  nbEolDiscarded_++;
550 
// Ordinary event: release the FU resource and return its id to freeResourceIds_ under
// lock()/unlock(). An out-of-range fuResourceId is only warned about.
551  if (!shutDown && !isLumi) {
552  if (fuResourceId >= nbResources()) {
553  LOG4CPLUS_WARN(
554  log_,
555  "cell " << cell->index() << " in state " << state
556  << " scheduled for discard has no associated FU resource ");
557  } else {
558  resources_[fuResourceId]->release(true);
559  lock();
560  freeResourceIds_.push(fuResourceId);
561  assert(freeResourceIds_.size() <= resources_.size());
562  unlock();
563 
564  /*
565  sendDiscard(buResourceId);
566  sendAllocate();
567  */
568  if (sendDiscards)
569  sendDiscard(buResourceId);
570  }
571  }
572 
// The body of this branch (line 574) is not shown in this listing.
573  if (!reschedule) {
575  }
576 
577  return reschedule;
578 }
FUShmRawCell * rawCellToDiscard()
Definition: FUShmBuffer.cc:373
void sendDiscard(UInt_t buResourceId)
Definition: IPCMethod.cc:150
log4cplus::Logger log_
Definition: IPCMethod.h:342
UInt_t nbEolDiscarded_
Definition: IPCMethod.h:371
evt::State_t evtState(unsigned int index)
FUShmBuffer * shmBuffer_
void discardRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:495
FUResourceVec_t resources_
Definition: IPCMethod.h:393
UInt_t nbResources() const
unsigned int UInt_t
Definition: FUTypes.h:12
UInt_t nbClientsToShutDown_
Definition: IPCMethod.h:373
char state
Definition: procUtils.cc:75
unsigned int index() const
Definition: FUShmRawCell.h:25
unsigned int fuResourceId() const
Definition: FUShmRawCell.h:26
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void unlock()
Definition: IPCMethod.h:314
void rethrowShmBufferException(evf::Exception &e, std::string where) const
unsigned int buResourceId() const
Definition: FUShmRawCell.h:27
vector< string > FUResourceTable::dqmCellStates ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1294 of file FUResourceTable.cc.

References evf::dqm::DISCARDING, alignCSCRings::e, evf::dqm::EMPTY, i, n, query::result, evf::dqm::SENDING, evf::dqm::SENT, evf::utils::state, evf::dqm::WRITING, and evf::dqm::WRITTEN.

// Returns the state name of each DQM cell as a string; returns an empty vector when
// shmBuffer_ is null. The scan runs between shmBuffer_->lock() and shmBuffer_->unlock().
// NOTE(review): this listing skips source lines 1301 and 1316. Line 1301 presumably reads
// `state` via shmBuffer_->dqmState(i) - TODO confirm.
// NOTE(review): a state matching none of the six names appends nothing, so the result can
// hold fewer than nbDqmCells_ entries. If the handler rethrows, unlock() on line 1319 is
// not reached - confirm.
1294  {
1295  vector < string > result;
1296  if (0 != shmBuffer_) {
1297  UInt_t n = nbDqmCells_;
1298  shmBuffer_->lock();
1299  try {
1300  for (UInt_t i = 0; i < n; i++) {
1302  if (state == dqm::EMPTY)
1303  result.push_back("EMPTY");
1304  else if (state == dqm::WRITING)
1305  result.push_back("WRITING");
1306  else if (state == dqm::WRITTEN)
1307  result.push_back("WRITTEN");
1308  else if (state == dqm::SENDING)
1309  result.push_back("SENDING");
1310  else if (state == dqm::SENT)
1311  result.push_back("SENT");
1312  else if (state == dqm::DISCARDING)
1313  result.push_back("DISCARDING");
1314  }
1315  } catch (evf::Exception& e) {
1317  "FUResourceTable:dqmCellStates:dqmState");
1318  }
1319  shmBuffer_->unlock();
1320  }
1321  return result;
1322 }
int i
Definition: DBlmapReader.cc:9
FUShmBuffer * shmBuffer_
tuple result
Definition: query.py:137
dqm::State_t dqmState(unsigned int index)
unsigned int UInt_t
Definition: FUTypes.h:12
char state
Definition: procUtils.cc:75
void rethrowShmBufferException(evf::Exception &e, std::string where) const
UInt_t nbDqmCells_
Definition: IPCMethod.h:344
void FUResourceTable::dropEvent ( )
virtual

Drop next available event.

Implements evf::IPCMethod.

Definition at line 908 of file FUResourceTable.cc.

References alignCSCRings::e, and evf::FUShmRawCell::fuResourceId().

// Takes the next raw cell to read from shared memory and schedules it for discard
// without processing it. Shared-memory exceptions are forwarded through
// rethrowShmBufferException.
// NOTE(review): this listing skips source lines 917 and 920. Per the catch label, line 917
// presumably calls shmBuffer_->finishReadingRawCell(cell) - TODO confirm.
908  {
909  FUShmRawCell* cell = 0;
910  try {
911  cell = shmBuffer_->rawCellToRead();
912  } catch (evf::Exception& e) {
913  rethrowShmBufferException(e, "FUResourceTable:dropEvent:rawCellToRead");
914  }
// The resource id is read before the cell is handed back, then used to schedule the discard.
915  UInt_t fuResourceId = cell->fuResourceId();
916  try {
918  shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
919  } catch (evf::Exception& e) {
921  "FUResourceTable:dropEvent:finishReadingRawCell/scheduleRawCellForDiscard");
922  }
923 }
FUShmRawCell * rawCellToRead()
Definition: FUShmBuffer.cc:317
FUShmBuffer * shmBuffer_
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned int fuResourceId() const
Definition: FUShmRawCell.h:26
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void finishReadingRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:410
void scheduleRawCellForDiscard(unsigned int iCell)
Definition: FUShmBuffer.cc:454
bool FUResourceTable::handleCrashedEP ( UInt_t  runNumber,
pid_t  pid 
)
virtual

Sends the event belonging to a crashed process to the error stream. Returns false if no event is found.

Implements evf::IPCMethod.

Definition at line 926 of file FUResourceTable.cc.

References alignCSCRings::e, i, and summarizeEdmComparisonLogfiles::success.

// Looks up the raw cell owned by process `pid`, writes that event to the error stream, and
// then removes the process from the shared-memory client list.
// Returns true when a raw cell owned by `pid` was found, false otherwise.
// NOTE(review): this listing skips source lines 943, 951 and 956. Line 951 presumably sets
// `success` from shmBuffer_->removeClientPrcId(pid) - TODO confirm.
926  {
927  bool retval = false;
928  vector < pid_t > pids = cellPrcIds();
// pids.size() doubles as the "not found" sentinel for iRawCell.
929  UInt_t iRawCell = pids.size();
930  for (UInt_t i = 0; i < pids.size(); i++) {
931  if (pid == pids[i]) {
932  iRawCell = i;
933  break;
934  }
935  }
936 
// retval becomes true whenever a matching cell exists; a false return from
// writeErrorEventData is only logged as a warning.
937  if (iRawCell < pids.size()) {
938  try {
939  bool shmret = shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell, true);
940  if (!shmret)
941  LOG4CPLUS_WARN(log_,"Problem writing to the error stream.");
942  } catch (evf::Exception& e) {
944  "FUResourceTable:handleCrashedEP:writeErrorEventData");
945  }
946  retval = true;
947  } else
948  LOG4CPLUS_WARN(log_,
949  "No raw data to send to error stream for process " << pid);
// The client-removal attempt runs in both cases (cell found or not).
950  try {
952  if (!success)
953  LOG4CPLUS_WARN(log_,
954  "removeClientPrcId: " << pid << " not in shared memory index, was in raw cell " << iRawCell);
955  } catch (evf::Exception& e) {
957  "FUResourceTable:handleCrashedEP:removeClientPrcId");
958  }
959  return retval;
960 }
int i
Definition: DBlmapReader.cc:9
bool writeErrorEventData(unsigned int runNumber, unsigned int fuProcessId, unsigned int iRawCell, bool checkValue)
Definition: FUShmBuffer.cc:746
log4cplus::Logger log_
Definition: IPCMethod.h:342
bool removeClientPrcId(pid_t prcId)
FUShmBuffer * shmBuffer_
unsigned int UInt_t
Definition: FUTypes.h:12
std::vector< pid_t > cellPrcIds() const
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void FUResourceTable::initialize ( bool  segmentationMode,
UInt_t  nbRawCells,
UInt_t  nbRecoCells,
UInt_t  nbDqmCells,
UInt_t  rawCellSize,
UInt_t  recoCellSize,
UInt_t  dqmCellSize 
)
throw (evf::Exception
)

Initialization of the Resource Table with the required resources

Definition at line 65 of file FUResourceTable.cc.

References hitfit::clear(), evf::FUShmBuffer::createShmBuffer(), i, lumiQueryAPI::msg, and evf::FUResource::release().

// Clears the table, creates the shared-memory buffer, builds one FUResource per raw cell,
// allocates the discard-acceptance arrays and resets the counters.
// Raises evf::Exception (via XCEPT_RAISE) when createShmBuffer returns a null pointer.
// NOTE(review): the listing starts at source line 67; the signature lines 65-66 are not shown.
67  {
68  clear();
69 
70  shmBuffer_ = FUShmBuffer::createShmBuffer(segmentationMode, nbRawCells,
71  nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
72  if (0 == shmBuffer_) {
73  string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
74  LOG4CPLUS_FATAL(log_, msg);
75  XCEPT_RAISE(evf::Exception, msg);
76  }
77 
// NOTE(review): the loop bound is the member nbRawCells_, not the nbRawCells parameter used
// for createShmBuffer above; where nbRawCells_ is assigned is not visible here - confirm
// the two always agree.
78  for (UInt_t i = 0; i < nbRawCells_; i++) {
79  FUResource* newResource = new FUResource(i, log_, frb_, app_);
80  newResource->release(true);
81  resources_.push_back(newResource);
82  freeResourceIds_.push(i);
83  }
84 
// Arrays are sized from the parameters and allocated with new[]; resetCounters() below
// fills them. Their release is not visible in this listing.
85  acceptSMDataDiscard_ = new bool[nbRecoCells];
86  acceptSMDqmDiscard_ = new int[nbDqmCells];
87 
88  resetCounters();
89  stopFlag_=false;
90 }
int i
Definition: DBlmapReader.cc:9
bool * acceptSMDataDiscard_
Definition: IPCMethod.h:352
log4cplus::Logger log_
Definition: IPCMethod.h:342
void release(bool detachResource)
Definition: FUResource.cc:82
UInt_t nbRawCells_
Definition: IPCMethod.h:345
FUShmBuffer * shmBuffer_
int * acceptSMDqmDiscard_
Definition: IPCMethod.h:353
FUResourceVec_t resources_
Definition: IPCMethod.h:393
xdaq::Application * app_
Definition: IPCMethod.h:391
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
Definition: FUShmBuffer.cc:917
unsigned int UInt_t
Definition: FUTypes.h:12
EvffedFillerRB * frb_
Definition: IPCMethod.h:390
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void FUResourceTable::lastResort ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1381 of file FUResourceTable.cc.

References gather_cfg::cout, alignCSCRings::e, and evf::FUShmRawCell::index().

// Drains every raw cell still waiting to be read, logging each one, and then logs that an
// empty raw cell is being scheduled. Progress goes to both the logger and std::cout.
// NOTE(review): this listing skips source lines 1395, 1402 and 1405. Per the log texts and
// catch label, line 1395 presumably calls scheduleRawCellForDiscardServerSide(newCell->index())
// and line 1402 scheduleRawEmptyCellForDiscard() - TODO confirm.
1381  {
1382  try {
1383  ostringstream ost;
1384  ost << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1385  << " more rawcells to read ";
1386  LOG4CPLUS_WARN(log_,ost.str());
1387  std::cout << ost.str() << std::endl;
1388 
// Loop until the shared-memory buffer reports no more raw cells to read.
1389  while (shmBuffer_->nbRawCellsToRead() != 0) {
1390  FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
1391  std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1392  << std::endl;
1393  // UPDATED
1394  LOG4CPLUS_WARN(log_,"lastResort: Scheduling raw cell (server side) "<< newCell->index());
1396 
1397  std::cout << "lastResort: schedule raw cell for discard "
1398  << newCell->index() << std::endl;
1399  }
1400  //trigger the shutdown (again?)
1401  LOG4CPLUS_WARN(log_,"lastResort: scheduling empty raw cell (server side) ");
1403  LOG4CPLUS_WARN(log_,"lastResort: Finished. cells remaining: " << shmBuffer_->nbRawCellsToRead());
1404  } catch (evf::Exception& e) {
1406  e,
1407  "FUResourceTable:lastResort:nbRawCellsToRead/scheduleRawCellForDiscardServerSide");
1408  }
1409  LOG4CPLUS_WARN(log_,"Last resort finished ");
1410 }
FUShmRawCell * rawCellToRead()
Definition: FUShmBuffer.cc:317
void scheduleRawCellForDiscardServerSide(unsigned int iCell)
Definition: FUShmBuffer.cc:478
int nbRawCellsToRead() const
Definition: FUShmBuffer.cc:293
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmBuffer * shmBuffer_
void scheduleRawEmptyCellForDiscard()
Definition: FUShmBuffer.cc:638
unsigned int index() const
Definition: FUShmRawCell.h:25
tuple cout
Definition: gather_cfg.py:121
void rethrowShmBufferException(evf::Exception &e, std::string where) const
UInt_t FUResourceTable::nbClients ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1199 of file FUResourceTable.cc.

References alignCSCRings::e, and query::result.

// Returns shmBuffer_->nClients(), or 0 when shmBuffer_ is null. An evf::Exception from the
// buffer is forwarded through rethrowShmBufferException.
1199  {
1200  UInt_t result(0);
1201  try {
1202  if (0 != shmBuffer_)
1203  result = shmBuffer_->nClients();
1204  } catch (evf::Exception& e) {
1205  rethrowShmBufferException(e, "FUResourceTable:nbClients:nClients");
1206  }
1207  return result;
1208 }
FUShmBuffer * shmBuffer_
unsigned int nClients() const
Definition: FUShmBuffer.h:71
tuple result
Definition: query.py:137
unsigned int UInt_t
Definition: FUTypes.h:12
void rethrowShmBufferException(evf::Exception &e, std::string where) const
UInt_t evf::FUResourceTable::nbResources ( ) const
inline virtual

Implements evf::IPCMethod.

Definition at line 145 of file FUResourceTable.h.

References evf::IPCMethod::resources_.

// Const overload: the resource count is the current size of resources_.
145  {
146  return resources_.size();
147  }
FUResourceVec_t resources_
Definition: IPCMethod.h:393
unsigned int evf::FUResourceTable::nbResources ( )
inline

Definition at line 162 of file FUResourceTable.h.

References evf::IPCMethod::resources_.

// Non-const overload: returns the same value as the const one, the size of resources_.
162  {
163  return resources_.size();
164  }
FUResourceVec_t resources_
Definition: IPCMethod.h:393
void FUResourceTable::postEndOfLumiSection ( MemRef_t * bufRef)
virtual

Post End-of-LumiSection to Shared Memory.

Implements evf::IPCMethod.

Definition at line 881 of file FUResourceTable.cc.

References alignCSCRings::e, i, if(), and lumiQueryAPI::msg.

// Reads the lumisection number from the I2O end-of-lumisection frame in bufRef and writes
// one raw lumisection event per raw cell (nbRawCells_ times), stopping early if stopFlag_
// is set.
// NOTE(review): this listing skips source line 901 (the call inside the catch handler).
881  {
882  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
883  *msg =
884  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
885  //make sure to fill up the shmem so no process will miss it
886  // but processes will have to handle duplicates
887 
888  // Check input
// A negative value is only logged; the loop below still runs with it.
889  int lumiCheck = (int) msg->lumiSection;
890  if (lumiCheck < 0)
891  LOG4CPLUS_ERROR(log_,
892  "Received EOL message with invalid index:" << lumiCheck);
893 
// nbEolPosted_ is incremented before each write attempt.
894  for (unsigned int i = 0; i < nbRawCells_; i++) {
895  // UPDATED
896  if (stopFlag_) break;
897  nbEolPosted_++;
898  try {
899  shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
900  } catch (evf::Exception& e) {
902  "FUResourceTable:postEndOfLumiSection:writeRawLumiSectionEvent");
903  }
904  }
905 }
int i
Definition: DBlmapReader.cc:9
void writeRawLumiSectionEvent(unsigned int)
Definition: FUShmBuffer.cc:592
UInt_t nbEolPosted_
Definition: IPCMethod.h:370
log4cplus::Logger log_
Definition: IPCMethod.h:342
UInt_t nbRawCells_
Definition: IPCMethod.h:345
FUShmBuffer * shmBuffer_
perl if(1 lt scalar(@::datatypes))
Definition: edlooper.cc:31
void rethrowShmBufferException(evf::Exception &e, std::string where) const
std::string FUResourceTable::printStatus ( )
virtual

Print debugging status.

Reimplemented from evf::IPCMethod.

Definition at line 1430 of file FUResourceTable.cc.

// Returns shmBuffer_->sem_print_s() when the buffer exists, otherwise a fixed
// "not initialized" message.
1430  {
1431  if (shmBuffer_) return shmBuffer_->sem_print_s();
1432  else return std::string("ShmBuffer not initialized");
1433 }
FUShmBuffer * shmBuffer_
std::string sem_print_s()
Definition: FUShmBuffer.cc:863
void FUResourceTable::resetCounters ( )
virtual

Reset event & error counters

Implements evf::IPCMethod.

Definition at line 1157 of file FUResourceTable.cc.

References alignCSCRings::e, and i.

// Clears the per-cell discard-acceptance arrays (only when shmBuffer_ exists), zeroes the
// event/error counters and size sums, and marks both "send" workloops as active.
// NOTE(review): this listing skips source lines 1165, 1177 and 1178. The page's reference
// list for this function names nbPendingSMDiscards_ and nbPendingSMDqmDiscards_, so lines
// 1177-1178 presumably reset those two - TODO confirm.
1157  {
1158  if (0 != shmBuffer_) {
1159  try {
// Array lengths are taken from the buffer's own reco/DQM cell counts.
1160  for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
1161  acceptSMDataDiscard_[i] = false;
1162  for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
1163  acceptSMDqmDiscard_[i] = 0;
1164  } catch (evf::Exception& e) {
1166  "FUResourceTable:resetCounters:nRecoCells/nDqmCells");
1167  }
1168  }
1169 
1170  // UPDATE: reset pending allocate's
1171  nbAllocated_ = 0;
1172  nbPending_ = 0;
1173  nbCompleted_ = 0;
1174  nbSent_ = 0;
1175  nbSentError_ = 0;
1176  nbSentDqm_ = 0;
1179  nbDiscarded_ = 0;
1180  nbLost_ = 0;
1181  // UPDATED
1182  nbEolPosted_ = 0;
1183  nbEolDiscarded_ = 0;
1184 
1185  nbErrors_ = 0;
1186  nbCrcErrors_ = 0;
1187  nbAllocSent_ = 0;
1188 
1189  sumOfSquares_ = 0;
1190  sumOfSizes_ = 0;
1191 
1192  //"send" workloop states
1193  sDqmActive_=true;
1194  sDataActive_=true;
1195 
1196 }
int i
Definition: DBlmapReader.cc:9
unsigned int nRecoCells() const
Definition: FUShmBuffer.h:66
UInt_t nbEolPosted_
Definition: IPCMethod.h:370
bool * acceptSMDataDiscard_
Definition: IPCMethod.h:352
uint64_t sumOfSquares_
Definition: IPCMethod.h:381
UInt_t nbErrors_
Definition: IPCMethod.h:377
UInt_t nbDiscarded_
Definition: IPCMethod.h:367
UInt_t nbEolDiscarded_
Definition: IPCMethod.h:371
UInt_t nbSentDqm_
Definition: IPCMethod.h:364
FUShmBuffer * shmBuffer_
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
UInt_t nbCompleted_
Definition: IPCMethod.h:361
UInt_t nbPending_
Definition: IPCMethod.h:360
UInt_t nbSentError_
Definition: IPCMethod.h:363
UInt_t nbCrcErrors_
Definition: IPCMethod.h:378
int * acceptSMDqmDiscard_
Definition: IPCMethod.h:353
UInt_t nbLost_
Definition: IPCMethod.h:368
unsigned int UInt_t
Definition: FUTypes.h:12
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
UInt_t nbAllocSent_
Definition: IPCMethod.h:379
UInt_t sumOfSizes_
Definition: IPCMethod.h:382
UInt_t nbSent_
Definition: IPCMethod.h:362
UInt_t nbAllocated_
Definition: IPCMethod.h:359
unsigned int nDqmCells() const
Definition: FUShmBuffer.h:67
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void FUResourceTable::resetIPC ( )
virtual

Resets the ShmBuffer to its initial state.

Implements evf::IPCMethod.

Definition at line 1412 of file FUResourceTable.cc.

References gather_cfg::cout.

// Waits for the sendData and sendDqm workloops to go inactive (at most 60 polls of 50 ms,
// i.e. about 3 s), then resets the shared-memory buffer with reset(false). Does nothing
// when shmBuffer_ is null.
1412  {
1413  if (shmBuffer_ != 0) {
1414  //waiting for sendData and sendDqm workloops to finish
1415  int countdown_=60;
1416  while (countdown_-- && (sDataActive_ || sDqmActive_)) ::usleep(50000);
// After a full timeout countdown_ is -1. NOTE(review): if both flags clear during the 60th
// sleep, the post-decrement leaves countdown_ at 0 and this branch reports a timeout even
// though the wait succeeded; `< 0` looks like the intended test - confirm.
1417  if (countdown_<=0) {
1418  std::ostringstream ostr;
1419  ostr << "Resource broker timed out waiting for workloop shutdowns (3 seconds). Continuing to reset Shm. States - "
1420  << " sendDqm:"<<sDqmActive_ << " sendData:" << sDataActive_;
1421  LOG4CPLUS_ERROR(log_,ostr.str());
1422  std::cout << ostr.str() << std::endl;
1423  }
// The reset happens regardless of whether the wait timed out.
1424  //resetting shm buffer
1425  shmBuffer_->reset(false);
1426  LOG4CPLUS_INFO(log_, "ShmBuffer was reset!");
1427  }
1428 }
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmBuffer * shmBuffer_
void reset(bool)
Definition: FUShmBuffer.cc:201
tuple cout
Definition: gather_cfg.py:121
void FUResourceTable::rethrowShmBufferException ( evf::Exception & e,
std::string  where 
) const
throw (evf::Exception
)
private

Rethrows an exception from the ShmBuffer including details.

Definition at line 1435 of file FUResourceTable.cc.

References alignCSCRings::e, and i.

// Builds a diagnostic string from the caught exception (what/module/function/line), the
// current data-cell and DQM-cell state names, and the caller-supplied `where` tag, then
// rethrows it as an evf::Exception via XCEPT_RETHROW. This function never returns normally.
// NOTE(review): the listing starts at source line 1436; the signature on line 1435 is not shown.
// NOTE(review): dqmCellStates() (line 1298 of the same file) takes shmBuffer_->lock() and
// has its own handler labelled "FUResourceTable:dqmCellStates:dqmState". If this function
// is reached while that lock is already held, or if dqmState() keeps throwing, it may
// block or recurse - confirm the lock semantics.
1436  {
1437  stringstream details;
1438  vector < string > dataStates = cellStates();
1439  vector < string > dqmStates = dqmCellStates();
1440  details << "Exception raised: " << e.what() << " (in module: "
1441  << e.module() << " in function: " << e.function() << " at line: "
1442  << e.line() << ")";
1443  details << " Dumping cell state... ";
1444  details << "data cells --> ";
1445  for (unsigned int i = 0; i < dataStates.size(); i++)
1446  details << dataStates[i] << " ";
1447  details << "dqm cells --> ";
1448  for (unsigned int i = 0; i < dqmStates.size(); i++)
1449  details << dqmStates[i] << " ";
1450  details << " ... originated in: " << where;
1451  XCEPT_RETHROW(evf::Exception, details.str(), e);
1452 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > cellStates() const
std::vector< std::string > dqmCellStates() const
bool FUResourceTable::sendData ( )
virtual

Corresponds to the SendData workloop, to be called in normal operation.

Implements evf::IPCMethod.

Definition at line 93 of file FUResourceTable.cc.

References alignCSCRings::e, evf::FUShmRecoCell::eventSize(), evf::FUShmRecoCell::evtNumber(), edm::hlt::Exception, evf::FUShmRecoCell::fuGuid(), evf::FUShmRecoCell::fuProcessId(), evf::FUShmRecoCell::index(), CommonMethods::lock(), evf::FUShmRecoCell::nExpectedEPs(), evf::FUShmRecoCell::outModId(), evf::FUShmRecoCell::payloadAddr(), evf::FUShmRecoCell::rawCellIndex(), evf::FUShmRecoCell::runNumber(), sistrip::runNumber_, and evf::FUShmRecoCell::type().

// One iteration of the SendData workloop. Reads the next reco cell; a cell with event size
// 0 ends the workloop, otherwise the cell is dispatched by type: 0 -> sendInitMessage,
// 1 -> sendDataEvent, 2 -> sendErrorEvent. Returns whether the workloop should be
// rescheduled and mirrors that value into sDataActive_.
// NOTE(review): this listing skips source lines 106, 109, 125, 127, 132, 149, 151, 156,
// 175, 177 and 182. Per the catch labels, lines 106/125/149/175 presumably call
// shmBuffer_->finishReadingRecoCell(cell); the page's reference list names
// nbPendingSMDiscards_, so lines 132/156/182 (inside lock()/unlock()) presumably update it
// - TODO confirm.
93  {
94  bool reschedule = true;
95  FUShmRecoCell* cell = 0;
96  try {
97  cell = shmBuffer_->recoCellToRead();
98  } catch (evf::Exception& e) {
99  rethrowShmBufferException(e, "FUResourceTable:sendData:recoCellToRead");
100  }
101 
// Zero-size cell: discard it, set shutdownStatus_ bit 7 and stop rescheduling.
102  if (0 == cell->eventSize()) {
103  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
104  UInt_t cellIndex = cell->index();
105  try {
107  shmBuffer_->discardRecoCell(cellIndex);
108  } catch (evf::Exception& e) {
110  "FUResourceTable:sendData:finishReadingRecoCell/discardRecoCell");
111  }
112  shutdownStatus_|=1<<7;
113  reschedule = false;
114  } else {
115  try {
// Type 0: the cell's fields are copied into locals first, then forwarded as an INIT message.
116  if (cell->type() == 0) {
117  UInt_t cellIndex = cell->index();
118  UInt_t cellOutModId = cell->outModId();
119  UInt_t cellFUProcId = cell->fuProcessId();
120  UInt_t cellFUGuid = cell->fuGuid();
121  UChar_t* cellPayloadAddr = cell->payloadAddr();
122  UInt_t cellEventSize = cell->eventSize();
123  UInt_t cellExpectedEPs = cell->nExpectedEPs();
124  try {
126  } catch (evf::Exception& e) {
128  "FUResourceTable:sendData:finishReadingRecoCell");
129  }
130 
131  lock();
133  unlock();
134 
135  sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
136  cellFUGuid, cellPayloadAddr, cellEventSize,
137  cellExpectedEPs);
// Type 1: data event. nbSent_ is bumped only the first time the owning resource sends.
138  } else if (cell->type() == 1) {
139  UInt_t cellIndex = cell->index();
140  UInt_t cellRawIndex = cell->rawCellIndex();
141  UInt_t cellRunNumber = cell->runNumber();
142  UInt_t cellEvtNumber = cell->evtNumber();
143  UInt_t cellOutModId = cell->outModId();
144  UInt_t cellFUProcId = cell->fuProcessId();
145  UInt_t cellFUGuid = cell->fuGuid();
146  UChar_t *cellPayloadAddr = cell->payloadAddr();
147  UInt_t cellEventSize = cell->eventSize();
148  try {
150  } catch (evf::Exception& e) {
152  "FUResourceTable:sendData:finishReadingRecoCell");
153  }
154 
155  lock();
157  resources_[cellRawIndex]->incNbSent();
158  if (resources_[cellRawIndex]->nbSent() == 1)
159  nbSent_++;
160  unlock();
161 
162  sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
163  cellOutModId, cellFUProcId, cellFUGuid,
164  cellPayloadAddr, cellEventSize);
// Type 2: error event. Uses the member runNumber_ instead of the cell's run number, and
// also counts the first send in nbSentError_.
// NOTE(review): the catch label on line 178 says "recoCellToRead" whereas the matching
// handlers in the other two branches say "finishReadingRecoCell" - likely a stale label.
165  } else if (cell->type() == 2) {
166  UInt_t cellIndex = cell->index();
167  UInt_t cellRawIndex = cell->rawCellIndex();
168  //UInt_t cellRunNumber = cell->runNumber();
169  UInt_t cellEvtNumber = cell->evtNumber();
170  UInt_t cellFUProcId = cell->fuProcessId();
171  UInt_t cellFUGuid = cell->fuGuid();
172  UChar_t *cellPayloadAddr = cell->payloadAddr();
173  UInt_t cellEventSize = cell->eventSize();
174  try {
176  } catch (evf::Exception& e) {
178  "FUResourceTable:sendData:recoCellToRead");
179  }
180 
181  lock();
183  resources_[cellRawIndex]->incNbSent();
184  if (resources_[cellRawIndex]->nbSent() == 1) {
185  nbSent_++;
186  nbSentError_++;
187  }
188  unlock();
189 
190  sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
191  cellFUProcId, cellFUGuid, cellPayloadAddr,
192  cellEventSize);
193  } else {
194  string errmsg =
195  "Unknown RecoCell type (neither INIT/DATA/ERROR).";
196  XCEPT_RAISE(evf::Exception, errmsg);
197  }
// Any xcept::Exception from the dispatch above is logged as fatal and ends the workloop
// instead of propagating.
198  } catch (xcept::Exception& e) {
199  LOG4CPLUS_FATAL(
200  log_,
201  "Failed to send EVENT DATA to StorageManager: "
202  << xcept::stdformat_exception_history(e));
203  reschedule = false;
204  }
205  }
206 
207  sDataActive_=reschedule;
208  return reschedule;
209 }
unsigned int index() const
Definition: FUShmRecoCell.h:22
void discardRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:501
unsigned int type() const
Definition: FUShmRecoCell.h:29
unsigned int runNumber() const
Definition: FUShmRecoCell.h:24
unsigned int fuGuid() const
Definition: FUShmRecoCell.h:28
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
log4cplus::Logger log_
Definition: IPCMethod.h:342
unsigned int fuProcessId() const
Definition: FUShmRecoCell.h:27
FUShmBuffer * shmBuffer_
unsigned char * payloadAddr() const
unsigned int evtNumber() const
Definition: FUShmRecoCell.h:25
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
unsigned int nExpectedEPs() const
Definition: FUShmRecoCell.h:34
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
unsigned char UChar_t
Definition: FUTypes.h:14
UInt_t nbSentError_
Definition: IPCMethod.h:363
FUResourceVec_t resources_
Definition: IPCMethod.h:393
UInt_t runNumber_
Definition: IPCMethod.h:384
void finishReadingRecoCell(FUShmRecoCell *cell)
Definition: FUShmBuffer.cc:426
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
unsigned int UInt_t
Definition: FUTypes.h:12
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:186
unsigned int outModId() const
Definition: FUShmRecoCell.h:26
FUShmRecoCell * recoCellToRead()
Definition: FUShmBuffer.cc:340
UInt_t nbSent_
Definition: IPCMethod.h:362
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:171
void unlock()
Definition: IPCMethod.h:314
UInt_t nbSent() const
Definition: IPCMethod.h:216
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
Definition: IPCMethod.cc:156
void rethrowShmBufferException(evf::Exception &e, std::string where) const
bool FUResourceTable::sendDataWhileHalting ( )
virtual

Function called when FSM is in state stopping / halting, in SendData workloop.

Implements evf::IPCMethod.

Definition at line 212 of file FUResourceTable.cc.

References alignCSCRings::e, evf::FUShmRecoCell::eventSize(), and evf::FUShmRecoCell::index().

// SendData workloop step used while stopping/halting: every reco cell read is discarded
// rather than sent. A zero-size cell additionally sets shutdownStatus_ bit 8 and ends the
// workloop. Returns whether to reschedule and mirrors that value into sDataActive_.
// NOTE(review): this listing skips source lines 218, 226, 229, 238 and 241. Per the catch
// labels, lines 226 and 238 presumably call shmBuffer_->finishReadingRecoCell(cell)
// - TODO confirm.
212  {
213  bool reschedule = true;
214  FUShmRecoCell* cell = 0;
215  try {
216  cell = shmBuffer_->recoCellToRead();
217  } catch (evf::Exception& e) {
219  "FUResourceTable:sendDataWhileHalting:recoCellToRead");
220  }
221 
222  if (0 == cell->eventSize()) {
223  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
224  UInt_t cellIndex = cell->index();
225  try {
227  shmBuffer_->discardRecoCell(cellIndex);
228  } catch (evf::Exception& e) {
230  "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
231  }
232  shutdownStatus_|=1<<8;
233  reschedule = false;
234  } else {
// Non-empty cell while halting: same discard sequence, but the workloop keeps running.
235  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
236  UInt_t cellIndex = cell->index();
237  try {
239  shmBuffer_->discardRecoCell(cellIndex);
240  } catch (evf::Exception& e) {
242  "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
243  }
244  }
245 
246  sDataActive_=reschedule;
247  return reschedule;
248 }
unsigned int index() const
Definition: FUShmRecoCell.h:22
void discardRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:501
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmBuffer * shmBuffer_
void finishReadingRecoCell(FUShmRecoCell *cell)
Definition: FUShmBuffer.cc:426
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
unsigned int UInt_t
Definition: FUTypes.h:12
FUShmRecoCell * recoCellToRead()
Definition: FUShmBuffer.cc:340
void rethrowShmBufferException(evf::Exception &e, std::string where) const
bool FUResourceTable::sendDqm ( )
virtual

Corresponds to the SendDqm workloop, to be called in normal operation.

Implements evf::IPCMethod.

Definition at line 251 of file FUResourceTable.cc.

References gather_cfg::cout, alignCSCRings::e, evf::dqm::EMPTY, evf::FUShmDqmCell::eventSize(), evf::FUShmDqmCell::evtAtUpdate(), edm::hlt::Exception, evf::FUShmDqmCell::folderId(), evf::FUShmDqmCell::fuGuid(), evf::FUShmDqmCell::fuProcessId(), evf::FUShmDqmCell::index(), evf::FUShmDqmCell::payloadAddr(), evf::FUShmDqmCell::runNumber(), and evf::utils::state.

// One iteration of the SendDqm workloop. Reads the next DQM cell and its state; an EMPTY
// cell ends the workloop (shutdownStatus_ bit 9), any other cell is forwarded with
// sendDqmEvent. Returns whether to reschedule and mirrors that value into sDqmActive_.
// NOTE(review): this listing skips source lines 255, 260, 269, 272, 291 and 293. Line 255
// presumably declares `state`; per the catch labels, lines 269 and 291 presumably call
// shmBuffer_->finishReadingDqmCell(cell) - TODO confirm.
251  {
252  bool reschedule = true;
253  FUShmDqmCell* cell = 0;
254  // initialize to a value to avoid warnings
256  try {
257  cell = shmBuffer_->dqmCellToRead();
258  state = shmBuffer_->dqmState(cell->index());
259  } catch (evf::Exception& e) {
261  "FUResourceTable:sendDqm:dqmCellToRead/dqmState");
262  }
263 
264  if (state == dqm::EMPTY) {
265  LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
266  std::cout << "shut down dqm workloop " << std::endl;
267  UInt_t cellIndex = cell->index();
268  try {
270  shmBuffer_->discardDqmCell(cellIndex);
271  } catch (evf::Exception& e) {
273  "FUResourceTable:sendDqm:finishReadingDqmCell/discardDqmCell");
274  }
275  shutdownStatus_|=1<<9;
276  reschedule = false;
277  } else {
// Here the event is sent before the (omitted) finish-reading step, unlike sendData where
// the finish-reading try block precedes the send.
278  try {
279  UInt_t cellIndex = cell->index();
280  UInt_t cellRunNumber = cell->runNumber();
281  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
282  UInt_t cellFolderId = cell->folderId();
283  UInt_t cellFUProcId = cell->fuProcessId();
284  UInt_t cellFUGuid = cell->fuGuid();
285  UChar_t *cellPayloadAddr = cell->payloadAddr();
286  UInt_t cellEventSize = cell->eventSize();
287  sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
288  cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
289  cellEventSize);
290  try {
292  } catch (evf::Exception& e) {
294  "FUResourceTable:sendDqm:finishReadingDqmCell");
295  }
// Any xcept::Exception is logged as fatal and ends the workloop instead of propagating.
296  } catch (xcept::Exception& e) {
297  LOG4CPLUS_FATAL(
298  log_,
299  "Failed to send DQM DATA to StorageManager: "
300  << xcept::stdformat_exception_history(e));
301  reschedule = false;
302  }
303  }
304 
305  sDqmActive_=reschedule;
306  return reschedule;
307 }
unsigned int fuGuid() const
Definition: FUShmDqmCell.h:27
unsigned int index() const
Definition: FUShmDqmCell.h:22
unsigned int folderId() const
Definition: FUShmDqmCell.h:25
unsigned int evtAtUpdate() const
Definition: FUShmDqmCell.h:24
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmDqmCell * dqmCellToRead()
Definition: FUShmBuffer.cc:354
void discardDqmCell(unsigned int iCell)
Definition: FUShmBuffer.cc:527
FUShmBuffer * shmBuffer_
unsigned int fuProcessId() const
Definition: FUShmDqmCell.h:26
unsigned int runNumber() const
Definition: FUShmDqmCell.h:23
unsigned char UChar_t
Definition: FUTypes.h:14
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:224
void finishReadingDqmCell(FUShmDqmCell *cell)
Definition: FUShmBuffer.cc:438
dqm::State_t dqmState(unsigned int index)
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned char * payloadAddr() const
Definition: FUShmDqmCell.cc:54
unsigned int eventSize() const
Definition: FUShmDqmCell.h:31
char state
Definition: procUtils.cc:75
tuple cout
Definition: gather_cfg.py:121
void rethrowShmBufferException(evf::Exception &e, std::string where) const
bool FUResourceTable::sendDqmWhileHalting ( )
virtual

Function called from the SendDqm workloop while the FSM is in the stopping / halting state.

Implements evf::IPCMethod.

Definition at line 310 of file FUResourceTable.cc.

References gather_cfg::cout, alignCSCRings::e, evf::dqm::EMPTY, evf::FUShmDqmCell::index(), and evf::utils::state.

310  {
311  bool reschedule = true;
312  FUShmDqmCell* cell = 0;
313  // initialize to a value to avoid warnings
314  dqm::State_t state = dqm::EMPTY;
315  try {
316  cell = shmBuffer_->dqmCellToRead();
317  state = shmBuffer_->dqmState(cell->index());
318  } catch (evf::Exception& e) {
319  rethrowShmBufferException(e,
320  "FUResourceTable:sendDqmWhileHalting:dqmCellToRead/dqmState");
321  }
322 
323  if (state == dqm::EMPTY) {
324  LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
325  std::cout << "shut down dqm workloop " << std::endl;
326  UInt_t cellIndex = cell->index();
327  try {
328  shmBuffer_->finishReadingDqmCell(cell);
329  shmBuffer_->discardDqmCell(cellIndex);
330  } catch (evf::Exception& e) {
331  rethrowShmBufferException(e,
332  "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
333  }
334  shutdownStatus_|=1<<10;
335  reschedule = false;
336  } else {
337  UInt_t cellIndex = cell->index();
338  try {
339  shmBuffer_->finishReadingDqmCell(cell);
340  shmBuffer_->discardDqmCell(cellIndex);
341  } catch (evf::Exception& e) {
342  rethrowShmBufferException(e,
343  "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
344  }
345  }
346 
347  sDqmActive_=reschedule;
348  return reschedule;
349 }
unsigned int index() const
Definition: FUShmDqmCell.h:22
log4cplus::Logger log_
Definition: IPCMethod.h:342
FUShmDqmCell * dqmCellToRead()
Definition: FUShmBuffer.cc:354
void discardDqmCell(unsigned int iCell)
Definition: FUShmBuffer.cc:527
FUShmBuffer * shmBuffer_
void finishReadingDqmCell(FUShmDqmCell *cell)
Definition: FUShmBuffer.cc:438
dqm::State_t dqmState(unsigned int index)
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
unsigned int UInt_t
Definition: FUTypes.h:12
char state
Definition: procUtils.cc:75
tuple cout
Definition: gather_cfg.py:121
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void FUResourceTable::shutDownClients ( )
virtual

Send empty events to notify clients to shut down.

Implements evf::IPCMethod.

Definition at line 984 of file FUResourceTable.cc.

References alignCSCRings::e, evf::evt::EMPTY, i, lumiQueryAPI::msg, n, evf::utils::pid, sistrip::runNumber_, shutdownWatchdog(), evf::utils::state, and ntuplemaker::status.

984  {
986  isReadyToShutDown_ = false;
987 
988  shutdownStatus_=1;
989 
990  //start watchdog thread
991  watchDogEnd_=false;
992  watchDogSetFailed_=false;
993  #ifdef linux
994  std::thread watch(&FUResourceTable::shutdownWatchdog,this,20);
995  #endif
996  if (nbClientsToShutDown_ == 0) {
997  shutdownStatus_|=1<<1;
998  LOG4CPLUS_INFO(
999  log_,
1000  "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
1001  UInt_t n = nbResources();
1002  try {
1003  for (UInt_t i = 0; i < n; i++) {
1005  if (state != evt::EMPTY) {
1006  LOG4CPLUS_WARN(
1007  log_,
1008  "Schedule discard at STOP for orphaned event in state "
1009  << state);
1011  }
1012  }
1014  } catch (evf::Exception& e) {
1016  "FUResourceTable:shutDownClients:evtState/scheduleRawEmptyCellForDiscard");
1017  }
1018  } else {
1019  // UPDATED
1020  int checks = 0;
1021  try {
1022  while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
1023  != 0) {
1024  shutdownStatus_|=1<<2;
1025  checks++;
1026  {
1027  #ifdef linux
1028  auto lk = lockCrashHandlerTimed(10);
1029  #else
1030  bool lk=true;
1031  #endif
1032  if (lk) {
1033  vector < pid_t > prcids = clientPrcIds();
1034  for (UInt_t i = 0; i < prcids.size(); i++) {
1035  pid_t pid = prcids[i];
1036  int status = kill(pid, 0);
1037  if (status != 0) {
1038  LOG4CPLUS_ERROR(log_,
1039  "EP prc " << pid << " completed with error.");
1041  }
1042  }
1043  }
1044  else {
1045  XCEPT_RAISE(evf::Exception,
1046  "Timed out access to the Crash Handler in stop. SM discards not arriving?");
1047 
1048  }
1049  }
1050 
1051  LOG4CPLUS_WARN(
1052  log_,
1053  "no cell to write stop "
1055  << " nClients " << nbClients());
1056  if (checks > 15) {
1057  string msg = "No Raw Cell to Write STOP messages";
1058  XCEPT_RAISE(evf::Exception, msg);
1059  }
1060  ::usleep(500000);
1061  }
1062  shutdownStatus_|=1<<3;
1063 
1064  } catch (evf::Exception& e) {
1065  watchDogEnd_=true;
1066  #ifdef linux
1067  watch.join();
1068  #endif
1070  "FUResourceTable:shutDownClients:nbRawCellsToWrite");
1071  }
1073  if (nbClientsToShutDown_ == 0) {
1074  shutdownStatus_|=1<<4;
1075  UInt_t n = nbResources();
1076  for (UInt_t i = 0; i < n; i++) {
1077  // initialize to a value to avoid warnings
1078  evt::State_t state = evt::EMPTY;
1079  try {
1080  state = shmBuffer_->evtState(i);
1081  } catch (evf::Exception& e) {
1082  watchDogEnd_=true;
1083  #ifdef linux
1084  watch.join();
1085  #endif
1087  "FUResourceTable:shutDownClients:evtState");
1088  }
1089  if (state != evt::EMPTY) {
1090  LOG4CPLUS_WARN(
1091  log_,
1092  "Schedule discard at STOP for orphaned event in state "
1093  << state);
1094  try {
1095  shmBuffer_->setEvtDiscard(i, 1, true);
1097  } catch (evf::Exception& e) {
1098  watchDogEnd_=true;
1099  #ifdef linux
1100  watch.join();
1101  #endif
1103  "FUResourceTable:shutDownClients:scheduleRawCellForDiscardServerSide");
1104  }
1105  }
1106  }
1107  try {
1109  } catch (evf::Exception& e) {
1110  watchDogEnd_=true;
1111  #ifdef linux
1112  watch.join();
1113  #endif
1115  "FUResourceTable:shutDownClients:scheduleRawEmptyCellForDiscard");
1116  }
1117  }
1119  shutdownStatus_|=1<<5;
1120  try {
1121  for (UInt_t i = 0; i < n; ++i)
1123  } catch (evf::Exception& e) {
1124  watchDogEnd_=true;
1125  #ifdef linux
1126  watch.join();
1127  #endif
1129  "FUResourceTable:shutDownClients:writeRawEmptyEvent");
1130  }
1131  shutdownStatus_|=1<<6;
1132  }
1133  watchDogEnd_=true;
1134  #ifdef linux
1135  watch.join();
1136  if (watchDogSetFailed_)
1137  XCEPT_RAISE(evf::Exception, "Failed (timed out) shutdown of clients");
1138  #endif
1139 }
int i
Definition: DBlmapReader.cc:9
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void scheduleRawCellForDiscardServerSide(unsigned int iCell)
Definition: FUShmBuffer.cc:478
bool isReadyToShutDown_
Definition: IPCMethod.h:374
std::atomic_bool watchDogEnd_
log4cplus::Logger log_
Definition: IPCMethod.h:342
evt::State_t evtState(unsigned int index)
UInt_t nbClients() const
FUShmBuffer * shmBuffer_
void shutdownWatchdog(unsigned int timeout)
std::vector< pid_t > clientPrcIds() const
unsigned int nbRawCellsToWrite() const
Definition: FUShmBuffer.cc:288
bool setEvtDiscard(unsigned int index, unsigned int discard, bool checkValue=false, bool lockShm=true)
void writeRawEmptyEvent()
Definition: FUShmBuffer.cc:572
UInt_t nbResources() const
void scheduleRawEmptyCellForDiscard()
Definition: FUShmBuffer.cc:638
UInt_t runNumber_
Definition: IPCMethod.h:384
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
unsigned int UInt_t
Definition: FUTypes.h:12
UInt_t nbClientsToShutDown_
Definition: IPCMethod.h:373
char state
Definition: procUtils.cc:75
tuple status
Definition: ntuplemaker.py:245
std::atomic_bool watchDogSetFailed_
void rethrowShmBufferException(evf::Exception &e, std::string where) const
void FUResourceTable::shutdownWatchdog ( unsigned int  timeout)
private

Runs in a separate thread, spawned by shutDownClients(), that watches for a timeout while clients are being shut down.

Definition at line 963 of file FUResourceTable.cc.

Referenced by shutDownClients().

964 {
965  unsigned int timeoutUs=timeout*1000000+1;
966  bool warned=false;
967  while (!watchDogEnd_) {
968 
969  usleep(50000);
970  timeoutUs-=50000;
971  if (timeoutUs<=50000) {
972  LOG4CPLUS_ERROR(log_,"Timeout in shutdownClients, status:"<< std::hex << shutdownStatus_);
973  watchDogSetFailed_=true;
974  break;
975  }
976  if (timeoutUs<=1000000*timeout/2 && !warned) {
977  warned=true;
978  LOG4CPLUS_WARN(log_,"Long shutdown of clients, status:" << std::hex << shutdownStatus_);
979  }
980  }
981 }
std::atomic_bool watchDogEnd_
log4cplus::Logger log_
Definition: IPCMethod.h:342
UInt_t shutdownStatus_
Definition: IPCMethod.h:395
std::atomic_bool watchDogSetFailed_

Member Data Documentation

bool evf::FUResourceTable::sDataActive_
private

Definition at line 194 of file FUResourceTable.h.

bool evf::FUResourceTable::sDqmActive_
private

Definition at line 195 of file FUResourceTable.h.

FUShmBuffer* evf::FUResourceTable::shmBuffer_
private

Definition at line 191 of file FUResourceTable.h.

std::atomic_bool evf::FUResourceTable::watchDogEnd_
private

Definition at line 197 of file FUResourceTable.h.

std::atomic_bool evf::FUResourceTable::watchDogSetFailed_
private

Definition at line 198 of file FUResourceTable.h.