CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::FUEventProcessor Class Reference

#include <FUEventProcessor.h>

Inheritance diagram for evf::FUEventProcessor:

Public Member Functions

void actionPerformed (xdata::Event &e)
 
bool configuring (toolbox::task::WorkLoop *wl)
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
bool enabling (toolbox::task::WorkLoop *wl)
 
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
 
 FUEventProcessor (xdaq::ApplicationStub *s)
 
bool halting (toolbox::task::WorkLoop *wl)
 
void microState (xgi::Input *in, xgi::Output *out)
 
void moduleWeb (xgi::Input *in, xgi::Output *out)
 
void pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void procStat (xgi::Input *in, xgi::Output *out)
 
void scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void sendMessageOverMonitorQueue (MsgBuf &)
 
void serviceWeb (xgi::Input *in, xgi::Output *out)
 
void spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
bool stopping (toolbox::task::WorkLoop *wl)
 
void subWeb (xgi::Input *in, xgi::Output *out)
 
void updater (xgi::Input *in, xgi::Output *out)
 
 XDAQ_INSTANTIATOR ()
 
virtual ~FUEventProcessor ()
 

Private Member Functions

void attachDqmToShm () throw (evf::Exception)
 
void detachDqmFromShm () throw (evf::Exception)
 
bool enableClassic ()
 
bool enableCommon ()
 
bool enableMPEPSlave ()
 
void localLog (std::string)
 
std::string logsAsString ()
 
void makeStaticInfo ()
 
bool receiving (toolbox::task::WorkLoop *wl)
 
bool receivingAndMonitor (toolbox::task::WorkLoop *wl)
 
bool scalers (toolbox::task::WorkLoop *wl)
 
void startReceivingLoop ()
 
void startReceivingMonitorLoop ()
 
void startScalersWorkLoop () throw (evf::Exception)
 
void startSummarizeWorkLoop () throw (evf::Exception)
 
void startSupervisorLoop ()
 
bool stopClassic ()
 
void stopSlavesAndAcknowledge ()
 
bool summarize (toolbox::task::WorkLoop *wl)
 
bool supervisor (toolbox::task::WorkLoop *wl)
 

Private Attributes

int anonymousPipe_ [2]
 
xdata::InfoSpace * applicationInfoSpace_
 
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
 
toolbox::task::ActionSignature * asReceiveMsgAndRead_
 
toolbox::task::ActionSignature * asScalers_
 
toolbox::task::ActionSignature * asSummarize_
 
toolbox::task::ActionSignature * asSupervisor_
 
xdata::Boolean autoRestartSlaves_
 
xdata::String class_
 
xdata::String configString_
 
std::string configuration_
 
CPUStat * cpustat_
 
Css css_
 
xdata::Boolean epInitialized_
 
FWEPWrapper evtProcessor_
 
xdata::Boolean exitOnError_
 
evf::StateMachine fsm_
 
xdata::Boolean hasModuleWebRegistry_
 
xdata::Boolean hasPrescaleService_
 
xdata::Boolean hasServiceWebRegistry_
 
xdata::Boolean hasShMem_
 
xdata::Boolean iDieStatisticsGathering_
 
xdata::String iDieUrl_
 
xdata::UnsignedInteger32 instance_
 
xdata::Boolean isRunNumberSetter_
 
Logger log_
 
std::vector< std::string > logRing_
 
unsigned int logRingIndex_
 
bool logWrap_
 
xdata::InfoSpace * monitorInfoSpace_
 
xdata::InfoSpace * monitorLegendaInfoSpace_
 
SubProcess * myProcess_
 
std::list< std::string > names_
 
xdata::Serializable * nbAccepted
 
unsigned int nbdead_
 
unsigned int nblive_
 
xdata::Serializable * nbProcessed
 
xdata::UnsignedInteger32 nbSubProcesses_
 
xdata::UnsignedInteger32 nbSubProcessesReporting_
 
unsigned int nbTotalDQM_
 
bool outprev_
 
xdata::Boolean outPut_
 
pthread_mutex_t pickup_lock_
 
RateStat * ratestat_
 
std::string reasonForFailedState_
 
bool receiving_
 
bool receivingM_
 
xdata::UnsignedInteger32 runNumber_
 
xdata::InfoSpace * scalersInfoSpace_
 
xdata::InfoSpace * scalersLegendaInfoSpace_
 
unsigned int scalersUpdates_
 
xdata::UnsignedInteger32 slaveRestartDelaySecs_
 
std::string sourceId_
 
xdata::Vector< xdata::Integer > spMStates_
 
xdata::Vector< xdata::Integer > spmStates_
 
SquidNet squidnet_
 
xdata::Boolean squidPresent_
 
pthread_mutex_t start_lock_
 
pthread_mutex_t stop_lock_
 
std::vector< SubProcess > subs_
 
xdata::UnsignedInteger32 superSleepSec_
 
bool supervising_
 
std::string updaterStatic_
 
xdata::String url_
 
pid_t vp_
 
Vulture * vulture_
 
toolbox::task::WorkLoop * wlReceiving_
 
toolbox::task::WorkLoop * wlReceivingMonitor_
 
toolbox::task::WorkLoop * wlScalers_
 
bool wlScalersActive_
 
toolbox::task::WorkLoop * wlSummarize_
 
bool wlSummarizeActive_
 
toolbox::task::WorkLoop * wlSupervising_
 

Static Private Attributes

static const unsigned int logRingSize_ = 50
 

Detailed Description

Definition at line 58 of file FUEventProcessor.h.

Constructor & Destructor Documentation

FUEventProcessor::FUEventProcessor ( xdaq::ApplicationStub *  s)

