CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

evf::FUResourceTable Class Reference

#include <FUResourceTable.h>

Inheritance diagram for evf::FUResourceTable:
evf::IPCMethod

List of all members.

Public Member Functions

bool buildResource (MemRef_t *bufRef)
std::vector< UInt_t > cellEvtNumbers () const
std::vector< pid_t > cellPrcIds () const
std::vector< std::string > cellStates () const
std::vector< time_t > cellTimeStamps () const
void clear ()
std::vector< pid_t > clientPrcIds () const
std::string clientPrcIdsAsString () const
bool discard ()
bool discardDataEvent (MemRef_t *bufRef)
bool discardDataEventWhileHalting (MemRef_t *bufRef)
bool discardDqmEvent (MemRef_t *bufRef)
bool discardDqmEventWhileHalting (MemRef_t *bufRef)
bool discardWhileHalting (bool sendDiscards)
std::vector< std::string > dqmCellStates () const
void dropEvent ()
 FUResourceTable (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *) throw (evf::Exception)
bool handleCrashedEP (UInt_t runNumber, pid_t pid)
void initialize (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception)
void lastResort ()
UInt_t nbClients () const
UInt_t nbResources () const
unsigned int nbResources ()
void postEndOfLumiSection (MemRef_t *bufRef)
std::string printStatus ()
void resetCounters ()
void resetIPC ()
 reset the ShmBuffer to the initial state
bool sendData ()
bool sendDataWhileHalting ()
bool sendDqm ()
bool sendDqmWhileHalting ()
void shutDownClients ()
virtual ~FUResourceTable ()

Private Member Functions

void discardNoReschedule ()
void rethrowShmBufferException (evf::Exception &e, std::string where) const throw (evf::Exception)
void shutdownWatchdog (unsigned int timeout)

Private Attributes

bool sDataActive_
bool sDqmActive_
FUShmBuffer * shmBuffer_
std::atomic_bool watchDogEnd_
std::atomic_bool watchDogSetFailed_

Detailed Description

Table of resources linked with the Shared Memory Buffer

Author:
smorovic

Definition at line 40 of file FUResourceTable.h.


Constructor & Destructor Documentation

FUResourceTable::FUResourceTable ( bool  segmentationMode,
UInt_t  nbRawCells,
UInt_t  nbRecoCells,
UInt_t  nbDqmCells,
UInt_t  rawCellSize,
UInt_t  recoCellSize,
UInt_t  dqmCellSize,
int  freeResReq,
BUProxy *  bu,
SMProxy *  sm,
log4cplus::Logger  logger,
unsigned int  timeout,
EvffedFillerRB *  frb,
xdaq::Application *  app 
) throw (evf::Exception)

Definition at line 31 of file FUResourceTable.cc.

                                                                               :
        // All constructor arguments are forwarded unchanged to the IPCMethod
        // base class; shmBuffer_ starts out as a null pointer.

        // call super constructor
                        IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
                                        rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
                                        logger, timeout, frb, app), shmBuffer_(0)

{
        // Set up the table with the requested cell counts and sizes.
        // NOTE(review): initialize() is not visible here; presumably it is
        // what gives shmBuffer_ its real value - confirm.
        initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
                        rawCellSize, recoCellSize, dqmCellSize);
}
FUResourceTable::~FUResourceTable ( ) [virtual]

Definition at line 48 of file FUResourceTable.cc.

References hitfit::clear(), and evf::FUShmBuffer::releaseSharedMemory().

                                  {
        // Release and destroy every resource before letting go of the
        // shared memory segment.
        clear();
        //workloop cancels used to be here in the previous version
        shmdt( shmBuffer_);

        const bool shmReleased = FUShmBuffer::releaseSharedMemory();
        if (shmReleased) {
                LOG4CPLUS_INFO(log_, "SHARED MEMORY SUCCESSFULLY RELEASED.");
        }

        // delete[] on a null pointer is a no-op, so no explicit guard is needed.
        delete[] acceptSMDataDiscard_;
        delete[] acceptSMDqmDiscard_;
}

Member Function Documentation

bool FUResourceTable::buildResource ( MemRef_t *  bufRef) [virtual]

Process buffer received via I2O_FU_TAKE message

Implements evf::IPCMethod.

Definition at line 581 of file FUResourceTable.cc.

