CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::FUEventProcessor Class Reference

#include <FUEventProcessor.h>

Inheritance diagram for evf::FUEventProcessor:

Public Member Functions

void actionPerformed (xdata::Event &e)
 
bool configuring (toolbox::task::WorkLoop *wl)
 
void css (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void defaultWebPage (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
bool enabling (toolbox::task::WorkLoop *wl)
 
xoap::MessageReference fsmCallback (xoap::MessageReference msg) throw (xoap::exception::Exception)
 
 FUEventProcessor (xdaq::ApplicationStub *s)
 
void getSlavePids (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
bool halting (toolbox::task::WorkLoop *wl)
 
void handleSignalSlave (int sig, siginfo_t *info, void *c)
 
void microState (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void moduleWeb (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void pathNames (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void procStat (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void scalersWeb (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
void sendMessageOverMonitorQueue (MsgBuf &)
 
void serviceWeb (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void spotlightWebPage (xgi::Input *, xgi::Output *) throw (xgi::exception::Exception)
 
bool stopping (toolbox::task::WorkLoop *wl)
 
void subWeb (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
void updater (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
 
 XDAQ_INSTANTIATOR ()
 
virtual ~FUEventProcessor ()
 

Static Public Member Functions

static void forkProcessFromEDM_helper (void *addr)
 

Private Member Functions

void attachDqmToShm () throw (evf::Exception)
 
void detachDqmFromShm () throw (evf::Exception)
 
bool doEndRunInEDM ()
 
bool enableClassic ()
 
bool enableCommon ()
 
bool enableForkInEDM ()
 
bool enableMPEPSlave ()
 
void forkProcessesFromEDM ()
 
void localLog (std::string)
 
std::string logsAsString ()
 
void makeStaticInfo ()
 
bool receiving (toolbox::task::WorkLoop *wl)
 
bool receivingAndMonitor (toolbox::task::WorkLoop *wl)
 
bool restartForkInEDM (unsigned int slotId)
 
bool scalers (toolbox::task::WorkLoop *wl)
 
void setAttachDqmToShm () throw (evf::Exception)
 
bool sigmon (toolbox::task::WorkLoop *wl)
 
void startReceivingLoop ()
 
void startReceivingMonitorLoop ()
 
void startScalersWorkLoop () throw (evf::Exception)
 
void startSignalMonitorWorkLoop () throw (evf::Exception)
 
void startSummarizeWorkLoop () throw (evf::Exception)
 
void startSupervisorLoop ()
 
bool stopClassic ()
 
void stopSlavesAndAcknowledge ()
 
bool summarize (toolbox::task::WorkLoop *wl)
 
bool supervisor (toolbox::task::WorkLoop *wl)
 

Private Attributes

unsigned long long allProcStats_
 
int anonymousPipe_ [2]
 
xdata::InfoSpace * applicationInfoSpace_
 
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
 
toolbox::task::ActionSignature * asReceiveMsgAndRead_
 
toolbox::task::ActionSignature * asScalers_
 
toolbox::task::ActionSignature * asSignalMonitor_
 
toolbox::task::ActionSignature * asSummarize_
 
toolbox::task::ActionSignature * asSupervisor_
 
xdata::Boolean autoRestartSlaves_
 
xdata::String class_
 
xdata::String configString_
 
std::string configuration_
 
CPUStat * cpustat_
 
unsigned int crashesThisRun_
 
xdata::UnsignedInteger32 crashesToDump_
 
Css css_
 
xdata::Boolean datasetCounting_
 
bool edm_init_done_
 
xdata::Boolean epInitialized_
 
FWEPWrapper evtProcessor_
 
xdata::Boolean exitOnError_
 
xdata::UnsignedInteger32 forkInEDM_
 
moduleweb::ForkInfoObj * forkInfoObj_
 
pthread_mutex_t forkObjLock_
 
evf::StateMachine fsm_
 
xdata::Boolean hasModuleWebRegistry_
 
xdata::Boolean hasPrescaleService_
 
xdata::Boolean hasServiceWebRegistry_
 
xdata::Boolean hasShMem_
 
xdata::Boolean iDieStatisticsGathering_
 
xdata::String iDieUrl_
 
unsigned long long idleProcStats_
 
xdata::UnsignedInteger32 instance_
 
xdata::Boolean isRunNumberSetter_
 
timeval lastCrashTime_
 
timeval lastProcReport_
 
Logger log_
 
std::vector< std::string > logRing_
 
unsigned int logRingIndex_
 
bool logWrap_
 
MsgBuf master_message_prg_
 
MsgBuf master_message_prr_
 
MsgBuf master_message_trr_
 
std::auto_ptr< edm::Presence > messageServicePresence_
 
xdata::InfoSpace * monitorInfoSpace_
 
xdata::InfoSpace * monitorLegendaInfoSpace_
 
ModuleWebRegistry * mwrRef_
 
SubProcess * myProcess_
 
std::list< std::string > names_
 
xdata::Serializable * nbAccepted
 
unsigned int nbdead_
 
unsigned int nblive_
 
xdata::Serializable * nbProcessed
 
xdata::UnsignedInteger32 nbSubProcesses_
 
xdata::UnsignedInteger32 nbSubProcessesReporting_
 
unsigned int nbTotalDQM_
 
bool outprev_
 
xdata::Boolean outPut_
 
pthread_mutex_t pickup_lock_
 
RateStat * ratestat_
 
std::string reasonForFailedState_
 
bool receiving_
 
bool receivingM_
 
bool restart_in_progress_
 
bool rlimit_coresize_changed_
 
rlimit rlimit_coresize_default_
 
xdata::UnsignedInteger32 runNumber_
 
xdata::InfoSpace * scalersInfoSpace_
 
xdata::InfoSpace * scalersLegendaInfoSpace_
 
unsigned int scalersUpdates_
 
sem_t * sigmon_sem_
 
bool signalMonitorActive_
 
MsgBuf slave_message_monitoring_
 
MsgBuf slave_message_prr_
 
xdata::UnsignedInteger32 slaveRestartDelaySecs_
 
ShmOutputModuleRegistry * sorRef_
 
std::string sourceId_
 
xdata::Vector< xdata::Integer > spMStates_
 
xdata::Vector< xdata::Integer > spmStates_
 
SquidNet squidnet_
 
xdata::Boolean squidPresent_
 
pthread_mutex_t start_lock_
 
pthread_mutex_t stop_lock_
 
std::vector< SubProcess > subs_
 
xdata::UnsignedInteger32 superSleepSec_
 
bool supervising_
 
std::string updaterStatic_
 
xdata::String url_
 
pid_t vp_
 
Vulture * vulture_
 
toolbox::task::WorkLoop * wlReceiving_
 
toolbox::task::WorkLoop * wlReceivingMonitor_
 
toolbox::task::WorkLoop * wlScalers_
 
bool wlScalersActive_
 
toolbox::task::WorkLoop * wlSignalMonitor_
 
toolbox::task::WorkLoop * wlSummarize_
 
bool wlSummarizeActive_
 
toolbox::task::WorkLoop * wlSupervising_
 

Static Private Attributes

static const unsigned int logRingSize_ = 50
 

Detailed Description

Definition at line 64 of file FUEventProcessor.h.

Constructor & Destructor Documentation

FUEventProcessor::FUEventProcessor ( xdaq::ApplicationStub *  s)

Definition at line 87 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, autoRestartSlaves_, evf::SquidNet::check(), class_, configString_, gather_cfg::cout, crashesToDump_, css(), datasetCounting_, defaultWebPage(), epInitialized_, evtProcessor_, evf::StateMachine::findRcmsStateListener(), forkInEDM_, forkInfoObj_, forkObjLock_, evf::StateMachine::foundRcmsStateListener(), fsm_, evf::FUInstancePtr_, edm::PresenceFactory::get(), getSlavePids(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, hasShMem_, iDieStatisticsGathering_, iDieUrl_, evf::StateMachine::initialize(), instance_, isRunNumberSetter_, edm::PresenceFactory::makePresence(), makeStaticInfo(), messageServicePresence_, microState(), moduleWeb(), monitorInfoSpace_, monitorLegendaInfoSpace_, names_, nbSubProcesses_, nbSubProcessesReporting_, NULL, outPut_, pathNames(), pickup_lock_, pipe::pipe(), procStat(), evf::FWEPWrapper::publishConfigAndMonitorItems(), evf::StateMachine::rcmsStateListener(), rlimit_coresize_default_, runNumber_, scalersInfoSpace_, scalersLegendaInfoSpace_, scalersWeb(), serviceWeb(), evf::FWEPWrapper::setAppCtxt(), evf::FWEPWrapper::setAppDesc(), ML::MLlog4cplus::setAppl(), evf::FWEPWrapper::setApplicationInfoSpace(), evf::FWEPWrapper::setMonitorInfoSpace(), evf::FWEPWrapper::setRcms(), evf::FWEPWrapper::setScalersInfoSpace(), sigmon_sem_, slaveRestartDelaySecs_, sourceId_, spMStates_, spmStates_, spotlightWebPage(), squidnet_, squidPresent_, start_lock_, startSupervisorLoop(), evf::StateMachine::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, subWeb(), superSleepSec_, updater(), url_, and vulture_.

88  : xdaq::Application(s)
89  , fsm_(this)
90  , log_(getApplicationLogger())
91  , evtProcessor_(log_, getApplicationDescriptor()->getInstance())
92  , runNumber_(0)
93  , epInitialized_(false)
94  , outPut_(true)
95  , autoRestartSlaves_(false)
97  , hasShMem_(true)
98  , hasPrescaleService_(true)
99  , hasModuleWebRegistry_(true)
100  , hasServiceWebRegistry_(true)
101  , isRunNumberSetter_(true)
102  , iDieStatisticsGathering_(false)
103  , outprev_(true)
104  , exitOnError_(true)
106  , squidnet_(3128,"http://localhost:8000/RELEASE-NOTES.txt")
109  , logWrap_(false)
110  , nbSubProcesses_(0)
112  , forkInEDM_(true)
113  , nblive_(0)
114  , nbdead_(0)
115  , nbTotalDQM_(0)
116  , wlReceiving_(0)
118  , receiving_(false)
121  , receivingM_(false)
122  , myProcess_(0)
123  , wlSupervising_(0)
124  , asSupervisor_(0)
125  , supervising_(false)
126  , wlSignalMonitor_(0)
127  , asSignalMonitor_(0)
128  , signalMonitorActive_(false)
129  , monitorInfoSpace_(0)
132  , nbProcessed(0)
133  , nbAccepted(0)
134  , scalersInfoSpace_(0)
136  , wlScalers_(0)
137  , asScalers_(0)
138  , wlScalersActive_(false)
139  , scalersUpdates_(0)
140  , wlSummarize_(0)
141  , asSummarize_(0)
142  , wlSummarizeActive_(false)
143  , superSleepSec_(1)
144  , iDieUrl_("none")
145  , vulture_(0)
146  , vp_(0)
147  , cpustat_(0)
148  , ratestat_(0)
149  , mwrRef_(nullptr)
150  , sorRef_(nullptr)
155  , edm_init_done_(true)
156  , crashesThisRun_(false)
157  , rlimit_coresize_changed_(false)
158  , crashesToDump_(2)
159  , sigmon_sem_(0)
160  , datasetCounting_(true)
161 {
162  using namespace utils;
163 
164  FUInstancePtr_=this;
165 
166  names_.push_back("nbProcessed" );
167  names_.push_back("nbAccepted" );
168  names_.push_back("epMacroStateInt");
169  names_.push_back("epMicroStateInt");
170  // create pipe for web communication
171  int retpipe = pipe(anonymousPipe_);
172  if(retpipe != 0)
173  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to create pipe");
174  // check squid presence
176  //pass application parameters to FWEPWrapper
177  evtProcessor_.setAppDesc(getApplicationDescriptor());
178  evtProcessor_.setAppCtxt(getApplicationContext());
179  // bind relevant callbacks to finite state machine
181 
182  //set sourceId_
183  url_ =
184  getApplicationDescriptor()->getContextDescriptor()->getURL()+"/"+
185  getApplicationDescriptor()->getURN();
186  class_ =getApplicationDescriptor()->getClassName();
187  instance_=getApplicationDescriptor()->getInstance();
188  sourceId_=class_.toString()+"_"+instance_.toString();
189  LOG4CPLUS_INFO(getApplicationLogger(),sourceId_ <<" constructor" );
190  LOG4CPLUS_INFO(getApplicationLogger(),"CMSSW_BASE:"<<getenv("CMSSW_BASE"));
191 
192  getApplicationDescriptor()->setAttribute("icon", "/evf/images/epicon.jpg");
193 
194  xdata::InfoSpace *ispace = getApplicationInfoSpace();
195  applicationInfoSpace_ = ispace;
196 
197  // default configuration
198  ispace->fireItemAvailable("parameterSet", &configString_ );
199  ispace->fireItemAvailable("epInitialized", &epInitialized_ );
200  ispace->fireItemAvailable("stateName", fsm_.stateName() );
201  ispace->fireItemAvailable("runNumber", &runNumber_ );
202  ispace->fireItemAvailable("outputEnabled", &outPut_ );
203 
204  ispace->fireItemAvailable("hasSharedMemory", &hasShMem_);
205  ispace->fireItemAvailable("hasPrescaleService", &hasPrescaleService_ );
206  ispace->fireItemAvailable("hasModuleWebRegistry", &hasModuleWebRegistry_ );
207  ispace->fireItemAvailable("hasServiceWebRegistry", &hasServiceWebRegistry_ );
208  ispace->fireItemAvailable("isRunNumberSetter", &isRunNumberSetter_ );
209  ispace->fireItemAvailable("iDieStatisticsGathering", &iDieStatisticsGathering_);
210  ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener() );
211  ispace->fireItemAvailable("foundRcmsStateListener",fsm_.foundRcmsStateListener());
212  ispace->fireItemAvailable("nbSubProcesses", &nbSubProcesses_ );
213  ispace->fireItemAvailable("nbSubProcessesReporting",&nbSubProcessesReporting_ );
214  ispace->fireItemAvailable("forkInEDM" ,&forkInEDM_ );
215  ispace->fireItemAvailable("superSleepSec", &superSleepSec_ );
216  ispace->fireItemAvailable("autoRestartSlaves", &autoRestartSlaves_ );
217  ispace->fireItemAvailable("slaveRestartDelaySecs",&slaveRestartDelaySecs_ );
218  ispace->fireItemAvailable("iDieUrl", &iDieUrl_ );
219  ispace->fireItemAvailable("crashesToDump" ,&crashesToDump_ );
220  ispace->fireItemAvailable("datasetCounting" ,&datasetCounting_ );
221 
222  // Add infospace listeners for exporting data values
223  getApplicationInfoSpace()->addItemChangedListener("parameterSet", this);
224  getApplicationInfoSpace()->addItemChangedListener("outputEnabled", this);
225 
226  // findRcmsStateListener
228 
229  // initialize monitoring infospace
230 
231  std::string monInfoSpaceName="evf-eventprocessor-status-monitor";
232  toolbox::net::URN urn = this->createQualifiedInfoSpace(monInfoSpaceName);
233  monitorInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
234 
235  std::string monLegendaInfoSpaceName="evf-eventprocessor-status-legenda";
236  urn = this->createQualifiedInfoSpace(monLegendaInfoSpaceName);
237  monitorLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
238 
239 
240  monitorInfoSpace_->fireItemAvailable("url", &url_ );
241  monitorInfoSpace_->fireItemAvailable("class", &class_ );
242  monitorInfoSpace_->fireItemAvailable("instance", &instance_ );
243  monitorInfoSpace_->fireItemAvailable("runNumber", &runNumber_ );
244  monitorInfoSpace_->fireItemAvailable("stateName", fsm_.stateName());
245 
246  monitorInfoSpace_->fireItemAvailable("squidPresent", &squidPresent_ );
247 
248  std::string scalersInfoSpaceName="evf-eventprocessor-scalers-monitor";
249  urn = this->createQualifiedInfoSpace(scalersInfoSpaceName);
250  scalersInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
251 
252  std::string scalersLegendaInfoSpaceName="evf-eventprocessor-scalers-legenda";
253  urn = this->createQualifiedInfoSpace(scalersLegendaInfoSpaceName);
254  scalersLegendaInfoSpace_ = xdata::getInfoSpaceFactory()->get(urn.toString());
255 
256 
257 
259  scalersInfoSpace_->fireItemAvailable("instance", &instance_);
260 
264 
265  //subprocess state vectors for MP
266  monitorInfoSpace_->fireItemAvailable("epMacroStateInt", &spMStates_);
267  monitorInfoSpace_->fireItemAvailable("epMicroStateInt", &spmStates_);
268 
269  // Bind web interface
270  xgi::bind(this, &FUEventProcessor::css, "styles.css");
271  xgi::bind(this, &FUEventProcessor::defaultWebPage, "Default" );
272  xgi::bind(this, &FUEventProcessor::spotlightWebPage, "Spotlight" );
273  xgi::bind(this, &FUEventProcessor::scalersWeb, "scalersWeb");
274  xgi::bind(this, &FUEventProcessor::pathNames, "pathNames" );
275  xgi::bind(this, &FUEventProcessor::subWeb, "SubWeb" );
276  xgi::bind(this, &FUEventProcessor::getSlavePids, "getSlavePids");
277  xgi::bind(this, &FUEventProcessor::moduleWeb, "moduleWeb" );
278  xgi::bind(this, &FUEventProcessor::serviceWeb, "serviceWeb");
279  xgi::bind(this, &FUEventProcessor::microState, "microState");
280  xgi::bind(this, &FUEventProcessor::updater, "updater" );
281  xgi::bind(this, &FUEventProcessor::procStat, "procStat" );
282 
283  // instantiate the plugin manager, not referenced here after!
284 
286 
287  //create Message Service thread and pass ownership to auto_ptr that we destroy before fork
288  try{
289  LOG4CPLUS_DEBUG(getApplicationLogger(),
290  "Trying to create message service presence ");
292  if(pf != 0) {
293  messageServicePresence_= pf->makePresence("MessageServicePresence");
294  }
295  else {
296  LOG4CPLUS_ERROR(getApplicationLogger(),
297  "Unable to create message service presence ");
298  }
299  }
300  catch(...) {
301  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
302  }
304 
305  typedef std::set<xdaq::ApplicationDescriptor*> AppDescSet_t;
306  typedef AppDescSet_t::iterator AppDescIter_t;
307 
308  AppDescSet_t rcms=
309  getApplicationContext()->getDefaultZone()->
310  getApplicationDescriptors("RCMSStateListener");
311  if(rcms.size()==0)
312  {
313  LOG4CPLUS_WARN(getApplicationLogger(),
314  "MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
315  // localLog("-W- MonitorReceiver not found, perhaphs it has not been defined ? Scalers updater wl will bail out!");
316  }
317  else
318  {
319  AppDescIter_t it = rcms.begin();
320  evtProcessor_.setRcms(*it);
321  }
322  pthread_mutex_init(&start_lock_,0);
323  pthread_mutex_init(&stop_lock_,0);
324  pthread_mutex_init(&pickup_lock_,0);
325 
326  forkInfoObj_=nullptr;
327  pthread_mutex_init(&forkObjLock_,0);
328  makeStaticInfo();
330 
331  if(vulture_==0) vulture_ = new Vulture(true);
332 
334 
335  AppDescSet_t setOfiDie=
336  getApplicationContext()->getDefaultZone()->
337  getApplicationDescriptors("evf::iDie");
338 
339  for (AppDescIter_t it=setOfiDie.begin();it!=setOfiDie.end();++it)
340  if ((*it)->getInstance()==0) // there has to be only one instance of iDie
341  iDieUrl_ = (*it)->getContextDescriptor()->getURL() + "/" + (*it)->getURN();
342 
343  //save default core file size
344  getrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
345 
346  //prepare IPC semaphore for getting the workloop waked up on signal caught in slaves
347  #ifdef linux
348  if (sigmon_sem_==0) {
349  sigmon_sem_ = (sem_t*)mmap(NULL, sizeof(sem_t),
350  PROT_READ | PROT_WRITE,
351  MAP_ANONYMOUS | MAP_SHARED, 0, 0);
352  if (!sigmon_sem_) {
353  perror("mmap error\n");
354  std::cout << "mmap error"<<std::endl;
355  }
356  else
357  sem_init(sigmon_sem_,true,0);
358  }
359  #endif
360 }
xdata::Boolean hasModuleWebRegistry_
ShmOutputModuleRegistry * sorRef_
void spotlightWebPage(xgi::Input *, xgi::Output *)
bool check()
Definition: SquidNet.cc:21
xdata::Vector< xdata::Integer > spMStates_
xdata::InfoSpace * monitorInfoSpace_
void subWeb(xgi::Input *in, xgi::Output *out)
std::vector< std::string > logRing_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
xdata::Boolean datasetCounting_
xdata::UnsignedInteger32 runNumber_
void updater(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlScalers_
pthread_mutex_t start_lock_
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:34
xdata::UnsignedInteger32 crashesToDump_
xdata::Boolean isRunNumberSetter_
void setAppCtxt(xdaq::ApplicationContext *ctx)
Definition: FWEPWrapper.h:91
xdata::UnsignedInteger32 forkInEDM_
xdata::Boolean hasServiceWebRegistry_
toolbox::task::WorkLoop * wlReceiving_
xdata::Boolean hasShMem_
def pipe
Definition: pipe.py:5
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
#define NULL
Definition: scimark2.h:8
toolbox::task::ActionSignature * asSignalMonitor_
FUEventProcessor * FUInstancePtr_
xdata::InfoSpace * scalersLegendaInfoSpace_
toolbox::task::ActionSignature * asSupervisor_
xdata::Boolean autoRestartSlaves_
xdata::UnsignedInteger32 nbSubProcessesReporting_
unsigned int crashesThisRun_
void css(xgi::Input *in, xgi::Output *out)
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
toolbox::task::ActionSignature * asReceiveMsgAndRead_
xdata::Boolean exitOnError_
xdata::InfoSpace * monitorLegendaInfoSpace_
void initialize(T *app)
Definition: StateMachine.h:84
void getSlavePids(xgi::Input *in, xgi::Output *out)
xdata::Serializable * nbProcessed
pthread_mutex_t forkObjLock_
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
xdata::Bag< xdaq2rc::ClassnameAndInstance > * rcmsStateListener()
Definition: StateMachine.h:72
#define MSQS_MESSAGE_TYPE_PRR
Definition: queue_defs.h:32
evf::StateMachine fsm_
toolbox::task::WorkLoop * wlSupervising_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
void setAppDesc(xdaq::ApplicationDescriptor *ad)
Definition: FWEPWrapper.h:90
void scalersWeb(xgi::Input *, xgi::Output *)
xdata::Boolean squidPresent_
std::list< std::string > names_
void setScalersInfoSpace(xdata::InfoSpace *sis, xdata::InfoSpace *slis)
Definition: FWEPWrapper.h:77
xdata::Boolean hasPrescaleService_
void procStat(xgi::Input *in, xgi::Output *out)
moduleweb::ForkInfoObj * forkInfoObj_
toolbox::task::WorkLoop * wlSummarize_
xdata::Serializable * nbAccepted
static PresenceFactory * get()
void setMonitorInfoSpace(xdata::InfoSpace *mis, xdata::InfoSpace *mlis)
Definition: FWEPWrapper.h:83
ModuleWebRegistry * mwrRef_
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
void setApplicationInfoSpace(xdata::InfoSpace *is)
Definition: FWEPWrapper.h:82
xdata::String configString_
unsigned int scalersUpdates_
xdata::Boolean * foundRcmsStateListener()
Definition: StateMachine.h:78
xdata::UnsignedInteger32 superSleepSec_
std::auto_ptr< edm::Presence > messageServicePresence_
void moduleWeb(xgi::Input *in, xgi::Output *out)
void pathNames(xgi::Input *, xgi::Output *)
pthread_mutex_t pickup_lock_
void defaultWebPage(xgi::Input *in, xgi::Output *out)
toolbox::task::ActionSignature * asSummarize_
toolbox::task::WorkLoop * wlSignalMonitor_
xdata::Vector< xdata::Integer > spmStates_
tuple cout
Definition: gather_cfg.py:121
xdata::UnsignedInteger32 instance_
void serviceWeb(xgi::Input *in, xgi::Output *out)
toolbox::task::WorkLoop * wlReceivingMonitor_
toolbox::task::ActionSignature * asScalers_
pthread_mutex_t stop_lock_
void publishConfigAndMonitorItems(bool)
Definition: FWEPWrapper.cc:112
xdata::InfoSpace * scalersInfoSpace_
void microState(xgi::Input *in, xgi::Output *out)
void setRcms(xdaq::ApplicationDescriptor *rcms)
Definition: FWEPWrapper.h:89
static const unsigned int logRingSize_
xdata::Boolean epInitialized_
void findRcmsStateListener()
FUEventProcessor::~FUEventProcessor ( )
virtual

Definition at line 365 of file FUEventProcessor.cc.

References vulture_.

366 {
367  // no longer needed since the wrapper is a member of the class and one can rely on
368  // implicit destructor - to be revised - at any rate the most common exit path is via "kill"...
369  // if (evtProcessor_) delete evtProcessor_;
370  if(vulture_ != 0) delete vulture_;
371 }

Member Function Documentation

void FUEventProcessor::actionPerformed ( xdata::Event &  e)

Definition at line 810 of file FUEventProcessor.cc.

References alignCSCRings::e, edm::EventProcessor::enableEndPaths(), epInitialized_, evtProcessor_, fsm_, outprev_, outPut_, evf::StateMachine::stateName(), and AlCaHLTBitMon_QueryRunRegistry::string.

811 {
812 
813  if (e.type()=="ItemChangedEvent" && fsm_.stateName()->toString()!="Halted") {
814  std::string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
815 
816  if ( item == "parameterSet") {
817  LOG4CPLUS_WARN(getApplicationLogger(),
818  "HLT Menu changed, force reinitialization of EventProcessor");
819  epInitialized_ = false;
820  }
821  if ( item == "outputEnabled") {
822  if(outprev_ != outPut_) {
823  LOG4CPLUS_WARN(getApplicationLogger(),
824  (outprev_ ? "Disabling " : "Enabling ")<<"global output");
826  outprev_ = outPut_;
827  }
828  }
829  if (item == "globalInputPrescale") {
830  LOG4CPLUS_WARN(getApplicationLogger(),
831  "Setting global input prescale has no effect "
832  <<"in this version of the code");
833  }
834  if ( item == "globalOutputPrescale") {
835  LOG4CPLUS_WARN(getApplicationLogger(),
836  "Setting global output prescale has no effect "
837  <<"in this version of the code");
838  }
839  }
840 
841 }
evf::StateMachine fsm_
xdata::String * stateName()
Definition: StateMachine.h:69
xdata::Boolean epInitialized_
void enableEndPaths(bool active)
void FUEventProcessor::attachDqmToShm ( )
throw (evf::Exception
)
private

Definition at line 1068 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by enableCommon(), and forkProcessesFromEDM().

1069 {
1070  std::string errmsg;
1071  bool success = false;
1072  try {
1075  success = edm::Service<FUShmDQMOutputService>()->attachToShm();
1076  if (!success) errmsg = "Failed to attach DQM service to shared memory";
1077  }
1078  catch (cms::Exception& e) {
1079  errmsg = "Failed to attach DQM service to shared memory: " + (std::string)e.what();
1080  }
1081  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1082 }
virtual char const * what() const
Definition: Exception.cc:141
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::configuring ( toolbox::task::WorkLoop *  wl)

Definition at line 380 of file FUEventProcessor.cc.

References edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, datasetCounting_, alignCSCRings::e, epInitialized_, evtProcessor_, cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, evf::ShmOutputModuleRegistry::getDatasetCSV(), evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, evf::FWEPWrapper::init(), instance_, localLog(), evf::Vulture::makeProcess(), mwrRef_, nbSubProcesses_, ratestat_, reasonForFailedState_, scalersLegendaInfoSpace_, evf::RateStat::sendAuxLegenda(), evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), evf::FWEPWrapper::setupFastTimerService(), edm::event_processor::sInit, sorRef_, spMStates_, spmStates_, evf::FWEPWrapper::startMonitoringWorkLoop(), AlCaHLTBitMon_QueryRunRegistry::string, vp_, and vulture_.

381 {
382 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
383 // << ((instance_.value_==0) ? 0x8 : 0) << " "
384 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
385 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
386 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
387  unsigned short smap
388  = (datasetCounting_.value_ ? 0x20 : 0 )
389  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
390  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
391  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
392  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
393  + (hasPrescaleService_.value_ ? 0x1 : 0);
394  if(nbSubProcesses_.value_==0)
395  {
396  spMStates_.setSize(1);
397  spmStates_.setSize(1);
398  }
399  else
400  {
401  spMStates_.setSize(nbSubProcesses_.value_);
402  spmStates_.setSize(nbSubProcesses_.value_);
403  for(unsigned int i = 0; i < spMStates_.size(); i++)
404  {
406  spmStates_[i] = 0;
407  }
408  }
409  try {
410  LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
411  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
412  epInitialized_=true;
413  if(evtProcessor_)
414  {
415  //get ref of mwr
418  // moved to wrapper class
423  if(cpustat_) {delete cpustat_; cpustat_=0;}
425  nbSubProcesses_.value_,
426  instance_.value_,
427  iDieUrl_.value_);
428  if(ratestat_) {delete ratestat_; ratestat_=0;}
429  ratestat_ = new RateStat(iDieUrl_.value_);
430  if(iDieStatisticsGathering_.value_)
431  {
432  try
433  {
435  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
436  if(legenda !=0)
437  {
438  std::string slegenda = ((xdata::String*)legenda)->value_;
439  ratestat_->sendLegenda(slegenda);
440  }
441  if (sorRef_ && datasetCounting_.value_)
442  {
443  xdata::String dsLegenda = sorRef_->getDatasetCSV();
444  if (dsLegenda.value_.size())
445  ratestat_->sendAuxLegenda(dsLegenda);
446  }
447  }
448  catch(evf::Exception &e)
449  {
450  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send legenda"
451  << e.what());
452  }
453  catch (xcept::Exception& e) {
454  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
455  << e.what());
456  }
457  }
458 
459  fsm_.fireEvent("ConfigureDone",this);
460  LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
461  localLog("-I- Configuration completed");
462 
463  }
464  }
465  catch (xcept::Exception &e) {
466  reasonForFailedState_ = "configuring FAILED: " + (std::string)e.what();
469  }
470  catch(cms::Exception &e) {
474  }
475  catch(std::exception &e) {
476  reasonForFailedState_ = e.what();
479  }
480  catch(...) {
481  fsm_.fireFailed("Unknown Exception",this);
482  }
483 
484  if(vulture_!=0 && vp_ == 0) vp_ = vulture_->makeProcess();
485 
486  return false;
487 }
xdata::Boolean hasModuleWebRegistry_
ShmOutputModuleRegistry * sorRef_
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
pid_t makeProcess()
Definition: Vulture.cc:154
xdata::Boolean datasetCounting_
void startMonitoringWorkLoop()
Definition: FWEPWrapper.cc:614
virtual std::string explainSelf() const
Definition: Exception.cc:146
ShmOutputModuleRegistry * getShmOutputModuleRegistry()
Definition: FWEPWrapper.cc:491
void sendAuxLegenda(const std::string &)
Definition: RateStat.cc:29
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:193
void setupFastTimerService(unsigned int nProcesses)
Definition: FWEPWrapper.cc:497
xdata::InfoSpace * scalersLegendaInfoSpace_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
ModuleWebRegistry * getModuleWebRegistry()
Definition: FWEPWrapper.cc:485
std::string reasonForFailedState_
xdata::Boolean iDieStatisticsGathering_
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:140
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
ModuleWebRegistry * mwrRef_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:141
std::string const & configuration() const
Definition: FWEPWrapper.h:102
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:39
xdata::Vector< xdata::Integer > spmStates_
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
xdata::Boolean epInitialized_
void evf::FUEventProcessor::css ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)
inline

Definition at line 107 of file FUEventProcessor.h.

References evf::Css::css(), and css_.

Referenced by FUEventProcessor().

108  {
109  css_.css(in,out);
110  }
void css(xgi::Input *in, xgi::Output *out)
Definition: Css.h:15
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::defaultWebPage ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)

Definition at line 925 of file FUEventProcessor.cc.

References nbSubProcesses_, and dbtoconf::out.

Referenced by FUEventProcessor(), and receivingAndMonitor().

927 {
928 
929 
930  *out << "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">"
931  << "<html><head><title>" << getApplicationDescriptor()->getClassName() << (nbSubProcesses_.value_ > 0 ? "MP " : " ")
932  << getApplicationDescriptor()->getInstance() << "</title>"
933  << "<meta http-equiv=\"REFRESH\" content=\"0;url=/evf/html/defaultBasePage.html\">"
934  << "</head></html>";
935 }
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::detachDqmFromShm ( )
throw (evf::Exception
)
private

Definition at line 1086 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and cms::Exception::what().

Referenced by stopClassic().

1087 {
1088  std::string errmsg;
1089  bool success = false;
1090  try {
1093  success = edm::Service<FUShmDQMOutputService>()->detachFromShm();
1094  if (!success) errmsg = "Failed to detach DQM service from shared memory";
1095  }
1096  catch (cms::Exception& e) {
1097  errmsg = "Failed to detach DQM service from shared memory: " + (std::string)e.what();
1098  }
1099  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1100 }
virtual char const * what() const
Definition: Exception.cc:141
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::doEndRunInEDM ( )
private

Definition at line 670 of file FUEventProcessor.cc.

References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, edm_init_done_, evtProcessor_, evf::StateMachine::fireFailed(), forkInfoObj_, fsm_, edm::EventProcessor::getState(), evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkInfoObj::receivedStop_, edm::event_processor::sJobReady, stor::utils::sleep(), edm::event_processor::sRunning, edm::event_processor::sStopping, edm::EventProcessor::stateName(), evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by halting(), and stopping().

670  {
671  //taking care that EP in master is in stoppable state
672  if (forkInfoObj_) {
673 
674  int count = 30;
675  bool waitedForEDM=false;
676  while (!edm_init_done_ && count) {
677  ::sleep(1);
678  if (count%5==0)
679  LOG4CPLUS_WARN(log_,"MASTER EP: Stopping while EP busy in beginRun. waiting " <<count<< "sec");
680  count--;
681  waitedForEDM=true;
682  }
683  //sleep a few more seconds if it was an early stop
684  if (waitedForEDM) sleep(5);
685 
686  //if (count==0) fsm_.fireFailed("failed to stop Master EP",this);
687 
689  return true;//we are already done
690 
691  forkInfoObj_->lock();
693  sem_post(forkInfoObj_->control_sem_);
694  forkInfoObj_->unlock();
695 
696  count = 30;
697 
699  while(count--) {
700  st = evtProcessor_->getState();
701  if (st==edm::event_processor::sRunning) ::usleep(100000);
703  break;
704  }
705  else {
706  std::ostringstream ost;
707  ost << "Master edm::EventProcessor is in state "<< evtProcessor_->stateName(st) << " while stopping";
708  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
709  fsm_.fireFailed(ost.str(),this);
710  return false;
711  }
712  if (count%5==0 && st==edm::event_processor::sRunning && !forkInfoObj_->receivedStop_) {
713  forkInfoObj_->lock();
715  sem_post(forkInfoObj_->control_sem_);
716  forkInfoObj_->unlock();
717  LOG4CPLUS_WARN(getApplicationLogger(),
718  "Master edm::EventProcessor still running after "<< (30-count-1) << " seconds. \"sem_post\" was executed again" );
719  }
720  }
721  if (count<0) {
722  std::ostringstream ost;
724  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "
725  << evtProcessor_->stateName(st) << ": input source did not receive stop signal!";
726  else
727  ost << "Timeout waiting for Master edm::EventProcessor to go stopping state "<<evtProcessor_->stateName(st);
728  LOG4CPLUS_ERROR(getApplicationLogger(),ost.str());
729  fsm_.fireFailed(ost.str(),this);
730  return false;
731  }
732  }
733  return true;
734 }
event_processor::State getState() const
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
unsigned int stopCondition
Definition: ModuleWeb.h:46
moduleweb::ForkInfoObj * forkInfoObj_
char const * stateName(event_processor::State s) const
bool FUEventProcessor::enableClassic ( )
private

Definition at line 2323 of file FUEventProcessor.cc.

References enableCommon(), evtProcessor_, edm::EventProcessor::getState(), localLog(), stor::utils::sleep(), and edm::event_processor::sRunning.

Referenced by enabling().

2324 {
2325  bool retval = enableCommon();
2327  LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2328  ::sleep(1);
2329  }
2330 
2331  // implementation moved to EPWrapper
2332  // startScalersWorkLoop(); // this is now not done any longer
2333  localLog("-I- Start completed");
2334  return retval;
2335 }
event_processor::State getState() const
void sleep(Duration_t)
Definition: Utils.h:163
void localLog(std::string)
bool FUEventProcessor::enableCommon ( )
private

Definition at line 1958 of file FUEventProcessor.cc.

References attachDqmToShm(), edm::EventProcessor::clearCounters(), gather_cfg::cout, edm::EventProcessor::declareRunNumber(), alignCSCRings::e, evtProcessor_, cppFunctionSkipper::exception, edm::hlt::Exception, cms::Exception::explainSelf(), evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, isRunNumberSetter_, localLog(), reasonForFailedState_, edm::EventProcessor::runAsync(), runNumber_, edm::EventProcessor::setRunNumber(), stor::utils::sleep(), edm::EventProcessor::statusAsync(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by enableClassic(), and enableMPEPSlave().

1959 {
1960  try {
1961  if(hasShMem_) attachDqmToShm();
1962  int sc = 0;
1964  if(isRunNumberSetter_)
1966  else
1968 
1969  try{
1970  ::sleep(1);
1972  sc = evtProcessor_->statusAsync();
1973  }
1974  catch(cms::Exception &e) {
1978  return false;
1979  }
1980  catch(std::exception &e) {
1981  reasonForFailedState_ = e.what();
1984  return false;
1985  }
1986  catch(...) {
1987  reasonForFailedState_ = "Unknown Exception";
1990  return false;
1991  }
1992  if(sc != 0) {
1993  std::ostringstream oss;
1994  oss<<"EventProcessor::runAsync returned status code " << sc;
1995  reasonForFailedState_ = oss.str();
1998  return false;
1999  }
2000  }
2001  catch (xcept::Exception &e) {
2002  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2005  return false;
2006  }
2007  try{
2008  fsm_.fireEvent("EnableDone",this);
2009  }
2010  catch (xcept::Exception &e) {
2011  std::cout << "exception " << (std::string)e.what() << std::endl;
2012  throw;
2013  }
2014 
2015  return false;
2016 }
xdata::UnsignedInteger32 runNumber_
virtual std::string explainSelf() const
Definition: Exception.cc:146
xdata::Boolean isRunNumberSetter_
void clearCounters()
Clears counters used by trigger report.
xdata::Boolean hasShMem_
void setRunNumber(RunNumber_t runNumber)
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
void declareRunNumber(RunNumber_t runNumber)
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
StatusCode statusAsync() const
tuple cout
Definition: gather_cfg.py:121
bool FUEventProcessor::enableForkInEDM ( )
private

Definition at line 2240 of file FUEventProcessor.cc.

References edm::EventProcessor::clearCounters(), edm::EventProcessor::declareRunNumber(), alignCSCRings::e, evtProcessor_, cppFunctionSkipper::exception, cms::Exception::explainSelf(), evf::StateMachine::fireFailed(), evf::moduleweb::ForkInfoObj::forkHandler, forkInfoObj_, forkObjLock_, evf::moduleweb::ForkInfoObj::forkParams, forkProcessFromEDM_helper(), fsm_, evf::moduleweb::ForkInfoObj::fuAddr, evf::moduleweb::ForkParams::isMaster, isRunNumberSetter_, log_, evf::moduleweb::ForkInfoObj::mst_lock_, mwrRef_, evf::ModuleWebRegistry::publishForkInfo(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), evf::moduleweb::ForkParams::restart, edm::EventProcessor::runAsync(), runNumber_, edm::EventProcessor::setRunNumber(), evf::moduleweb::ForkParams::slotId, edm::EventProcessor::statusAsync(), evf::moduleweb::ForkInfoObj::stopCondition, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by enabling().

2241 {
2243  try {
2244  //set to connect to Shm later
2245  //if(hasShMem_) setAttachDqmToShm();
2246 
2247  int sc = 0;
2248  //maybe not needed in MP mode
2250  if(isRunNumberSetter_)
2252  else
2254 
2255  //prepare object used to communicate with DaqSource
2256  pthread_mutex_destroy(&forkObjLock_);
2257  pthread_mutex_init(&forkObjLock_,0);
2258  if (forkInfoObj_) delete forkInfoObj_;
2261  forkInfoObj_->fuAddr=(void*)this;
2267  if (mwrRef_)
2269 
2271  sc = evtProcessor_->statusAsync();
2272 
2273  if(sc != 0) {
2274  std::ostringstream oss;
2275  oss<<"EventProcessor::runAsync returned status code " << sc;
2276  reasonForFailedState_ = oss.str();
2278  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2279  return false;
2280  }
2281  }
2282  //catch exceptions on master side
2283  catch(cms::Exception &e) {
2286  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2287  return false;
2288  }
2289  catch(std::exception &e) {
2290  reasonForFailedState_ = e.what();
2292  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2293  return false;
2294  }
2295  catch(...) {
2296  reasonForFailedState_ = "Unknown Exception";
2298  LOG4CPLUS_FATAL(log_,reasonForFailedState_);
2299  return false;
2300  }
2301  return true;
2302 }
xdata::UnsignedInteger32 runNumber_
virtual std::string explainSelf() const
Definition: Exception.cc:146
void publishForkInfo(std::string name, moduleweb::ForkInfoObj *forkInfoObj)
xdata::Boolean isRunNumberSetter_
void clearCounters()
Clears counters used by trigger report.
void(* forkHandler)(void *)
Definition: ModuleWeb.h:44
void setRunNumber(RunNumber_t runNumber)
pthread_mutex_t forkObjLock_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
unsigned int stopCondition
Definition: ModuleWeb.h:46
void declareRunNumber(RunNumber_t runNumber)
std::string reasonForFailedState_
moduleweb::ForkInfoObj * forkInfoObj_
static void forkProcessFromEDM_helper(void *addr)
ModuleWebRegistry * mwrRef_
StatusCode statusAsync() const
pthread_mutex_t * mst_lock_
Definition: ModuleWeb.h:49
bool FUEventProcessor::enableMPEPSlave ( )
private

Definition at line 2336 of file FUEventProcessor.cc.

References alignCSCRings::e, enableCommon(), evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireFailed(), fsm_, edm::PresenceFactory::get(), evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), reasonForFailedState_, evf::FWEPWrapper::resetWaiting(), ML::MLlog4cplus::setAppl(), stor::utils::sleep(), startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by enabling(), and supervisor().

2337 {
2338  //all this happens only in the child process
2339 
2344  while(!evtProcessor_.isWaitingForLs())
2345  ::sleep(1);
2346 
2347  // @EM test do not run monitor loop in slave, only receiving&Monitor
2348  // evtProcessor_.startMonitoringWorkLoop();
2349  try{
2350  // evtProcessor_.makeServicesOnly();
2351  try{
2353  if(pf != 0) {
2354  pf->makePresence("MessageServicePresence").release();
2355  }
2356  else {
2357  LOG4CPLUS_ERROR(getApplicationLogger(),
2358  "Unable to create message service presence ");
2359  }
2360  }
2361  catch(...) {
2362  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception");
2363  }
2365  }
2366  catch (xcept::Exception &e) {
2367  reasonForFailedState_ = "enabling FAILED: " + (std::string)e.what();
2370  }
2371  bool retval = enableCommon();
2372  // while(evtProcessor_->getState()!= edm::event_processor::sRunning){
2373  // LOG4CPLUS_INFO(getApplicationLogger(),"waiting for edm::EventProcessor to start before enabling watchdog");
2374  // ::sleep(1);
2375  // }
2376  return retval;
2377 }
void sleep(Duration_t)
Definition: Utils.h:163
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void localLog(std::string)
static PresenceFactory * get()
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
bool isWaitingForLs()
Definition: FWEPWrapper.h:135
bool FUEventProcessor::enabling ( toolbox::task::WorkLoop *  wl)

Definition at line 493 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, allProcStats_, edm::EventProcessor::beginJob(), configString_, evf::FWEPWrapper::configuration(), configuration_, cpustat_, crashesThisRun_, datasetCounting_, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, enableClassic(), enableForkInEDM(), enableMPEPSlave(), epInitialized_, evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), evf::FWEPWrapper::forceInitEventProcessorMaybe(), forkInEDM_, fsm_, evf::ShmOutputModuleRegistry::getDatasetCSV(), evf::FWEPWrapper::getmicromap(), evf::FWEPWrapper::getModuleWebRegistry(), evf::FWEPWrapper::getNumberOfMicrostates(), evf::FWEPWrapper::getShmOutputModuleRegistry(), evf::ShmOutputModuleRegistry::getShmOutputModules(), edm::EventProcessor::getState(), hasModuleWebRegistry_, hasPrescaleService_, hasServiceWebRegistry_, i, iDieStatisticsGathering_, iDieUrl_, idleProcStats_, evf::FWEPWrapper::init(), instance_, localLog(), mwrRef_, myProcess_, nbSubProcesses_, nbTotalDQM_, pickup_lock_, ratestat_, reasonForFailedState_, evf::FWEPWrapper::resetLumiSectionReferenceIndex(), rlimit_coresize_changed_, rlimit_coresize_default_, runNumber_, scalersLegendaInfoSpace_, scalersUpdates_, evf::RateStat::sendAuxLegenda(), evf::RateStat::sendLegenda(), evf::CPUStat::sendLegenda(), edm::event_processor::sError, evf::FWEPWrapper::setupFastTimerService(), sigmon_sem_, edm::event_processor::sInvalid, stor::utils::sleep(), sorRef_, edm::event_processor::sRunning, evf::Vulture::start(), start_lock_, startSignalMonitorWorkLoop(), startSummarizeWorkLoop(), edm::EventProcessor::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, subs_, vp_, and vulture_.

Referenced by sigmon().

494 {
495  nbTotalDQM_ = 0;
496  scalersUpdates_ = 0;
497  idleProcStats_ = 0;
498  allProcStats_ = 0;
499 // std::cout << "values " << ((nbSubProcesses_.value_!=0) ? 0x10 : 0) << " "
500 // << ((instance_.value_==0) ? 0x8 : 0) << " "
501 // << (hasServiceWebRegistry_.value_ ? 0x4 : 0) << " "
502 // << (hasModuleWebRegistry_.value_ ? 0x2 : 0) << " "
503 // << (hasPrescaleService_.value_ ? 0x1 : 0) <<std::endl;
504  unsigned short smap
505  = (datasetCounting_.value_ ? 0x20 : 0 )
506  + ((nbSubProcesses_.value_!=0) ? 0x10 : 0)
507  + (((instance_.value_%80)==0) ? 0x8 : 0) // have at least one legend per slice
508  + (hasServiceWebRegistry_.value_ ? 0x4 : 0)
509  + (hasModuleWebRegistry_.value_ ? 0x2 : 0)
510  + (hasPrescaleService_.value_ ? 0x1 : 0);
511 
512  LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling...");
513 
514  //reset core limit size
516  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
518  crashesThisRun_=0;
519  //recreate signal monitor sem
520  sem_destroy(sigmon_sem_);
521  sem_init(sigmon_sem_,true,0);
522 
523  if(!epInitialized_){
525  }
526  std::string cfg = configString_.toString(); evtProcessor_.init(smap,cfg);
527 
530 
531  if(!epInitialized_){
534  if(cpustat_) {delete cpustat_; cpustat_=0;}
536  nbSubProcesses_.value_,
537  instance_.value_,
538  iDieUrl_.value_);
539  if(ratestat_) {delete ratestat_; ratestat_=0;}
540  ratestat_ = new RateStat(iDieUrl_.value_);
541  if(iDieStatisticsGathering_.value_)
542  {
543  try
544  {
546  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
547  if(legenda !=0)
548  {
549  std::string slegenda = ((xdata::String*)legenda)->value_;
550  ratestat_->sendLegenda(slegenda);
551  }
552  if (sorRef_ && datasetCounting_.value_)
553  {
554  xdata::String dsLegenda = sorRef_->getDatasetCSV();
555  if (dsLegenda.value_.size())
556  ratestat_->sendAuxLegenda(dsLegenda);
557  }
558  }
559  catch(evf::Exception &e)
560  {
561  LOG4CPLUS_INFO(getApplicationLogger(),"could not send legenda"
562  << e.what());
563  }
564  catch (xcept::Exception& e) {
565  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to get or send legenda."
566  << e.what());
567  }
568  }
569  epInitialized_ = true;
570  }
571  configuration_ = evtProcessor_.configuration(); // get it again after init has been carried out...
573  //classic appl will return here
574  if(nbSubProcesses_.value_==0) return enableClassic();
575  //protect manipulation of subprocess array
576  pthread_mutex_lock(&start_lock_);
577  pthread_mutex_lock(&pickup_lock_);
578  subs_.clear();
579  subs_.resize(nbSubProcesses_.value_); // this should not be necessary
580  pid_t retval = -1;
581 
582  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
583  {
584  subs_[i]=SubProcess(i,retval); //this will replace all the scattered variables
585  }
586  pthread_mutex_unlock(&pickup_lock_);
587 
588  pthread_mutex_unlock(&start_lock_);
589 
590  //set expected number of child EP's for the Init message(s) sent to the SM
591  try {
592  if (sorRef_) {
593  unsigned int nbExpectedEPWriters = nbSubProcesses_.value_;
594  if (nbExpectedEPWriters==0) nbExpectedEPWriters=1;//master instance processing
595  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
596  for (unsigned int i=0;i<shmOutputs.size();i++) {
597  shmOutputs[i]->setNExpectedEPs(nbExpectedEPWriters);
598  }
599  }
600  }
601  catch (...)
602  {
603  LOG4CPLUS_ERROR(getApplicationLogger(),"Thrown Exception while setting nExpectedEPs in shmOutputs");
604  }
605 
606  //use new method if configured
607  edm_init_done_=true;
608  if (forkInEDM_.value_) {
609  edm_init_done_=false;
610  enableForkInEDM();
611  }
612  else
613  for(unsigned int i=0; i<nbSubProcesses_.value_; i++)
614  {
615  retval = subs_[i].forkNew();
616  if(retval==0)
617  {
618  myProcess_ = &subs_[i];
619  // dirty hack: delete/recreate global binary semaphore for later use in child
621  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
622  int retval = pthread_mutex_destroy(&stop_lock_);
623  if(retval != 0) perror("error");
624  retval = pthread_mutex_init(&stop_lock_,0);
625  if(retval != 0) perror("error");
627 
628  return enableMPEPSlave();
629  // the loop is broken in the child
630  }
631  }
632 
633  if (forkInEDM_.value_) {
634 
636  while (!edm_init_done_) {
637  usleep(10000);
638  st = evtProcessor_->getState();
640  }
641  st = evtProcessor_->getState();
642  //error handling: EP must fork during sRunning
644  reasonForFailedState_ = std::string("Master edm::EventProcessor in state ") + evtProcessor_->stateName(st);
647  return false;
648  }
649 
650  sleep(1);
652  startSignalMonitorWorkLoop();//only with new forking
653  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
654 
655  //enable after we are done with conditions loading and forking
656  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
657  fsm_.fireEvent("EnableDone",this);
658  localLog("-I- Start completed");
659  return false;
660  }
661 
663  vp_ = vulture_->start(iDieUrl_.value_,runNumber_.value_);
664  LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
665  fsm_.fireEvent("EnableDone",this);
666  localLog("-I- Start completed");
667  return false;
668 }
xdata::Boolean hasModuleWebRegistry_
ShmOutputModuleRegistry * sorRef_
int i
Definition: DBlmapReader.cc:9
event_processor::State getState() const
xdata::Boolean datasetCounting_
xdata::UnsignedInteger32 runNumber_
pthread_mutex_t start_lock_
ShmOutputModuleRegistry * getShmOutputModuleRegistry()
Definition: FWEPWrapper.cc:491
xdata::UnsignedInteger32 forkInEDM_
void sendAuxLegenda(const std::string &)
Definition: RateStat.cc:29
xdata::Boolean hasServiceWebRegistry_
void init(unsigned short, std::string &)
Definition: FWEPWrapper.cc:193
void setupFastTimerService(unsigned int nProcesses)
Definition: FWEPWrapper.cc:497
xdata::InfoSpace * scalersLegendaInfoSpace_
void forceInitEventProcessorMaybe()
Definition: FWEPWrapper.h:68
unsigned int crashesThisRun_
void sleep(Duration_t)
Definition: Utils.h:163
std::vector< SubProcess > subs_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
ModuleWebRegistry * getModuleWebRegistry()
Definition: FWEPWrapper.cc:485
std::string reasonForFailedState_
void resetLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:137
unsigned long long idleProcStats_
xdata::Boolean iDieStatisticsGathering_
int start(std::string, int=0)
Definition: Vulture.cc:241
std::vector< edm::FUShmOutputModule * > getShmOutputModules()
xdata::Boolean hasPrescaleService_
unsigned int getNumberOfMicrostates()
Definition: FWEPWrapper.h:140
char const * stateName(event_processor::State s) const
void fireEvent(const std::string &evtType, void *originator)
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
void localLog(std::string)
ModuleWebRegistry * mwrRef_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String configString_
unsigned int scalersUpdates_
std::vector< std::string > const & getmicromap() const
Definition: FWEPWrapper.h:141
unsigned long long allProcStats_
std::string const & configuration() const
Definition: FWEPWrapper.h:102
void sendLegenda(const std::vector< std::string > &)
Definition: CPUStat.cc:39
pthread_mutex_t pickup_lock_
void sendLegenda(const std::string &)
Definition: RateStat.cc:24
xdata::UnsignedInteger32 instance_
pthread_mutex_t stop_lock_
xdata::Boolean epInitialized_
void FUEventProcessor::forkProcessesFromEDM ( )
private

Definition at line 2022 of file FUEventProcessor.cc.

References toolbox::mem::_s_mutex_ptr_, evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, attachDqmToShm(), gather_cfg::cout, evf::StateMachine::disableRcmsStateNotification(), alignCSCRings::e, edm_init_done_, evf::evfep_alarmhandler(), evf::evfep_sighandler(), evtProcessor_, cppFunctionSkipper::exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, fsm_, edm::PresenceFactory::get(), evf::ShmOutputModuleRegistry::getShmOutputModules(), hasShMem_, i, evf::moduleweb::ForkParams::isMaster, evf::FWEPWrapper::isWaitingForLs(), localLog(), edm::PresenceFactory::makePresence(), messageServicePresence_, myProcess_, nbSubProcesses_, reasonForFailedState_, evf::FWEPWrapper::resetPackedTriggerReport(), evf::moduleweb::ForkParams::restart, restart_in_progress_, hitfit::return, scalersUpdates_, ML::MLlog4cplus::setAppl(), evf::moduleweb::ForkParams::slotId, sorRef_, dqm_diff::start, startReceivingLoop(), startReceivingMonitorLoop(), startScalersWorkLoop(), evf::StateMachine::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, subs_, wlReceiving_, and wlReceivingMonitor_.

2022  {
2023 
2024  moduleweb::ForkParams * forkParams = &(forkInfoObj_->forkParams);
2025  unsigned int forkFrom=0;
2026  unsigned int forkTo=nbSubProcesses_.value_;
2027  if (forkParams->slotId>=0) {
2028  forkFrom=forkParams->slotId;
2029  forkTo=forkParams->slotId+1;
2030  }
2031 
2032  //before fork, make sure to disconnect output modules from Shm
2033  try {
2034  if (sorRef_) {
2035  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2036  for (unsigned int i=0;i<shmOutputs.size();i++) {
2037  //unregister PID from ShmBuffer/RB
2038  shmOutputs[i]->unregisterFromShm();
2039  //disconnect from Shm
2040  shmOutputs[i]->stop();
2041  }
2042  }
2043  }
2044  catch (std::exception &e)
2045  {
2046  reasonForFailedState_ = (std::string)"Thrown exception while disconnecting ShmOutputModule from Shm: " + e.what();
2047  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2050  }
2051  catch (...) {
2052  reasonForFailedState_ = "Thrown unknown exception while disconnecting ShmOutputModule from Shm: ";
2053  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2056  }
2057 
2058  std::string currentState = fsm_.stateName()->toString();
2059 
2060  //destroy MessageServicePresence thread before fork
2061  if (currentState!="stopping") {
2062  try {
2063  messageServicePresence_.reset();
2064  }
2065  catch (...) {
2066  LOG4CPLUS_ERROR(getApplicationLogger(),"Unable to destroy MessageService thread before fork!");
2067  }
2068  }
2069 
2070  if (currentState=="stopping") {
2071  LOG4CPLUS_ERROR(getApplicationLogger(),"Can not fork subprocesses in state " << fsm_.stateName()->toString());
2072  forkParams->isMaster=1;
2075  }
2076  //fork loop
2077  else for(unsigned int i=forkFrom; i<forkTo; i++)
2078  {
2079  int retval = subs_[i].forkNew();
2080  if(retval==0)
2081  {
2082  forkParams->isMaster=0;
2083  myProcess_ = &subs_[i];
2084  // dirty hack: delete/recreate global binary semaphore for later use in child
2086  toolbox::mem::_s_mutex_ptr_ = new toolbox::BSem(toolbox::BSem::FULL,true);
2087  int retval = pthread_mutex_destroy(&stop_lock_);
2088  if(retval != 0) perror("error");
2089  retval = pthread_mutex_init(&stop_lock_,0);
2090  if(retval != 0) perror("error");
2092 
2093  //recreate MessageLogger thread in slave after fork
2094  try{
2096  if(pf != 0) {
2097  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2098  }
2099  else {
2100  LOG4CPLUS_ERROR(getApplicationLogger(),
2101  "SLAVE: Unable to create message service presence. pid:"<<getpid());
2102  }
2103  }
2104  catch(...) {
2105  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE: Unknown Exception in MessageServicePresence. pid:"<<getpid());
2106  }
2107 
2109 
2110  //reconnect to Shm from output modules
2111  try {
2112  if (sorRef_) {
2113  std::vector<edm::FUShmOutputModule *> shmOutputs = sorRef_->getShmOutputModules();
2114  for (unsigned int i=0;i<shmOutputs.size();i++)
2115  shmOutputs[i]->start();
2116  }
2117  }
2118  catch (...)
2119  {
2120  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception (ShmOutputModule sending InitMsg (pid:"<<getpid() <<")");
2121  }
2122 
2123  if (forkParams->restart) {
2124  //do restart things
2125  scalersUpdates_ = 0;
2126  try {
2127  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
2128  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
2129  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
2130  } catch (...) {
2131  LOG4CPLUS_WARN(getApplicationLogger(),"Failed to Stop/Enable FSM of the restarted slave EP");
2132  }
2133  try{
2134  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
2135  if(lsid) {
2136  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
2137  }
2138  }
2139  catch(...){
2140  std::cout << "trouble with lsindex during restart" << std::endl;
2141  }
2142  try{
2143  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
2144  if(lstb) {
2145  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
2146  }
2147  }
2148  catch(...){
2149  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
2150  }
2153  }
2154 
2155  //start other threads
2159  while(!evtProcessor_.isWaitingForLs())
2160  ::usleep(100000);//wait for scalers loop to start
2161 
2162  //connect DQMShmOutputModule
2163  if(hasShMem_) attachDqmToShm();
2164 
2165  //catch transition error if we are already Enabled
2166  try {
2167  fsm_.fireEvent("EnableDone",this);
2168  }
2169  catch (...) {}
2170 
2171  //make sure workloops are started
2172  while (!wlReceiving_->isActive() || !wlReceivingMonitor_->isActive()) usleep(10000);
2173 
2174  //unmask signals
2175  sigset_t tmpset_thread;
2176  sigemptyset(&tmpset_thread);
2177  sigaddset(&tmpset_thread, SIGQUIT);
2178  sigaddset(&tmpset_thread, SIGILL);
2179  sigaddset(&tmpset_thread, SIGABRT);
2180  sigaddset(&tmpset_thread, SIGFPE);
2181  sigaddset(&tmpset_thread, SIGSEGV);
2182  sigaddset(&tmpset_thread, SIGALRM);
2183  //sigprocmask(SIG_UNBLOCK, &tmpset_thread, 0);
2184  pthread_sigmask(SIG_UNBLOCK,&tmpset_thread,0);
2185 
2186  //set signal handlers
2187  struct sigaction sa;
2188  sigset_t tmpset;
2189  memset(&tmpset,0,sizeof(tmpset));
2190  sigemptyset(&tmpset);
2191  sa.sa_mask=tmpset;
2192  sa.sa_flags=SA_RESETHAND | SA_SIGINFO;
2193  sa.sa_handler=0;
2194  sa.sa_sigaction=evfep_sighandler;
2195 
2196  sigaction(SIGQUIT,&sa,0);
2197  sigaction(SIGILL,&sa,0);
2198  sigaction(SIGABRT,&sa,0);
2199  sigaction(SIGFPE,&sa,0);
2200  sigaction(SIGSEGV,&sa,0);
2201  sa.sa_sigaction=evfep_alarmhandler;
2202  sigaction(SIGALRM,&sa,0);
2203 
2204  //child return to DaqSource
2205  return ;
2206  }
2207  else {
2208 
2209  forkParams->isMaster=1;
2211  if (forkParams->restart) {
2212  std::ostringstream ost1;
2213  ost1 << "-I- New Process " << retval << " forked for slot " << forkParams->slotId;
2214  localLog(ost1.str());
2215  }
2217  //start "crash" receiver workloop
2218  }
2219  }
2220 
2221  //recreate MessageLogger thread after fork
2222  try{
2223  //release the presence factory in master
2225  if(pf != 0) {
2226  messageServicePresence_ = pf->makePresence("MessageServicePresence");
2227  }
2228  else {
2229  LOG4CPLUS_ERROR(getApplicationLogger(),
2230  "Unable to recreate message service presence ");
2231  }
2232  }
2233  catch(...) {
2234  LOG4CPLUS_ERROR(getApplicationLogger(),"Unknown Exception in MessageServicePresence");
2235  }
2236  restart_in_progress_=false;
2237  edm_init_done_=true;
2238 }
ShmOutputModuleRegistry * sorRef_
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
toolbox::task::WorkLoop * wlReceiving_
xdata::Boolean hasShMem_
void evfep_sighandler(int sig, siginfo_t *info, void *c)
std::vector< SubProcess > subs_
void fireFailed(const std::string &errorMsg, void *originator)
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:112
evf::StateMachine fsm_
static void setAppl(xdaq::Application *app)
Definition: MLlog4cplus.cc:85
std::string reasonForFailedState_
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
std::vector< edm::FUShmOutputModule * > getShmOutputModules()
moduleweb::ForkInfoObj * forkInfoObj_
void fireEvent(const std::string &evtType, void *originator)
void disableRcmsStateNotification()
Definition: StateMachine.h:64
toolbox::BSem * _s_mutex_ptr_
static PresenceFactory * get()
void localLog(std::string)
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
std::auto_ptr< Presence > makePresence(std::string const &presence_type) const
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
std::auto_ptr< edm::Presence > messageServicePresence_
bool isWaitingForLs()
Definition: FWEPWrapper.h:135
tuple cout
Definition: gather_cfg.py:121
toolbox::task::WorkLoop * wlReceivingMonitor_
pthread_mutex_t stop_lock_
void evfep_alarmhandler(int sig, siginfo_t *info, void *c)
void FUEventProcessor::forkProcessFromEDM_helper ( void *  addr)
static

Definition at line 2018 of file FUEventProcessor.cc.

Referenced by enableForkInEDM().

2018  {
2019  ((FUEventProcessor*)addr)->forkProcessesFromEDM();
2020 }
xoap::MessageReference FUEventProcessor::fsmCallback ( xoap::MessageReference  msg)
throw (xoap::exception::Exception
)

Definition at line 802 of file FUEventProcessor.cc.

References evf::StateMachine::commandCallback(), fsm_, and lumiQueryAPI::msg.

804 {
805  return fsm_.commandCallback(msg);
806 }
xoap::MessageReference commandCallback(xoap::MessageReference msg)
Definition: StateMachine.cc:71
evf::StateMachine fsm_
void FUEventProcessor::getSlavePids ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)

Definition at line 844 of file FUEventProcessor.cc.

References i, dbtoconf::out, and subs_.

Referenced by FUEventProcessor().

845 {
846  for (unsigned int i=0;i<subs_.size();i++)
847  {
848  if (i!=0) *out << ",";
849  *out << subs_[i].pid();
850  }
851 }
int i
Definition: DBlmapReader.cc:9
std::vector< SubProcess > subs_
tuple out
Definition: dbtoconf.py:99
bool FUEventProcessor::halting ( toolbox::task::WorkLoop *  wl)

Definition at line 765 of file FUEventProcessor.cc.

References doEndRunInEDM(), alignCSCRings::e, evtProcessor_, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), forkInEDM_, forkInfoObj_, fsm_, localLog(), nbSubProcesses_, reasonForFailedState_, rlimit_coresize_default_, sigmon_sem_, evf::FWEPWrapper::stopAndHalt(), stopSlavesAndAcknowledge(), and AlCaHLTBitMon_QueryRunRegistry::string.