Definition at line 55 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, css(), defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), evf::StateMachine::foundRcmsStateListener(), fsm_, edm::PresenceFactory::get(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, outPut_, pathNames(), pickup_lock_, pipe::pipe(), procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, subWeb(), superSleepSec_, updater(), url_, and vulture_.

56  : xdaq::Application(s)
57  , fsm_(this)
58  , log_(getApplicationLogger())
59  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
60  , runNumber_(0)
61  , epInitialized_(false)
62  , outPut_(true)
63  , autoRestartSlaves_(false)
65  , hasShMem_(true)
66  , hasPrescaleService_(true)
67  , hasModuleWebRegistry_(true)
69  , isRunNumberSetter_(true)
71  , outprev_(true)
72  , exitOnError_(true)
74  , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
77  , logWrap_(false)
78  , nbSubProcesses_(0)
80  , nblive_(0)
81  , nbdead_(0)
82  , nbTotalDQM_(0)
83  , wlReceiving_(0)
85  , receiving_(false)
88  , receivingM_(false)
89  , myProcess_(0)
90  , wlSupervising_(0)
91  , asSupervisor_(0)
92  , supervising_(false)
96  , nbProcessed(0)
97  , nbAccepted(0)
100  , wlScalers_(0)
101  , asScalers_(0)
102  , wlScalersActive_(false)
103  , scalersUpdates_(0)
104  , wlSummarize_(0)
105  , asSummarize_(0)
106  , wlSummarizeActive_(false)
107  , superSleepSec_(1)
108  , iDieUrl_("none")
109  , vulture_(0)
110  , vp_(0)
111  , cpustat_(0)
112  , ratestat_(0)
113 {
114  using namespace utils;
115 
116  names_.push_back("nbProcessed" );
117  names_.push_back("nbAccepted" );
118  names_.push_back("epMacroStateInt");
119  names_.push_back("epMicroStateInt");
120  // create pipe for web communication
121  int retpipe = pipe(anonymousPipe_);
122  if(retpipe != 0)
123  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
124  // check squid presence
126  //pass application parameters to FWEPWrapper
127  evtProcessor_.setAppDesc(getApplicationDescriptor());
128  evtProcessor_.setAppCtxt(getApplicationContext());
129  // bind relevant callbacks to finite state machine
131 
132  //set sourceId_
133  url_ =
134  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
135  getApplicationDescriptor()->getURN();
136  class_ =getApplicationDescriptor()->getClassName();
137  instance_=getApplicationDescriptor()->getInstance();
138  sourceId_=class_.toString()+"_"+instance_.toString();
139  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
140  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
141 
142  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
143 
144  xdata::InfoSpace *ispace = getApplicationInfoSpace();
145  applicationInfoSpace_ = ispace;
146 
147  // default configuration
148  ispace->fireItemAvailable("parameterSet", &configString_ );
149  ispace->fireItemAvailable("epInitialized", &epInitialized_ );
150  ispace->fireItemAvailable("stateName", fsm_.stateName() );
151  ispace->fireItemAvailable("runNumber", &runNumber_ );
152  ispace->fireItemAvailable("outputEnabled", &outPut_ );
153 
154  ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
155  ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
156  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
157  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
158  ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
159  ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
160  ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
161  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
162  ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
163  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
164  ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
165  ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
166  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
167  ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
168 
169  // Add infospace listeners for exporting data values
170  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
171  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
172 
173  // findRcmsStateListener
175 
176  // initialize monitoring infospace
177 
178  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
179  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
180  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
181 
182  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
183  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
184  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
185 
186 
187  monitorInfoSpace_->fireItemAvailable("url", &url_ );
188  monitorInfoSpace_->fireItemAvailable("class", &class_ );
189  monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
190  monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
191  monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
192 
193  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
194 
195  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
196  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
197  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
198 
199  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
200  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
201  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
202 
203 
204 
206  scalersInfoSpace_->fireItemAvailable("instance", &instance_);
207 
211 
212  //subprocess state vectors for MP
213  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
214  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
215 
216  // Bind web interface
217  xgi::bind(this, &FUEventProcessor::css, "styles.css");
218  xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
219  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
220  xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
221  xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
222  xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
223  xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
224  xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
225  xgi::bind(this, &FUEventProcessor::microState, "microState");
226  xgi::bind(this, &FUEventProcessor::updater, "updater" );
227  xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
228 
229  // instantiate the plugin manager, not referenced here after!
230 
232 
233  try{
234  LOG4CPLUS_DEBUG(getApplicationLogger(),
235  "Trying to create message service presence ");
237  if(pf != 0) {
238  pf->makePresence("MessageServicePresence").release();
239  }
240  else {
241  LOG4CPLUS_ERROR(getApplicationLogger(),
242  "Unable to create message service presence ");
243  }
244  }
245  catch(...) {
246  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
247  }
249 
250  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
251  typedef AppDescSet_t::iterator AppDescIter_t;
252 
253  AppDescSet_t rcms=
254  getApplicationContext()->getDefaultZone()->
255  getApplicationDescriptors("RCMSStateListener");
256  if(rcms.size()==0)
257  {
258  LOG4CPLUS_WARN(getApplicationLogger(),
259  "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
260  // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
261  }
262  else
263  {
264  AppDescIter_t it = rcms.begin();
265  evtProcessor_.setRcms(*it);
266  }
267  pthread_mutex_init(&start_lock_,0);
268  pthread_mutex_init(&stop_lock_,0);
269  pthread_mutex_init(&pickup_lock_,0);
270 
271  makeStaticInfo();
273 
274  if(vulture_==0) vulture_ = new Vulture(true);
275 
277 
278  AppDescSet_t setOfiDie=
279  getApplicationContext()->getDefaultZone()->
280  getApplicationDescriptors("evf::iDie");
281 
282  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
283  if ((*it)->getInstance()==0) // there has to be only one instance of iDie
284  iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
285 }
xdata::Boolean hasModuleWebRegistry_
void spotlightWebPage(xgi::Input *, xgi::Output *)
bool check()
Definition: SquidNet.cc:21
xdata::Vector< xdata::Integer > spMStates_
xdata::InfoSpace * monitorInfoSpace_
void subWeb(xgi::Input *in, xgi::Output *out)
std::vector< std::string > logRing_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
xdata::UnsignedInteger32 runNumber_
void updater(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlScalers_
pthread_mutex_t start_lock_
xdata::Boolean isRunNumberSetter_
void setAppCtxt(xdaq::ApplicationContext *ctx)
Definition: FWEPWrapper.h:88
xdata::Boolean hasServiceWebRegistry_
toolbox::task::WorkLoop * wlReceiving_
xdata::Boolean hasShMem_
def pipe
Definition: pipe.py:5
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
xdata::InfoSpace * scalersLegendaInfoSpace_
toolbox::task::ActionSignature * asSupervisor_
xdata::Boolean autoRestartSlaves_
xdata::UnsignedInteger32 nbSubProcessesReporting_
void css(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asReceiveMsgAndRead_
xdata::Boolean exitOnError_
xdata::InfoSpace * monitorLegendaInfoSpace_
void initialize(T *app)
Definition: StateMachine.h:84
xdata::Serializable * nbProcessed
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
evf::StateMachine fsm_
toolbox::task::WorkLoop * wlSupervising_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
void setAppDesc(xdaq::ApplicationDescriptor *ad)
Definition: FWEPWrapper.h:87
void scalersWeb(xgi::Input *, xgi::Output *)
xdata::Boolean squidPresent_
std::list< std::string > names_
void setScalersInfoSpace(xdata::InfoSpace *sis, xdata::InfoSpace *slis)
Definition: FWEPWrapper.h:74
xdata::Boolean hasPrescaleService_
void procStat(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlSummarize_
xdata::Serializable * nbAccepted
static PresenceFactory * get()
void setMonitorInfoSpace(xdata::InfoSpace *mis, xdata::InfoSpace *mlis)
Definition: FWEPWrapper.h:80
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
void setApplicationInfoSpace(xdata::InfoSpace *is)
Definition: FWEPWrapper.h:79
xdata::String configString_
unsigned int scalersUpdates_
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
xdata::UnsignedInteger32 superSleepSec_
void moduleWeb(xgi::Input *in, xgi::Output *out)
void pathNames(xgi::Input *, xgi::Output *)
pthread_mutex_t pickup_lock_
void defaultWebPage(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asSummarize_
xdata::Vector< xdata::Integer > spmStates_
xdata::UnsignedInteger32 instance_
void serviceWeb(xgi::Input *in, xgi::Output *out)
string s
Definition: asciidump.py:422
toolbox::task::WorkLoop * wlReceivingMonitor_
toolbox::task::ActionSignature * asScalers_
pthread_mutex_t stop_lock_
void publishConfigAndMonitorItems(bool)
Definition: FWEPWrapper.cc:106
xdata::InfoSpace * scalersInfoSpace_
void microState(xgi::Input *in, xgi::Output *out)
void setRcms(xdaq::ApplicationDescriptor *rcms)
Definition: FWEPWrapper.h:86
static const unsigned int logRingSize_
xdata::Boolean epInitialized_
void findRcmsStateListener()
FUEventProcessor::~FUEventProcessor ( )
virtual

Definition at line 290 of file FUEventProcessor.cc.

References vulture_.

291 {
292  // no longer needed since the wrapper is a member of the class and one can rely on
293  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
294  // if (evtProcessor_) delete evtProcessor_;
295  if(vulture_ != 0) delete vulture_;
296 }

Member Function Documentation

void FUEventProcessor::actionPerformed ( xdata::Event &  e)

Definition at line 527 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, edm::EventProcessor::enableEndPaths(), epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, and evf::StateMachine::stateName().

528 {
529 
530  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
531  std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
532 
533  if ( item == "parameterSet") {
534  LOG4CPLUS_WARN(getApplicationLogger(),
535  "HLT Menu changed, force reinitialization of EventProcessor");
536  epInitialized_ = false;
537  }
538  if ( item == "outputEnabled") {
539  if(outprev_ != outPut_) {
540  LOG4CPLUS_WARN(getApplicationLogger(),
541  (outprev_ ? "Disabling " : "Enabling ")<<"global output");
543  outprev_ = outPut_;
544  }
545  }
546  if (item == "globalInputPrescale") {
547  LOG4CPLUS_WARN(getApplicationLogger(),
548  "Setting global input prescale has no effect "
549  <<"in this version of the code");
550  }
551  if ( item == "globalOutputPrescale") {
552  LOG4CPLUS_WARN(getApplicationLogger(),
553  "Setting global output prescale has no effect "
554  <<"in this version of the code");
555  }
556  }
557 
558 }
evf::StateMachine fsm_
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::Boolean epInitialized_
void enableEndPaths(bool active)
void FUEventProcessor::attachDqmToShm ( )
throw (evf::Exception)
private

Definition at line 756 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by enableCommon().

757 {
758  std::string errmsg;
759  bool success = false;
760  try {
763  success = edm::Service<FUShmDQMOutputService>()->attachToShm();
764  if (!success) errmsg = "Failed to attach DQM service to shared memory";
765  }
766  catch (cms::Exception& e) {
767  errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
768  }
769  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
770 }
virtual char const * what() const
Definition: Exception.cc:97
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 305 of file FUEventProcessor.cc.

References edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, ExpressReco_HICollisions_FallBack::e, epInitialized_, evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getNumberOfMicrostates(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), edm::event_processor::sInit, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), vp_, and vulture_.

306 {
307 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
308 // << ((instance_.value_==0) ? 0x8 : 0) << " "
309 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
310 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
311 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
312  unsigned short smap
313  = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
314  + ((instance_.value_==0) ? 0x8 : 0)
315  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
316  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
317  + (hasPrescaleService_.value_ ? 0x1 : 0);
318  if(nbSubProcesses_.value_==0)
319  {
320  spMStates_.setSize(1);
321  spmStates_.setSize(1);
322  }
323  else
324  {
325  spMStates_.setSize(nbSubProcesses_.value_);
326  spmStates_.setSize(nbSubProcesses_.value_);
327  for(unsigned int i = 0; i < spMStates_.size(); i++)
328  {
330  spmStates_[i] = 0;
331  }
332  }
333  try {
334  LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
335  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
336  epInitialized_=true;
337  if(evtProcessor_)
338  {
339  // moved to wrapper class
343  if(cpustat_) {delete cpustat_; cpustat_=0;}
345  iDieUrl_.value_);
346  if(ratestat_) {delete ratestat_; ratestat_=0;}
347  ratestat_ = new RateStat(iDieUrl_.value_);
348  if(iDieStatisticsGathering_.value_){
349  try{
351  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
352  if(legenda !=0){
353  std::string slegenda = ((xdata::String*)legenda)->value_;
354  ratestat_->sendLegenda(slegenda);
355  }
356 
357  }
358  catch(evf::Exception &e){
359  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
360  << e.what());
361  }
362  }
363 
364  fsm_.fireEvent("ConfigureDone",this);
365  LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
366  localLog("-I- Configuration completed");
367 
368  }
369  }
370  catch (xcept::Exception &e) {
371  reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
374  }
375  catch(cms::Exception &e) {
379  }
380  catch(std::exception &e) {
381  reasonForFailedState_ = e.what();
384  }
385  catch(...) {
386  fsm_.fireFailed("Unknown Exception",this);
387  }
388 
389  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
390 
391  return false;
392 }
xdata::Boolean hasModuleWebRegistry_
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
pid_t makeProcess()
Definition: Vulture.cc:148
void startMonitoringWorkLoop()
Definition: FWEPWrapper.cc:531
virtual std::string explainSelf() const
Definition: Exception.cc:56
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:185
xdata::InfoSpace * scalersLegendaInfoSpace_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:136
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:137
std::string const & configuration() const
Definition: FWEPWrapper.h:98
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:26
xdata::Vector< xdata::Integer > spmStates_
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
xdata::Boolean epInitialized_
void evf::FUEventProcessor::css ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception)
inline

Definition at line 101 of file FUEventProcessor.h.

References evf::Css::css(), and css_.

Referenced by FUEventProcessor().

102  {
103  css_.css(in,out);
104  }
void css(xgi::Input *in, xgi::Output *out)
Definition: Css.h:15
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::defaultWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception)

Definition at line 629 of file FUEventProcessor.cc.

References nbSubProcesses_, and dbtoconf::out.

Referenced by FUEventProcessor(), and receivingAndMonitor().

631 {
632 
633 
634  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
635  << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
636  << getApplicationDescriptor()->getInstance() << "</title>"
637  << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
638  << "</head></html>";
639 }
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::detachDqmFromShm ( )
throw (evf::Exception)
private

Definition at line 774 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by stopClassic().

775 {
776  std::string errmsg;
777  bool success = false;
778  try {
781  success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
782  if (!success) errmsg = "Failed to detach DQM service from shared memory";
783  }
784  catch (cms::Exception& e) {
785  errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
786  }
787  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
788 }
virtual char const * what() const
Definition: Exception.cc:97
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::enableClassic ( )
private

Definition at line 1511 of file FUEventProcessor.cc.

References enableCommon(), evtProcessor_, edm::EventProcessor::getState(), localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.

Referenced by enabling().

1512 {
1513  bool retval = enableCommon();
1515  LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
1516  ::sleep(1);
1517  }
1518 
1519  // implementation moved to EPWrapper
1520  // startScalersWorkLoop(); // this is now not done any longer
1521  localLog("-I- Start completed");
1522  return retval;
1523 }
event_processor::State getState() const
void sleep(Duration_t)
Definition: Utils.h:163
void localLog(std::string)
bool FUEventProcessor::enableCommon ( )
private

Definition at line 1452 of file FUEventProcessor.cc.