References evf::FUResource::allocate(), Association::block, gather_cfg::cout, evf::FUResource::doCrcCheck(), dumpEvent(), alignCSCRings::e, evf::FUResource::fatalError(), evf::FUResource::isAllocated(), evf::FUResource::isComplete(), CommonMethods::lock(), evf::FUResource::nbCrcErrors(), evf::FUResource::nbErrors(), cmsPerfSuiteHarvest::now, evf::FUResource::process(), evf::FUResource::release(), and evf::FUResource::shmCell().

                                                    {
        // Processes one I2O_FU_TAKE message frame (bufRef) and adds it to the
        // FU resource named by the frame's fuTransactionId.
        //
        // Returns true only when this frame completed the resource and its
        // raw cell was handed over via finishWritingRawCell(); false otherwise.
        // Raises evf::Exception (XCEPT_RAISE) when the frame carries an invalid
        // fu/bu resource id; shm buffer failures are passed to
        // rethrowShmBufferException().
        bool eventComplete = false;
        // UPDATED
        // Evaluated up front, before the frame is handed to resource->process().
        bool lastMsg = isLastMessageOfEvent(bufRef);
        I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
                        (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();

        UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
        UInt_t buResourceId = (UInt_t) block->buResourceId;
        // Check input
        // fuResourceId indexes resources_ below, so it must be < nbRawCells_.
        if ((int) block->fuTransactionId < 0 || fuResourceId >= nbRawCells_
                        || (int) block->buResourceId < 0) {
                stringstream failureStr;
                failureStr << "Received TAKE message with invalid bu/fu resource id:"
                                << " fuResourceId: " << fuResourceId << " buResourceId: "
                                << buResourceId;
                LOG4CPLUS_ERROR(log_, failureStr.str());
                XCEPT_RAISE(evf::Exception, failureStr.str());
        }
        FUResource* resource = resources_[fuResourceId];

        // allocate resource
        // Only done for a healthy resource that has no raw cell attached yet.
        if (!resource->fatalError() && !resource->isAllocated()) {
                FUShmRawCell* cell = 0;
                try {
                        cell = shmBuffer_->rawCellToWrite();
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:buildResource:rawCellToWrite");
                }
                // No raw cell was obtained: drop the frame and report "not complete".
                if (cell == 0) {
                        bufRef->release();
                        return eventComplete;
                }
                resource->allocate(cell);
                timeval now;
                gettimeofday(&now, 0);

                // Timestamp layout: seconds in the upper 32 bits, microseconds added
                // into the lower part.
                frb_->setRBTimeStamp(
                                ((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));

                frb_->setRBEventCount(nbCompleted_);

                // CRC checking is switched on only when doCrcCheck_ > 0 and
                // nbAllocated_ is a multiple of doCrcCheck_.
                if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
                        resource->doCrcCheck(true);
                else
                        resource->doCrcCheck(false);
        }

#ifdef DEBUG_RES_TAB
        std::cout << "Received frame for resource " << buResourceId << std::endl;
#endif
        // keep building this resource if it is healthy
        if (!resource->fatalError()) {
#ifdef DEBUG_RES_TAB
                std::cout << "No fatal error for  " << buResourceId << ", keep building..."<< std::endl;
#endif
                resource->process(bufRef);
                // Table-wide error counters are updated between lock()/unlock().
                lock();
                nbErrors_ += resource->nbErrors();
                nbCrcErrors_ += resource->nbCrcErrors();
                unlock();
#ifdef DEBUG_RES_TAB
                std::cout << "Checking if resource is complete " << buResourceId << std::endl;
#endif
                // make resource available for pick-up
                if (resource->isComplete()) {
#ifdef DEBUG_RES_TAB
                        std::cout << "@@@@RESOURCE is COMPLETE " << buResourceId << std::endl;
#endif
                        lock();
                        nbCompleted_++;
                        nbPending_--;
                        unlock();
                        // Optionally dump every doDumpEvents_-th completed event.
                        // NOTE(review): nbCompleted_ is read here after unlock().
                        if (doDumpEvents_ > 0 && nbCompleted_ % doDumpEvents_ == 0)
                                dumpEvent(resource->shmCell());
                        try {
                                shmBuffer_->finishWritingRawCell(resource->shmCell());
                        } catch (evf::Exception& e) {
                                rethrowShmBufferException(e,
                                                "FUResourceTable:buildResource:finishWritingRawCell");
                        }
                        eventComplete = true;
                }

        }
        // bad event, release msg, and the whole resource if this was the last one
        // fatalError() is re-checked here, after process() has run on the frame.
        if (resource->fatalError()) {
                if (lastMsg) {
                        try {
                                shmBuffer_->releaseRawCell(resource->shmCell());
                        } catch (evf::Exception& e) {
                                rethrowShmBufferException(e,
                                                "FUResourceTable:buildResource:releaseRawCell");
                        }
                        resource->release(true);
                        // Return the id to the free list and count the event as
                        // discarded and lost, then notify the BU and call sendAllocate().
                        lock();
                        freeResourceIds_.push(fuResourceId);
                        nbDiscarded_++;
                        nbLost_++;
                        nbPending_--;
                        unlock();
                        bu_->sendDiscard(buResourceId);
                        sendAllocate();
                }
                //bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
        }

        return eventComplete;
}
vector< UInt_t > FUResourceTable::cellEvtNumbers ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1325 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

                                                     {
        // Returns the event number stored for each of the nbResources() cells,
        // read while the shm buffer is locked. The result is empty when no
        // shm buffer is attached.
        vector < UInt_t > result;
        if (0 == shmBuffer_)
                return result;

        const UInt_t n = nbResources();
        result.reserve(n);
        shmBuffer_->lock();
        try {
                for (UInt_t i = 0; i < n; ++i)
                        result.push_back(shmBuffer_->evtNumber(i));
        } catch (evf::Exception& e) {
                // Release the buffer lock before the exception leaves this function;
                // otherwise the lock would stay held after the rethrow.
                shmBuffer_->unlock();
                rethrowShmBufferException(e,
                                "FUResourceTable:cellEvtNumbers:evtNumber");
                // Only reached if the helper returns normally: lock already released.
                return result;
        }
        shmBuffer_->unlock();
        return result;
}
vector< pid_t > FUResourceTable::cellPrcIds ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1343 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

                                                {
        // Returns the process id stored for each of the nbResources() cells,
        // read while the shm buffer is locked. The result is empty when no
        // shm buffer is attached.
        vector < pid_t > result;
        if (0 == shmBuffer_)
                return result;

        const UInt_t n = nbResources();
        result.reserve(n);
        shmBuffer_->lock();
        try {
                for (UInt_t i = 0; i < n; ++i)
                        result.push_back(shmBuffer_->evtPrcId(i));
        } catch (evf::Exception& e) {
                // Release the buffer lock before the exception leaves this function;
                // otherwise the lock would stay held after the rethrow.
                shmBuffer_->unlock();
                rethrowShmBufferException(e, "FUResourceTable:cellPrcIds:evtPrcId");
                // Only reached if the helper returns normally: lock already released.
                return result;
        }
        shmBuffer_->unlock();
        return result;
}
vector< string > FUResourceTable::cellStates ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1246 of file FUResourceTable.cc.

References evf::evt::DISCARDING, alignCSCRings::e, evf::evt::EMPTY, i, evf::evt::LUMISECTION, n, evf::evt::PROCESSED, evf::evt::PROCESSING, evf::evt::RAWREAD, evf::evt::RAWREADING, evf::evt::RAWWRITING, evf::evt::RAWWRITTEN, evf::evt::RECOWRITING, evf::evt::RECOWRITTEN, query::result, evf::evt::SENDING, evf::evt::SENT, evf::evt::STOP, and evf::evt::USEDLS.

                                                 {
        // Returns the state name of each of the nbResources() cells, read while
        // the shm buffer is locked. A cell whose state matches none of the names
        // below contributes no entry. The result is empty when no shm buffer is
        // attached.
        vector < string > result;
        if (0 == shmBuffer_)
                return result;

        const UInt_t n = nbResources();
        shmBuffer_->lock();
        try {
                for (UInt_t i = 0; i < n; ++i) {
                        const evt::State_t state = shmBuffer_->evtState(i);
                        const char* name = 0;
                        if (state == evt::EMPTY)
                                name = "EMPTY";
                        else if (state == evt::STOP)
                                name = "STOP";
                        else if (state == evt::LUMISECTION)
                                name = "LUMISECTION";
                        // UPDATED
                        else if (state == evt::USEDLS)
                                name = "USEDLS";
                        else if (state == evt::RAWWRITING)
                                name = "RAWWRITING";
                        else if (state == evt::RAWWRITTEN)
                                name = "RAWWRITTEN";
                        else if (state == evt::RAWREADING)
                                name = "RAWREADING";
                        else if (state == evt::RAWREAD)
                                name = "RAWREAD";
                        else if (state == evt::PROCESSING)
                                name = "PROCESSING";
                        else if (state == evt::PROCESSED)
                                name = "PROCESSED";
                        else if (state == evt::RECOWRITING)
                                name = "RECOWRITING";
                        else if (state == evt::RECOWRITTEN)
                                name = "RECOWRITTEN";
                        else if (state == evt::SENDING)
                                name = "SENDING";
                        else if (state == evt::SENT)
                                name = "SENT";
                        else if (state == evt::DISCARDING)
                                name = "DISCARDING";

                        if (name != 0)
                                result.push_back(name);
                }
        } catch (evf::Exception& e) {
                // Release the buffer lock before the exception leaves this function;
                // otherwise the lock would stay held after the rethrow.
                shmBuffer_->unlock();
                rethrowShmBufferException(e, "FUResourceTable:cellStates:evtState");
                // Only reached if the helper returns normally: lock already released.
                return result;
        }
        shmBuffer_->unlock();
        return result;
}
vector< time_t > FUResourceTable::cellTimeStamps ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1360 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

                                                     {
        // Returns the time stamp stored for each of the nbResources() cells,
        // read while the shm buffer is locked. The result is empty when no
        // shm buffer is attached.
        vector < time_t > result;
        // Tracks whether this call currently holds the shm buffer lock, so the
        // handler below can release it before rethrowing.
        bool locked = false;
        try {
                if (0 != shmBuffer_) {
                        const UInt_t n = nbResources();
                        shmBuffer_->lock();
                        locked = true;
                        for (UInt_t i = 0; i < n; ++i)
                                result.push_back(shmBuffer_->evtTimeStamp(i));
                        // Clear the flag first so a failing unlock() is not retried.
                        locked = false;
                        shmBuffer_->unlock();
                }
        } catch (evf::Exception& e) {
                // Without this the lock would stay held after the rethrow.
                if (locked)
                        shmBuffer_->unlock();
                rethrowShmBufferException(e,
                                "FUResourceTable:cellTimeStamps:evtTimeStamp");
        }
        return result;
}
void FUResourceTable::clear ( void  ) [virtual]

Clear contents of resource table. (empty all containers)

Implements evf::IPCMethod.

Definition at line 1142 of file FUResourceTable.cc.

References i.

                            {
        // Release and destroy every resource, in index order.
        UInt_t idx = 0;
        while (idx < resources_.size()) {
                FUResource* res = resources_[idx];
                res->release(true);
                delete res;
                ++idx;
        }
        resources_.clear();

        // Drain the queue of free resource ids.
        for (; !freeResourceIds_.empty(); freeResourceIds_.pop()) {
        }
}
vector< pid_t > FUResourceTable::clientPrcIds ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1211 of file FUResourceTable.cc.

References alignCSCRings::e, i, n, and query::result.

                                                  {
        // Returns the process id of each of the nbClients() clients; the result
        // is empty when no shm buffer is attached.
        vector < pid_t > pids;
        try {
                // With no shm buffer the client count is treated as zero.
                const UInt_t nClients = (0 != shmBuffer_) ? nbClients() : 0;
                for (UInt_t c = 0; c < nClients; ++c) {
                        pids.push_back(shmBuffer_->clientPrcId(c));
                }
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:clientPrcIds:clientPrcIds");
        }
        return pids;
}
string FUResourceTable::clientPrcIdsAsString ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1227 of file FUResourceTable.cc.

References alignCSCRings::e, i, and n.

                                                   {
        // Returns the client process ids joined with "," (no trailing comma);
        // the string is empty when no shm buffer is attached.
        stringstream joined;
        try {
                if (0 != shmBuffer_) {
                        const UInt_t nClients = nbClients();
                        // Empty before the first id, "," before every later one.
                        const char* separator = "";
                        for (UInt_t c = 0; c < nClients; ++c) {
                                joined << separator;
                                joined << shmBuffer_->clientPrcId(c);
                                separator = ",";
                        }
                }
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:clientPrcIdsAsString:clientPrcId");
        }
        return joined.str();
}
bool FUResourceTable::discard ( ) [virtual]

Corresponds to the Discard workloop, to be called in normal operation.

Implements evf::IPCMethod.

Definition at line 438 of file FUResourceTable.cc.

References evf::FUShmRawCell::buResourceId(), alignCSCRings::e, evf::evt::EMPTY, evf::FUShmRawCell::fuResourceId(), evf::FUShmRawCell::index(), CommonMethods::lock(), evf::evt::STOP, and evf::evt::USEDLS.

                              {
        // One pass of the discard workloop: takes the next raw cell scheduled
        // for discard, discards it in the shm buffer and, for ordinary event
        // cells, releases the matching FU resource and notifies the BU.
        //
        // Returns true to be scheduled again, false once a STOP cell has
        // brought nbClientsToShutDown_ to zero.
        FUShmRawCell* cell = 0;
        // initialize to a value to avoid warnings
        evt::State_t state = evt::EMPTY;
        try {
                cell = shmBuffer_->rawCellToDiscard();
                state = shmBuffer_->evtState(cell->index());
        } catch (evf::Exception& e) {
                // NOTE(review): the label says rawCellToRead although the call above
                // is rawCellToDiscard().
                rethrowShmBufferException(e,
                                "FUResourceTable:discard:rawCellToRead/evtState");
        }

        bool reschedule = true;
        bool shutDown = (state == evt::STOP);
        bool isLumi = (state == evt::USEDLS);
        // Both ids are captured here, before discardRawCell() is called on the cell.
        // NOTE(review): cell is dereferenced unconditionally; this relies on
        // rethrowShmBufferException() not returning after a failure above.
        UInt_t fuResourceId = cell->fuResourceId();
        UInt_t buResourceId = cell->buResourceId();

        // An EMPTY cell is only logged; it is not discarded and the loop
        // is rescheduled.
        if (state == evt::EMPTY) {
                LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
                return true;
        }

        // Each STOP cell decrements nbClientsToShutDown_; once it reaches zero
        // the workloop is marked inactive and not rescheduled.
        if (shutDown) {
                LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
                if (nbClientsToShutDown_ > 0)
                        --nbClientsToShutDown_;
                if (nbClientsToShutDown_ == 0) {
                        LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
                        isActive_ = false;
                        reschedule = false;
                }
        }

        try {
                shmBuffer_->discardRawCell(cell);
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e, "FUResourceTable:discard:discardRawCell");
        }
        // UPDATED
        // USEDLS cells are only counted; they take no part in the resource
        // release below.
        if (isLumi)
                nbEolDiscarded_++;

        // Ordinary event cell: free the FU resource and tell the BU.
        if (!shutDown && !isLumi) {
                // Guard the resources_ lookup against an out-of-range id.
                if (fuResourceId >= nbResources()) {
                        LOG4CPLUS_WARN(
                                        log_,
                                        "cell " << cell->index() << " in state " << state
                                                        << " scheduled for discard has no associated FU resource ");
                } else {
                        resources_[fuResourceId]->release(true);
                        // The free-id queue is modified between lock()/unlock().
                        lock();
                        freeResourceIds_.push(fuResourceId);
                        assert(freeResourceIds_.size() <= resources_.size());
                        unlock();

                        sendDiscard(buResourceId);
                        sendAllocate();
                }
        }

        // Last pass of the workloop: run the shutdown-cycle handling.
        if (!reschedule) {
                discardNoReschedule();
        }

        return reschedule;
}
bool FUResourceTable::discardDataEvent ( MemRef_t *  bufRef) [virtual]

Process buffer received via I2O_SM_DATA_DISCARD message in normal operation.

Implements evf::IPCMethod.

Definition at line 693 of file FUResourceTable.cc.

References alignCSCRings::e, CommonMethods::lock(), lumiQueryAPI::msg, and _I2O_FU_DATA_DISCARD_MESSAGE_FRAME::rbBufferID.

                                                       {
        // Handles an I2O_SM_DATA_DISCARD frame in normal operation: if a discard
        // is expected for the reco cell named in the frame, the pending counter
        // is decremented and the reco cell is discarded in the shm buffer.
        // The frame is always released. Always returns true.
        I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
        msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
        const UInt_t recoIndex = msg->rbBufferID;

        // Check input
        // recoIndex indexes acceptSMDataDiscard_ below, so an invalid value must
        // not be used: log it, drop the frame and stop here.
        if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_) {
                LOG4CPLUS_ERROR(
                                log_,
                                "Received DISCARD DATA message with invalid recoIndex:"
                                                << recoIndex);
                bufRef->release();
                return true;
        }

        if (acceptSMDataDiscard_[recoIndex]) {
                lock();
                nbPendingSMDiscards_--;
                unlock();
                acceptSMDataDiscard_[recoIndex] = false;

                try {
                        shmBuffer_->discardRecoCell(recoIndex);
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:discardDataEvent:discardRecoCell");
                }

        } else {
                LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
        }

        bufRef->release();
        return true;
}
bool FUResourceTable::discardDataEventWhileHalting ( MemRef_t *  bufRef) [virtual]

