#include <FUResourceQueue.h>
Public Member Functions | |
bool | buildResource (MemRef_t *bufRef) |
std::vector< UInt_t > | cellEvtNumbers () const |
std::vector< pid_t > | cellPrcIds () const |
std::vector< std::string > | cellStates () const |
std::vector< time_t > | cellTimeStamps () const |
void | clear () |
std::vector< pid_t > | clientPrcIds () const |
std::string | clientPrcIdsAsString () const |
bool | discard () |
bool | discardDataEvent (MemRef_t *bufRef) |
bool | discardDataEventWhileHalting (MemRef_t *bufRef) |
bool | discardDqmEvent (MemRef_t *bufRef) |
bool | discardDqmEventWhileHalting (MemRef_t *bufRef) |
bool | discardWhileHalting (bool sendDiscards) |
std::vector< std::string > | dqmCellStates () const |
void | dropEvent () |
void | dumpEvent (evf::FUShmRawCell *cell) |
FUResourceQueue (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *) throw (evf::Exception) | |
bool | handleCrashedEP (UInt_t runNumber, pid_t pid) |
void | initialize (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) |
void | lastResort () |
UInt_t | nbClients () const |
UInt_t | nbResources () const |
void | postEndOfLumiSection (MemRef_t *bufRef) |
void | resetCounters () |
void | resetIPC () |
Resets the underlying IPC method to its initial state. | |
bool | sendData () |
bool | sendDataWhileHalting () |
bool | sendDqm () |
bool | sendDqmWhileHalting () |
void | shutDownClients () |
virtual | ~FUResourceQueue () |
Private Attributes | |
RawCache * | cache_ |
UInt_t | dqmCellSize_ |
MasterQueue | msq_ |
UInt_t | rawCellSize_ |
UInt_t | recoCellSize_ |
Definition at line 32 of file FUResourceQueue.h.
FUResourceQueue::FUResourceQueue | ( | bool | segmentationMode, |
UInt_t | nbRawCells, | ||
UInt_t | nbRecoCells, | ||
UInt_t | nbDqmCells, | ||
UInt_t | rawCellSize, | ||
UInt_t | recoCellSize, | ||
UInt_t | dqmCellSize, | ||
int | freeResReq, | ||
BUProxy * | bu, | ||
SMProxy * | sm, | ||
log4cplus::Logger | logger, | ||
unsigned int | timeout, | ||
EvffedFillerRB * | frb, | ||
xdaq::Application * | app | ||
) | throw (evf::Exception) |
Definition at line 40 of file FUResourceQueue.cc.
: IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
            rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
            logger, timeout, frb, app),
  // TODO (carried over from the original source): improve, fix UInt_t and
  // the hard-coded constructor argument of msq_(99).
  msq_(99)
{
  // All cell counts/sizes are forwarded unchanged to initialize(), which
  // sets up the cell sizes, the raw cache and the resource list and may
  // raise evf::Exception.
  initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
             rawCellSize, recoCellSize, dqmCellSize);
}
FUResourceQueue::~FUResourceQueue | ( | ) | [virtual] |
Definition at line 56 of file FUResourceQueue.cc.
References clear(), evf::MasterQueue::disconnect(), evf::IPCMethod::log_, and msq_.
{
  // Release and delete every FUResource and empty the free-id queue.
  clear();

  // Disconnect from the message queue; a return value of 0 is logged as a
  // successful release, any other value is silently ignored.
  if (msq_.disconnect() == 0) {
    LOG4CPLUS_INFO(log_, "MESSAGE QUEUE SUCCESSFULLY RELEASED.");
  }

  // needed??
  /*
   if (0 != acceptSMDataDiscard_)
   delete[] acceptSMDataDiscard_;
   if (0 != acceptSMDqmDiscard_)
   delete[] acceptSMDqmDiscard_;
   */
}
bool FUResourceQueue::buildResource | ( | MemRef_t * | bufRef | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 771 of file FUResourceQueue.cc.
References evf::FUResource::allocate(), Association::block, evf::IPCMethod::bu_, cache_, evf::FUResource::doCrcCheck(), evf::IPCMethod::doCrcCheck_, evf::FUResource::fatalError(), evf::IPCMethod::frb_, evf::IPCMethod::freeResourceIds_, evf::RawCache::getMsgToWrite(), evf::FUShmRawCell::initialize(), evf::FUResource::isAllocated(), evf::FUResource::isComplete(), evf::IPCMethod::isLastMessageOfEvent(), evf::IPCMethod::lock(), evf::IPCMethod::log_, msq_, evf::IPCMethod::nbAllocated_, evf::IPCMethod::nbCompleted_, evf::FUResource::nbCrcErrors(), evf::IPCMethod::nbCrcErrors_, evf::IPCMethod::nbDiscarded_, evf::FUResource::nbErrors(), evf::IPCMethod::nbErrors_, evf::IPCMethod::nbLost_, evf::IPCMethod::nbPending_, cmsPerfSuiteHarvest::now, evf::MasterQueue::postLength(), evf::FUResource::process(), evf::RawMsgBuf::rawCell(), evf::FUResource::release(), evf::IPCMethod::resources_, evf::IPCMethod::sendAllocate(), evf::BUProxy::sendDiscard(), evf::EvffedFillerRB::setRBEventCount(), evf::EvffedFillerRB::setRBTimeStamp(), evf::IPCMethod::unlock(), and evf::RawMsgBuf::usedSize().
{
  // Processes one I2O event-data block for the resource named in the frame.
  // Returns true only when this block completed the event and the raw
  // message was posted to the message queue; returns false otherwise.
  // Raises evf::Exception if posting to the queue fails.
  bool eventComplete = false;

  // Resource ids are carried in the I2O frame header.
  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME* block =
      (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
  UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
  UInt_t buResourceId = (UInt_t) block->buResourceId;

  // NOTE(review): fuResourceId is taken from the incoming frame and used as
  // an index without a range check -- confirm the sender guarantees it is
  // smaller than resources_.size().
  FUResource* resource = resources_[fuResourceId];

  // NOTE(review): the cache is asked for its message-to-write on every
  // block, not only on the first block of an event -- confirm that it is
  // still the buffer this resource was allocated on when the event completes.
  RawMsgBuf* currentMessageToWrite = cache_->getMsgToWrite();

  // First block of a healthy resource: bind it to the raw cell of the
  // current message buffer and stamp the filler with time and event count.
  if (!resource->fatalError() && !resource->isAllocated()) {
    FUShmRawCell* rawCell = currentMessageToWrite->rawCell();
    rawCell->initialize(fuResourceId);
    resource->allocate(rawCell);

    timeval now;
    gettimeofday(&now, 0);
    // Seconds in the upper 32 bits, microseconds added in the lower part.
    frb_->setRBTimeStamp(
        ((uint64_t) (now.tv_sec) << 32) + (uint64_t) (now.tv_usec));
    frb_->setRBEventCount(nbCompleted_);

    // CRC checking is enabled for every doCrcCheck_-th allocation and
    // disabled otherwise (always disabled when doCrcCheck_ is 0).
    resource->doCrcCheck(doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_);
  }

  // keep building this resource if it is healthy
  if (!resource->fatalError()) {
    resource->process(bufRef);

    lock();
    nbErrors_ += resource->nbErrors();
    nbCrcErrors_ += resource->nbCrcErrors();
    unlock();

    // make resource available for pick-up
    if (resource->isComplete()) {
      lock();
      nbCompleted_++;
      nbPending_--;
      unlock();

      // Post only the bytes actually used, not the full allocated buffer.
      try {
        msq_.postLength(*currentMessageToWrite,
                        currentMessageToWrite->usedSize());
      } catch (...) {
        string errmsg = "Failed to post message to Queue!";
        LOG4CPLUS_FATAL(log_, errmsg);
        XCEPT_RAISE(evf::Exception, errmsg);
      }

      eventComplete = true;
    }
  }

  // bad event, release msg, and the whole resource if this was the last one
  // (re-checked here because process() above may have flagged the error)
  if (resource->fatalError()) {
    bool lastMsg = isLastMessageOfEvent(bufRef);
    if (lastMsg) {
      resource->release(false);

      lock();
      freeResourceIds_.push(fuResourceId);
      nbDiscarded_++;
      nbLost_++;
      nbPending_--;
      unlock();

      bu_->sendDiscard(buResourceId);
      sendAllocate();
    }
    // this should now be safe re: appendToSuperFrag as corrupted blocks
    // will be removed...
    bufRef->release();
  }

  return eventComplete;
}
vector< UInt_t > FUResourceQueue::cellEvtNumbers | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1294 of file FUResourceQueue.cc.
References query::result.
vector< pid_t > FUResourceQueue::cellPrcIds | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1311 of file FUResourceQueue.cc.
References query::result.
vector< string > FUResourceQueue::cellStates | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1220 of file FUResourceQueue.cc.
References query::result.
{
  // Always yields an empty list for the message-queue implementation.
  // The disabled shared-memory version walked nbResources() cells under
  // shmBuffer_->lock() and translated each evt::State_t value (EMPTY, STOP,
  // LUMISECTION, RAWWRITING, RAWWRITTEN, RAWREADING, RAWREAD, PROCESSING,
  // PROCESSED, RECOWRITING, RECOWRITTEN, SENDING, SENT, DISCARDING) into the
  // string of the same name.
  return vector<string>();
}
vector< time_t > FUResourceQueue::cellTimeStamps | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1328 of file FUResourceQueue.cc.
References query::result.
void FUResourceQueue::clear | ( | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1127 of file FUResourceQueue.cc.
References evf::IPCMethod::freeResourceIds_, i, and evf::IPCMethod::resources_.
Referenced by ~FUResourceQueue().
{
  // Release (release(false)) and then delete each resource in index order.
  UInt_t idx = 0;
  while (idx < resources_.size()) {
    resources_[idx]->release(false);
    delete resources_[idx];
    ++idx;
  }

  // Forget the now-dangling pointers.
  resources_.clear();

  // Empty the queue of free resource ids one element at a time.
  for (; !freeResourceIds_.empty(); freeResourceIds_.pop()) {
  }
}
vector< pid_t > FUResourceQueue::clientPrcIds | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1187 of file FUResourceQueue.cc.
References query::result.
string FUResourceQueue::clientPrcIdsAsString | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1202 of file FUResourceQueue.cc.
{
  // Always yields an empty string for the message-queue implementation.
  // The disabled shared-memory version streamed shmBuffer_->clientPrcId(i)
  // for every client, separated by ",".
  return string();
}
bool FUResourceQueue::discard | ( | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
isHalting_
isStopping_
Implements evf::IPCMethod.
Definition at line 483 of file FUResourceQueue.cc.
References gather_cfg::cout, evf::DISCARD_RAW_MESSAGE_TYPE, evf::IPCMethod::freeResourceIds_, evf::RawCache::getInstance(), evf::IPCMethod::lock(), msq_, evf::MasterQueue::rcvQuiet(), evf::RawCache::releaseMsg(), evf::IPCMethod::resources_, evf::IPCMethod::sendAllocate(), evf::IPCMethod::sendDiscard(), stor::utils::sleep(), and evf::IPCMethod::unlock().
{
  // Handles one DISCARD_RAW message from the queue: frees the named FU
  // resource and its RawCache slot, then sends a discard for the BU resource
  // and a new allocate request.
  //
  // Always returns true: the shutdown handling that used to clear
  // `reschedule` in the shared-memory implementation (evt::STOP /
  // evt::LUMISECTION cell states, nbClientsToShutDown_ bookkeeping, waiting
  // for clients to detach, draining DQM cells) is disabled in this
  // implementation.
  bool reschedule = true;

  /*
   * DISCARDING raw msg buffers
   */
  // The payload is sized for exactly two unsigned ints.
  MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
  bool rcvSuccess = msq_.rcvQuiet(discardRaw);
  if (!rcvSuccess) {
    // Back off for 5 seconds before the caller tries again.
    cout << "RCV failed!" << endl;
    ::sleep(5);
    return reschedule;
  }

  // Payload layout: BU resource id first, FU resource id second.
  unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
  unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
      + sizeof(unsigned int));
  unsigned int buResourceId = *pBuID;
  unsigned int fuResourceId = *pFuID;

  cout << "Discard received for buResourceID: " << buResourceId
       << " fuResourceID " << fuResourceId << endl << endl;

  // The id comes from another process via the queue; refuse to index
  // resources_ with a value outside its bounds.
  if (fuResourceId >= resources_.size()) {
    LOG4CPLUS_ERROR(log_, "Discard received for out-of-range fuResourceID "
                    << fuResourceId << ", ignoring it.");
    return reschedule;
  }

  // (false = no shmdt)
  resources_[fuResourceId]->release(false);
  // also release space in RawCache
  RawCache::getInstance()->releaseMsg(fuResourceId);

  lock();
  freeResourceIds_.push(fuResourceId);
  // There can never be more free ids than resources.
  assert(freeResourceIds_.size() <= resources_.size());
  unlock();

  sendDiscard(buResourceId);
  sendAllocate();

  return reschedule;
}
bool FUResourceQueue::discardDataEvent | ( | MemRef_t * | bufRef | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 878 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the only executable
// statement is `return true;`. The bufRef argument is never read or released.
// The block comment holds the disabled shmBuffer_-based handling.
// NOTE(review): the disabled code released bufRef -- confirm that the caller
// releases it now.
{ /* I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg; msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation(); UInt_t recoIndex = msg->rbBufferID; if (acceptSMDataDiscard_[recoIndex]) { lock(); nbPendingSMDiscards_--; unlock(); acceptSMDataDiscard_[recoIndex] = false; if (!isHalting_) { shmBuffer_->discardRecoCell(recoIndex); bufRef->release(); } } else { LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!"); } if (isHalting_) { bufRef->release(); return false; } */ return true; }
bool FUResourceQueue::discardDataEventWhileHalting | ( | MemRef_t * | bufRef | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 909 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the only executable
// statement is `return true;`. The bufRef argument is never read or released.
// The block comment holds the disabled shmBuffer_-based handling.
// NOTE(review): the disabled code released bufRef and returned false while
// halting -- confirm that returning true here is what the caller expects.
{ /* I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg; msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation(); UInt_t recoIndex = msg->rbBufferID; if (acceptSMDataDiscard_[recoIndex]) { lock(); nbPendingSMDiscards_--; unlock(); acceptSMDataDiscard_[recoIndex] = false; if (!isHalting_) { shmBuffer_->discardRecoCell(recoIndex); bufRef->release(); } } else { LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!"); } if (isHalting_) { bufRef->release(); return false; } */ return true; }
bool FUResourceQueue::discardDqmEvent | ( | MemRef_t * | bufRef | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 941 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the only executable
// statement is `return true;`. The bufRef argument is never read or released.
// The block comment holds the disabled shmBuffer_-based DQM discard handling
// (wait for the cell to reach dqm::SENT, then discard it).
// NOTE(review): the disabled code released bufRef -- confirm that the caller
// releases it now.
{ /* I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg; msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation(); UInt_t dqmIndex = msg->rbBufferID; unsigned int ntries = 0; while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) { LOG4CPLUS_WARN( log_, "DQM discard for cell " << dqmIndex << " which is not yer in SENT state - waiting"); ::usleep(10000); if (ntries++ > 10) { LOG4CPLUS_ERROR( log_, "DQM cell " << dqmIndex << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex)); bufRef->release(); return true; } } if (acceptSMDqmDiscard_[dqmIndex] > 0) { acceptSMDqmDiscard_[dqmIndex]--; if (nbPendingSMDqmDiscards_ > 0) { nbPendingSMDqmDiscards_--; } else { LOG4CPLUS_WARN (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];); } if (!isHalting_) { shmBuffer_->discardDqmCell(dqmIndex); bufRef->release(); } } else { LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex << " from StorageManager while cell is not accepting discards"); } if (isHalting_) { bufRef->release(); return false; } */ return true; }
bool FUResourceQueue::discardDqmEventWhileHalting | ( | MemRef_t * | bufRef | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 993 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the only executable
// statement is `return true;`. The bufRef argument is never read or released.
// The block comment holds the disabled shmBuffer_-based DQM discard handling.
// NOTE(review): the disabled code released bufRef and returned false while
// halting -- confirm that returning true here is what the caller expects.
{ /* I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg; msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation(); UInt_t dqmIndex = msg->rbBufferID; unsigned int ntries = 0; while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) { LOG4CPLUS_WARN( log_, "DQM discard for cell " << dqmIndex << " which is not yer in SENT state - waiting"); ::usleep(10000); if (ntries++ > 10) { LOG4CPLUS_ERROR( log_, "DQM cell " << dqmIndex << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex)); bufRef->release(); return true; } } if (acceptSMDqmDiscard_[dqmIndex] > 0) { acceptSMDqmDiscard_[dqmIndex]--; if (nbPendingSMDqmDiscards_ > 0) { nbPendingSMDqmDiscards_--; } else { LOG4CPLUS_WARN (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];); } if (!isHalting_) { shmBuffer_->discardDqmCell(dqmIndex); bufRef->release(); } } else { LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex << " from StorageManager while cell is not accepting discards"); } if (isHalting_) { bufRef->release(); return false; } */ return true; }
bool FUResourceQueue::discardWhileHalting | ( | bool | sendDiscards | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
isHalting_
isStopping_
Implements evf::IPCMethod.
Definition at line 625 of file FUResourceQueue.cc.
References gather_cfg::cout, evf::DISCARD_RAW_MESSAGE_TYPE, evf::IPCMethod::freeResourceIds_, evf::RawCache::getInstance(), evf::IPCMethod::lock(), msq_, evf::MasterQueue::rcvQuiet(), evf::RawCache::releaseMsg(), evf::IPCMethod::resources_, evf::IPCMethod::sendAllocate(), evf::IPCMethod::sendDiscard(), stor::utils::sleep(), and evf::IPCMethod::unlock().
{
  // Halting-time counterpart of discard(): handles one DISCARD_RAW message
  // and frees the named FU resource and its RawCache slot, but sends neither
  // a discard nor a new allocate request.
  //
  // NOTE(review): the sendDiscards argument is never read. The original code
  // guarded sendDiscard()/sendAllocate() with a literal `false`, so nothing
  // is ever sent regardless of the argument -- confirm this is intended.
  //
  // Always returns true: the shutdown handling that used to clear
  // `reschedule` in the shared-memory implementation is disabled in this
  // implementation.
  bool reschedule = true;

  /*
   * DISCARDING raw msg buffers
   */
  // The payload is sized for exactly two unsigned ints.
  MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
  bool rcvSuccess = msq_.rcvQuiet(discardRaw);
  if (!rcvSuccess) {
    // Back off for 5 seconds before the caller tries again.
    cout << "RCV failed!" << endl;
    ::sleep(5);
    return reschedule;
  }

  // Payload layout: BU resource id first, FU resource id second.
  unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
  unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
      + sizeof(unsigned int));
  unsigned int buResourceId = *pBuID;
  unsigned int fuResourceId = *pFuID;

  cout << "Discard received for buResourceID: " << buResourceId
       << " fuResourceID " << fuResourceId << endl << endl;

  // The id comes from another process via the queue; refuse to index
  // resources_ with a value outside its bounds.
  if (fuResourceId >= resources_.size()) {
    LOG4CPLUS_ERROR(log_, "Discard received for out-of-range fuResourceID "
                    << fuResourceId << ", ignoring it.");
    return reschedule;
  }

  // (false = no shmdt)
  resources_[fuResourceId]->release(false);
  // also release space in RawCache
  RawCache::getInstance()->releaseMsg(fuResourceId);

  lock();
  freeResourceIds_.push(fuResourceId);
  // There can never be more free ids than resources.
  assert(freeResourceIds_.size() <= resources_.size());
  unlock();

  return reschedule;
}
vector< string > FUResourceQueue::dqmCellStates | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1264 of file FUResourceQueue.cc.
References query::result.
{
  // Always yields an empty list for the message-queue implementation.
  // The disabled shared-memory version walked nbDqmCells_ cells under
  // shmBuffer_->lock() and translated each dqm::State_t value (EMPTY,
  // WRITING, WRITTEN, SENDING, SENT, DISCARDING) into the string of the
  // same name.
  return vector<string>();
}
void FUResourceQueue::dropEvent | ( | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1060 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the body contains no
// executable statements. The block comment holds the disabled
// shmBuffer_-based version, which read one raw cell and scheduled it for
// discard.
{ /* FUShmRawCell* cell = shmBuffer_->rawCellToRead(); UInt_t fuResourceId = cell->fuResourceId(); shmBuffer_->finishReadingRawCell(cell); shmBuffer_->scheduleRawCellForDiscard(fuResourceId); */ }
void evf::FUResourceQueue::dumpEvent | ( | evf::FUShmRawCell * | cell | ) |
Dump event to ASCII file.
Reimplemented from evf::IPCMethod.
bool FUResourceQueue::handleCrashedEP | ( | UInt_t | runNumber, |
pid_t | pid | ||
) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1071 of file FUResourceQueue.cc.
{
  // Always reports false for the message-queue implementation; neither
  // runNumber nor pid is used.
  // The disabled shared-memory version looked pid up in cellPrcIds(), wrote
  // an error event for the matching raw cell via
  // shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell) (returning true
  // in that case) or logged "No raw data to send to error stream for
  // process <pid>", and finally called shmBuffer_->removeClientPrcId(pid).
  return false;
}
void FUResourceQueue::initialize | ( | bool | segmentationMode, |
UInt_t | nbRawCells, | ||
UInt_t | nbRecoCells, | ||
UInt_t | nbDqmCells, | ||
UInt_t | rawCellSize, | ||
UInt_t | recoCellSize, | ||
UInt_t | dqmCellSize | ||
) | throw (evf::Exception) |
Definition at line 77 of file FUResourceQueue.cc.
References hitfit::clear(), evf::RawCache::getInstance(), i, lumiQueryAPI::msg, and evf::FUResource::release().
{
  // Stores the cell sizes, drops any existing resources, verifies the
  // message queue, initialises the raw cache and creates one FUResource per
  // raw cell. Raises evf::Exception when the queue id is 0.
  // segmentationMode, nbRecoCells and nbDqmCells are accepted but not used.
  rawCellSize_ = rawCellSize;
  recoCellSize_ = recoCellSize;
  dqmCellSize_ = dqmCellSize;

  // Start from an empty resource list and an empty free-id queue.
  clear();

  // The original condition also tested `0 == &msq_`; the address of a member
  // object is never null, so that test was always false and has been removed.
  if (0 == msq_.id()) {
    string msg = "CREATION OF MESSAGE QUEUE FAILED!";
    LOG4CPLUS_FATAL(log_, msg);
    XCEPT_RAISE(evf::Exception, msg);
  }

  cache_ = RawCache::getInstance();
  cache_->initialise(nbRawCells, rawCellSize);

  // SEC need cap on max resources
  // NOTE(review): the loop bound is the member nbRawCells_, while the cache
  // above is sized with the nbRawCells argument -- confirm the two always
  // agree.
  for (UInt_t i = 0; i < nbRawCells_; i++) {
    FUResource* newResource = new FUResource(i, log_, frb_, app_);
    newResource->release(false);
    resources_.push_back(newResource);
    freeResourceIds_.push(i);
  }

  //acceptSMDataDiscard_ = new bool[nbRecoCells];
  //acceptSMDqmDiscard_ = new int[nbDqmCells];

  resetCounters();
}
void FUResourceQueue::lastResort | ( | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1348 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the body contains no
// executable statements. The block comment holds the disabled
// shmBuffer_-based version, which scheduled every raw cell still waiting to
// be read for discard.
{ /* cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << " more rawcells to read " << endl; while (shmBuffer_->nbRawCellsToRead() != 0) { FUShmRawCell* newCell = shmBuffer_->rawCellToRead(); cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << endl; shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell); cout << "lastResort: schedule raw cell for discard" << endl; } */ }
UInt_t FUResourceQueue::nbClients | ( | ) | const [virtual] |
Implements evf::IPCMethod.
Definition at line 1175 of file FUResourceQueue.cc.
References query::result.
UInt_t evf::FUResourceQueue::nbResources | ( | ) | const [inline, virtual] |
Implements evf::IPCMethod.
Definition at line 97 of file FUResourceQueue.h.
void FUResourceQueue::postEndOfLumiSection | ( | MemRef_t * | bufRef | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1045 of file FUResourceQueue.cc.
// Placeholder for the message-queue implementation: the body contains no
// executable statements and bufRef is never read. The block comment holds
// the disabled shmBuffer_-based version, which wrote one lumi-section event
// per raw cell (nbRawCells_ times).
{ /* I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg = (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation(); //make sure to fill up the shmem so no process will miss it // but processes will have to handle duplicates for (unsigned int i = 0; i < nbRawCells_; i++) shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection); */ }
void FUResourceQueue::resetCounters | ( | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1144 of file FUResourceQueue.cc.
References evf::IPCMethod::nbAllocated_, evf::IPCMethod::nbAllocSent_, evf::IPCMethod::nbCompleted_, evf::IPCMethod::nbCrcErrors_, evf::IPCMethod::nbDiscarded_, evf::IPCMethod::nbErrors_, evf::IPCMethod::nbLost_, evf::IPCMethod::nbPending_, evf::IPCMethod::nbPendingSMDiscards_, evf::IPCMethod::nbPendingSMDqmDiscards_, evf::IPCMethod::nbSent_, evf::IPCMethod::nbSentDqm_, evf::IPCMethod::nbSentError_, evf::IPCMethod::sumOfSizes_, and evf::IPCMethod::sumOfSquares_.
{
  // Resets the bookkeeping counters of the base class.
  // The disabled shared-memory version additionally cleared the
  // acceptSMDataDiscard_ / acceptSMDqmDiscard_ flags for every reco and DQM
  // cell; those arrays are not allocated in this implementation.

  // nbAllocated_ is set to the current value of nbPending_ rather than to
  // zero; nbPending_ itself is left untouched.
  nbAllocated_ = nbPending_;

  nbCompleted_ = 0;
  nbSent_ = 0;
  nbSentError_ = 0;
  nbSentDqm_ = 0;
  nbPendingSMDiscards_ = 0;
  nbPendingSMDqmDiscards_ = 0;
  nbDiscarded_ = 0;
  nbLost_ = 0;

  nbErrors_ = 0;
  nbCrcErrors_ = 0;
  nbAllocSent_ = 0;

  sumOfSquares_ = 0;
  sumOfSizes_ = 0;

  //isStopping_ = false;
}
void evf::FUResourceQueue::resetIPC | ( | ) | [inline, virtual] |
Resets the underlying IPC method to its initial state.
Implements evf::IPCMethod.
Definition at line 115 of file FUResourceQueue.h.
:
//
bool FUResourceQueue::sendData | ( | ) | [virtual] |
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 112 of file FUResourceQueue.cc.
References gather_cfg::cout, alignCSCRings::e, evf::FUShmRecoCell::eventSize(), evf::FUShmRecoCell::evtNumber(), Exception, evf::FUShmRecoCell::fuGuid(), evf::FUShmRecoCell::fuProcessId(), evf::FUShmRecoCell::index(), evf::IPCMethod::lock(), evf::IPCMethod::log_, msq_, evf::IPCMethod::nbPendingSMDiscards_, evf::IPCMethod::nbSent(), evf::IPCMethod::nbSent_, evf::IPCMethod::nbSentError_, evf::FUShmRecoCell::nExpectedEPs(), evf::FUShmRecoCell::outModId(), evf::FUShmRecoCell::payloadAddr(), evf::FUShmRecoCell::rawCellIndex(), evf::MasterQueue::rcvQuiet(), evf::RECO_MESSAGE_TYPE, evf::RecoMsgBuf::recoCell(), recoCellSize_, evf::IPCMethod::resources_, evf::FUShmRecoCell::runNumber(), evf::IPCMethod::runNumber_, evf::IPCMethod::sendDataEvent(), evf::IPCMethod::sendErrorEvent(), evf::IPCMethod::sendInitMessage(), stor::utils::sleep(), evf::FUShmRecoCell::type(), and evf::IPCMethod::unlock().
{
  // Receives one RECO message from the queue and forwards it as an INIT,
  // DATA or ERROR message according to cell->type().
  // Returns false when a zero-size cell is received (stop signal) or when
  // sending raised an xcept::Exception; returns true otherwise, including
  // after a failed receive.
  bool reschedule = true;

  //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
  RecoMsgBuf recoMsg(recoCellSize_, RECO_MESSAGE_TYPE);
  bool rcvSuccess = msq_.rcvQuiet(recoMsg);
  if (!rcvSuccess) {
    // Back off for 5 seconds before the caller tries again.
    cout << "RCV failed!" << endl;
    ::sleep(5);
    return reschedule;
  }

  FUShmRecoCell* cell = recoMsg.recoCell();

  // event size 0 -> stop
  if (0 == cell->eventSize()) {
    LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
    reschedule = false;
  }
  // halting: disabled here (constant false)
  else if (/*isHalting_*/false) {
    LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
  } else {
    try {
      // NOTE(review): in the DATA and ERROR branches cellRawIndex comes from
      // the received cell and indexes resources_ without a range check.

      // init message
      if (cell->type() == 0) {
        UInt_t cellIndex = cell->index();
        UInt_t cellOutModId = cell->outModId();
        UInt_t cellFUProcId = cell->fuProcessId();
        UInt_t cellFUGuid = cell->fuGuid();
        UChar_t* cellPayloadAddr = cell->payloadAddr();
        UInt_t cellEventSize = cell->eventSize();
        UInt_t cellExpectedEPs = cell->nExpectedEPs();

        lock();
        nbPendingSMDiscards_++;
        unlock();

        sendInitMessage(cellIndex, cellOutModId, cellFUProcId, cellFUGuid,
                        cellPayloadAddr, cellEventSize, cellExpectedEPs);
      }
      //
      // DATA event message
      //
      else if (cell->type() == 1) {
        UInt_t cellIndex = cell->index();
        UInt_t cellRawIndex = cell->rawCellIndex();
        UInt_t cellRunNumber = cell->runNumber();
        UInt_t cellEvtNumber = cell->evtNumber();
        UInt_t cellOutModId = cell->outModId();
        UInt_t cellFUProcId = cell->fuProcessId();
        UInt_t cellFUGuid = cell->fuGuid();
        UChar_t* cellPayloadAddr = cell->payloadAddr();
        UInt_t cellEventSize = cell->eventSize();

        lock();
        nbPendingSMDiscards_++;
        resources_[cellRawIndex]->incNbSent();
        // nbSent_ is incremented only on the first send for this raw cell.
        if (resources_[cellRawIndex]->nbSent() == 1)
          nbSent_++;
        unlock();

        sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber, cellOutModId,
                      cellFUProcId, cellFUGuid, cellPayloadAddr,
                      cellEventSize);
      }
      //
      // ERROR event message
      //
      else if (cell->type() == 2) {
        UInt_t cellIndex = cell->index();
        UInt_t cellRawIndex = cell->rawCellIndex();
        //UInt_t cellRunNumber = cell->runNumber();
        UInt_t cellEvtNumber = cell->evtNumber();
        UInt_t cellFUProcId = cell->fuProcessId();
        UInt_t cellFUGuid = cell->fuGuid();
        UChar_t* cellPayloadAddr = cell->payloadAddr();
        UInt_t cellEventSize = cell->eventSize();

        lock();
        nbPendingSMDiscards_++;
        resources_[cellRawIndex]->incNbSent();
        // Both counters are incremented only on the first send for this
        // raw cell.
        if (resources_[cellRawIndex]->nbSent() == 1) {
          nbSent_++;
          nbSentError_++;
        }
        unlock();

        // The run number is the member runNumber_, not the one in the cell.
        sendErrorEvent(cellIndex, runNumber_, cellEvtNumber, cellFUProcId,
                       cellFUGuid, cellPayloadAddr, cellEventSize);
      } else {
        string errmsg = "Unknown RecoCell type (neither INIT/DATA/ERROR).";
        XCEPT_RAISE(evf::Exception, errmsg);
      }
    } catch (xcept::Exception& e) {
      LOG4CPLUS_FATAL(
          log_,
          "Failed to send EVENT DATA to StorageManager: "
              << xcept::stdformat_exception_history(e));
      reschedule = false;
    }
  }

  return reschedule;
}
bool FUResourceQueue::sendDataWhileHalting ( ) [virtual]
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 236 of file FUResourceQueue.cc.
References gather_cfg::cout, alignCSCRings::e, evf::FUShmRecoCell::eventSize(), evf::FUShmRecoCell::evtNumber(), Exception, evf::FUShmRecoCell::fuGuid(), evf::FUShmRecoCell::fuProcessId(), evf::FUShmRecoCell::index(), evf::IPCMethod::lock(), evf::IPCMethod::log_, msq_, evf::IPCMethod::nbPendingSMDiscards_, evf::IPCMethod::nbSent(), evf::IPCMethod::nbSent_, evf::IPCMethod::nbSentError_, evf::FUShmRecoCell::nExpectedEPs(), evf::FUShmRecoCell::outModId(), evf::FUShmRecoCell::payloadAddr(), evf::FUShmRecoCell::rawCellIndex(), evf::MasterQueue::rcvQuiet(), evf::RECO_MESSAGE_TYPE, evf::RecoMsgBuf::recoCell(), recoCellSize_, evf::IPCMethod::resources_, evf::FUShmRecoCell::runNumber(), evf::IPCMethod::runNumber_, evf::IPCMethod::sendDataEvent(), evf::IPCMethod::sendErrorEvent(), evf::IPCMethod::sendInitMessage(), stor::utils::sleep(), evf::FUShmRecoCell::type(), and evf::IPCMethod::unlock().
{
  // Work-loop step used while halting: receive one reco message from the
  // master queue and drop it without forwarding anything.
  //
  // Returns true when the work loop should be rescheduled, false once a
  // cell with event size 0 (the stop marker) has been received.
  //
  // The INIT/DATA/ERROR forwarding code that used to follow was placed
  // behind `else if (true)` and could never execute; it has been removed.
  RecoMsgBuf recoMsg(recoCellSize_, RECO_MESSAGE_TYPE);

  if (!msq_.rcvQuiet(recoMsg)) {
    cout << "RCV failed!" << endl;
    ::sleep(5);  // wait 5 s before the work loop tries again
    return true;
  }

  FUShmRecoCell* cell = recoMsg.recoCell();

  // event size 0 -> stop
  if (0 == cell->eventSize()) {
    LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
    return false;
  }

  // halting: the received cell is only reported, never sent on
  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
  return true;
}
bool FUResourceQueue::sendDqm ( ) [virtual]
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 362 of file FUResourceQueue.cc.
References gather_cfg::cout, evf::DQM_MESSAGE_TYPE, evf::DQMMsgBuf::dqmCell(), dqmCellSize_, alignCSCRings::e, evf::FUShmDqmCell::eventSize(), evf::FUShmDqmCell::evtAtUpdate(), Exception, evf::FUShmDqmCell::folderId(), evf::FUShmDqmCell::fuGuid(), evf::FUShmDqmCell::fuProcessId(), evf::FUShmDqmCell::index(), evf::IPCMethod::log_, msq_, evf::FUShmDqmCell::payloadAddr(), evf::MasterQueue::rcvQuiet(), evf::FUShmDqmCell::runNumber(), evf::IPCMethod::sendDqmEvent(), and stor::utils::sleep().
{
  // Work-loop step: receive one DQM message from the master queue and
  // forward its cell through sendDqmEvent.
  //
  // Returns true when the work loop should be rescheduled, false when
  // forwarding raised an xcept::Exception.
  //
  // The former "stop" and "halting" branches were guarded by literal
  // `false` conditions and could never execute; they have been removed.
  // NOTE(review): there is currently no stop message for this loop.
  DQMMsgBuf dqmMsg(dqmCellSize_, DQM_MESSAGE_TYPE);

  if (!msq_.rcvQuiet(dqmMsg)) {
    cout << "RCV failed!" << endl;
    ::sleep(5);  // wait 5 s before the work loop tries again
    return true;
  }

  FUShmDqmCell* cell = dqmMsg.dqmCell();

  try {
    // read every field of the cell first, then hand them to sendDqmEvent
    UInt_t cellIndex = cell->index();
    UInt_t cellRunNumber = cell->runNumber();
    UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
    UInt_t cellFolderId = cell->folderId();
    UInt_t cellFUProcId = cell->fuProcessId();
    UInt_t cellFUGuid = cell->fuGuid();
    UChar_t* cellPayloadAddr = cell->payloadAddr();
    UInt_t cellEventSize = cell->eventSize();

    sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate, cellFolderId,
                 cellFUProcId, cellFUGuid, cellPayloadAddr, cellEventSize);
  } catch (xcept::Exception& e) {
    LOG4CPLUS_FATAL(
        log_,
        "Failed to send DQM DATA to StorageManager: "
            << xcept::stdformat_exception_history(e));
    return false;
  }

  return true;
}
bool FUResourceQueue::sendDqmWhileHalting ( ) [virtual]
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 422 of file FUResourceQueue.cc.
References gather_cfg::cout, evf::DQM_MESSAGE_TYPE, evf::DQMMsgBuf::dqmCell(), dqmCellSize_, alignCSCRings::e, evf::FUShmDqmCell::eventSize(), evf::FUShmDqmCell::evtAtUpdate(), Exception, evf::FUShmDqmCell::folderId(), evf::FUShmDqmCell::fuGuid(), evf::FUShmDqmCell::fuProcessId(), evf::FUShmDqmCell::index(), evf::IPCMethod::log_, msq_, evf::FUShmDqmCell::payloadAddr(), evf::MasterQueue::rcvQuiet(), evf::FUShmDqmCell::runNumber(), evf::IPCMethod::sendDqmEvent(), and stor::utils::sleep().
{
  // Work-loop step used while halting: receive one DQM message from the
  // master queue and drop it without forwarding anything.
  //
  // Always returns true (reschedule the work loop).
  //
  // The "stop" branch was guarded by a literal `false`, and the
  // sendDqmEvent path sat behind `else if (true)`; neither could execute,
  // so both have been removed.
  DQMMsgBuf dqmMsg(dqmCellSize_, DQM_MESSAGE_TYPE);

  if (!msq_.rcvQuiet(dqmMsg)) {
    cout << "RCV failed!" << endl;
    ::sleep(5);  // wait 5 s before the work loop tries again
    return true;
  }

  // The cell is still fetched as before, but nothing is done with it.
  // NOTE(review): dqmCell() looks like a plain accessor; if confirmed, this
  // call can be dropped as well.
  FUShmDqmCell* cell = dqmMsg.dqmCell();
  static_cast<void>(cell);

  return true;
}
void FUResourceQueue::shutDownClients ( ) [virtual]
Has to be implemented by subclasses, according to IPC type.
Implements evf::IPCMethod.
Definition at line 1096 of file FUResourceQueue.cc.
References evf::IPCMethod::isReadyToShutDown_.
{
  // The only active statement: set the ready-to-shut-down flag to true.
  isReadyToShutDown_ = true;

  /*
   Disabled code, kept for reference. It is based on shmBuffer_ and is not
   executed: it counted the clients to shut down, scheduled discards for
   non-empty raw cells when there were no clients, and otherwise wrote one
   empty raw event per client.

   nbClientsToShutDown_ = nbClients();
   isReadyToShutDown_ = false;

   if (nbClientsToShutDown_ == 0) {
     LOG4CPLUS_INFO(
         log_,
         "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
     UInt_t n = nbResources();
     for (UInt_t i = 0; i < n; i++) {
       evt::State_t state = shmBuffer_->evtState(i);
       if (state != evt::EMPTY) {
         LOG4CPLUS_WARN(
             log_,
             "Schedule discard at STOP for orphaned event in state " << state);
         shmBuffer_->scheduleRawCellForDiscardServerSide(i);
       }
     }
     shmBuffer_->scheduleRawEmptyCellForDiscard();
   } else {
     UInt_t n = nbClientsToShutDown_;
     for (UInt_t i = 0; i < n; ++i)
       shmBuffer_->writeRawEmptyEvent();
   }
   */
}
RawCache* evf::FUResourceQueue::cache_ [private] |
Definition at line 125 of file FUResourceQueue.h.
Referenced by buildResource().
UInt_t evf::FUResourceQueue::dqmCellSize_ [private] |
Definition at line 128 of file FUResourceQueue.h.
Referenced by sendDqm(), and sendDqmWhileHalting().
MasterQueue evf::FUResourceQueue::msq_ [private] |
Definition at line 124 of file FUResourceQueue.h.
Referenced by buildResource(), discard(), discardWhileHalting(), sendData(), sendDataWhileHalting(), sendDqm(), sendDqmWhileHalting(), and ~FUResourceQueue().
UInt_t evf::FUResourceQueue::rawCellSize_ [private] |
Definition at line 128 of file FUResourceQueue.h.
UInt_t evf::FUResourceQueue::recoCellSize_ [private] |
Definition at line 128 of file FUResourceQueue.h.
Referenced by sendData(), and sendDataWhileHalting().