00001 #ifndef EVENTFILTER_UTILITIES_MASTERQUEUE_H
00002 #define EVENTFILTER_UTILITIES_MASTERQUEUE_H
00003
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <sys/types.h>
00007 #include <sys/ipc.h>
00008 #include <sys/msg.h>
00009 #include <errno.h>
00010 #include <string.h>
00011
00012 #include <iostream>
00013 #include <sstream>
00014
00015 #include "EventFilter/Utilities/interface/Exception.h"
00016 #include "EventFilter/Utilities/interface/MsgBuf.h"
00017
00018
00019
00020 namespace evf{
00021
00022 class MasterQueue{
00023
00024 public:
00025
00026 MasterQueue(unsigned int ind) : status_(0)
00027 {
00028
00029
00030 queue_id_ = msgget(QUEUE_ID+ind, IPC_CREAT | 0666);
00031 if (queue_id_ == -1) {
00032 std::ostringstream ost;
00033 ost << "failed to get message queue:"
00034 << strerror(errno);
00035 XCEPT_RAISE(evf::Exception, ost.str());
00036 }
00037
00038 drain();
00039 }
00040 ~MasterQueue()
00041 {
00042 if(status_>0) msgctl(queue_id_,IPC_RMID,0);
00043 }
00044
00045 int post(MsgBuf &ptr)
00046 {
00047 int rc;
00048 rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize()+1,0);
00049 if(rc==-1)
00050 std::cout << "snd::Master failed to post message - error:"
00051 << strerror(errno) << std::endl;
00052
00053 return rc;
00054 }
00055 unsigned long rcv(MsgBuf &ptr)
00056 {
00057 unsigned long msg_type = ptr->mtype;
00058 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize(), ptr->mtype, 0);
00059 if (rc == -1 && errno != ENOMSG)
00060 {
00061 std::string serr = "rcv::Master failed to get message from queue - error:";
00062 serr += strerror(errno);
00063 XCEPT_RAISE(evf::Exception, serr);
00064 }
00065 else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00066 return msg_type;
00067 }
00068 unsigned long rcvNonBlocking(MsgBuf &ptr)
00069 {
00070 unsigned long msg_type = ptr->mtype;
00071 int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize(), msg_type, IPC_NOWAIT);
00072 if (rc == -1 && errno != ENOMSG)
00073 {
00074 std::string serr = "rcvnb::Master failed to get message from queue - error:";
00075 serr += strerror(errno);
00076 XCEPT_RAISE(evf::Exception, serr);
00077 }
00078 else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00079 return msg_type;
00080 }
00081 int disconnect()
00082 {
00083 int ret = msgctl(queue_id_,IPC_RMID,0);
00084 if(ret !=0)
00085 std::cout << "disconnect of master queue failed - error:" << strerror(errno) << std::endl;
00086 status_ = -1000;
00087 return ret;
00088 }
00089 int id(){return queue_id_;}
00090 int status()
00091 {
00092 char cbuf[sizeof(struct msqid_ds)];
00093 struct msqid_ds *buf= (struct msqid_ds*)cbuf;
00094 int ret = msgctl(queue_id_,IPC_STAT,buf);
00095 if(ret!=0) status_ = -1;
00096 else
00097 {
00098 status_ = 1;
00099 occup_ = buf->msg_qnum;
00100 pidOfLastSend_ = buf->msg_lspid;
00101 pidOfLastReceive_ = buf->msg_lrpid;
00102
00103
00104
00105 }
00106 return status_;
00107 }
00108 int occupancy()const{return occup_;}
00109 void drain(){
00110 status();
00111 if(occup_>0)
00112 std::cout << "message queue id " << queue_id_ << " contains " << occup_ << "leftover messages, going to drain "
00113 << std::endl;
00114
00115 MsgBuf msg;
00116 while(occup_>0)
00117 {
00118 msgrcv(queue_id_, msg.ptr_, msg.msize(), 0, 0);
00119 status();
00120 std::cout << "drained one message, occupancy now " << occup_ << std::endl;
00121 }
00122 }
00123 pid_t pidOfLastSend()const{return pidOfLastSend_;}
00124 pid_t pidOfLastReceive()const{return pidOfLastReceive_;}
00125 private:
00126
00127 int queue_id_;
00128 int status_;
00129 int occup_;
00130 int pidOfLastSend_;
00131 int pidOfLastReceive_;
00132 };
00133 }
00134 #endif