References attachDqmToShm(), edm::EventProcessor::clearCounters(), gather_cfg::cout, edm::EventProcessor::declareRunNumber(), ExpressReco_HICollisions_FallBack::e, evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, edm::EventProcessor::runAsync(), runNumber_, edm::EventProcessor::setRunNumber(), stor::utils::sleep(), and edm::EventProcessor::statusAsync().

Referenced by enableClassic(), and enableMPEPSlave().

1453 {
1454  try {
1455  if(hasShMem_) attachDqmToShm();
1456  int sc = 0;
1458  if(isRunNumberSetter_)
1460  else
1462  try{
1463  ::sleep(1);
1465  sc = evtProcessor_->statusAsync();
1466  }
1467  catch(cms::Exception &e) {
1471  return false;
1472  }
1473  catch(std::exception &e) {
1474  reasonForFailedState_ = e.what();
1477  return false;
1478  }
1479  catch(...) {
1480  reasonForFailedState_ = "Unknown Exception";
1483  return false;
1484  }
1485  if(sc != 0) {
1486  std::ostringstream oss;
1487  oss<<"EventProcessor::runAsync returned status code " << sc;
1488  reasonForFailedState_ = oss.str();
1491  return false;
1492  }
1493  }
1494  catch (xcept::Exception &e) {
1495  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
1498  return false;
1499  }
1500  try{
1501  fsm_.fireEvent("EnableDone",this);
1502  }
1503  catch (xcept::Exception &e) {
1504  std::cout << "exception " << (std::string)e.what() << std::endl;
1505  throw;
1506  }
1507 
1508  return false;
1509 }
xdata::UnsignedInteger32 runNumber_
virtual std::string explainSelf() const
Definition: Exception.cc:56
xdata::Boolean isRunNumberSetter_
void clearCounters()
Clears counters used by trigger report.
xdata::Boolean hasShMem_
void setRunNumber(RunNumber_t runNumber)
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
void declareRunNumber(RunNumber_t runNumber)
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
StatusCode statusAsync() const
tuple cout
Definition: gather_cfg.py:41
bool FUEventProcessor::enableMPEPSlave ( )
private

Definition at line 1524 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, enableCommon(), evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireFailed(), fsm_, edm::PresenceFactory::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), and startScalersWorkLoop().

Referenced by enabling(), and supervisor().

1525 {
1526  //all this happens only in the child process
1531  while(!evtProcessor_.isWaitingForLs())
1532  ::sleep(1);
1533  // @EM test do not run monitor loop in slave, only receiving&Monitor
1534  // evtProcessor_.startMonitoringWorkLoop();
1535  try{
1536  // evtProcessor_.makeServicesOnly();
1537  try{
1538  LOG4CPLUS_DEBUG(getApplicationLogger(),
1539  "Trying to create message service presence ");
1541  if(pf != 0) {
1542  pf->makePresence("MessageServicePresence").release();
1543  }
1544  else {
1545  LOG4CPLUS_ERROR(getApplicationLogger(),
1546  "Unable to create message service presence ");
1547  }
1548  }
1549  catch(...) {
1550  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
1551  }
1552  ML::MLlog4cplus::setAppl(this);
1553  }
1554  catch (xcept::Exception &e) {
1555  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
1558  }
1559  bool retval = enableCommon();
1560  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
1561  // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
1562  // ::sleep(1);
1563  // }
1564  return retval;
1565 }
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void localLog(std::string)
static PresenceFactory * get()
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
bool isWaitingForLs()
Definition: FWEPWrapper.h:131
bool FUEventProcessor::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 398 of file FUEventProcessor.cc.

References edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, evf::StateMachine::disableRcmsStateNotification(), ExpressReco_HICollisions_FallBack::e, enableClassic(), enableMPEPSlave(), epInitialized_, evtProcessor_, evf::StateMachine::fireEvent(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), fsm_, evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getNumberOfMicrostates(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), myProcess_, nbSubProcesses_, nbTotalDQM_, ratestat_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), runNumber_, scalersInfoSpace_, scalersUpdates_, evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), evf::Vulture::start(), start_lock_, startSummarizeWorkLoop(), stop_lock_, subs_, vp_, and vulture_.

399 {
400  nbTotalDQM_ = 0;
401  scalersUpdates_ = 0;
402 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
403 // << ((instance_.value_==0) ? 0x8 : 0) << " "
404 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
405 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
406 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
407  unsigned short smap
408  = ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
409  + ((instance_.value_==0) ? 0x8 : 0)
410  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
411  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
412  + (hasPrescaleService_.value_ ? 0x1 : 0);
413 
414  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
415  if(!epInitialized_){
417  }
418  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
419 
420  if(!epInitialized_){
422  if(cpustat_) {delete cpustat_; cpustat_=0;}
424  iDieUrl_.value_);
425  if(iDieStatisticsGathering_.value_){
426  try{
428  xdata::Serializable *legenda = scalersInfoSpace_->find("scalersLegenda");
429  if(legenda !=0){
430  std::string slegenda = ((xdata::String*)legenda)->value_;
431  ratestat_->sendLegenda(slegenda);
432  }
433  }
434  catch(evf::Exception &e){
435  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
436  << e.what());
437  }
438  }
439  if(ratestat_) {delete ratestat_; ratestat_=0;}
440  ratestat_ = new RateStat(iDieUrl_.value_);
441  epInitialized_ = true;
442  }
443  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
445  //classic appl will return here
446  if(nbSubProcesses_.value_==0) return enableClassic();
447  //protect manipulation of subprocess array
448  pthread_mutex_lock(&start_lock_);
449 // subs_.clear();
450 // subs_.resize(nbSubProcesses_.value_); // this should not be necessary
451  pid_t retval = -1;
452  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
453  {
454  subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
455  }
456  pthread_mutex_unlock(&start_lock_);
457 
458  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
459  {
460  retval = subs_[i].forkNew();
461  if(retval==0)
462  {
463  myProcess_ = &subs_[i];
464  int retval = pthread_mutex_destroy(&stop_lock_);
465  if(retval != 0) perror("error");
466  retval = pthread_mutex_init(&stop_lock_,0);
467  if(retval != 0) perror("error");
469  return enableMPEPSlave();
470  // the loop is broken in the child
471  }
472  }
473 
475  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
476 
477  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
478  fsm_.fireEvent("EnableDone",this);
479  localLog("-I- Start completed");
480  return false;
481 }
xdata::Boolean hasModuleWebRegistry_
int i
Definition: DBlmapReader.cc:9
xdata::UnsignedInteger32 runNumber_
pthread_mutex_t start_lock_
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:185
void forceInitEventProcessorMaybe()
Definition: FWEPWrapper.h:65
std::vector< SubProcess > subs_
evf::StateMachine fsm_
void resetLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:133
xdata::Boolean iDieStatisticsGathering_
int start(std::string, int=0)
Definition: Vulture.cc:235
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:136
void fireEvent(const std::string &evtType, void *originator)
void disableRcmsStateNotification()
Definition: StateMachine.h:64
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
unsigned int scalersUpdates_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:137
std::string const & configuration() const
Definition: FWEPWrapper.h:98
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:26
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
pthread_mutex_t stop_lock_
xdata::InfoSpace * scalersInfoSpace_
xdata::Boolean epInitialized_
xoap::MessageReference FUEventProcessor::fsmCallback ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)

Definition at line 519 of file FUEventProcessor.cc.

References evf::StateMachine::commandCallback(), fsm_, and runTheMatrix::msg.

521 {
522  return fsm_.commandCallback(msg);
523 }
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
evf::StateMachine fsm_
bool FUEventProcessor::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 495 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::stopAndHalt(), and stopSlavesAndAcknowledge().

496 {
497  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
498  if(nbSubProcesses_.value_!=0)
500  try{
502  }
503  catch (evf::Exception &e) {
504  reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
507  }
508  // if(hasShMem_) detachDqmFromShm();
509 
510  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
511  fsm_.fireEvent("HaltDone",this);
512 
513  localLog("-I- Halt completed");
514  return false;
515 }
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::localLog ( std::string  m)
private

Definition at line 808 of file FUEventProcessor.cc.

References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.

Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().

809 {
810  timeval tv;
811 
812  gettimeofday(&tv,0);
813  tm *uptm = localtime(&tv.tv_sec);
814  char datestring[256];
815  strftime(datestring, sizeof(datestring),"%c", uptm);
816 
817  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
818  logRingIndex_--;
819  std::ostringstream timestamp;
820  timestamp << " at " << datestring;
821  m += timestamp.str();
823 }
std::vector< std::string > logRing_
static const unsigned int logRingSize_
std::string FUEventProcessor::logsAsString ( )
private

Definition at line 791 of file FUEventProcessor.cc.

References i, logRing_, logRingIndex_, and logWrap_.

792 {
793  std::ostringstream oss;
794  if(logWrap_)
795  {
796  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
797  oss << logRing_[i] << std::endl;
798  for(unsigned int i = 0; i < logRingIndex_; i++)
799  oss << logRing_[i] << std::endl;
800  }
801  else
802  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
803  oss << logRing_[i] << std::endl;
804 
805  return oss.str();
806 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
void FUEventProcessor::makeStaticInfo ( )
private

Definition at line 1820 of file FUEventProcessor.cc.

References applicationInfoSpace_, evf::utils::cDiv(), ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.

Referenced by FUEventProcessor().

1821 {
1822  using namespace utils;
1823  std::ostringstream ost;
1824  mDiv(&ost,"ve");
1825  ost<< "$Revision: 1.123 $ (" << edm::getReleaseVersion() <<")";
1826  cDiv(&ost);
1827  mDiv(&ost,"ou",outPut_.toString());
1828  mDiv(&ost,"sh",hasShMem_.toString());
1829  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
1830  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
1831 
1832  xdata::Serializable *monsleep = 0;
1833  xdata::Serializable *lstimeout = 0;
1834  try{
1835  monsleep = applicationInfoSpace_->find("monSleepSec");
1836  lstimeout = applicationInfoSpace_->find("lsTimeOut");
1837  }
1839  }
1840 
1841  if(monsleep!=0)
1842  mDiv(&ost,"ms",monsleep->toString());
1843  if(lstimeout!=0)
1844  mDiv(&ost,"lst",lstimeout->toString());
1845  char cbuf[sizeof(struct utsname)];
1846  struct utsname* buf = (struct utsname*)cbuf;
1847  uname(buf);
1848  mDiv(&ost,"sysinfo");
1849  ost << buf->sysname << " " << buf->nodename
1850  << " " << buf->release << " " << buf->version << " " << buf->machine;
1851  cDiv(&ost);
1852  updaterStatic_ = ost.str();
1853 }
xdata::Boolean hasModuleWebRegistry_
xdata::Boolean hasServiceWebRegistry_
xdata::Boolean hasShMem_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:338
std::string getReleaseVersion()
xdata::InfoSpace * applicationInfoSpace_
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:335
void FUEventProcessor::microState ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 1637 of file FUEventProcessor.cc.