Process buffer received via I2O_SM_DATA_DISCARD message while halting.

Implements evf::IPCMethod.

Definition at line 727 of file FUResourceTable.cc.

References CommonMethods::lock(), lumiQueryAPI::msg, and _I2O_FU_DATA_DISCARD_MESSAGE_FRAME::rbBufferID.

                                                                   {
        // Handles an I2O_SM_DATA_DISCARD frame while halting: if a discard is
        // expected for the reco cell named in the frame, the pending counter is
        // decremented and the accept flag cleared; the shm buffer itself is not
        // touched. The frame is always released. Always returns false.
        I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
        msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
        const UInt_t recoIndex = msg->rbBufferID;

        // Check input
        // recoIndex indexes acceptSMDataDiscard_ below, so an invalid value must
        // not be used: log it, drop the frame and stop here.
        if ((int) msg->rbBufferID < 0 || recoIndex >= nbRecoCells_) {
                LOG4CPLUS_ERROR(
                                log_,
                                "Received DISCARD DATA message with invalid recoIndex:"
                                                << recoIndex);
                bufRef->release();
                return false;
        }

        if (acceptSMDataDiscard_[recoIndex]) {
                lock();
                nbPendingSMDiscards_--;
                unlock();
                acceptSMDataDiscard_[recoIndex] = false;

        } else {
                LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
        }

        bufRef->release();
        return false;
}
bool FUResourceTable::discardDqmEvent ( MemRef_t *  bufRef) [virtual]

Process buffer received via I2O_SM_DQM_DISCARD message in normal operation.

Implements evf::IPCMethod.

Definition at line 754 of file FUResourceTable.cc.

References alignCSCRings::e, lumiQueryAPI::msg, _I2O_FU_DQM_DISCARD_MESSAGE_FRAME::rbBufferID, and evf::evt::SENT.

                                                      {
        // Handles an I2O_SM_DQM_DISCARD frame in normal operation: waits (in
        // 10 ms steps, bounded by the ntries check below) for the DQM cell named
        // in the frame to reach dqm::SENT, then, if a discard is expected for that cell,
        // decrements the accept/pending counters and discards the cell in the
        // shm buffer. The frame is always released. Always returns true.
        I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
        msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
        const UInt_t dqmIndex = msg->rbBufferID;

        // Check input
        // dqmIndex is passed to dqmState() and indexes acceptSMDqmDiscard_ below,
        // so an invalid value must not be used: log it, drop the frame and stop.
        if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_) {
                LOG4CPLUS_ERROR(
                                log_,
                                "Received DISCARD DQM message with invalid dqmIndex:"
                                                << dqmIndex);
                bufRef->release();
                return true;
        }

        unsigned int ntries = 0;
        try {
                while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
                        if (ntries) {//tolerate one attempt
                                LOG4CPLUS_WARN(
                                                log_,
                                                "DQM discard for cell " << dqmIndex
                                                                << " which is not yet in SENT state - waiting");
                        }
                        ::usleep(10000);
                        // Give up once the retry budget is exhausted.
                        if (ntries++ > 10) {
                                LOG4CPLUS_ERROR(
                                                log_,
                                                "DQM cell " << dqmIndex
                                                                << " discard timed out while cell still in state "
                                                                << shmBuffer_->dqmState(dqmIndex));
                                bufRef->release();
                                return true;
                        }
                }
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e, "FUResourceTable:discardDqmEvent:dqmState");
        }
        if (acceptSMDqmDiscard_[dqmIndex] > 0) {
                acceptSMDqmDiscard_[dqmIndex]--;
                if (--nbPendingSMDqmDiscards_ < 0) {
                        // dqmState() is called while building the message, so route its
                        // failures through the same helper as every other shm access.
                        try {
                                LOG4CPLUS_WARN(
                                                log_,
                                                "Spurious??? DQM discard by StorageManager, index "
                                                                << dqmIndex << " cell state "
                                                                << shmBuffer_->dqmState(dqmIndex)
                                                                << " accept flag " << acceptSMDqmDiscard_[dqmIndex]);
                        } catch (evf::Exception& e) {
                                rethrowShmBufferException(e,
                                                "FUResourceTable:discardDqmEvent:dqmState");
                        }
                }
                try {
                        shmBuffer_->discardDqmCell(dqmIndex);
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:discardDqmEvent:discardDqmCell");
                }

        } else {
                LOG4CPLUS_ERROR(
                                log_,
                                "Spurious DQM discard for cell " << dqmIndex
                                                << " from StorageManager while cell is not accepting discards");
        }

        bufRef->release();
        return true;
}
bool FUResourceTable::discardDqmEventWhileHalting ( MemRef_t *  bufRef) [virtual]

Process buffer received via I2O_SM_DQM_DISCARD message while halting.

Implements evf::IPCMethod.

Definition at line 817 of file FUResourceTable.cc.

