CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::FUEventProcessor Class Reference

#include <FUEventProcessor.h>

Inheritance diagram for evf::FUEventProcessor:

Public Member Functions

void actionPerformed (xdata::Event &e)
 
bool configuring (toolbox::task::WorkLoop *wl)
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
bool enabling (toolbox::task::WorkLoop *wl)
 
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
 
 FUEventProcessor (xdaq::ApplicationStub *s)
 
bool halting (toolbox::task::WorkLoop *wl)
 
void microState (xgi::Input *in, xgi::Output *out)
 
void moduleWeb (xgi::Input *in, xgi::Output *out)
 
void pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void procStat (xgi::Input *in, xgi::Output *out)
 
void scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void sendMessageOverMonitorQueue (MsgBuf &)
 
void serviceWeb (xgi::Input *in, xgi::Output *out)
 
void spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
bool stopping (toolbox::task::WorkLoop *wl)
 
void subWeb (xgi::Input *in, xgi::Output *out)
 
void updater (xgi::Input *in, xgi::Output *out)
 
 XDAQ_INSTANTIATOR ()
 
virtual ~FUEventProcessor ()
 

Private Member Functions

void attachDqmToShm () throw (evf::Exception)
 
void detachDqmFromShm () throw (evf::Exception)
 
bool enableClassic ()
 
bool enableCommon ()
 
bool enableMPEPSlave ()
 
void localLog (std::string)
 
std::string logsAsString ()
 
void makeStaticInfo ()
 
bool receiving (toolbox::task::WorkLoop *wl)
 
bool receivingAndMonitor (toolbox::task::WorkLoop *wl)
 
bool scalers (toolbox::task::WorkLoop *wl)
 
void startReceivingLoop ()
 
void startReceivingMonitorLoop ()
 
void startScalersWorkLoop () throw (evf::Exception)
 
void startSummarizeWorkLoop () throw (evf::Exception)
 
void startSupervisorLoop ()
 
bool stopClassic ()
 
void stopSlavesAndAcknowledge ()
 
bool summarize (toolbox::task::WorkLoop *wl)
 
bool supervisor (toolbox::task::WorkLoop *wl)
 

Private Attributes

int anonymousPipe_ [2]
 
xdata::InfoSpace * applicationInfoSpace_
 
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
 
toolbox::task::ActionSignature * asReceiveMsgAndRead_
 
toolbox::task::ActionSignature * asScalers_
 
toolbox::task::ActionSignature * asSummarize_
 
toolbox::task::ActionSignature * asSupervisor_
 
xdata::Boolean autoRestartSlaves_
 
xdata::String class_
 
xdata::String configString_
 
std::string configuration_
 
CPUStat * cpustat_
 
Css css_
 
xdata::Boolean epInitialized_
 
FWEPWrapper evtProcessor_
 
xdata::Boolean exitOnError_
 
evf::StateMachine fsm_
 
xdata::Boolean hasModuleWebRegistry_
 
xdata::Boolean hasPrescaleService_
 
xdata::Boolean hasServiceWebRegistry_
 
xdata::Boolean hasShMem_
 
xdata::Boolean iDieStatisticsGathering_
 
xdata::String iDieUrl_
 
xdata::UnsignedInteger32 instance_
 
xdata::Boolean isRunNumberSetter_
 
Logger log_
 
std::vector< std::string > logRing_
 
unsigned int logRingIndex_
 
bool logWrap_
 
MsgBuf master_message_prg_
 
MsgBuf master_message_prr_
 
MsgBuf master_message_trr_
 
xdata::InfoSpace * monitorInfoSpace_
 
xdata::InfoSpace * monitorLegendaInfoSpace_
 
SubProcess * myProcess_
 
std::list< std::string > names_
 
xdata::Serializable * nbAccepted
 
unsigned int nbdead_
 
unsigned int nblive_
 
xdata::Serializable * nbProcessed
 
xdata::UnsignedInteger32 nbSubProcesses_
 
xdata::UnsignedInteger32 nbSubProcessesReporting_
 
unsigned int nbTotalDQM_
 
bool outprev_
 
xdata::Boolean outPut_
 
pthread_mutex_t pickup_lock_
 
RateStat * ratestat_
 
std::string reasonForFailedState_
 
bool receiving_
 
bool receivingM_
 
xdata::UnsignedInteger32 runNumber_
 
xdata::InfoSpace * scalersInfoSpace_
 
xdata::InfoSpace * scalersLegendaInfoSpace_
 
unsigned int scalersUpdates_
 
MsgBuf slave_message_monitoring_
 
MsgBuf slave_message_prr_
 
xdata::UnsignedInteger32 slaveRestartDelaySecs_
 
std::string sourceId_
 
xdata::Vector< xdata::Integer > spMStates_
 
xdata::Vector< xdata::Integer > spmStates_
 
SquidNet squidnet_
 
xdata::Boolean squidPresent_
 
pthread_mutex_t start_lock_
 
pthread_mutex_t stop_lock_
 
std::vector< SubProcess > subs_
 
xdata::UnsignedInteger32 superSleepSec_
 
bool supervising_
 
std::string updaterStatic_
 
xdata::String url_
 
pid_t vp_
 
Vulture * vulture_
 
toolbox::task::WorkLoop * wlReceiving_
 
toolbox::task::WorkLoop * wlReceivingMonitor_
 
toolbox::task::WorkLoop * wlScalers_
 
bool wlScalersActive_
 
toolbox::task::WorkLoop * wlSummarize_
 
bool wlSummarizeActive_
 
toolbox::task::WorkLoop * wlSupervising_
 

Static Private Attributes

static const unsigned int logRingSize_ = 50
 

Detailed Description

Definition at line 58 of file FUEventProcessor.h.

Constructor & Destructor Documentation

FUEventProcessor::FUEventProcessor ( xdaq::ApplicationStub *  s)