References autoRestartSlaves_, gather_cfg::cout, ExpressReco_HICollisions_FallBack::e, evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), subs_, and cms::Exception::what().

Referenced by FUEventProcessor().

1638 {
1639  std::string urn = getApplicationDescriptor()->getURN();
1640  try{
1643  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
1644  *out << "<tr><td>" << fsm_.stateName()->toString()
1645  << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
1646  << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
1648  *out << "<td>" << nbTotalDQM_
1649  << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
1650  if(nbSubProcesses_.value_!=0 && !myProcess_)
1651  {
1652  pthread_mutex_lock(&start_lock_);
1653  for(unsigned int i = 0; i < subs_.size(); i++)
1654  {
1655  try{
1656  if(subs_[i].alive()>0)
1657  {
1658  *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
1659  << i << "\">""Alive</td><td>S</td><td>"
1660  << subs_[i].queueId() << "<td>"
1661  << subs_[i].queueStatus()<< "/"
1662  << subs_[i].queueOccupancy() << "/"
1663  << subs_[i].queuePidOfLastSend() << "/"
1664  << subs_[i].queuePidOfLastReceive()
1665  << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
1666  << subs_[i].pid() << "&method=procStat\">"
1667  << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
1668  << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
1669  << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
1670  << subs_[i].params().nba << "/" << subs_[i].params().nbp
1671  << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
1672  << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
1673  << "</td><td>" << subs_[i].params().ps
1674  << "</td><td>" << subs_[i].params().dqm
1675  << "</td><td>" << subs_[i].params().trp << "</td>";
1676  }
1677  else
1678  {
1679  pthread_mutex_lock(&pickup_lock_);
1680  *out << "<tr><td id=\"a"<< i << "\" ";
1681  if(subs_[i].alive()==-1000)
1682  *out << " bgcolor=\"#bbaabb\">NotInitialized";
1683  else
1684  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
1685  *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
1686  << subs_[i].queueStatus() << "/"
1687  << subs_[i].queueOccupancy() << "/"
1688  << subs_[i].queuePidOfLastSend() << "/"
1689  << subs_[i].queuePidOfLastReceive()
1690  << "</td><td id=\"p"<< i << "\">"
1691  <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
1692  if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
1693  {
1694  if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
1695  *out << " will restart in " << subs_[i].countdown() << " s";
1696  else if(autoRestartSlaves_)
1697  *out << " reached maximum restart count";
1698  else *out << " autoRestart is disabled ";
1699  }
1700  *out << "</td><td>" << subs_[i].params().dqm
1701  << "</td><td>" << subs_[i].params().trp << "</td>";
1702  pthread_mutex_unlock(&pickup_lock_);
1703  }
1704  *out << "</tr>";
1705  }
1706  catch(evf::Exception &e){
1707  *out << "<tr><td id=\"a"<< i << "\" "
1708  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
1709  << subs_[i].queueStatus() << "/"
1710  << subs_[i].queueOccupancy() << "/"
1711  << subs_[i].queuePidOfLastSend() << "/"
1712  << subs_[i].queuePidOfLastReceive()
1713  << "</td><td id=\"p"<< i << "\">"
1714  <<subs_[i].pid()<<"</td>";
1715  }
1716  }
1717  pthread_mutex_unlock(&start_lock_);
1718  }
1719  }
1720  catch(evf::Exception &e)
1721  {
1722  LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
1723  }
1724  catch(cms::Exception &e)
1725  {
1726  LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
1727  }
1728  catch(std::exception &e)
1729  {
1730  LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
1731  }
1732  catch(...)
1733  {
1734  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
1735  }
1736 
1737 }
virtual char const * what() const
Definition: Exception.cc:97
int i
Definition: DBlmapReader.cc:9
pthread_mutex_t start_lock_
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:118
std::vector< SubProcess > subs_
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:123
pthread_mutex_t pickup_lock_
tuple cout
Definition: gather_cfg.py:41
void microState(xgi::Input *in, xgi::Output *out)
unsigned int getScalersUpdates()
Definition: FWEPWrapper.h:132
void evf::FUEventProcessor::moduleWeb ( xgi::Input * in,
xgi::Output * out 
)
inline

Definition at line 107 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::moduleWeb().

Referenced by FUEventProcessor(), and receivingAndMonitor().

void moduleWeb(xgi::Input *in, xgi::Output *out)
Definition: FWEPWrapper.cc:978
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::pathNames ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 743 of file FUEventProcessor.cc.

References evtProcessor_, dbtoconf::out, and scalersLegendaInfoSpace_.

Referenced by FUEventProcessor().

745 {
746 
747  if(evtProcessor_ != 0){
748  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
749  if(legenda !=0){
750  std::string slegenda = ((xdata::String*)legenda)->value_;
751  *out << slegenda << std::endl;
752  }
753  }
754 }
xdata::InfoSpace * scalersLegendaInfoSpace_
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::procStat ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 1810 of file FUEventProcessor.cc.

References evf::utils::procStat().

Referenced by FUEventProcessor(), and receivingAndMonitor().

1811 {
1813 }
void procStat(std::ostringstream *out)
Definition: procUtils.cc:157
tuple out
Definition: dbtoconf.py:99
bool FUEventProcessor::receiving ( toolbox::task::WorkLoop *  wl)
private

Definition at line 880 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, cmsRelvalreport::exit, evf::StateMachine::fireEvent(), fsm_, edm::PresenceFactory::get(), edm::PresenceFactory::makePresence(), runTheMatrix::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), stop_lock_, and stopClassic().

Referenced by startReceivingLoop().

881 {
882  MsgBuf msg;
883  try{
884  myProcess_->rcvSlave(msg,false); //will receive only messages from Master
885  if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
886  {
887  pthread_mutex_lock(&stop_lock_);
888  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
889  try{
890  LOG4CPLUS_DEBUG(getApplicationLogger(),
891  "Trying to create message service presence ");
893  if(pf != 0) {
894  pf->makePresence("MessageServicePresence").release();
895  }
896  else {
897  LOG4CPLUS_ERROR(getApplicationLogger(),
898  "Unable to create message service presence ");
899  }
900  }
901  catch(...) {
902  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
903  }
904  stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
906  myProcess_->postSlave(msg1,false);
907  pthread_mutex_unlock(&stop_lock_);
908  fclose(stdout);
909  fclose(stderr);
910  exit(EXIT_SUCCESS);
911  }
912  }
913  catch(evf::Exception &e){}
914  return true;
915 }
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:105
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:28
evf::StateMachine fsm_
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:99
void fireEvent(const std::string &evtType, void *originator)
static PresenceFactory * get()
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
bool FUEventProcessor::receivingAndMonitor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1308 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, runTheMatrix::data, defaultWebPage(), evf::prg::dqm, ExpressReco_HICollisions_FallBack::e, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, edm::hlt::Exception, cmsRelvalreport::exit, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, runTheMatrix::msg, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_PRR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, stor::utils::sleep(), spotlightWebPage(), edm::event_processor::sStopping, evf::StateMachine::stateName(), stop_lock_, EcalElecEmulTccFlatFileProducerFromTPG_cfg::tokens, edm::EventProcessor::totalEvents(), edm::EventProcessor::totalEventsPassed(), and evf::prg::trp.

Referenced by startReceivingMonitorLoop().