References alignCSCRings::e, lumiQueryAPI::msg, _I2O_FU_DQM_DISCARD_MESSAGE_FRAME::rbBufferID, and evf::evt::SENT.

                                                                  {
        // Handles an I2O_SM_DQM_DISCARD frame while halting: waits (in 10 ms
        // steps, bounded by the ntries check below) for the DQM cell named in the frame to
        // reach dqm::SENT, then, if a discard is expected for that cell,
        // decrements the accept/pending counters; the cell itself is not
        // discarded. The frame is always released. Returns false, except on the
        // wait time-out path, which returns true.
        I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
        msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
        const UInt_t dqmIndex = msg->rbBufferID;

        // Check input
        // dqmIndex is passed to dqmState() and indexes acceptSMDqmDiscard_ below,
        // so an invalid value must not be used: log it, drop the frame and stop.
        if ((int) msg->rbBufferID < 0 || dqmIndex >= nbDqmCells_) {
                LOG4CPLUS_ERROR(
                                log_,
                                "Received DISCARD DQM message with invalid dqmIndex:"
                                                << dqmIndex);
                bufRef->release();
                return false;
        }

        unsigned int ntries = 0;
        try {
                while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
                        if (ntries) {//tolerate one attempt
                                LOG4CPLUS_WARN(
                                                log_,
                                                "DQM discard for cell " << dqmIndex
                                                                << " which is not yet in SENT state - waiting");
                        }
                        ::usleep(10000);
                        // Give up once the retry budget is exhausted.
                        if (ntries++ > 10) {
                                LOG4CPLUS_ERROR(
                                                log_,
                                                "DQM cell " << dqmIndex
                                                                << " discard timed out while cell still in state "
                                                                << shmBuffer_->dqmState(dqmIndex));
                                bufRef->release();
                                // NOTE(review): every other exit of this function returns
                                // false; confirm that true is intended here.
                                return true;
                        }
                }
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:discardDqmEventWhileHalting:dqmState(2)");
        }
        if (acceptSMDqmDiscard_[dqmIndex] > 0) {
                acceptSMDqmDiscard_[dqmIndex]--;
                if (--nbPendingSMDqmDiscards_ < 0) {
                        try {
                                LOG4CPLUS_WARN(
                                                log_,
                                                "Spurious??? DQM discard by StorageManager, index "
                                                                << dqmIndex << " cell state "
                                                                << shmBuffer_->dqmState(dqmIndex)
                                                                << " accept flag "
                                                                << acceptSMDqmDiscard_[dqmIndex]);
                        } catch (evf::Exception& e) {
                                rethrowShmBufferException(e,
                                                "FUResourceTable:discardDqmEventWhileHalting:dqmState");
                        }
                }

        } else {
                LOG4CPLUS_ERROR(
                                log_,
                                "Spurious DQM discard for cell " << dqmIndex
                                                << " from StorageManager while cell is not accepting discards");
        }

        bufRef->release();
        return false;
}
void FUResourceTable::discardNoReschedule ( ) [private]

Called when entering the shutdown cycle. The function waits for clients to detach and for all DQM cells to become empty, then sets isReadyToShutDown_ to true, allowing the Resource Table to be safely shut down.

Definition at line 354 of file FUResourceTable.cc.

References prof2calltree::count, gather_cfg::cout, alignCSCRings::e, evf::dqm::EMPTY, evf::evt::EMPTY, i, n, and evf::FUShmBuffer::shm_nattch().

                                          {
        // Shutdown sequence executed once the discard workloop will no longer be
        // rescheduled:
        //   1. post an empty reco event (shutdownStatus_ bit 11 marks entry),
        //   2. wait (at most 100 polls of shutdownTimeout_ microseconds) until no
        //      client is registered and only one process is attached (bit 12),
        //   3. poll until every DQM cell reports dqm::EMPTY (bit 13),
        //   4. post an empty DQM event and set isReadyToShutDown_.
        // ShmBuffer failures are propagated through rethrowShmBufferException().
        std::cout << " entered shutdown cycle " << std::endl;
        shutdownStatus_|=1<<11;
        try {
                shmBuffer_->writeRecoEmptyEvent();
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:discardNoReschedule:writeRecoEmptyEvent");
        }

        // Step 2: wait for clients to detach. The loop gives up silently after
        // 100 attempts and the shutdown continues regardless.
        UInt_t count = 0;
        while (count < 100) {
                std::cout << " shutdown cycle " << shmBuffer_->nClients() << " "
                                << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
                if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
                                shmBuffer_->shmid()) == 1) {
                        shutdownStatus_|=1<<12;
                        //isReadyToShutDown_ = true;
                        break;
                } else {
                        count++;
                        std::cout << " shutdown cycle attempt " << count << std::endl;
                        LOG4CPLUS_DEBUG(
                                        log_,
                                        "FUResourceTable: Wait for all clients to detach,"
                                                        << " nClients=" << shmBuffer_->nClients()
                                                        << " nattch=" << FUShmBuffer::shm_nattch(
                                                        shmBuffer_->shmid()) << " (" << count << ")");
                        ::usleep( shutdownTimeout_);
                        if (count * shutdownTimeout_ > 10000000)
                                LOG4CPLUS_WARN(
                                                log_,
                                                "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
                                                                << " nClients=" << shmBuffer_->nClients()
                                                                << " nattch=" << FUShmBuffer::shm_nattch(
                                                                shmBuffer_->shmid()) << " (" << count << ")");

                }
        }

        // Step 3: poll the DQM cells until all of them are empty.
        // NOTE(review): this loop polls without sleeping and has no upper bound.
        bool allEmpty = false;
        std::cout << "Checking if all dqm cells are empty " << std::endl;
        while (!allEmpty) {
                UInt_t n = nbDqmCells_;
                allEmpty = true;
                shmBuffer_->lock();
                for (UInt_t i = 0; i < n; i++) {
                        // initialize to a value to avoid warnings
                        dqm::State_t state = dqm::EMPTY;
                        try {
                                state = shmBuffer_->dqmState(i);
                        } catch (evf::Exception& e) {
                                // Release the buffer lock before rethrowing: the helper always
                                // throws, so the unlock() after the loop would be skipped, and
                                // it calls dqmCellStates(), which takes this same lock again.
                                shmBuffer_->unlock();
                                rethrowShmBufferException(e,
                                                "FUResourceTable:discardNoReschedule:dqmState");
                        }
                        if (state != dqm::EMPTY)
                                allEmpty = false;
                }
                shmBuffer_->unlock();
        }
        shutdownStatus_|=1<<13;

        std::cout << "Number of  pending discards before declaring ready to shut down: " << nbPendingSMDqmDiscards_ << std::endl;
        if (nbPendingSMDqmDiscards_ != 0) {
                LOG4CPLUS_WARN(
                                log_,
                                "FUResourceTable: pending DQM discards not zero: ="
                                                << nbPendingSMDqmDiscards_
                                                << " while cells are all empty. This may cause problems at next start ");

        }

        // Step 4: post the empty DQM event.
        try {
                shmBuffer_->writeDqmEmptyEvent();
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:discardNoReschedule:writeDqmEmptyEvent");
        }

        isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
        // sendDqm loop has been shut down as well
}
bool FUResourceTable::discardWhileHalting ( bool  sendDiscards) [virtual]

Function called when FSM is in state stopping / halting, in Discard workloop.

Implements evf::IPCMethod.

Definition at line 507 of file FUResourceTable.cc.

References evf::FUShmRawCell::buResourceId(), alignCSCRings::e, evf::evt::EMPTY, evf::FUShmRawCell::fuResourceId(), evf::FUShmRawCell::index(), CommonMethods::lock(), evf::evt::STOP, and evf::evt::USEDLS.

                                                           {
        // Handles one raw cell queued for discard while the FSM is stopping/halting.
        // Returns true when the discard workloop should run again, false once the
        // last expected STOP cell has been processed (discardNoReschedule() is then
        // run before returning).
        FUShmRawCell* rawCell = 0;
        evt::State_t cellState = evt::EMPTY; // placeholder value, avoids compiler warnings
        try {
                rawCell = shmBuffer_->rawCellToDiscard();
                cellState = shmBuffer_->evtState(rawCell->index());
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:discardWhileHalting:rawCellToRead/evtState");
        }

        const bool isStopCell = (cellState == evt::STOP);
        const bool isLumiCell = (cellState == evt::USEDLS);
        const UInt_t fuId = rawCell->fuResourceId();
        const UInt_t buId = rawCell->buResourceId();

        // An empty cell is reported and left untouched; the workloop keeps running.
        if (cellState == evt::EMPTY) {
                LOG4CPLUS_ERROR(log_, "WARNING! ATTEMPTING TO DISCARD EMPTY CELL!!!");
                return true;
        }

        // Each STOP cell accounts for one client; the workloop ends with the last one.
        bool keepRunning = true;
        if (isStopCell) {
                LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
                if (nbClientsToShutDown_ > 0) {
                        --nbClientsToShutDown_;
                }
                if (nbClientsToShutDown_ == 0) {
                        LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
                        isActive_ = false;
                        keepRunning = false;
                }
        }

        try {
                shmBuffer_->discardRawCell(rawCell);
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:discardWhileHalting:discardRawCell");
        }

        if (isLumiCell) {
                nbEolDiscarded_++;
        }

        // Regular event cells give their FU resource back to the free list.
        const bool isRegularEvent = !(isStopCell || isLumiCell);
        if (isRegularEvent) {
                if (fuId < nbResources()) {
                        resources_[fuId]->release(true);
                        lock();
                        freeResourceIds_.push(fuId);
                        assert(freeResourceIds_.size() <= resources_.size());
                        unlock();

                        if (sendDiscards) {
                                sendDiscard(buId);
                        }
                } else {
                        LOG4CPLUS_WARN(
                                        log_,
                                        "cell " << rawCell->index() << " in state " << cellState
                                                        << " scheduled for discard has no associated FU resource ");
                }
        }

        if (!keepRunning) {
                discardNoReschedule();
        }
        return keepRunning;
}
vector< string > FUResourceTable::dqmCellStates ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1294 of file FUResourceTable.cc.