766 {
767  LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
768  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
769  if(nbSubProcesses_.value_!=0) {
771  if (forkInEDM_.value_) {
772  sem_post(sigmon_sem_);
773  if (!doEndRunInEDM())
774  return false;
775 
776  }
777  }
778  try{
780  //cleanup forking variables
781  if (forkInfoObj_) {
782  delete forkInfoObj_;
783  forkInfoObj_=0;
784  }
785  }
786  catch (evf::Exception &e) {
787  reasonForFailedState_ = "halting FAILED: " + (std::string)e.what();
790  }
791  // if(hasShMem_) detachDqmFromShm();
792 
793  LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
794  fsm_.fireEvent("HaltDone",this);
795 
796  localLog("-I- Halt completed");
797  return false;
798 }
xdata::UnsignedInteger32 forkInEDM_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
moduleweb::ForkInfoObj * forkInfoObj_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::handleSignalSlave ( int  sig,
siginfo_t *  info,
void *  c 
)

Definition at line 2713 of file FUEventProcessor.cc.

References gather_cfg::cout, rlimit_coresize_changed_, sigmon_sem_, stor::utils::sleep(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by evf::evfep_sighandler().

2714 {
2715  //notify master
2716  sem_post(sigmon_sem_);
2717 
2718  //sleep while master takes action
2719  sleep(2);
2720 
2721  //set up alarm if handler deadlocks on unsafe actions
2722  alarm(5);
2723 
2724  std::cout << "--- Slave EP signal handler caught signal " << sig << " process id is " << info->si_pid <<" ---" << std::endl;
2725  std::cout << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl;
2726  std::cout << "--- Stacktrace follows --" << std::endl;
2727  std::ostringstream stacktr;
2728  toolbox::stacktrace(20,stacktr);
2729  std::cout << stacktr.str();
2731  std::cout << "--- Dumping core." << " --- " << std::endl;
2732  else
2733  std::cout << "--- Core dump count exceeded on this FU. ---"<<std::endl;
2734 
2735  std::string hasdump = "";
2736  if (rlimit_coresize_changed_) hasdump = " (core dump disabled) ";
2737 
2738  LOG4CPLUS_ERROR(getApplicationLogger(), "--- Slave EP signal handler caught signal " << sig << ". process id is " << getpid()
2739  << " on node " << toolbox::net::getHostName() << " ---" << std::endl
2740  << "--- Address: " << std::hex << info->si_addr << std::dec << " --- " << std::endl
2741  << "--- Stacktrace follows"<< hasdump << " ---" << std::endl << stacktr.str()
2742  );
2743 
2744  //re-raise signal with default handler (will cause core dump if enabled)
2745  raise(sig);
2746 }
void sleep(Duration_t)
Definition: Utils.h:163
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::localLog ( std::string  m)
private

Definition at line 1120 of file FUEventProcessor.cc.

References logRing_, logRingIndex_, logRingSize_, logWrap_, m, and cond::timestamp.

Referenced by configuring(), enableClassic(), enableCommon(), enableMPEPSlave(), enabling(), forkProcessesFromEDM(), halting(), stopClassic(), stopSlavesAndAcknowledge(), and supervisor().

1121 {
1122  timeval tv;
1123 
1124  gettimeofday(&tv,0);
1125  tm *uptm = localtime(&tv.tv_sec);
1126  char datestring[256];
1127  strftime(datestring, sizeof(datestring),"%c", uptm);
1128 
1129  if(logRingIndex_ == 0){logWrap_ = true; logRingIndex_ = logRingSize_;}
1130  logRingIndex_--;
1131  std::ostringstream timestamp;
1132  timestamp << " at " << datestring;
1133  m += timestamp.str();
1135 }
std::vector< std::string > logRing_
static const unsigned int logRingSize_
std::string FUEventProcessor::logsAsString ( )
private

Definition at line 1103 of file FUEventProcessor.cc.

References i, logRing_, logRingIndex_, and logWrap_.

1104 {
1105  std::ostringstream oss;
1106  if(logWrap_)
1107  {
1108  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1109  oss << logRing_[i] << std::endl;
1110  for(unsigned int i = 0; i < logRingIndex_; i++)
1111  oss << logRing_[i] << std::endl;
1112  }
1113  else
1114  for(unsigned int i = logRingIndex_; i < logRing_.size(); i++)
1115  oss << logRing_[i] << std::endl;
1116 
1117  return oss.str();
1118 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
void FUEventProcessor::makeStaticInfo ( )
private

Definition at line 2678 of file FUEventProcessor.cc.

References applicationInfoSpace_, evf::utils::cDiv(), alignCSCRings::e, edm::hlt::Exception, edm::getReleaseVersion(), hasModuleWebRegistry_, hasServiceWebRegistry_, hasShMem_, evf::utils::mDiv(), outPut_, and updaterStatic_.

Referenced by FUEventProcessor().

2679 {
2680  using namespace utils;
2681  std::ostringstream ost;
2682  mDiv(&ost,"ve");
2683  ost<< "$Revision: 1.164 $ (" << edm::getReleaseVersion() <<")";
2684  cDiv(&ost);
2685  mDiv(&ost,"ou",outPut_.toString());
2686  mDiv(&ost,"sh",hasShMem_.toString());
2687  mDiv(&ost,"mw",hasModuleWebRegistry_.toString());
2688  mDiv(&ost,"sw",hasServiceWebRegistry_.toString());
2689 
2690  xdata::Serializable *monsleep = 0;
2691  xdata::Serializable *lstimeout = 0;
2692  try{
2693  monsleep = applicationInfoSpace_->find("monSleepSec");
2694  lstimeout = applicationInfoSpace_->find("lsTimeOut");
2695  }
2697  }
2698 
2699  if(monsleep!=0)
2700  mDiv(&ost,"ms",monsleep->toString());
2701  if(lstimeout!=0)
2702  mDiv(&ost,"lst",lstimeout->toString());
2703  char cbuf[sizeof(struct utsname)];
2704  struct utsname* buf = (struct utsname*)cbuf;
2705  uname(buf);
2706  mDiv(&ost,"sysinfo");
2707  ost << buf->sysname << " " << buf->nodename
2708  << " " << buf->release << " " << buf->version << " " << buf->machine;
2709  cDiv(&ost);
2710  updaterStatic_ = ost.str();
2711 }
xdata::Boolean hasModuleWebRegistry_
xdata::Boolean hasServiceWebRegistry_
xdata::Boolean hasShMem_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:357
std::string getReleaseVersion()
xdata::InfoSpace * applicationInfoSpace_
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:354
void FUEventProcessor::microState ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)

