CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

evf::MasterQueue Class Reference

#include <MasterQueue.h>

List of all members.

Public Member Functions

int disconnect ()
void drain ()
std::vector< int > getReceivers () const
int id ()
 MasterQueue (unsigned int ind)
int occupancy () const
pid_t pidOfLastReceive () const
pid_t pidOfLastSend () const
int post (MsgBuf &ptr)
int postLength (MsgBuf &ptr, unsigned int length)
unsigned long rcv (MsgBuf &ptr)
unsigned long rcvNonBlocking (MsgBuf &ptr)
bool rcvQuiet (MsgBuf &ptr)
int status ()
 ~MasterQueue ()

Private Member Functions

void updateReceivers ()

Private Attributes

int occup_
int pidOfLastReceive_
int pidOfLastSend_
int queue_id_
std::vector< int > receivers_
int status_

Detailed Description

Definition at line 21 of file MasterQueue.h.


Constructor & Destructor Documentation

MasterQueue::MasterQueue ( unsigned int  ind)

Definition at line 7 of file MasterQueue.cc.

References drain(), QUEUE_ID, and queue_id_.

                                         :
        status_(0), occup_(0), pidOfLastSend_(0), pidOfLastReceive_(0) {

        /* create or attach a public message queue, with read/write access to every user. */
        queue_id_ = msgget(QUEUE_ID + ind, IPC_CREAT | 0666);
        if (queue_id_ == -1) {
                std::ostringstream ost;
                ost << "failed to get message queue:" << strerror(errno);
                XCEPT_RAISE(evf::Exception, ost.str());
        }
        // it may be necessary to drain the queue here if it already exists !!!
        drain();
}
MasterQueue::~MasterQueue ( )

Definition at line 21 of file MasterQueue.cc.

References queue_id_, and status_.

                          {
        if (status_ > 0)
                msgctl(queue_id_, IPC_RMID, 0);
}

Member Function Documentation

int MasterQueue::disconnect ( void  )

Definition at line 95 of file MasterQueue.cc.

References queue_id_, run_regression::ret, and status_.

Referenced by evf::FUResourceQueue::~FUResourceQueue().

                            {
        int ret = msgctl(queue_id_, IPC_RMID, 0);
        status_ = -1000;
        return ret;
}
void MasterQueue::drain ( )

Definition at line 127 of file MasterQueue.cc.

References gather_cfg::cout, lumiQueryAPI::msg, evf::MsgBuf::msize(), occup_, evf::MsgBuf::ptr_, queue_id_, and status().

Referenced by MasterQueue().

                        {
        status();
        if (occup_ > 0)
                std::cout << "message queue id " << queue_id_ << " contains " << occup_
                                << "leftover messages, going to drain " << std::endl;
        //drain the queue before using it
        MsgBuf msg;
        while (occup_ > 0) {
                msgrcv(queue_id_, msg.ptr_, msg.msize() + 1, 0, 0);
                status();
                std::cout << "drained one message, occupancy now " << occup_
                                << std::endl;
        }
}
std::vector<int> evf::MasterQueue::getReceivers ( ) const [inline]

Definition at line 41 of file MasterQueue.h.

References receivers_.

{ return receivers_; }
int MasterQueue::id ( void  )

Definition at line 101 of file MasterQueue.cc.

References queue_id_.

                    {
        return queue_id_;
}
int MasterQueue::occupancy ( ) const

Definition at line 123 of file MasterQueue.cc.

References occup_.

                                 {
        return occup_;
}
pid_t MasterQueue::pidOfLastReceive ( ) const

Definition at line 146 of file MasterQueue.cc.

References pidOfLastReceive_.

                                          {
        return pidOfLastReceive_;
}
pid_t MasterQueue::pidOfLastSend ( ) const

Definition at line 142 of file MasterQueue.cc.

References pidOfLastSend_.

                                       {
        return pidOfLastSend_;
}
int MasterQueue::post ( MsgBuf ptr)

Definition at line 26 of file MasterQueue.cc.

References gather_cfg::cout, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.

Referenced by evf::Vulture::start(), and evf::Vulture::stop().

                                 {
        int rc; /* error code returned by system calls. */
        rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize() + 1, 0);
        if (rc == -1)
                std::cout << "snd::Master failed to post message - error:" << strerror(
                                errno) << std::endl;
        //      delete ptr;
        return rc;
}
int MasterQueue::postLength ( MsgBuf ptr,
unsigned int  length 
)