Definition at line 61 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, css(), defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), evf::StateMachine::foundRcmsStateListener(), fsm_, edm::PresenceFactory::get(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, outPut_, pathNames(), pickup_lock_, pipe::pipe(), procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, subWeb(), superSleepSec_, updater(), url_, and vulture_.

62  : xdaq::Application(s)
63  , fsm_(this)
64  , log_(getApplicationLogger())
65  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
66  , runNumber_(0)
67  , epInitialized_(false)
68  , outPut_(true)
69  , autoRestartSlaves_(false)
71  , hasShMem_(true)
72  , hasPrescaleService_(true)
73  , hasModuleWebRegistry_(true)
75  , isRunNumberSetter_(true)
77  , outprev_(true)
78  , exitOnError_(true)
80  , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
83  , logWrap_(false)
84  , nbSubProcesses_(0)
86  , nblive_(0)
87  , nbdead_(0)
88  , nbTotalDQM_(0)
89  , wlReceiving_(0)
91  , receiving_(false)
94  , receivingM_(false)
95  , myProcess_(0)
96  , wlSupervising_(0)
97  , asSupervisor_(0)
98  , supervising_(false)
102  , nbProcessed(0)
103  , nbAccepted(0)
104  , scalersInfoSpace_(0)
106  , wlScalers_(0)
107  , asScalers_(0)
108  , wlScalersActive_(false)
109  , scalersUpdates_(0)
110  , wlSummarize_(0)
111  , asSummarize_(0)
112  , wlSummarizeActive_(false)
113  , superSleepSec_(1)
114  , iDieUrl_("none")
115  , vulture_(0)
116  , vp_(0)
117  , cpustat_(0)
118  , ratestat_(0)
123 {
124  using namespace utils;
125 
126  names_.push_back("nbProcessed" );
127  names_.push_back("nbAccepted" );
128  names_.push_back("epMacroStateInt");
129  names_.push_back("epMicroStateInt");
130  // create pipe for web communication
131  int retpipe = pipe(anonymousPipe_);
132  if(retpipe != 0)
133  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
134  // check squid presence
136  //pass application parameters to FWEPWrapper
137  evtProcessor_.setAppDesc(getApplicationDescriptor());
138  evtProcessor_.setAppCtxt(getApplicationContext());
139  // bind relevant callbacks to finite state machine
141 
142  //set sourceId_
143  url_ =
144  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
145  getApplicationDescriptor()->getURN();
146  class_ =getApplicationDescriptor()->getClassName();
147  instance_=getApplicationDescriptor()->getInstance();
148  sourceId_=class_.toString()+"_"+instance_.toString();
149  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
150  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
151 
152  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
153 
154  xdata::InfoSpace *ispace = getApplicationInfoSpace();
155  applicationInfoSpace_ = ispace;
156 
157  // default configuration
158  ispace->fireItemAvailable("parameterSet", &configString_ );
159  ispace->fireItemAvailable("epInitialized", &epInitialized_ );
160  ispace->fireItemAvailable("stateName", fsm_.stateName() );
161  ispace->fireItemAvailable("runNumber", &runNumber_ );
162  ispace->fireItemAvailable("outputEnabled", &outPut_ );
163 
164  ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
165  ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
166  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
167  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
168  ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
169  ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
170  ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
171  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
172  ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
173  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
174  ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
175  ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
176  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
177  ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
178 
179  // Add infospace listeners for exporting data values
180  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
181  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
182 
183  // findRcmsStateListener
185 
186  // initialize monitoring infospace
187 
188  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
189  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
190  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
191 
192  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
193  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
194  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
195 
196 
197  monitorInfoSpace_->fireItemAvailable("url", &url_ );
198  monitorInfoSpace_->fireItemAvailable("class", &class_ );
199  monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
200  monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
201  monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
202 
203  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
204 
205  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
206  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
207  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
208 
209  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
210  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
211  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
212 
213 
214 
216  scalersInfoSpace_->fireItemAvailable("instance", &instance_);
217 
221 
222  //subprocess state vectors for MP
223  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
224  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
225 
226  // Bind web interface
227  xgi::bind(this, &FUEventProcessor::css, "styles.css");
228  xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
229  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
230  xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
231  xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
232  xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
233  xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
234  xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
235  xgi::bind(this, &FUEventProcessor::microState, "microState");
236  xgi::bind(this, &FUEventProcessor::updater, "updater" );
237  xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
238 
239  // instantiate the plugin manager, not referenced here after!
240 
242 
243  try{
244  LOG4CPLUS_DEBUG(getApplicationLogger(),
245  "Trying to create message service presence ");
247  if(pf != 0) {
248  pf->makePresence("MessageServicePresence").release();
249  }
250  else {
251  LOG4CPLUS_ERROR(getApplicationLogger(),
252  "Unable to create message service presence ");
253  }
254  }
255  catch(...) {
256  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
257  }
259 
260  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
261  typedef AppDescSet_t::iterator AppDescIter_t;
262 
263  AppDescSet_t rcms=
264  getApplicationContext()->getDefaultZone()->
265  getApplicationDescriptors("RCMSStateListener");
266  if(rcms.size()==0)
267  {
268  LOG4CPLUS_WARN(getApplicationLogger(),
269  "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
270  // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
271  }
272  else
273  {
274  AppDescIter_t it = rcms.begin();
275  evtProcessor_.setRcms(*it);
276  }
277  pthread_mutex_init(&start_lock_,0);
278  pthread_mutex_init(&stop_lock_,0);
279  pthread_mutex_init(&pickup_lock_,0);
280 
281  makeStaticInfo();
283 
284  if(vulture_==0) vulture_ = new Vulture(true);
285 
287 
288  AppDescSet_t setOfiDie=
289  getApplicationContext()->getDefaultZone()->
290  getApplicationDescriptors("evf::iDie");
291 
292  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
293  if ((*it)->getInstance()==0) // there has to be only one instance of iDie
294  iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
295 }
xdata::Boolean hasModuleWebRegistry_
void spotlightWebPage(xgi::Input *, xgi::Output *)
bool check()
Definition: SquidNet.cc:21
xdata::Vector< xdata::Integer > spMStates_
xdata::InfoSpace * monitorInfoSpace_
void subWeb(xgi::Input *in, xgi::Output *out)
std::vector< std::string > logRing_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
xdata::UnsignedInteger32 runNumber_
void updater(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlScalers_
pthread_mutex_t start_lock_
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:32
xdata::Boolean isRunNumberSetter_
void setAppCtxt(xdaq::ApplicationContext *ctx)
Definition: FWEPWrapper.h:88
xdata::Boolean hasServiceWebRegistry_
toolbox::task::WorkLoop * wlReceiving_
xdata::Boolean hasShMem_
def pipe
Definition: pipe.py:5
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
xdata::InfoSpace * scalersLegendaInfoSpace_
toolbox::task::ActionSignature * asSupervisor_
xdata::Boolean autoRestartSlaves_
xdata::UnsignedInteger32 nbSubProcessesReporting_
void css(xgi::Input *in, xgi::Output *out)
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
toolbox::task::ActionSignature * asReceiveMsgAndRead_
xdata::Boolean exitOnError_
xdata::InfoSpace * monitorLegendaInfoSpace_
void initialize(T *app)
Definition: StateMachine.h:84
xdata::Serializable * nbProcessed
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:30
evf::StateMachine fsm_
toolbox::task::WorkLoop * wlSupervising_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
void setAppDesc(xdaq::ApplicationDescriptor *ad)
Definition: FWEPWrapper.h:87
void scalersWeb(xgi::Input *, xgi::Output *)
xdata::Boolean squidPresent_
std::list< std::string > names_
void setScalersInfoSpace(xdata::InfoSpace *sis, xdata::InfoSpace *slis)
Definition: FWEPWrapper.h:74
xdata::Boolean hasPrescaleService_
void procStat(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlSummarize_
xdata::Serializable * nbAccepted
static PresenceFactory * get()
void setMonitorInfoSpace(xdata::InfoSpace *mis, xdata::InfoSpace *mlis)
Definition: FWEPWrapper.h:80
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
void setApplicationInfoSpace(xdata::InfoSpace *is)
Definition: FWEPWrapper.h:79
xdata::String configString_
unsigned int scalersUpdates_
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
xdata::UnsignedInteger32 superSleepSec_
void moduleWeb(xgi::Input *in, xgi::Output *out)
void pathNames(xgi::Input *, xgi::Output *)
pthread_mutex_t pickup_lock_
void defaultWebPage(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asSummarize_
xdata::Vector< xdata::Integer > spmStates_
xdata::UnsignedInteger32 instance_
void serviceWeb(xgi::Input *in, xgi::Output *out)
string s
Definition: asciidump.py:422
toolbox::task::WorkLoop * wlReceivingMonitor_
toolbox::task::ActionSignature * asScalers_
pthread_mutex_t stop_lock_
void publishConfigAndMonitorItems(bool)
Definition: FWEPWrapper.cc:107
xdata::InfoSpace * scalersInfoSpace_
void microState(xgi::Input *in, xgi::Output *out)
void setRcms(xdaq::ApplicationDescriptor *rcms)
Definition: FWEPWrapper.h:86
static const unsigned int logRingSize_
xdata::Boolean epInitialized_
void findRcmsStateListener()
FUEventProcessor::~FUEventProcessor ( )
virtual

Definition at line 300 of file FUEventProcessor.cc.

References vulture_.

301 {
302  // no longer needed since the wrapper is a member of the class and one can rely on
303  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
304  // if (evtProcessor_) delete evtProcessor_;
305  if(vulture_ != 0) delete vulture_;
306 }

Member Function Documentation

void FUEventProcessor::actionPerformed ( xdata::Event &  e)

Definition at line 540 of file FUEventProcessor.cc.

References edm::EventProcessor::enableEndPaths(), epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, and evf::StateMachine::stateName().

541 {
542 
543  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
544  std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
545 
546  if ( item == "parameterSet") {
547  LOG4CPLUS_WARN(getApplicationLogger(),
548  "HLT Menu changed, force reinitialization of EventProcessor");
549  epInitialized_ = false;
550  }
551  if ( item == "outputEnabled") {
552  if(outprev_ != outPut_) {
553  LOG4CPLUS_WARN(getApplicationLogger(),
554  (outprev_ ? "Disabling " : "Enabling ")<<"global output");
556  outprev_ = outPut_;
557  }
558  }
559  if (item == "globalInputPrescale") {
560  LOG4CPLUS_WARN(getApplicationLogger(),
561  "Setting global input prescale has no effect "
562  <<"in this version of the code");
563  }
564  if ( item == "globalOutputPrescale") {
565  LOG4CPLUS_WARN(getApplicationLogger(),
566  "Setting global output prescale has no effect "
567  <<"in this version of the code");
568  }
569  }
570 
571 }
evf::StateMachine fsm_
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::Boolean epInitialized_
void enableEndPaths(bool active)
void FUEventProcessor::attachDqmToShm ( ) throw (evf::Exception)
private

Definition at line 769 of file FUEventProcessor.cc.

References evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by enableCommon().

770 {
771  std::string errmsg;
772  bool success = false;
773  try {
776  success = edm::Service<FUShmDQMOutputService>()->attachToShm();
777  if (!success) errmsg = "Failed to attach DQM service to shared memory";
778  }
779  catch (cms::Exception& e) {
780  errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
781  }
782  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
783 }
virtual char const * what() const
Definition: Exception.cc:97
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 315 of file FUEventProcessor.cc.

References edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, epInitialized_, evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getNumberOfMicrostates(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), edm::event_processor::sInit, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), vp_, and vulture_.

316 {
317 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
318 // << ((instance_.value_==0) ? 0x8 : 0) << " "
319 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
320 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
321 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
322  unsigned short smap
323  = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
324  + ((instance_.value_==0) ? 0x8 : 0)
325  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
326  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
327  + (hasPrescaleService_.value_ ? 0x1 : 0);
328  if(nbSubProcesses_.value_==0)
329  {
330  spMStates_.setSize(1);
331  spmStates_.setSize(1);
332  }
333  else
334  {
335  spMStates_.setSize(nbSubProcesses_.value_);
336  spmStates_.setSize(nbSubProcesses_.value_);
337  for(unsigned int i = 0; i < spMStates_.size(); i++)
338  {
340  spmStates_[i] = 0;
341  }
342  }
343  try {
344  LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
345  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
346  epInitialized_=true;
347  if(evtProcessor_)
348  {
349  // moved to wrapper class
353  if(cpustat_) {delete cpustat_; cpustat_=0;}
355  iDieUrl_.value_);
356  if(ratestat_) {delete ratestat_; ratestat_=0;}
357  ratestat_ = new RateStat(iDieUrl_.value_);
358  if(iDieStatisticsGathering_.value_){
359  try{
361  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
362  if(legenda !=0){
363  std::string slegenda = ((xdata::String*)legenda)->value_;
364  ratestat_->sendLegenda(slegenda);
365  }
366 
367  }
368  catch(evf::Exception &e){
369  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
370  << e.what());
371  }
372  }
373 
374  fsm_.fireEvent("ConfigureDone",this);
375  LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
376  localLog("-I- Configuration completed");
377 
378  }
379  }
380  catch (xcept::Exception &e) {
381  reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
384  }
385  catch(cms::Exception &e) {
389  }
390  catch(std::exception &e) {
391  reasonForFailedState_ = e.what();
394  }
395  catch(...) {
396  fsm_.fireFailed("Unknown Exception",this);
397  }
398 
399  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
400 
401  return false;
402 }
xdata::Boolean hasModuleWebRegistry_
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
pid_t makeProcess()
Definition: Vulture.cc:148
void startMonitoringWorkLoop()
Definition: FWEPWrapper.cc:534
virtual std::string explainSelf() const
Definition: Exception.cc:56
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:188
xdata::InfoSpace * scalersLegendaInfoSpace_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:136
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:137
std::string const & configuration() const
Definition: FWEPWrapper.h:98
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:26
xdata::Vector< xdata::Integer > spmStates_
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
xdata::Boolean epInitialized_
void evf::FUEventProcessor::css ( xgi::Input * in, xgi::Output * out ) throw (xgi::exception::Exception)
inline

Definition at line 101 of file FUEventProcessor.h.

References evf::Css::css(), and css_.

Referenced by FUEventProcessor().

102  {
103  css_.css(in,out);
104  }
void css(xgi::Input *in, xgi::Output *out)
Definition: Css.h:15
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::defaultWebPage ( xgi::Input * in, xgi::Output * out ) throw (xgi::exception::Exception)

Definition at line 642 of file FUEventProcessor.cc.

References nbSubProcesses_, and dbtoconf::out.

Referenced by FUEventProcessor(), and receivingAndMonitor().

644 {
645 
646 
647  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
648  << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
649  << getApplicationDescriptor()->getInstance() << "</title>"
650  << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
651  << "</head></html>";
652 }
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::detachDqmFromShm ( ) throw (evf::Exception)
private

Definition at line 787 of file FUEventProcessor.cc.

References evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by stopClassic().

788 {
789  std::string errmsg;
790  bool success = false;
791  try {
794  success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
795  if (!success) errmsg = "Failed to detach DQM service from shared memory";
796  }
797  catch (cms::Exception& e) {
798  errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
799  }
800  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
801 }
virtual char const * what() const
Definition: Exception.cc:97
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::enableClassic ( )
private

Definition at line 1564 of file FUEventProcessor.cc.

References enableCommon(), evtProcessor_, edm::EventProcessor::getState(), localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.

Referenced by enabling().

1565 {
1566  bool retval = enableCommon();
1568  LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
1569  ::sleep(1);
1570  }
1571 
1572  // implementation moved to EPWrapper
1573  // startScalersWorkLoop(); // this is now not done any longer
1574  localLog("-I- Start completed");
1575  return retval;
1576 }
event_processor::State getState() const
void sleep(Duration_t)
Definition: Utils.h:163
void localLog(std::string)
bool FUEventProcessor::enableCommon ( )
private

Definition at line 1505 of file FUEventProcessor.cc.

References attachDqmToShm(), edm::EventProcessor::clearCounters(), gather_cfg::cout, edm::EventProcessor::declareRunNumber(), evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, edm::EventProcessor::runAsync(), runNumber_, edm::EventProcessor::setRunNumber(), stor::utils::sleep(), and edm::EventProcessor::statusAsync().

Referenced by enableClassic(), and enableMPEPSlave().

1506 {
1507  try {
1508  if(hasShMem_) attachDqmToShm();
1509  int sc = 0;
1511  if(isRunNumberSetter_)
1513  else
1515  try{
1516  ::sleep(1);
1518  sc = evtProcessor_->statusAsync();
1519  }
1520  catch(cms::Exception &e) {
1524  return false;
1525  }
1526  catch(std::exception &e) {
1527  reasonForFailedState_ = e.what();
1530  return false;
1531  }
1532  catch(...) {
1533  reasonForFailedState_ = "Unknown Exception";
1536  return false;
1537  }
1538  if(sc != 0) {
1539  std::ostringstream oss;
1540  oss<<"EventProcessor::runAsync returned status code " << sc;
1541  reasonForFailedState_ = oss.str();
1544  return false;
1545  }
1546  }
1547  catch (xcept::Exception &e) {
1548  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
1551  return false;
1552  }
1553  try{
1554  fsm_.fireEvent("EnableDone",this);
1555  }
1556  catch (xcept::Exception &e) {
1557  std::cout << "exception " << (std::string)e.what() << std::endl;
1558  throw;
1559  }
1560 
1561  return false;
1562 }
xdata::UnsignedInteger32 runNumber_
virtual std::string explainSelf() const
Definition: Exception.cc:56
xdata::Boolean isRunNumberSetter_
void clearCounters()
Clears counters used by trigger report.
xdata::Boolean hasShMem_
void setRunNumber(RunNumber_t runNumber)
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
void declareRunNumber(RunNumber_t runNumber)
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
StatusCode statusAsync() const
tuple cout
Definition: gather_cfg.py:41
bool FUEventProcessor::enableMPEPSlave ( )
private