Definition at line 2484 of file FUEventProcessor.cc.

References autoRestartSlaves_, gather_cfg::cout, alignCSCRings::e, evtProcessor_, cppFunctionSkipper::exception, fsm_, evf::FWEPWrapper::getScalersUpdates(), i, recoMuon::in, evf::FWEPWrapper::microState(), evf::FWEPWrapper::moduleNameFromIndex(), myProcess_, nbdead_, nblive_, nbSubProcesses_, nbTotalDQM_, dbtoconf::out, pickup_lock_, start_lock_, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), AlCaHLTBitMon_QueryRunRegistry::string, subs_, and cms::Exception::what().

Referenced by FUEventProcessor().

2485 {
2486  std::string urn = getApplicationDescriptor()->getURN();
2487  try{
2490  if(myProcess_) {std::cout << "microstate called for child! bail out" << std::endl; return;}
2491  *out << "<tr><td>" << fsm_.stateName()->toString()
2492  << "</td><td>"<< (myProcess_ ? "S" : "M") <<"</td><td>" << nblive_ << "</td><td>"
2493  << nbdead_ << "</td><td><a href=\"/" << urn << "/procStat\">" << getpid() <<"</a></td>";
2495  *out << "<td></td><td>" << nbTotalDQM_
2496  << "</td><td>" << evtProcessor_.getScalersUpdates() << "</td></tr>";
2497  if(nbSubProcesses_.value_!=0 && !myProcess_)
2498  {
2499  pthread_mutex_lock(&start_lock_);
2500  for(unsigned int i = 0; i < subs_.size(); i++)
2501  {
2502  try{
2503  if(subs_[i].alive()>0)
2504  {
2505  *out << "<tr><td bgcolor=\"#00FF00\" id=\"a"
2506  << i << "\">""Alive</td><td>S</td><td>"
2507  << subs_[i].queueId() << "<td>"
2508  << subs_[i].queueStatus()<< "/"
2509  << subs_[i].queueOccupancy() << "/"
2510  << subs_[i].queuePidOfLastSend() << "/"
2511  << subs_[i].queuePidOfLastReceive()
2512  << "</td><td><a id=\"p"<< i << "\" href=\"SubWeb?process="
2513  << subs_[i].pid() << "&method=procStat\">"
2514  << subs_[i].pid()<<"</a></td>" //<< msg->mtext;
2515  << "<td>" << evtProcessor_.stateNameFromIndex(subs_[i].params().Ms) << "</td><td>"
2516  << evtProcessor_.moduleNameFromIndex(subs_[i].params().ms) << "</td><td>"
2517  << subs_[i].params().nba << "/" << subs_[i].params().nbp
2518  << " (" << float(subs_[i].params().nba)/float(subs_[i].params().nbp)*100. <<"%)"
2519  << "</td><td>" << subs_[i].params().ls << "/" << subs_[i].params().ls
2520  << "</td><td>" << subs_[i].params().ps
2521  << "</td><td"
2522  << ((subs_[i].params().eols<subs_[i].params().ls) ? " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"") << ">"
2523  << subs_[i].params().eols
2524  << "</td><td>" << subs_[i].params().dqm
2525  << "</td><td>" << subs_[i].params().trp << "</td>";
2526  }
2527  else
2528  {
2529  pthread_mutex_lock(&pickup_lock_);
2530  *out << "<tr><td id=\"a"<< i << "\" ";
2531  if(subs_[i].alive()==-1000)
2532  *out << " bgcolor=\"#bbaabb\">NotInitialized";
2533  else
2534  *out << (subs_[i].alive()==0 ? ">Done" : " bgcolor=\"#FF0000\">Dead");
2535  *out << "</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2536  << subs_[i].queueStatus() << "/"
2537  << subs_[i].queueOccupancy() << "/"
2538  << subs_[i].queuePidOfLastSend() << "/"
2539  << subs_[i].queuePidOfLastReceive()
2540  << "</td><td id=\"p"<< i << "\">"
2541  <<subs_[i].pid()<<"</td><td colspan=\"5\">" << subs_[i].reasonForFailed();
2542  if(subs_[i].alive()!=0 && subs_[i].alive()!=-1000)
2543  {
2544  if(autoRestartSlaves_ && subs_[i].restartCount()<=2)
2545  *out << " will restart in " << subs_[i].countdown() << " s";
2546  else if(autoRestartSlaves_)
2547  *out << " reached maximum restart count";
2548  else *out << " autoRestart is disabled ";
2549  }
2550  *out << "</td><td"
2551  << ((subs_[i].params().eols<subs_[i].params().ls) ?
2552  " bgcolor=\"#00FF00\"" : " bgcolor=\"#FF0000\"")
2553  << ">"
2554  << subs_[i].params().eols
2555  << "</td><td>" << subs_[i].params().dqm
2556  << "</td><td>" << subs_[i].params().trp << "</td>";
2557  pthread_mutex_unlock(&pickup_lock_);
2558  }
2559  *out << "</tr>";
2560  }
2561  catch(evf::Exception &e){
2562  *out << "<tr><td id=\"a"<< i << "\" "
2563  <<"bgcolor=\"#FFFF00\">NotResp</td><td>S</td><td>"<< subs_[i].queueId() << "<td>"
2564  << subs_[i].queueStatus() << "/"
2565  << subs_[i].queueOccupancy() << "/"
2566  << subs_[i].queuePidOfLastSend() << "/"
2567  << subs_[i].queuePidOfLastReceive()
2568  << "</td><td id=\"p"<< i << "\">"
2569  <<subs_[i].pid()<<"</td>";
2570  }
2571  }
2572  pthread_mutex_unlock(&start_lock_);
2573  }
2574  }
2575  catch(evf::Exception &e)
2576  {
2577  LOG4CPLUS_INFO(getApplicationLogger(),"evf::Exception caught in microstate - " << e.what());
2578  }
2579  catch(cms::Exception &e)
2580  {
2581  LOG4CPLUS_INFO(getApplicationLogger(),"cms::Exception caught in microstate - " << e.what());
2582  }
2583  catch(std::exception &e)
2584  {
2585  LOG4CPLUS_INFO(getApplicationLogger(),"std::Exception caught in microstate - " << e.what());
2586  }
2587  catch(...)
2588  {
2589  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception caught in microstate - ");
2590  }
2591 
2592 }
virtual char const * what() const
Definition: Exception.cc:141
int i
Definition: DBlmapReader.cc:9
pthread_mutex_t start_lock_
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:122
std::vector< SubProcess > subs_
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:127
pthread_mutex_t pickup_lock_
tuple cout
Definition: gather_cfg.py:121
void microState(xgi::Input *in, xgi::Output *out)
unsigned int getScalersUpdates()
Definition: FWEPWrapper.h:136
void evf::FUEventProcessor::moduleWeb ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)
inline

