Go to the documentation of this file.00001 #ifndef EVENTFILTER_PROCESSOR_SUB_PROCESS_H
00002 #define EVENTFILTER_PROCESSOR_SUB_PROCESS_H
00003
00004 #include "EventFilter/Utilities/interface/MsgBuf.h"
00005 #include "EventFilter/Utilities/interface/MasterQueue.h"
00006 #include "EventFilter/Utilities/interface/SlaveQueue.h"
00007 #include <string>
00008 #include <vector>
00009
00010 #include <iostream>
00011 #include <boost/shared_ptr.hpp>
00012
00013
00014
00015
00016 namespace evf{
00017
00018 class SubProcess{
00019 public:
00020 SubProcess()
00021 : ind_(100000)
00022 , pid_(-1)
00023 , alive_(-1000)
00024 , restart_countdown_(0)
00025 , restart_count_(0)
00026 , save_nbp_(0)
00027 , save_nba_(0)
00028 , save_ndqm_(0)
00029 , save_scalers_(0)
00030 , reported_inconsistent_(false)
00031 , nfound_invalid_(0)
00032 {}
00033 SubProcess(int ind, pid_t pid)
00034 : ind_(ind)
00035 , pid_(pid)
00036 , alive_(-1)
00037 , mqm_(new MasterQueue(monitor_queue_offset_+ind))
00038 , mqs_(new MasterQueue(ind))
00039 , restart_countdown_(0)
00040 , restart_count_(0)
00041 , save_nbp_(0)
00042 , save_nba_(0)
00043 , save_ndqm_(0)
00044 , save_scalers_(0)
00045 , reported_inconsistent_(false)
00046 , nfound_invalid_(0)
00047 {
00048 mqm_->drain();
00049 mqs_->drain();
00050 }
00051 SubProcess(const SubProcess &b)
00052 : ind_(b.ind_)
00053 , pid_(b.pid_)
00054 , alive_(b.alive_)
00055 , mqm_(b.mqm_)
00056 , mqs_(b.mqs_)
00057 , restart_countdown_(b.restart_countdown_)
00058 , restart_count_(b.restart_count_)
00059 , reported_inconsistent_(b.reported_inconsistent_)
00060 , nfound_invalid_(b.nfound_invalid_)
00061 , postponed_trigger_updates_(b.postponed_trigger_updates_)
00062 {
00063 }
00064 SubProcess &operator=(const SubProcess &b);
00065
00066 virtual ~SubProcess()
00067 {
00068 }
00069 void disconnect();
00070
00071 void setStatus(int st);
00072
00073 int queueId(){return (mqm_.get()!=0 ? mqm_->id() : 0);}
00074 int queueStatus(){return (mqm_.get() !=0 ? mqm_->status() : 0);}
00075 int queueOccupancy(){return (mqm_.get() !=0 ? mqm_->occupancy() : -1);}
00076 int controlQueueOccupancy(){return (mqs_.get() !=0 ? mqs_->occupancy() : -1);}
00077 pid_t queuePidOfLastSend(){return (mqm_.get() !=0 ? mqm_->pidOfLastSend() : -1);}
00078 pid_t queuePidOfLastReceive(){return (mqm_.get() !=0 ? mqm_->pidOfLastReceive() : -1);}
00079 pid_t pid() const {return pid_;}
00080 int alive() const {return alive_;}
00081 struct prg ¶ms(){return prg_;}
00082 void setParams(struct prg *p);
00083 int post(MsgBuf &ptr, bool isMonitor)
00084 {
00085
00086
00087 if(isMonitor) return mqm_->post(ptr); else return mqs_->post(ptr);
00088 }
00089 unsigned long rcv(MsgBuf &ptr, bool isMonitor)
00090 {
00091
00092
00093 if(isMonitor) return mqm_->rcv(ptr); else return mqs_->rcv(ptr);
00094 }
00095 unsigned long rcvNonBlocking(MsgBuf &ptr, bool isMonitor)
00096 {
00097
00098
00099 if(isMonitor)
00100 return mqm_->rcvNonBlocking(ptr);
00101 else
00102 return mqs_->rcvNonBlocking(ptr);
00103 }
00104 int postSlave(MsgBuf &ptr, bool isMonitor)
00105 {
00106
00107
00108 if(isMonitor) return sqm_->post(ptr); else return sqs_->post(ptr);
00109 }
00110 unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
00111 {
00112
00113
00114 if(isMonitor) return sqm_->rcv(ptr); else return sqs_->rcv(ptr);
00115 }
00116 unsigned long rcvSlaveNonBlocking(MsgBuf &ptr, bool isMonitor)
00117 {
00118
00119
00120 if(isMonitor)
00121 return sqm_->rcvNonBlocking(ptr);
00122 else
00123 return sqs_->rcvNonBlocking(ptr);
00124 }
00125
00126 int forkNew();
00127
00128 std::string const &reasonForFailed()const {return reasonForFailed_;}
00129 bool inInconsistentState() const {return reported_inconsistent_;}
00130 void setReasonForFailed(std::string r){reasonForFailed_ = r;}
00131 void setReportedInconsistent(){reported_inconsistent_ = true;}
00132 int &countdown(){return restart_countdown_;}
00133 unsigned int &restartCount(){return restart_count_;}
00134 int get_save_nbp() const {return save_nbp_;}
00135 int get_save_nba() const {return save_nba_;}
00136 void found_invalid() {nfound_invalid_++;}
00137 unsigned int nfound_invalid() const { return nfound_invalid_;}
00138 void add_postponed_trigger_update(MsgBuf &);
00139 bool check_postponed_trigger_update(MsgBuf &, unsigned int);
00140 static const unsigned int monitor_queue_offset_ = 200;
00141
00142 private:
00143 int ind_;
00144 pid_t pid_;
00145 int alive_;
00146 boost::shared_ptr<MasterQueue> mqm_;
00147 boost::shared_ptr<MasterQueue> mqs_;
00148 SlaveQueue* sqm_;
00149 SlaveQueue* sqs_;
00150 std::string reasonForFailed_;
00151 struct prg prg_;
00152 int restart_countdown_;
00153 unsigned int restart_count_;
00154
00155 int save_nbp_;
00156 int save_nba_;
00157 unsigned int save_ndqm_;
00158 unsigned int save_scalers_;
00159 bool reported_inconsistent_;
00160 unsigned int nfound_invalid_;
00161
00162 std::vector<MsgBuf> postponed_trigger_updates_;
00163 };
00164
00165
00166 }
00167 #endif