1309 {
1310  MsgBuf msg;
1311  try{
1312  myProcess_->rcvSlave(msg,true); //will receive only messages from Master
1313  switch(msg->mtype)
1314  {
1315  case MSQM_MESSAGE_TYPE_MCS:
1316  {
1317  xgi::Input *in = 0;
1318  xgi::Output out;
1319  evtProcessor_.microState(in,&out);
1320  MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
1321  strncpy(msg1->mtext,out.str().c_str(),out.str().size());
1322  myProcess_->postSlave(msg1,true);
1323  break;
1324  }
1325 
1326  case MSQM_MESSAGE_TYPE_PRG:
1327  {
1328  xdata::Serializable *dqmp = 0;
1329  xdata::UnsignedInteger32 *dqm = 0;
1331  try{
1332  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1333  } catch(xdata::exception::Exception e){}
1334  if(dqmp!=0)
1335  dqm = (xdata::UnsignedInteger32*)dqmp;
1336  MsgBuf msg1(sizeof(prg),MSQS_MESSAGE_TYPE_PRR);
1337  // monitorInfoSpace_->lock();
1338  prg * data = (prg*)msg1->mtext;
1339  data->ls = evtProcessor_.lsid_;
1340  data->ps = evtProcessor_.psid_;
1341  data->nbp = evtProcessor_->totalEvents();
1342  data->nba = evtProcessor_->totalEventsPassed();
1343  data->Ms = evtProcessor_.epMAltState_.value_;
1344  data->ms = evtProcessor_.epmAltState_.value_;
1345  if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
1346  data->trp = scalersUpdates_;
1347  // monitorInfoSpace_->unlock();
1348  myProcess_->postSlave(msg1,true);
1349  if(exitOnError_.value_)
1350  {
1351  // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
1352  // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
1354  {
1355  bool running = true;
1356  int count = 0;
1357  while(running){
1358  int retval = pthread_mutex_lock(&stop_lock_);
1359  if(retval != 0) perror("error");
1360  running = fsm_.stateName()->toString()=="Enabled";
1361  if(count>5) exit(-1);
1362  pthread_mutex_unlock(&stop_lock_);
1363  if(running) {::sleep(1); count++;}
1364  }
1365  }
1366 
1367  }
1368  // scalersUpdates_++;
1369  break;
1370  }
1371  case MSQM_MESSAGE_TYPE_WEB:
1372  {
1373  xgi::Input *in = 0;
1374  xgi::Output out;
1375  unsigned int bytesToSend = 0;
1377  std::string query = msg->mtext;
1378  size_t pos = query.find_first_of("&");
1379  std::string method;
1380  std::string args;
1381  if(pos!=std::string::npos)
1382  {
1383  method = query.substr(0,pos);
1384  args = query.substr(pos+1,query.length()-pos-1);
1385  }
1386  else
1387  method=query;
1388 
1389  if(method=="Spotlight")
1390  {
1391  spotlightWebPage(in,&out);
1392  }
1393  else if(method=="procStat")
1394  {
1395  procStat(in,&out);
1396  }
1397  else if(method=="moduleWeb")
1398  {
1399  internal::MyCgi mycgi;
1400  boost::char_separator<char> sep(";");
1401  boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
1402  for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
1403  tok_iter != tokens.end(); ++tok_iter){
1404  size_t pos = (*tok_iter).find_first_of("%");
1405  if(pos != std::string::npos){
1406  std::string first = (*tok_iter).substr(0 , pos);
1407  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
1408  mycgi.getEnvironment()[first]=second;
1409  }
1410  }
1411  moduleWeb(&mycgi,&out);
1412  }
1413  else if(method=="Default")
1414  {
1415  defaultWebPage(in,&out);
1416  }
1417  else
1418  {
1419  out << "Error 404!!!!!!!!" << std::endl;
1420  }
1421 
1422 
1423  bytesToSend = out.str().size();
1424  unsigned int cycle = 0;
1425  if(bytesToSend==0)
1426  {
1427  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
1428  myProcess_->postSlave(msg1,true);
1429  }
1430  while(bytesToSend !=0){
1431  unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
1432  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
1433  write(anonymousPipe_[PIPE_WRITE],
1434  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
1435  msgSize);
1436  myProcess_->postSlave(msg1,true);
1437  bytesToSend -= msgSize;
1438  cycle++;
1439  }
1440  break;
1441  }
1442  case MSQM_MESSAGE_TYPE_TRP:
1443  {
1444  break;
1445  }
1446  }
1447  }
1448  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
1449  return true;
1450 }
unsigned int ps
Definition: queue_defs.h:56
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:105
void spotlightWebPage(xgi::Input *, xgi::Output *)
#define Input(cl)
Definition: vmac.h:189
unsigned int nbp
Definition: queue_defs.h:57
xdata::Integer epmAltState_
Definition: FWEPWrapper.h:191
std::map< std::string, std::string, std::less< std::string > > & getEnvironment()
boost::tokenizer< boost::char_separator< char > > tokenizer
unsigned int ls
Definition: queue_defs.h:55
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Boolean exitOnError_
U second(std::pair< T, U > const &p)
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:18
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:30
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:29
evf::StateMachine fsm_
int cycle
unsigned int lsid_
Definition: FWEPWrapper.h:204
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: FWEPWrapper.cc:564
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:99
unsigned int psid_
Definition: FWEPWrapper.h:205
unsigned int Ms
Definition: queue_defs.h:59
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:35
bool first
Definition: L1TdeRCT.cc:79
int totalEvents() const
void procStat(xgi::Input *in, xgi::Output *out)
unsigned int nba
Definition: queue_defs.h:58
tuple out
Definition: dbtoconf.py:99
#define MSQM_MESSAGE_TYPE_TRP
Definition: queue_defs.h:20
#define MSQS_MESSAGE_TYPE_MCR
Definition: queue_defs.h:27
xdata::InfoSpace * applicationInfoSpace_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
#define PIPE_WRITE
Definition: queue_defs.h:38
void moduleWeb(xgi::Input *in, xgi::Output *out)
dictionary args
#define Output(cl)
Definition: vmac.h:193
void defaultWebPage(xgi::Input *in, xgi::Output *out)
unsigned int trp
Definition: queue_defs.h:62
tuple query
Definition: o2o.py:269
xdata::Integer epMAltState_
Definition: FWEPWrapper.h:190
tuple cout
Definition: gather_cfg.py:41
#define MSQM_MESSAGE_TYPE_MCS
Definition: queue_defs.h:17
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:19
void microState(xgi::Input *in, xgi::Output *out)
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:39
int totalEventsPassed() const
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:61
unsigned int ms
Definition: queue_defs.h:60
bool FUEventProcessor::scalers ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1208 of file FUEventProcessor.cc.

References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), runTheMatrix::ret, scalersUpdates_, and wlScalersActive_.

Referenced by startScalersWorkLoop().

1209 {
1210  if(evtProcessor_)
1211  {
1212  if(!evtProcessor_.getTriggerReport(true)) {
1213  wlScalersActive_ = false;
1214  return false;
1215  }
1216  }
1217  else
1218  {
1219  std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
1220  wlScalersActive_ = false;
1221  return false;
1222  }
1223  if(myProcess_)
1224  {
1225  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
1227  if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
1228  scalersUpdates_++;
1229  }
1230  else
1232  return true;
1233 }
MsgBuf & getPackedTriggerReport()
Definition: FWEPWrapper.h:110
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:694
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:99
bool getTriggerReport(bool useLock)
Definition: FWEPWrapper.cc:611
unsigned int scalersUpdates_
tuple cout
Definition: gather_cfg.py:41
void FUEventProcessor::scalersWeb ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 730 of file FUEventProcessor.cc.

References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.

Referenced by FUEventProcessor().

732 {
733 
734  out->getHTTPResponseHeader().addHeader( "Content-Type",
735  "application/octet-stream" );
736  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
737  "binary" );
738  if(evtProcessor_ != 0){
740  }
741 }
tuple out
Definition: dbtoconf.py:99
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:111
void FUEventProcessor::sendMessageOverMonitorQueue ( MsgBuf & buf)

Definition at line 1815 of file FUEventProcessor.cc.

References myProcess_, and evf::SubProcess::postSlave().

1816 {
1817  if(myProcess_) myProcess_->postSlave(buf,true);
1818 }
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:99
void evf::FUEventProcessor::serviceWeb ( xgi::Input * in,
xgi::Output * out 
)
inline

Definition at line 108 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::serviceWeb().

Referenced by FUEventProcessor().

void serviceWeb(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::spotlightWebPage ( xgi::Input * in,
xgi::Output * out 
)
throw (xgi::exception::Exception
)

Definition at line 645 of file FUEventProcessor.cc.

References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().

Referenced by FUEventProcessor(), and receivingAndMonitor().

647 {
648 
649  std::string urn = getApplicationDescriptor()->getURN();
650 
651  *out << "<!-- base href=\"/" << urn
652  << "\"> -->" << std::endl;
653  *out << "<html>" << std::endl;
654  *out << "<head>" << std::endl;
655  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
656  *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
657  *out << "<title>" << getApplicationDescriptor()->getClassName()
658  << getApplicationDescriptor()->getInstance()
659  << " MAIN</title>" << std::endl;
660  *out << "</head>" << std::endl;
661  *out << "<body>" << std::endl;
662  *out << "<table border=\"0\" width=\"100%\">" << std::endl;
663  *out << "<tr>" << std::endl;
664  *out << " <td align=\"left\">" << std::endl;
665  *out << " <img" << std::endl;
666  *out << " align=\"middle\"" << std::endl;
667  *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
668  *out << " alt=\"main\"" << std::endl;
669  *out << " width=\"64\"" << std::endl;
670  *out << " height=\"64\"" << std::endl;
671  *out << " border=\"\"/>" << std::endl;
672  *out << " <b>" << std::endl;
673  *out << getApplicationDescriptor()->getClassName()
674  << getApplicationDescriptor()->getInstance() << std::endl;
675  *out << " " << fsm_.stateName()->toString() << std::endl;
676  *out << " </b>" << std::endl;
677  *out << " </td>" << std::endl;
678  *out << " <td width=\"32\">" << std::endl;
679  *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
680  *out << " <img" << std::endl;
681  *out << " align=\"middle\"" << std::endl;
682  *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
683  *out << " alt=\"HyperDAQ\"" << std::endl;
684  *out << " width=\"32\"" << std::endl;
685  *out << " height=\"32\"" << std::endl;
686  *out << " border=\"\"/>" << std::endl;
687  *out << " </a>" << std::endl;
688  *out << " </td>" << std::endl;
689  *out << " <td width=\"32\">" << std::endl;
690  *out << " </td>" << std::endl;
691  *out << " <td width=\"32\">" << std::endl;
692  *out << " <a href=\"/" << urn << "/\">" << std::endl;
693  *out << " <img" << std::endl;
694  *out << " align=\"middle\"" << std::endl;
695  *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
696  *out << " alt=\"main\"" << std::endl;
697  *out << " width=\"32\"" << std::endl;
698  *out << " height=\"32\"" << std::endl;
699  *out << " border=\"\"/>" << std::endl;
700  *out << " </a>" << std::endl;
701  *out << " </td>" << std::endl;
702  *out << "</tr>" << std::endl;
703  *out << "</table>" << std::endl;
704 
705  *out << "<hr/>" << std::endl;
706 
707  std::ostringstream ost;
708  if(myProcess_)
709  ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
710  else
711  ost << "/moduleWeb?";
712  urn += ost.str();
713  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
715  else if(evtProcessor_)
717  else
718  *out << "<td>HLT Unconfigured</td>" << std::endl;
719  *out << "</table>" << std::endl;
720 
721  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
722  *out << configuration_ << std::endl;
723  *out << "</textarea><P>" << std::endl;
724 
725  *out << "</body>" << std::endl;
726  *out << "</html>" << std::endl;
727 
728 
729 }
void summaryWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:723
void taskWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:789
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
void FUEventProcessor::startReceivingLoop ( )
private

Definition at line 843 of file FUEventProcessor.cc.

References asReceiveMsgAndExecute_, ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, runTheMatrix::msg, receiving(), receiving_, and wlReceiving_.

Referenced by enableMPEPSlave().

844 {
845  try {
846  wlReceiving_=
847  toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
848  "waiting");
849  if (!wlReceiving_->isActive()) wlReceiving_->activate();
850  asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
851  "Receiving");
853  receiving_ = true;
854  }
855  catch (xcept::Exception& e) {
856  std::string msg = "Failed to start workloop 'Receiving'.";
857  XCEPT_RETHROW(evf::Exception,msg,e);
858  }
859 }
toolbox::task::WorkLoop * wlReceiving_
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
bool receiving(toolbox::task::WorkLoop *wl)
void FUEventProcessor::startReceivingMonitorLoop ( )
private

Definition at line 860 of file FUEventProcessor.cc.

References asReceiveMsgAndRead_, ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, runTheMatrix::msg, receivingAndMonitor(), receivingM_, and wlReceivingMonitor_.

Referenced by enableMPEPSlave().