Definition at line 1577 of file FUEventProcessor.cc.

References enableCommon(), evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireFailed(), fsm_, edm::PresenceFactory::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), and startScalersWorkLoop().

Referenced by enabling(), and supervisor().

1578 {
1579  //all this happens only in the child process
1584  while(!evtProcessor_.isWaitingForLs())
1585  ::sleep(1);
1586  // @EM test do not run monitor loop in slave, only receiving&Monitor
1587  // evtProcessor_.startMonitoringWorkLoop();
1588  try{
1589  // evtProcessor_.makeServicesOnly();
1590  try{
1591  LOG4CPLUS_DEBUG(getApplicationLogger(),
1592  "Trying to create message service presence ");
1594  if(pf != 0) {
1595  pf->makePresence("MessageServicePresence").release();
1596  }
1597  else {
1598  LOG4CPLUS_ERROR(getApplicationLogger(),
1599  "Unable to create message service presence ");
1600  }
1601  }
1602  catch(...) {
1603  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
1604  }
1605  ML::MLlog4cplus::setAppl(this);
1606  }
1607  catch (xcept::Exception &e) {
1608  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
1611  }
1612  bool retval = enableCommon();
1613  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
1614  // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
1615  // ::sleep(1);
1616  // }
1617  return retval;
1618 }
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void localLog(std::string)
static PresenceFactory * get()
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
bool isWaitingForLs()
Definition: FWEPWrapper.h:131
bool FUEventProcessor::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 408 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, evf::StateMachine::disableRcmsStateNotification(), enableClassic(), enableMPEPSlave(), epInitialized_, evtProcessor_, evf::StateMachine::fireEvent(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getNumberOfMicrostates(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), myProcess_, nbSubProcesses_, nbTotalDQM_, ratestat_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), runNumber_, scalersInfoSpace_, scalersUpdates_, evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), evf::Vulture::start(), start_lock_, startSummarizeWorkLoop(), stop_lock_, subs_, vp_, and vulture_.

// Work-loop action for the enable transition.
// Resets the DQM/scalers counters, builds the option bitmap passed to
// evtProcessor_.init(), performs the one-time setup guarded by epInitialized_, and then
//  - with nbSubProcesses_ == 0: returns enableClassic();
//  - otherwise: fills subs_ and forks one child per entry; each child returns
//    enableMPEPSlave(), while the parent starts vulture_, fires "EnableDone" and returns false.
// NOTE(review): listing lines 426, 431, 433, 437, 454, 475, 481 and 487 are absent from this extract.
409 {
410  nbTotalDQM_ = 0;
411  scalersUpdates_ = 0;
412 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
413 // << ((instance_.value_==0) ? 0x8 : 0) << " "
414 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
415 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
416 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
// option bitmap: 0x10 sub-processes configured, 0x8 instance number is 0,
// 0x4 service web registry, 0x2 module web registry, 0x1 prescale service
417  unsigned short smap
418  = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
419  + ((instance_.value_==0) ? 0x8 : 0)
420  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
421  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
422  + (hasPrescaleService_.value_ ? 0x1 : 0);
423 
424  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
// body of this guard (listing line 426) is not shown in this extract
425  if(!epInitialized_){
427  }
428  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
429 
// one-time setup; epInitialized_ is set to true at the end of this block
430  if(!epInitialized_){
// the old cpustat_ is freed here; listing line 433 (start of the statement that
// ends on line 434) is not shown -- presumably it re-creates cpustat_, TODO confirm
432  if(cpustat_) {delete cpustat_; cpustat_=0;}
434  iDieUrl_.value_);
435  if(iDieStatisticsGathering_.value_){
436  try{
438  xdata::Serializable *legenda = scalersInfoSpace_->find("scalersLegenda");
439  if(legenda !=0){
440  std::string slegenda = ((xdata::String*)legenda)->value_;
// NOTE(review): ratestat_ is dereferenced here but only (re)created on listing
// line 450 below -- confirm it is valid at this point
441  ratestat_->sendLegenda(slegenda);
442  }
443  }
// failure to send the legenda is logged and otherwise ignored
444  catch(evf::Exception &e){
445  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
446  << e.what());
447  }
448  }
449  if(ratestat_) {delete ratestat_; ratestat_=0;}
450  ratestat_ = new RateStat(iDieUrl_.value_);
451  epInitialized_ = true;
452  }
453  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
455  //classic appl will return here
456  if(nbSubProcesses_.value_==0) return enableClassic();
457  //protect manipulation of subprocess array
458  pthread_mutex_lock(&start_lock_);
459  subs_.clear();
460  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
// every slot is first filled with a SubProcess built from (index, -1)
461  pid_t retval = -1;
462  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
463  {
464  subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
465  }
466  pthread_mutex_unlock(&start_lock_);
467 
// fork the children; note that start_lock_ is no longer held in this loop
468  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
469  {
470  retval = subs_[i].forkNew();
// forkNew() == 0 is treated as "we are in the child"
471  if(retval==0)
472  {
473  myProcess_ = &subs_[i];
474  // dirty hack: delete/recreate global binary semaphore for later use in child
476  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
// stop_lock_ is destroyed and re-initialised in the child;
// this 'int retval' shadows the outer 'pid_t retval'
477  int retval = pthread_mutex_destroy(&stop_lock_);
478  if(retval != 0) perror("error");
479  retval = pthread_mutex_init(&stop_lock_,0);
480  if(retval != 0) perror("error");
482  return enableMPEPSlave();
483  // the loop is broken in the child
484  }
485  }
486 
// parent only from here on; listing line 487 is not shown (it may guard the call below)
488  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
489 
490  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
491  fsm_.fireEvent("EnableDone",this);
492  localLog("-I- Start completed");
493  return false;
494 }
xdata::Boolean hasModuleWebRegistry_
int i
Definition: DBlmapReader.cc:9
xdata::UnsignedInteger32 runNumber_
pthread_mutex_t start_lock_
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:188
void forceInitEventProcessorMaybe()
Definition: FWEPWrapper.h:65
std::vector< SubProcess > subs_
evf::StateMachine fsm_
void resetLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:133
xdata::Boolean iDieStatisticsGathering_
int start(std::string, int=0)
Definition: Vulture.cc:235
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:136
void fireEvent(const std::string &evtType, void *originator)
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
unsigned int scalersUpdates_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:137
std::string const & configuration() const
Definition: FWEPWrapper.h:98
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:26
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
pthread_mutex_t stop_lock_
xdata::InfoSpace * scalersInfoSpace_
xdata::Boolean epInitialized_
xoap::MessageReference FUEventProcessor::fsmCallback ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)

Definition at line 532 of file FUEventProcessor.cc.

References evf::StateMachine::commandCallback(), fsm_, and runTheMatrix::msg.

// Forwards the incoming message unchanged to fsm_.commandCallback() and returns its reply.
534 {
535  return fsm_.commandCallback(msg);
536 }
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
evf::StateMachine fsm_
bool FUEventProcessor::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 508 of file FUEventProcessor.cc.

References evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::stopAndHalt(), and stopSlavesAndAcknowledge().

// Work-loop action for the halt transition.
// Logs start and end of halting, records a failure reason if an evf::Exception is
// caught, then fires "HaltDone", writes a local log entry and returns false.
// NOTE(review): listing lines 512, 514, 518 and 519 are absent from this extract; the
// References list names stopSlavesAndAcknowledge(), evtProcessor_.stopAndHalt() and
// fsm_.fireFailed(), which are presumably called on those lines -- TODO confirm.
509 {
510  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
// the statement controlled by this 'if' is the missing listing line 512, not the 'try' below
511  if(nbSubProcesses_.value_!=0)
513  try{
515  }
516  catch (evf::Exception &e) {
517  reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
520  }
521  // if(hasShMem_) detachDqmFromShm();
522 
523  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
524  fsm_.fireEvent("HaltDone",this);
525 
526  localLog("-I- Halt completed");
527  return false;
528 }
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::localLog ( std::string  m)
private

Definition at line 821 of file FUEventProcessor.cc.

References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.

Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().

// Appends " at <local date/time>" to the message m and moves the ring index one slot
// backwards, wrapping to logRingSize_ (and setting logWrap_) once it has reached 0.
// NOTE(review): listing line 835 is absent from this extract; presumably it stores m
// into logRing_ at logRingIndex_ -- TODO confirm.
822 {
823  timeval tv;
824 
825  gettimeofday(&tv,0);
// NOTE(review): localtime() returns a pointer to shared static storage; consider
// localtime_r() if this can be called from several threads
826  tm *uptm = localtime(&tv.tv_sec);
827  char datestring[256];
// "%c" = locale's preferred date and time representation
828  strftime(datestring, sizeof(datestring),"%c", uptm);
829 
830  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
831  logRingIndex_--;
832  std::ostringstream timestamp;
833  timestamp << " at " << datestring;
834  m += timestamp.str();
836 }
std::vector< std::string > logRing_
static const unsigned int logRingSize_
std::string FUEventProcessor::logsAsString ( )
private

Definition at line 804 of file FUEventProcessor.cc.

References i, logRing_, logRingIndex_, and logWrap_.

// Returns the contents of logRing_ as one string, one entry per line.
// Entries from logRingIndex_ up to the end of the ring are always emitted; when the
// ring has wrapped (logWrap_) the entries before logRingIndex_ are emitted after them.
// Presumably this yields newest-first order, since localLog() moves the index
// downwards -- TODO confirm.
805 {
806  std::ostringstream oss;
807  if(logWrap_)
808  {
809  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
810  oss << logRing_[i] << std::endl;
811  for(unsigned int i = 0; i < logRingIndex_; i++)
812  oss << logRing_[i] << std::endl;
813  }
814  else
815  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
816  oss << logRing_[i] << std::endl;
817 
818  return oss.str();
819 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
void FUEventProcessor::makeStaticInfo ( )
private

Definition at line 1886 of file FUEventProcessor.cc.

References applicationInfoSpace_, evf::utils::cDiv(), edm::hlt::Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.

Referenced by FUEventProcessor().

// Builds the string stored in updaterStatic_: revision and release version, the
// outPut_/hasShMem_/web-registry settings, the optional "monSleepSec" and "lsTimeOut"
// info-space items, and the uname() system information.
// mDiv()/cDiv() come from namespace utils; presumably they open and close a named
// markup element on the stream -- TODO confirm against procUtils.cc.
1887 {
1888  using namespace utils;
1889  std::ostringstream ost;
1890  mDiv(&ost,"ve");
1891  ost<< "$Revision: 1.131 $ (" << edm::getReleaseVersion() <<")";
1892  cDiv(&ost);
1893  mDiv(&ost,"ou",outPut_.toString());
1894  mDiv(&ost,"sh",hasShMem_.toString());
1895  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
1896  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
1897 
1898  xdata::Serializable *monsleep = 0;
1899  xdata::Serializable *lstimeout = 0;
1900  try{
1901  monsleep = applicationInfoSpace_->find("monSleepSec");
1902  lstimeout = applicationInfoSpace_->find("lsTimeOut");
1903  }
// lookup failures are deliberately ignored: the affected pointers stay 0 and the
// corresponding entries are skipped below.
// NOTE(review): the exception is caught by value; catching by reference avoids a copy
1904  catch(xdata::exception::Exception e){
1905  }
1906 
1907  if(monsleep!=0)
1908  mDiv(&ost,"ms",monsleep->toString());
1909  if(lstimeout!=0)
1910  mDiv(&ost,"lst",lstimeout->toString());
// NOTE(review): a char array is reinterpreted as struct utsname and the return value of
// uname() is not checked; a plain 'struct utsname' local would be simpler
1911  char cbuf[sizeof(struct utsname)];
1912  struct utsname* buf = (struct utsname*)cbuf;
1913  uname(buf);
1914  mDiv(&ost,"sysinfo");
1915  ost << buf->sysname << " " << buf->nodename
1916  << " " << buf->release << " " << buf->version << " " << buf->machine;
1917  cDiv(&ost);
1918  updaterStatic_ = ost.str();
1919 }
xdata::Boolean hasModuleWebRegistry_
xdata::Boolean hasServiceWebRegistry_
xdata::Boolean hasShMem_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:338
std::string getReleaseVersion()
xdata::InfoSpace * applicationInfoSpace_
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:335
void FUEventProcessor::microState ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 1694 of file FUEventProcessor.cc.

References autoRestartSlaves_, gather_cfg::cout, evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), subs_, and cms::Exception::what().