Definition at line 114 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::moduleWeb().

Referenced by FUEventProcessor(), and receivingAndMonitor().

void moduleWeb(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::pathNames ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)

Definition at line 1039 of file FUEventProcessor.cc.

References evtProcessor_, dbtoconf::out, scalersLegendaInfoSpace_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FUEventProcessor().

1041 {
1042 
1043  if(evtProcessor_ != 0){
1044  xdata::Serializable *legenda = scalersLegendaInfoSpace_->find("scalersLegenda");
1045  if(legenda !=0){
1046  std::string slegenda = ((xdata::String*)legenda)->value_;
1047  *out << slegenda << std::endl;
1048  }
1049  }
1050 }
xdata::InfoSpace * scalersLegendaInfoSpace_
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::procStat ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)

Definition at line 2668 of file FUEventProcessor.cc.

References dbtoconf::out, and evf::utils::procStat().

Referenced by FUEventProcessor(), and receivingAndMonitor().

2669 {
2671 }
void procStat(std::ostringstream *out)
Definition: procUtils.cc:176
tuple out
Definition: dbtoconf.py:99
bool FUEventProcessor::receiving ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1192 of file FUEventProcessor.cc.

References alignCSCRings::e, evf::StateMachine::fireEvent(), fsm_, messageServicePresence_, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_RLI, MSQM_MESSAGE_TYPE_RLR, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, myProcess_, evf::SubProcess::postSlave(), evf::SubProcess::rcvSlave(), rlimit_coresize_changed_, rlimit_coresize_default_, stop_lock_, and stopClassic().

Referenced by startReceivingLoop().

1193 {
1194  MsgBuf msg;
1195  try{
1196  myProcess_->rcvSlave(msg,false); //will receive only messages from Master
1197  if(msg->mtype==MSQM_MESSAGE_TYPE_RLI)
1198  {
1199  rlimit rl;
1200  getrlimit(RLIMIT_CORE,&rl);
1201  rl.rlim_cur=0;
1202  setrlimit(RLIMIT_CORE,&rl);
1204  }
1205  if (msg->mtype==MSQM_MESSAGE_TYPE_RLR)
1206  {
1207  //reset coresize limit
1208  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1210  }
1211  if(msg->mtype==MSQM_MESSAGE_TYPE_STOP)
1212  {
1213  pthread_mutex_lock(&stop_lock_);
1214  try {
1215  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1216  }
1217  catch (...) {
1218  LOG4CPLUS_ERROR(getApplicationLogger(),"Failed to go to Stopping state in slave EP, pid "
1219  << getpid() << " The state on Stop event was not consistent");
1220  }
1221 
1222  try {
1223  stopClassic(); // call the normal sequence of stopping - as this is allowed to fail provisions must be made ...@@@EM
1224  }
1225  catch (...) {
1226  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP 'receiving' workloop: exception " << getpid());
1227  }
1228 
1229  //destroy MessageService thread before exit
1230  try{
1231  messageServicePresence_.reset();
1232  }
1233  catch(...) {
1234  LOG4CPLUS_ERROR(getApplicationLogger(),"SLAVE:Unable to destroy MessageServicePresence. pid:" << getpid() );
1235  }
1236 
1238  myProcess_->postSlave(msg1,false);
1239  pthread_mutex_unlock(&stop_lock_);
1240  fclose(stdout);
1241  fclose(stderr);
1242  _exit(EXIT_SUCCESS);
1243  }
1244  if(msg->mtype==MSQM_MESSAGE_TYPE_FSTOP)
1245  _exit(EXIT_SUCCESS);
1246  }
1247  catch(evf::Exception &e){
1248  LOG4CPLUS_ERROR(getApplicationLogger(),"Slave EP pid:" << getpid() << " receiving WorkLoop exception: "<<e.what());
1249  }
1250  return true;
1251 }
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:116
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
#define MSQM_MESSAGE_TYPE_RLR
Definition: queue_defs.h:25
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:31
evf::StateMachine fsm_
#define MSQM_MESSAGE_TYPE_RLI
Definition: queue_defs.h:24
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
void fireEvent(const std::string &evtType, void *originator)
std::auto_ptr< edm::Presence > messageServicePresence_
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
bool FUEventProcessor::receivingAndMonitor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1744 of file FUEventProcessor.cc.

