CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes
evf::MasterQueue Class Reference

#include <MasterQueue.h>

Public Member Functions

int disconnect ()
 
void drain ()
 
std::vector< int > getReceivers () const
 
int id ()
 
 MasterQueue (unsigned int ind)
 
int occupancy () const
 
pid_t pidOfLastReceive () const
 
pid_t pidOfLastSend () const
 
int post (MsgBuf &ptr)
 
int postLength (MsgBuf &ptr, unsigned int length)
 
unsigned long rcv (MsgBuf &ptr)
 
unsigned long rcvNonBlocking (MsgBuf &ptr)
 
bool rcvQuiet (MsgBuf &ptr)
 
int status ()
 
 ~MasterQueue ()
 

Private Member Functions

void updateReceivers ()
 

Private Attributes

int occup_
 
int pidOfLastReceive_
 
int pidOfLastSend_
 
int queue_id_
 
std::vector< int > receivers_
 
int status_
 

Detailed Description

Definition at line 21 of file MasterQueue.h.

Constructor & Destructor Documentation

MasterQueue::MasterQueue ( unsigned int  ind)

Definition at line 7 of file MasterQueue.cc.

References drain(), QUEUE_ID, and queue_id_.

7  :
9 
10  /* create or attach a public message queue, with read/write access to every user. */
11  queue_id_ = msgget(QUEUE_ID + ind, IPC_CREAT | 0666);
12  if (queue_id_ == -1) {
13  std::ostringstream ost;
14  ost << "failed to get message queue:" << strerror(errno);
15  XCEPT_RAISE(evf::Exception, ost.str());
16  }
17  // it may be necessary to drain the queue here if it already exists !!!
18  drain();
19 }
#define QUEUE_ID
Definition: queue_defs.h:9
MasterQueue::~MasterQueue ( )

Definition at line 21 of file MasterQueue.cc.

References queue_id_, and status_.

21  {
22  if (status_ > 0)
23  msgctl(queue_id_, IPC_RMID, 0);
24 }

Member Function Documentation

int MasterQueue::disconnect ( void  )
void MasterQueue::drain ( )

Definition at line 127 of file MasterQueue.cc.

References gather_cfg::cout, lumiQueryAPI::msg, evf::MsgBuf::msize(), occup_, evf::MsgBuf::ptr_, queue_id_, and status().

Referenced by MasterQueue().

127  {
128  status();
129  if (occup_ > 0)
130  std::cout << "message queue id " << queue_id_ << " contains " << occup_
131  << "leftover messages, going to drain " << std::endl;
132  //drain the queue before using it
133  MsgBuf msg;
134  while (occup_ > 0) {
135  msgrcv(queue_id_, msg.ptr_, msg.msize() + 1, 0, 0);
136  status();
137  std::cout << "drained one message, occupancy now " << occup_
138  << std::endl;
139  }
140 }
size_t msize()
Definition: MsgBuf.cc:33
struct msgbuf * ptr_
Definition: MsgBuf.h:24
tuple cout
Definition: gather_cfg.py:121
std::vector<int> evf::MasterQueue::getReceivers ( ) const
inline

Definition at line 41 of file MasterQueue.h.

References receivers_.

41 { return receivers_; }
std::vector< int > receivers_
Definition: MasterQueue.h:54
int MasterQueue::id ( void  )

Definition at line 101 of file MasterQueue.cc.

References queue_id_.

101  {
102  return queue_id_;
103 }
int MasterQueue::occupancy ( ) const

Definition at line 123 of file MasterQueue.cc.

References occup_.

123  {
124  return occup_;
125 }
pid_t MasterQueue::pidOfLastReceive ( ) const

Definition at line 146 of file MasterQueue.cc.

References pidOfLastReceive_.

146  {
147  return pidOfLastReceive_;
148 }
pid_t MasterQueue::pidOfLastSend ( ) const

Definition at line 142 of file MasterQueue.cc.

References pidOfLastSend_.

142  {
143  return pidOfLastSend_;
144 }
int MasterQueue::post ( MsgBuf ptr)

Definition at line 26 of file MasterQueue.cc.

References gather_cfg::cout, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.

Referenced by evf::Vulture::start(), and evf::Vulture::stop().

26  {
27  int rc; /* error code returned by system calls. */
28  rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize() + 1, 0);
29  if (rc == -1)
30  std::cout << "snd::Master failed to post message - error:" << strerror(
31  errno) << std::endl;
32  // delete ptr;
33  return rc;
34 }
size_t msize()
Definition: MsgBuf.cc:33
struct msgbuf * ptr_
Definition: MsgBuf.h:24
tuple cout
Definition: gather_cfg.py:121
int MasterQueue::postLength ( MsgBuf ptr,
unsigned int  length 
)

Definition at line 36 of file MasterQueue.cc.

References gather_cfg::cout, evf::MsgBuf::ptr_, and queue_id_.

Referenced by evf::FUResourceQueue::buildResource().

