#include <MasterQueue.h>
Public Member Functions | |
int | disconnect () |
void | drain () |
std::vector< int > | getReceivers () const |
int | id () |
MasterQueue (unsigned int ind) | |
int | occupancy () const |
pid_t | pidOfLastReceive () const |
pid_t | pidOfLastSend () const |
int | post (MsgBuf &ptr) |
int | postLength (MsgBuf &ptr, unsigned int length) |
unsigned long | rcv (MsgBuf &ptr) |
unsigned long | rcvNonBlocking (MsgBuf &ptr) |
bool | rcvQuiet (MsgBuf &ptr) |
int | status () |
~MasterQueue () | |
Private Member Functions | |
void | updateReceivers () |
Private Attributes | |
int | occup_ |
int | pidOfLastReceive_ |
int | pidOfLastSend_ |
int | queue_id_ |
std::vector< int > | receivers_ |
int | status_ |
Definition at line 21 of file MasterQueue.h.
MasterQueue::MasterQueue | ( | unsigned int | ind | ) |
Definition at line 7 of file MasterQueue.cc.
References drain(), QUEUE_ID, and queue_id_.
: status_(0), occup_(0), pidOfLastSend_(0), pidOfLastReceive_(0) { /* create or attach a public message queue, with read/write access to every user. */ queue_id_ = msgget(QUEUE_ID + ind, IPC_CREAT | 0666); if (queue_id_ == -1) { std::ostringstream ost; ost << "failed to get message queue:" << strerror(errno); XCEPT_RAISE(evf::Exception, ost.str()); } // it may be necessary to drain the queue here if it already exists !!! drain(); }
MasterQueue::~MasterQueue | ( | ) |
int MasterQueue::disconnect | ( | void | ) |
Definition at line 95 of file MasterQueue.cc.
References queue_id_, run_regression::ret, and status_.
Referenced by evf::FUResourceQueue::~FUResourceQueue().
void MasterQueue::drain | ( | ) |
Definition at line 127 of file MasterQueue.cc.
References gather_cfg::cout, lumiQueryAPI::msg, evf::MsgBuf::msize(), occup_, evf::MsgBuf::ptr_, queue_id_, and status().
Referenced by MasterQueue().
{ status(); if (occup_ > 0) std::cout << "message queue id " << queue_id_ << " contains " << occup_ << "leftover messages, going to drain " << std::endl; //drain the queue before using it MsgBuf msg; while (occup_ > 0) { msgrcv(queue_id_, msg.ptr_, msg.msize() + 1, 0, 0); status(); std::cout << "drained one message, occupancy now " << occup_ << std::endl; } }
std::vector<int> evf::MasterQueue::getReceivers | ( | ) | const [inline] |
int MasterQueue::id | ( | void | ) |
int MasterQueue::occupancy | ( | ) | const |
pid_t MasterQueue::pidOfLastReceive | ( | ) | const |
Definition at line 146 of file MasterQueue.cc.
References pidOfLastReceive_.
{ return pidOfLastReceive_; }
pid_t MasterQueue::pidOfLastSend | ( | ) | const |
Definition at line 142 of file MasterQueue.cc.
References pidOfLastSend_.
{ return pidOfLastSend_; }
int MasterQueue::post | ( | MsgBuf & | ptr | ) |
Definition at line 26 of file MasterQueue.cc.
References gather_cfg::cout, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.
Referenced by evf::Vulture::start(), and evf::Vulture::stop().
int MasterQueue::postLength | ( | MsgBuf & | ptr, |
unsigned int | length | ||
) |
Definition at line 36 of file MasterQueue.cc.
References gather_cfg::cout, evf::MsgBuf::ptr_, and queue_id_.
Referenced by evf::FUResourceQueue::buildResource().
unsigned long MasterQueue::rcv | ( | MsgBuf & | ptr | ) |
Definition at line 58 of file MasterQueue.cc.
References MSGQ_MESSAGE_TYPE_RANGE, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, queue_id_, and AlCaHLTBitMon_QueryRunRegistry::string.
{ unsigned long msg_type = ptr->mtype; int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0); if (rc == -1 && errno != ENOMSG) { std::string serr = "rcv::Master failed to get message from queue - error:"; serr += strerror(errno); XCEPT_RAISE(evf::Exception, serr); } else if (rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE; //updateReceivers(); return msg_type; }
unsigned long MasterQueue::rcvNonBlocking | ( | MsgBuf & | ptr | ) |
Definition at line 82 of file MasterQueue.cc.
References MSGQ_MESSAGE_TYPE_RANGE, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, queue_id_, and AlCaHLTBitMon_QueryRunRegistry::string.
Referenced by evf::Vulture::hasStarted(), and evf::Vulture::hasStopped().
{ unsigned long msg_type = ptr->mtype; int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, msg_type, IPC_NOWAIT); if (rc == -1 && errno != ENOMSG) { std::string serr = "rcvnb::Master failed to get message from queue - error:"; serr += strerror(errno); XCEPT_RAISE(evf::Exception, serr); } else if (rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE; return msg_type; }
bool MasterQueue::rcvQuiet | ( | MsgBuf & | ptr | ) |
Definition at line 74 of file MasterQueue.cc.
References evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.
Referenced by evf::FUResourceQueue::discard(), evf::FUResourceQueue::discardWhileHalting(), evf::FUResourceQueue::sendData(), evf::FUResourceQueue::sendDataWhileHalting(), evf::FUResourceQueue::sendDqm(), and evf::FUResourceQueue::sendDqmWhileHalting().
int MasterQueue::status | ( | void | ) |
Definition at line 105 of file MasterQueue.cc.
References occup_, pidOfLastReceive_, pidOfLastSend_, queue_id_, run_regression::ret, and status_.
Referenced by drain(), and updateReceivers().
{ char cbuf[sizeof(struct msqid_ds)]; struct msqid_ds *buf = (struct msqid_ds*) cbuf; int ret = msgctl(queue_id_, IPC_STAT, buf); if (ret != 0) status_ = -1; else { status_ = 1; occup_ = buf->msg_qnum; pidOfLastSend_ = buf->msg_lspid; pidOfLastReceive_ = buf->msg_lrpid; // std::cout << "queue " << buf->msg_qnum << " " // << buf->msg_lspid << " " // << buf->msg_lrpid << std::endl; } return status_; }
void MasterQueue::updateReceivers | ( | ) | [private] |
Definition at line 150 of file MasterQueue.cc.
References i, pidOfLastReceive_, receivers_, and status().
{ //update status status(); int lastReceiver = pidOfLastReceive_; if (lastReceiver == 0) return; for (unsigned int i = 0; i < receivers_.size(); ++i) if (receivers_[i] == lastReceiver) return; receivers_.push_back(lastReceiver); }
int evf::MasterQueue::occup_ [private] |
Definition at line 51 of file MasterQueue.h.
Referenced by drain(), occupancy(), and status().
int evf::MasterQueue::pidOfLastReceive_ [private] |
Definition at line 53 of file MasterQueue.h.
Referenced by pidOfLastReceive(), status(), and updateReceivers().
int evf::MasterQueue::pidOfLastSend_ [private] |
Definition at line 52 of file MasterQueue.h.
Referenced by pidOfLastSend(), and status().
int evf::MasterQueue::queue_id_ [private] |
Definition at line 49 of file MasterQueue.h.
Referenced by disconnect(), drain(), id(), MasterQueue(), post(), postLength(), rcv(), rcvNonBlocking(), rcvQuiet(), status(), and ~MasterQueue().
std::vector<int> evf::MasterQueue::receivers_ [private] |
Definition at line 54 of file MasterQueue.h.
Referenced by getReceivers(), and updateReceivers().
int evf::MasterQueue::status_ [private] |
Definition at line 50 of file MasterQueue.h.
Referenced by disconnect(), status(), and ~MasterQueue().