CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_14/src/EventFilter/Utilities/src/MasterQueue.cc

Go to the documentation of this file.
00001 #include "EventFilter/Utilities/interface/MasterQueue.h"
00002 //todo remove
00003 #include <iostream>
00004 
00005 using namespace evf;
00006 
00007 MasterQueue::MasterQueue(unsigned int ind) :
00008         status_(0), occup_(0), pidOfLastSend_(0), pidOfLastReceive_(0) {
00009 
00010         /* create or attach a public message queue, with read/write access to every user. */
00011         queue_id_ = msgget(QUEUE_ID + ind, IPC_CREAT | 0666);
00012         if (queue_id_ == -1) {
00013                 std::ostringstream ost;
00014                 ost << "failed to get message queue:" << strerror(errno);
00015                 XCEPT_RAISE(evf::Exception, ost.str());
00016         }
00017         // it may be necessary to drain the queue here if it already exists !!!
00018         drain();
00019 }
00020 
00021 MasterQueue::~MasterQueue() {
00022         if (status_ > 0)
00023                 msgctl(queue_id_, IPC_RMID, 0);
00024 }
00025 
00026 int MasterQueue::post(MsgBuf &ptr) {
00027         int rc; /* error code returned by system calls. */
00028         rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize() + 1, 0);
00029         if (rc == -1)
00030                 std::cout << "snd::Master failed to post message - error:" << strerror(
00031                                 errno) << std::endl;
00032         //      delete ptr;
00033         return rc;
00034 }
00035 
00036 int MasterQueue::postLength(MsgBuf &ptr, unsigned int length) {
00037         int rc; /* error code returned by system calls. */
00038         rc = msgsnd(queue_id_, ptr.ptr_, length, 0);
00039         if (rc == -1)
00040                 std::cout << "snd::Master failed to post message - error:" << strerror(
00041                                 errno) << std::endl;
00042         //      delete ptr;
00043         return rc;
00044 }
00045 
00046 /*
00047  int MasterQueue::postOnlyUsefulData(SimpleMsgBuf &ptr) {
00048  int rc;
00049  rc = msgsnd(queue_id_, ptr.ptr_, ptr.usedSize_ , 0);
00050  if (rc == -1)
00051  std::cout << "snd::Master failed to post message - error:" << strerror(
00052  errno) << std::endl;
00053  //     delete ptr;
00054  return rc;
00055  }
00056  */
00057 
00058 unsigned long MasterQueue::rcv(MsgBuf &ptr) {
00059         unsigned long msg_type = ptr->mtype;
00060         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
00061         if (rc == -1 && errno != ENOMSG) {
00062                 std::string serr =
00063                                 "rcv::Master failed to get message from queue - error:";
00064                 serr += strerror(errno);
00065                 XCEPT_RAISE(evf::Exception, serr);
00066         } else if (rc == -1 && errno == ENOMSG)
00067                 return MSGQ_MESSAGE_TYPE_RANGE;
00068 
00069         //updateReceivers();
00070 
00071         return msg_type;
00072 }
00073 
00074 bool MasterQueue::rcvQuiet(MsgBuf &ptr) {
00075         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
00076         if (rc == -1 && errno != ENOMSG) {
00077                 return false;
00078         }
00079         return true;
00080 }
00081 
00082 unsigned long MasterQueue::rcvNonBlocking(MsgBuf &ptr) {
00083         unsigned long msg_type = ptr->mtype;
00084         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, msg_type, IPC_NOWAIT);
00085         if (rc == -1 && errno != ENOMSG) {
00086                 std::string serr =
00087                                 "rcvnb::Master failed to get message from queue - error:";
00088                 serr += strerror(errno);
00089                 XCEPT_RAISE(evf::Exception, serr);
00090         } else if (rc == -1 && errno == ENOMSG)
00091                 return MSGQ_MESSAGE_TYPE_RANGE;
00092         return msg_type;
00093 }
00094 
00095 int MasterQueue::disconnect() {
00096         int ret = msgctl(queue_id_, IPC_RMID, 0);
00097         status_ = -1000;
00098         return ret;
00099 }
00100 
00101 int MasterQueue::id() {
00102         return queue_id_;
00103 }
00104 
00105 int MasterQueue::status() {
00106         char cbuf[sizeof(struct msqid_ds)];
00107         struct msqid_ds *buf = (struct msqid_ds*) cbuf;
00108         int ret = msgctl(queue_id_, IPC_STAT, buf);
00109         if (ret != 0)
00110                 status_ = -1;
00111         else {
00112                 status_ = 1;
00113                 occup_ = buf->msg_qnum;
00114                 pidOfLastSend_ = buf->msg_lspid;
00115                 pidOfLastReceive_ = buf->msg_lrpid;
00116                 //          std::cout << "queue " << buf->msg_qnum << " "
00117                 //            << buf->msg_lspid << " "
00118                 //            << buf->msg_lrpid << std::endl;
00119         }
00120         return status_;
00121 }
00122 
00123 int MasterQueue::occupancy() const {
00124         return occup_;
00125 }
00126 
00127 void MasterQueue::drain() {
00128         status();
00129         if (occup_ > 0)
00130                 std::cout << "message queue id " << queue_id_ << " contains " << occup_
00131                                 << "leftover messages, going to drain " << std::endl;
00132         //drain the queue before using it
00133         MsgBuf msg;
00134         while (occup_ > 0) {
00135                 msgrcv(queue_id_, msg.ptr_, msg.msize() + 1, 0, 0);
00136                 status();
00137                 std::cout << "drained one message, occupancy now " << occup_
00138                                 << std::endl;
00139         }
00140 }
00141 
00142 pid_t MasterQueue::pidOfLastSend() const {
00143         return pidOfLastSend_;
00144 }
00145 
00146 pid_t MasterQueue::pidOfLastReceive() const {
00147         return pidOfLastReceive_;
00148 }
00149 
00150 void MasterQueue::updateReceivers() {
00151         //update status
00152         status();
00153         int lastReceiver = pidOfLastReceive_;
00154         if (lastReceiver == 0)
00155                 return;
00156         for (unsigned int i = 0; i < receivers_.size(); ++i)
00157                 if (receivers_[i] == lastReceiver)
00158                         return;
00159         receivers_.push_back(lastReceiver);
00160 }