Referenced by FUEventProcessor().

// Writes HTML table rows describing the process state to *out: first one row for this
// process, then (when sub-processes are configured) one row per entry of subs_.
// All evf::Exception, cms::Exception, std::exception and unknown exceptions are caught
// and logged at INFO level; nothing is propagated to the caller.
// NOTE(review): listing lines 1698, 1699 and 1704 are absent from this extract.
1695 {
1696  std::string urn = getApplicationDescriptor()->getURN();
1697  try{
// a child process never produces this page; because of this early return the later
// tests on myProcess_ (listing lines 1702 and 1707) can only see a null pointer
1700  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
1701  *out << "<tr><td>" << fsm_.stateName()->toString()
1702  << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
1703  << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
1705  *out << "<td></td><td>" << nbTotalDQM_
1706  << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
1707  if(nbSubProcesses_.value_!=0 && !myProcess_)
1708  {
// start_lock_ is held while subs_ is traversed.
// NOTE(review): it is released only on listing line 1782; an exception that is not an
// evf::Exception leaves this block through the outer handlers with the mutex still locked
1709  pthread_mutex_lock(&start_lock_);
1710  for(unsigned int i = 0; i < subs_.size(); i++)
1711  {
1712  try{
// alive() > 0: the sub-process is running -> green "Alive" row with its counters
1713  if(subs_[i].alive()>0)
1714  {
1715  *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
1716  << i << "\">""Alive</td><td>S</td><td>"
// NOTE(review): the cell holding queueId() is followed by "<td>" without a closing "</td>"
1717  << subs_[i].queueId() << "<td>"
1718  << subs_[i].queueStatus()<< "/"
1719  << subs_[i].queueOccupancy() << "/"
1720  << subs_[i].queuePidOfLastSend() << "/"
1721  << subs_[i].queuePidOfLastReceive()
1722  << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
1723  << subs_[i].pid() << "&method=procStat\">"
1724  << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
1725  << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
1726  << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
1727  << subs_[i].params().nba << "/" << subs_[i].params().nbp
// accepted/processed ratio in percent; NOTE(review): not guarded against nbp == 0
1728  << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
// NOTE(review): params().ls is printed on both sides of the "/" -- confirm this is intended
1729  << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
1730  << "</td><td>" << subs_[i].params().ps
1731  << "</td><td"
// cell is green when eols < ls, red otherwise
1732  << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
1733  << subs_[i].params().eols
1734  << "</td><td>" << subs_[i].params().dqm
1735  << "</td><td>" << subs_[i].params().trp << "</td>";
1736  }
1737  else
1738  {
// NOTE(review): pickup_lock_ is released only on listing line 1767; if an exception is
// thrown in between, the handler on line 1771 does not unlock it
1739  pthread_mutex_lock(&pickup_lock_);
1740  *out << "<tr><td id=\"a"<< i << "\" ";
// alive() == -1000 -> "NotInitialized", alive() == 0 -> "Done", any other value -> "Dead"
1741  if(subs_[i].alive()==-1000)
1742  *out << " bgcolor=\"#bbaabb\">NotInitialized";
1743  else
1744  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
1745  *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
1746  << subs_[i].queueStatus() << "/"
1747  << subs_[i].queueOccupancy() << "/"
1748  << subs_[i].queuePidOfLastSend() << "/"
1749  << subs_[i].queuePidOfLastReceive()
1750  << "</td><td id=\"p"<< i << "\">"
1751  <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
// restart information is shown for "Dead" entries only
1752  if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
1753  {
1754  if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
1755  *out << " will restart in " << subs_[i].countdown() << " s";
1756  else if(autoRestartSlaves_)
1757  *out << " reached maximum restart count";
1758  else *out << " autoRestart is disabled ";
1759  }
1760  *out << "</td><td"
1761  << ((subs_[i].params().eols<subs_[i].params().ls) ?
1762  " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
1763  << ">"
1764  << subs_[i].params().eols
1765  << "</td><td>" << subs_[i].params().dqm
1766  << "</td><td>" << subs_[i].params().trp << "</td>";
1767  pthread_mutex_unlock(&pickup_lock_);
1768  }
1769  *out << "</tr>";
1770  }
// an evf::Exception while building a row yields a yellow "NotResp" row instead;
// this row is not terminated with "</tr>"
1771  catch(evf::Exception &e){
1772  *out << "<tr><td id=\"a"<< i << "\" "
1773  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
1774  << subs_[i].queueStatus() << "/"
1775  << subs_[i].queueOccupancy() << "/"
1776  << subs_[i].queuePidOfLastSend() << "/"
1777  << subs_[i].queuePidOfLastReceive()
1778  << "</td><td id=\"p"<< i << "\">"
1779  <<subs_[i].pid()<<"</td>";
1780  }
1781  }
1782  pthread_mutex_unlock(&start_lock_);
1783  }
1784  }
1785  catch(evf::Exception &e)
1786  {
1787  LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
1788  }
1789  catch(cms::Exception &e)
1790  {
1791  LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
1792  }
1793  catch(std::exception &e)
1794  {
1795  LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
1796  }
1797  catch(...)
1798  {
1799  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
1800  }
1801 
1802 }
virtual char const * what() const
Definition: Exception.cc:97
int i
Definition: DBlmapReader.cc:9
pthread_mutex_t start_lock_
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:118
std::vector< SubProcess > subs_
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:123
pthread_mutex_t pickup_lock_
tuple cout
Definition: gather_cfg.py:41
void microState(xgi::Input *in, xgi::Output *out)
unsigned int getScalersUpdates()
Definition: FWEPWrapper.h:132
void evf::FUEventProcessor::moduleWeb ( xgi::Input * in,
xgi::Output * out 
)
inline

Definition at line 107 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::moduleWeb().

Referenced by FUEventProcessor(), and receivingAndMonitor().

void moduleWeb(xgi::Input *in, xgi::Output *out)
Definition: FWEPWrapper.cc:990
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::pathNames ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 756 of file FUEventProcessor.cc.

References evtProcessor_, dbtoconf::out, and scalersLegendaInfoSpace_.

Referenced by FUEventProcessor().

// Writes the value of the "scalersLegenda" item of scalersLegendaInfoSpace_ to *out,
// followed by a newline. Nothing is written when evtProcessor_ compares equal to 0 or
// when the item is not found.
758 {
759 
760  if(evtProcessor_ != 0){
761  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
762  if(legenda !=0){
// the item is assumed to be an xdata::String; the C-style cast is unchecked
763  std::string slegenda = ((xdata::String*)legenda)->value_;
764  *out << slegenda << std::endl;
765  }
766  }
767 }
xdata::InfoSpace * scalersLegendaInfoSpace_
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::procStat ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 1876 of file FUEventProcessor.cc.

References evf::utils::procStat().

Referenced by FUEventProcessor(), and receivingAndMonitor().

// NOTE(review): the single body statement (listing line 1878) is absent from this extract;
// according to the References list it calls evf::utils::procStat().
1877 {
1879 }
void procStat(std::ostringstream *out)
Definition: procUtils.cc:157
tuple out
Definition: dbtoconf.py:99
bool FUEventProcessor::receiving ( toolbox::task::WorkLoop *  wl)
private

Definition at line 893 of file FUEventProcessor.cc.

References cmsRelvalreport::exit, evf::StateMachine::fireEvent(), fsm_, edm::PresenceFactory::get(), edm::PresenceFactory::makePresence(), runTheMatrix::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), stop_lock_, and stopClassic().

Referenced by startReceivingLoop().

// Work-loop action of a child process: receives one message from the master over the
// non-monitor queue (rcvSlave(msg,false)) and reacts to it:
//  - MSQM_MESSAGE_TYPE_STOP : fire "Stop", run stopClassic(), post a reply, close
//    stdout/stderr and terminate the process with exit(EXIT_SUCCESS);
//  - MSQM_MESSAGE_TYPE_FSTOP: terminate immediately with _exit(EXIT_SUCCESS).
// Always returns true when it returns at all.
// NOTE(review): listing lines 905 and 918 are absent from this extract.
894 {
895  MsgBuf msg;
896  try{
897  myProcess_->rcvSlave(msg,false); //will receive only messages from Master
898  if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
899  {
// stop_lock_ is held for the whole stop sequence
900  pthread_mutex_lock(&stop_lock_);
901  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
902  try{
903  LOG4CPLUS_DEBUG(getApplicationLogger(),
904  "Trying to create message service presence ");
// pf is declared on listing line 905 (not shown); the References list names
// edm::PresenceFactory::get() -- presumably its source, TODO confirm
906  if(pf != 0) {
907  pf->makePresence("MessageServicePresence").release();
908  }
909  else {
910  LOG4CPLUS_ERROR(getApplicationLogger(),
911  "Unable to create message service presence ");
912  }
913  }
914  catch(...) {
915  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
916  }
917  stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
// msg1 is declared on listing line 918 (not shown); the References list names
// MSQS_MESSAGE_TYPE_STOP, presumably its message type -- TODO confirm
919  myProcess_->postSlave(msg1,false);
920  pthread_mutex_unlock(&stop_lock_);
921  fclose(stdout);
922  fclose(stderr);
923  exit(EXIT_SUCCESS);
924  }
// forced stop: _exit() terminates the process without running exit handlers
925  if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
926  _exit(EXIT_SUCCESS);
927  }
// NOTE(review): evf::Exception is swallowed silently; if one is thrown between
// listing lines 900 and 920, stop_lock_ is not released
928  catch(evf::Exception &e){}
929  return true;
930 }
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:29
evf::StateMachine fsm_
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:104
void fireEvent(const std::string &evtType, void *originator)
static PresenceFactory * get()
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
bool FUEventProcessor::receivingAndMonitor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1361 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, runTheMatrix::data, defaultWebPage(), evf::prg::dqm, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, edm::hlt::Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), edm::event_processor::sStopping, evf::StateMachine::stateName(), stop_lock_, EcalElecEmulTccFlatFileProducerFromTPG_cfg::tokens, edm::EventProcessor::totalEvents(), edm::EventProcessor::totalEventsPassed(), and evf::prg::trp.

Referenced by startReceivingMonitorLoop().