861 {
862  try {
864  toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
865  "waiting");
866  if (!wlReceivingMonitor_->isActive())
867  wlReceivingMonitor_->activate();
869  toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
870  "ReceivingM");
872  receivingM_ = true;
873  }
874  catch (xcept::Exception& e) {
875  std::string msg = "Failed to start workloop 'ReceivingM'.";
876  XCEPT_RETHROW(evf::Exception,msg,e);
877  }
878 }
bool receivingAndMonitor(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asReceiveMsgAndRead_
toolbox::task::WorkLoop * wlReceivingMonitor_
void FUEventProcessor::startScalersWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1165 of file FUEventProcessor.cc.

References asScalers_, ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, runTheMatrix::msg, scalers(), wlScalers_, and wlScalersActive_.

Referenced by enableMPEPSlave().

// Listing of the body of FUEventProcessor::startScalersWorkLoop(); the leading
// numbers are the line numbers of FUEventProcessor.cc carried by the listing.
// Purpose: obtain the work loop named "Scalers", make sure it is active, bind
// FUEventProcessor::scalers as its action, submit that action and record the
// fact in wlScalersActive_.
1166 {
1167  try {
// Ask the work-loop factory for the loop named "Scalers" ("waiting" is passed as the second argument).
1168  wlScalers_=
1169  toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
1170  "waiting");
// Activate only when the loop reports that it is not active yet.
1171  if (!wlScalers_->isActive()) wlScalers_->activate();
1172  asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
1173  "Scalers");
1174 
// The flag is set only after submit() has returned, so it stays untouched if anything above throws.
1175  wlScalers_->submit(asScalers_);
1176  wlScalersActive_ = true;
1177  }
// Any xcept::Exception from the steps above is handed to XCEPT_RETHROW together with
// evf::Exception and a fixed message naming the 'Scalers' work loop.
1178  catch (xcept::Exception& e) {
1179  std::string msg = "Failed to start workloop 'Scalers'.";
1180  XCEPT_RETHROW(evf::Exception,msg,e);
1181  }
1182 }
toolbox::task::WorkLoop * wlScalers_
bool scalers(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asScalers_
void FUEventProcessor::startSummarizeWorkLoop ( )
throw (evf::Exception
)
private

Definition at line 1186 of file FUEventProcessor.cc.

References asSummarize_, ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, runTheMatrix::msg, summarize(), wlSummarize_, and wlSummarizeActive_.

Referenced by enabling().

// Listing of the body of FUEventProcessor::startSummarizeWorkLoop(); the leading
// numbers are the line numbers of FUEventProcessor.cc carried by the listing.
// Purpose: obtain the work loop named "Summary", make sure it is active, bind
// FUEventProcessor::summarize as its action, submit that action and record the
// fact in wlSummarizeActive_.
1187 {
1188  try {
// Ask the work-loop factory for the loop named "Summary" ("waiting" is passed as the second argument).
1189  wlSummarize_=
1190  toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
1191  "waiting");
// Activate only when the loop reports that it is not active yet.
1192  if (!wlSummarize_->isActive()) wlSummarize_->activate();
1193 
1194  asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
1195  "Summary");
1196 
// The flag is set only after submit() has returned, so it stays untouched if anything above throws.
1197  wlSummarize_->submit(asSummarize_);
1198  wlSummarizeActive_ = true;
1199  }
// Any xcept::Exception from the steps above is handed to XCEPT_RETHROW together with
// evf::Exception and a fixed message.
// NOTE(review): the message names the work loop 'Summarize', while the loop is requested
// and bound under the name "Summary" above - confirm which name is intended.
1200  catch (xcept::Exception& e) {
1201  std::string msg = "Failed to start workloop 'Summarize'.";
1202  XCEPT_RETHROW(evf::Exception,msg,e);
1203  }
1204 }
bool summarize(toolbox::task::WorkLoop *wl)
toolbox::task::WorkLoop * wlSummarize_
toolbox::task::ActionSignature * asSummarize_
void FUEventProcessor::startSupervisorLoop ( )
private

Definition at line 825 of file FUEventProcessor.cc.

References asSupervisor_, ExpressReco_HICollisions_FallBack::e, edm::hlt::Exception, runTheMatrix::msg, supervising_, supervisor(), and wlSupervising_.

Referenced by FUEventProcessor().

826 {
827  try {
829  toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
830  "waiting");
831  if (!wlSupervising_->isActive()) wlSupervising_->activate();
832  asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
833  "Supervisor");
834  wlSupervising_->submit(asSupervisor_);
835  supervising_ = true;
836  }
837  catch (xcept::Exception& e) {
838  std::string msg = "Failed to start workloop 'Supervisor'.";
839  XCEPT_RETHROW(evf::Exception,msg,e);
840  }
841 }
toolbox::task::ActionSignature * asSupervisor_
toolbox::task::WorkLoop * wlSupervising_
bool supervisor(toolbox::task::WorkLoop *wl)
bool FUEventProcessor::stopClassic ( )
private

Definition at line 1567 of file FUEventProcessor.cc.

References detachDqmFromShm(), ExpressReco_HICollisions_FallBack::e, edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, and evf::FWEPWrapper::stop().

Referenced by receiving(), and stopping().

1568 {
1569  try {
1570  LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
1573  fsm_.fireEvent("StopDone",this);
1574  else
1575  {
1576  // epMState_ = evtProcessor_->currentStateName();
1578  reasonForFailedState_ = "EventProcessor stop timed out";
1579  else
1580  reasonForFailedState_ = "EventProcessor did not receive STOP event";
1583  }
1585  }
1586  catch (xcept::Exception &e) {
1587  reasonForFailedState_ = "stopping FAILED: " + (std::string)e.what();
1590  }
1591  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
1592  localLog("-I- Stop completed");
1593  return false;
1594 }
xdata::Boolean hasShMem_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
edm::EventProcessor::StatusCode stop()
Definition: FWEPWrapper.cc:438
bool FUEventProcessor::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 485 of file FUEventProcessor.cc.

References nbSubProcesses_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.

Referenced by supervisor().

486 {
487  if(nbSubProcesses_.value_!=0)
489  vulture_->stop();
490  return stopClassic();
491 }
int stop()
Definition: Vulture.cc:248
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::stopSlavesAndAcknowledge ( )
private

Definition at line 1596 of file FUEventProcessor.cc.

References ExpressReco_HICollisions_FallBack::e, evf::StateMachine::fireFailed(), fsm_, i, localLog(), MAX_MSG_SIZE, runTheMatrix::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, reasonForFailedState_, stop_lock_, and subs_.

Referenced by halting(), and stopping().

1597 {
1600 
1601  for(unsigned int i = 0; i < subs_.size(); i++)
1602  {
1603  pthread_mutex_lock(&stop_lock_);
1604  if(subs_[i].alive()>0)subs_[i].post(msg,false);
1605  pthread_mutex_unlock(&stop_lock_);
1606  }
1607  for(unsigned int i = 0; i < subs_.size(); i++)
1608  {
1609  pthread_mutex_lock(&stop_lock_);
1610  if(subs_[i].alive()>0){
1611  try{
1612  subs_[i].rcv(msg1,false);
1613  }
1614  catch(evf::Exception &e){
1615  std::ostringstream ost;
1616  ost << "failed to get STOP - errno ->" << errno << " " << e.what();
1617  reasonForFailedState_ = ost.str();
1618  LOG4CPLUS_WARN(getApplicationLogger(),reasonForFailedState_);
1621  break;
1622  }
1623  }
1624  else {
1625  pthread_mutex_unlock(&stop_lock_);
1626  continue;
1627  }
1628  pthread_mutex_unlock(&stop_lock_);
1629  if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
1630  while(subs_[i].alive()>0) ::usleep(10000);
1631  subs_[i].disconnect();
1632  }
1633  // subs_.clear();
1634 
1635 }
int i
Definition: DBlmapReader.cc:9
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
std::vector< SubProcess > subs_
void fireFailed(const std::string &errorMsg, void *originator)
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:28
evf::StateMachine fsm_
std::string reasonForFailedState_
void localLog(std::string)
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
void FUEventProcessor::subWeb ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 561 of file FUEventProcessor.cc.

References anonymousPipe_, gather_cfg::cout, generateEDF::done, asciidump::els, i, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), runTheMatrix::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), subs_, and superSleepSec_.

Referenced by FUEventProcessor().

562 {
563  using namespace cgicc;
564  pid_t pid = 0;
565  std::ostringstream ost;
566  ost << "&";
567 
568  Cgicc cgi(in);
570  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
571  mycgi->getEnvironment().begin();
572  mit != mycgi->getEnvironment().end(); mit++)
573  ost << mit->first << "%" << mit->second << ";";
574  std::vector<FormEntry> els = cgi.getElements() ;
575  std::vector<FormEntry> el1;
576  cgi.getElement("method",el1);
577  std::vector<FormEntry> el2;
578  cgi.getElement("process",el2);
579  if(el1.size()!=0) {
580  std::string meth = el1[0].getValue();
581  if(el2.size()!=0) {
582  unsigned int i = 0;
583  std::string mod = el2[0].getValue();
584  pid = atoi(mod.c_str()); // get the process id to be polled
585  for(; i < subs_.size(); i++)
586  if(subs_[i].pid()==pid) break;
587  if(i>=subs_.size()){ //process was not found, let the browser know
588  *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
589  return;
590  }
591  if(subs_[i].alive() != 1){
592  *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
593  return;
594  }
595  MsgBuf msg1(meth.length()+ost.str().length(),MSQM_MESSAGE_TYPE_WEB);
596  strcpy(msg1->mtext,meth.c_str());
597  strcpy(msg1->mtext+meth.length(),ost.str().c_str());
598  subs_[i].post(msg1,true);
599  unsigned int keep_supersleep_original_value = superSleepSec_.value_;
600  superSleepSec_.value_=10*keep_supersleep_original_value;
602  bool done = false;
603  std::vector<char *>pieces;
604  while(!done){
605  unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
606  if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
607  ::sleep(1);
608  continue;
609  }
610  unsigned int nbytes = atoi(msg->mtext);
611  if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
612  char *buf= new char[nbytes];
613  ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
614  if(retval!=nbytes) std::cout
615  << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
616  pieces.push_back(buf);
617  }
618  superSleepSec_.value_=keep_supersleep_original_value;
619  for(unsigned int j = 0; j < pieces.size(); j++){
620  *out<<pieces[j]; // chain the buffers into the output strstream
621  delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
622  }
623  }
624  }
625 }
int i
Definition: DBlmapReader.cc:9
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
tuple pieces
Definition: csv2json.py:31
tuple els
Definition: asciidump.py:420
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void sleep(Duration_t)
Definition: Utils.h:163
dictionary map
Definition: Association.py:160
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:30
#define PIPE_READ
Definition: queue_defs.h:37
int j
Definition: DBlmapReader.cc:9
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 superSleepSec_
tuple cout
Definition: gather_cfg.py:41
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:19
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:39
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
bool FUEventProcessor::summarize ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1236 of file FUEventProcessor.cc.