References anonymousPipe_, applicationInfoSpace_, harvestRelVal::args, prof2calltree::count, gather_cfg::cout, cycle, data, defaultWebPage(), evf::prg::dqm, alignCSCRings::e, evf::prg::eols, evf::FWEPWrapper::epMAltState_, evf::FWEPWrapper::epmAltState_, evtProcessor_, edm::hlt::Exception, exitOnError_, spr::find(), first, fsm_, evf::internal::MyCgi::getEnvironment(), recoMuon::in, Input, evf::FWEPWrapper::lastLumiUsingEol_, evf::prg::ls, evf::FWEPWrapper::lsid_, MAX_PIPE_BUFFER_SIZE, PFRecoTauDiscriminationAgainstElectronMVA2_cfi::method, evf::FWEPWrapper::microState(), moduleWeb(), evf::FWEPWrapper::monitoring(), evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_MCS, MSQM_MESSAGE_TYPE_PRG, MSQM_MESSAGE_TYPE_TRP, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_MCR, MSQS_MESSAGE_TYPE_WEB, myProcess_, evf::prg::nba, evf::prg::nbp, NUMERIC_MESSAGE_SIZE, dbtoconf::out, Output, PIPE_WRITE, pos, evf::SubProcess::postSlave(), procStat(), evf::prg::ps, evf::FWEPWrapper::psid_, o2o::query, evf::SubProcess::rcvSlave(), scalersUpdates_, edm::second(), edm::event_processor::sError, slave_message_monitoring_, slave_message_prr_, stor::utils::sleep(), spotlightWebPage(), evf::StateMachine::stateName(), stop_lock_, AlCaHLTBitMon_QueryRunRegistry::string, edm::EventProcessor::totalEvents(), edm::EventProcessor::totalEventsPassed(), evf::prg::trp, and TablePrint::write.

Referenced by startReceivingMonitorLoop().

1745 {
1746  try{
1747  myProcess_->rcvSlave(slave_message_monitoring_,true); //will receive only messages from Master
1748  switch(slave_message_monitoring_->mtype)
1749  {
1750  case MSQM_MESSAGE_TYPE_MCS:
1751  {
1752  xgi::Input *in = 0;
1753  xgi::Output out;
1754  evtProcessor_.microState(in,&out);
1755  MsgBuf msg1(out.str().size(),MSQS_MESSAGE_TYPE_MCR);
1756  strncpy(msg1->mtext,out.str().c_str(),out.str().size());
1757  myProcess_->postSlave(msg1,true);
1758  break;
1759  }
1760 
1761  case MSQM_MESSAGE_TYPE_PRG:
1762  {
1763  xdata::Serializable *dqmp = 0;
1764  xdata::UnsignedInteger32 *dqm = 0;
1766  try{
1767  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1768  } catch(xdata::exception::Exception e){}
1769  if(dqmp!=0)
1770  dqm = (xdata::UnsignedInteger32*)dqmp;
1771 
1772  // monitorInfoSpace_->lock();
1773  prg * data = (prg*)slave_message_prr_->mtext;
1774  data->ls = evtProcessor_.lsid_;
1776  data->ps = evtProcessor_.psid_;
1777  data->nbp = evtProcessor_->totalEvents();
1778  data->nba = evtProcessor_->totalEventsPassed();
1779  data->Ms = evtProcessor_.epMAltState_.value_;
1780  data->ms = evtProcessor_.epmAltState_.value_;
1781  if(dqm) data->dqm = dqm->value_; else data->dqm = 0;
1782  data->trp = scalersUpdates_;
1783  // monitorInfoSpace_->unlock();
1785  if(exitOnError_.value_)
1786  {
1787  // after each monitoring cycle check if we are in inconsistent state and exit if configured to do so
1788  // std::cout << getpid() << "receivingAndMonitor: trying to acquire stop lock " << std::endl;
1789  if(data->Ms == edm::event_processor::sError)
1790  {
1791  bool running = true;
1792  int count = 0;
1793  while(running){
1794  int retval = pthread_mutex_lock(&stop_lock_);
1795  if(retval != 0) perror("error");
1796  running = fsm_.stateName()->toString()=="Enabled";
1797  if(count>5) _exit(-1);
1798  pthread_mutex_unlock(&stop_lock_);
1799  if(running) {::sleep(1); count++;}
1800  }
1801  }
1802  }
1803  break;
1804  }
1805  case MSQM_MESSAGE_TYPE_WEB:
1806  {
1807  xgi::Input *in = 0;
1808  xgi::Output out;
1809  unsigned int bytesToSend = 0;
1812  size_t pos = query.find_first_of("&");
1814  std::string args;
1815  if(pos!=std::string::npos)
1816  {
1817  method = query.substr(0,pos);
1818  args = query.substr(pos+1,query.length()-pos-1);
1819  }
1820  else
1821  method=query;
1822 
1823  if(method=="Spotlight")
1824  {
1825  spotlightWebPage(in,&out);
1826  }
1827  else if(method=="procStat")
1828  {
1829  procStat(in,&out);
1830  }
1831  else if(method=="moduleWeb")
1832  {
1833  internal::MyCgi mycgi;
1834  boost::char_separator<char> sep(";");
1835  boost::tokenizer<boost::char_separator<char> > tokens(args, sep);
1836  for (boost::tokenizer<boost::char_separator<char> >::iterator tok_iter = tokens.begin();
1837  tok_iter != tokens.end(); ++tok_iter){
1838  size_t pos = (*tok_iter).find_first_of("%");
1839  if(pos != std::string::npos){
1840  std::string first = (*tok_iter).substr(0 , pos);
1841  std::string second = (*tok_iter).substr(pos+1, (*tok_iter).length()-pos-1);
1842  mycgi.getEnvironment()[first]=second;
1843  }
1844  }
1845  moduleWeb(&mycgi,&out);
1846  }
1847  else if(method=="Default")
1848  {
1849  defaultWebPage(in,&out);
1850  }
1851  else
1852  {
1853  out << "Error 404!!!!!!!!" << std::endl;
1854  }
1855 
1856 
1857  bytesToSend = out.str().size();
1858  unsigned int cycle = 0;
1859  if(bytesToSend==0)
1860  {
1861  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", bytesToSend);
1862  myProcess_->postSlave(msg1,true);
1863  }
1864  while(bytesToSend !=0){
1865  unsigned int msgSize = bytesToSend>MAX_PIPE_BUFFER_SIZE ? MAX_PIPE_BUFFER_SIZE : bytesToSend;
1867  out.str().c_str()+MAX_PIPE_BUFFER_SIZE*cycle,
1868  msgSize);
1869  snprintf(msg1->mtext, NUMERIC_MESSAGE_SIZE, "%d", msgSize);
1870  myProcess_->postSlave(msg1,true);
1871  bytesToSend -= msgSize;
1872  cycle++;
1873  }
1874  break;
1875  }
1876  case MSQM_MESSAGE_TYPE_TRP:
1877  {
1878  break;
1879  }
1880  }
1881  }
1882  catch(evf::Exception &e){std::cout << "exception caught in recevingM: " << e.what() << std::endl;}
1883  return true;
1884 }
unsigned int ps
Definition: queue_defs.h:60
unsigned long rcvSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:116
void spotlightWebPage(xgi::Input *, xgi::Output *)
unsigned int eols
Definition: queue_defs.h:59
#define Input(cl)
Definition: vmac.h:189
unsigned int nbp
Definition: queue_defs.h:61
xdata::Integer epmAltState_
Definition: FWEPWrapper.h:197
std::map< std::string, std::string, std::less< std::string > > & getEnvironment()
boost::tokenizer< boost::char_separator< char > > tokenizer
unsigned int ls
Definition: queue_defs.h:58
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
void sleep(Duration_t)
Definition: Utils.h:163
xdata::Boolean exitOnError_
U second(std::pair< T, U > const &p)
#define MSQM_MESSAGE_TYPE_PRG
Definition: queue_defs.h:19
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:33
evf::StateMachine fsm_
int cycle
unsigned int lsid_
Definition: FWEPWrapper.h:210
bool monitoring(toolbox::task::WorkLoop *wl)
Definition: FWEPWrapper.cc:647
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
unsigned int psid_
Definition: FWEPWrapper.h:211
unsigned int Ms
Definition: queue_defs.h:63
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool first
Definition: L1TdeRCT.cc:94
int totalEvents() const
void procStat(xgi::Input *in, xgi::Output *out)
unsigned int nba
Definition: queue_defs.h:62
tuple out
Definition: dbtoconf.py:99
#define MSQM_MESSAGE_TYPE_TRP
Definition: queue_defs.h:21
#define MSQS_MESSAGE_TYPE_MCR
Definition: queue_defs.h:30
xdata::InfoSpace * applicationInfoSpace_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 lastLumiUsingEol_
Definition: FWEPWrapper.h:222
#define PIPE_WRITE
Definition: queue_defs.h:41
void moduleWeb(xgi::Input *in, xgi::Output *out)
dictionary args
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
#define Output(cl)
Definition: vmac.h:193
void defaultWebPage(xgi::Input *in, xgi::Output *out)
unsigned int trp
Definition: queue_defs.h:66
tuple query
Definition: o2o.py:269
xdata::Integer epMAltState_
Definition: FWEPWrapper.h:196
tuple cout
Definition: gather_cfg.py:121
#define MSQM_MESSAGE_TYPE_MCS
Definition: queue_defs.h:18
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
void microState(xgi::Input *in, xgi::Output *out)
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:42
int totalEventsPassed() const
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:65
unsigned int ms
Definition: queue_defs.h:64
bool FUEventProcessor::restartForkInEDM ( unsigned int  slotId)
private

Definition at line 2304 of file FUEventProcessor.cc.

References evf::moduleweb::ForkInfoObj::control_sem_, prof2calltree::count, forkInfoObj_, evf::moduleweb::ForkInfoObj::forkParams, evf::moduleweb::ForkParams::isMaster, evf::moduleweb::ForkInfoObj::lock(), log_, evf::moduleweb::ForkParams::restart, restart_in_progress_, evf::moduleweb::ForkParams::slotId, evf::moduleweb::ForkInfoObj::stopCondition, and evf::moduleweb::ForkInfoObj::unlock().

Referenced by supervisor().

2304  {
2305  //daqsource will keep this lock until master returns after fork
2306  //so that we don't do another EP restart in between
2307  forkInfoObj_->lock();
2308  forkInfoObj_->forkParams.slotId=slotId;
2312  LOG4CPLUS_DEBUG(log_, " restarting subprocess in slot "<< slotId <<": posting on control semaphore");
2313  sem_post(forkInfoObj_->control_sem_);
2314  forkInfoObj_->unlock();
2315  usleep(1000);
2316  //sleep until fork is performed
2317  int count=50;
2318  restart_in_progress_=true;
2319  while (restart_in_progress_ && count--) usleep(20000);
2320  return true;
2321 }
unsigned int stopCondition
Definition: ModuleWeb.h:46
moduleweb::ForkInfoObj * forkInfoObj_
bool FUEventProcessor::scalers ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1605 of file FUEventProcessor.cc.

References gather_cfg::cout, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), evf::FWEPWrapper::getPackedTriggerReport(), evf::FWEPWrapper::getTriggerReport(), myProcess_, evf::SubProcess::postSlave(), run_regression::ret, scalersUpdates_, and wlScalersActive_.

Referenced by startScalersWorkLoop().

1606 {
1607  if(evtProcessor_)
1608  {
1609  if(!evtProcessor_.getTriggerReport(true)) {
1610  wlScalersActive_ = false;
1611  return false;
1612  }
1613  }
1614  else
1615  {
1616  std::cout << getpid()<< " Scalers workloop, bailing out, no evtProcessor " << std::endl;
1617  wlScalersActive_ = false;
1618  return false;
1619  }
1620  if(myProcess_)
1621  {
1622  // std::cout << getpid() << "going to post on control queue from scalers" << std::endl;
1624  if(ret!=0) std::cout << "scalers workloop, error posting to sqs_ " << errno << std::endl;
1625  scalersUpdates_++;
1626  }
1627  else
1629  return true;
1630 }
MsgBuf & getPackedTriggerReport()
Definition: FWEPWrapper.h:114
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:777
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
bool getTriggerReport(bool useLock)
Definition: FWEPWrapper.cc:694
unsigned int scalersUpdates_
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::scalersWeb ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)

Definition at line 1026 of file FUEventProcessor.cc.

References evtProcessor_, evf::FWEPWrapper::getPackedTriggerReportAsStruct(), and dbtoconf::out.

Referenced by FUEventProcessor().

1028 {
1029 
1030  out->getHTTPResponseHeader().addHeader( "Content-Type",
1031  "application/octet-stream" );
1032  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
1033  "binary" );
1034  if(evtProcessor_ != 0){
1036  }
1037 }
tuple out
Definition: dbtoconf.py:99
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:115
void FUEventProcessor::sendMessageOverMonitorQueue ( MsgBuf &  buf)

Definition at line 2673 of file FUEventProcessor.cc.

References myProcess_, and evf::SubProcess::postSlave().

2674 {
2675  if(myProcess_) myProcess_->postSlave(buf,true);
2676 }
int postSlave(MsgBuf &ptr, bool isMonitor)
Definition: SubProcess.h:110
void evf::FUEventProcessor::serviceWeb ( xgi::Input *  in,
xgi::Output *  out 
)
throw (xgi::exception::Exception
)
inline

Definition at line 115 of file FUEventProcessor.h.

References evtProcessor_, and evf::FWEPWrapper::serviceWeb().

Referenced by FUEventProcessor().

void serviceWeb(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void FUEventProcessor::setAttachDqmToShm ( )
throw (evf::Exception
)
private

Definition at line 1053 of file FUEventProcessor.cc.

References alignCSCRings::e, evtProcessor_, edm::EventProcessor::getToken(), edm::Service< T >::isAvailable(), AlCaHLTBitMon_QueryRunRegistry::string, and cms::Exception::what().

1054 {
1055  std::string errmsg;
1056  try {
1059  edm::Service<FUShmDQMOutputService>()->setAttachToShm();
1060  }
1061  catch (cms::Exception& e) {
1062  errmsg = "Failed to set to attach DQM service to shared memory: " + (std::string)e.what();
1063  }
1064  if (!errmsg.empty()) XCEPT_RAISE(evf::Exception,errmsg);
1065 }
virtual char const * what() const
Definition: Exception.cc:141
bool isAvailable() const
Definition: Service.h:47
ServiceToken getToken()
bool FUEventProcessor::sigmon ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1909 of file FUEventProcessor.cc.

References gather_cfg::cout, crashesThisRun_, crashesToDump_, enabling(), fsm_, i, lastCrashTime_, min, MSQM_MESSAGE_TYPE_RLI, NUMERIC_MESSAGE_SIZE, rlimit_coresize_changed_, sigmon_sem_, signalMonitorActive_, evf::StateMachine::stateName(), stopping(), and subs_.

Referenced by startSignalMonitorWorkLoop().

1910 {
1911  while (1) {
1912  sem_wait(sigmon_sem_);
1913  std::cout << " received signal notification from slave!"<< std::endl;
1914 
1915  //check if shutdown time
1916  bool running = fsm_.stateName()->toString()=="Enabled";
1917  bool stopping = fsm_.stateName()->toString()=="stopping";
1918  bool enabling = fsm_.stateName()->toString()=="enabling";
1919  if (!running && !enabling) {
1920  signalMonitorActive_ = false;
1921  return false;
1922  }
1923 
1924  crashesThisRun_++;
1925  gettimeofday(&lastCrashTime_,0);
1926 
1927  //set core size limit to 0 in master and slaves
1928  if (crashesThisRun_>=crashesToDump_.value_ && (running || stopping) && !rlimit_coresize_changed_) {
1929 
1930  rlimit rlold;
1931  getrlimit(RLIMIT_CORE,&rlold);
1932  rlimit rlnew = rlold;
1933  rlnew.rlim_cur=0;
1934  setrlimit(RLIMIT_CORE,&rlnew);
1936  MsgBuf master_message_rli_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLI);
1937  //in case of frequent crashes, allow first slot to dump (until restart)
1938  unsigned int min=1;
1939  for (unsigned int i = min; i < subs_.size(); i++) {
1940  try {
1941  if (subs_[i].alive()) {
1942  subs_[i].post(master_message_rli_,false);
1943  }
1944  }
1945  catch (...) {}
1946  }
1947  std::ostringstream ostr;
1948  ostr << "Number of recent slave crashes reaches " << crashesThisRun_
1949  << ". Disabling core dumps for next 15 minutes in this FilterUnit";
1950  LOG4CPLUS_WARN(getApplicationLogger(),ostr.str());
1951  }
1952  }//end while loop
1953  signalMonitorActive_ = false;
1954  return false;
1955 }
int i
Definition: DBlmapReader.cc:9
xdata::UnsignedInteger32 crashesToDump_
#define min(a, b)
Definition: mlp_lapack.h:161
unsigned int crashesThisRun_
std::vector< SubProcess > subs_
evf::StateMachine fsm_
#define MSQM_MESSAGE_TYPE_RLI
Definition: queue_defs.h:24
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool stopping(toolbox::task::WorkLoop *wl)
bool enabling(toolbox::task::WorkLoop *wl)
xdata::String * stateName()
Definition: StateMachine.h:69
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::spotlightWebPage ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception)

Definition at line 941 of file FUEventProcessor.cc.

References configuration_, evtProcessor_, fsm_, recoMuon::in, myProcess_, nbSubProcesses_, dbtoconf::out, evf::StateMachine::stateName(), AlCaHLTBitMon_QueryRunRegistry::string, evf::FWEPWrapper::summaryWebPage(), and evf::FWEPWrapper::taskWebPage().

Referenced by FUEventProcessor(), and receivingAndMonitor().