// Work-loop action of a child process: receives one message from the master over the
// monitor queue (rcvSlave(...,true)) and answers it according to its type:
//  - MSQM_MESSAGE_TYPE_MCS: reply with the output of evtProcessor_.microState();
//  - MSQM_MESSAGE_TYPE_PRG: fill a 'prg' record with the current counters/states;
//  - MSQM_MESSAGE_TYPE_WEB: render the requested web page and send it back in chunks;
//  - MSQM_MESSAGE_TYPE_TRP: nothing to do.
// An evf::Exception is reported on std::cout. Always returns true.
// NOTE(review): listing lines 1382, 1392, 1401, 1406 and 1429 are absent from this extract.
1362 {
1363  try{
1364  myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
1365  switch(slave_message_monitoring_->mtype)
1366  {
1367  case MSQM_MESSAGE_TYPE_MCS:
1368  {
1369  xgi::Input *in = 0;
1370  xgi::Output out;
1371  evtProcessor_.microState(in,&out);
1372  MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
// NOTE(review): strncpy() with a count equal to the string length copies no terminating
// '\0'; confirm that MsgBuf provides the terminator or that the reader uses the length
1373  strncpy(msg1->mtext,out.str().c_str(),out.str().size());
1374  myProcess_->postSlave(msg1,true);
1375  break;
1376  }
1377 
1378  case MSQM_MESSAGE_TYPE_PRG:
1379  {
1380  xdata::Serializable *dqmp = 0;
1381  xdata::UnsignedInteger32 *dqm = 0;
1383  try{
1384  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
// a failed lookup is ignored; dqm then stays 0 and data->dqm is reported as 0
1385  } catch(xdata::exception::Exception e){}
1386  if(dqmp!=0)
1387  dqm = (xdata::UnsignedInteger32*)dqmp;
1388 
1389  // monitorInfoSpace_->lock();
// the record is written in place into the text area of slave_message_prr_
1390  prg * data = (prg*)slave_message_prr_->mtext;
1391  data->ls = evtProcessor_.lsid_;
1393  data->ps = evtProcessor_.psid_;
1394  data->nbp = evtProcessor_->totalEvents();
1395  data->nba = evtProcessor_->totalEventsPassed();
1396  data->Ms = evtProcessor_.epMAltState_.value_;
1397  data->ms = evtProcessor_.epmAltState_.value_;
1398  if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
1399  data->trp = scalersUpdates_;
1400  // monitorInfoSpace_->unlock();
// listing line 1401 is not shown; presumably it posts slave_message_prr_ -- TODO confirm
1402  if(exitOnError_.value_)
1403  {
1404  // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
1405  // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
// the condition guarding the following block (listing line 1406) is not shown
1407  {
1408  bool running = true;
1409  int count = 0;
// re-checks the FSM state once per second while it is still "Enabled"; after more than
// five retries the process is terminated with _exit(-1) (while stop_lock_ is held)
1410  while(running){
1411  int retval = pthread_mutex_lock(&stop_lock_);
1412  if(retval != 0) perror("error");
1413  running = fsm_.stateName()->toString()=="Enabled";
1414  if(count>5) _exit(-1);
1415  pthread_mutex_unlock(&stop_lock_);
1416  if(running) {::sleep(1); count++;}
1417  }
1418  }
1419 
1420  }
1421  // scalersUpdates_++;
1422  break;
1423  }
1424  case MSQM_MESSAGE_TYPE_WEB:
1425  {
1426  xgi::Input *in = 0;
1427  xgi::Output out;
1428  unsigned int bytesToSend = 0;
// msg1, used for the replies below, is declared on listing line 1429 (not shown)
// the request text has the form "<method>" or "<method>&<args>"
1430  std::string query = slave_message_monitoring_->mtext;
1431  size_t pos = query.find_first_of("&");
1432  std::string method;
1433  std::string args;
1434  if(pos!=std::string::npos)
1435  {
1436  method = query.substr(0,pos);
1437  args = query.substr(pos+1,query.length()-pos-1);
1438  }
1439  else
1440  method=query;
1441 
1442  if(method=="Spotlight")
1443  {
1444  spotlightWebPage(in,&out);
1445  }
1446  else if(method=="procStat")
1447  {
1448  procStat(in,&out);
1449  }
1450  else if(method=="moduleWeb")
1451  {
// args is a ';'-separated list of "<key>%<value>" pairs that are copied into the
// environment map of a local MyCgi object; tokens without '%' are skipped
1452  internal::MyCgi mycgi;
1453  boost::char_separator<char> sep(";");
1454  boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
1455  for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
1456  tok_iter != tokens.end(); ++tok_iter){
// this 'pos' shadows the outer 'pos' declared on listing line 1431
1457  size_t pos = (*tok_iter).find_first_of("%");
1458  if(pos != std::string::npos){
1459  std::string first = (*tok_iter).substr(0 , pos);
1460  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
1461  mycgi.getEnvironment()[first]=second;
1462  }
1463  }
1464  moduleWeb(&mycgi,&out);
1465  }
1466  else if(method=="Default")
1467  {
1468  defaultWebPage(in,&out);
1469  }
1470  else
1471  {
1472  out << "Error 404!!!!!!!!" << std::endl;
1473  }
1474 
1475 
// reply protocol: the page is written to the anonymous pipe in chunks of at most
// MAX_PIPE_BUFFER_SIZE bytes, each announced by a queue message whose text is the chunk
// size in decimal; an empty page is announced by a single message with size 0
1476  bytesToSend = out.str().size();
1477  unsigned int cycle = 0;
1478  if(bytesToSend==0)
1479  {
// NOTE(review): "%d" is used with unsigned int arguments here and on listing line 1485
1480  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
1481  myProcess_->postSlave(msg1,true);
1482  }
1483  while(bytesToSend !=0){
1484  unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
1485  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
// NOTE(review): the return value of write() is ignored, so short writes go unnoticed
1486  write(anonymousPipe_[PIPE_WRITE],
1487  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
1488  msgSize);
1489  myProcess_->postSlave(msg1,true);
1490  bytesToSend -= msgSize;
1491  cycle++;
1492  }
1493  break;
1494  }
1495  case MSQM_MESSAGE_TYPE_TRP:
1496  {
1497  break;
1498  }
1499  }
1500  }
1501  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
1502  return true;
1503 }
unsigned int ps
Definition: queue_defs.h:58
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
void spotlightWebPage(xgi::Input *, xgi::Output *)
unsigned int eols
Definition: queue_defs.h:57
#define Input(cl)
Definition: vmac.h:189
unsigned int nbp
Definition: queue_defs.h:59
xdata::Integer epmAltState_
Definition: FWEPWrapper.h:191
std::map< std::string, std::string, std::less< std::string > > & getEnvironment()
boost::tokenizer< boost::char_separator< char > > tokenizer
unsigned int ls
Definition: queue_defs.h:56
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Boolean exitOnError_
U second(std::pair< T, U > const &p)
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:31
evf::StateMachine fsm_
int cycle
unsigned int lsid_
Definition: FWEPWrapper.h:204
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: FWEPWrapper.cc:567
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:104
unsigned int psid_
Definition: FWEPWrapper.h:205
unsigned int Ms
Definition: queue_defs.h:61
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:36
bool first
Definition: L1TdeRCT.cc:79
int totalEvents() const
void procStat(xgi::Input *in, xgi::Output *out)
unsigned int nba
Definition: queue_defs.h:60
tuple out
Definition: dbtoconf.py:99
#define MSQM_MESSAGE_TYPE_TRP
Definition: queue_defs.h:21
#define MSQS_MESSAGE_TYPE_MCR
Definition: queue_defs.h:28
xdata::InfoSpace * applicationInfoSpace_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 lastLumiUsingEol_
Definition: FWEPWrapper.h:216
#define PIPE_WRITE
Definition: queue_defs.h:39
void moduleWeb(xgi::Input *in, xgi::Output *out)
dictionary args
#define Output(cl)
Definition: vmac.h:193
void defaultWebPage(xgi::Input *in, xgi::Output *out)
unsigned int trp
Definition: queue_defs.h:64
tuple query
Definition: o2o.py:269
xdata::Integer epMAltState_
Definition: FWEPWrapper.h:190
tuple cout
Definition: gather_cfg.py:41
#define MSQM_MESSAGE_TYPE_MCS
Definition: queue_defs.h:18
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
void microState(xgi::Input *in, xgi::Output *out)
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:40
int totalEventsPassed() const
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:63
unsigned int ms
Definition: queue_defs.h:62
bool FUEventProcessor::scalers ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1245 of file FUEventProcessor.cc.

References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), runTheMatrix::ret, scalersUpdates_, and wlScalersActive_.

Referenced by startScalersWorkLoop().

// Work-loop action for scalers updates.
// Returns false (after clearing wlScalersActive_) when there is no event processor or
// when evtProcessor_.getTriggerReport(true) fails; otherwise returns true.
// NOTE(review): listing lines 1263 and 1268 are absent from this extract.
1246 {
1247  if(evtProcessor_)
1248  {
1249  if(!evtProcessor_.getTriggerReport(true)) {
1250  wlScalersActive_ = false;
1251  return false;
1252  }
1253  }
1254  else
1255  {
1256  std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
1257  wlScalersActive_ = false;
1258  return false;
1259  }
// child process branch
1260  if(myProcess_)
1261  {
1262  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
// 'ret' is assigned on listing line 1263 (not shown); per the References list this is
// presumably the result of myProcess_->postSlave() -- TODO confirm
1264  if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
1265  scalersUpdates_++;
1266  }
// the statement belonging to this 'else' is the missing listing line 1268 (the References
// list names evtProcessor_.fireScalersUpdate()); the 'return true' below is therefore
// presumably NOT the else-branch in the real source -- TODO confirm
1267  else
1269  return true;
1270 }
MsgBuf & getPackedTriggerReport()
Definition: FWEPWrapper.h:110
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:697
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:104
bool getTriggerReport(bool useLock)
Definition: FWEPWrapper.cc:614
unsigned int scalersUpdates_
tuple cout
Definition: gather_cfg.py:41
void FUEventProcessor::scalersWeb ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 743 of file FUEventProcessor.cc.

References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.

Referenced by FUEventProcessor().

// Sets the HTTP response headers for a binary payload ("application/octet-stream",
// transfer encoding "binary").
// NOTE(review): the statement inside the 'if' (listing line 752) is absent from this
// extract; the References list names evtProcessor_.getPackedTriggerReportAsStruct(),
// so presumably the packed trigger report is written to *out there -- TODO confirm.
745 {
746 
747  out->getHTTPResponseHeader().addHeader( "Content-Type",
748  "application/octet-stream" );
749  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
750  "binary" );
751  if(evtProcessor_ != 0){
753  }
754 }
tuple out
Definition: dbtoconf.py:99
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:111
void FUEventProcessor::sendMessageOverMonitorQueue ( MsgBuf & buf)

Definition at line 1881 of file FUEventProcessor.cc.

References myProcess_, and evf::SubProcess::postSlave().

// Posts buf over the monitor queue (postSlave(buf,true)) when myProcess_ is set;
// does nothing otherwise. The return value of postSlave() is ignored.
1882 {
1883  if(myProcess_) myProcess_->postSlave(buf,true);
1884 }
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:104
void evf::FUEventProcessor::serviceWeb ( xgi::Input * in,
xgi::Output * out 
)
inline

Definition at line 108 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::serviceWeb().

Referenced by FUEventProcessor().