References gather_cfg::cout, cpustat_, ExpressReco_HICollisions_FallBack::e, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, evf::TriggerReportStatic::lumiSection, MAX_MSG_SIZE, runTheMatrix::msg, MSQS_MESSAGE_TYPE_TRR, nbSubProcessesReporting_, ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), runTheMatrix::ret, evf::RateStat::sendStat(), evf::CPUStat::sendStat(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.

Referenced by startSummarizeWorkLoop().

1237 {
1240  bool atLeastOneProcessUpdatedSuccessfully = false;
1241  int msgCount = 0;
1242  for (unsigned int i = 0; i < subs_.size(); i++)
1243  {
1244  if(subs_[i].alive()>0)
1245  {
1246 
1247  int ret = 0;
1248  try{
1249  ret = subs_[i].rcv(msg,false);
1250  msgCount++;
1251  }
1252  catch(evf::Exception &e)
1253  {
1254  std::cout << "exception in msgrcv on " << i
1255  << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
1256  continue;
1257  //do nothing special
1258  }
1259 
1260 
1261  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1262  TriggerReportStatic *trp = (TriggerReportStatic *)msg->mtext;
1264  std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
1265  << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1266  subs_[i].post(msg,false);
1267  }else{
1268  atLeastOneProcessUpdatedSuccessfully = true;
1270  }
1271  }
1272  else std::cout << "msgrcv returned error " << errno << std::endl;
1273  }
1274  }
1275  if(atLeastOneProcessUpdatedSuccessfully){
1276  nbSubProcessesReporting_.value_ = msgCount;
1279  }
1280  else{
1281  LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
1282  if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
1283  nbSubProcessesReporting_.value_ = 0;
1284  ::sleep(10);
1285  }
1286  if(fsm_.stateName()->toString()!="Enabled"){
1287  wlScalersActive_ = false;
1288  return false;
1289  }
1290  // cpustat_->printStat();
1291  if(iDieStatisticsGathering_.value_){
1292  try{
1295  sizeof(TriggerReportStatic),
1297  }catch(evf::Exception &e){
1298  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
1299  << e.what());
1300  }
1301  }
1302  cpustat_->reset();
1303  return true;
1304 }
int i
Definition: DBlmapReader.cc:9
void updateRollingReport()
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:31
xdata::UnsignedInteger32 nbSubProcessesReporting_
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:694
evf::StateMachine fsm_
xdata::Boolean iDieStatisticsGathering_
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:107
void sendStat(unsigned int)
Definition: CPUStat.cc:21
void reset()
Definition: CPUStat.h:21
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int getLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:135
tuple cout
Definition: gather_cfg.py:41
void withdrawLumiSectionIncrement()
Definition: FWEPWrapper.h:134
void sumAndPackTriggerReport(MsgBuf &buf)
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:111
void sendStat(const unsigned char *, size_t, unsigned int)
Definition: RateStat.cc:19
bool FUEventProcessor::supervisor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 917 of file FUEventProcessor.cc.

References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, ExpressReco_HICollisions_FallBack::e, enableMPEPSlave(), evtProcessor_, cmsCodeRules.cppFunctionSkipper::exception, edm::hlt::Exception, spr::find(), evf::StateMachine::fireEvent(), fsm_, i, localLog(), log_, evf::prg::ls, python.rootplot.utilities::ls(), MAX_MSG_SIZE, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_PRG, MSQS_MESSAGE_TYPE_PRR, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), L1TEmulatorMonitor_cff::p, pickup_lock_, evf::utils::pid, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.

Referenced by startSupervisorLoop().

918 {
919  pthread_mutex_lock(&stop_lock_);
920  if(subs_.size()!=nbSubProcesses_.value_)
921  {
922  subs_.resize(nbSubProcesses_.value_);
923  spMStates_.resize(nbSubProcesses_.value_);
924  spmStates_.resize(nbSubProcesses_.value_);
925  for(unsigned int i = 0; i < spMStates_.size(); i++)
926  {
928  spmStates_[i] = 0;
929  }
930  }
931  bool running = fsm_.stateName()->toString()=="Enabled";
932  bool stopping = fsm_.stateName()->toString()=="stopping";
933  for(unsigned int i = 0; i < subs_.size(); i++)
934  {
935  if(subs_[i].alive()==-1000) continue;
936  int sl;
937 
938  pid_t killedOrNot = waitpid(subs_[i].pid(),&sl,WNOHANG);
939 
940  if(killedOrNot==subs_[i].pid()) subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
941  else continue;
942  pthread_mutex_lock(&pickup_lock_);
943  std::ostringstream ost;
944  if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
945  else if(WIFSIGNALED(sl)!=0) ost << " process terminated with signal " << WTERMSIG(sl);
946  else ost << " process stopped ";
947  subs_[i].countdown()=slaveRestartDelaySecs_.value_;
948  subs_[i].setReasonForFailed(ost.str());
950  spmStates_[i] = 0;
951  std::ostringstream ost1;
952  ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
953  localLog(ost1.str());
954  if(!autoRestartSlaves_.value_) subs_[i].disconnect();
955  pthread_mutex_unlock(&pickup_lock_);
956  }
957  pthread_mutex_unlock(&stop_lock_);
958  if(stopping) return true; // if in stopping we are done
959 
960  if(running)
961  {
962  // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
963  // this is only active while running, hence, the stop lock is acquired and only released at end of loop
964  if(autoRestartSlaves_.value_){
965  pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
966  for(unsigned int i = 0; i < subs_.size(); i++)
967  {
968  if(subs_[i].alive() != 1){
969  if(subs_[i].countdown() == 0)
970  {
971  if(subs_[i].restartCount()>2){
972  LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
973  << " - maximum restart count reached");
974  std::ostringstream ost1;
975  ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
976  localLog(ost1.str());
977  subs_[i].countdown()--;
978  XCEPT_DECLARE(evf::Exception,
979  sentinelException, ost1.str());
980  notifyQualified("error",sentinelException);
981  continue;
982  }
983  subs_[i].restartCount()++;
984  pid_t rr = subs_[i].forkNew();
985  if(rr==0)
986  {
987  myProcess_=&subs_[i];
988  scalersUpdates_ = 0;
989  int retval = pthread_mutex_destroy(&stop_lock_);
990  if(retval != 0) perror("error");
991  retval = pthread_mutex_init(&stop_lock_,0);
992  if(retval != 0) perror("error");
994  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
995  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
996  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
997  try{
998  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
999  if(lsid) {
1000  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
1001  }
1002  }
1003  catch(...){
1004  std::cout << "trouble with lsindex during restart" << std::endl;
1005  }
1006  try{
1007  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
1008  if(lstb) {
1009  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
1010  }
1011  }
1012  catch(...){
1013  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
1014  }
1015 
1018  enableMPEPSlave();
1019  return false; // exit the supervisor loop immediately in the child !!!
1020  }
1021  else
1022  {
1023  std::ostringstream ost1;
1024  ost1 << "-I- New Process " << rr << " forked for slot " << i;
1025  localLog(ost1.str());
1026  }
1027  }
1028  if(subs_[i].countdown()>=0) subs_[i].countdown()--;
1029  }
1030  }
1031  pthread_mutex_unlock(&stop_lock_);
1032  } // finished handling replacement of dead slaves once they've been reaped
1033  }
1034  xdata::Serializable *lsid = 0;
1035  xdata::Serializable *psid = 0;
1036  xdata::Serializable *epMAltState = 0;
1037  xdata::Serializable *epmAltState = 0;
1038  xdata::Serializable *dqmp = 0;
1039  xdata::UnsignedInteger32 *dqm = 0;
1040 
1041 
1042 
1043  MsgBuf msg1(0,MSQM_MESSAGE_TYPE_PRG);
1045  if(running){
1046  try{
1047  lsid = applicationInfoSpace_->find("lumiSectionIndex");
1048  psid = applicationInfoSpace_->find("prescaleSetIndex");
1049  nbProcessed = monitorInfoSpace_->find("nbProcessed");
1050  nbAccepted = monitorInfoSpace_->find("nbAccepted");
1051  epMAltState = monitorInfoSpace_->find("epSPMacroStateInt");
1052  epmAltState = monitorInfoSpace_->find("epSPMicroStateInt");
1053  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1054  }
1056  LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
1057  }
1058 
1059  try{
1060  if(nbProcessed !=0 && nbAccepted !=0)
1061  {
1062  xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
1063  xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
1064  xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
1065  xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
1066  if(dqmp!=0)
1067  dqm = (xdata::UnsignedInteger32*)dqmp;
1068  if(dqm) dqm->value_ = 0;
1069  nbTotalDQM_ = 0;
1070  nbp->value_ = 0;
1071  nba->value_ = 0;
1072  nblive_ = 0;
1073  nbdead_ = 0;
1074  scalersUpdates_ = 0;
1075 
1076  for(unsigned int i = 0; i < subs_.size(); i++)
1077  {
1078  if(subs_[i].alive()>0)
1079  {
1080  nblive_++;
1081  try{
1082  subs_[i].post(msg1,true);
1083 
1084  unsigned long retval = subs_[i].rcvNonBlocking(msg2,true);
1085  if(retval == (unsigned long) msg2->mtype){
1086  prg* p = (struct prg*)(msg2->mtext);
1087  subs_[i].setParams(p);
1088  spMStates_[i] = p->Ms;
1089  spmStates_[i] = p->ms;
1090  cpustat_->addEntry(p->ms);
1091  if(!subs_[i].inInconsistentState() &&
1095  {
1096  std::ostringstream ost;
1097  ost << "edm::eventprocessor slot " << i << " process id "
1098  << subs_[i].pid() << " not in Running state : Mstate="
1099  << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
1101  << " - Look into possible error messages from HLT process";
1102  LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
1103  }
1104  nbp->value_ += subs_[i].params().nbp;
1105  nba->value_ += subs_[i].params().nba;
1106  if(dqm)dqm->value_ += p->dqm;
1107  nbTotalDQM_ += p->dqm;
1108  scalersUpdates_ += p->trp;
1109  if(p->ls > ls->value_) ls->value_ = p->ls;
1110  if(p->ps != ps->value_) ps->value_ = p->ps;
1111  }
1112  else{
1113  nbp->value_ += subs_[i].get_save_nbp();
1114  nba->value_ += subs_[i].get_save_nba();
1115  }
1116  }
1117  catch(evf::Exception &e){
1118  LOG4CPLUS_INFO(getApplicationLogger(),
1119  "could not send/receive msg on slot "
1120  << i << " - " << e.what());
1121  }
1122 
1123  }
1124  else
1125  {
1126  nbp->value_ += subs_[i].get_save_nbp();
1127  nba->value_ += subs_[i].get_save_nba();
1128  nbdead_++;
1129  }
1130  }
1131  }
1132 
1133  }
1134  catch(std::exception &e){
1135  LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
1136  }
1137  catch(...){
1138  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
1139  }
1140  }
1141  else{
1142  for(unsigned int i = 0; i < subs_.size(); i++)
1143  {
1144  if(subs_[i].alive()==-1000)
1145  {
1147  spmStates_[i] = 0;
1148  }
1149  }
1150  }
1151  try{
1152  monitorInfoSpace_->lock();
1153  monitorInfoSpace_->fireItemGroupChanged(names_,0);
1154  monitorInfoSpace_->unlock();
1155  }
1156  catch(xdata::exception::Exception &e)
1157  {
1158  LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
1159  // localLog(e.what());
1160  }
1161  ::sleep(superSleepSec_.value_);
1162  return true;
1163 }
unsigned int ps
Definition: queue_defs.h:56
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
xdata::InfoSpace * monitorInfoSpace_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
unsigned int ls
Definition: queue_defs.h:55
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:118
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
xdata::Serializable * nbProcessed
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:18
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:108
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:29
evf::StateMachine fsm_
unsigned int Ms
Definition: queue_defs.h:59
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:107
std::list< std::string > names_
void addEntry(int sta)
Definition: CPUStat.h:16
bool stopping(toolbox::task::WorkLoop *wl)
void fireEvent(const std::string &evtType, void *originator)
xdata::Serializable * nbAccepted
void disableRcmsStateNotification()
Definition: StateMachine.h:64
void localLog(std::string)
int notstarted_state_code() const
Definition: FWEPWrapper.h:128
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 superSleepSec_
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:123
pthread_mutex_t pickup_lock_
unsigned int trp
Definition: queue_defs.h:62
xdata::Vector< xdata::Integer > spmStates_
tuple cout
Definition: gather_cfg.py:41
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:61
unsigned int ms
Definition: queue_defs.h:60
void FUEventProcessor::updater ( xgi::Input * in,
xgi::Output * out 
)

