CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/Utilities/interface/MasterQueue.h

Go to the documentation of this file.
00001 #ifndef EVENTFILTER_UTILITIES_MASTERQUEUE_H
00002 #define EVENTFILTER_UTILITIES_MASTERQUEUE_H
00003 
00004 #include <stdio.h>       /* standard I/O functions.              */
00005 #include <stdlib.h>      /* malloc(), free() etc.                */
00006 #include <sys/types.h>   /* various type definitions.            */
00007 #include <sys/ipc.h>     /* general SysV IPC structures          */
00008 #include <sys/msg.h>     /* message queue functions and structs. */
00009 #include <errno.h>
00010 #include <string.h>
00011 
00012 #include <iostream>
00013 #include <sstream>
00014 
00015 #include "EventFilter/Utilities/interface/Exception.h"
00016 #include "EventFilter/Utilities/interface/MsgBuf.h"
00017 
00018 //@EM ToDo move implementation to .cc file
00019 
00020 namespace evf{
00021 
00022   class MasterQueue{
00023 
00024   public:
00025 
00026     MasterQueue(unsigned int ind) : status_(0)
00027       {
00028         
00029         /* create or attach a public message queue, with read/write access to every user. */
00030         queue_id_ = msgget(QUEUE_ID+ind, IPC_CREAT | 0666); 
00031         if (queue_id_ == -1) {
00032           std::ostringstream ost;
00033           ost << "failed to get message queue:" 
00034               << strerror(errno);
00035           XCEPT_RAISE(evf::Exception, ost.str());
00036         }
00037         // it may be necessary to drain the queue here if it already exists !!! 
00038         drain();
00039       }
00040     ~MasterQueue()
00041       {
00042         if(status_>0) msgctl(queue_id_,IPC_RMID,0);
00043       }
00044 
00045     int post(MsgBuf &ptr)
00046       {
00047         int rc;                  /* error code returned by system calls. */
00048         rc = msgsnd(queue_id_, ptr.ptr_, ptr.msize()+1,0);
00049         if(rc==-1)
00050           std::cout << "snd::Master failed to post message - error:"
00051                     << strerror(errno) << std::endl;
00052         //      delete ptr;
00053         return rc;
00054       }
00055     unsigned long rcv(MsgBuf &ptr)
00056       {
00057         unsigned long msg_type = ptr->mtype;
00058         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize(), ptr->mtype, 0);
00059         if (rc == -1 && errno != ENOMSG)  
00060           {
00061             std::string serr = "rcv::Master failed to get message from queue - error:";
00062             serr += strerror(errno);
00063             XCEPT_RAISE(evf::Exception, serr);
00064           }
00065         else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00066         return msg_type;
00067       }
00068     unsigned long rcvNonBlocking(MsgBuf &ptr)
00069       {
00070         unsigned long msg_type = ptr->mtype;
00071         int rc = msgrcv(queue_id_, ptr.ptr_, ptr.msize(), msg_type, IPC_NOWAIT);
00072         if (rc == -1 && errno != ENOMSG)  
00073           {
00074             std::string serr = "rcvnb::Master failed to get message from queue - error:";
00075             serr += strerror(errno);
00076             XCEPT_RAISE(evf::Exception, serr);
00077           }
00078         else if(rc == -1 && errno == ENOMSG) return MSGQ_MESSAGE_TYPE_RANGE;
00079         return msg_type;
00080       }
00081     int disconnect()
00082       {
00083         int ret = msgctl(queue_id_,IPC_RMID,0);
00084         if(ret !=0)
00085           std::cout <<  "disconnect of master queue failed - error:" << strerror(errno) << std::endl;
00086         status_ = -1000;
00087         return ret;
00088       }
00089     int id(){return queue_id_;}
00090     int status()
00091       {
00092         char cbuf[sizeof(struct msqid_ds)];
00093         struct msqid_ds *buf= (struct msqid_ds*)cbuf;
00094         int ret = msgctl(queue_id_,IPC_STAT,buf);       
00095         if(ret!=0) status_ = -1; 
00096         else
00097           {
00098             status_ = 1;
00099             occup_ = buf->msg_qnum;
00100             pidOfLastSend_ = buf->msg_lspid;
00101             pidOfLastReceive_ = buf->msg_lrpid;
00102             //      std::cout << "queue " << buf->msg_qnum << " " 
00103             //        << buf->msg_lspid << " " 
00104             //        << buf->msg_lrpid << std::endl;
00105           }
00106         return status_;
00107       }
00108     int occupancy()const{return occup_;}
00109     void drain(){
00110       status();
00111       if(occup_>0)
00112         std::cout << "message queue id " << queue_id_ << " contains " << occup_ << "leftover messages, going to drain " 
00113                   << std::endl;
00114       //drain the queue before using it
00115       MsgBuf msg;
00116       while(occup_>0)
00117         {
00118           msgrcv(queue_id_, msg.ptr_, msg.msize(), 0, 0);
00119           status();
00120           std::cout << "drained one message, occupancy now " << occup_ << std::endl;
00121         }
00122     }
00123     pid_t pidOfLastSend()const{return pidOfLastSend_;}
00124     pid_t pidOfLastReceive()const{return pidOfLastReceive_;}
00125   private:
00126 
00127     int queue_id_;             /* ID of the created queue.            */
00128     int status_;
00129     int occup_;
00130     int pidOfLastSend_;
00131     int pidOfLastReceive_;
00132   };
00133 }
00134 #endif