void serviceWeb(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::spotlightWebPage ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 658 of file FUEventProcessor.cc.

References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().

Referenced by FUEventProcessor(), and receivingAndMonitor().

660 {
661 
662  std::string urn = getApplicationDescriptor()->getURN();
663 
664  *out << "<!-- base href=\"/" << urn
665  << "\"> -->" << std::endl;
666  *out << "<html>" << std::endl;
667  *out << "<head>" << std::endl;
668  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
669  *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
670  *out << "<title>" << getApplicationDescriptor()->getClassName()
671  << getApplicationDescriptor()->getInstance()
672  << " MAIN</title>" << std::endl;
673  *out << "</head>" << std::endl;
674  *out << "<body>" << std::endl;
675  *out << "<table border=\"0\" width=\"100%\">" << std::endl;
676  *out << "<tr>" << std::endl;
677  *out << " <td align=\"left\">" << std::endl;
678  *out << " <img" << std::endl;
679  *out << " align=\"middle\"" << std::endl;
680  *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
681  *out << " alt=\"main\"" << std::endl;
682  *out << " width=\"64\"" << std::endl;
683  *out << " height=\"64\"" << std::endl;
684  *out << " border=\"\"/>" << std::endl;
685  *out << " <b>" << std::endl;
686  *out << getApplicationDescriptor()->getClassName()
687  << getApplicationDescriptor()->getInstance() << std::endl;
688  *out << " " << fsm_.stateName()->toString() << std::endl;
689  *out << " </b>" << std::endl;
690  *out << " </td>" << std::endl;
691  *out << " <td width=\"32\">" << std::endl;
692  *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
693  *out << " <img" << std::endl;
694  *out << " align=\"middle\"" << std::endl;
695  *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
696  *out << " alt=\"HyperDAQ\"" << std::endl;
697  *out << " width=\"32\"" << std::endl;
698  *out << " height=\"32\"" << std::endl;
699  *out << " border=\"\"/>" << std::endl;
700  *out << " </a>" << std::endl;
701  *out << " </td>" << std::endl;
702  *out << " <td width=\"32\">" << std::endl;
703  *out << " </td>" << std::endl;
704  *out << " <td width=\"32\">" << std::endl;
705  *out << " <a href=\"/" << urn << "/\">" << std::endl;
706  *out << " <img" << std::endl;
707  *out << " align=\"middle\"" << std::endl;
708  *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
709  *out << " alt=\"main\"" << std::endl;
710  *out << " width=\"32\"" << std::endl;
711  *out << " height=\"32\"" << std::endl;
712  *out << " border=\"\"/>" << std::endl;
713  *out << " </a>" << std::endl;
714  *out << " </td>" << std::endl;
715  *out << "</tr>" << std::endl;
716  *out << "</table>" << std::endl;
717 
718  *out << "<hr/>" << std::endl;
719 
720  std::ostringstream ost;
721  if(myProcess_)
722  ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
723  else
724  ost << "/moduleWeb?";
725  urn += ost.str();
726  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
728  else if(evtProcessor_)
730  else
731  *out << "<td>HLT Unconfigured</td>" << std::endl;
732  *out << "</table>" << std::endl;
733 
734  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
735  *out << configuration_ << std::endl;
736  *out << "</textarea><P>" << std::endl;
737 
738  *out << "</body>" << std::endl;
739  *out << "</html>" << std::endl;
740 
741 
742 }
void summaryWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:735
void taskWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:801
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
void FUEventProcessor::startReceivingLoop ( )
private

Definition at line 856 of file FUEventProcessor.cc.

References asReceiveMsgAndExecute_, edm::hlt::Exception, runTheMatrix::msg, receiving(), receiving_, and wlReceiving_.

Referenced by enableMPEPSlave().

857 {
858  try {
859  wlReceiving_=
860  toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
861  "waiting");
862  if (!wlReceiving_->isActive()) wlReceiving_->activate();
863  asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
864  "Receiving");
866  receiving_ = true;
867  }
868  catch (xcept::Exception& e) {
869  std::string msg = "Failed to start workloop 'Receiving'.";
870  XCEPT_RETHROW(evf::Exception,msg,e);
871  }
872 }
toolbox::task::WorkLoop * wlReceiving_
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
bool receiving(toolbox::task::WorkLoop *wl)
void FUEventProcessor::startReceivingMonitorLoop ( )
private

Definition at line 873 of file FUEventProcessor.cc.

References asReceiveMsgAndRead_, edm::hlt::Exception, runTheMatrix::msg, receivingAndMonitor(), receivingM_, and wlReceivingMonitor_.

Referenced by enableMPEPSlave().

874 {
875  try {
877  toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
878  "waiting");
879  if (!wlReceivingMonitor_->isActive())
880  wlReceivingMonitor_->activate();
882  toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
883  "ReceivingM");
885  receivingM_ = true;
886  }
887  catch (xcept::Exception& e) {
888  std::string msg = "Failed to start workloop 'ReceivingM'.";
889  XCEPT_RETHROW(evf::Exception,msg,e);
890  }
891 }
bool receivingAndMonitor(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asReceiveMsgAndRead_
toolbox::task::WorkLoop * wlReceivingMonitor_
void FUEventProcessor::startScalersWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1202 of file FUEventProcessor.cc.

References asScalers_, edm::hlt::Exception, runTheMatrix::msg, scalers(), wlScalers_, and wlScalersActive_.

Referenced by enableMPEPSlave().

1203 {
1204  try {
1205  wlScalers_=
1206  toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
1207  "waiting");
1208  if (!wlScalers_->isActive()) wlScalers_->activate();
1209  asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
1210  "Scalers");
1211 
1212  wlScalers_->submit(asScalers_);
1213  wlScalersActive_ = true;
1214  }
1215  catch (xcept::Exception& e) {
1216  std::string msg = "Failed to start workloop 'Scalers'.";
1217  XCEPT_RETHROW(evf::Exception,msg,e);
1218  }
1219 }
toolbox::task::WorkLoop * wlScalers_
bool scalers(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asScalers_
void FUEventProcessor::startSummarizeWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1223 of file FUEventProcessor.cc.

References asSummarize_, edm::hlt::Exception, runTheMatrix::msg, summarize(), wlSummarize_, and wlSummarizeActive_.

Referenced by enabling().

1224 {
1225  try {
1226  wlSummarize_=
1227  toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
1228  "waiting");
1229  if (!wlSummarize_->isActive()) wlSummarize_->activate();
1230 
1231  asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
1232  "Summary");
1233 
1234  wlSummarize_->submit(asSummarize_);
1235  wlSummarizeActive_ = true;
1236  }
1237  catch (xcept::Exception& e) {
1238  std::string msg = "Failed to start workloop 'Summarize'.";
1239  XCEPT_RETHROW(evf::Exception,msg,e);
1240  }
1241 }
bool summarize(toolbox::task::WorkLoop *wl)
toolbox::task::WorkLoop * wlSummarize_
toolbox::task::ActionSignature * asSummarize_
void FUEventProcessor::startSupervisorLoop ( )
private

Definition at line 838 of file FUEventProcessor.cc.

References asSupervisor_, edm::hlt::Exception, runTheMatrix::msg, supervising_, supervisor(), and wlSupervising_.

Referenced by FUEventProcessor().

839 {
840  try {
842  toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
843  "waiting");
844  if (!wlSupervising_->isActive()) wlSupervising_->activate();
845  asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
846  "Supervisor");
847  wlSupervising_->submit(asSupervisor_);
848  supervising_ = true;
849  }
850  catch (xcept::Exception& e) {
851  std::string msg = "Failed to start workloop 'Supervisor'.";
852  XCEPT_RETHROW(evf::Exception,msg,e);
853  }
854 }
toolbox::task::ActionSignature * asSupervisor_
toolbox::task::WorkLoop * wlSupervising_
bool supervisor(toolbox::task::WorkLoop *wl)
bool FUEventProcessor::stopClassic ( )
private

Definition at line 1620 of file FUEventProcessor.cc.

References detachDqmFromShm(), edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, and evf::FWEPWrapper::stop().

Referenced by receiving(), and stopping().

1621 {
1622  try {
1623  LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
1626  fsm_.fireEvent("StopDone",this);
1627  else
1628  {
1629  // epMState_ = evtProcessor_->currentStateName();
1631  reasonForFailedState_ = "EventProcessor stop timed out";
1632  else
1633  reasonForFailedState_ = "EventProcessor did not receive STOP event";
1636  }
1638  }
1639  catch (xcept::Exception &e) {
1640  reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
1643  }
1644  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
1645  localLog("-I- Stop completed");
1646  return false;
1647 }
xdata::Boolean hasShMem_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
edm::EventProcessor::StatusCode stop()
Definition: FWEPWrapper.cc:441
bool FUEventProcessor::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 498 of file FUEventProcessor.cc.

References nbSubProcesses_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.

Referenced by supervisor().

499 {
500  if(nbSubProcesses_.value_!=0)
502  vulture_->stop();
503  return stopClassic();
504 }
int stop()
Definition: Vulture.cc:248
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::stopSlavesAndAcknowledge ( )
private

Definition at line 1649 of file FUEventProcessor.cc.

References i, localLog(), MAX_MSG_SIZE, runTheMatrix::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.

Referenced by halting(), and stopping().

1650 {
1653 
1654  std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
1655  for(unsigned int i = 0; i < subs_.size(); i++)
1656  {
1657  pthread_mutex_lock(&stop_lock_);
1658  if(subs_[i].alive()>0){
1659  processes_to_stop[i] = true;
1660  subs_[i].post(msg,false);
1661  }
1662  pthread_mutex_unlock(&stop_lock_);
1663  }
1664  for(unsigned int i = 0; i < subs_.size(); i++)
1665  {
1666  pthread_mutex_lock(&stop_lock_);
1667  if(processes_to_stop[i]){
1668  try{
1669  subs_[i].rcv(msg1,false);
1670  }
1671  catch(evf::Exception &e){
1672  std::ostringstream ost;
1673  ost << "failed to get STOP - errno ->" << errno << " " << e.what();
1674  reasonForFailedState_ = ost.str();
1675  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
1676  // fsm_.fireFailed(reasonForFailedState_,this);
1678  break;
1679  }
1680  }
1681  else {
1682  pthread_mutex_unlock(&stop_lock_);
1683  continue;
1684  }
1685  pthread_mutex_unlock(&stop_lock_);
1686  if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
1687  while(subs_[i].alive()>0) ::usleep(10000);
1688  subs_[i].disconnect();
1689  }
1690  // subs_.clear();
1691 
1692 }
int i
Definition: DBlmapReader.cc:9
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:29
std::string reasonForFailedState_
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
void FUEventProcessor::subWeb ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 574 of file FUEventProcessor.cc.

References anonymousPipe_, gather_cfg::cout, generateEDF::done, asciidump::els, i, j, python.multivaluedict::map(), MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), runTheMatrix::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), subs_, and superSleepSec_.

Referenced by FUEventProcessor().

575 {
576  using namespace cgicc;
577  pid_t pid = 0;
578  std::ostringstream ost;
579  ost << "&";
580 
581  Cgicc cgi(in);
583  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
584  mycgi->getEnvironment().begin();
585  mit != mycgi->getEnvironment().end(); mit++)
586  ost << mit->first << "%" << mit->second << ";";
587  std::vector<FormEntry> els = cgi.getElements() ;
588  std::vector<FormEntry> el1;
589  cgi.getElement("method",el1);
590  std::vector<FormEntry> el2;
591  cgi.getElement("process",el2);
592  if(el1.size()!=0) {
593  std::string meth = el1[0].getValue();
594  if(el2.size()!=0) {
595  unsigned int i = 0;
596  std::string mod = el2[0].getValue();
597  pid = atoi(mod.c_str()); // get the process id to be polled
598  for(; i < subs_.size(); i++)
599  if(subs_[i].pid()==pid) break;
600  if(i>=subs_.size()){ //process was not found, let the browser know
601  *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
602  return;
603  }
604  if(subs_[i].alive() != 1){
605  *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
606  return;
607  }
608  MsgBuf msg1(meth.length()+ost.str().length(),MSQM_MESSAGE_TYPE_WEB);
609  strcpy(msg1->mtext,meth.c_str());
610  strcpy(msg1->mtext+meth.length(),ost.str().c_str());
611  subs_[i].post(msg1,true);
612  unsigned int keep_supersleep_original_value = superSleepSec_.value_;
613  superSleepSec_.value_=10*keep_supersleep_original_value;
615  bool done = false;
616  std::vector<char *>pieces;
617  while(!done){
618  unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
619  if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
620  ::sleep(1);
621  continue;
622  }
623  unsigned int nbytes = atoi(msg->mtext);
624  if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
625  char *buf= new char[nbytes];
626  ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
627  if(retval!=nbytes) std::cout
628  << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
629  pieces.push_back(buf);
630  }
631  superSleepSec_.value_=keep_supersleep_original_value;
632  for(unsigned int j = 0; j < pieces.size(); j++){
633  *out<<pieces[j]; // chain the buffers into the output strstream
634  delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
635  }
636  }
637  }
638 }
int i
Definition: DBlmapReader.cc:9
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
tuple pieces
Definition: csv2json.py:31
tuple els
Definition: asciidump.py:420
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:31
#define PIPE_READ
Definition: queue_defs.h:38
int j
Definition: DBlmapReader.cc:9
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 superSleepSec_
tuple cout
Definition: gather_cfg.py:41
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:40
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
bool FUEventProcessor::summarize ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1273 of file FUEventProcessor.cc.