36  {
37  int rc; /* error code returned by system calls. */
38  rc = msgsnd(queue_id_, ptr.ptr_, length, 0);
39  if (rc == -1)
40  std::cout << "snd::Master failed to post message - error:" << strerror(
41  errno) << std::endl;
42  // delete ptr;
43  return rc;
44 }
struct msgbuf * ptr_
Definition: MsgBuf.h:24
tuple cout
Definition: gather_cfg.py:121
unsigned long MasterQueue::rcv ( MsgBuf ptr)

Definition at line 58 of file MasterQueue.cc.

References MSGQ_MESSAGE_TYPE_RANGE, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.

58  {
59  unsigned long msg_type = ptr->mtype;
60  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
61  if (rc == -1 && errno != ENOMSG) {
62  std::string serr =
63  "rcv::Master failed to get message from queue - error:";
64  serr += strerror(errno);
65  XCEPT_RAISE(evf::Exception, serr);
66  } else if (rc == -1 && errno == ENOMSG)
68 
69  //updateReceivers();
70 
71  return msg_type;
72 }
size_t msize()
Definition: MsgBuf.cc:33
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
struct msgbuf * ptr_
Definition: MsgBuf.h:24
unsigned long MasterQueue::rcvNonBlocking ( MsgBuf ptr)

Definition at line 82 of file MasterQueue.cc.

References MSGQ_MESSAGE_TYPE_RANGE, evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.

Referenced by evf::Vulture::hasStarted(), and evf::Vulture::hasStopped().

82  {
83  unsigned long msg_type = ptr->mtype;
84  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, msg_type, IPC_NOWAIT);
85  if (rc == -1 && errno != ENOMSG) {
86  std::string serr =
87  "rcvnb::Master failed to get message from queue - error:";
88  serr += strerror(errno);
89  XCEPT_RAISE(evf::Exception, serr);
90  } else if (rc == -1 && errno == ENOMSG)
92  return msg_type;
93 }
size_t msize()
Definition: MsgBuf.cc:33
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
struct msgbuf * ptr_
Definition: MsgBuf.h:24
bool MasterQueue::rcvQuiet ( MsgBuf ptr)

Definition at line 74 of file MasterQueue.cc.

References evf::MsgBuf::msize(), evf::MsgBuf::ptr_, and queue_id_.

Referenced by evf::FUResourceQueue::discard(), evf::FUResourceQueue::discardWhileHalting(), evf::FUResourceQueue::sendData(), evf::FUResourceQueue::sendDataWhileHalting(), evf::FUResourceQueue::sendDqm(), and evf::FUResourceQueue::sendDqmWhileHalting().

74  {
75  int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize() + 1, ptr->mtype, 0);
76  if (rc == -1 && errno != ENOMSG) {
77  return false;
78  }
79  return true;
80 }
size_t msize()
Definition: MsgBuf.cc:33
struct msgbuf * ptr_
Definition: MsgBuf.h:24
int MasterQueue::status ( void  )

Definition at line 105 of file MasterQueue.cc.

References occup_, pidOfLastReceive_, pidOfLastSend_, queue_id_, run_regression::ret, and status_.

Referenced by drain(), and updateReceivers().

105  {
106  char cbuf[sizeof(struct msqid_ds)];
107  struct msqid_ds *buf = (struct msqid_ds*) cbuf;
108  int ret = msgctl(queue_id_, IPC_STAT, buf);
109  if (ret != 0)
110  status_ = -1;
111  else {
112  status_ = 1;
113  occup_ = buf->msg_qnum;
114  pidOfLastSend_ = buf->msg_lspid;
115  pidOfLastReceive_ = buf->msg_lrpid;
116  // std::cout << "queue " << buf->msg_qnum << " "
117  // << buf->msg_lspid << " "
118  // << buf->msg_lrpid << std::endl;
119  }
120  return status_;
121 }
void MasterQueue::updateReceivers ( )
private

Definition at line 150 of file MasterQueue.cc.

References i, pidOfLastReceive_, receivers_, and status().

150  {
151  //update status
152  status();
153  int lastReceiver = pidOfLastReceive_;
154  if (lastReceiver == 0)
155  return;
156  for (unsigned int i = 0; i < receivers_.size(); ++i)
157  if (receivers_[i] == lastReceiver)
158  return;
159  receivers_.push_back(lastReceiver);
160 }
int i
Definition: DBlmapReader.cc:9
std::vector< int > receivers_
Definition: MasterQueue.h:54

Member Data Documentation

int evf::MasterQueue::occup_
private

Definition at line 51 of file MasterQueue.h.

Referenced by drain(), occupancy(), and status().

int evf::MasterQueue::pidOfLastReceive_
private

Definition at line 53 of file MasterQueue.h.

Referenced by pidOfLastReceive(), status(), and updateReceivers().

int evf::MasterQueue::pidOfLastSend_
private

Definition at line 52 of file MasterQueue.h.

Referenced by pidOfLastSend(), and status().

int evf::MasterQueue::queue_id_
private
std::vector<int> evf::MasterQueue::receivers_
private

Definition at line 54 of file MasterQueue.h.

Referenced by getReceivers(), and updateReceivers().

int evf::MasterQueue::status_
private

Definition at line 50 of file MasterQueue.h.

Referenced by disconnect(), status(), and ~MasterQueue().