References evf::dqm::DISCARDING, alignCSCRings::e, evf::dqm::EMPTY, i, n, query::result, evf::dqm::SENDING, evf::dqm::SENT, evf::dqm::WRITING, and evf::dqm::WRITTEN.

                                                    {
        // Returns one state name per DQM cell ("EMPTY", "WRITING", "WRITTEN",
        // "SENDING", "SENT" or "DISCARDING"); cells in any other state contribute
        // no entry. The result is empty when no shared-memory buffer exists.
        // Throws evf::Exception if reading a cell state fails.
        vector < string > result;
        if (0 != shmBuffer_) {
                UInt_t n = nbDqmCells_;
                shmBuffer_->lock();
                try {
                        for (UInt_t i = 0; i < n; i++) {
                                dqm::State_t state = shmBuffer_->dqmState(i);
                                if (state == dqm::EMPTY)
                                        result.push_back("EMPTY");
                                else if (state == dqm::WRITING)
                                        result.push_back("WRITING");
                                else if (state == dqm::WRITTEN)
                                        result.push_back("WRITTEN");
                                else if (state == dqm::SENDING)
                                        result.push_back("SENDING");
                                else if (state == dqm::SENT)
                                        result.push_back("SENT");
                                else if (state == dqm::DISCARDING)
                                        result.push_back("DISCARDING");
                        }
                } catch (evf::Exception& e) {
                        // Release the lock on the error path as well; otherwise the
                        // buffer stays locked once the exception leaves this function.
                        shmBuffer_->unlock();
                        // Rethrow directly instead of via rethrowShmBufferException():
                        // that helper calls dqmCellStates() to build its message, which
                        // would re-enter this function while it is already failing.
                        XCEPT_RETHROW(evf::Exception,
                                        string("FUResourceTable:dqmCellStates:dqmState"), e);
                }
                shmBuffer_->unlock();
        }
        return result;
}
void FUResourceTable::dropEvent ( ) [virtual]

Drop next available event.

Implements evf::IPCMethod.

Definition at line 908 of file FUResourceTable.cc.

References alignCSCRings::e, and evf::FUShmRawCell::fuResourceId().

                                {
        // Takes the next readable raw cell, finishes reading it without processing
        // and schedules it for discard under its FU resource id.
        FUShmRawCell* rawCell = 0;
        try {
                rawCell = shmBuffer_->rawCellToRead();
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e, "FUResourceTable:dropEvent:rawCellToRead");
        }

        // Read the resource id before the cell is handed back to the buffer.
        const UInt_t resourceId = rawCell->fuResourceId();
        try {
                shmBuffer_->finishReadingRawCell(rawCell);
                shmBuffer_->scheduleRawCellForDiscard(resourceId);
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:dropEvent:finishReadingRawCell/scheduleRawCellForDiscard");
        }
}
bool FUResourceTable::handleCrashedEP ( UInt_t  runNumber,
pid_t  pid 
) [virtual]

Send the event belonging to a crashed process to the error stream. Returns false if no event is found.

Implements evf::IPCMethod.

Definition at line 926 of file FUResourceTable.cc.

References alignCSCRings::e, i, and summarizeEdmComparisonLogfiles::success.

                                                                 {
        // Looks up the raw cell held by process `pid`, writes its event to the error
        // stream if one is found, and in every case removes `pid` from the client
        // list. Returns true when a cell belonging to `pid` was found.
        const vector < pid_t > cellPids = cellPrcIds();
        const UInt_t nCells = cellPids.size();

        // First cell owned by the crashed process; nCells means "none".
        UInt_t crashedCell = nCells;
        for (UInt_t idx = 0; idx < nCells && crashedCell == nCells; ++idx) {
                if (cellPids[idx] == pid)
                        crashedCell = idx;
        }
        const bool eventFound = (crashedCell < nCells);

        if (!eventFound) {
                LOG4CPLUS_WARN(log_,
                                "No raw data to send to error stream for process " << pid);
        } else {
                try {
                        const bool written = shmBuffer_->writeErrorEventData(runNumber, pid, crashedCell, true);
                        if (!written) {
                                LOG4CPLUS_WARN(log_,"Problem writing to the error stream.");
                        }
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:handleCrashedEP:writeErrorEventData");
                }
        }

        try {
                const bool removed = shmBuffer_->removeClientPrcId(pid);
                if (!removed) {
                        LOG4CPLUS_WARN(log_,
                                        "removeClientPrcId: " << pid << " not in shared memory index, was in raw cell " << crashedCell);
                }
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:handleCrashedEP:removeClientPrcId");
        }
        return eventFound;
}
void FUResourceTable::initialize ( bool  segmentationMode,
UInt_t  nbRawCells,
UInt_t  nbRecoCells,
UInt_t  nbDqmCells,
UInt_t  rawCellSize,
UInt_t  recoCellSize,
UInt_t  dqmCellSize 
) throw (evf::Exception)

Initialization of the Resource Table with the required resources

Definition at line 65 of file FUResourceTable.cc.

References hitfit::clear(), evf::FUShmBuffer::createShmBuffer(), i, lumiQueryAPI::msg, and evf::FUResource::release().

                                                                              {
        // Rebuilds the table from scratch: clears the current state, creates the
        // shared-memory buffer, allocates one released FUResource per raw cell and
        // the per-cell StorageManager discard bookkeeping, then resets all counters.
        // Throws evf::Exception when the shared-memory buffer cannot be created.
        clear();

        shmBuffer_ = FUShmBuffer::createShmBuffer(segmentationMode, nbRawCells,
                        nbRecoCells, nbDqmCells, rawCellSize, recoCellSize, dqmCellSize);
        if (!shmBuffer_) {
                string failure = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
                LOG4CPLUS_FATAL(log_, failure);
                XCEPT_RAISE(evf::Exception, failure);
        }

        // NOTE(review): the resource count comes from the member nbRawCells_, not
        // from the nbRawCells argument -- confirm the two always agree.
        UInt_t resourceId = 0;
        while (resourceId < nbRawCells_) {
                FUResource* resource = new FUResource(resourceId, log_, frb_, app_);
                resource->release(true);
                resources_.push_back(resource);
                freeResourceIds_.push(resourceId);
                ++resourceId;
        }

        // Discard-acceptance flags, one entry per reco / DQM cell.
        acceptSMDataDiscard_ = new bool[nbRecoCells];
        acceptSMDqmDiscard_ = new int[nbDqmCells];

        resetCounters();
        stopFlag_=false;
}
void FUResourceTable::lastResort ( ) [virtual]

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1381 of file FUResourceTable.cc.

References gather_cfg::cout, alignCSCRings::e, and evf::FUShmRawCell::index().

                                 {
        // Drains every raw cell still waiting to be read by scheduling it for
        // server-side discard, then schedules an empty raw cell for discard.
        // Progress is reported both to the logger and to stdout.
        try {
                ostringstream summary;
                summary << "lastResort: " << shmBuffer_->nbRawCellsToRead()
                                << " more rawcells to read ";
                LOG4CPLUS_WARN(log_,summary.str());
                std::cout << summary.str() << std::endl;

                while (0 != shmBuffer_->nbRawCellsToRead()) {
                        FUShmRawCell* pending = shmBuffer_->rawCellToRead();
                        std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
                                        << std::endl;

                        LOG4CPLUS_WARN(log_,"lastResort: Scheduling raw cell (server side) "<< pending->index());
                        shmBuffer_->scheduleRawCellForDiscardServerSide(pending->index());

                        std::cout << "lastResort: schedule raw cell for discard "
                                        << pending->index() << std::endl;
                }

                // Trigger the shutdown (possibly again) through an empty raw cell.
                LOG4CPLUS_WARN(log_,"lastResort: scheduling empty raw cell (server side) ");
                shmBuffer_->scheduleRawEmptyCellForDiscard();
                LOG4CPLUS_WARN(log_,"lastResort: Finished. cells remaining: "   << shmBuffer_->nbRawCellsToRead());
        } catch (evf::Exception& e) {
                rethrowShmBufferException(
                                e,
                                "FUResourceTable:lastResort:nbRawCellsToRead/scheduleRawCellForDiscardServerSide");
        }
        LOG4CPLUS_WARN(log_,"Last resort finished ");
}
UInt_t FUResourceTable::nbClients ( ) const [virtual]

Implements evf::IPCMethod.

Definition at line 1199 of file FUResourceTable.cc.

References alignCSCRings::e, and query::result.

                                        {
        // Number of clients reported by the shared-memory buffer; 0 when no buffer
        // exists. ShmBuffer failures are rethrown as evf::Exception.
        if (0 == shmBuffer_)
                return 0;

        UInt_t attached = 0;
        try {
                attached = shmBuffer_->nClients();
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e, "FUResourceTable:nbClients:nClients");
        }
        return attached;
}
unsigned int evf::FUResourceTable::nbResources ( ) [inline]

Definition at line 162 of file FUResourceTable.h.

References evf::IPCMethod::resources_.

                                   {
                // Number of entries currently held in resources_.
                const unsigned int resourceCount = resources_.size();
                return resourceCount;
        }
UInt_t evf::FUResourceTable::nbResources ( ) const [inline, virtual]

Implements evf::IPCMethod.

Definition at line 145 of file FUResourceTable.h.

References evf::IPCMethod::resources_.

                                   {
                // Number of entries currently held in resources_ (const overload).
                const UInt_t resourceCount = resources_.size();
                return resourceCount;
        }
void FUResourceTable::postEndOfLumiSection ( MemRef_t bufRef) [virtual]

Post End-of-LumiSection to Shared Memory.