References gather_cfg::cout, cpustat_, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, nbSubProcessesReporting_, ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), runTheMatrix::ret, evf::RateStat::sendStat(), evf::CPUStat::sendStat(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.

Referenced by startSummarizeWorkLoop().

1274 {
1276  bool atLeastOneProcessUpdatedSuccessfully = false;
1277  int msgCount = 0;
1278  for (unsigned int i = 0; i < subs_.size(); i++)
1279  {
1280  if(subs_[i].alive()>0)
1281  {
1282  int ret = 0;
1283  if(subs_[i].check_postponed_trigger_update(master_message_trr_,
1285  {
1286  ret = MSQS_MESSAGE_TYPE_TRR;
1287  std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1288  }
1289  else{
1290  bool insync = false;
1291  bool exception_caught = false;
1292  while(!insync){
1293  try{
1294  ret = subs_[i].rcv(master_message_trr_,false);
1295  }
1296  catch(evf::Exception &e)
1297  {
1298  std::cout << "exception in msgrcv on " << i
1299  << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
1300  exception_caught = true;
1301  break;
1302  //do nothing special
1303  }
1304  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1307  insync = true;
1308  }
1309  }
1310  }
1311  if(exception_caught) continue;
1312  }
1313  msgCount++;
1314  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1317  std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
1318  << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1319  subs_[i].add_postponed_trigger_update(master_message_trr_);
1320  }else{
1321  atLeastOneProcessUpdatedSuccessfully = true;
1323  }
1324  }
1325  else std::cout << "msgrcv returned error " << errno << std::endl;
1326  }
1327  }
1328  if(atLeastOneProcessUpdatedSuccessfully){
1329  nbSubProcessesReporting_.value_ = msgCount;
1332  }
1333  else{
1334  LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
1335  if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
1336  nbSubProcessesReporting_.value_ = 0;
1337  ::sleep(10);
1338  }
1339  if(fsm_.stateName()->toString()!="Enabled"){
1340  wlScalersActive_ = false;
1341  return false;
1342  }
1343  // cpustat_->printStat();
1344  if(iDieStatisticsGathering_.value_){
1345  try{
1348  sizeof(TriggerReportStatic),
1350  }catch(evf::Exception &e){
1351  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
1352  << e.what());
1353  }
1354  }
1355  cpustat_->reset();
1356  return true;
1357 }
int i
Definition: DBlmapReader.cc:9
void updateRollingReport()
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:32
xdata::UnsignedInteger32 nbSubProcessesReporting_
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:697
evf::StateMachine fsm_
xdata::Boolean iDieStatisticsGathering_
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:107
void sendStat(unsigned int)
Definition: CPUStat.cc:21
void reset()
Definition: CPUStat.h:21
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int getLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:135
tuple cout
Definition: gather_cfg.py:41
void withdrawLumiSectionIncrement()
Definition: FWEPWrapper.h:134
void sumAndPackTriggerReport(MsgBuf &buf)
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:111
void sendStat(const unsigned char *, size_t, unsigned int)
Definition: RateStat.cc:19
bool FUEventProcessor::supervisor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 932 of file FUEventProcessor.cc.

References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, enableMPEPSlave(), evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, spr::find(), evf::StateMachine::fireEvent(), fsm_, i, localLog(), log_, evf::prg::ls, python.rootplot.utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, L1TEmulatorMonitor_cff::p, pickup_lock_, evf::utils::pid, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.

Referenced by startSupervisorLoop().

933 {
934  pthread_mutex_lock(&stop_lock_);
935  if(subs_.size()!=nbSubProcesses_.value_)
936  {
937  subs_.resize(nbSubProcesses_.value_);
938  spMStates_.resize(nbSubProcesses_.value_);
939  spmStates_.resize(nbSubProcesses_.value_);
940  for(unsigned int i = 0; i < spMStates_.size(); i++)
941  {
943  spmStates_[i] = 0;
944  }
945  }
946  bool running = fsm_.stateName()->toString()=="Enabled";
947  bool stopping = fsm_.stateName()->toString()=="stopping";
948  for(unsigned int i = 0; i < subs_.size(); i++)
949  {
950  if(subs_[i].alive()==-1000) continue;
951  int sl;
952 
953  pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
954 
955  if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
956  else continue;
957  pthread_mutex_lock(&pickup_lock_);
958  std::ostringstream ost;
959  if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
960  else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
961  else ost << " process stopped ";
962  subs_[i].countdown()=slaveRestartDelaySecs_.value_;
963  subs_[i].setReasonForFailed(ost.str());
965  spmStates_[i] = 0;
966  std::ostringstream ost1;
967  ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
968  localLog(ost1.str());
969  if(!autoRestartSlaves_.value_) subs_[i].disconnect();
970  pthread_mutex_unlock(&pickup_lock_);
971  }
972  pthread_mutex_unlock(&stop_lock_);
973  if(stopping) return true; // if in stopping we are done
974 
975  if(running)
976  {
977  // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
978  // this is only active while running, hence, the stop lock is acquired and only released at end of loop
979  if(autoRestartSlaves_.value_){
980  pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
981  for(unsigned int i = 0; i < subs_.size(); i++)
982  {
983  if(subs_[i].alive() != 1){
984  if(subs_[i].countdown() == 0)
985  {
986  if(subs_[i].restartCount()>2){
987  LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
988  << " - maximum restart count reached");
989  std::ostringstream ost1;
990  ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
991  localLog(ost1.str());
992  subs_[i].countdown()--;
993  XCEPT_DECLARE(evf::Exception,
994  sentinelException, ost1.str());
995  notifyQualified("error",sentinelException);
996  subs_[i].disconnect();
997  continue;
998  }
999  subs_[i].restartCount()++;
1000  pid_t rr = subs_[i].forkNew();
1001  if(rr==0)
1002  {
1003  myProcess_=&subs_[i];
1004  scalersUpdates_ = 0;
1005  int retval = pthread_mutex_destroy(&stop_lock_);
1006  if(retval != 0) perror("error");
1007  retval = pthread_mutex_init(&stop_lock_,0);
1008  if(retval != 0) perror("error");
1010  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1011  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
1012  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
1013  try{
1014  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
1015  if(lsid) {
1016  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
1017  }
1018  }
1019  catch(...){
1020  std::cout << "trouble with lsindex during restart" << std::endl;
1021  }
1022  try{
1023  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
1024  if(lstb) {
1025  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
1026  }
1027  }
1028  catch(...){
1029  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
1030  }
1031 
1034  enableMPEPSlave();
1035  return false; // exit the supervisor loop immediately in the child !!!
1036  }
1037  else
1038  {
1039  std::ostringstream ost1;
1040  ost1 << "-I- New Process " << rr << " forked for slot " << i;
1041  localLog(ost1.str());
1042  }
1043  }
1044  if(subs_[i].countdown()>=0) subs_[i].countdown()--;
1045  }
1046  }
1047  pthread_mutex_unlock(&stop_lock_);
1048  } // finished handling replacement of dead slaves once they've been reaped
1049  }
1050  xdata::Serializable *lsid = 0;
1051  xdata::Serializable *psid = 0;
1052  xdata::Serializable *epMAltState = 0;
1053  xdata::Serializable *epmAltState = 0;
1054  xdata::Serializable *dqmp = 0;
1055  xdata::UnsignedInteger32 *dqm = 0;
1056 
1057 
1058 
1059  if(running){
1060  try{
1061  lsid = applicationInfoSpace_->find("lumiSectionIndex");
1062  psid = applicationInfoSpace_->find("prescaleSetIndex");
1063  nbProcessed = monitorInfoSpace_->find("nbProcessed");
1064  nbAccepted = monitorInfoSpace_->find("nbAccepted");
1065  epMAltState = monitorInfoSpace_->find("epSPMacroStateInt");
1066  epmAltState = monitorInfoSpace_->find("epSPMicroStateInt");
1067  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1068  }
1069  catch(xdata::exception::Exception e){
1070  LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
1071  }
1072 
1073  try{
1074  if(nbProcessed !=0 && nbAccepted !=0)
1075  {
1076  xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
1077  xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
1078  xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
1079  xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
1080  if(dqmp!=0)
1081  dqm = (xdata::UnsignedInteger32*)dqmp;
1082  if(dqm) dqm->value_ = 0;
1083  nbTotalDQM_ = 0;
1084  nbp->value_ = 0;
1085  nba->value_ = 0;
1086  nblive_ = 0;
1087  nbdead_ = 0;
1088  scalersUpdates_ = 0;
1089 
1090  for(unsigned int i = 0; i < subs_.size(); i++)
1091  {
1092  if(subs_[i].alive()>0)
1093  {
1094  nblive_++;
1095  try{
1096  subs_[i].post(master_message_prg_,true);
1097 
1098  unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
1099  if(retval == (unsigned long) master_message_prr_->mtype){
1100  prg* p = (struct prg*)(master_message_prr_->mtext);
1101  subs_[i].setParams(p);
1102  spMStates_[i] = p->Ms;
1103  spmStates_[i] = p->ms;
1104  cpustat_->addEntry(p->ms);
1105  if(!subs_[i].inInconsistentState() &&
1109  {
1110  std::ostringstream ost;
1111  ost << "edm::eventprocessor slot " << i << " process id "
1112  << subs_[i].pid() << " not in Running state : Mstate="
1113  << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
1115  << " - Look into possible error messages from HLT process";
1116  LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
1117  }
1118  nbp->value_ += subs_[i].params().nbp;
1119  nba->value_ += subs_[i].params().nba;
1120  if(dqm)dqm->value_ += p->dqm;
1121  nbTotalDQM_ += p->dqm;
1122  scalersUpdates_ += p->trp;
1123  if(p->ls > ls->value_) ls->value_ = p->ls;
1124  if(p->ps != ps->value_) ps->value_ = p->ps;
1125  }
1126  else{
1127  nbp->value_ += subs_[i].get_save_nbp();
1128  nba->value_ += subs_[i].get_save_nba();
1129  }
1130  }
1131  catch(evf::Exception &e){
1132  LOG4CPLUS_INFO(getApplicationLogger(),
1133  "could not send/receive msg on slot "
1134  << i << " - " << e.what());
1135  }
1136 
1137  }
1138  else
1139  {
1140  nbp->value_ += subs_[i].get_save_nbp();
1141  nba->value_ += subs_[i].get_save_nba();
1142  nbdead_++;
1143  }
1144  }
1145  if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
1146  for(unsigned int i = 0; i < subs_.size(); i++)
1147  {
1148  if(subs_[i].params().nbp == 0){ // a slave has processed 0 events
1149  // check that the process is not stuck
1150  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
1151  {
1152  subs_[i].found_invalid();//increase the "found_invalid" counter
1153  if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
1154  MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal
1155  subs_[i].post(msg3,false);
1156  std::ostringstream ost1;
1157  ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
1158  localLog(ost1.str());
1159  LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
1160  XCEPT_DECLARE(evf::Exception,
1161  sentinelException, ost1.str());
1162  notifyQualified("error",sentinelException);
1163 
1164  }
1165  }
1166  }
1167  }
1168  }
1169  }
1170  }
1171  catch(std::exception &e){
1172  LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
1173  }
1174  catch(...){
1175  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
1176  }
1177  }
1178  else{
1179  for(unsigned int i = 0; i < subs_.size(); i++)
1180  {
1181  if(subs_[i].alive()==-1000)
1182  {
1184  spmStates_[i] = 0;
1185  }
1186  }
1187  }
1188  try{
1189  monitorInfoSpace_->lock();
1190  monitorInfoSpace_->fireItemGroupChanged(names_,0);
1191  monitorInfoSpace_->unlock();
1192  }
1193  catch(xdata::exception::Exception &e)
1194  {
1195  LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
1196  // localLog(e.what());
1197  }
1198  ::sleep(superSleepSec_.value_);
1199  return true;
1200 }
unsigned int ps
Definition: queue_defs.h:58
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
xdata::InfoSpace * monitorInfoSpace_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
unsigned int ls
Definition: queue_defs.h:56
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:118
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
xdata::Serializable * nbProcessed
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:108
evf::StateMachine fsm_
unsigned int Ms
Definition: queue_defs.h:61
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:107
std::list< std::string > names_
void addEntry(int sta)
Definition: CPUStat.h:16
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:36
bool stopping(toolbox::task::WorkLoop *wl)
void fireEvent(const std::string &evtType, void *originator)
xdata::Serializable * nbAccepted
void disableRcmsStateNotification()
Definition: StateMachine.h:64
void localLog(std::string)
int notstarted_state_code() const
Definition: FWEPWrapper.h:128
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 superSleepSec_
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:123
pthread_mutex_t pickup_lock_
unsigned int trp
Definition: queue_defs.h:64
xdata::Vector< xdata::Integer > spmStates_
tuple cout
Definition: gather_cfg.py:41
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:63
unsigned int ms
Definition: queue_defs.h:62
void FUEventProcessor::updater ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 1805 of file FUEventProcessor.cc.