Definition at line 1740 of file FUEventProcessor.cc.

References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, evtProcessor_, fsm_, evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.

Referenced by FUEventProcessor().

1741 {
1742  using namespace utils;
1743 
1744  *out << updaterStatic_;
1745  mDiv(out,"loads");
1746  uptime(out);
1747  cDiv(out);
1748  mDiv(out,"st",fsm_.stateName()->toString());
1749  mDiv(out,"ru",runNumber_.toString());
1750  mDiv(out,"nsl",nbSubProcesses_.value_);
1751  mDiv(out,"cl");
1752  *out << getApplicationDescriptor()->getClassName()
1753  << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
1754  cDiv(out);
1755  mDiv(out,"in",getApplicationDescriptor()->getInstance());
1756  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
1757  mDiv(out,"hlt");
1758  *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
1759  cDiv(out);
1760  *out << std::endl;
1761  }
1762  else
1763  mDiv(out,"hlt","Not yet...");
1764 
1765  mDiv(out,"sq",squidPresent_.toString());
1766  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
1768  if(nbProcessed != 0 && nbAccepted != 0)
1769  {
1770  mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
1771  mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
1772  }
1773  else
1774  {
1775  mDiv(out,"tt",0);
1776  mDiv(out,"ac",0);
1777  }
1778  if(!myProcess_)
1779  mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
1780  else
1781  mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
1782 
1783  mDiv(out,"idi",iDieUrl_.value_);
1784  if(vp_!=0){
1785  mDiv(out,"vpi",(unsigned int) vp_);
1786  if(vulture_->hasStarted()>=0)
1787  mDiv(out,"vul","Prowling");
1788  else
1789  mDiv(out,"vul","Dead");
1790  }
1791  else{
1792  mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
1793  }
1794  if(evtProcessor_){
1795  mDiv(out,"ll");
1797  << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
1798  cDiv(out);
1799  }
1800  mDiv(out,"lg");
1801  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
1802  *out << logRing_[i] << std::endl;
1803  if(logWrap_)
1804  for(unsigned int i = 0; i<logRingIndex_; i++)
1805  *out << logRing_[i] << std::endl;
1806  cDiv(out);
1807 
1808 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
xdata::UnsignedInteger32 runNumber_
unsigned int acc
Definition: FWEPWrapper.h:42
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:338
lsTriplet & lastLumi()
Definition: FWEPWrapper.h:129
xdata::Serializable * nbProcessed
std::string wlMonitoring()
Definition: FWEPWrapper.h:94
evf::StateMachine fsm_
unsigned int proc
Definition: FWEPWrapper.h:41
xdata::Boolean squidPresent_
xdata::Serializable * nbAccepted
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::String configString_
unsigned int ls
Definition: FWEPWrapper.h:40
int hasStarted()
Definition: Vulture.cc:209
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:335
void uptime(std::ostringstream *out)
Definition: procUtils.cc:290
static const unsigned int logRingSize_
evf::FUEventProcessor::XDAQ_INSTANTIATOR ( )

Member Data Documentation

int evf::FUEventProcessor::anonymousPipe_[2]
private

Definition at line 244 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().

xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_
private
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_
private

Definition at line 210 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_
private

Definition at line 213 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_
private

Definition at line 236 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_
private

Definition at line 242 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_
private

Definition at line 217 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().

xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_
private

Definition at line 167 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

xdata::String evf::FUEventProcessor::class_
private

Definition at line 158 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::String evf::FUEventProcessor::configString_
private

Definition at line 162 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

std::string evf::FUEventProcessor::configuration_
private

Definition at line 163 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and spotlightWebPage().

CPUStat* evf::FUEventProcessor::cpustat_
private

Definition at line 252 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), summarize(), and supervisor().

Css evf::FUEventProcessor::css_
private

Definition at line 189 of file FUEventProcessor.h.

Referenced by css().

xdata::Boolean evf::FUEventProcessor::epInitialized_
private

Definition at line 161 of file FUEventProcessor.h.

Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().

FWEPWrapper evf::FUEventProcessor::evtProcessor_
private
xdata::Boolean evf::FUEventProcessor::exitOnError_
private

Definition at line 186 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

evf::StateMachine evf::FUEventProcessor::fsm_
private
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_
private

Definition at line 172 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasPrescaleService_
private

Definition at line 171 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_
private

Definition at line 173 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasShMem_
private

Definition at line 170 of file FUEventProcessor.h.

Referenced by enableCommon(), FUEventProcessor(), makeStaticInfo(), and stopClassic().

xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_
private

Definition at line 175 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().

xdata::String evf::FUEventProcessor::iDieUrl_
private

Definition at line 249 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::instance_
private

Definition at line 159 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_
private

Definition at line 174 of file FUEventProcessor.h.

Referenced by enableCommon() and FUEventProcessor().

Logger evf::FUEventProcessor::log_
private

Definition at line 151 of file FUEventProcessor.h.

Referenced by supervisor().

std::vector<std::string> evf::FUEventProcessor::logRing_
private

Definition at line 195 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

unsigned int evf::FUEventProcessor::logRingIndex_
private

Definition at line 196 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

const unsigned int evf::FUEventProcessor::logRingSize_ = 50
static private

Definition at line 197 of file FUEventProcessor.h.

Referenced by localLog() and updater().

bool evf::FUEventProcessor::logWrap_
private

Definition at line 198 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_
private

Definition at line 220 of file FUEventProcessor.h.

Referenced by FUEventProcessor() and supervisor().

xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_
private

Definition at line 221 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

SubProcess* evf::FUEventProcessor::myProcess_
private
std::list<std::string> evf::FUEventProcessor::names_
private

Definition at line 248 of file FUEventProcessor.h.

Referenced by FUEventProcessor() and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbAccepted
private

Definition at line 228 of file FUEventProcessor.h.

Referenced by supervisor() and updater().

unsigned int evf::FUEventProcessor::nbdead_
private

Definition at line 204 of file FUEventProcessor.h.

Referenced by microState() and supervisor().

unsigned int evf::FUEventProcessor::nblive_
private

Definition at line 203 of file FUEventProcessor.h.

Referenced by microState() and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbProcessed
private

Definition at line 227 of file FUEventProcessor.h.

Referenced by supervisor() and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_
private

Definition at line 201 of file FUEventProcessor.h.

Referenced by FUEventProcessor() and summarize().

unsigned int evf::FUEventProcessor::nbTotalDQM_
private

Definition at line 206 of file FUEventProcessor.h.

Referenced by enabling(), microState(), and supervisor().

bool evf::FUEventProcessor::outprev_
private

Definition at line 176 of file FUEventProcessor.h.

Referenced by actionPerformed().

xdata::Boolean evf::FUEventProcessor::outPut_
private

Definition at line 165 of file FUEventProcessor.h.

Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().

pthread_mutex_t evf::FUEventProcessor::pickup_lock_
private

Definition at line 225 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

RateStat* evf::FUEventProcessor::ratestat_
private

Definition at line 253 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and summarize().

std::string evf::FUEventProcessor::reasonForFailedState_
private
bool evf::FUEventProcessor::receiving_
private

Definition at line 211 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

bool evf::FUEventProcessor::receivingM_
private

Definition at line 214 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_
private

Definition at line 160 of file FUEventProcessor.h.

Referenced by enableCommon(), enabling(), FUEventProcessor(), and updater().

xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_
private

Definition at line 231 of file FUEventProcessor.h.

Referenced by enabling() and FUEventProcessor().

xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_
private

Definition at line 232 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and pathNames().

unsigned int evf::FUEventProcessor::scalersUpdates_
private

Definition at line 238 of file FUEventProcessor.h.

Referenced by enabling(), receivingAndMonitor(), scalers(), and supervisor().

xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_
private

Definition at line 168 of file FUEventProcessor.h.

Referenced by FUEventProcessor() and supervisor().

std::string evf::FUEventProcessor::sourceId_
private

Definition at line 179 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_
private

Definition at line 245 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_
private

Definition at line 246 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

SquidNet evf::FUEventProcessor::squidnet_
private

Definition at line 194 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::squidPresent_
private

Definition at line 182 of file FUEventProcessor.h.

Referenced by FUEventProcessor() and updater().

pthread_mutex_t evf::FUEventProcessor::start_lock_
private

Definition at line 224 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), and microState().

pthread_mutex_t evf::FUEventProcessor::stop_lock_
private
std::vector<SubProcess> evf::FUEventProcessor::subs_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_
private

Definition at line 247 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), subWeb(), and supervisor().

bool evf::FUEventProcessor::supervising_
private

Definition at line 218 of file FUEventProcessor.h.

Referenced by startSupervisorLoop() and updater().

std::string evf::FUEventProcessor::updaterStatic_
private

Definition at line 226 of file FUEventProcessor.h.

Referenced by makeStaticInfo() and updater().

xdata::String evf::FUEventProcessor::url_
private

Definition at line 157 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

pid_t evf::FUEventProcessor::vp_
private

Definition at line 251 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and updater().

Vulture* evf::FUEventProcessor::vulture_
private
toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_
private

Definition at line 209 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_
private

Definition at line 212 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_
private

Definition at line 235 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

bool evf::FUEventProcessor::wlScalersActive_
private

Definition at line 237 of file FUEventProcessor.h.

Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_
private

Definition at line 241 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

bool evf::FUEventProcessor::wlSummarizeActive_
private

Definition at line 243 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop() and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_
private

Definition at line 216 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().