CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
SubProcess.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_PROCESSOR_SUB_PROCESS_H
2 #define EVENTFILTER_PROCESSOR_SUB_PROCESS_H
3 
7 #include <string>
8 
9 #include <iostream>
10 #include <boost/shared_ptr.hpp>
11 
12 // subprocess states: -1000: never started, -1: crashed, 0: exited successfully, 1: running
13 // @@EM ToDo: replace magic numbers with enum
14 
15 namespace evf{
16 
17  class SubProcess{
18  public:
20  : ind_(100000)
21  , pid_(-1)
22  , alive_(-1000)
24  , restart_count_(0)
25  , save_nbp_(0)
26  , save_nba_(0)
27  , save_ndqm_(0)
28  , save_scalers_(0)
30  {}
31  SubProcess(int ind, pid_t pid)
32  : ind_(ind)
33  , pid_(pid)
34  , alive_(-1)
36  , mqs_(new MasterQueue(ind))
38  , restart_count_(0)
39  , save_nbp_(0)
40  , save_nba_(0)
41  , save_ndqm_(0)
42  , save_scalers_(0)
44  {
45  mqm_->drain();
46  mqs_->drain();
47  }
49  : ind_(b.ind_)
50  , pid_(b.pid_)
51  , alive_(b.alive_)
52  , mqm_(b.mqm_)
53  , mqs_(b.mqs_)
57  {
58  }
60 
61  virtual ~SubProcess()
62  {
63  }
64  void disconnect();
65 
66  void setStatus(int st);
67 
68  int queueId(){return (mqm_.get()!=0 ? mqm_->id() : 0);}
69  int queueStatus(){return (mqm_.get() !=0 ? mqm_->status() : 0);}
70  int queueOccupancy(){return (mqm_.get() !=0 ? mqm_->occupancy() : -1);}
71  int controlQueueOccupancy(){return (mqs_.get() !=0 ? mqs_->occupancy() : -1);}
72  pid_t queuePidOfLastSend(){return (mqm_.get() !=0 ? mqm_->pidOfLastSend() : -1);}
73  pid_t queuePidOfLastReceive(){return (mqm_.get() !=0 ? mqm_->pidOfLastReceive() : -1);}
74  pid_t pid() const {return pid_;}
75  int alive() const {return alive_;}
76  struct prg &params(){return prg_;}
77  void setParams(struct prg *p);
78  int post(MsgBuf &ptr, bool isMonitor)
79  {
80  // std::cout << "post called for sp " << ind_ << " type " << ptr->mtype
81  // << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
82  if(isMonitor) return mqm_->post(ptr); else return mqs_->post(ptr);
83  }
84  unsigned long rcv(MsgBuf &ptr, bool isMonitor)
85  {
86  // std::cout << "receive called for sp " << ind_ << " type " << ptr->mtype
87  // << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
88  if(isMonitor) return mqm_->rcv(ptr); else return mqs_->rcv(ptr);
89  }
90  unsigned long rcvNonBlocking(MsgBuf &ptr, bool isMonitor)
91  {
92  // std::cout << "receivenb called for sp " << ind_ << " type " << ptr->mtype
93  // << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
94  if(isMonitor)
95  return mqm_->rcvNonBlocking(ptr);
96  else
97  return mqs_->rcvNonBlocking(ptr);
98  }
99  int postSlave(MsgBuf &ptr, bool isMonitor)
100  {
101  // std::cout << "post called for sp " << ind_ << " type " << ptr->mtype
102  // << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
103  if(isMonitor) return sqm_->post(ptr); else return sqs_->post(ptr);
104  }
105  unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
106  {
107  // std::cout << "receive called for sp " << ind_ << " type " << ptr->mtype
108  // << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
109  if(isMonitor) return sqm_->rcv(ptr); else return sqs_->rcv(ptr);
110  }
111  unsigned long rcvSlaveNonBlocking(MsgBuf &ptr, bool isMonitor)
112  {
113  // std::cout << "receivenb called for sp " << ind_ << " type " << ptr->mtype
114  // << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
115  if(isMonitor)
116  return sqm_->rcvNonBlocking(ptr);
117  else
118  return sqs_->rcvNonBlocking(ptr);
119  }
120 
121  int forkNew();
122 
123  std::string const &reasonForFailed()const {return reasonForFailed_;}
125  void setReasonForFailed(std::string r){reasonForFailed_ = r;}
127  int &countdown(){return restart_countdown_;}
128  unsigned int &restartCount(){return restart_count_;}
129  int get_save_nbp() const {return save_nbp_;}
130  int get_save_nba() const {return save_nba_;}
131  static const unsigned int monitor_queue_offset_ = 200;
132 
133  private:
134  int ind_;
135  pid_t pid_;
136  int alive_;
137  boost::shared_ptr<MasterQueue> mqm_; //to be turned to real object not pointer later
138  boost::shared_ptr<MasterQueue> mqs_;
139  SlaveQueue* sqm_; // every subprocess will create its instance at fork
141  std::string reasonForFailed_;
142  struct prg prg_;
144  unsigned int restart_count_;
145 
148  unsigned int save_ndqm_;
149  unsigned int save_scalers_;
151  };
152 
153 
154 }
155 #endif
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:105
unsigned long rcv(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:84
struct prg & params()
Definition: SubProcess.h:76
std::string const & reasonForFailed() const
Definition: SubProcess.h:123
int restart_countdown_
Definition: SubProcess.h:143
int & countdown()
Definition: SubProcess.h:127
void setStatus(int st)
Definition: SubProcess.cc:34
void setReasonForFailed(std::string r)
Definition: SubProcess.h:125
void setParams(struct prg *p)
Definition: SubProcess.cc:46
SubProcess(int ind, pid_t pid)
Definition: SubProcess.h:31
boost::shared_ptr< MasterQueue > mqs_
Definition: SubProcess.h:138
unsigned long rcvNonBlocking(MsgBuf &ptr)
Definition: SlaveQueue.h:58
int post(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:78
bool inInconsistentState() const
Definition: SubProcess.h:124
static const unsigned int monitor_queue_offset_
Definition: SubProcess.h:131
unsigned long rcvSlaveNonBlocking(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:111
void setReportedInconsistent()
Definition: SubProcess.h:126
SlaveQueue * sqs_
Definition: SubProcess.h:140
pid_t pid() const
Definition: SubProcess.h:74
void disconnect()
Definition: SubProcess.cc:22
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:99
int get_save_nbp() const
Definition: SubProcess.h:129
SubProcess(const SubProcess &b)
Definition: SubProcess.h:48
unsigned int save_scalers_
Definition: SubProcess.h:149
int controlQueueOccupancy()
Definition: SubProcess.h:71
struct prg prg_
Definition: SubProcess.h:142
pid_t queuePidOfLastReceive()
Definition: SubProcess.h:73
int post(MsgBuf &ptr)
Definition: SlaveQueue.h:35
int alive() const
Definition: SubProcess.h:75
SubProcess & operator=(const SubProcess &b)
Definition: SubProcess.cc:6
virtual ~SubProcess()
Definition: SubProcess.h:61
int queueOccupancy()
Definition: SubProcess.h:70
double b
Definition: hdecay.h:120
int get_save_nba() const
Definition: SubProcess.h:130
SlaveQueue * sqm_
Definition: SubProcess.h:139
unsigned long rcv(MsgBuf &ptr)
Definition: SlaveQueue.h:45
int queueStatus()
Definition: SubProcess.h:69
std::string reasonForFailed_
Definition: SubProcess.h:141
unsigned long rcvNonBlocking(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:90
pid_t queuePidOfLastSend()
Definition: SubProcess.h:72
unsigned int restart_count_
Definition: SubProcess.h:144
unsigned int save_ndqm_
Definition: SubProcess.h:148
unsigned int & restartCount()
Definition: SubProcess.h:128
boost::shared_ptr< MasterQueue > mqm_
Definition: SubProcess.h:137
bool reported_inconsistent_
Definition: SubProcess.h:150