Implements evf::IPCMethod.

Definition at line 881 of file FUResourceTable.cc.

References alignCSCRings::e, i, if(), and lumiQueryAPI::msg.

                                                           {
        // Interprets the buffer payload as an end-of-lumisection frame and writes
        // one lumi-section event per raw cell, so that no process will miss it;
        // processes have to handle the resulting duplicates.
        I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME* eolFrame =
                        (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();

        // Input check: a negative index is reported but does not stop the posting.
        const int lumiCheck = (int) eolFrame->lumiSection;
        if (lumiCheck < 0) {
                LOG4CPLUS_ERROR(log_,
                                "Received EOL message with invalid index:" << lumiCheck);
        }

        // Posting ends early as soon as stopFlag_ is raised.
        for (unsigned int cellIdx = 0; cellIdx < nbRawCells_ && !stopFlag_; ++cellIdx) {
                nbEolPosted_++;
                try {
                        shmBuffer_->writeRawLumiSectionEvent(eolFrame->lumiSection);
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:postEndOfLumiSection:writeRawLumiSectionEvent");
                }
        }
}
std::string FUResourceTable::printStatus ( ) [virtual]

Print debugging status.

Reimplemented from evf::IPCMethod.

Definition at line 1430 of file FUResourceTable.cc.

                                       {
        // Debug string from the shared-memory buffer's sem_print_s(), or a fixed
        // notice when no buffer exists.
        if (!shmBuffer_)
                return std::string("ShmBuffer not initialized");
        return shmBuffer_->sem_print_s();
}
void FUResourceTable::resetCounters ( ) [virtual]

Reset event & error counters

Implements evf::IPCMethod.

Definition at line 1157 of file FUResourceTable.cc.

References alignCSCRings::e, and i.

                                    {
        // Clears the per-cell StorageManager discard flags (only when a
        // shared-memory buffer exists), zeroes all event and error counters and
        // marks both "send" workloops as active.
        if (shmBuffer_ != 0) {
                try {
                        for (UInt_t reco = 0; reco < shmBuffer_->nRecoCells(); ++reco) {
                                acceptSMDataDiscard_[reco] = false;
                        }
                        for (UInt_t dqmIdx = 0; dqmIdx < shmBuffer_->nDqmCells(); ++dqmIdx) {
                                acceptSMDqmDiscard_[dqmIdx] = 0;
                        }
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:resetCounters:nRecoCells/nDqmCells");
                }
        }

        // Event flow counters (including pending allocates).
        nbAllocated_ = 0;
        nbPending_ = 0;
        nbCompleted_ = 0;
        nbSent_ = 0;
        nbSentError_ = 0;
        nbSentDqm_ = 0;
        nbDiscarded_ = 0;
        nbLost_ = 0;
        nbAllocSent_ = 0;

        // Discards still expected from the StorageManager.
        nbPendingSMDiscards_ = 0;
        nbPendingSMDqmDiscards_ = 0;

        // End-of-lumisection bookkeeping.
        nbEolPosted_ = 0;
        nbEolDiscarded_ = 0;

        // Error counters.
        nbErrors_ = 0;
        nbCrcErrors_ = 0;

        // Event size accumulators.
        sumOfSquares_ = 0;
        sumOfSizes_ = 0;

        // "send" workloop states
        sDqmActive_=true;
        sDataActive_=true;
}
void FUResourceTable::resetIPC ( ) [virtual]

Reset the ShmBuffer to its initial state.

Implements evf::IPCMethod.

Definition at line 1412 of file FUResourceTable.cc.

References gather_cfg::cout.

                               {
        // Resets the shared-memory buffer, if one exists. Before resetting, waits
        // up to 60 x 50 ms (3 seconds) for the sendData and sendDqm workloops to
        // clear their activity flags; on timeout the problem is reported and the
        // reset proceeds anyway.
        if (shmBuffer_ != 0) {
                //waiting for sendData and sendDqm workloops to finish
                int remainingPolls = 60;
                while (remainingPolls > 0 && (sDataActive_ || sDqmActive_)) {
                        ::usleep(50000);
                        --remainingPolls;
                }
                // Decide on the flags themselves rather than on the counter: a workloop
                // that stops on the very last poll must not be reported as timed out.
                if (sDataActive_ || sDqmActive_) {
                  std::ostringstream ostr;
                  ostr << "Resource broker timed out waiting for workloop shutdowns (3 seconds). Continuing to reset Shm. States - "
                       << " sendDqm:"<<sDqmActive_ << " sendData:" << sDataActive_;
                  LOG4CPLUS_ERROR(log_,ostr.str());
                  std::cout << ostr.str() << std::endl;
                }
                //resetting shm buffer
                shmBuffer_->reset(false);
                LOG4CPLUS_INFO(log_, "ShmBuffer was reset!");
        }
}
void FUResourceTable::rethrowShmBufferException ( evf::Exception e,
std::string  where 
) const throw (evf::Exception) [private]

Rethrows an exception from the ShmBuffer including details.

Definition at line 1435 of file FUResourceTable.cc.

References alignCSCRings::e, and i.

                                     {
        // Wraps `e` into a new evf::Exception whose message carries the original
        // exception details, a dump of all data and DQM cell states, and the
        // `where` tag naming the failing call site. Always throws.
        const vector < string > dataStates = cellStates();
        const vector < string > dqmStates = dqmCellStates();

        stringstream details;
        details << "Exception raised: " << e.what()
                        << " (in module: " << e.module()
                        << " in function: " << e.function()
                        << " at line: " << e.line() << ")"
                        << "   Dumping cell state...   "
                        << "data cells --> ";
        for (vector < string >::const_iterator it = dataStates.begin();
                        it != dataStates.end(); ++it) {
                details << *it << " ";
        }
        details << "dqm cells --> ";
        for (vector < string >::const_iterator it = dqmStates.begin();
                        it != dqmStates.end(); ++it) {
                details << *it << " ";
        }
        details << " ... originated in: " << where;

        XCEPT_RETHROW(evf::Exception, details.str(), e);
}
bool FUResourceTable::sendData ( ) [virtual]

Corresponds to the SendData workloop, to be called in normal operation.

Implements evf::IPCMethod.

Definition at line 93 of file FUResourceTable.cc.

References alignCSCRings::e, evf::FUShmRecoCell::eventSize(), evf::FUShmRecoCell::evtNumber(), Exception, evf::FUShmRecoCell::fuGuid(), evf::FUShmRecoCell::fuProcessId(), evf::FUShmRecoCell::index(), CommonMethods::lock(), evf::FUShmRecoCell::nExpectedEPs(), evf::FUShmRecoCell::outModId(), evf::FUShmRecoCell::payloadAddr(), evf::FUShmRecoCell::rawCellIndex(), evf::FUShmRecoCell::runNumber(), sistrip::runNumber_, and evf::FUShmRecoCell::type().

                               {
        // One iteration of the SendData workloop in normal operation: reads the next
        // reco cell and forwards it to the StorageManager according to its type
        // (0 -> init message, 1 -> data event, 2 -> error event).
        // A cell with event size 0 ends the workloop: it is discarded, bit 7 of
        // shutdownStatus_ is set and false is returned.
        // Returns true to reschedule the workloop; false after the empty cell or
        // when sending failed with an xcept::Exception (logged as FATAL).
        bool reschedule = true;
        FUShmRecoCell* cell = 0;
        try {
                cell = shmBuffer_->recoCellToRead();
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e, "FUResourceTable:sendData:recoCellToRead");
        }

        if (0 == cell->eventSize()) {
                LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
                UInt_t cellIndex = cell->index();
                try {
                        shmBuffer_->finishReadingRecoCell(cell);
                        shmBuffer_->discardRecoCell(cellIndex);
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:sendData:finishReadingRecoCell/discardRecoCell");
                }
                shutdownStatus_|=1<<7;
                reschedule = false;
        } else {
                try {
                        // In every branch the cell fields are copied into locals before
                        // finishReadingRecoCell() is called on the cell.
                        if (cell->type() == 0) {
                                // INIT message
                                UInt_t cellIndex = cell->index();
                                UInt_t cellOutModId = cell->outModId();
                                UInt_t cellFUProcId = cell->fuProcessId();
                                UInt_t cellFUGuid = cell->fuGuid();
                                UChar_t* cellPayloadAddr = cell->payloadAddr();
                                UInt_t cellEventSize = cell->eventSize();
                                UInt_t cellExpectedEPs = cell->nExpectedEPs();
                                try {
                                        shmBuffer_->finishReadingRecoCell(cell);
                                } catch (evf::Exception& e) {
                                        rethrowShmBufferException(e,
                                                        "FUResourceTable:sendData:finishReadingRecoCell");
                                }

                                lock();
                                nbPendingSMDiscards_++;
                                unlock();

                                sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
                                                cellFUGuid, cellPayloadAddr, cellEventSize,
                                                cellExpectedEPs);
                        } else if (cell->type() == 1) {
                                // DATA event
                                UInt_t cellIndex = cell->index();
                                UInt_t cellRawIndex = cell->rawCellIndex();
                                UInt_t cellRunNumber = cell->runNumber();
                                UInt_t cellEvtNumber = cell->evtNumber();
                                UInt_t cellOutModId = cell->outModId();
                                UInt_t cellFUProcId = cell->fuProcessId();
                                UInt_t cellFUGuid = cell->fuGuid();
                                UChar_t *cellPayloadAddr = cell->payloadAddr();
                                UInt_t cellEventSize = cell->eventSize();
                                try {
                                        shmBuffer_->finishReadingRecoCell(cell);
                                } catch (evf::Exception& e) {
                                        rethrowShmBufferException(e,
                                                        "FUResourceTable:sendData:finishReadingRecoCell");
                                }

                                lock();
                                nbPendingSMDiscards_++;
                                resources_[cellRawIndex]->incNbSent();
                                // nbSent_ is bumped only on the first send recorded for this resource
                                if (resources_[cellRawIndex]->nbSent() == 1)
                                        nbSent_++;
                                unlock();

                                sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
                                                cellOutModId, cellFUProcId, cellFUGuid,
                                                cellPayloadAddr, cellEventSize);
                        } else if (cell->type() == 2) {
                                // ERROR event; the run number is taken from runNumber_, not the cell
                                UInt_t cellIndex = cell->index();
                                UInt_t cellRawIndex = cell->rawCellIndex();
                                //UInt_t   cellRunNumber   = cell->runNumber();
                                UInt_t cellEvtNumber = cell->evtNumber();
                                UInt_t cellFUProcId = cell->fuProcessId();
                                UInt_t cellFUGuid = cell->fuGuid();
                                UChar_t *cellPayloadAddr = cell->payloadAddr();
                                UInt_t cellEventSize = cell->eventSize();
                                try {
                                        shmBuffer_->finishReadingRecoCell(cell);
                                } catch (evf::Exception& e) {
                                        // Origin tag names the call that actually failed, matching
                                        // the INIT and DATA branches.
                                        rethrowShmBufferException(e,
                                                        "FUResourceTable:sendData:finishReadingRecoCell");
                                }

                                lock();
                                nbPendingSMDiscards_++;
                                resources_[cellRawIndex]->incNbSent();
                                if (resources_[cellRawIndex]->nbSent() == 1) {
                                        nbSent_++;
                                        nbSentError_++;
                                }
                                unlock();

                                sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
                                                cellFUProcId, cellFUGuid, cellPayloadAddr,
                                                cellEventSize);
                        } else {
                                string errmsg =
                                                "Unknown RecoCell type (neither INIT/DATA/ERROR).";
                                XCEPT_RAISE(evf::Exception, errmsg);
                        }
                } catch (xcept::Exception& e) {
                        LOG4CPLUS_FATAL(
                                        log_,
                                        "Failed to send EVENT DATA to StorageManager: "
                                                        << xcept::stdformat_exception_history(e));
                        reschedule = false;
                }
        }

        // Published so that resetIPC() can see whether this workloop is still running.
        sDataActive_=reschedule;
        return reschedule;
}
bool FUResourceTable::sendDataWhileHalting ( ) [virtual]

