CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MasterQueue.cc
Go to the documentation of this file.
2 //todo remove
3 #include <iostream>
4 
5 using namespace evf;
6 
7 MasterQueue::MasterQueue(unsigned int ind) :
8  status_(0), occup_(0), pidOfLastSend_(0), pidOfLastReceive_(0) {
9 
10  /* create or attach a public message queue, with read/write access to every user. */
11  queue_id_ = msgget(QUEUE_ID + ind, IPC_CREAT | 0666);
12  if (queue_id_ == -1) {
13  std::ostringstream ost;
14  ost << "failed to get message queue:" << strerror(errno);
15  XCEPT_RAISE(evf::Exception, ost.str());
16  }
17  // it may be necessary to drain the queue here if it already exists !!!
18  drain();
19 }
20 
// NOTE(review): the signature line of this definition is absent from this
// listing. Its position right after the constructor and the lack of a return
// value suggest it is the destructor -- confirm against MasterQueue.h.
//
// Removes the message queue (msgctl with IPC_RMID), but only while status_ is
// positive. Elsewhere in this file status_ is set to 1 after a successful
// IPC_STAT query, to -1 after a failed one, and to -1000 once the queue has
// been removed explicitly, so those latter states skip the removal here.
22  if (status_ > 0)
23  msgctl(queue_id_, IPC_RMID, 0);
24 }
25 
27  int rc; /* error code returned by system calls. */
28  rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize() + 1, 0);
29  if (rc == -1)
30  std::cout << "snd::Master failed to post message - error:" << strerror(
31  errno) << std::endl;
32  // delete ptr;
33  return rc;
34 }
35 
36 int MasterQueue::postLength(MsgBuf &ptr, unsigned int length) {
37  int rc; /* error code returned by system calls. */
38  rc = msgsnd(queue_id_, ptr.ptr_, length, 0);
39  if (rc == -1)
40  std::cout << "snd::Master failed to post message - error:" << strerror(
41  errno) << std::endl;
42  // delete ptr;
43  return rc;
44 }
45 
46 /*
47  int MasterQueue::postOnlyUsefulData(SimpleMsgBuf &ptr) {
48  int rc;
49  rc = msgsnd(queue_id_, ptr.ptr_, ptr.usedSize_ , 0);
50  if (rc == -1)
51  std::cout << "snd::Master failed to post message - error:" << strerror(
52  errno) << std::endl;
53  // delete ptr;
54  return rc;
55  }
56  */
57 
58 unsigned long MasterQueue::rcv(MsgBuf &ptr) {
59  unsigned long msg_type = ptr->mtype;
60  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
61  if (rc == -1 && errno != ENOMSG) {
62  std::string serr =
63  "rcv::Master failed to get message from queue - error:";
64  serr += strerror(errno);
65  XCEPT_RAISE(evf::Exception, serr);
66  } else if (rc == -1 && errno == ENOMSG)
68 
69  //updateReceivers();
70 
71  return msg_type;
72 }
73 
75  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
76  if (rc == -1 && errno != ENOMSG) {
77  return false;
78  }
79  return true;
80 }
81 
82 unsigned long MasterQueue::rcvNonBlocking(MsgBuf &ptr) {
83  unsigned long msg_type = ptr->mtype;
84  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, msg_type, IPC_NOWAIT);
85  if (rc == -1 && errno != ENOMSG) {
86  std::string serr =
87  "rcvnb::Master failed to get message from queue - error:";
88  serr += strerror(errno);
89  XCEPT_RAISE(evf::Exception, serr);
90  } else if (rc == -1 && errno == ENOMSG)
92  return msg_type;
93 }
94 
// NOTE(review): the signature line of this definition is absent from this
// listing; name and declared return type must be taken from MasterQueue.h.
//
// Removes the message queue identified by queue_id_ (msgctl with IPC_RMID)
// and returns the msgctl() result. status_ is overwritten with -1000
// regardless of that result; being negative, it fails the `status_ > 0`
// test that guards the other IPC_RMID call in this file.
96  int ret = msgctl(queue_id_, IPC_RMID, 0);
97  status_ = -1000;
98  return ret;
99 }
100 
// NOTE(review): the signature line of this definition is absent from this
// listing; name and declared return type must be taken from MasterQueue.h.
//
// Returns queue_id_, the identifier obtained from msgget() in the constructor.
102  return queue_id_;
103 }
104 
// NOTE(review): the signature line of this definition is absent from this
// listing. Other code in this file calls `status();` and then reads occup_ /
// pidOfLastReceive_, which matches this body -- confirm the exact signature
// against MasterQueue.h.
//
// Queries the queue with msgctl(IPC_STAT) and caches the result:
//  - on failure, status_ becomes -1 and the cached counters are left untouched;
//  - on success, status_ becomes 1 and occup_, pidOfLastSend_ and
//    pidOfLastReceive_ are refreshed from the msqid_ds fields.
// Returns the new value of status_.
// NOTE(review): the msqid_ds is overlaid on a plain char array; a local
// `struct msqid_ds` would avoid relying on that array's alignment -- confirm
// whether the indirection is intentional.
106  char cbuf[sizeof(struct msqid_ds)];
107  struct msqid_ds *buf = (struct msqid_ds*) cbuf;
108  int ret = msgctl(queue_id_, IPC_STAT, buf);
109  if (ret != 0)
110  status_ = -1;
111  else {
112  status_ = 1;
// number of messages currently in the queue
113  occup_ = buf->msg_qnum;
// process ids recorded for the last msgsnd() / msgrcv() on the queue
114  pidOfLastSend_ = buf->msg_lspid;
115  pidOfLastReceive_ = buf->msg_lrpid;
116  // std::cout << "queue " << buf->msg_qnum << " "
117  // << buf->msg_lspid << " "
118  // << buf->msg_lrpid << std::endl;
119  }
120  return status_;
121 }
122 
124  return occup_;
125 }
126 
// NOTE(review): the signature line of this definition is absent from this
// listing. The constructor calls `drain();` and this body matches that
// purpose -- confirm the exact signature against MasterQueue.h.
//
// Empties the queue: refreshes the cached occupancy, then receives and
// discards messages of any type (msgtyp 0) until the occupancy reads zero.
// Progress is printed to std::cout.
// NOTE(review): the first log message has no space between the count and
// "leftover", so it prints e.g. "contains 3leftover messages".
// NOTE(review): msgrcv() is called without IPC_NOWAIT and its result is
// ignored. If the IPC_STAT refresh fails, occup_ keeps its old value, so the
// loop condition never changes -- confirm this cannot spin or block forever.
128  status();
129  if (occup_ > 0)
130  std::cout << "message queue id " << queue_id_ << " contains " << occup_
131  << "leftover messages, going to drain " << std::endl;
132  //drain the queue before using it
133  MsgBuf msg;
134  while (occup_ > 0) {
135  msgrcv(queue_id_, msg.ptr_, msg.msize() + 1, 0, 0);
// re-read the occupancy so the loop condition sees the new count
136  status();
137  std::cout << "drained one message, occupancy now " << occup_
138  << std::endl;
139  }
140 }
141 
143  return pidOfLastSend_;
144 }
145 
147  return pidOfLastReceive_;
148 }
149 
151  //update status
152  status();
153  int lastReceiver = pidOfLastReceive_;
154  if (lastReceiver == 0)
155  return;
156  for (unsigned int i = 0; i < receivers_.size(); ++i)
157  if (receivers_[i] == lastReceiver)
158  return;
159  receivers_.push_back(lastReceiver);
160 }
int postLength(MsgBuf &ptr, unsigned int length)
Definition: MasterQueue.cc:36
size_t msize()
Definition: MsgBuf.cc:33
int i
Definition: DBlmapReader.cc:9
pid_t pidOfLastSend() const
Definition: MasterQueue.cc:142
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
struct msgbuf * ptr_
Definition: MsgBuf.h:24
MasterQueue(unsigned int ind)
Definition: MasterQueue.cc:7
#define QUEUE_ID
Definition: queue_defs.h:9
unsigned long rcv(MsgBuf &ptr)
Definition: MasterQueue.cc:58
unsigned long rcvNonBlocking(MsgBuf &ptr)
Definition: MasterQueue.cc:82
int occupancy() const
Definition: MasterQueue.cc:123
pid_t pidOfLastReceive() const
Definition: MasterQueue.cc:146
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
tuple cout
Definition: gather_cfg.py:121
int post(MsgBuf &ptr)
Definition: MasterQueue.cc:26
std::vector< int > receivers_
Definition: MasterQueue.h:54
void updateReceivers()
Definition: MasterQueue.cc:150