Go to the documentation of this file.00001 #ifndef EVENTFILTER_UTILITIES_MASTERQUEUE_H
00002 #define EVENTFILTER_UTILITIES_MASTERQUEUE_H
00003
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <sys/types.h>
00007 #include <sys/ipc.h>
00008 #include <sys/msg.h>
00009 #include <errno.h>
00010 #include <string.h>
00011
00012 #include <iostream>
00013 #include <sstream>
00014
00015 #include "EventFilter/Utilities/interface/Exception.h"
00016 #include "EventFilter/Utilities/interface/MsgBuf.h"
00017
00018
00019
00020 namespace evf{
00021
00022 class MasterQueue{
00023
00024 public:
00025
00026 MasterQueue(unsigned int ind) : status_(0)
00027 {
00028
00029
00030 queue_id_ = msgget(QUEUE_ID+ind, IPC_CREAT | 0666);
00031 if (queue_id_ == -1) {
00032 std::ostringstream ost;
00033 ost << "failed to get message queue:"
00034 << strerror(errno);
00035 XCEPT_RAISE(evf::Exception, ost.str());
00036 }
00037
00038 drain();
00039 }
00040 ~MasterQueue()
00041 {
00042 if(status_>0) msgctl(queue_id_,IPC_RMID,0);
00043 }
00044
00045 int post(MsgBuf &ptr)
00046 {
00047 int rc;
00048 rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize()+1,0);
00049 if(rc==-1)
00050 std::cout << "snd::Master failed to post message - error:"
00051 << strerror(errno) << std::endl;
00052
00053 return rc;
00054 }
00055 unsigned long rcv(MsgBuf &ptr)
00056 {
00057 unsigned long msg_type = ptr->mtype;
00058 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize()+1, ptr->mtype, 0);
00059 if (rc == -1 && errno != ENOMSG)
00060 {
00061 std::string serr = "rcv::Master failed to get message from queue - error:";
00062 serr += strerror(errno);
00063 XCEPT_RAISE(evf::Exception, serr);
00064 }
00065 else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00066 return msg_type;
00067 }
00068 unsigned long rcvNonBlocking(MsgBuf &ptr)
00069 {
00070 unsigned long msg_type = ptr->mtype;
00071 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize()+1, msg_type, IPC_NOWAIT);
00072 if (rc == -1 && errno != ENOMSG)
00073 {
00074 std::string serr = "rcvnb::Master failed to get message from queue - error:";
00075 serr += strerror(errno);
00076 XCEPT_RAISE(evf::Exception, serr);
00077 }
00078 else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00079 return msg_type;
00080 }
00081 int disconnect()
00082 {
00083 int ret = msgctl(queue_id_,IPC_RMID,0);
00084 status_ = -1000;
00085 return ret;
00086 }
00087 int id(){return queue_id_;}
00088 int status()
00089 {
00090 char cbuf[sizeof(struct msqid_ds)];
00091 struct msqid_ds *buf= (struct msqid_ds*)cbuf;
00092 int ret = msgctl(queue_id_,IPC_STAT,buf);
00093 if(ret!=0) status_ = -1;
00094 else
00095 {
00096 status_ = 1;
00097 occup_ = buf->msg_qnum;
00098 pidOfLastSend_ = buf->msg_lspid;
00099 pidOfLastReceive_ = buf->msg_lrpid;
00100
00101
00102
00103 }
00104 return status_;
00105 }
00106 int occupancy()const{return occup_;}
00107 void drain(){
00108 status();
00109 if(occup_>0)
00110 std::cout << "message queue id " << queue_id_ << " contains " << occup_ << "leftover messages, going to drain "
00111 << std::endl;
00112
00113 MsgBuf msg;
00114 while(occup_>0)
00115 {
00116 msgrcv(queue_id_, msg.ptr_, msg.msize()+1, 0, 0);
00117 status();
00118 std::cout << "drained one message, occupancy now " << occup_ << std::endl;
00119 }
00120 }
00121 pid_t pidOfLastSend()const{return pidOfLastSend_;}
00122 pid_t pidOfLastReceive()const{return pidOfLastReceive_;}
00123 private:
00124
00125 int queue_id_;
00126 int status_;
00127 int occup_;
00128 int pidOfLastSend_;
00129 int pidOfLastReceive_;
00130 };
00131 }
00132 #endif