Go to the documentation of this file.00001 #include "EventFilter/Utilities/interface/MasterQueue.h"
00002
00003 #include <iostream>
00004
00005 using namespace evf;
00006
00007 MasterQueue::MasterQueue(unsigned int ind) :
00008 status_(0), occup_(0), pidOfLastSend_(0), pidOfLastReceive_(0) {
00009
00010
00011 queue_id_ = msgget(QUEUE_ID + ind, IPC_CREAT | 0666);
00012 if (queue_id_ == -1) {
00013 std::ostringstream ost;
00014 ost << "failed to get message queue:" << strerror(errno);
00015 XCEPT_RAISE(evf::Exception, ost.str());
00016 }
00017
00018 drain();
00019 }
00020
00021 MasterQueue::~MasterQueue() {
00022 if (status_ > 0)
00023 msgctl(queue_id_, IPC_RMID, 0);
00024 }
00025
00026 int MasterQueue::post(MsgBuf &ptr) {
00027 int rc;
00028 rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize() + 1, 0);
00029 if (rc == -1)
00030 std::cout << "snd::Master failed to post message - error:" << strerror(
00031 errno) << std::endl;
00032
00033 return rc;
00034 }
00035
00036 int MasterQueue::postLength(MsgBuf &ptr, unsigned int length) {
00037 int rc;
00038 rc = msgsnd(queue_id_, ptr.ptr_, length, 0);
00039 if (rc == -1)
00040 std::cout << "snd::Master failed to post message - error:" << strerror(
00041 errno) << std::endl;
00042
00043 return rc;
00044 }
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 unsigned long MasterQueue::rcv(MsgBuf &ptr) {
00059 unsigned long msg_type = ptr->mtype;
00060 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
00061 if (rc == -1 && errno != ENOMSG) {
00062 std::string serr =
00063 "rcv::Master failed to get message from queue - error:";
00064 serr += strerror(errno);
00065 XCEPT_RAISE(evf::Exception, serr);
00066 } else if (rc == -1 && errno == ENOMSG)
00067 return MSGQ_MESSAGE_TYPE_RANGE;
00068
00069
00070
00071 return msg_type;
00072 }
00073
00074 bool MasterQueue::rcvQuiet(MsgBuf &ptr) {
00075 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
00076 if (rc == -1 && errno != ENOMSG) {
00077 return false;
00078 }
00079 return true;
00080 }
00081
00082 unsigned long MasterQueue::rcvNonBlocking(MsgBuf &ptr) {
00083 unsigned long msg_type = ptr->mtype;
00084 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, msg_type, IPC_NOWAIT);
00085 if (rc == -1 && errno != ENOMSG) {
00086 std::string serr =
00087 "rcvnb::Master failed to get message from queue - error:";
00088 serr += strerror(errno);
00089 XCEPT_RAISE(evf::Exception, serr);
00090 } else if (rc == -1 && errno == ENOMSG)
00091 return MSGQ_MESSAGE_TYPE_RANGE;
00092 return msg_type;
00093 }
00094
00095 int MasterQueue::disconnect() {
00096 int ret = msgctl(queue_id_, IPC_RMID, 0);
00097 status_ = -1000;
00098 return ret;
00099 }
00100
00101 int MasterQueue::id() {
00102 return queue_id_;
00103 }
00104
00105 int MasterQueue::status() {
00106 char cbuf[sizeof(struct msqid_ds)];
00107 struct msqid_ds *buf = (struct msqid_ds*) cbuf;
00108 int ret = msgctl(queue_id_, IPC_STAT, buf);
00109 if (ret != 0)
00110 status_ = -1;
00111 else {
00112 status_ = 1;
00113 occup_ = buf->msg_qnum;
00114 pidOfLastSend_ = buf->msg_lspid;
00115 pidOfLastReceive_ = buf->msg_lrpid;
00116
00117
00118
00119 }
00120 return status_;
00121 }
00122
00123 int MasterQueue::occupancy() const {
00124 return occup_;
00125 }
00126
00127 void MasterQueue::drain() {
00128 status();
00129 if (occup_ > 0)
00130 std::cout << "message queue id " << queue_id_ << " contains " << occup_
00131 << "leftover messages, going to drain " << std::endl;
00132
00133 MsgBuf msg;
00134 while (occup_ > 0) {
00135 msgrcv(queue_id_, msg.ptr_, msg.msize() + 1, 0, 0);
00136 status();
00137 std::cout << "drained one message, occupancy now " << occup_
00138 << std::endl;
00139 }
00140 }
00141
00142 pid_t MasterQueue::pidOfLastSend() const {
00143 return pidOfLastSend_;
00144 }
00145
00146 pid_t MasterQueue::pidOfLastReceive() const {
00147 return pidOfLastReceive_;
00148 }
00149
00150 void MasterQueue::updateReceivers() {
00151
00152 status();
00153 int lastReceiver = pidOfLastReceive_;
00154 if (lastReceiver == 0)
00155 return;
00156 for (unsigned int i = 0; i < receivers_.size(); ++i)
00157 if (receivers_[i] == lastReceiver)
00158 return;
00159 receivers_.push_back(lastReceiver);
00160 }