References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, evtProcessor_, fsm_, evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.

Referenced by FUEventProcessor().

1806 {
1807  using namespace utils;
1808 
1809  *out << updaterStatic_;
1810  mDiv(out,"loads");
1811  uptime(out);
1812  cDiv(out);
1813  mDiv(out,"st",fsm_.stateName()->toString());
1814  mDiv(out,"ru",runNumber_.toString());
1815  mDiv(out,"nsl",nbSubProcesses_.value_);
1816  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
1817  mDiv(out,"cl");
1818  *out << getApplicationDescriptor()->getClassName()
1819  << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
1820  cDiv(out);
1821  mDiv(out,"in",getApplicationDescriptor()->getInstance());
1822  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
1823  mDiv(out,"hlt");
1824  *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
1825  cDiv(out);
1826  *out << std::endl;
1827  }
1828  else
1829  mDiv(out,"hlt","Not yet...");
1830 
1831  mDiv(out,"sq",squidPresent_.toString());
1832  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
1834  if(nbProcessed != 0 && nbAccepted != 0)
1835  {
1836  mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
1837  mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
1838  }
1839  else
1840  {
1841  mDiv(out,"tt",0);
1842  mDiv(out,"ac",0);
1843  }
1844  if(!myProcess_)
1845  mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
1846  else
1847  mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
1848 
1849  mDiv(out,"idi",iDieUrl_.value_);
1850  if(vp_!=0){
1851  mDiv(out,"vpi",(unsigned int) vp_);
1852  if(vulture_->hasStarted()>=0)
1853  mDiv(out,"vul","Prowling");
1854  else
1855  mDiv(out,"vul","Dead");
1856  }
1857  else{
1858  mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
1859  }
1860  if(evtProcessor_){
1861  mDiv(out,"ll");
1862  *out << evtProcessor_.lastLumi().ls
1863  << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
1864  cDiv(out);
1865  }
1866  mDiv(out,"lg");
1867  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
1868  *out << logRing_[i] << std::endl;
1869  if(logWrap_)
1870  for(unsigned int i = 0; i<logRingIndex_; i++)
1871  *out << logRing_[i] << std::endl;
1872  cDiv(out);
1873 
1874 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
xdata::UnsignedInteger32 runNumber_
unsigned int acc
Definition: FWEPWrapper.h:42
xdata::UnsignedInteger32 nbSubProcessesReporting_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:338
lsTriplet & lastLumi()
Definition: FWEPWrapper.h:129
xdata::Serializable * nbProcessed
std::string wlMonitoring()
Definition: FWEPWrapper.h:94
evf::StateMachine fsm_
unsigned int proc
Definition: FWEPWrapper.h:41
xdata::Boolean squidPresent_
xdata::Serializable * nbAccepted
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::String configString_
unsigned int ls
Definition: FWEPWrapper.h:40
int hasStarted()
Definition: Vulture.cc:209
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:335
void uptime(std::ostringstream *out)
Definition: procUtils.cc:290
static const unsigned int logRingSize_
evf::FUEventProcessor::XDAQ_INSTANTIATOR ( )

Member Data Documentation

int evf::FUEventProcessor::anonymousPipe_[2]
private

Definition at line 244 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().

xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_
private
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_
private

Definition at line 210 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_
private

Definition at line 213 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_
private

Definition at line 236 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_
private

Definition at line 242 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_
private

Definition at line 217 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().

xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_
private

Definition at line 167 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

xdata::String evf::FUEventProcessor::class_
private

Definition at line 158 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::String evf::FUEventProcessor::configString_
private

Definition at line 162 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

std::string evf::FUEventProcessor::configuration_
private

Definition at line 163 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and spotlightWebPage().

CPUStat* evf::FUEventProcessor::cpustat_
private

Definition at line 252 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), summarize(), and supervisor().

Css evf::FUEventProcessor::css_
private

Definition at line 189 of file FUEventProcessor.h.

Referenced by css().

xdata::Boolean evf::FUEventProcessor::epInitialized_
private

Definition at line 161 of file FUEventProcessor.h.

Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().

FWEPWrapper evf::FUEventProcessor::evtProcessor_
private
xdata::Boolean evf::FUEventProcessor::exitOnError_
private

Definition at line 186 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

evf::StateMachine evf::FUEventProcessor::fsm_
private
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_
private

Definition at line 172 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasPrescaleService_
private

Definition at line 171 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_
private

Definition at line 173 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasShMem_
private

Definition at line 170 of file FUEventProcessor.h.

Referenced by enableCommon(), FUEventProcessor(), makeStaticInfo(), and stopClassic().

xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_
private

Definition at line 175 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().

xdata::String evf::FUEventProcessor::iDieUrl_
private

Definition at line 249 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::instance_
private

Definition at line 159 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_
private

Definition at line 174 of file FUEventProcessor.h.

Referenced by enableCommon(), and FUEventProcessor().

Logger evf::FUEventProcessor::log_
private

Definition at line 151 of file FUEventProcessor.h.

Referenced by supervisor().

std::vector<std::string> evf::FUEventProcessor::logRing_
private

Definition at line 195 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

unsigned int evf::FUEventProcessor::logRingIndex_
private

Definition at line 196 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

const unsigned int evf::FUEventProcessor::logRingSize_ = 50
staticprivate

Definition at line 197 of file FUEventProcessor.h.

Referenced by localLog(), and updater().

bool evf::FUEventProcessor::logWrap_
private

Definition at line 198 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

MsgBuf evf::FUEventProcessor::master_message_prg_
private

Definition at line 255 of file FUEventProcessor.h.

Referenced by supervisor().

MsgBuf evf::FUEventProcessor::master_message_prr_
private

Definition at line 256 of file FUEventProcessor.h.

Referenced by supervisor().

MsgBuf evf::FUEventProcessor::master_message_trr_
private

Definition at line 259 of file FUEventProcessor.h.

Referenced by summarize().

xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_
private

Definition at line 220 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_
private

Definition at line 221 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

SubProcess* evf::FUEventProcessor::myProcess_
private
std::list<std::string> evf::FUEventProcessor::names_
private

Definition at line 248 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbAccepted
private

Definition at line 228 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

unsigned int evf::FUEventProcessor::nbdead_
private

Definition at line 204 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

unsigned int evf::FUEventProcessor::nblive_
private

Definition at line 203 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbProcessed
private

Definition at line 227 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_
private

Definition at line 201 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), summarize(), and updater().

unsigned int evf::FUEventProcessor::nbTotalDQM_
private

Definition at line 206 of file FUEventProcessor.h.

Referenced by enabling(), microState(), and supervisor().

bool evf::FUEventProcessor::outprev_
private

Definition at line 176 of file FUEventProcessor.h.

Referenced by actionPerformed().

xdata::Boolean evf::FUEventProcessor::outPut_
private

Definition at line 165 of file FUEventProcessor.h.

Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().

pthread_mutex_t evf::FUEventProcessor::pickup_lock_
private

Definition at line 225 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

RateStat* evf::FUEventProcessor::ratestat_
private

Definition at line 253 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and summarize().

std::string evf::FUEventProcessor::reasonForFailedState_
private
bool evf::FUEventProcessor::receiving_
private

Definition at line 211 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

bool evf::FUEventProcessor::receivingM_
private

Definition at line 214 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_
private

Definition at line 160 of file FUEventProcessor.h.

Referenced by enableCommon(), enabling(), FUEventProcessor(), and updater().

xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_
private

Definition at line 231 of file FUEventProcessor.h.

Referenced by enabling(), and FUEventProcessor().

xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_
private

Definition at line 232 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and pathNames().

unsigned int evf::FUEventProcessor::scalersUpdates_
private

Definition at line 238 of file FUEventProcessor.h.

Referenced by enabling(), receivingAndMonitor(), scalers(), and supervisor().

MsgBuf evf::FUEventProcessor::slave_message_monitoring_
private

Definition at line 258 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

MsgBuf evf::FUEventProcessor::slave_message_prr_
private

Definition at line 257 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_
private

Definition at line 168 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

std::string evf::FUEventProcessor::sourceId_
private

Definition at line 179 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_
private

Definition at line 245 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_
private

Definition at line 246 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

SquidNet evf::FUEventProcessor::squidnet_
private

Definition at line 194 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::squidPresent_
private

Definition at line 182 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and updater().

pthread_mutex_t evf::FUEventProcessor::start_lock_
private

Definition at line 224 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), and microState().

pthread_mutex_t evf::FUEventProcessor::stop_lock_
private
std::vector<SubProcess> evf::FUEventProcessor::subs_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_
private

Definition at line 247 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), subWeb(), and supervisor().

bool evf::FUEventProcessor::supervising_
private

Definition at line 218 of file FUEventProcessor.h.

Referenced by startSupervisorLoop(), and updater().

std::string evf::FUEventProcessor::updaterStatic_
private

Definition at line 226 of file FUEventProcessor.h.

Referenced by makeStaticInfo(), and updater().

xdata::String evf::FUEventProcessor::url_
private

Definition at line 157 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

pid_t evf::FUEventProcessor::vp_
private

Definition at line 251 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and updater().

Vulture* evf::FUEventProcessor::vulture_
private
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_
private

Definition at line 209 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_
private

Definition at line 212 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_
private

Definition at line 235 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

bool evf::FUEventProcessor::wlScalersActive_
private

Definition at line 237 of file FUEventProcessor.h.

Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_
private

Definition at line 241 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

bool evf::FUEventProcessor::wlSummarizeActive_
private

Definition at line 243 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_
private

Definition at line 216 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().