Function called when FSM is in state stopping / halting, in SendData workloop.

Implements evf::IPCMethod.

Definition at line 212 of file FUResourceTable.cc.

References alignCSCRings::e, evf::FUShmRecoCell::eventSize(), and evf::FUShmRecoCell::index().

                                           {
        // SendData workloop iteration while stopping/halting: every reco cell is
        // read and discarded without being sent. A cell with event size 0 ends the
        // workloop (bit 8 of shutdownStatus_ is set and false is returned).
        FUShmRecoCell* recoCell = 0;
        try {
                recoCell = shmBuffer_->recoCellToRead();
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:sendDataWhileHalting:recoCellToRead");
        }

        const bool isStopMarker = (0 == recoCell->eventSize());
        if (isStopMarker) {
                LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
        } else {
                LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
        }

        // Both kinds of cell are released and discarded the same way.
        const UInt_t recoIndex = recoCell->index();
        try {
                shmBuffer_->finishReadingRecoCell(recoCell);
                shmBuffer_->discardRecoCell(recoIndex);
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:sendDataWhileHalting:finishReadingRecoCell/discardRecoCell");
        }

        if (isStopMarker) {
                shutdownStatus_|=1<<8;
        }

        const bool keepRunning = !isStopMarker;
        sDataActive_=keepRunning;
        return keepRunning;
}
bool FUResourceTable::sendDqm ( ) [virtual]

Corresponds to the SendDqm workloop, to be called in normal operation.

Implements evf::IPCMethod.

Definition at line 251 of file FUResourceTable.cc.

References gather_cfg::cout, alignCSCRings::e, evf::dqm::EMPTY, evf::evt::EMPTY, evf::FUShmDqmCell::eventSize(), evf::FUShmDqmCell::evtAtUpdate(), Exception, evf::FUShmDqmCell::folderId(), evf::FUShmDqmCell::fuGuid(), evf::FUShmDqmCell::fuProcessId(), evf::FUShmDqmCell::index(), evf::FUShmDqmCell::payloadAddr(), and evf::FUShmDqmCell::runNumber().

                              {
        // SendDqm workloop step for normal operation: read the next DQM cell
        // and either forward it via sendDqmEvent() or, if its state is
        // dqm::EMPTY, discard it and stop the workloop.
        // Returns whether the workloop should be rescheduled; the same value
        // is stored in sDqmActive_.
        FUShmDqmCell* dqmCell = 0;
        // pre-set to a value only to avoid compiler warnings
        dqm::State_t cellState = dqm::EMPTY;
        try {
                dqmCell = shmBuffer_->dqmCellToRead();
                cellState = shmBuffer_->dqmState(dqmCell->index());
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:sendDqm:dqmCellToRead/dqmState");
        }

        // Shutdown path: an EMPTY cell ends the workloop.
        if (cellState == dqm::EMPTY) {
                LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
                std::cout << "shut down dqm workloop " << std::endl;
                const UInt_t emptyIndex = dqmCell->index();
                try {
                        shmBuffer_->finishReadingDqmCell(dqmCell);
                        shmBuffer_->discardDqmCell(emptyIndex);
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:sendDqm:finishReadingDqmCell/discardDqmCell");
                }
                // progress flag: SendDqm workloop has seen its final cell
                shutdownStatus_|=1<<9;
                sDqmActive_=false;
                return false;
        }

        // Normal path: copy the cell header fields, send, then release the cell.
        bool keepRunning = true;
        try {
                UInt_t dqmIndex = dqmCell->index();
                UInt_t dqmRunNumber = dqmCell->runNumber();
                UInt_t dqmEvtAtUpdate = dqmCell->evtAtUpdate();
                UInt_t dqmFolderId = dqmCell->folderId();
                UInt_t dqmFUProcId = dqmCell->fuProcessId();
                UInt_t dqmFUGuid = dqmCell->fuGuid();
                UChar_t *dqmPayload = dqmCell->payloadAddr();
                UInt_t dqmEventSize = dqmCell->eventSize();
                sendDqmEvent(dqmIndex, dqmRunNumber, dqmEvtAtUpdate, dqmFolderId,
                                dqmFUProcId, dqmFUGuid, dqmPayload, dqmEventSize);
                try {
                        shmBuffer_->finishReadingDqmCell(dqmCell);
                } catch (evf::Exception& e) {
                        rethrowShmBufferException(e,
                                        "FUResourceTable:sendDqm:finishReadingDqmCell");
                }
        } catch (xcept::Exception& e) {
                // Any xcept::Exception raised above is logged as fatal and
                // stops the workloop instead of propagating to the caller.
                LOG4CPLUS_FATAL(
                                log_,
                                "Failed to send DQM DATA to StorageManager: "
                                                << xcept::stdformat_exception_history(e));
                keepRunning = false;
        }

        sDqmActive_=keepRunning;
        return keepRunning;
}
bool FUResourceTable::sendDqmWhileHalting ( ) [virtual]

Function called when FSM is in state stopping / halting, in SendDqm workloop.

Implements evf::IPCMethod.

Definition at line 310 of file FUResourceTable.cc.

References gather_cfg::cout, alignCSCRings::e, evf::dqm::EMPTY, evf::evt::EMPTY, and evf::FUShmDqmCell::index().

                                          {
        // SendDqm workloop step used while stopping/halting: read the next DQM
        // cell and discard it without sending. Returns false (and clears
        // sDqmActive_) once a cell in state dqm::EMPTY has been consumed;
        // returns true otherwise.
        FUShmDqmCell* dqmCell = 0;
        // pre-set to a value only to avoid compiler warnings
        dqm::State_t cellState = dqm::EMPTY;
        try {
                dqmCell = shmBuffer_->dqmCellToRead();
                cellState = shmBuffer_->dqmState(dqmCell->index());
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:sendDqmWhileHalting:dqmCellToRead/dqmState");
        }

        // An EMPTY cell is the signal to stop rescheduling the workloop.
        const bool lastCell = (cellState == dqm::EMPTY);
        if (lastCell) {
                LOG4CPLUS_INFO(log_, "Don't reschedule sendDqm workloop.");
                std::cout << "shut down dqm workloop " << std::endl;
        }

        // Either way the cell is released and discarded. The index is read
        // before finishReadingDqmCell() is called on the cell.
        const UInt_t dqmIndex = dqmCell->index();
        try {
                shmBuffer_->finishReadingDqmCell(dqmCell);
                shmBuffer_->discardDqmCell(dqmIndex);
        } catch (evf::Exception& e) {
                rethrowShmBufferException(e,
                                "FUResourceTable:sendDqmWhileHalting:finishReadingDqmCell/discardDqmCell");
        }

        if (lastCell) {
                // progress flag: halting SendDqm workloop has seen its final cell
                shutdownStatus_|=1<<10;
        }

        sDqmActive_=!lastCell;
        return !lastCell;
}
void FUResourceTable::shutDownClients ( ) [virtual]