Definition at line 36 of file MasterQueue.cc.

References gather_cfg::cout, evf::MsgBuf::ptr_, and queue_id_.

Referenced by evf::FUResourceQueue::buildResource().

                                                            {
        int rc; /* error code returned by system calls. */
        rc = msgsnd(queue_id_, ptr.ptr_, length, 0);
        if (rc == -1)
                std::cout << "snd::Master failed to post message - error:" << strerror(
                                errno) << std::endl;
        //      delete ptr;
        return rc;
}
unsigned long MasterQueue::rcv ( MsgBuf ptr)

Definition at line 58 of file MasterQueue.cc.

References MSGQ_MESSAGE_TYPE_RANGE, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, queue_id_, and AlCaHLTBitMon_QueryRunRegistry::string.

                                          {
        unsigned long msg_type = ptr->mtype;
        int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
        if (rc == -1 && errno != ENOMSG) {
                std::string serr =
                                "rcv::Master failed to get message from queue - error:";
                serr += strerror(errno);
                XCEPT_RAISE(evf::Exception, serr);
        } else if (rc == -1 && errno == ENOMSG)
                return MSGQ_MESSAGE_TYPE_RANGE;

        //updateReceivers();

        return msg_type;
}
unsigned long MasterQueue::rcvNonBlocking ( MsgBuf ptr)

Definition at line 82 of file MasterQueue.cc.

References MSGQ_MESSAGE_TYPE_RANGE, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, queue_id_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by evf::Vulture::hasStarted(), and evf::Vulture::hasStopped().

                                                     {
        unsigned long msg_type = ptr->mtype;
        int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, msg_type, IPC_NOWAIT);
        if (rc == -1 && errno != ENOMSG) {
                std::string serr =
                                "rcvnb::Master failed to get message from queue - error:";
                serr += strerror(errno);
                XCEPT_RAISE(evf::Exception, serr);
        } else if (rc == -1 && errno == ENOMSG)
                return MSGQ_MESSAGE_TYPE_RANGE;
        return msg_type;
}
bool MasterQueue::rcvQuiet ( MsgBuf ptr)
int MasterQueue::status ( void  )

Definition at line 105 of file MasterQueue.cc.

References occup_, pidOfLastReceive_, pidOfLastSend_, queue_id_, run_regression::ret, and status_.

Referenced by drain(), and updateReceivers().

                        {
        char cbuf[sizeof(struct msqid_ds)];
        struct msqid_ds *buf = (struct msqid_ds*) cbuf;
        int ret = msgctl(queue_id_, IPC_STAT, buf);
        if (ret != 0)
                status_ = -1;
        else {
                status_ = 1;
                occup_ = buf->msg_qnum;
                pidOfLastSend_ = buf->msg_lspid;
                pidOfLastReceive_ = buf->msg_lrpid;
                //          std::cout << "queue " << buf->msg_qnum << " "
                //            << buf->msg_lspid << " "
                //            << buf->msg_lrpid << std::endl;
        }
        return status_;
}
void MasterQueue::updateReceivers ( ) [private]

Definition at line 150 of file MasterQueue.cc.

References i, pidOfLastReceive_, receivers_, and status().

                                  {
        //update status
        status();
        int lastReceiver = pidOfLastReceive_;
        if (lastReceiver == 0)
                return;
        for (unsigned int i = 0; i < receivers_.size(); ++i)
                if (receivers_[i] == lastReceiver)
                        return;
        receivers_.push_back(lastReceiver);
}

Member Data Documentation

int evf::MasterQueue::occup_ [private]

Definition at line 51 of file MasterQueue.h.

Referenced by drain(), occupancy(), and status().

Definition at line 53 of file MasterQueue.h.

Referenced by pidOfLastReceive(), status(), and updateReceivers().

Definition at line 52 of file MasterQueue.h.

Referenced by pidOfLastSend(), and status().

std::vector<int> evf::MasterQueue::receivers_ [private]

Definition at line 54 of file MasterQueue.h.

Referenced by getReceivers(), and updateReceivers().

Definition at line 50 of file MasterQueue.h.

Referenced by disconnect(), status(), and ~MasterQueue().