CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MasterQueue.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILITIES_MASTERQUEUE_H
2 #define EVENTFILTER_UTILITIES_MASTERQUEUE_H
3 
4 #include <stdio.h> /* standard I/O functions. */
5 #include <stdlib.h> /* malloc(), free() etc. */
6 #include <sys/types.h> /* various type definitions. */
7 #include <sys/ipc.h> /* general SysV IPC structures */
8 #include <sys/msg.h> /* message queue functions and structs. */
9 #include <errno.h>
10 #include <string.h>
11 
12 #include <iostream>
13 #include <sstream>
14 
17 
18 //@EM ToDo move implementation to .cc file
19 
20 namespace evf{
21 
22  class MasterQueue{
23 
24  public:
25 
26  MasterQueue(unsigned int ind) : status_(0)
27  {
28 
29  /* create or attach a public message queue, with read/write access to every user. */
30  queue_id_ = msgget(QUEUE_ID+ind, IPC_CREAT | 0666);
31  if (queue_id_ == -1) {
32  std::ostringstream ost;
33  ost << "failed to get message queue:"
34  << strerror(errno);
35  XCEPT_RAISE(evf::Exception, ost.str());
36  }
37  // it may be necessary to drain the queue here if it already exists !!!
38  drain();
39  }
41  {
42  if(status_>0) msgctl(queue_id_,IPC_RMID,0);
43  }
44 
45  int post(MsgBuf &ptr)
46  {
47  int rc; /* error code returned by system calls. */
48  rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize()+1,0);
49  if(rc==-1)
50  std::cout << "snd::Master failed to post message - error:"
51  << strerror(errno) << std::endl;
52  // delete ptr;
53  return rc;
54  }
55  unsigned long rcv(MsgBuf &ptr)
56  {
57  unsigned long msg_type = ptr->mtype;
58  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize(), ptr->mtype, 0);
59  if (rc == -1 && errno != ENOMSG)
60  {
61  std::string serr = "rcv::Master failed to get message from queue - error:";
62  serr += strerror(errno);
63  XCEPT_RAISE(evf::Exception, serr);
64  }
65  else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
66  return msg_type;
67  }
68  unsigned long rcvNonBlocking(MsgBuf &ptr)
69  {
70  unsigned long msg_type = ptr->mtype;
71  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize(), msg_type, IPC_NOWAIT);
72  if (rc == -1 && errno != ENOMSG)
73  {
74  std::string serr = "rcvnb::Master failed to get message from queue - error:";
75  serr += strerror(errno);
76  XCEPT_RAISE(evf::Exception, serr);
77  }
78  else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
79  return msg_type;
80  }
81  int disconnect()
82  {
83  int ret = msgctl(queue_id_,IPC_RMID,0);
84  if(ret !=0)
85  std::cout << "disconnect of master queue failed - error:" << strerror(errno) << std::endl;
86  status_ = -1000;
87  return ret;
88  }
89  int id(){return queue_id_;}
90  int status()
91  {
92  char cbuf[sizeof(struct msqid_ds)];
93  struct msqid_ds *buf= (struct msqid_ds*)cbuf;
94  int ret = msgctl(queue_id_,IPC_STAT,buf);
95  if(ret!=0) status_ = -1;
96  else
97  {
98  status_ = 1;
99  occup_ = buf->msg_qnum;
100  pidOfLastSend_ = buf->msg_lspid;
101  pidOfLastReceive_ = buf->msg_lrpid;
102  // std::cout << "queue " << buf->msg_qnum << " "
103  // << buf->msg_lspid << " "
104  // << buf->msg_lrpid << std::endl;
105  }
106  return status_;
107  }
108  int occupancy()const{return occup_;}
109  void drain(){
110  status();
111  if(occup_>0)
112  std::cout << "message queue id " << queue_id_ << " contains " << occup_ << "leftover messages, going to drain "
113  << std::endl;
114  //drain the queue before using it
115  MsgBuf msg;
116  while(occup_>0)
117  {
118  msgrcv(queue_id_, msg.ptr_, msg.msize(), 0, 0);
119  status();
120  std::cout << "drained one message, occupancy now " << occup_ << std::endl;
121  }
122  }
123  pid_t pidOfLastSend()const{return pidOfLastSend_;}
124  pid_t pidOfLastReceive()const{return pidOfLastReceive_;}
125  private:
126 
127  int queue_id_; /* ID of the created queue. */
128  int status_;
129  int occup_;
132  };
133 }
134 #endif
size_t msize()
Definition: MsgBuf.cc:34
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
int occupancy() const
Definition: MasterQueue.h:108
struct msgbuf * ptr_
Definition: MsgBuf.h:22
pid_t pidOfLastReceive() const
Definition: MasterQueue.h:124
unsigned long rcvNonBlocking(MsgBuf &ptr)
Definition: MasterQueue.h:68
unsigned long rcv(MsgBuf &ptr)
Definition: MasterQueue.h:55
#define QUEUE_ID
Definition: queue_defs.h:9
pid_t pidOfLastSend() const
Definition: MasterQueue.h:123
MasterQueue(unsigned int ind)
Definition: MasterQueue.h:26
tuple cout
Definition: gather_cfg.py:41
int post(MsgBuf &ptr)
Definition: MasterQueue.h:45