CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/EventFilter/Processor/src/SubProcess.h

Go to the documentation of this file.
00001 #ifndef EVENTFILTER_PROCESSOR_SUB_PROCESS_H
00002 #define EVENTFILTER_PROCESSOR_SUB_PROCESS_H
00003 
00004 #include "EventFilter/Utilities/interface/MsgBuf.h"
00005 #include "EventFilter/Utilities/interface/MasterQueue.h"
00006 #include "EventFilter/Utilities/interface/SlaveQueue.h"
00007 #include <string>
00008 #include <vector>
00009 
00010 #include <iostream>
00011 #include <boost/shared_ptr.hpp>
00012 
00013 // subprocess states: -1000: never started, -1: crashed, 0: exited successfully, 1: running
00014 // @@EM ToDo: replace magic numbers with enum
00015 
00016 namespace evf{
00017   
00018   class SubProcess{
00019   public:
00020     SubProcess()
00021       : ind_(100000)
00022       , pid_(-1)
00023       , alive_(-1000)
00024       , restart_countdown_(0)
00025       , restart_count_(0)
00026       , save_nbp_(0)
00027       , save_nba_(0)
00028       , save_ndqm_(0)
00029       , save_scalers_(0)
00030       , reported_inconsistent_(false)
00031       , nfound_invalid_(0)
00032       {}
00033     SubProcess(int ind, pid_t pid)
00034       : ind_(ind)
00035       , pid_(pid)
00036       , alive_(-1)
00037       , mqm_(new MasterQueue(monitor_queue_offset_+ind))
00038       , mqs_(new MasterQueue(ind))
00039       , restart_countdown_(0)
00040       , restart_count_(0)
00041       , save_nbp_(0)
00042       , save_nba_(0)
00043       , save_ndqm_(0)
00044       , save_scalers_(0)
00045       , reported_inconsistent_(false)
00046       , nfound_invalid_(0)
00047       {
00048         mqm_->drain();
00049         mqs_->drain();
00050       }
00051     SubProcess(const SubProcess &b)
00052       : ind_(b.ind_)
00053       , pid_(b.pid_)
00054       , alive_(b.alive_)
00055       , mqm_(b.mqm_)
00056       , mqs_(b.mqs_)
00057       , restart_countdown_(b.restart_countdown_)
00058       , restart_count_(b.restart_count_)
00059       , reported_inconsistent_(b.reported_inconsistent_)
00060       , nfound_invalid_(b.nfound_invalid_)
00061       , postponed_trigger_updates_(b.postponed_trigger_updates_)
00062       {
00063       }
00064     SubProcess &operator=(const SubProcess &b);
00065 
00066     virtual ~SubProcess()
00067       {
00068       }
00069     void disconnect();
00070 
00071     void setStatus(int st);
00072 
00073     int queueId(){return (mqm_.get()!=0 ? mqm_->id() : 0);}
00074     int queueStatus(){return (mqm_.get() !=0 ? mqm_->status() : 0);}
00075     int queueOccupancy(){return (mqm_.get() !=0 ? mqm_->occupancy() : -1);}
00076     int controlQueueOccupancy(){return (mqs_.get() !=0 ? mqs_->occupancy() : -1);}
00077     pid_t queuePidOfLastSend(){return (mqm_.get() !=0 ? mqm_->pidOfLastSend() : -1);}
00078     pid_t queuePidOfLastReceive(){return (mqm_.get() !=0 ? mqm_->pidOfLastReceive() : -1);}
00079     pid_t pid() const {return pid_;}
00080     int alive() const {return alive_;}
00081     struct prg &params(){return prg_;}
00082     void setParams(struct prg *p);
00083     int post(MsgBuf &ptr, bool isMonitor)
00084       {
00085         //      std::cout << "post called for sp " << ind_ << " type " << ptr->mtype 
00086         //        << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
00087         if(isMonitor) return mqm_->post(ptr); else return mqs_->post(ptr);
00088       }
00089     unsigned long rcv(MsgBuf &ptr, bool isMonitor)
00090       {
00091         //      std::cout << "receive called for sp " << ind_ << " type " << ptr->mtype 
00092         //  << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
00093         if(isMonitor) return mqm_->rcv(ptr); else return mqs_->rcv(ptr);
00094       }
00095     unsigned long rcvNonBlocking(MsgBuf &ptr, bool isMonitor)
00096       {
00097         //      std::cout << "receivenb called for sp " << ind_ << " type " << ptr->mtype 
00098         //        << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
00099         if(isMonitor) 
00100           return mqm_->rcvNonBlocking(ptr); 
00101         else 
00102           return mqs_->rcvNonBlocking(ptr);
00103       }
00104     int postSlave(MsgBuf &ptr, bool isMonitor)
00105       {
00106         //      std::cout << "post called for sp " << ind_ << " type " << ptr->mtype 
00107         //        << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
00108         if(isMonitor) return sqm_->post(ptr); else return sqs_->post(ptr);
00109       }
00110     unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
00111       {
00112         //      std::cout << "receive called for sp " << ind_ << " type " << ptr->mtype 
00113         //  << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
00114         if(isMonitor) return sqm_->rcv(ptr); else return sqs_->rcv(ptr);
00115       }
00116     unsigned long rcvSlaveNonBlocking(MsgBuf &ptr, bool isMonitor)
00117       {
00118         //      std::cout << "receivenb called for sp " << ind_ << " type " << ptr->mtype 
00119         //        << " queue ids " << mqm_->id() << " " << mqs_->id() << std::endl;
00120         if(isMonitor) 
00121           return sqm_->rcvNonBlocking(ptr); 
00122         else 
00123           return sqs_->rcvNonBlocking(ptr);
00124       }
00125 
00126     int forkNew();
00127 
00128     std::string const &reasonForFailed()const {return reasonForFailed_;}
00129     bool inInconsistentState() const {return reported_inconsistent_;}
00130     void setReasonForFailed(std::string r){reasonForFailed_ = r;}
00131     void setReportedInconsistent(){reported_inconsistent_ = true;}
00132     int &countdown(){return restart_countdown_;}    
00133     unsigned int &restartCount(){return restart_count_;}
00134     int get_save_nbp() const {return save_nbp_;}
00135     int get_save_nba() const {return save_nba_;}
00136     void found_invalid() {nfound_invalid_++;}
00137     unsigned int nfound_invalid() const { return nfound_invalid_;}
00138     void add_postponed_trigger_update(MsgBuf &);
00139     bool check_postponed_trigger_update(MsgBuf &, unsigned int);
00140     static const unsigned int monitor_queue_offset_ = 200;
00141 
00142   private:
00143     int ind_;
00144     pid_t pid_;
00145     int alive_;
00146     boost::shared_ptr<MasterQueue> mqm_; //to be turned to real object not pointer later
00147     boost::shared_ptr<MasterQueue> mqs_;
00148     SlaveQueue*                    sqm_;  // every subprocess will create its instance at fork 
00149     SlaveQueue*                    sqs_; 
00150     std::string reasonForFailed_;
00151     struct prg prg_;
00152     int restart_countdown_;
00153     unsigned int restart_count_;
00154 
00155     int save_nbp_;
00156     int save_nba_;
00157     unsigned int save_ndqm_;
00158     unsigned int save_scalers_;
00159     bool reported_inconsistent_;
00160     unsigned int nfound_invalid_;
00161 
00162     std::vector<MsgBuf> postponed_trigger_updates_;
00163   };
00164 
00165 
00166 }
00167 #endif