CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch13/src/EventFilter/Utilities/interface/MasterQueue.h

Go to the documentation of this file.
00001 #ifndef EVENTFILTER_UTILITIES_MASTERQUEUE_H
00002 #define EVENTFILTER_UTILITIES_MASTERQUEUE_H
00003 
00004 #include <stdio.h>       /* standard I/O functions.              */
00005 #include <stdlib.h>      /* malloc(), free() etc.                */
00006 #include <sys/types.h>   /* various type definitions.            */
00007 #include <sys/ipc.h>     /* general SysV IPC structures          */
00008 #include <sys/msg.h>     /* message queue functions and structs. */
00009 #include <errno.h>
00010 #include <string.h>
00011 
00012 #include <iostream>
00013 #include <sstream>
00014 
00015 #include "EventFilter/Utilities/interface/Exception.h"
00016 #include "EventFilter/Utilities/interface/MsgBuf.h"
00017 
00018 //@EM ToDo move implementation to .cc file
00019 
00020 namespace evf{
00021 
00022   class MasterQueue{
00023 
00024   public:
00025 
00026     MasterQueue(unsigned int ind) : status_(0)
00027       {
00028         
00029         /* create or attach a public message queue, with read/write access to every user. */
00030         queue_id_ = msgget(QUEUE_ID+ind, IPC_CREAT | 0666); 
00031         if (queue_id_ == -1) {
00032           std::ostringstream ost;
00033           ost << "failed to get message queue:" 
00034               << strerror(errno);
00035           XCEPT_RAISE(evf::Exception, ost.str());
00036         }
00037         // it may be necessary to drain the queue here if it already exists !!! 
00038         drain();
00039       }
00040     ~MasterQueue()
00041       {
00042         if(status_>0) msgctl(queue_id_,IPC_RMID,0);
00043       }
00044 
00045     int post(MsgBuf &ptr)
00046       {
00047         int rc;                  /* error code returned by system calls. */
00048         rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize()+1,0);
00049         if(rc==-1)
00050           std::cout << "snd::Master failed to post message - error:"
00051                     << strerror(errno) << std::endl;
00052         //      delete ptr;
00053         return rc;
00054       }
00055     unsigned long rcv(MsgBuf &ptr)
00056       {
00057         unsigned long msg_type = ptr->mtype;
00058         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize()+1, ptr->mtype, 0);
00059         if (rc == -1 && errno != ENOMSG)  
00060           {
00061             std::string serr = "rcv::Master failed to get message from queue - error:";
00062             serr += strerror(errno);
00063             XCEPT_RAISE(evf::Exception, serr);
00064           }
00065         else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00066         return msg_type;
00067       }
00068     unsigned long rcvNonBlocking(MsgBuf &ptr)
00069       {
00070         unsigned long msg_type = ptr->mtype;
00071         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize()+1, msg_type, IPC_NOWAIT);
00072         if (rc == -1 && errno != ENOMSG)  
00073           {
00074             std::string serr = "rcvnb::Master failed to get message from queue - error:";
00075             serr += strerror(errno);
00076             XCEPT_RAISE(evf::Exception, serr);
00077           }
00078         else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00079         return msg_type;
00080       }
00081     int disconnect()
00082       {
00083         int ret = msgctl(queue_id_,IPC_RMID,0);
00084         status_ = -1000;
00085         return ret;
00086       }
00087     int id(){return queue_id_;}
00088     int status()
00089       {
00090         char cbuf[sizeof(struct msqid_ds)];
00091         struct msqid_ds *buf= (struct msqid_ds*)cbuf;
00092         int ret = msgctl(queue_id_,IPC_STAT,buf);       
00093         if(ret!=0) status_ = -1; 
00094         else
00095           {
00096             status_ = 1;
00097             occup_ = buf->msg_qnum;
00098             pidOfLastSend_ = buf->msg_lspid;
00099             pidOfLastReceive_ = buf->msg_lrpid;
00100             //      std::cout << "queue " << buf->msg_qnum << " " 
00101             //        << buf->msg_lspid << " " 
00102             //        << buf->msg_lrpid << std::endl;
00103           }
00104         return status_;
00105       }
00106     int occupancy()const{return occup_;}
00107     void drain(){
00108       status();
00109       if(occup_>0)
00110         std::cout << "message queue id " << queue_id_ << " contains " << occup_ << "leftover messages, going to drain " 
00111                   << std::endl;
00112       //drain the queue before using it
00113       MsgBuf msg;
00114       while(occup_>0)
00115         {
00116           msgrcv(queue_id_, msg.ptr_, msg.msize()+1, 0, 0);
00117           status();
00118           std::cout << "drained one message, occupancy now " << occup_ << std::endl;
00119         }
00120     }
00121     pid_t pidOfLastSend()const{return pidOfLastSend_;}
00122     pid_t pidOfLastReceive()const{return pidOfLastReceive_;}
00123   private:
00124 
00125     int queue_id_;             /* ID of the created queue.            */
00126     int status_;
00127     int occup_;
00128     int pidOfLastSend_;
00129     int pidOfLastReceive_;
00130   };
00131 }
00132 #endif