943 {
944 
945  std::string urn = getApplicationDescriptor()->getURN();
946 
947  *out << "<!-- base href=\"/" << urn
948  << "\"> -->" << std::endl;
949  *out << "<html>" << std::endl;
950  *out << "<head>" << std::endl;
951  *out << "<link type=\"text/css\" rel=\"stylesheet\"";
952  *out << " href=\"/evf/html/styles.css\"/>" << std::endl;
953  *out << "<title>" << getApplicationDescriptor()->getClassName()
954  << getApplicationDescriptor()->getInstance()
955  << " MAIN</title>" << std::endl;
956  *out << "</head>" << std::endl;
957  *out << "<body>" << std::endl;
958  *out << "<table border=\"0\" width=\"100%\">" << std::endl;
959  *out << "<tr>" << std::endl;
960  *out << " <td align=\"left\">" << std::endl;
961  *out << " <img" << std::endl;
962  *out << " align=\"middle\"" << std::endl;
963  *out << " src=\"/evf/images/spoticon.jpg\"" << std::endl;
964  *out << " alt=\"main\"" << std::endl;
965  *out << " width=\"64\"" << std::endl;
966  *out << " height=\"64\"" << std::endl;
967  *out << " border=\"\"/>" << std::endl;
968  *out << " <b>" << std::endl;
969  *out << getApplicationDescriptor()->getClassName()
970  << getApplicationDescriptor()->getInstance() << std::endl;
971  *out << " " << fsm_.stateName()->toString() << std::endl;
972  *out << " </b>" << std::endl;
973  *out << " </td>" << std::endl;
974  *out << " <td width=\"32\">" << std::endl;
975  *out << " <a href=\"/urn:xdaq-application:lid=3\">" << std::endl;
976  *out << " <img" << std::endl;
977  *out << " align=\"middle\"" << std::endl;
978  *out << " src=\"/hyperdaq/images/HyperDAQ.jpg\"" << std::endl;
979  *out << " alt=\"HyperDAQ\"" << std::endl;
980  *out << " width=\"32\"" << std::endl;
981  *out << " height=\"32\"" << std::endl;
982  *out << " border=\"\"/>" << std::endl;
983  *out << " </a>" << std::endl;
984  *out << " </td>" << std::endl;
985  *out << " <td width=\"32\">" << std::endl;
986  *out << " </td>" << std::endl;
987  *out << " <td width=\"32\">" << std::endl;
988  *out << " <a href=\"/" << urn << "/\">" << std::endl;
989  *out << " <img" << std::endl;
990  *out << " align=\"middle\"" << std::endl;
991  *out << " src=\"/evf/images/epicon.jpg\"" << std::endl;
992  *out << " alt=\"main\"" << std::endl;
993  *out << " width=\"32\"" << std::endl;
994  *out << " height=\"32\"" << std::endl;
995  *out << " border=\"\"/>" << std::endl;
996  *out << " </a>" << std::endl;
997  *out << " </td>" << std::endl;
998  *out << "</tr>" << std::endl;
999  *out << "</table>" << std::endl;
1000 
1001  *out << "<hr/>" << std::endl;
1002 
1003  std::ostringstream ost;
1004  if(myProcess_)
1005  ost << "/SubWeb?process=" << getpid() << "&method=moduleWeb&";
1006  else
1007  ost << "/moduleWeb?";
1008  urn += ost.str();
1009  if(evtProcessor_ && (myProcess_ || nbSubProcesses_.value_==0))
1011  else if(evtProcessor_)
1013  else
1014  *out << "<td>HLT Unconfigured</td>" << std::endl;
1015  *out << "</table>" << std::endl;
1016 
1017  *out << "<br><textarea rows=" << 10 << " cols=80 scroll=yes>" << std::endl;
1018  *out << configuration_ << std::endl;
1019  *out << "</textarea><P>" << std::endl;
1020 
1021  *out << "</body>" << std::endl;
1022  *out << "</html>" << std::endl;
1023 
1024 
1025 }
void summaryWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:815
void taskWebPage(xgi::Input *, xgi::Output *, const std::string &)
Definition: FWEPWrapper.cc:881
evf::StateMachine fsm_
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
void FUEventProcessor::startReceivingLoop ( )
private

Definition at line 1155 of file FUEventProcessor.cc.

References asReceiveMsgAndExecute_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, receiving(), receiving_, AlCaHLTBitMon_QueryRunRegistry::string, and wlReceiving_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

1156 {
1157  try {
1158  wlReceiving_=
1159  toolbox::task::getWorkLoopFactory()->getWorkLoop("Receiving",
1160  "waiting");
1161  if (!wlReceiving_->isActive()) wlReceiving_->activate();
1162  asReceiveMsgAndExecute_ = toolbox::task::bind(this,&FUEventProcessor::receiving,
1163  "Receiving");
1165  receiving_ = true;
1166  }
1167  catch (xcept::Exception& e) {
1168  std::string msg = "Failed to start workloop 'Receiving'.";
1169  XCEPT_RETHROW(evf::Exception,msg,e);
1170  }
1171 }
toolbox::task::WorkLoop * wlReceiving_
toolbox::task::ActionSignature * asReceiveMsgAndExecute_
bool receiving(toolbox::task::WorkLoop *wl)
void FUEventProcessor::startReceivingMonitorLoop ( )
private

Definition at line 1172 of file FUEventProcessor.cc.

References asReceiveMsgAndRead_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, receivingAndMonitor(), receivingM_, AlCaHLTBitMon_QueryRunRegistry::string, and wlReceivingMonitor_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

1173 {
1174  try {
1176  toolbox::task::getWorkLoopFactory()->getWorkLoop("ReceivingM",
1177  "waiting");
1178  if (!wlReceivingMonitor_->isActive())
1179  wlReceivingMonitor_->activate();
1181  toolbox::task::bind(this,&FUEventProcessor::receivingAndMonitor,
1182  "ReceivingM");
1184  receivingM_ = true;
1185  }
1186  catch (xcept::Exception& e) {
1187  std::string msg = "Failed to start workloop 'ReceivingM'.";
1188  XCEPT_RETHROW(evf::Exception,msg,e);
1189  }
1190 }
bool receivingAndMonitor(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asReceiveMsgAndRead_
toolbox::task::WorkLoop * wlReceivingMonitor_
void FUEventProcessor::startScalersWorkLoop ( )
throw (evf::Exception)
private

Definition at line 1562 of file FUEventProcessor.cc.

References asScalers_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, scalers(), AlCaHLTBitMon_QueryRunRegistry::string, wlScalers_, and wlScalersActive_.

Referenced by enableMPEPSlave(), and forkProcessesFromEDM().

1563 {
1564  try {
1565  wlScalers_=
1566  toolbox::task::getWorkLoopFactory()->getWorkLoop("Scalers",
1567  "waiting");
1568  if (!wlScalers_->isActive()) wlScalers_->activate();
1569  asScalers_ = toolbox::task::bind(this,&FUEventProcessor::scalers,
1570  "Scalers");
1571 
1572  wlScalers_->submit(asScalers_);
1573  wlScalersActive_ = true;
1574  }
1575  catch (xcept::Exception& e) {
1576  std::string msg = "Failed to start workloop 'Scalers'.";
1577  XCEPT_RETHROW(evf::Exception,msg,e);
1578  }
1579 }
toolbox::task::WorkLoop * wlScalers_
bool scalers(toolbox::task::WorkLoop *wl)
toolbox::task::ActionSignature * asScalers_
void FUEventProcessor::startSignalMonitorWorkLoop ( )
throw (evf::Exception)
private

Definition at line 1886 of file FUEventProcessor.cc.

References asSignalMonitor_, gather_cfg::cout, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, sigmon(), signalMonitorActive_, AlCaHLTBitMon_QueryRunRegistry::string, and wlSignalMonitor_.

Referenced by enabling().

1887 {
1888  //todo rewind/check semaphore
1889  //start workloop
1890  try {
1892  toolbox::task::getWorkLoopFactory()->getWorkLoop("SignalMonitor",
1893  "waiting");
1894 
1895  if (!wlSignalMonitor_->isActive()) wlSignalMonitor_->activate();
1896  asSignalMonitor_ = toolbox::task::bind(this,&FUEventProcessor::sigmon,
1897  "SignalMonitor");
1899  signalMonitorActive_ = true;
1900  }
1901  catch (xcept::Exception& e) {
1902  std::string msg = "Failed to start workloop 'SignalMonitor'. (3)";
1903  std::cout << e.what() << std::endl;
1904  XCEPT_RETHROW(evf::Exception,msg,e);
1905  }
1906 }
toolbox::task::ActionSignature * asSignalMonitor_
bool sigmon(toolbox::task::WorkLoop *wl)
toolbox::task::WorkLoop * wlSignalMonitor_
tuple cout
Definition: gather_cfg.py:121
void FUEventProcessor::startSummarizeWorkLoop ( )
throw (evf::Exception)
private

Definition at line 1583 of file FUEventProcessor.cc.

References asSummarize_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, AlCaHLTBitMon_QueryRunRegistry::string, summarize(), wlSummarize_, and wlSummarizeActive_.

Referenced by enabling().

1584 {
1585  try {
1586  wlSummarize_=
1587  toolbox::task::getWorkLoopFactory()->getWorkLoop("Summary",
1588  "waiting");
1589  if (!wlSummarize_->isActive()) wlSummarize_->activate();
1590 
1591  asSummarize_ = toolbox::task::bind(this,&FUEventProcessor::summarize,
1592  "Summary");
1593 
1594  wlSummarize_->submit(asSummarize_);
1595  wlSummarizeActive_ = true;
1596  }
1597  catch (xcept::Exception& e) {
1598  std::string msg = "Failed to start workloop 'Summarize'.";
1599  XCEPT_RETHROW(evf::Exception,msg,e);
1600  }
1601 }
bool summarize(toolbox::task::WorkLoop *wl)
toolbox::task::WorkLoop * wlSummarize_
toolbox::task::ActionSignature * asSummarize_
void FUEventProcessor::startSupervisorLoop ( )
private

Definition at line 1137 of file FUEventProcessor.cc.

References asSupervisor_, alignCSCRings::e, edm::hlt::Exception, lumiQueryAPI::msg, AlCaHLTBitMon_QueryRunRegistry::string, supervising_, supervisor(), and wlSupervising_.

Referenced by FUEventProcessor().

1138 {
1139  try {
1141  toolbox::task::getWorkLoopFactory()->getWorkLoop("Supervisor",
1142  "waiting");
1143  if (!wlSupervising_->isActive()) wlSupervising_->activate();
1144  asSupervisor_ = toolbox::task::bind(this,&FUEventProcessor::supervisor,
1145  "Supervisor");
1146  wlSupervising_->submit(asSupervisor_);
1147  supervising_ = true;
1148  }
1149  catch (xcept::Exception& e) {
1150  std::string msg = "Failed to start workloop 'Supervisor'.";
1151  XCEPT_RETHROW(evf::Exception,msg,e);
1152  }
1153 }
toolbox::task::ActionSignature * asSupervisor_
toolbox::task::WorkLoop * wlSupervising_
bool supervisor(toolbox::task::WorkLoop *wl)
bool FUEventProcessor::stopClassic ( )
private

Definition at line 2379 of file FUEventProcessor.cc.

References detachDqmFromShm(), alignCSCRings::e, edm::IEventProcessor::epSuccess, edm::IEventProcessor::epTimedOut, evtProcessor_, edm::hlt::Exception, evf::StateMachine::fireEvent(), evf::StateMachine::fireFailed(), fsm_, hasShMem_, localLog(), reasonForFailedState_, evf::FWEPWrapper::stop(), AlCaHLTBitMon_QueryRunRegistry::string, and cms::Exception::what().

Referenced by receiving(), and stopping().

2380 {
2381  bool failed=false;
2382  try {
2383  LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping :) ...");
2386  fsm_.fireEvent("StopDone",this);
2387  else
2388  {
2389  failed=true;
2390  // epMState_ = evtProcessor_->currentStateName();
2392  reasonForFailedState_ = "EventProcessor stop timed out";
2393  else
2394  reasonForFailedState_ = "EventProcessor did not receive STOP event";
2395  }
2396  }
2397  catch (xcept::Exception &e) {
2398  failed=true;
2399  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2400  }
2401  catch (edm::Exception &e) {
2402  failed=true;
2403  reasonForFailedState_ = "Stopping FAILED: " + (std::string)e.what();
2404  }
2405  catch (...) {
2406  failed=true;
2407  reasonForFailedState_= "Stopping FAILED: unknown exception";
2408  }
2409  try {
2410  if (hasShMem_) {
2411  detachDqmFromShm();
2412  if (failed)
2413  LOG4CPLUS_WARN(getApplicationLogger(),
2414  "In failed STOP - success detaching DQM from Shm. pid:" << getpid());
2415  }
2416  }
2417  catch (cms::Exception & e) {
2418  failed=true;
2419  reasonForFailedState_= "Stopping FAILED: " + (std::string)e.what();
2420  }
2421  catch (...) {
2422  failed=true;
2423  reasonForFailedState_= "DQM detach failed: Unknown exception";
2424  }
2425 
2426  if (failed) {
2427  LOG4CPLUS_FATAL(getApplicationLogger(),"STOP failed: "
2428  << reasonForFailedState_ << " (pid:" << getpid()<<")");
2431  }
2432 
2433  LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
2434  localLog("-I- Stop completed");
2435  return false;
2436 }
virtual char const * what() const
Definition: Exception.cc:141
xdata::Boolean hasShMem_
void fireFailed(const std::string &errorMsg, void *originator)
evf::StateMachine fsm_
std::string reasonForFailedState_
void fireEvent(const std::string &evtType, void *originator)
void localLog(std::string)
edm::EventProcessor::StatusCode stop()
Definition: FWEPWrapper.cc:503
bool FUEventProcessor::stopping ( toolbox::task::WorkLoop *  wl)

Definition at line 737 of file FUEventProcessor.cc.

References doEndRunInEDM(), forkInEDM_, hasShMem_, nbSubProcesses_, rlimit_coresize_default_, sigmon_sem_, evf::Vulture::stop(), stopClassic(), stopSlavesAndAcknowledge(), and vulture_.

Referenced by sigmon(), and supervisor().

738 {
739  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
740  if(nbSubProcesses_.value_!=0) {
742  if (forkInEDM_.value_) {
743  //only in new forking for now
744  sem_post(sigmon_sem_);
745  if (!doEndRunInEDM())
746  return false;
747  }
748  }
749  vulture_->stop();
750 
751  if (forkInEDM_.value_) {
752  //shared memory was already disconnected in master
753  bool tmpHasShMem_=hasShMem_;
754  hasShMem_=false;
755  stopClassic();
756  hasShMem_=tmpHasShMem_;
757  return false;
758  }
759  stopClassic();
760  return false;
761 }
int stop()
Definition: Vulture.cc:254
xdata::UnsignedInteger32 forkInEDM_
xdata::Boolean hasShMem_
xdata::UnsignedInteger32 nbSubProcesses_
void FUEventProcessor::stopSlavesAndAcknowledge ( )
private

Definition at line 2438 of file FUEventProcessor.cc.

References alignCSCRings::e, i, localLog(), MAX_MSG_SIZE, lumiQueryAPI::msg, MSQM_MESSAGE_TYPE_STOP, MSQS_MESSAGE_TYPE_STOP, nbSubProcesses_, reasonForFailedState_, stop_lock_, and subs_.

Referenced by halting(), and stopping().

2439 {
2442 
2443  std::vector<bool> processes_to_stop(nbSubProcesses_.value_,false);
2444  for(unsigned int i = 0; i < subs_.size(); i++)
2445  {
2446  pthread_mutex_lock(&stop_lock_);
2447  if(subs_[i].alive()>0){
2448  processes_to_stop[i] = true;
2449  subs_[i].post(msg,false);
2450  }
2451  pthread_mutex_unlock(&stop_lock_);
2452  }
2453  for(unsigned int i = 0; i < subs_.size(); i++)
2454  {
2455  pthread_mutex_lock(&stop_lock_);
2456  if(processes_to_stop[i]){
2457  try{
2458  subs_[i].rcv(msg1,false);
2459  }
2460  catch(evf::Exception &e){
2461  std::ostringstream ost;
2462  ost << "failed to get STOP - errno ->" << errno << " " << e.what();
2463  reasonForFailedState_ = ost.str();
2464  LOG4CPLUS_ERROR(getApplicationLogger(),reasonForFailedState_);
2465  // fsm_.fireFailed(reasonForFailedState_,this);
2467  pthread_mutex_unlock(&stop_lock_);
2468  continue;
2469  }
2470  }
2471  else {
2472  pthread_mutex_unlock(&stop_lock_);
2473  continue;
2474  }
2475  pthread_mutex_unlock(&stop_lock_);
2476  if(msg1->mtype==MSQS_MESSAGE_TYPE_STOP)
2477  while(subs_[i].alive()>0) ::usleep(10000);
2478  subs_[i].disconnect();
2479  }
2480  // subs_.clear();
2481 
2482 }
int i
Definition: DBlmapReader.cc:9
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_STOP
Definition: queue_defs.h:31
std::string reasonForFailedState_
void localLog(std::string)
xdata::UnsignedInteger32 nbSubProcesses_
#define MSQM_MESSAGE_TYPE_STOP
Definition: queue_defs.h:16
pthread_mutex_t stop_lock_
void FUEventProcessor::subWeb ( xgi::Input * in,
xgi::Output * out
)
throw (xgi::exception::Exception)

Definition at line 853 of file FUEventProcessor.cc.

References anonymousPipe_, gather_cfg::cout, run_regression::done, asciidump::els, i, recoMuon::in, j, Association::map, MAX_MSG_SIZE, MAX_PIPE_BUFFER_SIZE, mod(), lumiQueryAPI::msg, MSGQ_MESSAGE_TYPE_RANGE, MSQM_MESSAGE_TYPE_WEB, MSQS_MESSAGE_TYPE_WEB, dbtoconf::out, evf::utils::pid, csv2json::pieces, PIPE_READ, SiPixelLorentzAngle_cfi::read, stor::utils::sleep(), AlCaHLTBitMon_QueryRunRegistry::string, subs_, and superSleepSec_.

Referenced by FUEventProcessor().

854 {
855  using namespace cgicc;
856  pid_t pid = 0;
857  std::ostringstream ost;
858  ost << "&";
859 
860  Cgicc cgi(in);
862  for(std::map<std::string, std::string, std::less<std::string> >::iterator mit =
863  mycgi->getEnvironment().begin();
864  mit != mycgi->getEnvironment().end(); mit++)
865  ost << mit->first << "%" << mit->second << ";";
866  std::vector<FormEntry> els = cgi.getElements() ;
867  std::vector<FormEntry> el1;
868  cgi.getElement("method",el1);
869  std::vector<FormEntry> el2;
870  cgi.getElement("process",el2);
871  if(el1.size()!=0) {
872  std::string meth = el1[0].getValue();
873  if(el2.size()!=0) {
874  unsigned int i = 0;
875  std::string mod = el2[0].getValue();
876  pid = atoi(mod.c_str()); // get the process id to be polled
877  for(; i < subs_.size(); i++)
878  if(subs_[i].pid()==pid) break;
879  if(i>=subs_.size()){ //process was not found, let the browser know
880  *out << "ERROR 404 : Process " << pid << " Not Found !" << std::endl;
881  return;
882  }
883  if(subs_[i].alive() != 1){
884  *out << "ERROR 405 : Process " << pid << " Not Alive !" << std::endl;
885  return;
886  }
887  MsgBuf msg1(meth.length()+ost.str().length()+1,MSQM_MESSAGE_TYPE_WEB);
888  strncpy(msg1->mtext,meth.c_str(),meth.length());
889  strncpy(msg1->mtext+meth.length(),ost.str().c_str(),ost.str().length());
890  subs_[i].post(msg1,true);
891  unsigned int keep_supersleep_original_value = superSleepSec_.value_;
892  superSleepSec_.value_=10*keep_supersleep_original_value;
894  bool done = false;
895  std::vector<char *>pieces;
896  while(!done){
897  unsigned long retval1 = subs_[i].rcvNonBlocking(msg,true);
898  if(retval1 == MSGQ_MESSAGE_TYPE_RANGE){
899  ::sleep(1);
900  continue;
901  }
902  unsigned int nbytes = atoi(msg->mtext);
903  if(nbytes < MAX_PIPE_BUFFER_SIZE) done = true; // this will break the while loop
904  char *buf= new char[nbytes];
905  ssize_t retval = read(anonymousPipe_[PIPE_READ],buf,nbytes);
906  if(retval<0){
907  std::cout << "Failed to read from pipe." << std::endl;
908  continue;
909  }
910  if(static_cast<unsigned int>(retval) != nbytes) std::cout
911  << "CAREFUL HERE, read less bytes than expected from pipe in subWeb" << std::endl;
912  pieces.push_back(buf);
913  }
914  superSleepSec_.value_=keep_supersleep_original_value;
915  for(unsigned int j = 0; j < pieces.size(); j++){
916  *out<<pieces[j]; // chain the buffers into the output strstream
917  delete[] pieces[j]; //make sure to release all buffers used for reading the pipe
918  }
919  }
920  }
921 }
int i
Definition: DBlmapReader.cc:9
#define MSGQ_MESSAGE_TYPE_RANGE
Definition: queue_defs.h:13
tuple pieces
Definition: csv2json.py:31
tuple els
Definition: asciidump.py:420
#define MAX_MSG_SIZE
Definition: queue_defs.h:10
void sleep(Duration_t)
Definition: Utils.h:163
dictionary map
Definition: Association.py:205
std::vector< SubProcess > subs_
#define MSQS_MESSAGE_TYPE_WEB
Definition: queue_defs.h:33
#define PIPE_READ
Definition: queue_defs.h:40
int j
Definition: DBlmapReader.cc:9
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 superSleepSec_
tuple cout
Definition: gather_cfg.py:121
#define MSQM_MESSAGE_TYPE_WEB
Definition: queue_defs.h:20
#define MAX_PIPE_BUFFER_SIZE
Definition: queue_defs.h:42
T mod(const T &a, const T &b)
Definition: ecalDccMap.h:4
bool FUEventProcessor::summarize ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1633 of file FUEventProcessor.cc.