Send empty events to notify clients to shut down.

Implements evf::IPCMethod.

Definition at line 984 of file FUResourceTable.cc.

References alignCSCRings::e, evf::evt::EMPTY, i, lumiQueryAPI::msg, n, evf::utils::pid, sistrip::runNumber_, shutdownWatchdog(), and ntuplemaker::status.

                                      {
        // Notifies all attached client processes that they must stop by
        // writing one empty raw event per client, after scheduling any
        // orphaned (non-EMPTY) raw cells for discard when no clients remain.
        //
        // Progress is recorded bit by bit in shutdownStatus_, which the
        // watchdog thread prints if the shutdown takes too long.
        //
        // Raises evf::Exception (via rethrowShmBufferException / XCEPT_RAISE)
        // on shared-memory errors, when no raw cell becomes writable, when the
        // crash-handler lock times out, or when the watchdog flags a timeout.
        //
        // Every exit path must set watchDogEnd_ and join the watchdog first:
        // destroying a still-joinable std::thread calls std::terminate.
        nbClientsToShutDown_ = nbClients();
        isReadyToShutDown_ = false;

        shutdownStatus_=1;

        //start watchdog thread
        watchDogEnd_=false;
        watchDogSetFailed_=false;
        #ifdef linux
        std::thread watch(&FUResourceTable::shutdownWatchdog,this,20);
        #endif
        if (nbClientsToShutDown_ == 0) {
                shutdownStatus_|=1<<1;
                LOG4CPLUS_INFO(
                                log_,
                                "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
                UInt_t n = nbResources();
                try {
                        for (UInt_t i = 0; i < n; i++) {
                                evt::State_t state = shmBuffer_->evtState(i);
                                if (state != evt::EMPTY) {
                                        LOG4CPLUS_WARN(
                                                        log_,
                                                        "Schedule discard at STOP for orphaned event in state "
                                                                        << state);
                                        shmBuffer_->scheduleRawCellForDiscardServerSide(i);
                                }
                        }
                        shmBuffer_->scheduleRawEmptyCellForDiscard();
                } catch (evf::Exception& e) {
                        // Stop and join the watchdog before rethrowing, like every
                        // other handler in this function; otherwise the joinable
                        // thread object is destroyed during unwinding.
                        watchDogEnd_=true;
                        #ifdef linux
                        watch.join();
                        #endif
                        rethrowShmBufferException(e,
                                        "FUResourceTable:shutDownClients:evtState/scheduleRawEmptyCellForDiscard");
                }
        } else {
                // UPDATED
                // Wait until there is at least one writable raw cell per client,
                // checking in each round whether any client process has vanished.
                int checks = 0;
                try {
                        while (shmBuffer_->nbRawCellsToWrite() < nbClients() && nbClients()
                                        != 0) {
                                shutdownStatus_|=1<<2;
                                checks++;
                                {
                                        #ifdef linux
                                        auto lk = lockCrashHandlerTimed(10);
                                        #else
                                        bool lk=true;
                                        #endif
                                        if (lk) {
                                                vector < pid_t > prcids = clientPrcIds();
                                                for (UInt_t i = 0; i < prcids.size(); i++) {
                                                        pid_t pid = prcids[i];
                                                        // signal 0 only probes the process, nothing is delivered
                                                        int status = kill(pid, 0);
                                                        if (status != 0) {
                                                                LOG4CPLUS_ERROR(log_,
                                                                                "EP prc " << pid << " completed with error.");
                                                                handleCrashedEP(runNumber_, pid);
                                                        }
                                                }
                                        }
                                        else {
                                                XCEPT_RAISE(evf::Exception,
                                                        "Timed out access to the Crash Handler in stop. SM discards not arriving?");

                                        }
                                }

                                LOG4CPLUS_WARN(
                                                log_,
                                                "no cell to write stop "
                                                                << shmBuffer_->nbRawCellsToWrite()
                                                                << " nClients " << nbClients());
                                // give up on the 16th unsuccessful check
                                if (checks > 15) {
                                        string msg = "No Raw Cell to Write STOP messages";
                                        XCEPT_RAISE(evf::Exception, msg);
                                }
                                ::usleep(500000);
                        }
                        shutdownStatus_|=1<<3;

                } catch (evf::Exception& e) {
                        watchDogEnd_=true;
                        #ifdef linux
                        watch.join();
                        #endif
                        rethrowShmBufferException(e,
                                        "FUResourceTable:shutDownClients:nbRawCellsToWrite");
                }
                // The client count may have dropped while waiting (crashed EPs
                // are handled above), so re-read it.
                nbClientsToShutDown_ = nbClients();
                if (nbClientsToShutDown_ == 0) {
                        shutdownStatus_|=1<<4;
                        UInt_t n = nbResources();
                        for (UInt_t i = 0; i < n; i++) {
                                // initialize to a value to avoid warnings
                                evt::State_t state = evt::EMPTY;
                                try {
                                        state = shmBuffer_->evtState(i);
                                } catch (evf::Exception& e) {
                                        watchDogEnd_=true;
                                        #ifdef linux
                                        watch.join();
                                        #endif
                                        rethrowShmBufferException(e,
                                                        "FUResourceTable:shutDownClients:evtState");
                                }
                                if (state != evt::EMPTY) {
                                        LOG4CPLUS_WARN(
                                                        log_,
                                                        "Schedule discard at STOP for orphaned event in state "
                                                                        << state);
                                        try {
                                                shmBuffer_->setEvtDiscard(i, 1, true);
                                                shmBuffer_->scheduleRawCellForDiscardServerSide(i);
                                        } catch (evf::Exception& e) {
                                                watchDogEnd_=true;
                                                #ifdef linux
                                                watch.join();
                                                #endif
                                                rethrowShmBufferException(e,
                                                                "FUResourceTable:shutDownClients:scheduleRawCellForDiscardServerSide");
                                        }
                                }
                        }
                        try {
                                shmBuffer_->scheduleRawEmptyCellForDiscard();
                        } catch (evf::Exception& e) {
                                watchDogEnd_=true;
                                #ifdef linux
                                watch.join();
                                #endif
                                rethrowShmBufferException(e,
                                                "FUResourceTable:shutDownClients:scheduleRawEmptyCellForDiscard");
                        }
                }
                // One empty raw event per remaining client.
                UInt_t n = nbClientsToShutDown_;
                shutdownStatus_|=1<<5;
                try {
                        for (UInt_t i = 0; i < n; ++i)
                                shmBuffer_->writeRawEmptyEvent();
                } catch (evf::Exception& e) {
                        watchDogEnd_=true;
                        #ifdef linux
                        watch.join();
                        #endif
                        rethrowShmBufferException(e,
                                        "FUResourceTable:shutDownClients:writeRawEmptyEvent");
                }
                shutdownStatus_|=1<<6;
        }
        // Normal completion: stop the watchdog and report a timeout it flagged.
        watchDogEnd_=true;
        #ifdef linux
        watch.join();
        if (watchDogSetFailed_)
          XCEPT_RAISE(evf::Exception, "Failed (timed out) shutdown of clients");
        #endif
}
void FUResourceTable::shutdownWatchdog ( unsigned int  timeout) [private]

Spawned as a thread watching for shutdown of clients.

Definition at line 963 of file FUResourceTable.cc.

Referenced by shutDownClients().

{
        // Thread body started by shutDownClients(). Polls watchDogEnd_ every
        // 50 ms and, if the flag is not raised in time:
        //  - logs a one-time warning once half of the budget has been used,
        //  - logs an error, sets watchDogSetFailed_ and exits once the budget
        //    is exhausted.
        // timeout is the budget in seconds.
        const unsigned int pollUs=50000;
        unsigned int timeoutUs=timeout*1000000+1;
        bool warned=false;
        while (!watchDogEnd_) {

                usleep(pollUs);
                // Test before subtracting so the unsigned counter can never wrap
                // around (previously timeout == 0 underflowed to ~4.29e9 us and
                // the watchdog effectively never fired). For timeout >= 1 this
                // trips on exactly the same iteration as before.
                if (timeoutUs<=2*pollUs) {
                        LOG4CPLUS_ERROR(log_,"Timeout in shutdownClients, status:"<< std::hex << shutdownStatus_);
                        watchDogSetFailed_=true;
                        break;
                }
                timeoutUs-=pollUs;
                if (timeoutUs<=1000000*timeout/2 && !warned) {
                        warned=true;
                        LOG4CPLUS_WARN(log_,"Long shutdown of clients, status:" << std::hex << shutdownStatus_);
                }
        }
}

Member Data Documentation

Definition at line 194 of file FUResourceTable.h.

Definition at line 195 of file FUResourceTable.h.

Definition at line 191 of file FUResourceTable.h.

std::atomic_bool evf::FUResourceTable::watchDogEnd_ [private]

Definition at line 197 of file FUResourceTable.h.

std::atomic_bool evf::FUResourceTable::watchDogSetFailed_ [private]

Definition at line 198 of file FUResourceTable.h.