References allProcStats_, gather_cfg::cout, cpustat_, alignCSCRings::e, evf::TriggerReportStatic::eventSummary, evtProcessor_, evf::FWEPWrapper::fireScalersUpdate(), fsm_, evf::FWEPWrapper::getLumiSectionReferenceIndex(), evf::FWEPWrapper::getPackedTriggerReportAsStruct(), i, iDieStatisticsGathering_, idleProcStats_, lastProcReport_, evf::TriggerReportStatic::lumiSection, master_message_trr_, MSQS_MESSAGE_TYPE_TRR, evf::TriggerReportStatic::nbExpected, evf::TriggerReportStatic::nbReporting, nbSubProcesses_, nbSubProcessesReporting_, evf::utils::procCpuStat(), ratestat_, evf::CPUStat::reset(), evf::FWEPWrapper::resetPackedTriggerReport(), run_regression::ret, evf::RateStat::sendStat(), evf::CPUStat::sendStat(), evf::CPUStat::setCPUStat(), evf::CPUStat::setElapsed(), evf::CPUStat::setNproc(), stor::utils::sleep(), evf::StateMachine::stateName(), subs_, evf::FWEPWrapper::sumAndPackTriggerReport(), edm::EventSummary::totalEvents, evf::FWEPWrapper::updateRollingReport(), evf::FWEPWrapper::withdrawLumiSectionIncrement(), and wlScalersActive_.

Referenced by startSummarizeWorkLoop().

1634 {
1636  bool atLeastOneProcessUpdatedSuccessfully = false;
1637  int msgCount = 0;
1638  for (unsigned int i = 0; i < subs_.size(); i++)
1639  {
1640  if(subs_[i].alive()>0)
1641  {
1642  int ret = 0;
1643  if(subs_[i].check_postponed_trigger_update(master_message_trr_,
1645  {
1646  ret = MSQS_MESSAGE_TYPE_TRR;
1647  std::cout << "using postponed report from slot " << i << " for ls " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1648  }
1649  else{
1650  bool insync = false;
1651  bool exception_caught = false;
1652  while(!insync){
1653  try{
1654  ret = subs_[i].rcv(master_message_trr_,false);
1655  }
1656  catch(evf::Exception &e)
1657  {
1658  std::cout << "exception in msgrcv on " << i
1659  << " " << subs_[i].alive() << " " << strerror(errno) << std::endl;
1660  exception_caught = true;
1661  break;
1662  //do nothing special
1663  }
1664  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1667  insync = true;
1668  }
1669  }
1670  }
1671  if(exception_caught) continue;
1672  }
1673  msgCount++;
1674  if(ret==MSQS_MESSAGE_TYPE_TRR) {
1677  std::cout << "postpone handling of msg from slot " << i << " with Ls " << trp->lumiSection
1678  << " should be " << evtProcessor_.getLumiSectionReferenceIndex() << std::endl;
1679  subs_[i].add_postponed_trigger_update(master_message_trr_);
1680  }else{
1681  atLeastOneProcessUpdatedSuccessfully = true;
1683  }
1684  }
1685  else std::cout << "msgrcv returned error " << errno << std::endl;
1686  }
1687  }
1688  if(atLeastOneProcessUpdatedSuccessfully){
1689  nbSubProcessesReporting_.value_ = msgCount;
1694  }
1695  else{
1696  LOG4CPLUS_WARN(getApplicationLogger(),"Summarize loop: no process updated successfully - sleep 10 seconds before trying again");
1697  if(msgCount==0) evtProcessor_.withdrawLumiSectionIncrement();
1698  nbSubProcessesReporting_.value_ = 0;
1699  ::sleep(10);
1700  }
1701  if(fsm_.stateName()->toString()!="Enabled"){
1702  wlScalersActive_ = false;
1703  return false;
1704  }
1705  // cpustat_->printStat();
1706  if(iDieStatisticsGathering_.value_){
1707  try{
1708  unsigned long long idleTmp=idleProcStats_;
1709  unsigned long long allPSTmp=allProcStats_;
1711 
1713  timeval oldtime=lastProcReport_;
1714  gettimeofday(&lastProcReport_,0);
1715 
1716  if (allPSTmp!=0 && idleTmp!=0 && allProcStats_!=allPSTmp) {
1717  cpustat_->setCPUStat(1000 - ((idleProcStats_-idleTmp)*1000)/(allProcStats_-allPSTmp));
1718  int deltaTms=1000 * (lastProcReport_.tv_sec-oldtime.tv_sec)
1719  + (lastProcReport_.tv_usec-oldtime.tv_usec)/1000;
1720  cpustat_->setElapsed(deltaTms);
1721  }
1722  else {
1723  cpustat_->setCPUStat(0);
1724  cpustat_->setElapsed(0);
1725  }
1726 
1730  ratestat_->sendStat((unsigned char*)trsp,
1731  sizeof(TriggerReportStatic),
1733  }catch(evf::Exception &e){
1734  LOG4CPLUS_INFO(getApplicationLogger(),"coud not send statistics"
1735  << e.what());
1736  }
1737  }
1738  cpustat_->reset();
1739  return true;
1740 }
int i
Definition: DBlmapReader.cc:9
void updateRollingReport()
#define MSQS_MESSAGE_TYPE_TRR
Definition: queue_defs.h:34
void setCPUStat(int busyPer1k)
Definition: CPUStat.h:25
xdata::UnsignedInteger32 nbSubProcessesReporting_
void sleep(Duration_t)
Definition: Utils.h:163
void setElapsed(int mseconds)
Definition: CPUStat.h:28
std::vector< SubProcess > subs_
bool fireScalersUpdate()
Definition: FWEPWrapper.cc:777
evf::StateMachine fsm_
unsigned long long idleProcStats_
xdata::Boolean iDieStatisticsGathering_
void setNproc(int nproc)
Definition: CPUStat.h:22
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
void sendStat(unsigned int)
Definition: CPUStat.cc:33
void reset()
Definition: CPUStat.h:31
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int getLumiSectionReferenceIndex()
Definition: FWEPWrapper.h:139
unsigned long long allProcStats_
void procCpuStat(unsigned long long &idleJiffies, unsigned long long &allJiffies)
Definition: procUtils.cc:157
tuple cout
Definition: gather_cfg.py:121
edm::EventSummary eventSummary
void withdrawLumiSectionIncrement()
Definition: FWEPWrapper.h:138
void sumAndPackTriggerReport(MsgBuf &buf)
TriggerReportStatic * getPackedTriggerReportAsStruct()
Definition: FWEPWrapper.h:115
void sendStat(const unsigned char *, size_t, unsigned int)
Definition: RateStat.cc:19
bool FUEventProcessor::supervisor ( toolbox::task::WorkLoop *  wl)
private

Definition at line 1253 of file FUEventProcessor.cc.

References evf::CPUStat::addEntry(), evf::FWEPWrapper::adjustLsIndexForRestart(), applicationInfoSpace_, autoRestartSlaves_, gather_cfg::cout, cpustat_, crashesThisRun_, delta, evf::StateMachine::disableRcmsStateNotification(), evf::prg::dqm, alignCSCRings::e, edm_init_done_, enableMPEPSlave(), evtProcessor_, cppFunctionSkipper::exception, edm::hlt::Exception, spr::find(), evf::StateMachine::fireEvent(), forkInEDM_, fsm_, i, lastCrashTime_, localLog(), log_, evf::prg::ls, python.rootplot.utilities::ls(), master_message_prg_, master_message_prr_, evf::FWEPWrapper::moduleNameFromIndex(), monitorInfoSpace_, evf::prg::Ms, evf::prg::ms, MSQM_MESSAGE_TYPE_FSTOP, MSQM_MESSAGE_TYPE_RLR, myProcess_, names_, nbAccepted, nbdead_, nblive_, nbProcessed, nbSubProcesses_, nbTotalDQM_, evf::FWEPWrapper::notstarted_state_code(), NUMERIC_MESSAGE_SIZE, AlCaHLTBitMon_ParallelJobs::p, pickup_lock_, evf::prg::ps, evf::FWEPWrapper::resetPackedTriggerReport(), restartForkInEDM(), rlimit_coresize_changed_, rlimit_coresize_default_, findQualityFiles::rr, scalersUpdates_, edm::event_processor::sError, edm::event_processor::sInit, edm::event_processor::sInvalid, slaveRestartDelaySecs_, stor::utils::sleep(), spMStates_, spmStates_, edm::event_processor::sStopping, evf::StateMachine::stateName(), evf::FWEPWrapper::stateNameFromIndex(), stop_lock_, stopping(), subs_, superSleepSec_, and evf::prg::trp.

Referenced by startSupervisorLoop().

1254 {
1255  pthread_mutex_lock(&stop_lock_);
1256  if(subs_.size()!=nbSubProcesses_.value_)
1257  {
1258  pthread_mutex_lock(&pickup_lock_);
1259  if(subs_.size()!=nbSubProcesses_.value_) {
1260  subs_.resize(nbSubProcesses_.value_);
1261  spMStates_.resize(nbSubProcesses_.value_);
1262  spmStates_.resize(nbSubProcesses_.value_);
1263  for(unsigned int i = 0; i < spMStates_.size(); i++)
1264  {
1266  spmStates_[i] = 0;
1267  }
1268  }
1269  pthread_mutex_unlock(&pickup_lock_);
1270  }
1271  bool running = fsm_.stateName()->toString()=="Enabled";
1272  bool stopping = fsm_.stateName()->toString()=="stopping";
1273  for(unsigned int i = 0; i < subs_.size(); i++)
1274  {
1275  if(subs_[i].alive()==-1000) continue;
1276  int sl;
1277  pid_t sub_pid = subs_[i].pid();
1278  pid_t killedOrNot = waitpid(sub_pid,&sl,WNOHANG);
1279 
1280  if(killedOrNot && killedOrNot==sub_pid) {
1281  pthread_mutex_lock(&pickup_lock_);
1282  //check if out of range or recreated (enable can clear vector)
1283  if (i<subs_.size() && subs_[i].alive()!=-1000) {
1284  subs_[i].setStatus((WIFEXITED(sl) != 0 ? 0 : -1));
1285  std::ostringstream ost;
1286  if(subs_[i].alive()==0) ost << " process exited with status " << WEXITSTATUS(sl);
1287  else if(WIFSIGNALED(sl)!=0) {
1288  ost << " process terminated with signal " << WTERMSIG(sl);
1289  }
1290  else ost << " process stopped ";
1291  //report unexpected slave exit in stop
1292  //if (stopping && (WEXITSTATUS(sl)!=0 || WIFSIGNALED(sl)!=0)) {
1293  // LOG4CPLUS_WARN(getApplicationLogger(),ost.str() << ", slave pid:"<<getpid());
1294  //}
1295  subs_[i].countdown()=slaveRestartDelaySecs_.value_;
1296  subs_[i].setReasonForFailed(ost.str());
1298  spmStates_[i] = 0;
1299  std::ostringstream ost1;
1300  ost1 << "-E- Slave " << subs_[i].pid() << ost.str();
1301  localLog(ost1.str());
1302  if(!autoRestartSlaves_.value_) subs_[i].disconnect();
1303  }
1304  pthread_mutex_unlock(&pickup_lock_);
1305  }
1306  }
1307  pthread_mutex_unlock(&stop_lock_);
1308  if(stopping) return true; // if in stopping we are done
1309 
1310  // check if we need to reset core dumps (15 min after last one)
1311  if (running && rlimit_coresize_changed_) {
1312  timeval newtv;
1313  gettimeofday(&newtv,0);
1314  int delta = newtv.tv_sec-lastCrashTime_.tv_sec;
1315  if (delta>60*15) {
1316  std::ostringstream ostr;
1317  ostr << " No more slave EP crashes on this machine in last 15 min. resetting core size limits";
1318  std::cout << ostr.str() << std::endl;
1319  LOG4CPLUS_INFO(getApplicationLogger(),ostr.str());
1320  setrlimit(RLIMIT_CORE,&rlimit_coresize_default_);
1321  MsgBuf master_message_rlr_(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_RLR);
1322  for (unsigned int i = 0; i < subs_.size(); i++) {
1323  try {
1324  if (subs_[i].alive())
1325  subs_[i].post(master_message_rlr_,false);
1326  }
1327  catch (...) {}
1328  }
1330  crashesThisRun_=0;
1331  }
1332  }
1333 
1334  if(running && edm_init_done_)
1335  {
1336  // if enabled, this loop will periodically check if dead slaves countdown has expired and restart them
1337  // this is only active while running, hence, the stop lock is acquired and only released at end of loop
1338  if(autoRestartSlaves_.value_){
1339  pthread_mutex_lock(&stop_lock_); //lockout slave killing at stop while you check for restarts
1340  for(unsigned int i = 0; i < subs_.size(); i++)
1341  {
1342  if(subs_[i].alive() != 1){
1343  if(subs_[i].countdown() == 0)
1344  {
1345  if(subs_[i].restartCount()>2){
1346  LOG4CPLUS_WARN(getApplicationLogger()," Not restarting subprocess in slot " << i
1347  << " - maximum restart count reached");
1348  std::ostringstream ost1;
1349  ost1 << "-W- Dead Process in slot " << i << " reached maximum restart count";
1350  localLog(ost1.str());
1351  subs_[i].countdown()--;
1352  XCEPT_DECLARE(evf::Exception,
1353  sentinelException, ost1.str());
1354  notifyQualified("error",sentinelException);
1355  subs_[i].disconnect();
1356  continue;
1357  }
1358  subs_[i].restartCount()++;
1359  if (forkInEDM_.value_) {
1360  restartForkInEDM(i);
1361  }
1362  else {
1363  pid_t rr = subs_[i].forkNew();
1364  if(rr==0)
1365  {
1366  myProcess_=&subs_[i];
1367  scalersUpdates_ = 0;
1368  int retval = pthread_mutex_destroy(&stop_lock_);
1369  if(retval != 0) perror("error");
1370  retval = pthread_mutex_init(&stop_lock_,0);
1371  if(retval != 0) perror("error");
1373  fsm_.fireEvent("Stop",this); // need to set state in fsm first to allow stopDone transition
1374  fsm_.fireEvent("StopDone",this); // need to set state in fsm first to allow stopDone transition
1375  fsm_.fireEvent("Enable",this); // need to set state in fsm first to allow stopDone transition
1376  try{
1377  xdata::Serializable *lsid = applicationInfoSpace_->find("lumiSectionIndex");
1378  if(lsid) {
1379  ((xdata::UnsignedInteger32*)(lsid))->value_--; // need to reset to value before end of ls in which process died
1380  }
1381  }
1382  catch(...){
1383  std::cout << "trouble with lsindex during restart" << std::endl;
1384  }
1385  try{
1386  xdata::Serializable *lstb = applicationInfoSpace_->find("lsToBeRecovered");
1387  if(lstb) {
1388  ((xdata::Boolean*)(lstb))->value_ = false; // do not issue eol/bol for all Ls when restarting
1389  }
1390  }
1391  catch(...){
1392  std::cout << "trouble with resetting flag for eol recovery " << std::endl;
1393  }
1394 
1397  enableMPEPSlave();
1398  return false; // exit the supervisor loop immediately in the child !!!
1399  }
1400  else
1401  {
1402  std::ostringstream ost1;
1403  ost1 << "-I- New Process " << rr << " forked for slot " << i;
1404  localLog(ost1.str());
1405  }
1406  }
1407  }
1408  if(subs_[i].countdown()>=0) subs_[i].countdown()--;
1409  }
1410  }
1411  pthread_mutex_unlock(&stop_lock_);
1412  } // finished handling replacement of dead slaves once they've been reaped
1413  }
1414  xdata::Serializable *lsid = 0;
1415  xdata::Serializable *psid = 0;
1416  xdata::Serializable *dqmp = 0;
1417  xdata::UnsignedInteger32 *dqm = 0;
1418 
1419 
1420 
1421  if(running && edm_init_done_){
1422  try{
1423  lsid = applicationInfoSpace_->find("lumiSectionIndex");
1424  psid = applicationInfoSpace_->find("prescaleSetIndex");
1425  nbProcessed = monitorInfoSpace_->find("nbProcessed");
1426  nbAccepted = monitorInfoSpace_->find("nbAccepted");
1427  dqmp = applicationInfoSpace_-> find("nbDqmUpdates");
1428  }
1430  LOG4CPLUS_INFO(getApplicationLogger(),"could not retrieve some data - " << e.what());
1431  }
1432 
1433  try{
1434  if(nbProcessed !=0 && nbAccepted !=0)
1435  {
1436  xdata::UnsignedInteger32*nbp = ((xdata::UnsignedInteger32*)nbProcessed);
1437  xdata::UnsignedInteger32*nba = ((xdata::UnsignedInteger32*)nbAccepted);
1438  xdata::UnsignedInteger32*ls = ((xdata::UnsignedInteger32*)lsid);
1439  xdata::UnsignedInteger32*ps = ((xdata::UnsignedInteger32*)psid);
1440  if(dqmp!=0)
1441  dqm = (xdata::UnsignedInteger32*)dqmp;
1442  if(dqm) dqm->value_ = 0;
1443  nbTotalDQM_ = 0;
1444  nbp->value_ = 0;
1445  nba->value_ = 0;
1446  nblive_ = 0;
1447  nbdead_ = 0;
1448  scalersUpdates_ = 0;
1449 
1450  for(unsigned int i = 0; i < subs_.size(); i++)
1451  {
1452  if(subs_[i].alive()>0)
1453  {
1454  nblive_++;
1455  try{
1456  subs_[i].post(master_message_prg_,true);
1457 
1458  unsigned long retval = subs_[i].rcvNonBlocking(master_message_prr_,true);
1459  if(retval == (unsigned long) master_message_prr_->mtype){
1460  prg* p = (struct prg*)(master_message_prr_->mtext);
1461  subs_[i].setParams(p);
1462  spMStates_[i] = p->Ms;
1463  spmStates_[i] = p->ms;
1464  cpustat_->addEntry(p->ms);
1465  if(!subs_[i].inInconsistentState() &&
1469  {
1470  std::ostringstream ost;
1471  ost << "edm::eventprocessor slot " << i << " process id "
1472  << subs_[i].pid() << " not in Running state : Mstate="
1473  << evtProcessor_.stateNameFromIndex(p->Ms) << " mstate="
1475  << " - Look into possible error messages from HLT process";
1476  LOG4CPLUS_WARN(getApplicationLogger(),ost.str());
1477  }
1478  nbp->value_ += subs_[i].params().nbp;
1479  nba->value_ += subs_[i].params().nba;
1480  if(dqm)dqm->value_ += p->dqm;
1481  nbTotalDQM_ += p->dqm;
1482  scalersUpdates_ += p->trp;
1483  if(p->ls > ls->value_) ls->value_ = p->ls;
1484  if(p->ps != ps->value_) ps->value_ = p->ps;
1485  }
1486  else{
1487  nbp->value_ += subs_[i].get_save_nbp();
1488  nba->value_ += subs_[i].get_save_nba();
1489  }
1490  }
1491  catch(evf::Exception &e){
1492  LOG4CPLUS_INFO(getApplicationLogger(),
1493  "could not send/receive msg on slot "
1494  << i << " - " << e.what());
1495  }
1496 
1497  }
1498  else
1499  {
1500  nbp->value_ += subs_[i].get_save_nbp();
1501  nba->value_ += subs_[i].get_save_nba();
1502  nbdead_++;
1503  }
1504  }
1505  if(nbp->value_>64){//have some slaves already processed more than one event ? (eventually make this == number of raw cells)
1506  for(unsigned int i = 0; i < subs_.size(); i++)
1507  {
1508  if(subs_[i].params().nbp == 0){ // a slave has processed 0 events
1509  // check that the process is not stuck
1510  if(subs_[i].alive()>0 && subs_[i].params().ms == 0) // the process is seen alive but in us=Invalid(0)
1511  {
1512  subs_[i].found_invalid();//increase the "found_invalid" counter
1513  if(subs_[i].nfound_invalid() > 60){ //wait x monitor cycles (~1 min a good time ?) before doing something about a stuck slave
1514  MsgBuf msg3(NUMERIC_MESSAGE_SIZE,MSQM_MESSAGE_TYPE_FSTOP); // send a force-stop signal
1515  subs_[i].post(msg3,false);
1516  std::ostringstream ost1;
1517  ost1 << "-W- Process in slot " << i << " Never reached the running state - forcestopping it";
1518  localLog(ost1.str());
1519  LOG4CPLUS_ERROR(getApplicationLogger(),ost1.str());
1520  XCEPT_DECLARE(evf::Exception,
1521  sentinelException, ost1.str());
1522  notifyQualified("error",sentinelException);
1523 
1524  }
1525  }
1526  }
1527  }
1528  }
1529  }
1530  }
1531  catch(std::exception &e){
1532  LOG4CPLUS_INFO(getApplicationLogger(),"std exception - " << e.what());
1533  }
1534  catch(...){
1535  LOG4CPLUS_INFO(getApplicationLogger(),"unknown exception ");
1536  }
1537  }
1538  else{
1539  for(unsigned int i = 0; i < subs_.size(); i++)
1540  {
1541  if(subs_[i].alive()==-1000)
1542  {
1544  spmStates_[i] = 0;
1545  }
1546  }
1547  }
1548  try{
1549  monitorInfoSpace_->lock();
1550  monitorInfoSpace_->fireItemGroupChanged(names_,0);
1551  monitorInfoSpace_->unlock();
1552  }
1553  catch(xdata::exception::Exception &e)
1554  {
1555  LOG4CPLUS_ERROR(log_, "Exception from fireItemGroupChanged: " << e.what());
1556  // localLog(e.what());
1557  }
1558  ::sleep(superSleepSec_.value_);
1559  return true;
1560 }
dbl * delta
Definition: mlp_gen.cc:36
unsigned int ps
Definition: queue_defs.h:60
xdata::Vector< xdata::Integer > spMStates_
int i
Definition: DBlmapReader.cc:9
xdata::InfoSpace * monitorInfoSpace_
xdata::UnsignedInteger32 slaveRestartDelaySecs_
bool restartForkInEDM(unsigned int slotId)
xdata::UnsignedInteger32 forkInEDM_
#define MSQM_MESSAGE_TYPE_FSTOP
Definition: queue_defs.h:17
unsigned int ls
Definition: queue_defs.h:58
xdata::Boolean autoRestartSlaves_
std::string const & moduleNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:122
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:7
unsigned int crashesThisRun_
void sleep(Duration_t)
Definition: Utils.h:163
#define MSQM_MESSAGE_TYPE_RLR
Definition: queue_defs.h:25
std::vector< SubProcess > subs_
xdata::Serializable * nbProcessed
void adjustLsIndexForRestart()
Definition: FWEPWrapper.h:112
evf::StateMachine fsm_
unsigned int Ms
Definition: queue_defs.h:63
void resetPackedTriggerReport()
Definition: FWEPWrapper.h:111
std::list< std::string > names_
void addEntry(int sta)
Definition: CPUStat.h:17
#define NUMERIC_MESSAGE_SIZE
Definition: queue_defs.h:38
bool stopping(toolbox::task::WorkLoop *wl)
void fireEvent(const std::string &evtType, void *originator)
xdata::Serializable * nbAccepted
void disableRcmsStateNotification()
Definition: StateMachine.h:64
void localLog(std::string)
int notstarted_state_code() const
Definition: FWEPWrapper.h:132
xdata::InfoSpace * applicationInfoSpace_
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
unsigned int scalersUpdates_
xdata::UnsignedInteger32 superSleepSec_
std::string const & stateNameFromIndex(unsigned int i) const
Definition: FWEPWrapper.h:127
pthread_mutex_t pickup_lock_
unsigned int trp
Definition: queue_defs.h:66
xdata::Vector< xdata::Integer > spmStates_
tuple cout
Definition: gather_cfg.py:121
pthread_mutex_t stop_lock_
unsigned int dqm
Definition: queue_defs.h:65
unsigned int ms
Definition: queue_defs.h:64
void FUEventProcessor::updater ( xgi::Input * in, xgi::Output * out ) throw (xgi::exception::Exception)

Definition at line 2595 of file FUEventProcessor.cc.

References evf::lsTriplet::acc, evf::utils::cDiv(), configString_, cpustat_, evtProcessor_, fsm_, evf::CPUStat::getChart(), evf::Vulture::hasStarted(), i, iDieUrl_, evf::FWEPWrapper::lastLumi(), logRing_, logRingIndex_, logRingSize_, logWrap_, evf::lsTriplet::ls, evf::utils::mDiv(), myProcess_, nbAccepted, nbProcessed, nbSubProcesses_, nbSubProcessesReporting_, dbtoconf::out, evf::lsTriplet::proc, runNumber_, squidPresent_, evf::StateMachine::stateName(), supervising_, updaterStatic_, evf::utils::uptime(), vp_, vulture_, evf::FWEPWrapper::wlMonitoring(), wlScalersActive_, and wlSummarizeActive_.

Referenced by FUEventProcessor().

2596 {
2597  using namespace utils;
2598 
2599  *out << updaterStatic_;
2600  mDiv(out,"loads");
2601  uptime(out);
2602  cDiv(out);
2603  mDiv(out,"st",fsm_.stateName()->toString());
2604  mDiv(out,"ru",runNumber_.toString());
2605  mDiv(out,"nsl",nbSubProcesses_.value_);
2606  mDiv(out,"nsr",nbSubProcessesReporting_.value_);
2607  mDiv(out,"cl");
2608  *out << getApplicationDescriptor()->getClassName()
2609  << (nbSubProcesses_.value_ > 0 ? "MP " : " ");
2610  cDiv(out);
2611  mDiv(out,"in",getApplicationDescriptor()->getInstance());
2612  if(fsm_.stateName()->toString() != "Halted" && fsm_.stateName()->toString() != "halting"){
2613  mDiv(out,"hlt");
2614  *out << "<a href=\"" << configString_.toString() << "\">HLT Config</a>";
2615  cDiv(out);
2616  *out << std::endl;
2617  }
2618  else
2619  mDiv(out,"hlt","Not yet...");
2620 
2621  mDiv(out,"sq",squidPresent_.toString());
2622  mDiv(out,"vwl",(supervising_ ? "Active" : "Not Initialized"));
2624  if(nbProcessed != 0 && nbAccepted != 0)
2625  {
2626  mDiv(out,"tt",((xdata::UnsignedInteger32*)nbProcessed)->value_);
2627  mDiv(out,"ac",((xdata::UnsignedInteger32*)nbAccepted)->value_);
2628  }
2629  else
2630  {
2631  mDiv(out,"tt",0);
2632  mDiv(out,"ac",0);
2633  }
2634  if(!myProcess_)
2635  mDiv(out,"swl",(wlSummarizeActive_ ? "Active" : "Inactive"));
2636  else
2637  mDiv(out,"swl",(wlScalersActive_ ? "Active" : "Inactive"));
2638 
2639  mDiv(out,"idi",iDieUrl_.value_);
2640  if(vp_!=0){
2641  mDiv(out,"vpi",(unsigned int) vp_);
2642  if(vulture_->hasStarted()>=0)
2643  mDiv(out,"vul","Prowling");
2644  else
2645  mDiv(out,"vul","Dead");
2646  }
2647  else{
2648  mDiv(out,"vul",(vulture_==0 ? "Nope" : "Hatching"));
2649  }
2650  if(evtProcessor_){
2651  mDiv(out,"ll");
2653  << "," << evtProcessor_.lastLumi().proc << "," << evtProcessor_.lastLumi().acc;
2654  cDiv(out);
2655  }
2656  mDiv(out,"lg");
2657  for(unsigned int i = logRingIndex_; i<logRingSize_; i++)
2658  *out << logRing_[i] << std::endl;
2659  if(logWrap_)
2660  for(unsigned int i = 0; i<logRingIndex_; i++)
2661  *out << logRing_[i] << std::endl;
2662  cDiv(out);
2663  mDiv(out,"cha");
2664  if(cpustat_) *out << cpustat_->getChart();
2665  cDiv(out);
2666 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > logRing_
xdata::UnsignedInteger32 runNumber_
unsigned int acc
Definition: FWEPWrapper.h:45
xdata::UnsignedInteger32 nbSubProcessesReporting_
void cDiv(std::ostringstream *out)
Definition: procUtils.cc:357
lsTriplet & lastLumi()
Definition: FWEPWrapper.h:133
xdata::Serializable * nbProcessed
std::string wlMonitoring()
Definition: FWEPWrapper.h:98
evf::StateMachine fsm_
unsigned int proc
Definition: FWEPWrapper.h:44
xdata::Boolean squidPresent_
xdata::Serializable * nbAccepted
tuple out
Definition: dbtoconf.py:99
xdata::UnsignedInteger32 nbSubProcesses_
xdata::String * stateName()
Definition: StateMachine.h:69
std::string & getChart()
Definition: CPUStat.h:43
xdata::String configString_
unsigned int ls
Definition: FWEPWrapper.h:43
int hasStarted()
Definition: Vulture.cc:215
void mDiv(std::ostringstream *out, std::string name)
Definition: procUtils.cc:354
void uptime(std::ostringstream *out)
Definition: procUtils.cc:309
static const unsigned int logRingSize_
evf::FUEventProcessor::XDAQ_INSTANTIATOR ( )

Member Data Documentation

unsigned long long evf::FUEventProcessor::allProcStats_
private

Definition at line 302 of file FUEventProcessor.h.

Referenced by enabling(), and summarize().

int evf::FUEventProcessor::anonymousPipe_[2]
private

Definition at line 270 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), receivingAndMonitor(), and subWeb().

xdata::InfoSpace* evf::FUEventProcessor::applicationInfoSpace_
private
toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndExecute_
private

Definition at line 232 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asReceiveMsgAndRead_
private

Definition at line 235 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asScalers_
private

Definition at line 262 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSignalMonitor_
private

Definition at line 242 of file FUEventProcessor.h.

Referenced by startSignalMonitorWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSummarize_
private

Definition at line 268 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

toolbox::task::ActionSignature* evf::FUEventProcessor::asSupervisor_
private

Definition at line 239 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().

xdata::Boolean evf::FUEventProcessor::autoRestartSlaves_
private

Definition at line 188 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), microState(), and supervisor().

xdata::String evf::FUEventProcessor::class_
private

Definition at line 179 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::String evf::FUEventProcessor::configString_
private

Definition at line 183 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

std::string evf::FUEventProcessor::configuration_
private

Definition at line 184 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and spotlightWebPage().

CPUStat* evf::FUEventProcessor::cpustat_
private

Definition at line 278 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), summarize(), supervisor(), and updater().

unsigned int evf::FUEventProcessor::crashesThisRun_
private

Definition at line 294 of file FUEventProcessor.h.

Referenced by enabling(), sigmon(), and supervisor().

xdata::UnsignedInteger32 evf::FUEventProcessor::crashesToDump_
private

Definition at line 297 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and sigmon().

Css evf::FUEventProcessor::css_
private

Definition at line 210 of file FUEventProcessor.h.

Referenced by css().

xdata::Boolean evf::FUEventProcessor::datasetCounting_
private

Definition at line 306 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

bool evf::FUEventProcessor::edm_init_done_
private

Definition at line 292 of file FUEventProcessor.h.

Referenced by doEndRunInEDM(), enabling(), forkProcessesFromEDM(), and supervisor().

xdata::Boolean evf::FUEventProcessor::epInitialized_
private

Definition at line 182 of file FUEventProcessor.h.

Referenced by actionPerformed(), configuring(), enabling(), and FUEventProcessor().

FWEPWrapper evf::FUEventProcessor::evtProcessor_
private
xdata::Boolean evf::FUEventProcessor::exitOnError_
private

Definition at line 207 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::forkInEDM_
private

Definition at line 223 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), halting(), stopping(), and supervisor().

moduleweb::ForkInfoObj* evf::FUEventProcessor::forkInfoObj_
private
pthread_mutex_t evf::FUEventProcessor::forkObjLock_
private

Definition at line 290 of file FUEventProcessor.h.

Referenced by enableForkInEDM(), and FUEventProcessor().

evf::StateMachine evf::FUEventProcessor::fsm_
private
xdata::Boolean evf::FUEventProcessor::hasModuleWebRegistry_
private

Definition at line 193 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasPrescaleService_
private

Definition at line 192 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::hasServiceWebRegistry_
private

Definition at line 194 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and makeStaticInfo().

xdata::Boolean evf::FUEventProcessor::hasShMem_
private
xdata::Boolean evf::FUEventProcessor::iDieStatisticsGathering_
private

Definition at line 196 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and summarize().

xdata::String evf::FUEventProcessor::iDieUrl_
private

Definition at line 275 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and updater().

unsigned long long evf::FUEventProcessor::idleProcStats_
private

Definition at line 301 of file FUEventProcessor.h.

Referenced by enabling(), and summarize().

xdata::UnsignedInteger32 evf::FUEventProcessor::instance_
private

Definition at line 180 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::isRunNumberSetter_
private

Definition at line 195 of file FUEventProcessor.h.

Referenced by enableCommon(), enableForkInEDM(), and FUEventProcessor().

timeval evf::FUEventProcessor::lastCrashTime_
private

Definition at line 299 of file FUEventProcessor.h.

Referenced by sigmon(), and supervisor().

timeval evf::FUEventProcessor::lastProcReport_
private

Definition at line 303 of file FUEventProcessor.h.

Referenced by summarize().

Logger evf::FUEventProcessor::log_
private

Definition at line 172 of file FUEventProcessor.h.

Referenced by doEndRunInEDM(), enableForkInEDM(), restartForkInEDM(), and supervisor().

std::vector<std::string> evf::FUEventProcessor::logRing_
private

Definition at line 216 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

unsigned int evf::FUEventProcessor::logRingIndex_
private

Definition at line 217 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

const unsigned int evf::FUEventProcessor::logRingSize_ = 50
static private

Definition at line 218 of file FUEventProcessor.h.

Referenced by localLog(), and updater().

bool evf::FUEventProcessor::logWrap_
private

Definition at line 219 of file FUEventProcessor.h.

Referenced by localLog(), logsAsString(), and updater().

MsgBuf evf::FUEventProcessor::master_message_prg_
private

Definition at line 283 of file FUEventProcessor.h.

Referenced by supervisor().

MsgBuf evf::FUEventProcessor::master_message_prr_
private

Definition at line 284 of file FUEventProcessor.h.

Referenced by supervisor().

MsgBuf evf::FUEventProcessor::master_message_trr_
private

Definition at line 287 of file FUEventProcessor.h.

Referenced by summarize().

std::auto_ptr<edm::Presence> evf::FUEventProcessor::messageServicePresence_
private

Definition at line 305 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), FUEventProcessor(), and receiving().

xdata::InfoSpace* evf::FUEventProcessor::monitorInfoSpace_
private

Definition at line 246 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::InfoSpace* evf::FUEventProcessor::monitorLegendaInfoSpace_
private

Definition at line 247 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

ModuleWebRegistry* evf::FUEventProcessor::mwrRef_
private

Definition at line 281 of file FUEventProcessor.h.

Referenced by configuring(), enableForkInEDM(), and enabling().

SubProcess* evf::FUEventProcessor::myProcess_
private
std::list<std::string> evf::FUEventProcessor::names_
private

Definition at line 274 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbAccepted
private

Definition at line 254 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

unsigned int evf::FUEventProcessor::nbdead_
private

Definition at line 226 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

unsigned int evf::FUEventProcessor::nblive_
private

Definition at line 225 of file FUEventProcessor.h.

Referenced by microState(), and supervisor().

xdata::Serializable* evf::FUEventProcessor::nbProcessed
private

Definition at line 253 of file FUEventProcessor.h.

Referenced by supervisor(), and updater().

xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcesses_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::nbSubProcessesReporting_
private

Definition at line 222 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), summarize(), and updater().

unsigned int evf::FUEventProcessor::nbTotalDQM_
private

Definition at line 228 of file FUEventProcessor.h.

Referenced by enabling(), microState(), and supervisor().

bool evf::FUEventProcessor::outprev_
private

Definition at line 197 of file FUEventProcessor.h.

Referenced by actionPerformed().

xdata::Boolean evf::FUEventProcessor::outPut_
private

Definition at line 186 of file FUEventProcessor.h.

Referenced by actionPerformed(), FUEventProcessor(), and makeStaticInfo().

pthread_mutex_t evf::FUEventProcessor::pickup_lock_
private

Definition at line 251 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), microState(), and supervisor().

RateStat* evf::FUEventProcessor::ratestat_
private

Definition at line 279 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and summarize().

std::string evf::FUEventProcessor::reasonForFailedState_
private
bool evf::FUEventProcessor::receiving_
private

Definition at line 233 of file FUEventProcessor.h.

Referenced by startReceivingLoop().

bool evf::FUEventProcessor::receivingM_
private

Definition at line 236 of file FUEventProcessor.h.

Referenced by startReceivingMonitorLoop().

bool evf::FUEventProcessor::restart_in_progress_
private

Definition at line 291 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), and restartForkInEDM().

bool evf::FUEventProcessor::rlimit_coresize_changed_
private

Definition at line 295 of file FUEventProcessor.h.

Referenced by enabling(), handleSignalSlave(), receiving(), sigmon(), and supervisor().

rlimit evf::FUEventProcessor::rlimit_coresize_default_
private

Definition at line 296 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), halting(), receiving(), stopping(), and supervisor().

xdata::UnsignedInteger32 evf::FUEventProcessor::runNumber_
private
xdata::InfoSpace* evf::FUEventProcessor::scalersInfoSpace_
private

Definition at line 257 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::InfoSpace* evf::FUEventProcessor::scalersLegendaInfoSpace_
private

Definition at line 258 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), FUEventProcessor(), and pathNames().

unsigned int evf::FUEventProcessor::scalersUpdates_
private
sem_t* evf::FUEventProcessor::sigmon_sem_
private
bool evf::FUEventProcessor::signalMonitorActive_
private

Definition at line 243 of file FUEventProcessor.h.

Referenced by sigmon(), and startSignalMonitorWorkLoop().

MsgBuf evf::FUEventProcessor::slave_message_monitoring_
private

Definition at line 286 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

MsgBuf evf::FUEventProcessor::slave_message_prr_
private

Definition at line 285 of file FUEventProcessor.h.

Referenced by receivingAndMonitor().

xdata::UnsignedInteger32 evf::FUEventProcessor::slaveRestartDelaySecs_
private

Definition at line 189 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and supervisor().

ShmOutputModuleRegistry* evf::FUEventProcessor::sorRef_
private

Definition at line 282 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and forkProcessesFromEDM().

std::string evf::FUEventProcessor::sourceId_
private

Definition at line 200 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spMStates_
private

Definition at line 271 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

xdata::Vector<xdata::Integer> evf::FUEventProcessor::spmStates_
private

Definition at line 272 of file FUEventProcessor.h.

Referenced by configuring(), FUEventProcessor(), and supervisor().

SquidNet evf::FUEventProcessor::squidnet_
private

Definition at line 215 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

xdata::Boolean evf::FUEventProcessor::squidPresent_
private

Definition at line 203 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), and updater().

pthread_mutex_t evf::FUEventProcessor::start_lock_
private

Definition at line 250 of file FUEventProcessor.h.

Referenced by enabling(), FUEventProcessor(), and microState().

pthread_mutex_t evf::FUEventProcessor::stop_lock_
private
std::vector<SubProcess> evf::FUEventProcessor::subs_
private
xdata::UnsignedInteger32 evf::FUEventProcessor::superSleepSec_
private

Definition at line 273 of file FUEventProcessor.h.

Referenced by FUEventProcessor(), subWeb(), and supervisor().

bool evf::FUEventProcessor::supervising_
private

Definition at line 240 of file FUEventProcessor.h.

Referenced by startSupervisorLoop(), and updater().

std::string evf::FUEventProcessor::updaterStatic_
private

Definition at line 252 of file FUEventProcessor.h.

Referenced by makeStaticInfo(), and updater().

xdata::String evf::FUEventProcessor::url_
private

Definition at line 178 of file FUEventProcessor.h.

Referenced by FUEventProcessor().

pid_t evf::FUEventProcessor::vp_
private

Definition at line 277 of file FUEventProcessor.h.

Referenced by configuring(), enabling(), and updater().

Vulture* evf::FUEventProcessor::vulture_
private

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceiving_
private

Definition at line 231 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), and startReceivingLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlReceivingMonitor_
private

Definition at line 234 of file FUEventProcessor.h.

Referenced by forkProcessesFromEDM(), and startReceivingMonitorLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlScalers_
private

Definition at line 261 of file FUEventProcessor.h.

Referenced by startScalersWorkLoop().

bool evf::FUEventProcessor::wlScalersActive_
private

Definition at line 263 of file FUEventProcessor.h.

Referenced by scalers(), startScalersWorkLoop(), summarize(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSignalMonitor_
private

Definition at line 241 of file FUEventProcessor.h.

Referenced by startSignalMonitorWorkLoop().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSummarize_
private

Definition at line 267 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop().

bool evf::FUEventProcessor::wlSummarizeActive_
private

Definition at line 269 of file FUEventProcessor.h.

Referenced by startSummarizeWorkLoop(), and updater().

toolbox::task::WorkLoop* evf::FUEventProcessor::wlSupervising_
private

Definition at line 238 of file FUEventProcessor.h.

Referenced by startSupervisorLoop().